1
# Copyright (C) 2006, 2007 Canonical Ltd
1
# (C) 2006 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
34
33
UnknownFormatError,
35
34
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
36
import bzrlib.repository as repository
38
37
from bzrlib.tests import TestCase, TestCaseWithTransport
39
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
49
43
class TestDefaultFormat(TestCase):
51
45
def test_get_set_default_format(self):
52
old_default = bzrdir.format_registry.get('default')
53
private_default = old_default().repository_format.__class__
54
46
old_format = repository.RepositoryFormat.get_default_format()
55
self.assertTrue(isinstance(old_format, private_default))
56
def make_sample_bzrdir():
57
my_bzrdir = bzrdir.BzrDirMetaFormat1()
58
my_bzrdir.repository_format = SampleRepositoryFormat()
60
bzrdir.format_registry.remove('default')
61
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
62
bzrdir.format_registry.set_default('sample')
47
# default is None - we cannot create a Repository independently yet
48
self.assertTrue(isinstance(old_format, repository.RepositoryFormat7))
49
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
63
50
# creating a repository should now create an instrumented dir.
65
52
# the default branch format is used by the meta dir format
66
53
# which is not the default bzrdir format at this point
67
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
54
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:/')
68
55
result = dir.create_repository()
69
56
self.assertEqual(result, 'A bzr repository dir')
71
bzrdir.format_registry.remove('default')
72
bzrdir.format_registry.remove('sample')
73
bzrdir.format_registry.register('default', old_default, '')
74
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
58
repository.RepositoryFormat.set_default_format(old_format)
59
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
78
62
class SampleRepositoryFormat(repository.RepositoryFormat):
89
73
def initialize(self, a_bzrdir, shared=False):
90
74
"""Initialize a repository in a BzrDir"""
91
75
t = a_bzrdir.get_repository_transport(self)
92
t.put_bytes('format', self.get_format_string())
76
t.put('format', StringIO(self.get_format_string()))
93
77
return 'A bzr repository dir'
95
79
def is_supported(self):
113
97
t = get_transport(url)
114
98
found_format = repository.RepositoryFormat.find_format(dir)
115
99
self.failUnless(isinstance(found_format, format.__class__))
116
check_format(weaverepo.RepositoryFormat7(), "bar")
100
check_format(repository.RepositoryFormat7(), "bar")
118
102
def test_find_format_no_repository(self):
119
103
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
149
133
def test_no_ancestry_weave(self):
150
134
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
151
repo = weaverepo.RepositoryFormat6().initialize(control)
135
repo = repository.RepositoryFormat6().initialize(control)
152
136
# We no longer need to create the ancestry.weave file
153
137
# since it is *never* used.
154
138
self.assertRaises(NoSuchFile,
161
145
def test_disk_layout(self):
162
146
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
163
repo = weaverepo.RepositoryFormat7().initialize(control)
147
repo = repository.RepositoryFormat7().initialize(control)
164
148
# in case of side effects of locking.
165
149
repo.lock_write()
183
167
def test_shared_disk_layout(self):
184
168
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
185
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
169
repo = repository.RepositoryFormat7().initialize(control, shared=True)
187
171
# format 'Bazaar-NG Repository format 7'
188
172
# inventory.weave == empty_weave
205
189
def test_creates_lockdir(self):
206
190
"""Make sure it appears to be controlled by a LockDir existence"""
207
191
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
208
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
192
repo = repository.RepositoryFormat7().initialize(control, shared=True)
209
193
t = control.get_repository_transport(None)
210
194
# TODO: Should check there is a 'lock' toplevel directory,
211
195
# regardless of contents
212
196
self.assertFalse(t.has('lock/held/info'))
213
197
repo.lock_write()
215
self.assertTrue(t.has('lock/held/info'))
217
# unlock so we don't get a warning about failing to do so
198
self.assertTrue(t.has('lock/held/info'))
220
200
def test_uses_lockdir(self):
221
201
"""repo format 7 actually locks on lockdir"""
222
202
base_url = self.get_url()
223
203
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
224
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
204
repo = repository.RepositoryFormat7().initialize(control, shared=True)
225
205
t = control.get_repository_transport(None)
226
206
repo.lock_write()
236
216
def test_shared_no_tree_disk_layout(self):
237
217
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
238
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
218
repo = repository.RepositoryFormat7().initialize(control, shared=True)
239
219
repo.set_make_working_trees(False)
241
221
# format 'Bazaar-NG Repository format 7'
265
245
def test_disk_layout(self):
266
246
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
267
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
247
repo = repository.RepositoryFormatKnit1().initialize(control)
268
248
# in case of side effects of locking.
269
249
repo.lock_write()
282
262
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
283
263
self.check_knits(t)
285
def assertHasKnit(self, t, knit_name):
    """Assert that the knit called knit_name exists on t.

    :param t: object exposing ``get(name)`` (returning something with a
        ``read()`` method) and ``has(name)``; the callers in this file
        pass a repository transport.
    :param knit_name: base name of the knit, without extension.

    Two things are checked: the ``.kndx`` index file consists of exactly
    the version 8 knit index header line, and the matching ``.knit`` data
    file is present.
    """
    index_text = t.get(knit_name + '.kndx').read()
    self.assertEqualDiff('# bzr knit index 8\n', index_text)
    # Only the presence of the data file is checked, not its content.
    self.assertTrue(t.has(knit_name + '.knit'))
292
265
def check_knits(self, t):
293
266
"""check knit content for a repository."""
294
self.assertHasKnit(t, 'inventory')
295
self.assertHasKnit(t, 'revisions')
296
self.assertHasKnit(t, 'signatures')
267
self.assertEqualDiff('# bzr knit index 7\n',
268
t.get('inventory.kndx').read())
270
self.assertTrue(t.has('inventory.knit'))
271
self.assertEqualDiff('# bzr knit index 7\n',
272
t.get('revisions.kndx').read())
274
self.assertTrue(t.has('revisions.knit'))
275
self.assertEqualDiff('# bzr knit index 7\n',
276
t.get('signatures.kndx').read())
278
self.assertTrue(t.has('signatures.knit'))
298
280
def test_shared_disk_layout(self):
299
281
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
300
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
282
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
302
284
# format 'Bazaar-NG Knit Repository Format 1'
303
285
# lock: is a directory
317
299
def test_shared_no_tree_disk_layout(self):
318
300
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
319
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
301
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
320
302
repo.set_make_working_trees(False)
322
304
# format 'Bazaar-NG Knit Repository Format 1'
338
320
self.check_knits(t)
341
class DummyRepository(object):
342
"""A dummy repository for testing."""
346
def supports_rich_root(self):
350
class InterDummy(repository.InterRepository):
351
"""An inter-repository optimised code path for DummyRepository.
353
This is for use during testing where we use DummyRepository as repositories
323
class InterString(repository.InterRepository):
324
"""An inter-repository optimised code path for strings.
326
This is for use during testing where we use strings as repositories
354
327
so that none of the default registered inter-repository classes will
359
332
def is_compatible(repo_source, repo_target):
360
"""InterDummy is compatible with DummyRepository."""
361
return (isinstance(repo_source, DummyRepository) and
362
isinstance(repo_target, DummyRepository))
333
"""InterString is compatible with strings-as-repos."""
334
return isinstance(repo_source, str) and isinstance(repo_target, str)
365
337
class TestInterRepository(TestCaseWithTransport):
371
343
# This also tests that the default registered optimised interrepository
372
344
# classes do not barf inappropriately when a surprising repository type
373
345
# is handed to them.
374
dummy_a = DummyRepository()
375
dummy_b = DummyRepository()
346
dummy_a = "Repository 1."
347
dummy_b = "Repository 2."
376
348
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
378
350
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
379
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
381
The effective default is now InterSameDataRepository because there is
382
no actual sane default in the presence of incompatible data models.
351
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
384
352
inter_repo = repository.InterRepository.get(repo_a, repo_b)
385
self.assertEqual(repository.InterSameDataRepository,
353
self.assertEqual(repository.InterRepository,
386
354
inter_repo.__class__)
387
355
self.assertEqual(repo_a, inter_repo.source)
388
356
self.assertEqual(repo_b, inter_repo.target)
393
361
# and that it is correctly selected when given a repository
394
362
# pair that it returns true on for the is_compatible static method
396
dummy_a = DummyRepository()
397
dummy_b = DummyRepository()
398
repo = self.make_repository('.')
399
# hack dummies to look like repo somewhat.
400
dummy_a._serializer = repo._serializer
401
dummy_b._serializer = repo._serializer
402
repository.InterRepository.register_optimiser(InterDummy)
364
dummy_a = "Repository 1."
365
dummy_b = "Repository 2."
366
repository.InterRepository.register_optimiser(InterString)
404
# we should get the default for something InterDummy returns False
368
# we should get the default for something InterString returns False
406
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
407
self.assertGetsDefaultInterRepository(dummy_a, repo)
408
# and we should get an InterDummy for a pair it 'likes'
409
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
370
self.assertFalse(InterString.is_compatible(dummy_a, None))
371
self.assertGetsDefaultInterRepository(dummy_a, None)
372
# and we should get an InterString for a pair it 'likes'
373
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
410
374
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
411
self.assertEqual(InterDummy, inter_repo.__class__)
375
self.assertEqual(InterString, inter_repo.__class__)
412
376
self.assertEqual(dummy_a, inter_repo.source)
413
377
self.assertEqual(dummy_b, inter_repo.target)
415
repository.InterRepository.unregister_optimiser(InterDummy)
379
repository.InterRepository.unregister_optimiser(InterString)
416
380
# now we should get the default InterRepository object again.
417
381
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
422
386
def test_is_compatible_and_registered(self):
423
387
# InterWeaveRepo is compatible when either side
424
388
# is a format 5/6/7 branch
425
from bzrlib.repofmt import knitrepo, weaverepo
426
formats = [weaverepo.RepositoryFormat5(),
427
weaverepo.RepositoryFormat6(),
428
weaverepo.RepositoryFormat7()]
429
incompatible_formats = [weaverepo.RepositoryFormat4(),
430
knitrepo.RepositoryFormatKnit1(),
389
formats = [repository.RepositoryFormat5(),
390
repository.RepositoryFormat6(),
391
repository.RepositoryFormat7()]
392
incompatible_formats = [repository.RepositoryFormat4(),
393
repository.RepositoryFormatKnit1(),
432
395
repo_a = self.make_repository('a')
433
396
repo_b = self.make_repository('b')
454
417
t = get_transport(self.get_url('.'))
455
418
t.mkdir('repository')
456
419
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
457
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
458
target_format = knitrepo.RepositoryFormatKnit1()
420
repo = repository.RepositoryFormat7().initialize(repo_dir)
421
target_format = repository.RepositoryFormatKnit1()
459
422
converter = repository.CopyConverter(target_format)
460
423
pb = bzrlib.ui.ui_factory.nested_progress_bar()
465
428
repo = repo_dir.open_repository()
466
429
self.assertTrue(isinstance(target_format, repo._format.__class__))
469
class TestMisc(TestCase):
    """Miscellaneous checks on helpers in the repository module."""

    def test_unescape_xml(self):
        """We get some kind of error when malformed entities are passed"""
        # 'foo&bar;' contains an entity-like sequence; the test pins the
        # failure mode of repository._unescape_xml for it to KeyError.
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
476
class TestRepositoryFormatKnit3(TestCaseWithTransport):
    """Tests for converting a repository to RepositoryFormatKnit3."""

    def test_convert(self):
        """Ensure the upgrade adds weaves for roots"""
        # Build a tree whose repository uses the Knit1 format and make one
        # commit in it.
        knit1_format = bzrdir.BzrDirMetaFormat1()
        knit1_format.repository_format = knitrepo.RepositoryFormatKnit1()
        tree = self.make_branch_and_tree('.', knit1_format)
        tree.commit("Dull commit", rev_id="dull")
        revision_tree = tree.branch.repository.revision_tree('dull')
        # Before the upgrade, asking for the root entry's lines must fail
        # with NoSuchFile.
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
                          revision_tree.inventory.root.file_id)

        # Upgrade the same location in place to the Knit3 format.
        knit3_format = bzrdir.BzrDirMetaFormat1()
        knit3_format.repository_format = knitrepo.RepositoryFormatKnit3()
        upgrade.Convert('.', knit3_format)

        # Re-open after conversion; the same lookup must now succeed (the
        # call itself is the check - it raises if the root text is absent).
        tree = workingtree.WorkingTree.open('.')
        revision_tree = tree.branch.repository.revision_tree('dull')
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)

        # After a second commit, the root entry of the new revision tree is
        # expected to still report 'dull' as its revision.
        tree.commit("Another dull commit", rev_id='dull2')
        revision_tree = tree.branch.repository.revision_tree('dull2')
        self.assertEqual('dull', revision_tree.inventory.root.revision)