1
# Copyright (C) 2006, 2007 Canonical Ltd
1
# Copyright (C) 2006 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
34
33
UnknownFormatError,
35
34
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
36
import bzrlib.repository as repository
38
37
from bzrlib.tests import TestCase, TestCaseWithTransport
39
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
41
from bzrlib import upgrade, workingtree
49
44
class TestDefaultFormat(TestCase):
51
46
def test_get_set_default_format(self):
52
old_default = bzrdir.format_registry.get('default')
53
private_default = old_default().repository_format.__class__
47
private_default = repository._default_format.__class__
54
48
old_format = repository.RepositoryFormat.get_default_format()
55
49
self.assertTrue(isinstance(old_format, private_default))
56
def make_sample_bzrdir():
57
my_bzrdir = bzrdir.BzrDirMetaFormat1()
58
my_bzrdir.repository_format = SampleRepositoryFormat()
60
bzrdir.format_registry.remove('default')
61
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
62
bzrdir.format_registry.set_default('sample')
50
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
63
51
# creating a repository should now create an instrumented dir.
65
53
# the default branch format is used by the meta dir format
68
56
result = dir.create_repository()
69
57
self.assertEqual(result, 'A bzr repository dir')
71
bzrdir.format_registry.remove('default')
72
bzrdir.format_registry.remove('sample')
73
bzrdir.format_registry.register('default', old_default, '')
74
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
59
repository.RepositoryFormat.set_default_format(old_format)
60
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
78
63
class SampleRepositoryFormat(repository.RepositoryFormat):
113
98
t = get_transport(url)
114
99
found_format = repository.RepositoryFormat.find_format(dir)
115
100
self.failUnless(isinstance(found_format, format.__class__))
116
check_format(weaverepo.RepositoryFormat7(), "bar")
101
check_format(repository.RepositoryFormat7(), "bar")
118
103
def test_find_format_no_repository(self):
119
104
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
149
134
def test_no_ancestry_weave(self):
150
135
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
151
repo = weaverepo.RepositoryFormat6().initialize(control)
136
repo = repository.RepositoryFormat6().initialize(control)
152
137
# We no longer need to create the ancestry.weave file
153
138
# since it is *never* used.
154
139
self.assertRaises(NoSuchFile,
161
146
def test_disk_layout(self):
162
147
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
163
repo = weaverepo.RepositoryFormat7().initialize(control)
148
repo = repository.RepositoryFormat7().initialize(control)
164
149
# in case of side effects of locking.
165
150
repo.lock_write()
183
168
def test_shared_disk_layout(self):
184
169
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
185
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
170
repo = repository.RepositoryFormat7().initialize(control, shared=True)
187
172
# format 'Bazaar-NG Repository format 7'
188
173
# inventory.weave == empty_weave
205
190
def test_creates_lockdir(self):
206
191
"""Make sure it appears to be controlled by a LockDir existence"""
207
192
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
208
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
193
repo = repository.RepositoryFormat7().initialize(control, shared=True)
209
194
t = control.get_repository_transport(None)
210
195
# TODO: Should check there is a 'lock' toplevel directory,
211
196
# regardless of contents
221
206
"""repo format 7 actually locks on lockdir"""
222
207
base_url = self.get_url()
223
208
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
224
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
209
repo = repository.RepositoryFormat7().initialize(control, shared=True)
225
210
t = control.get_repository_transport(None)
226
211
repo.lock_write()
236
221
def test_shared_no_tree_disk_layout(self):
237
222
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
238
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
223
repo = repository.RepositoryFormat7().initialize(control, shared=True)
239
224
repo.set_make_working_trees(False)
241
226
# format 'Bazaar-NG Repository format 7'
265
250
def test_disk_layout(self):
266
251
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
267
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
252
repo = repository.RepositoryFormatKnit1().initialize(control)
268
253
# in case of side effects of locking.
269
254
repo.lock_write()
298
283
def test_shared_disk_layout(self):
299
284
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
300
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
285
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
302
287
# format 'Bazaar-NG Knit Repository Format 1'
303
288
# lock: is a directory
317
302
def test_shared_no_tree_disk_layout(self):
318
303
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
319
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
304
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
320
305
repo.set_make_working_trees(False)
322
307
# format 'Bazaar-NG Knit Repository Format 1'
338
323
self.check_knits(t)
341
class DummyRepository(object):
342
"""A dummy repository for testing."""
346
def supports_rich_root(self):
350
class InterDummy(repository.InterRepository):
351
"""An inter-repository optimised code path for DummyRepository.
353
This is for use during testing where we use DummyRepository as repositories
326
class InterString(repository.InterRepository):
327
"""An inter-repository optimised code path for strings.
329
This is for use during testing where we use strings as repositories
354
330
so that none of the default registered inter-repository classes will
359
335
def is_compatible(repo_source, repo_target):
360
"""InterDummy is compatible with DummyRepository."""
361
return (isinstance(repo_source, DummyRepository) and
362
isinstance(repo_target, DummyRepository))
336
"""InterString is compatible with strings-as-repos."""
337
return isinstance(repo_source, str) and isinstance(repo_target, str)
365
340
class TestInterRepository(TestCaseWithTransport):
371
346
# This also tests that the default registered optimised interrepository
372
347
# classes do not barf inappropriately when a surprising repository type
373
348
# is handed to them.
374
dummy_a = DummyRepository()
375
dummy_b = DummyRepository()
349
dummy_a = "Repository 1."
350
dummy_b = "Repository 2."
376
351
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
378
353
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
379
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
381
The effective default is now InterSameDataRepository because there is
382
no actual sane default in the presence of incompatible data models.
354
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
384
355
inter_repo = repository.InterRepository.get(repo_a, repo_b)
385
self.assertEqual(repository.InterSameDataRepository,
356
self.assertEqual(repository.InterRepository,
386
357
inter_repo.__class__)
387
358
self.assertEqual(repo_a, inter_repo.source)
388
359
self.assertEqual(repo_b, inter_repo.target)
393
364
# and that it is correctly selected when given a repository
394
365
# pair that it returns true on for the is_compatible static method
396
dummy_a = DummyRepository()
397
dummy_b = DummyRepository()
398
repo = self.make_repository('.')
399
# hack dummies to look like repo somewhat.
400
dummy_a._serializer = repo._serializer
401
dummy_b._serializer = repo._serializer
402
repository.InterRepository.register_optimiser(InterDummy)
367
dummy_a = "Repository 1."
368
dummy_b = "Repository 2."
369
repository.InterRepository.register_optimiser(InterString)
404
# we should get the default for something InterDummy returns False
371
# we should get the default for something InterString returns False
406
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
407
self.assertGetsDefaultInterRepository(dummy_a, repo)
408
# and we should get an InterDummy for a pair it 'likes'
409
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
373
self.assertFalse(InterString.is_compatible(dummy_a, None))
374
self.assertGetsDefaultInterRepository(dummy_a, None)
375
# and we should get an InterString for a pair it 'likes'
376
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
410
377
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
411
self.assertEqual(InterDummy, inter_repo.__class__)
378
self.assertEqual(InterString, inter_repo.__class__)
412
379
self.assertEqual(dummy_a, inter_repo.source)
413
380
self.assertEqual(dummy_b, inter_repo.target)
415
repository.InterRepository.unregister_optimiser(InterDummy)
382
repository.InterRepository.unregister_optimiser(InterString)
416
383
# now we should get the default InterRepository object again.
417
384
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
422
389
def test_is_compatible_and_registered(self):
423
390
# InterWeaveRepo is compatible when either side
424
391
# is a format 5/6/7 branch
425
from bzrlib.repofmt import knitrepo, weaverepo
426
formats = [weaverepo.RepositoryFormat5(),
427
weaverepo.RepositoryFormat6(),
428
weaverepo.RepositoryFormat7()]
429
incompatible_formats = [weaverepo.RepositoryFormat4(),
430
knitrepo.RepositoryFormatKnit1(),
392
formats = [repository.RepositoryFormat5(),
393
repository.RepositoryFormat6(),
394
repository.RepositoryFormat7()]
395
incompatible_formats = [repository.RepositoryFormat4(),
396
repository.RepositoryFormatKnit1(),
432
398
repo_a = self.make_repository('a')
433
399
repo_b = self.make_repository('b')
454
420
t = get_transport(self.get_url('.'))
455
421
t.mkdir('repository')
456
422
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
457
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
458
target_format = knitrepo.RepositoryFormatKnit1()
423
repo = repository.RepositoryFormat7().initialize(repo_dir)
424
target_format = repository.RepositoryFormatKnit1()
459
425
converter = repository.CopyConverter(target_format)
460
426
pb = bzrlib.ui.ui_factory.nested_progress_bar()
473
439
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
476
class TestRepositoryFormatKnit3(TestCaseWithTransport):
442
class TestRepositoryFormatKnit2(TestCaseWithTransport):
478
444
def test_convert(self):
479
445
"""Ensure the upgrade adds weaves for roots"""
480
446
format = bzrdir.BzrDirMetaFormat1()
481
format.repository_format = knitrepo.RepositoryFormatKnit1()
447
format.repository_format = repository.RepositoryFormatKnit1()
482
448
tree = self.make_branch_and_tree('.', format)
483
449
tree.commit("Dull commit", rev_id="dull")
484
450
revision_tree = tree.branch.repository.revision_tree('dull')
485
451
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
486
452
revision_tree.inventory.root.file_id)
487
453
format = bzrdir.BzrDirMetaFormat1()
488
format.repository_format = knitrepo.RepositoryFormatKnit3()
454
format.repository_format = repository.RepositoryFormatKnit2()
489
455
upgrade.Convert('.', format)
490
456
tree = workingtree.WorkingTree.open('.')
491
457
revision_tree = tree.branch.repository.revision_tree('dull')