1
# (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for InterRepository implementastions."""
21
import bzrlib.bzrdir as bzrdir
22
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
23
import bzrlib.errors as errors
24
from bzrlib.errors import (FileExists,
27
UninitializableFormat,
30
import bzrlib.repository as repository
31
from bzrlib.revision import NULL_REVISION
32
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
33
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
34
from bzrlib.transport import get_transport
37
class TestCaseWithInterRepository(TestCaseWithBzrDir):
40
super(TestCaseWithInterRepository, self).setUp()
42
def make_branch(self, relpath):
43
repo = self.make_repository(relpath)
44
return repo.bzrdir.create_branch()
46
def make_bzrdir(self, relpath, bzrdir_format=None):
48
url = self.get_url(relpath)
49
segments = url.split('/')
50
if segments and segments[-1] not in ('', '.'):
51
parent = '/'.join(segments[:-1])
52
t = get_transport(parent)
57
if bzrdir_format is None:
58
bzrdir_format = self.repository_format._matchingbzrdir
59
return bzrdir_format.initialize(url)
60
except UninitializableFormat:
61
raise TestSkipped("Format %s is not initializable.")
63
def make_repository(self, relpath):
64
made_control = self.make_bzrdir(relpath)
65
return self.repository_format.initialize(made_control)
67
def make_to_repository(self, relpath):
68
made_control = self.make_bzrdir(relpath,
69
self.repository_format_to._matchingbzrdir)
70
return self.repository_format_to.initialize(made_control)
73
class TestInterRepository(TestCaseWithInterRepository):
75
def test_interrepository_get_returns_correct_optimiser(self):
76
# we assume the optimising code paths are triggered
77
# by the type of the repo not the transport - at this point.
78
# we may need to update this test if this changes.
79
source_repo = self.make_repository("source")
80
target_repo = self.make_to_repository("target")
81
interrepo = repository.InterRepository.get(source_repo, target_repo)
82
self.assertEqual(self.interrepo_class, interrepo.__class__)
85
tree_a = self.make_branch_and_tree('a')
86
self.build_tree(['a/foo'])
87
tree_a.add('foo', 'file1')
88
tree_a.commit('rev1', rev_id='rev1')
89
def check_push_rev1(repo):
90
# ensure the revision is missing.
91
self.assertRaises(NoSuchRevision, repo.get_revision, 'rev1')
92
# fetch with a limit of NULL_REVISION and an explicit progress bar.
93
repo.fetch(tree_a.branch.repository,
94
revision_id=NULL_REVISION,
95
pb=bzrlib.progress.DummyProgress())
96
# nothing should have been pushed
97
self.assertFalse(repo.has_revision('rev1'))
98
# fetch with a default limit (grab everything)
99
repo.fetch(tree_a.branch.repository)
100
# check that b now has all the data from a's first commit.
101
rev = repo.get_revision('rev1')
102
tree = repo.revision_tree('rev1')
103
tree.get_file_text('file1')
105
if tree.inventory[file_id].kind == "file":
106
tree.get_file(file_id).read()
108
# makes a target version repo
109
repo_b = self.make_to_repository('b')
110
check_push_rev1(repo_b)
112
def test_fetch_missing_revision_same_location_fails(self):
113
repo_a = self.make_repository('.')
114
repo_b = repository.Repository.open('.')
115
self.assertRaises(errors.NoSuchRevision, repo_b.fetch, repo_a, revision_id='XXX')
117
def test_fetch_same_location_trivial_works(self):
118
repo_a = self.make_repository('.')
119
repo_b = repository.Repository.open('.')
123
class TestCaseWithComplexRepository(TestCaseWithInterRepository):
126
super(TestCaseWithComplexRepository, self).setUp()
127
tree_a = self.make_branch_and_tree('a')
128
self.bzrdir = tree_a.branch.bzrdir
129
# add a corrupt inventory 'orphan'
130
tree_a.branch.repository.control_weaves.add_text(
131
'inventory', 'orphan', [], [],
132
tree_a.branch.repository.get_transaction())
133
# add a real revision 'rev1'
134
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
135
# add a real revision 'rev2' based on rev1
136
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
138
def test_missing_revision_ids(self):
139
# revision ids in repository A but not B are returned, fake ones
140
# are stripped. (fake meaning no revision object, but an inventory
141
# as some formats keyed off inventory data in the past.
142
# make a repository to compare against that claims to have rev1
143
repo_b = self.make_to_repository('rev1_only')
144
repo_a = self.bzrdir.open_repository()
145
repo_b.fetch(repo_a, 'rev1')
146
# check the test will be valid
147
self.assertFalse(repo_b.has_revision('rev2'))
148
self.assertEqual(['rev2'],
149
repo_b.missing_revision_ids(repo_a))
151
def test_missing_revision_ids_revision_limited(self):
152
# revision ids in repository A that are not referenced by the
153
# requested revision are not returned.
154
# make a repository to compare against that is empty
155
repo_b = self.make_to_repository('empty')
156
repo_a = self.bzrdir.open_repository()
157
self.assertEqual(['rev1'],
158
repo_b.missing_revision_ids(repo_a, revision_id='rev1'))