1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
49
class TestDefaultFormat(TestCase):
51
def test_get_set_default_format(self):
52
old_default = bzrdir.format_registry.get('default')
53
private_default = old_default().repository_format.__class__
54
old_format = repository.RepositoryFormat.get_default_format()
55
self.assertTrue(isinstance(old_format, private_default))
56
def make_sample_bzrdir():
57
my_bzrdir = bzrdir.BzrDirMetaFormat1()
58
my_bzrdir.repository_format = SampleRepositoryFormat()
60
bzrdir.format_registry.remove('default')
61
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
62
bzrdir.format_registry.set_default('sample')
63
# creating a repository should now create an instrumented dir.
65
# the default branch format is used by the meta dir format
66
# which is not the default bzrdir format at this point
67
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
68
result = dir.create_repository()
69
self.assertEqual(result, 'A bzr repository dir')
71
bzrdir.format_registry.remove('default')
72
bzrdir.format_registry.remove('sample')
73
bzrdir.format_registry.register('default', old_default, '')
74
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
78
class SampleRepositoryFormat(repository.RepositoryFormat):
81
this format is initializable, unsupported to aid in testing the
82
open and open(unsupported=True) routines.
85
def get_format_string(self):
86
"""See RepositoryFormat.get_format_string()."""
87
return "Sample .bzr repository format."
89
def initialize(self, a_bzrdir, shared=False):
90
"""Initialize a repository in a BzrDir"""
91
t = a_bzrdir.get_repository_transport(self)
92
t.put_bytes('format', self.get_format_string())
93
return 'A bzr repository dir'
95
def is_supported(self):
98
def open(self, a_bzrdir, _found=False):
99
return "opened repository."
102
class TestRepositoryFormat(TestCaseWithTransport):
103
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
105
def test_find_format(self):
106
# is the right format object found for a repository?
107
# create a branch with a few known format objects.
108
# this is not quite the same as
109
self.build_tree(["foo/", "bar/"])
110
def check_format(format, url):
111
dir = format._matchingbzrdir.initialize(url)
112
format.initialize(dir)
113
t = get_transport(url)
114
found_format = repository.RepositoryFormat.find_format(dir)
115
self.failUnless(isinstance(found_format, format.__class__))
116
check_format(weaverepo.RepositoryFormat7(), "bar")
118
def test_find_format_no_repository(self):
119
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
120
self.assertRaises(errors.NoRepositoryPresent,
121
repository.RepositoryFormat.find_format,
124
def test_find_format_unknown_format(self):
125
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
126
SampleRepositoryFormat().initialize(dir)
127
self.assertRaises(UnknownFormatError,
128
repository.RepositoryFormat.find_format,
131
def test_register_unregister_format(self):
132
format = SampleRepositoryFormat()
134
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
136
format.initialize(dir)
137
# register a format for it.
138
repository.RepositoryFormat.register_format(format)
139
# which repository.Open will refuse (not supported)
140
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
141
# but open(unsupported) will work
142
self.assertEqual(format.open(dir), "opened repository.")
143
# unregister the format
144
repository.RepositoryFormat.unregister_format(format)
147
class TestFormat6(TestCaseWithTransport):
149
def test_no_ancestry_weave(self):
150
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
151
repo = weaverepo.RepositoryFormat6().initialize(control)
152
# We no longer need to create the ancestry.weave file
153
# since it is *never* used.
154
self.assertRaises(NoSuchFile,
155
control.transport.get,
159
class TestFormat7(TestCaseWithTransport):
161
def test_disk_layout(self):
162
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
163
repo = weaverepo.RepositoryFormat7().initialize(control)
164
# in case of side effects of locking.
168
# format 'Bazaar-NG Repository format 7'
170
# inventory.weave == empty_weave
171
# empty revision-store directory
172
# empty weaves directory
173
t = control.get_repository_transport(None)
174
self.assertEqualDiff('Bazaar-NG Repository format 7',
175
t.get('format').read())
176
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
177
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
178
self.assertEqualDiff('# bzr weave file v5\n'
181
t.get('inventory.weave').read())
183
def test_shared_disk_layout(self):
184
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
185
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
187
# format 'Bazaar-NG Repository format 7'
188
# inventory.weave == empty_weave
189
# empty revision-store directory
190
# empty weaves directory
191
# a 'shared-storage' marker file.
192
# lock is not present when unlocked
193
t = control.get_repository_transport(None)
194
self.assertEqualDiff('Bazaar-NG Repository format 7',
195
t.get('format').read())
196
self.assertEqualDiff('', t.get('shared-storage').read())
197
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
198
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
199
self.assertEqualDiff('# bzr weave file v5\n'
202
t.get('inventory.weave').read())
203
self.assertFalse(t.has('branch-lock'))
205
def test_creates_lockdir(self):
206
"""Make sure it appears to be controlled by a LockDir existence"""
207
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
208
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
209
t = control.get_repository_transport(None)
210
# TODO: Should check there is a 'lock' toplevel directory,
211
# regardless of contents
212
self.assertFalse(t.has('lock/held/info'))
215
self.assertTrue(t.has('lock/held/info'))
217
# unlock so we don't get a warning about failing to do so
220
def test_uses_lockdir(self):
221
"""repo format 7 actually locks on lockdir"""
222
base_url = self.get_url()
223
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
224
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
225
t = control.get_repository_transport(None)
229
# make sure the same lock is created by opening it
230
repo = repository.Repository.open(base_url)
232
self.assertTrue(t.has('lock/held/info'))
234
self.assertFalse(t.has('lock/held/info'))
236
def test_shared_no_tree_disk_layout(self):
237
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
238
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
239
repo.set_make_working_trees(False)
241
# format 'Bazaar-NG Repository format 7'
243
# inventory.weave == empty_weave
244
# empty revision-store directory
245
# empty weaves directory
246
# a 'shared-storage' marker file.
247
t = control.get_repository_transport(None)
248
self.assertEqualDiff('Bazaar-NG Repository format 7',
249
t.get('format').read())
250
## self.assertEqualDiff('', t.get('lock').read())
251
self.assertEqualDiff('', t.get('shared-storage').read())
252
self.assertEqualDiff('', t.get('no-working-trees').read())
253
repo.set_make_working_trees(True)
254
self.assertFalse(t.has('no-working-trees'))
255
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
256
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
257
self.assertEqualDiff('# bzr weave file v5\n'
260
t.get('inventory.weave').read())
263
class TestFormatKnit1(TestCaseWithTransport):
265
def test_disk_layout(self):
266
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
267
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
268
# in case of side effects of locking.
272
# format 'Bazaar-NG Knit Repository Format 1'
273
# lock: is a directory
274
# inventory.weave == empty_weave
275
# empty revision-store directory
276
# empty weaves directory
277
t = control.get_repository_transport(None)
278
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
279
t.get('format').read())
280
# XXX: no locks left when unlocked at the moment
281
# self.assertEqualDiff('', t.get('lock').read())
282
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
285
def assertHasKnit(self, t, knit_name):
286
"""Assert that knit_name exists on t."""
287
self.assertEqualDiff('# bzr knit index 8\n',
288
t.get(knit_name + '.kndx').read())
290
self.assertTrue(t.has(knit_name + '.knit'))
292
def check_knits(self, t):
293
"""check knit content for a repository."""
294
self.assertHasKnit(t, 'inventory')
295
self.assertHasKnit(t, 'revisions')
296
self.assertHasKnit(t, 'signatures')
298
def test_shared_disk_layout(self):
299
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
300
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
302
# format 'Bazaar-NG Knit Repository Format 1'
303
# lock: is a directory
304
# inventory.weave == empty_weave
305
# empty revision-store directory
306
# empty weaves directory
307
# a 'shared-storage' marker file.
308
t = control.get_repository_transport(None)
309
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
310
t.get('format').read())
311
# XXX: no locks left when unlocked at the moment
312
# self.assertEqualDiff('', t.get('lock').read())
313
self.assertEqualDiff('', t.get('shared-storage').read())
314
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
317
def test_shared_no_tree_disk_layout(self):
318
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
319
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
320
repo.set_make_working_trees(False)
322
# format 'Bazaar-NG Knit Repository Format 1'
324
# inventory.weave == empty_weave
325
# empty revision-store directory
326
# empty weaves directory
327
# a 'shared-storage' marker file.
328
t = control.get_repository_transport(None)
329
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
330
t.get('format').read())
331
# XXX: no locks left when unlocked at the moment
332
# self.assertEqualDiff('', t.get('lock').read())
333
self.assertEqualDiff('', t.get('shared-storage').read())
334
self.assertEqualDiff('', t.get('no-working-trees').read())
335
repo.set_make_working_trees(True)
336
self.assertFalse(t.has('no-working-trees'))
337
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
341
class DummyRepository(object):
342
"""A dummy repository for testing."""
346
def supports_rich_root(self):
350
class InterDummy(repository.InterRepository):
351
"""An inter-repository optimised code path for DummyRepository.
353
This is for use during testing where we use DummyRepository as repositories
354
so that none of the default regsitered inter-repository classes will
359
def is_compatible(repo_source, repo_target):
360
"""InterDummy is compatible with DummyRepository."""
361
return (isinstance(repo_source, DummyRepository) and
362
isinstance(repo_target, DummyRepository))
365
class TestInterRepository(TestCaseWithTransport):
367
def test_get_default_inter_repository(self):
368
# test that the InterRepository.get(repo_a, repo_b) probes
369
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
370
# true and returns a default inter_repo otherwise.
371
# This also tests that the default registered optimised interrepository
372
# classes do not barf inappropriately when a surprising repository type
374
dummy_a = DummyRepository()
375
dummy_b = DummyRepository()
376
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
378
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
379
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
381
The effective default is now InterSameDataRepository because there is
382
no actual sane default in the presence of incompatible data models.
384
inter_repo = repository.InterRepository.get(repo_a, repo_b)
385
self.assertEqual(repository.InterSameDataRepository,
386
inter_repo.__class__)
387
self.assertEqual(repo_a, inter_repo.source)
388
self.assertEqual(repo_b, inter_repo.target)
390
def test_register_inter_repository_class(self):
391
# test that a optimised code path provider - a
392
# InterRepository subclass can be registered and unregistered
393
# and that it is correctly selected when given a repository
394
# pair that it returns true on for the is_compatible static method
396
dummy_a = DummyRepository()
397
dummy_b = DummyRepository()
398
repo = self.make_repository('.')
399
# hack dummies to look like repo somewhat.
400
dummy_a._serializer = repo._serializer
401
dummy_b._serializer = repo._serializer
402
repository.InterRepository.register_optimiser(InterDummy)
404
# we should get the default for something InterDummy returns False
406
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
407
self.assertGetsDefaultInterRepository(dummy_a, repo)
408
# and we should get an InterDummy for a pair it 'likes'
409
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
410
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
411
self.assertEqual(InterDummy, inter_repo.__class__)
412
self.assertEqual(dummy_a, inter_repo.source)
413
self.assertEqual(dummy_b, inter_repo.target)
415
repository.InterRepository.unregister_optimiser(InterDummy)
416
# now we should get the default InterRepository object again.
417
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
420
class TestInterWeaveRepo(TestCaseWithTransport):
422
def test_is_compatible_and_registered(self):
423
# InterWeaveRepo is compatible when either side
424
# is a format 5/6/7 branch
425
from bzrlib.repofmt import knitrepo, weaverepo
426
formats = [weaverepo.RepositoryFormat5(),
427
weaverepo.RepositoryFormat6(),
428
weaverepo.RepositoryFormat7()]
429
incompatible_formats = [weaverepo.RepositoryFormat4(),
430
knitrepo.RepositoryFormatKnit1(),
432
repo_a = self.make_repository('a')
433
repo_b = self.make_repository('b')
434
is_compatible = repository.InterWeaveRepo.is_compatible
435
for source in incompatible_formats:
436
# force incompatible left then right
437
repo_a._format = source
438
repo_b._format = formats[0]
439
self.assertFalse(is_compatible(repo_a, repo_b))
440
self.assertFalse(is_compatible(repo_b, repo_a))
441
for source in formats:
442
repo_a._format = source
443
for target in formats:
444
repo_b._format = target
445
self.assertTrue(is_compatible(repo_a, repo_b))
446
self.assertEqual(repository.InterWeaveRepo,
447
repository.InterRepository.get(repo_a,
451
class TestRepositoryConverter(TestCaseWithTransport):
453
def test_convert_empty(self):
454
t = get_transport(self.get_url('.'))
455
t.mkdir('repository')
456
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
457
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
458
target_format = knitrepo.RepositoryFormatKnit1()
459
converter = repository.CopyConverter(target_format)
460
pb = bzrlib.ui.ui_factory.nested_progress_bar()
462
converter.convert(repo, pb)
465
repo = repo_dir.open_repository()
466
self.assertTrue(isinstance(target_format, repo._format.__class__))
469
class TestMisc(TestCase):
471
def test_unescape_xml(self):
472
"""We get some kind of error when malformed entities are passed"""
473
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
476
class TestRepositoryFormatKnit3(TestCaseWithTransport):
478
def test_convert(self):
479
"""Ensure the upgrade adds weaves for roots"""
480
format = bzrdir.BzrDirMetaFormat1()
481
format.repository_format = knitrepo.RepositoryFormatKnit1()
482
tree = self.make_branch_and_tree('.', format)
483
tree.commit("Dull commit", rev_id="dull")
484
revision_tree = tree.branch.repository.revision_tree('dull')
485
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
486
revision_tree.inventory.root.file_id)
487
format = bzrdir.BzrDirMetaFormat1()
488
format.repository_format = knitrepo.RepositoryFormatKnit3()
489
upgrade.Convert('.', format)
490
tree = workingtree.WorkingTree.open('.')
491
revision_tree = tree.branch.repository.revision_tree('dull')
492
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
493
tree.commit("Another dull commit", rev_id='dull2')
494
revision_tree = tree.branch.repository.revision_tree('dull2')
495
self.assertEqual('dull', revision_tree.inventory.root.revision)