1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
49
class TestDefaultFormat(TestCase):
51
def test_get_set_default_format(self):
52
old_default = bzrdir.format_registry.get('default')
53
private_default = old_default().repository_format.__class__
54
old_format = repository.RepositoryFormat.get_default_format()
55
self.assertTrue(isinstance(old_format, private_default))
56
def make_sample_bzrdir():
57
my_bzrdir = bzrdir.BzrDirMetaFormat1()
58
my_bzrdir.repository_format = SampleRepositoryFormat()
60
bzrdir.format_registry.remove('default')
61
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
62
bzrdir.format_registry.set_default('sample')
63
# creating a repository should now create an instrumented dir.
65
# the default branch format is used by the meta dir format
66
# which is not the default bzrdir format at this point
67
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
68
result = dir.create_repository()
69
self.assertEqual(result, 'A bzr repository dir')
71
bzrdir.format_registry.remove('default')
72
bzrdir.format_registry.register('default', old_default, '')
73
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
77
class SampleRepositoryFormat(repository.RepositoryFormat):
80
this format is initializable, unsupported to aid in testing the
81
open and open(unsupported=True) routines.
84
def get_format_string(self):
85
"""See RepositoryFormat.get_format_string()."""
86
return "Sample .bzr repository format."
88
def initialize(self, a_bzrdir, shared=False):
89
"""Initialize a repository in a BzrDir"""
90
t = a_bzrdir.get_repository_transport(self)
91
t.put_bytes('format', self.get_format_string())
92
return 'A bzr repository dir'
94
def is_supported(self):
97
def open(self, a_bzrdir, _found=False):
98
return "opened repository."
101
class TestRepositoryFormat(TestCaseWithTransport):
102
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
104
def test_find_format(self):
105
# is the right format object found for a repository?
106
# create a branch with a few known format objects.
107
# this is not quite the same as
108
self.build_tree(["foo/", "bar/"])
109
def check_format(format, url):
110
dir = format._matchingbzrdir.initialize(url)
111
format.initialize(dir)
112
t = get_transport(url)
113
found_format = repository.RepositoryFormat.find_format(dir)
114
self.failUnless(isinstance(found_format, format.__class__))
115
check_format(weaverepo.RepositoryFormat7(), "bar")
117
def test_find_format_no_repository(self):
118
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
119
self.assertRaises(errors.NoRepositoryPresent,
120
repository.RepositoryFormat.find_format,
123
def test_find_format_unknown_format(self):
124
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
125
SampleRepositoryFormat().initialize(dir)
126
self.assertRaises(UnknownFormatError,
127
repository.RepositoryFormat.find_format,
130
def test_register_unregister_format(self):
131
format = SampleRepositoryFormat()
133
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
135
format.initialize(dir)
136
# register a format for it.
137
repository.RepositoryFormat.register_format(format)
138
# which repository.Open will refuse (not supported)
139
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
140
# but open(unsupported) will work
141
self.assertEqual(format.open(dir), "opened repository.")
142
# unregister the format
143
repository.RepositoryFormat.unregister_format(format)
146
class TestFormat6(TestCaseWithTransport):
148
def test_no_ancestry_weave(self):
149
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
150
repo = weaverepo.RepositoryFormat6().initialize(control)
151
# We no longer need to create the ancestry.weave file
152
# since it is *never* used.
153
self.assertRaises(NoSuchFile,
154
control.transport.get,
158
class TestFormat7(TestCaseWithTransport):
160
def test_disk_layout(self):
161
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
162
repo = weaverepo.RepositoryFormat7().initialize(control)
163
# in case of side effects of locking.
167
# format 'Bazaar-NG Repository format 7'
169
# inventory.weave == empty_weave
170
# empty revision-store directory
171
# empty weaves directory
172
t = control.get_repository_transport(None)
173
self.assertEqualDiff('Bazaar-NG Repository format 7',
174
t.get('format').read())
175
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
176
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
177
self.assertEqualDiff('# bzr weave file v5\n'
180
t.get('inventory.weave').read())
182
def test_shared_disk_layout(self):
183
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
184
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
186
# format 'Bazaar-NG Repository format 7'
187
# inventory.weave == empty_weave
188
# empty revision-store directory
189
# empty weaves directory
190
# a 'shared-storage' marker file.
191
# lock is not present when unlocked
192
t = control.get_repository_transport(None)
193
self.assertEqualDiff('Bazaar-NG Repository format 7',
194
t.get('format').read())
195
self.assertEqualDiff('', t.get('shared-storage').read())
196
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
197
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
198
self.assertEqualDiff('# bzr weave file v5\n'
201
t.get('inventory.weave').read())
202
self.assertFalse(t.has('branch-lock'))
204
def test_creates_lockdir(self):
205
"""Make sure it appears to be controlled by a LockDir existence"""
206
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
207
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
208
t = control.get_repository_transport(None)
209
# TODO: Should check there is a 'lock' toplevel directory,
210
# regardless of contents
211
self.assertFalse(t.has('lock/held/info'))
214
self.assertTrue(t.has('lock/held/info'))
216
# unlock so we don't get a warning about failing to do so
219
def test_uses_lockdir(self):
220
"""repo format 7 actually locks on lockdir"""
221
base_url = self.get_url()
222
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
223
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
224
t = control.get_repository_transport(None)
228
# make sure the same lock is created by opening it
229
repo = repository.Repository.open(base_url)
231
self.assertTrue(t.has('lock/held/info'))
233
self.assertFalse(t.has('lock/held/info'))
235
def test_shared_no_tree_disk_layout(self):
236
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
237
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
238
repo.set_make_working_trees(False)
240
# format 'Bazaar-NG Repository format 7'
242
# inventory.weave == empty_weave
243
# empty revision-store directory
244
# empty weaves directory
245
# a 'shared-storage' marker file.
246
t = control.get_repository_transport(None)
247
self.assertEqualDiff('Bazaar-NG Repository format 7',
248
t.get('format').read())
249
## self.assertEqualDiff('', t.get('lock').read())
250
self.assertEqualDiff('', t.get('shared-storage').read())
251
self.assertEqualDiff('', t.get('no-working-trees').read())
252
repo.set_make_working_trees(True)
253
self.assertFalse(t.has('no-working-trees'))
254
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
255
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
256
self.assertEqualDiff('# bzr weave file v5\n'
259
t.get('inventory.weave').read())
262
class TestFormatKnit1(TestCaseWithTransport):
264
def test_disk_layout(self):
265
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
266
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
267
# in case of side effects of locking.
271
# format 'Bazaar-NG Knit Repository Format 1'
272
# lock: is a directory
273
# inventory.weave == empty_weave
274
# empty revision-store directory
275
# empty weaves directory
276
t = control.get_repository_transport(None)
277
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
278
t.get('format').read())
279
# XXX: no locks left when unlocked at the moment
280
# self.assertEqualDiff('', t.get('lock').read())
281
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
284
def assertHasKnit(self, t, knit_name):
285
"""Assert that knit_name exists on t."""
286
self.assertEqualDiff('# bzr knit index 8\n',
287
t.get(knit_name + '.kndx').read())
289
self.assertTrue(t.has(knit_name + '.knit'))
291
def check_knits(self, t):
292
"""check knit content for a repository."""
293
self.assertHasKnit(t, 'inventory')
294
self.assertHasKnit(t, 'revisions')
295
self.assertHasKnit(t, 'signatures')
297
def test_shared_disk_layout(self):
298
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
299
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
301
# format 'Bazaar-NG Knit Repository Format 1'
302
# lock: is a directory
303
# inventory.weave == empty_weave
304
# empty revision-store directory
305
# empty weaves directory
306
# a 'shared-storage' marker file.
307
t = control.get_repository_transport(None)
308
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
309
t.get('format').read())
310
# XXX: no locks left when unlocked at the moment
311
# self.assertEqualDiff('', t.get('lock').read())
312
self.assertEqualDiff('', t.get('shared-storage').read())
313
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
316
def test_shared_no_tree_disk_layout(self):
317
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
318
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
319
repo.set_make_working_trees(False)
321
# format 'Bazaar-NG Knit Repository Format 1'
323
# inventory.weave == empty_weave
324
# empty revision-store directory
325
# empty weaves directory
326
# a 'shared-storage' marker file.
327
t = control.get_repository_transport(None)
328
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
329
t.get('format').read())
330
# XXX: no locks left when unlocked at the moment
331
# self.assertEqualDiff('', t.get('lock').read())
332
self.assertEqualDiff('', t.get('shared-storage').read())
333
self.assertEqualDiff('', t.get('no-working-trees').read())
334
repo.set_make_working_trees(True)
335
self.assertFalse(t.has('no-working-trees'))
336
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
340
class DummyRepository(object):
341
"""A dummy repository for testing."""
345
def supports_rich_root(self):
349
class InterDummy(repository.InterRepository):
350
"""An inter-repository optimised code path for DummyRepository.
352
This is for use during testing where we use DummyRepository as repositories
353
so that none of the default regsitered inter-repository classes will
358
def is_compatible(repo_source, repo_target):
359
"""InterDummy is compatible with DummyRepository."""
360
return (isinstance(repo_source, DummyRepository) and
361
isinstance(repo_target, DummyRepository))
364
class TestInterRepository(TestCaseWithTransport):
366
def test_get_default_inter_repository(self):
367
# test that the InterRepository.get(repo_a, repo_b) probes
368
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
369
# true and returns a default inter_repo otherwise.
370
# This also tests that the default registered optimised interrepository
371
# classes do not barf inappropriately when a surprising repository type
373
dummy_a = DummyRepository()
374
dummy_b = DummyRepository()
375
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
377
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
378
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
380
The effective default is now InterSameDataRepository because there is
381
no actual sane default in the presence of incompatible data models.
383
inter_repo = repository.InterRepository.get(repo_a, repo_b)
384
self.assertEqual(repository.InterSameDataRepository,
385
inter_repo.__class__)
386
self.assertEqual(repo_a, inter_repo.source)
387
self.assertEqual(repo_b, inter_repo.target)
389
def test_register_inter_repository_class(self):
390
# test that a optimised code path provider - a
391
# InterRepository subclass can be registered and unregistered
392
# and that it is correctly selected when given a repository
393
# pair that it returns true on for the is_compatible static method
395
dummy_a = DummyRepository()
396
dummy_b = DummyRepository()
397
repo = self.make_repository('.')
398
# hack dummies to look like repo somewhat.
399
dummy_a._serializer = repo._serializer
400
dummy_b._serializer = repo._serializer
401
repository.InterRepository.register_optimiser(InterDummy)
403
# we should get the default for something InterDummy returns False
405
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
406
self.assertGetsDefaultInterRepository(dummy_a, repo)
407
# and we should get an InterDummy for a pair it 'likes'
408
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
409
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
410
self.assertEqual(InterDummy, inter_repo.__class__)
411
self.assertEqual(dummy_a, inter_repo.source)
412
self.assertEqual(dummy_b, inter_repo.target)
414
repository.InterRepository.unregister_optimiser(InterDummy)
415
# now we should get the default InterRepository object again.
416
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
419
class TestInterWeaveRepo(TestCaseWithTransport):
421
def test_is_compatible_and_registered(self):
422
# InterWeaveRepo is compatible when either side
423
# is a format 5/6/7 branch
424
from bzrlib.repofmt import knitrepo, weaverepo
425
formats = [weaverepo.RepositoryFormat5(),
426
weaverepo.RepositoryFormat6(),
427
weaverepo.RepositoryFormat7()]
428
incompatible_formats = [weaverepo.RepositoryFormat4(),
429
knitrepo.RepositoryFormatKnit1(),
431
repo_a = self.make_repository('a')
432
repo_b = self.make_repository('b')
433
is_compatible = repository.InterWeaveRepo.is_compatible
434
for source in incompatible_formats:
435
# force incompatible left then right
436
repo_a._format = source
437
repo_b._format = formats[0]
438
self.assertFalse(is_compatible(repo_a, repo_b))
439
self.assertFalse(is_compatible(repo_b, repo_a))
440
for source in formats:
441
repo_a._format = source
442
for target in formats:
443
repo_b._format = target
444
self.assertTrue(is_compatible(repo_a, repo_b))
445
self.assertEqual(repository.InterWeaveRepo,
446
repository.InterRepository.get(repo_a,
450
class TestRepositoryConverter(TestCaseWithTransport):
452
def test_convert_empty(self):
453
t = get_transport(self.get_url('.'))
454
t.mkdir('repository')
455
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
456
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
457
target_format = knitrepo.RepositoryFormatKnit1()
458
converter = repository.CopyConverter(target_format)
459
pb = bzrlib.ui.ui_factory.nested_progress_bar()
461
converter.convert(repo, pb)
464
repo = repo_dir.open_repository()
465
self.assertTrue(isinstance(target_format, repo._format.__class__))
468
class TestMisc(TestCase):
470
def test_unescape_xml(self):
471
"""We get some kind of error when malformed entities are passed"""
472
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
475
class TestRepositoryFormatKnit3(TestCaseWithTransport):
477
def test_convert(self):
478
"""Ensure the upgrade adds weaves for roots"""
479
format = bzrdir.BzrDirMetaFormat1()
480
format.repository_format = knitrepo.RepositoryFormatKnit1()
481
tree = self.make_branch_and_tree('.', format)
482
tree.commit("Dull commit", rev_id="dull")
483
revision_tree = tree.branch.repository.revision_tree('dull')
484
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
485
revision_tree.inventory.root.file_id)
486
format = bzrdir.BzrDirMetaFormat1()
487
format.repository_format = knitrepo.RepositoryFormatKnit3()
488
upgrade.Convert('.', format)
489
tree = workingtree.WorkingTree.open('.')
490
revision_tree = tree.branch.repository.revision_tree('dull')
491
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
492
tree.commit("Another dull commit", rev_id='dull2')
493
revision_tree = tree.branch.repository.revision_tree('dull2')
494
self.assertEqual('dull', revision_tree.inventory.root.revision)