1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.

For interface tests see tests/repository_implementations/*.py.

For concrete class tests see this file, and for storage formats tests
also see this file.
"""
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
38
from bzrlib.tests import (
40
TestCaseWithTransport,
43
from bzrlib.transport import get_transport
44
from bzrlib.transport.memory import MemoryServer
45
from bzrlib.util import bencode
51
from bzrlib.repofmt import knitrepo, weaverepo
54
class TestDefaultFormat(TestCase):
# Exercises swapping the 'default' entry of bzrdir.format_registry for a
# sample format and then putting the previous default back.
#
# NOTE(review): this text is a line-numbered listing whose indentation has
# been stripped.  The bare integers are the listing's own line numbers, not
# code.  They skip 64, 69, 75 and 80, so statements are missing: for example
# make_sample_bzrdir has no visible return, and the last assertIsInstance
# call is cut off before its second argument.  Recover those lines from the
# full source before relying on this test.
56
def test_get_set_default_format(self):
57
# Remember the current default factory and the repository format class it
# produces, and check get_default_format() agrees with it.
old_default = bzrdir.format_registry.get('default')
58
private_default = old_default().repository_format.__class__
59
old_format = repository.RepositoryFormat.get_default_format()
60
self.assertTrue(isinstance(old_format, private_default))
61
# Factory for a meta-dir format whose repository format is the sample one.
def make_sample_bzrdir():
62
my_bzrdir = bzrdir.BzrDirMetaFormat1()
63
my_bzrdir.repository_format = SampleRepositoryFormat()
65
# Replace the registered default with the sample factory.
bzrdir.format_registry.remove('default')
66
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
67
bzrdir.format_registry.set_default('sample')
68
# creating a repository should now create an instrumented dir.
70
# the default branch format is used by the meta dir format
71
# which is not the default bzrdir format at this point
72
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
73
result = dir.create_repository()
74
# 'A bzr repository dir' is the sentinel SampleRepositoryFormat.initialize
# returns, which shows the sample format was the one used.
self.assertEqual(result, 'A bzr repository dir')
76
# Undo the registry changes and re-register the original default factory.
bzrdir.format_registry.remove('default')
77
bzrdir.format_registry.remove('sample')
78
bzrdir.format_registry.register('default', old_default, '')
79
# NOTE(review): the expected-class argument of this call is missing.
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
83
class SampleRepositoryFormat(repository.RepositoryFormat):
    """A sample format.

    This format is initializable but unsupported, to aid in testing the
    open and open(unsupported=True) routines.
    """

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Sample .bzr repository format."

    def initialize(self, a_bzrdir, shared=False):
        """Initialize a repository in a BzrDir.

        Only the 'format' marker file is written.  A sentinel string is
        returned in place of a repository object so that callers can
        recognise that this format did the initializing.
        """
        t = a_bzrdir.get_repository_transport(self)
        t.put_bytes('format', self.get_format_string())
        return 'A bzr repository dir'

    def is_supported(self):
        """Report this format as unsupported."""
        # NOTE(review): the body of this method was missing from the text
        # this class was recovered from.  False is reconstructed from the
        # class description ("unsupported") and from
        # test_register_unregister_format, which expects Repository.open to
        # raise UnsupportedFormatError for this format -- confirm.
        return False

    def open(self, a_bzrdir, _found=False):
        """Return a sentinel string instead of opening a real repository."""
        return "opened repository."
107
class TestRepositoryFormat(TestCaseWithTransport):
    """Tests for the Repository format detection used by the bzr meta dir facility."""

    def test_find_format(self):
        """find_format returns an instance of the format that was initialized."""
        # is the right format object found for a repository?
        self.build_tree(["foo/", "bar/"])

        def check_format(repo_format, url):
            # Create the matching control dir plus a repository inside it,
            # then make sure detection hands back the same format class.
            control = repo_format._matchingbzrdir.initialize(url)
            repo_format.initialize(control)
            found_format = repository.RepositoryFormat.find_format(control)
            self.assertIsInstance(found_format, repo_format.__class__)

        check_format(weaverepo.RepositoryFormat7(), "bar")

    def test_find_format_no_repository(self):
        """A control dir with no repository in it raises NoRepositoryPresent."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): the last argument was cut off in the recovered text;
        # the control dir is the only candidate and matches how find_format
        # is called in test_find_format -- confirm.
        self.assertRaises(errors.NoRepositoryPresent,
                          repository.RepositoryFormat.find_format,
                          control)

    def test_find_format_unknown_format(self):
        """An unregistered format string on disk raises UnknownFormatError."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        SampleRepositoryFormat().initialize(control)
        # NOTE(review): last argument reconstructed, as in
        # test_find_format_no_repository -- confirm.
        self.assertRaises(UnknownFormatError,
                          repository.RepositoryFormat.find_format,
                          control)

    def test_register_unregister_format(self):
        """A registered but unsupported format is refused by Repository.open."""
        sample_format = SampleRepositoryFormat()
        # make a control dir
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # make a repo
        sample_format.initialize(control)
        # register a format for it.
        repository.RepositoryFormat.register_format(sample_format)
        try:
            # which repository.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              repository.Repository.open, self.get_url())
            # but open(unsupported) will work
            self.assertEqual(sample_format.open(control), "opened repository.")
        finally:
            # unregister the format, even when an assertion above failed, so
            # the sample format cannot leak into later tests.
            repository.RepositoryFormat.unregister_format(sample_format)
152
class TestFormat6(TestCaseWithTransport):
    """Tests specific to weaverepo.RepositoryFormat6."""

    def test_no_ancestry_weave(self):
        """Initializing a format 6 repository does not create ancestry.weave."""
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
        weaverepo.RepositoryFormat6().initialize(control)
        # We no longer need to create the ancestry.weave file
        # since it is *never* used.
        # NOTE(review): the file-name argument was cut off in the recovered
        # text; 'ancestry.weave' is taken from the comment above and the
        # test name -- confirm.
        self.assertRaises(NoSuchFile,
                          control.transport.get,
                          'ancestry.weave')
164
class TestFormat7(TestCaseWithTransport):
# Disk-layout and locking tests for weaverepo.RepositoryFormat7 created
# inside a BzrDirMetaFormat1 control directory.
#
# NOTE(review): this text is a line-numbered listing whose indentation has
# been stripped.  The bare integers are the listing's own line numbers, not
# code, and wherever they skip (for example 169 -> 173 or 183 -> 186) the
# corresponding source lines are absent.  Several statements below are
# therefore visibly incomplete; recover them from the full source.
166
# Checks what a freshly initialized, unshared repository leaves behind in its
# repository transport: the 'format' marker text and the 'revision-store'
# and 'weaves' directories.
def test_disk_layout(self):
167
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
168
repo = weaverepo.RepositoryFormat7().initialize(control)
169
# in case of side effects of locking.
173
# format 'Bazaar-NG Repository format 7'
175
# inventory.weave == empty_weave
176
# empty revision-store directory
177
# empty weaves directory
178
t = control.get_repository_transport(None)
179
self.assertEqualDiff('Bazaar-NG Repository format 7',
180
t.get('format').read())
181
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
182
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
183
# NOTE(review): the expected inventory.weave text is cut off here (listing
# lines 184-185 are absent), so the string literal is followed directly by
# the actual-value argument with no separating comma.
self.assertEqualDiff('# bzr weave file v5\n'
186
t.get('inventory.weave').read())
188
# Same layout checks for a repository initialized with shared=True, plus an
# empty 'shared-storage' marker file and no 'branch-lock' entry.
def test_shared_disk_layout(self):
189
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
190
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
192
# format 'Bazaar-NG Repository format 7'
193
# inventory.weave == empty_weave
194
# empty revision-store directory
195
# empty weaves directory
196
# a 'shared-storage' marker file.
197
# lock is not present when unlocked
198
t = control.get_repository_transport(None)
199
self.assertEqualDiff('Bazaar-NG Repository format 7',
200
t.get('format').read())
201
self.assertEqualDiff('', t.get('shared-storage').read())
202
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
203
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
204
# NOTE(review): expected weave text cut off again (listing lines 205-206
# are absent).
self.assertEqualDiff('# bzr weave file v5\n'
207
t.get('inventory.weave').read())
208
self.assertFalse(t.has('branch-lock'))
210
def test_creates_lockdir(self):
211
"""Make sure it appears to be controlled by a LockDir existence"""
212
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
213
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
214
t = control.get_repository_transport(None)
215
# TODO: Should check there is a 'lock' toplevel directory,
216
# regardless of contents
217
# NOTE(review): listing lines 218-219, 221 and 223 are absent.  The two
# visible assertions check the same path first for absence and then for
# presence, so presumably the repository is locked between them and
# unlocked afterwards (see the trailing comment) -- confirm against the
# full source.
self.assertFalse(t.has('lock/held/info'))
220
self.assertTrue(t.has('lock/held/info'))
222
# unlock so we don't get a warning about failing to do so
225
def test_uses_lockdir(self):
226
"""repo format 7 actually locks on lockdir"""
227
base_url = self.get_url()
228
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
229
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
230
t = control.get_repository_transport(None)
234
# make sure the same lock is created by opening it
235
repo = repository.Repository.open(base_url)
237
# NOTE(review): listing lines 231-233, 236 and 238 are absent; presumably
# the reopened repository is locked before the first assertion and unlocked
# before the second -- confirm against the full source.
self.assertTrue(t.has('lock/held/info'))
239
self.assertFalse(t.has('lock/held/info'))
241
# Shared repository with working trees disabled: an empty 'no-working-trees'
# marker exists after set_make_working_trees(False) and is gone again after
# set_make_working_trees(True).
def test_shared_no_tree_disk_layout(self):
242
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
243
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
244
repo.set_make_working_trees(False)
246
# format 'Bazaar-NG Repository format 7'
248
# inventory.weave == empty_weave
249
# empty revision-store directory
250
# empty weaves directory
251
# a 'shared-storage' marker file.
252
t = control.get_repository_transport(None)
253
self.assertEqualDiff('Bazaar-NG Repository format 7',
254
t.get('format').read())
255
## self.assertEqualDiff('', t.get('lock').read())
256
self.assertEqualDiff('', t.get('shared-storage').read())
257
self.assertEqualDiff('', t.get('no-working-trees').read())
258
repo.set_make_working_trees(True)
259
self.assertFalse(t.has('no-working-trees'))
260
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
261
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
262
# NOTE(review): expected weave text cut off again (listing lines 263-264
# are absent).
self.assertEqualDiff('# bzr weave file v5\n'
265
t.get('inventory.weave').read())
268
class TestFormatKnit1(TestCaseWithTransport):
    """Disk-layout tests for knitrepo.RepositoryFormatKnit1."""

    # NOTE(review): this class was recovered from a listing that drops some
    # lines.  check_knits is defined below, yet none of the statements that
    # survived calls it, and each layout test has a dropped line right after
    # its last assertion -- confirm whether a self.check_knits(t) call
    # belongs there.

    def test_disk_layout(self):
        """An unshared repository has the format marker and a knits dir."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        knitrepo.RepositoryFormatKnit1().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): lines were dropped here in the recovered text; the
        # comment above suggests locking statements followed -- confirm.
        #
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock: is a directory
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))

    def assertHasKnit(self, t, knit_name):
        """Assert that knit_name exists on t."""
        # The index must contain exactly the knit index header line.
        self.assertEqualDiff('# bzr knit index 8\n',
                             t.get(knit_name + '.kndx').read())
        # The data file only has to exist.
        self.assertTrue(t.has(knit_name + '.knit'))

    def check_knits(self, t):
        """check knit content for a repository."""
        for knit_name in ('inventory', 'revisions', 'signatures'):
            self.assertHasKnit(t, knit_name)

    def test_shared_disk_layout(self):
        """A shared repository also has an empty shared-storage marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock: is a directory
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the no-working-trees marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
        repo.set_make_working_trees(False)
        # format 'Bazaar-NG Knit Repository Format 1'
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        # The marker is an empty file while working trees are disabled ...
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        # ... and is removed once they are enabled again.
        self.assertFalse(t.has('no-working-trees'))
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
346
class KnitRepositoryStreamTests(test_knit.KnitTests):
    """Tests for knitrepo._get_stream_as_bytes."""

    def test_get_stream_as_bytes(self):
        """A one-record stream bdecodes to a format name plus that record."""
        # Make a simple knit
        k1 = self.make_test_knit()
        k1.add_lines('text-a', [], test_knit.split_lines(test_knit.TEXT_1))

        # Serialise it, check the output.
        stream_bytes = knitrepo._get_stream_as_bytes(k1, ['text-a'])
        data = bencode.bdecode(stream_bytes)
        stream_format, record = data
        self.assertEqual('knit-plain', stream_format)
        self.assertEqual(['text-a', ['fulltext'], []], record[:3])
        self.assertRecordContentEqual(k1, 'text-a', record[3])

    def test_get_stream_as_bytes_all(self):
        """Get a serialised data stream for all the records in a knit.

        Much like test_get_stream_all, except for get_stream_as_bytes.
        """
        k1 = self.make_test_knit()
        # Insert the same data as BasicKnitTests.test_knit_join, as they seem
        # to cover a range of cases (no parents, one parent, multiple parents).
        # NOTE(review): the "test_data = [" opener and both list closers were
        # missing from the recovered text; the name comes from the loop below
        # that consumes it -- confirm.
        test_data = [
            ('text-a', [], test_knit.TEXT_1),
            ('text-b', ['text-a'], test_knit.TEXT_1),
            ('text-c', [], test_knit.TEXT_1),
            ('text-d', ['text-c'], test_knit.TEXT_1),
            ('text-m', ['text-b', 'text-d'], test_knit.TEXT_1),
            ]
        expected_data_list = [
            # version, options, parents
            ('text-a', ['fulltext'], []),
            ('text-b', ['line-delta'], ['text-a']),
            ('text-c', ['fulltext'], []),
            ('text-d', ['line-delta'], ['text-c']),
            ('text-m', ['line-delta'], ['text-b', 'text-d']),
            ]
        for version_id, parents, lines in test_data:
            k1.add_lines(version_id, parents, test_knit.split_lines(lines))

        stream_bytes = knitrepo._get_stream_as_bytes(
            k1, ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])

        data = bencode.bdecode(stream_bytes)
        # NOTE(review): the statement separating the format name from the
        # records was missing from the recovered text.  It is reconstructed
        # from test_get_stream_as_bytes (format name first) and from the loop
        # below, which unpacks every remaining item as a 4-field record --
        # confirm.
        stream_format, records = data[0], data[1:]
        self.assertEqual('knit-plain', stream_format)

        for expected, actual in zip(expected_data_list, records):
            expected_version, expected_options, expected_parents = expected
            version, options, parents, record_bytes = actual
            self.assertEqual(expected_version, version)
            self.assertEqual(expected_options, options)
            self.assertEqual(expected_parents, parents)
            self.assertRecordContentEqual(k1, version, record_bytes)
406
class DummyRepository(object):
407
"""A dummy repository for testing."""
411
# NOTE(review): this text is a line-numbered listing with its indentation
# stripped; the bare integers are listing line numbers, not code.  Listing
# lines 408-410 and the body of supports_rich_root (after 411) are absent,
# so neither any class attributes nor this method's return value can be
# determined from here -- recover them from the full source.
def supports_rich_root(self):
415
class InterDummy(repository.InterRepository):
    """An inter-repository optimised code path for DummyRepository.

    This is for use during testing, where DummyRepository objects stand in
    for repositories: it is an optimiser that accepts only pairs of
    DummyRepository instances.
    """

    # is_compatible takes no instance and is called on the class itself, so
    # it has to be a static method.
    # NOTE(review): the decorator line was missing from the recovered text;
    # it is restored because test_register_inter_repository_class refers to
    # "the is_compatible static method" and calls
    # InterDummy.is_compatible(a, b) -- confirm.
    @staticmethod
    def is_compatible(repo_source, repo_target):
        """InterDummy is compatible with DummyRepository."""
        return (isinstance(repo_source, DummyRepository) and
                isinstance(repo_target, DummyRepository))
430
class TestInterRepository(TestCaseWithTransport):
    """Tests for InterRepository.get and optimiser registration."""

    def test_get_default_inter_repository(self):
        """Two DummyRepository objects get the default inter-repository."""
        # test that the InterRepository.get(repo_a, repo_b) probes
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
        # true and returns a default inter_repo otherwise.
        # This also tests that the default registered optimised interrepository
        # classes do not barf inappropriately when handed a surprising
        # repository type.
        dummy_a = DummyRepository()
        dummy_b = DummyRepository()
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)

    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default.

        The effective default is now InterSameDataRepository because there is
        no actual sane default in the presence of incompatible data models.
        """
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
        self.assertEqual(repository.InterSameDataRepository,
                         inter_repo.__class__)
        self.assertEqual(repo_a, inter_repo.source)
        self.assertEqual(repo_b, inter_repo.target)

    def test_register_inter_repository_class(self):
        """A registered optimiser is picked only for pairs it accepts."""
        # test that a optimised code path provider - a
        # InterRepository subclass can be registered and unregistered
        # and that it is correctly selected when given a repository
        # pair that it returns true on for the is_compatible static method
        # check
        dummy_a = DummyRepository()
        dummy_b = DummyRepository()
        repo = self.make_repository('.')
        # hack dummies to look like repo somewhat.
        dummy_a._serializer = repo._serializer
        dummy_b._serializer = repo._serializer
        repository.InterRepository.register_optimiser(InterDummy)
        try:
            # we should get the default for something InterDummy returns
            # False for
            self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
            self.assertGetsDefaultInterRepository(dummy_a, repo)
            # and we should get an InterDummy for a pair it 'likes'
            self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
            self.assertEqual(InterDummy, inter_repo.__class__)
            self.assertEqual(dummy_a, inter_repo.source)
            self.assertEqual(dummy_b, inter_repo.target)
        finally:
            # Always unregister, so a failed assertion cannot leave the
            # optimiser registered for later tests.
            repository.InterRepository.unregister_optimiser(InterDummy)
        # now we should get the default InterRepository object again.
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
485
class TestInterWeaveRepo(TestCaseWithTransport):
    """Tests for repository.InterWeaveRepo."""

    def test_is_compatible_and_registered(self):
        """InterWeaveRepo accepts only pairs of format 5/6/7 repositories."""
        from bzrlib.repofmt import knitrepo, weaverepo
        formats = [weaverepo.RepositoryFormat5(),
                   weaverepo.RepositoryFormat6(),
                   weaverepo.RepositoryFormat7()]
        incompatible_formats = [weaverepo.RepositoryFormat4(),
                                knitrepo.RepositoryFormatKnit1(),
                                ]
        repo_a = self.make_repository('a')
        repo_b = self.make_repository('b')
        is_compatible = repository.InterWeaveRepo.is_compatible
        for source in incompatible_formats:
            # force incompatible left then right
            repo_a._format = source
            repo_b._format = formats[0]
            self.assertFalse(is_compatible(repo_a, repo_b))
            self.assertFalse(is_compatible(repo_b, repo_a))
        for source in formats:
            repo_a._format = source
            for target in formats:
                repo_b._format = target
                self.assertTrue(is_compatible(repo_a, repo_b))
        # NOTE(review): the end of this call was cut off in the recovered
        # text.  "repo_b).__class__" is reconstructed by analogy with
        # assertGetsDefaultInterRepository, which compares
        # InterRepository.get(...).__class__ with the expected class.  The
        # original nesting level of this assertion (after the loops, or
        # inside them) could not be recovered either -- confirm both.
        self.assertEqual(repository.InterWeaveRepo,
                         repository.InterRepository.get(repo_a,
                                                        repo_b).__class__)
516
class TestRepositoryConverter(TestCaseWithTransport):
    """Tests for repository.CopyConverter."""

    def test_convert_empty(self):
        """Converting an empty format 7 repository yields a Knit1 repository."""
        t = get_transport(self.get_url('.'))
        t.mkdir('repository')
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
        repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
        target_format = knitrepo.RepositoryFormatKnit1()
        converter = repository.CopyConverter(target_format)
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
        try:
            converter.convert(repo, pb)
        finally:
            # A nested progress bar has to be released once it is done with,
            # whether or not the conversion succeeded.
            pb.finished()
        # Re-open from the control dir to see what is on disk now.
        repo = repo_dir.open_repository()
        self.assertIsInstance(target_format, repo._format.__class__)
534
class TestMisc(TestCase):
    """Miscellaneous tests for helpers in the repository module."""

    def test_unescape_xml(self):
        """We get some kind of error when malformed entities are passed"""
        # '&bar;' is not an entity _unescape_xml knows about; the lookup is
        # expected to surface as a KeyError.
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
541
class TestRepositoryFormatKnit3(TestCaseWithTransport):
    """Tests for upgrading a repository to knitrepo.RepositoryFormatKnit3."""

    def test_convert(self):
        """Ensure the upgrade adds weaves for roots"""
        # Start from a Knit1 repository with a single commit.
        knit1_format = bzrdir.BzrDirMetaFormat1()
        knit1_format.repository_format = knitrepo.RepositoryFormatKnit1()
        tree = self.make_branch_and_tree('.', knit1_format)
        tree.commit("Dull commit", rev_id="dull")
        revision_tree = tree.branch.repository.revision_tree('dull')
        # Before the upgrade the root entry has no file text to read.
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
                          revision_tree.inventory.root.file_id)

        # Upgrade in place to Knit3.
        knit3_format = bzrdir.BzrDirMetaFormat1()
        knit3_format.repository_format = knitrepo.RepositoryFormatKnit3()
        upgrade.Convert('.', knit3_format)

        tree = workingtree.WorkingTree.open('.')
        revision_tree = tree.branch.repository.revision_tree('dull')
        # The same lookup must no longer raise after the conversion.
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
        tree.commit("Another dull commit", rev_id='dull2')
        revision_tree = tree.branch.repository.revision_tree('dull2')
        # The root was not changed by the second commit, so it still carries
        # the revision of the first one.
        self.assertEqual('dull', revision_tree.inventory.root.revision)