# (C) 2006 Canonical Ltd

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests for the Repository facility that are not interface tests.

For interface tests see tests/repository_implementations/*.py.

For concrete class tests see this file, and for storage formats tests
also see this file.
"""

from stat import S_ISDIR
from StringIO import StringIO

import bzrlib.bzrdir as bzrdir
import bzrlib.errors as errors
from bzrlib.errors import (NotBranchError,
                           NoSuchFile,
                           UnknownFormatError,
                           UnsupportedFormatError,
                           )
import bzrlib.repository as repository
from bzrlib.tests import TestCase, TestCaseWithTransport
from bzrlib.transport import get_transport
from bzrlib.transport.http import HttpServer
from bzrlib.transport.memory import MemoryServer
import bzrlib.ui


class TestDefaultFormat(TestCase):
    """Check that the default repository format can be read and replaced."""

    def test_get_set_default_format(self):
        """Swap in SampleRepositoryFormat as the default, then restore it.

        While the sample format is the default, creating a repository through
        a meta-dir control directory must return the marker string produced
        by SampleRepositoryFormat.initialize.
        """
        old_format = repository.RepositoryFormat.get_default_format()
        # the default out of the box is expected to be a RepositoryFormat7
        self.assertTrue(isinstance(old_format, repository.RepositoryFormat7))
        repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
        # The default lives on RepositoryFormat itself, so it is shared by
        # every test: restore it even when an assertion below fails.
        try:
            # creating a repository should now create an instrumented dir.
            # the default repository format is used by the meta dir format
            # which is not the default bzrdir format at this point
            control = bzrdir.BzrDirMetaFormat1().initialize('memory:/')
            result = control.create_repository()
            self.assertEqual(result, 'A bzr repository dir')
        finally:
            repository.RepositoryFormat.set_default_format(old_format)
        self.assertEqual(old_format,
                         repository.RepositoryFormat.get_default_format())


class SampleRepositoryFormat(repository.RepositoryFormat):
    """A sample repository format used by the tests in this file.

    this format is initializable, unsupported to aid in testing the
    open and open(unsupported=True) routines.
    """

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Sample .bzr repository format."

    def initialize(self, a_bzrdir, shared=False):
        """Initialize a repository in a BzrDir.

        Only a 'format' file holding the format string is written; a marker
        string is returned in place of a repository object. ``shared`` is
        accepted but not used.
        """
        t = a_bzrdir.get_repository_transport(self)
        t.put('format', StringIO(self.get_format_string()))
        return 'A bzr repository dir'

    def is_supported(self):
        """Report this format as unsupported, as the class docstring states."""
        return False

    def open(self, a_bzrdir, _found=False):
        """Return a marker string in place of an opened repository."""
        return "opened repository."


class TestRepositoryFormat(TestCaseWithTransport):
    """Tests for the Repository format detection used by the bzr meta dir facility."""

    def test_find_format(self):
        """find_format returns an instance of the format that was initialized."""
        # is the right format object found for a repository?
        # create a repository with a known format object.
        self.build_tree(["foo/", "bar/"])

        def check_format(repo_format, url):
            control = repo_format._matchingbzrdir.initialize(url)
            repo_format.initialize(control)
            found_format = repository.RepositoryFormat.find_format(control)
            self.assertTrue(isinstance(found_format, repo_format.__class__))

        check_format(repository.RepositoryFormat7(), "bar")

    def test_find_format_no_repository(self):
        """A control dir without a repository raises NoRepositoryPresent."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        self.assertRaises(errors.NoRepositoryPresent,
                          repository.RepositoryFormat.find_format,
                          control)

    def test_find_format_unknown_format(self):
        """An unregistered format string raises UnknownFormatError."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        SampleRepositoryFormat().initialize(control)
        self.assertRaises(UnknownFormatError,
                          repository.RepositoryFormat.find_format,
                          control)

    def test_register_unregister_format(self):
        """A registered but unsupported format is refused by Repository.open."""
        sample_format = SampleRepositoryFormat()
        # make a control dir
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # make a repository in it
        sample_format.initialize(control)
        # register a format for it.
        repository.RepositoryFormat.register_format(sample_format)
        # Registration is shared state on RepositoryFormat: always undo it,
        # even when an assertion fails, so other tests are not affected.
        try:
            # which repository.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              repository.Repository.open, self.get_url())
            # but asking the format itself to open it will work
            self.assertEqual(sample_format.open(control), "opened repository.")
        finally:
            # unregister the format
            repository.RepositoryFormat.unregister_format(sample_format)


class TestFormat6(TestCaseWithTransport):
    """Tests specific to RepositoryFormat6."""

    def test_no_ancestry_weave(self):
        """Initializing a format 6 repository creates no ancestry.weave file."""
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
        repository.RepositoryFormat6().initialize(control)
        # We no longer need to create the ancestry.weave file
        # since it is *never* used.
        self.assertRaises(NoSuchFile,
                          control.transport.get,
                          'ancestry.weave')


class TestFormat7(TestCaseWithTransport):
    """Check the files RepositoryFormat7.initialize leaves on the transport."""

    # NOTE(review): the listing cut the expected inventory text off after its
    # first line; the 'w'/'W' lines are restored as the body of an empty v5
    # weave - confirm against bzrlib's weave file writer.
    _EMPTY_WEAVE_TEXT = ('# bzr weave file v5\n'
                         'w\n'
                         'W\n')

    def test_disk_layout(self):
        """A non-shared repository has format, lock, inventory and two dirs."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormat7().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): the lock/unlock calls were absent from the listing
        # and are restored from the comment above - confirm.
        repo.lock_write()
        repo.unlock()
        # we want:
        # format 'Bazaar-NG Repository format 7'
        # lock ''
        # inventory.weave == empty_weave
        # revision-store directory
        # weaves directory
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('lock').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEqualDiff(self._EMPTY_WEAVE_TEXT,
                             t.get('inventory.weave').read())

    def test_shared_disk_layout(self):
        """A shared repository additionally has an empty 'shared-storage' file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repository.RepositoryFormat7().initialize(control, shared=True)
        # we want:
        # format 'Bazaar-NG Repository format 7'
        # lock ''
        # inventory.weave == empty_weave
        # revision-store directory
        # weaves directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEqualDiff(self._EMPTY_WEAVE_TEXT,
                             t.get('inventory.weave').read())

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the 'no-working-trees' marker file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
        repo.set_make_working_trees(False)
        # we want:
        # format 'Bazaar-NG Repository format 7'
        # lock ''
        # inventory.weave == empty_weave
        # revision-store directory
        # weaves directory
        # a 'shared-storage' marker file.
        # a 'no-working-trees' marker file, until trees are re-enabled.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        self.assertFalse(t.has('no-working-trees'))
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEqualDiff(self._EMPTY_WEAVE_TEXT,
                             t.get('inventory.weave').read())


class TestFormatKnit1(TestCaseWithTransport):
    """Check the files RepositoryFormatKnit1.initialize leaves on the transport."""

    # NOTE(review): the listing cut the expected inventory text off after its
    # first line; the 'w'/'W' lines are restored as the body of an empty v5
    # weave - confirm against bzrlib's weave file writer.
    _EMPTY_WEAVE_TEXT = ('# bzr weave file v5\n'
                         'w\n'
                         'W\n')

    def test_disk_layout(self):
        """A non-shared repository has format, lock, inventory and two dirs."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormatKnit1().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): the lock/unlock calls were absent from the listing
        # and are restored from the comment above - confirm.
        repo.lock_write()
        repo.unlock()
        # we want:
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock ''
        # inventory.weave == empty_weave
        # revision-store directory
        # knits directory
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('lock').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # cheating and using a weave for now.
        self.assertEqualDiff(self._EMPTY_WEAVE_TEXT,
                             t.get('inventory.weave').read())

    def test_shared_disk_layout(self):
        """A shared repository additionally has an empty 'shared-storage' file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repository.RepositoryFormatKnit1().initialize(control, shared=True)
        # we want:
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock ''
        # inventory.weave == empty_weave
        # revision-store directory
        # knits directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # cheating and using a weave for now.
        self.assertEqualDiff(self._EMPTY_WEAVE_TEXT,
                             t.get('inventory.weave').read())

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the 'no-working-trees' marker file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
        repo.set_make_working_trees(False)
        # we want:
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock ''
        # inventory.weave == empty_weave
        # revision-store directory
        # knits directory
        # a 'shared-storage' marker file.
        # a 'no-working-trees' marker file, until trees are re-enabled.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        self.assertFalse(t.has('no-working-trees'))
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # cheating and using a weave for now.
        self.assertEqualDiff(self._EMPTY_WEAVE_TEXT,
                             t.get('inventory.weave').read())


class InterString(repository.InterRepository):
    """An inter-repository optimised code path for strings.

    This is for use during testing where we use strings as repositories
    so that none of the default registered inter-repository classes will
    match.
    """

    # Called on the class with (source, target) and no instance, so it must
    # be a static method.
    @staticmethod
    def is_compatible(repo_source, repo_target):
        """InterString is compatible with strings-as-repos."""
        return isinstance(repo_source, str) and isinstance(repo_target, str)


class TestInterRepository(TestCaseWithTransport):
    """Tests for InterRepository.get and for optimiser registration."""

    def test_get_default_inter_repository(self):
        # test that the InterRepository.get(repo_a, repo_b) probes
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
        # true and returns a default inter_repo otherwise.
        # This also tests that the default registered optimised interrepository
        # classes do not barf inappropriately when a surprising repository type
        # is handed to them.
        dummy_a = "Repository 1."
        dummy_b = "Repository 2."
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)

    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
        self.assertEqual(repository.InterRepository,
                         inter_repo.__class__)
        self.assertEqual(repo_a, inter_repo.source)
        self.assertEqual(repo_b, inter_repo.target)

    def test_register_inter_repository_class(self):
        # test that a optimised code path provider - a
        # InterRepository subclass can be registered and unregistered
        # and that it is correctly selected when given a repository
        # pair that it returns true on for the is_compatible static method
        # check
        dummy_a = "Repository 1."
        dummy_b = "Repository 2."
        repository.InterRepository.register_optimiser(InterString)
        # The optimiser registry is shared by every test; unregister even if
        # an assertion fails so InterString cannot leak into other tests.
        try:
            # we should get the default for something InterString returns False
            # to
            self.assertFalse(InterString.is_compatible(dummy_a, None))
            self.assertGetsDefaultInterRepository(dummy_a, None)
            # and we should get an InterString for a pair it 'likes'
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
            self.assertEqual(InterString, inter_repo.__class__)
            self.assertEqual(dummy_a, inter_repo.source)
            self.assertEqual(dummy_b, inter_repo.target)
        finally:
            repository.InterRepository.unregister_optimiser(InterString)
        # now we should get the default InterRepository object again.
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)


class TestInterWeaveRepo(TestCaseWithTransport):
    """Tests for which format pairs InterWeaveRepo accepts."""

    def test_is_compatible_and_registered(self):
        """InterWeaveRepo accepts 5/6/7 pairs and is what InterRepository.get picks."""
        # InterWeaveRepo is compatible only when both sides
        # are format 5/6/7 repositories
        formats = [repository.RepositoryFormat5(),
                   repository.RepositoryFormat6(),
                   repository.RepositoryFormat7()]
        incompatible_formats = [repository.RepositoryFormat4(),
                                repository.RepositoryFormatKnit1(),
                                ]
        repo_a = self.make_repository('a')
        repo_b = self.make_repository('b')
        is_compatible = repository.InterWeaveRepo.is_compatible
        for source in incompatible_formats:
            # force incompatible left then right
            repo_a._format = source
            repo_b._format = formats[0]
            self.assertFalse(is_compatible(repo_a, repo_b))
            self.assertFalse(is_compatible(repo_b, repo_a))
        for source in formats:
            repo_a._format = source
            for target in formats:
                repo_b._format = target
                self.assertTrue(is_compatible(repo_a, repo_b))
        # NOTE(review): the tail of this call was missing from the listing;
        # comparing the class of the returned object mirrors
        # TestInterRepository.assertGetsDefaultInterRepository - confirm.
        self.assertEqual(repository.InterWeaveRepo,
                         repository.InterRepository.get(repo_a,
                                                        repo_b).__class__)


class TestRepositoryConverter(TestCaseWithTransport):
    """Tests for repository.CopyConverter."""

    def test_convert_empty(self):
        """Converting an empty format 7 repository yields the knit format."""
        t = get_transport(self.get_url('.'))
        t.mkdir('repository')
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
        repo = repository.RepositoryFormat7().initialize(repo_dir)
        target_format = repository.RepositoryFormatKnit1()
        pb = bzrlib.ui.ui_factory.progress_bar()

        converter = repository.CopyConverter(target_format)
        converter.convert(repo, pb)
        # Re-open from the control dir so the format is read back rather
        # than taken from the pre-conversion object.
        repo = repo_dir.open_repository()
        self.assertTrue(isinstance(target_format, repo._format.__class__))