1
# Copyright (C) 2006, 2007 Canonical Ltd
1
# Copyright (C) 2006 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
34
33
UnknownFormatError,
35
34
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
36
import bzrlib.repository as repository
38
37
from bzrlib.tests import TestCase, TestCaseWithTransport
39
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
49
43
class TestDefaultFormat(TestCase):
51
45
def test_get_set_default_format(self):
52
old_default = bzrdir.format_registry.get('default')
53
private_default = old_default().repository_format.__class__
54
46
old_format = repository.RepositoryFormat.get_default_format()
55
self.assertTrue(isinstance(old_format, private_default))
56
def make_sample_bzrdir():
57
my_bzrdir = bzrdir.BzrDirMetaFormat1()
58
my_bzrdir.repository_format = SampleRepositoryFormat()
60
bzrdir.format_registry.remove('default')
61
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
62
bzrdir.format_registry.set_default('sample')
47
self.assertTrue(isinstance(old_format, repository.RepositoryFormatKnit1))
48
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
63
49
# creating a repository should now create an instrumented dir.
65
51
# the default branch format is used by the meta dir format
68
54
result = dir.create_repository()
69
55
self.assertEqual(result, 'A bzr repository dir')
71
bzrdir.format_registry.remove('default')
72
bzrdir.format_registry.remove('sample')
73
bzrdir.format_registry.register('default', old_default, '')
74
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
57
repository.RepositoryFormat.set_default_format(old_format)
58
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
78
61
class SampleRepositoryFormat(repository.RepositoryFormat):
89
72
def initialize(self, a_bzrdir, shared=False):
90
73
"""Initialize a repository in a BzrDir"""
91
74
t = a_bzrdir.get_repository_transport(self)
92
t.put_bytes('format', self.get_format_string())
75
t.put('format', StringIO(self.get_format_string()))
93
76
return 'A bzr repository dir'
95
78
def is_supported(self):
113
96
t = get_transport(url)
114
97
found_format = repository.RepositoryFormat.find_format(dir)
115
98
self.failUnless(isinstance(found_format, format.__class__))
116
check_format(weaverepo.RepositoryFormat7(), "bar")
99
check_format(repository.RepositoryFormat7(), "bar")
118
101
def test_find_format_no_repository(self):
119
102
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
149
132
def test_no_ancestry_weave(self):
150
133
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
151
repo = weaverepo.RepositoryFormat6().initialize(control)
134
repo = repository.RepositoryFormat6().initialize(control)
152
135
# We no longer need to create the ancestry.weave file
153
136
# since it is *never* used.
154
137
self.assertRaises(NoSuchFile,
161
144
def test_disk_layout(self):
162
145
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
163
repo = weaverepo.RepositoryFormat7().initialize(control)
146
repo = repository.RepositoryFormat7().initialize(control)
164
147
# in case of side effects of locking.
165
148
repo.lock_write()
183
166
def test_shared_disk_layout(self):
184
167
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
185
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
168
repo = repository.RepositoryFormat7().initialize(control, shared=True)
187
170
# format 'Bazaar-NG Repository format 7'
188
171
# inventory.weave == empty_weave
205
188
def test_creates_lockdir(self):
206
189
"""Make sure it appears to be controlled by a LockDir existence"""
207
190
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
208
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
191
repo = repository.RepositoryFormat7().initialize(control, shared=True)
209
192
t = control.get_repository_transport(None)
210
193
# TODO: Should check there is a 'lock' toplevel directory,
211
194
# regardless of contents
221
204
"""repo format 7 actually locks on lockdir"""
222
205
base_url = self.get_url()
223
206
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
224
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
207
repo = repository.RepositoryFormat7().initialize(control, shared=True)
225
208
t = control.get_repository_transport(None)
226
209
repo.lock_write()
236
219
def test_shared_no_tree_disk_layout(self):
237
220
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
238
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
221
repo = repository.RepositoryFormat7().initialize(control, shared=True)
239
222
repo.set_make_working_trees(False)
241
224
# format 'Bazaar-NG Repository format 7'
265
248
def test_disk_layout(self):
266
249
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
267
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
250
repo = repository.RepositoryFormatKnit1().initialize(control)
268
251
# in case of side effects of locking.
269
252
repo.lock_write()
298
281
def test_shared_disk_layout(self):
299
282
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
300
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
283
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
302
285
# format 'Bazaar-NG Knit Repository Format 1'
303
286
# lock: is a directory
317
300
def test_shared_no_tree_disk_layout(self):
318
301
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
319
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
302
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
320
303
repo.set_make_working_trees(False)
322
305
# format 'Bazaar-NG Knit Repository Format 1'
338
321
self.check_knits(t)
341
class DummyRepository(object):
342
"""A dummy repository for testing."""
346
def supports_rich_root(self):
350
class InterDummy(repository.InterRepository):
351
"""An inter-repository optimised code path for DummyRepository.
353
This is for use during testing where we use DummyRepository as repositories
324
class InterString(repository.InterRepository):
325
"""An inter-repository optimised code path for strings.
327
This is for use during testing where we use strings as repositories
354
328
so that none of the default registered inter-repository classes will
359
333
def is_compatible(repo_source, repo_target):
360
"""InterDummy is compatible with DummyRepository."""
361
return (isinstance(repo_source, DummyRepository) and
362
isinstance(repo_target, DummyRepository))
334
"""InterString is compatible with strings-as-repos."""
335
return isinstance(repo_source, str) and isinstance(repo_target, str)
365
338
class TestInterRepository(TestCaseWithTransport):
371
344
# This also tests that the default registered optimised interrepository
372
345
# classes do not barf inappropriately when a surprising repository type
373
346
# is handed to them.
374
dummy_a = DummyRepository()
375
dummy_b = DummyRepository()
347
dummy_a = "Repository 1."
348
dummy_b = "Repository 2."
376
349
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
378
351
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
379
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
381
The effective default is now InterSameDataRepository because there is
382
no actual sane default in the presence of incompatible data models.
352
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
384
353
inter_repo = repository.InterRepository.get(repo_a, repo_b)
385
self.assertEqual(repository.InterSameDataRepository,
354
self.assertEqual(repository.InterRepository,
386
355
inter_repo.__class__)
387
356
self.assertEqual(repo_a, inter_repo.source)
388
357
self.assertEqual(repo_b, inter_repo.target)
393
362
# and that it is correctly selected when given a repository
394
363
# pair that it returns true on for the is_compatible static method
396
dummy_a = DummyRepository()
397
dummy_b = DummyRepository()
398
repo = self.make_repository('.')
399
# hack dummies to look like repo somewhat.
400
dummy_a._serializer = repo._serializer
401
dummy_b._serializer = repo._serializer
402
repository.InterRepository.register_optimiser(InterDummy)
365
dummy_a = "Repository 1."
366
dummy_b = "Repository 2."
367
repository.InterRepository.register_optimiser(InterString)
404
# we should get the default for something InterDummy returns False
369
# we should get the default for something InterString returns False
406
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
407
self.assertGetsDefaultInterRepository(dummy_a, repo)
408
# and we should get an InterDummy for a pair it 'likes'
409
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
371
self.assertFalse(InterString.is_compatible(dummy_a, None))
372
self.assertGetsDefaultInterRepository(dummy_a, None)
373
# and we should get an InterString for a pair it 'likes'
374
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
410
375
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
411
self.assertEqual(InterDummy, inter_repo.__class__)
376
self.assertEqual(InterString, inter_repo.__class__)
412
377
self.assertEqual(dummy_a, inter_repo.source)
413
378
self.assertEqual(dummy_b, inter_repo.target)
415
repository.InterRepository.unregister_optimiser(InterDummy)
380
repository.InterRepository.unregister_optimiser(InterString)
416
381
# now we should get the default InterRepository object again.
417
382
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
422
387
def test_is_compatible_and_registered(self):
423
388
# InterWeaveRepo is compatible when either side
424
389
# is a format 5/6/7 branch
425
from bzrlib.repofmt import knitrepo, weaverepo
426
formats = [weaverepo.RepositoryFormat5(),
427
weaverepo.RepositoryFormat6(),
428
weaverepo.RepositoryFormat7()]
429
incompatible_formats = [weaverepo.RepositoryFormat4(),
430
knitrepo.RepositoryFormatKnit1(),
390
formats = [repository.RepositoryFormat5(),
391
repository.RepositoryFormat6(),
392
repository.RepositoryFormat7()]
393
incompatible_formats = [repository.RepositoryFormat4(),
394
repository.RepositoryFormatKnit1(),
432
396
repo_a = self.make_repository('a')
433
397
repo_b = self.make_repository('b')
454
418
t = get_transport(self.get_url('.'))
455
419
t.mkdir('repository')
456
420
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
457
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
458
target_format = knitrepo.RepositoryFormatKnit1()
421
repo = repository.RepositoryFormat7().initialize(repo_dir)
422
target_format = repository.RepositoryFormatKnit1()
459
423
converter = repository.CopyConverter(target_format)
460
424
pb = bzrlib.ui.ui_factory.nested_progress_bar()
465
429
repo = repo_dir.open_repository()
466
430
self.assertTrue(isinstance(target_format, repo._format.__class__))
469
class TestMisc(TestCase):
    """Miscellaneous checks on helpers exposed by the repository module."""

    def test_unescape_xml(self):
        """A malformed entity passed to _unescape_xml raises KeyError."""
        malformed_text = 'foo&bar;'
        self.assertRaises(KeyError, repository._unescape_xml, malformed_text)
476
class TestRepositoryFormatKnit3(TestCaseWithTransport):
    """Checks on converting a Knit1 repository to RepositoryFormatKnit3."""

    def test_convert(self):
        """Ensure the upgrade adds weaves for roots."""
        # Build a tree backed by a Knit1 repository and record one commit.
        knit1_dir_format = bzrdir.BzrDirMetaFormat1()
        knit1_dir_format.repository_format = knitrepo.RepositoryFormatKnit1()
        tree = self.make_branch_and_tree('.', knit1_dir_format)
        tree.commit("Dull commit", rev_id="dull")

        # Before the upgrade, asking for the root's lines must fail.
        old_rev_tree = tree.branch.repository.revision_tree('dull')
        root_file_id = old_rev_tree.inventory.root.file_id
        self.assertRaises(errors.NoSuchFile, old_rev_tree.get_file_lines,
                          root_file_id)

        # Convert the control directory in place to the Knit3 format.
        knit3_dir_format = bzrdir.BzrDirMetaFormat1()
        knit3_dir_format.repository_format = knitrepo.RepositoryFormatKnit3()
        upgrade.Convert('.', knit3_dir_format)

        # After the upgrade the same lookup is expected to succeed; the
        # call itself is the check (any exception fails the test).
        tree = workingtree.WorkingTree.open('.')
        upgraded_rev_tree = tree.branch.repository.revision_tree('dull')
        upgraded_rev_tree.get_file_lines(
            upgraded_rev_tree.inventory.root.file_id)

        # A second commit must leave the root's recorded revision at 'dull'.
        tree.commit("Another dull commit", rev_id='dull2')
        latest_rev_tree = tree.branch.repository.revision_tree('dull2')
        self.assertEqual('dull', latest_rev_tree.inventory.root.revision)