1
# (C) 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for bzrdir implementations - tests a bzrdir format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
33
import bzrlib.repository as repository
34
from bzrlib.revision import NULL_REVISION
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
37
from bzrlib.trace import mutter
38
import bzrlib.transactions as transactions
39
from bzrlib.transport import get_transport
40
from bzrlib.upgrade import upgrade
41
from bzrlib.workingtree import WorkingTree
44
class TestCaseWithRepository(TestCaseWithBzrDir):
47
super(TestCaseWithRepository, self).setUp()
49
def make_branch(self, relpath):
50
repo = self.make_repository(relpath)
51
return repo.bzrdir.create_branch()
53
def make_bzrdir(self, relpath):
55
url = self.get_url(relpath)
56
segments = url.split('/')
57
if segments and segments[-1] not in ('', '.'):
58
parent = '/'.join(segments[:-1])
59
t = get_transport(parent)
64
return self.bzrdir_format.initialize(url)
65
except UninitializableFormat:
66
raise TestSkipped("Format %s is not initializable.")
68
def make_repository(self, relpath):
69
made_control = self.make_bzrdir(relpath)
70
return self.repository_format.initialize(made_control)
73
class TestRepository(TestCaseWithRepository):
75
def test_clone_to_default_format(self):
76
#TODO: Test that cloning a repository preserves all the information
77
# such as signatures[not tested yet] etc etc.
78
# when changing to the current default format.
79
tree_a = self.make_branch_and_tree('a')
80
self.build_tree(['a/foo'])
81
tree_a.add('foo', 'file1')
82
tree_a.commit('rev1', rev_id='rev1')
83
bzrdirb = self.make_bzrdir('b')
84
repo_b = tree_a.branch.repository.clone(bzrdirb)
85
tree_b = repo_b.revision_tree('rev1')
86
tree_b.get_file_text('file1')
87
rev1 = repo_b.get_revision('rev1')
89
def test_clone_specific_format(self):
92
def test_format_initialize_find_open(self):
93
# loopback test to check the current format initializes to itself.
94
if not self.repository_format.is_supported():
95
# unsupported formats are not loopback testable
96
# because the default open will not open them and
97
# they may not be initializable.
99
# supported formats must be able to init and open
100
t = get_transport(self.get_url())
101
readonly_t = get_transport(self.get_readonly_url())
102
made_control = self.bzrdir_format.initialize(t.base)
103
made_repo = self.repository_format.initialize(made_control)
104
self.failUnless(isinstance(made_repo, repository.Repository))
105
self.assertEqual(made_control, made_repo.bzrdir)
107
# find it via bzrdir opening:
108
opened_control = bzrdir.BzrDir.open(readonly_t.base)
109
direct_opened_repo = opened_control.open_repository()
110
self.assertEqual(direct_opened_repo.__class__, made_repo.__class__)
111
self.assertEqual(opened_control, direct_opened_repo.bzrdir)
113
self.failUnless(isinstance(direct_opened_repo._format,
114
self.repository_format.__class__))
115
# find it via Repository.open
116
opened_repo = repository.Repository.open(readonly_t.base)
117
self.failUnless(isinstance(opened_repo, made_repo.__class__))
118
self.assertEqual(made_repo._format.__class__,
119
opened_repo._format.__class__)
120
# if it has a unique id string, can we probe for it ?
122
self.repository_format.get_format_string()
123
except NotImplementedError:
125
self.assertEqual(self.repository_format,
126
repository.RepositoryFormat.find_format(opened_control))
128
def test_create_repository(self):
129
# bzrdir can construct a repository for itself.
130
if not self.bzrdir_format.is_supported():
131
# unsupported formats are not loopback testable
132
# because the default open will not open them and
133
# they may not be initializable.
135
t = get_transport(self.get_url())
136
made_control = self.bzrdir_format.initialize(t.base)
137
made_repo = made_control.create_repository()
138
self.failUnless(isinstance(made_repo, repository.Repository))
139
self.assertEqual(made_control, made_repo.bzrdir)
141
def test_create_repository_shared(self):
142
# bzrdir can construct a shared repository.
143
if not self.bzrdir_format.is_supported():
144
# unsupported formats are not loopback testable
145
# because the default open will not open them and
146
# they may not be initializable.
148
t = get_transport(self.get_url())
149
made_control = self.bzrdir_format.initialize(t.base)
151
made_repo = made_control.create_repository(shared=True)
152
except errors.IncompatibleFormat:
153
# not all repository formats understand being shared, or
154
# may only be shared in some circumstances.
156
self.failUnless(isinstance(made_repo, repository.Repository))
157
self.assertEqual(made_control, made_repo.bzrdir)
158
self.assertTrue(made_repo.is_shared())
160
def test_revision_tree(self):
161
wt = self.make_branch_and_tree('.')
162
wt.commit('lala!', rev_id='revision-1', allow_pointless=True)
163
tree = wt.branch.repository.revision_tree('revision-1')
164
self.assertEqual(list(tree.list_files()), [])
165
tree = wt.branch.repository.revision_tree(None)
166
self.assertEqual(len(tree.list_files()), 0)
167
tree = wt.branch.repository.revision_tree(NULL_REVISION)
168
self.assertEqual(len(tree.list_files()), 0)
170
def test_fetch(self):
171
# smoke test fetch to ensure that the convenience function works.
172
# it is defined as a convenience function with the underlying
173
# functionality provided by an InterRepository
174
tree_a = self.make_branch_and_tree('a')
175
self.build_tree(['a/foo'])
176
tree_a.add('foo', 'file1')
177
tree_a.commit('rev1', rev_id='rev1')
178
# fetch with a default limit (grab everything)
179
repo = bzrdir.BzrDir.create_repository(self.get_url('b'))
180
repo.fetch(tree_a.branch.repository,
182
pb=bzrlib.progress.DummyProgress())
184
def test_clone_bzrdir_repository_revision(self):
185
# make a repository with some revisions,
186
# and clone it, this should not have unreferenced revisions.
187
# also: test cloning with a revision id of NULL_REVISION -> empty repo.
188
raise TestSkipped('revision limiting is not implemented yet.')
190
def test_clone_repository_basis_revision(self):
191
raise TestSkipped('the use of a basis should not add noise data to the result.')
193
def test_clone_repository_incomplete_source_with_basis(self):
194
# ensure that basis really does grab from the basis by having incomplete source
195
tree = self.make_branch_and_tree('commit_tree')
196
self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
198
tree.commit('revision 1', rev_id='1')
199
source = self.make_repository('source')
200
# this gives us an incomplete repository
201
tree.bzrdir.open_repository().copy_content_into(source)
202
tree.commit('revision 2', rev_id='2', allow_pointless=True)
203
self.assertFalse(source.has_revision('2'))
204
target = source.bzrdir.clone(self.get_url('target'), basis=tree.bzrdir)
205
self.assertTrue(target.open_repository().has_revision('2'))
207
def test_clone_shared_no_tree(self):
208
# cloning a shared repository keeps it shared
209
# and preserves the make_working_tree setting.
210
made_control = self.make_bzrdir('source')
212
made_repo = made_control.create_repository(shared=True)
213
except errors.IncompatibleFormat:
214
# not all repository formats understand being shared, or
215
# may only be shared in some circumstances.
217
made_repo.set_make_working_trees(False)
218
result = made_control.clone(self.get_url('target'))
219
self.failUnless(isinstance(made_repo, repository.Repository))
220
self.assertEqual(made_control, made_repo.bzrdir)
221
self.assertTrue(result.open_repository().is_shared())
222
self.assertFalse(result.open_repository().make_working_trees())
225
class TestCaseWithComplexRepository(TestCaseWithRepository):
228
super(TestCaseWithComplexRepository, self).setUp()
229
tree_a = self.make_branch_and_tree('a')
230
self.bzrdir = tree_a.branch.bzrdir
231
# add a corrupt inventory 'orphan'
232
# this may need some generalising for knits.
233
tree_a.branch.repository.control_weaves.add_text(
234
'inventory', 'orphan', [], [],
235
tree_a.branch.repository.get_transaction())
236
# add a real revision 'rev1'
237
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
238
# add a real revision 'rev2' based on rev1
239
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
241
def test_all_revision_ids(self):
242
# all_revision_ids -> all revisions
243
self.assertEqual(['rev1', 'rev2'],
244
self.bzrdir.open_repository().all_revision_ids())
246
def test_get_ancestry_missing_revision(self):
247
# get_ancestry(revision that is in some data but not fully installed
249
self.assertRaises(errors.NoSuchRevision,
250
self.bzrdir.open_repository().get_ancestry, 'orphan')