1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
38
from bzrlib.tests import (
40
TestCaseWithTransport,
43
from bzrlib.transport import get_transport
44
from bzrlib.transport.memory import MemoryServer
45
from bzrlib.util import bencode
51
from bzrlib.repofmt import knitrepo, weaverepo
54
class TestDefaultFormat(TestCase):
56
def test_get_set_default_format(self):
57
old_default = bzrdir.format_registry.get('default')
58
private_default = old_default().repository_format.__class__
59
old_format = repository.RepositoryFormat.get_default_format()
60
self.assertTrue(isinstance(old_format, private_default))
61
def make_sample_bzrdir():
62
my_bzrdir = bzrdir.BzrDirMetaFormat1()
63
my_bzrdir.repository_format = SampleRepositoryFormat()
65
bzrdir.format_registry.remove('default')
66
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
67
bzrdir.format_registry.set_default('sample')
68
# creating a repository should now create an instrumented dir.
70
# the default branch format is used by the meta dir format
71
# which is not the default bzrdir format at this point
72
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
73
result = dir.create_repository()
74
self.assertEqual(result, 'A bzr repository dir')
76
bzrdir.format_registry.remove('default')
77
bzrdir.format_registry.remove('sample')
78
bzrdir.format_registry.register('default', old_default, '')
79
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
83
class SampleRepositoryFormat(repository.RepositoryFormat):
86
this format is initializable, unsupported to aid in testing the
87
open and open(unsupported=True) routines.
90
def get_format_string(self):
91
"""See RepositoryFormat.get_format_string()."""
92
return "Sample .bzr repository format."
94
def initialize(self, a_bzrdir, shared=False):
95
"""Initialize a repository in a BzrDir"""
96
t = a_bzrdir.get_repository_transport(self)
97
t.put_bytes('format', self.get_format_string())
98
return 'A bzr repository dir'
100
def is_supported(self):
103
def open(self, a_bzrdir, _found=False):
104
return "opened repository."
107
class TestRepositoryFormat(TestCaseWithTransport):
108
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
110
def test_find_format(self):
111
# is the right format object found for a repository?
112
# create a branch with a few known format objects.
113
# this is not quite the same as
114
self.build_tree(["foo/", "bar/"])
115
def check_format(format, url):
116
dir = format._matchingbzrdir.initialize(url)
117
format.initialize(dir)
118
t = get_transport(url)
119
found_format = repository.RepositoryFormat.find_format(dir)
120
self.failUnless(isinstance(found_format, format.__class__))
121
check_format(weaverepo.RepositoryFormat7(), "bar")
123
def test_find_format_no_repository(self):
124
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
125
self.assertRaises(errors.NoRepositoryPresent,
126
repository.RepositoryFormat.find_format,
129
def test_find_format_unknown_format(self):
130
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
131
SampleRepositoryFormat().initialize(dir)
132
self.assertRaises(UnknownFormatError,
133
repository.RepositoryFormat.find_format,
136
def test_register_unregister_format(self):
137
format = SampleRepositoryFormat()
139
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
141
format.initialize(dir)
142
# register a format for it.
143
repository.RepositoryFormat.register_format(format)
144
# which repository.Open will refuse (not supported)
145
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
146
# but open(unsupported) will work
147
self.assertEqual(format.open(dir), "opened repository.")
148
# unregister the format
149
repository.RepositoryFormat.unregister_format(format)
152
class TestFormat6(TestCaseWithTransport):
154
def test_no_ancestry_weave(self):
155
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
156
repo = weaverepo.RepositoryFormat6().initialize(control)
157
# We no longer need to create the ancestry.weave file
158
# since it is *never* used.
159
self.assertRaises(NoSuchFile,
160
control.transport.get,
163
def test_exposed_versioned_files_are_marked_dirty(self):
164
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
165
repo = weaverepo.RepositoryFormat6().initialize(control)
167
inv = repo.get_inventory_weave()
169
self.assertRaises(errors.OutSideTransaction,
170
inv.add_lines, 'foo', [], [])
173
class TestFormat7(TestCaseWithTransport):
175
def test_disk_layout(self):
176
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
177
repo = weaverepo.RepositoryFormat7().initialize(control)
178
# in case of side effects of locking.
182
# format 'Bazaar-NG Repository format 7'
184
# inventory.weave == empty_weave
185
# empty revision-store directory
186
# empty weaves directory
187
t = control.get_repository_transport(None)
188
self.assertEqualDiff('Bazaar-NG Repository format 7',
189
t.get('format').read())
190
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
191
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
192
self.assertEqualDiff('# bzr weave file v5\n'
195
t.get('inventory.weave').read())
197
def test_shared_disk_layout(self):
198
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
199
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
201
# format 'Bazaar-NG Repository format 7'
202
# inventory.weave == empty_weave
203
# empty revision-store directory
204
# empty weaves directory
205
# a 'shared-storage' marker file.
206
# lock is not present when unlocked
207
t = control.get_repository_transport(None)
208
self.assertEqualDiff('Bazaar-NG Repository format 7',
209
t.get('format').read())
210
self.assertEqualDiff('', t.get('shared-storage').read())
211
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
212
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
213
self.assertEqualDiff('# bzr weave file v5\n'
216
t.get('inventory.weave').read())
217
self.assertFalse(t.has('branch-lock'))
219
def test_creates_lockdir(self):
220
"""Make sure it appears to be controlled by a LockDir existence"""
221
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
222
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
223
t = control.get_repository_transport(None)
224
# TODO: Should check there is a 'lock' toplevel directory,
225
# regardless of contents
226
self.assertFalse(t.has('lock/held/info'))
229
self.assertTrue(t.has('lock/held/info'))
231
# unlock so we don't get a warning about failing to do so
234
def test_uses_lockdir(self):
235
"""repo format 7 actually locks on lockdir"""
236
base_url = self.get_url()
237
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
238
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
239
t = control.get_repository_transport(None)
243
# make sure the same lock is created by opening it
244
repo = repository.Repository.open(base_url)
246
self.assertTrue(t.has('lock/held/info'))
248
self.assertFalse(t.has('lock/held/info'))
250
def test_shared_no_tree_disk_layout(self):
251
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
252
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
253
repo.set_make_working_trees(False)
255
# format 'Bazaar-NG Repository format 7'
257
# inventory.weave == empty_weave
258
# empty revision-store directory
259
# empty weaves directory
260
# a 'shared-storage' marker file.
261
t = control.get_repository_transport(None)
262
self.assertEqualDiff('Bazaar-NG Repository format 7',
263
t.get('format').read())
264
## self.assertEqualDiff('', t.get('lock').read())
265
self.assertEqualDiff('', t.get('shared-storage').read())
266
self.assertEqualDiff('', t.get('no-working-trees').read())
267
repo.set_make_working_trees(True)
268
self.assertFalse(t.has('no-working-trees'))
269
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
270
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
271
self.assertEqualDiff('# bzr weave file v5\n'
274
t.get('inventory.weave').read())
276
def test_exposed_versioned_files_are_marked_dirty(self):
277
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
278
repo = weaverepo.RepositoryFormat7().initialize(control)
280
inv = repo.get_inventory_weave()
282
self.assertRaises(errors.OutSideTransaction,
283
inv.add_lines, 'foo', [], [])
286
class TestFormatKnit1(TestCaseWithTransport):
288
def test_disk_layout(self):
289
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
290
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
291
# in case of side effects of locking.
295
# format 'Bazaar-NG Knit Repository Format 1'
296
# lock: is a directory
297
# inventory.weave == empty_weave
298
# empty revision-store directory
299
# empty weaves directory
300
t = control.get_repository_transport(None)
301
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
302
t.get('format').read())
303
# XXX: no locks left when unlocked at the moment
304
# self.assertEqualDiff('', t.get('lock').read())
305
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
308
def assertHasKnit(self, t, knit_name):
309
"""Assert that knit_name exists on t."""
310
self.assertEqualDiff('# bzr knit index 8\n',
311
t.get(knit_name + '.kndx').read())
313
self.assertTrue(t.has(knit_name + '.knit'))
315
def check_knits(self, t):
316
"""check knit content for a repository."""
317
self.assertHasKnit(t, 'inventory')
318
self.assertHasKnit(t, 'revisions')
319
self.assertHasKnit(t, 'signatures')
321
def test_shared_disk_layout(self):
322
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
323
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
325
# format 'Bazaar-NG Knit Repository Format 1'
326
# lock: is a directory
327
# inventory.weave == empty_weave
328
# empty revision-store directory
329
# empty weaves directory
330
# a 'shared-storage' marker file.
331
t = control.get_repository_transport(None)
332
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
333
t.get('format').read())
334
# XXX: no locks left when unlocked at the moment
335
# self.assertEqualDiff('', t.get('lock').read())
336
self.assertEqualDiff('', t.get('shared-storage').read())
337
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
340
def test_shared_no_tree_disk_layout(self):
341
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
342
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
343
repo.set_make_working_trees(False)
345
# format 'Bazaar-NG Knit Repository Format 1'
347
# inventory.weave == empty_weave
348
# empty revision-store directory
349
# empty weaves directory
350
# a 'shared-storage' marker file.
351
t = control.get_repository_transport(None)
352
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
353
t.get('format').read())
354
# XXX: no locks left when unlocked at the moment
355
# self.assertEqualDiff('', t.get('lock').read())
356
self.assertEqualDiff('', t.get('shared-storage').read())
357
self.assertEqualDiff('', t.get('no-working-trees').read())
358
repo.set_make_working_trees(True)
359
self.assertFalse(t.has('no-working-trees'))
360
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
363
def test_exposed_versioned_files_are_marked_dirty(self):
364
format = bzrdir.BzrDirMetaFormat1()
365
format.repository_format = knitrepo.RepositoryFormatKnit1()
366
repo = self.make_repository('.', format=format)
368
inv = repo.get_inventory_weave()
370
self.assertRaises(errors.OutSideTransaction,
371
inv.add_lines, 'foo', [], [])
374
class KnitRepositoryStreamTests(test_knit.KnitTests):
375
"""Tests for knitrepo._get_stream_as_bytes."""
377
def test_get_stream_as_bytes(self):
379
k1 = self.make_test_knit()
380
k1.add_lines('text-a', [], test_knit.split_lines(test_knit.TEXT_1))
382
# Serialise it, check the output.
383
bytes = knitrepo._get_stream_as_bytes(k1, ['text-a'])
384
data = bencode.bdecode(bytes)
385
format, record = data
386
self.assertEqual('knit-plain', format)
387
self.assertEqual(['text-a', ['fulltext'], []], record[:3])
388
self.assertRecordContentEqual(k1, 'text-a', record[3])
390
def test_get_stream_as_bytes_all(self):
391
"""Get a serialised data stream for all the records in a knit.
393
Much like test_get_stream_all, except for get_stream_as_bytes.
395
k1 = self.make_test_knit()
396
# Insert the same data as BasicKnitTests.test_knit_join, as they seem
397
# to cover a range of cases (no parents, one parent, multiple parents).
399
('text-a', [], test_knit.TEXT_1),
400
('text-b', ['text-a'], test_knit.TEXT_1),
401
('text-c', [], test_knit.TEXT_1),
402
('text-d', ['text-c'], test_knit.TEXT_1),
403
('text-m', ['text-b', 'text-d'], test_knit.TEXT_1),
405
expected_data_list = [
406
# version, options, parents
407
('text-a', ['fulltext'], []),
408
('text-b', ['line-delta'], ['text-a']),
409
('text-c', ['fulltext'], []),
410
('text-d', ['line-delta'], ['text-c']),
411
('text-m', ['line-delta'], ['text-b', 'text-d']),
413
for version_id, parents, lines in test_data:
414
k1.add_lines(version_id, parents, test_knit.split_lines(lines))
416
bytes = knitrepo._get_stream_as_bytes(
417
k1, ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])
419
data = bencode.bdecode(bytes)
421
self.assertEqual('knit-plain', format)
423
for expected, actual in zip(expected_data_list, data):
424
expected_version = expected[0]
425
expected_options = expected[1]
426
expected_parents = expected[2]
427
version, options, parents, bytes = actual
428
self.assertEqual(expected_version, version)
429
self.assertEqual(expected_options, options)
430
self.assertEqual(expected_parents, parents)
431
self.assertRecordContentEqual(k1, version, bytes)
434
class DummyRepository(object):
435
"""A dummy repository for testing."""
439
def supports_rich_root(self):
443
class InterDummy(repository.InterRepository):
444
"""An inter-repository optimised code path for DummyRepository.
446
This is for use during testing where we use DummyRepository as repositories
447
so that none of the default regsitered inter-repository classes will
452
def is_compatible(repo_source, repo_target):
453
"""InterDummy is compatible with DummyRepository."""
454
return (isinstance(repo_source, DummyRepository) and
455
isinstance(repo_target, DummyRepository))
458
class TestInterRepository(TestCaseWithTransport):
460
def test_get_default_inter_repository(self):
461
# test that the InterRepository.get(repo_a, repo_b) probes
462
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
463
# true and returns a default inter_repo otherwise.
464
# This also tests that the default registered optimised interrepository
465
# classes do not barf inappropriately when a surprising repository type
467
dummy_a = DummyRepository()
468
dummy_b = DummyRepository()
469
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
471
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
472
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
474
The effective default is now InterSameDataRepository because there is
475
no actual sane default in the presence of incompatible data models.
477
inter_repo = repository.InterRepository.get(repo_a, repo_b)
478
self.assertEqual(repository.InterSameDataRepository,
479
inter_repo.__class__)
480
self.assertEqual(repo_a, inter_repo.source)
481
self.assertEqual(repo_b, inter_repo.target)
483
def test_register_inter_repository_class(self):
484
# test that a optimised code path provider - a
485
# InterRepository subclass can be registered and unregistered
486
# and that it is correctly selected when given a repository
487
# pair that it returns true on for the is_compatible static method
489
dummy_a = DummyRepository()
490
dummy_b = DummyRepository()
491
repo = self.make_repository('.')
492
# hack dummies to look like repo somewhat.
493
dummy_a._serializer = repo._serializer
494
dummy_b._serializer = repo._serializer
495
repository.InterRepository.register_optimiser(InterDummy)
497
# we should get the default for something InterDummy returns False
499
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
500
self.assertGetsDefaultInterRepository(dummy_a, repo)
501
# and we should get an InterDummy for a pair it 'likes'
502
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
503
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
504
self.assertEqual(InterDummy, inter_repo.__class__)
505
self.assertEqual(dummy_a, inter_repo.source)
506
self.assertEqual(dummy_b, inter_repo.target)
508
repository.InterRepository.unregister_optimiser(InterDummy)
509
# now we should get the default InterRepository object again.
510
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
513
class TestInterWeaveRepo(TestCaseWithTransport):
515
def test_is_compatible_and_registered(self):
516
# InterWeaveRepo is compatible when either side
517
# is a format 5/6/7 branch
518
from bzrlib.repofmt import knitrepo, weaverepo
519
formats = [weaverepo.RepositoryFormat5(),
520
weaverepo.RepositoryFormat6(),
521
weaverepo.RepositoryFormat7()]
522
incompatible_formats = [weaverepo.RepositoryFormat4(),
523
knitrepo.RepositoryFormatKnit1(),
525
repo_a = self.make_repository('a')
526
repo_b = self.make_repository('b')
527
is_compatible = repository.InterWeaveRepo.is_compatible
528
for source in incompatible_formats:
529
# force incompatible left then right
530
repo_a._format = source
531
repo_b._format = formats[0]
532
self.assertFalse(is_compatible(repo_a, repo_b))
533
self.assertFalse(is_compatible(repo_b, repo_a))
534
for source in formats:
535
repo_a._format = source
536
for target in formats:
537
repo_b._format = target
538
self.assertTrue(is_compatible(repo_a, repo_b))
539
self.assertEqual(repository.InterWeaveRepo,
540
repository.InterRepository.get(repo_a,
544
class TestRepositoryConverter(TestCaseWithTransport):
546
def test_convert_empty(self):
547
t = get_transport(self.get_url('.'))
548
t.mkdir('repository')
549
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
550
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
551
target_format = knitrepo.RepositoryFormatKnit1()
552
converter = repository.CopyConverter(target_format)
553
pb = bzrlib.ui.ui_factory.nested_progress_bar()
555
converter.convert(repo, pb)
558
repo = repo_dir.open_repository()
559
self.assertTrue(isinstance(target_format, repo._format.__class__))
562
class TestMisc(TestCase):
564
def test_unescape_xml(self):
565
"""We get some kind of error when malformed entities are passed"""
566
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
569
class TestRepositoryFormatKnit3(TestCaseWithTransport):
571
def test_convert(self):
572
"""Ensure the upgrade adds weaves for roots"""
573
format = bzrdir.BzrDirMetaFormat1()
574
format.repository_format = knitrepo.RepositoryFormatKnit1()
575
tree = self.make_branch_and_tree('.', format)
576
tree.commit("Dull commit", rev_id="dull")
577
revision_tree = tree.branch.repository.revision_tree('dull')
578
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
579
revision_tree.inventory.root.file_id)
580
format = bzrdir.BzrDirMetaFormat1()
581
format.repository_format = knitrepo.RepositoryFormatKnit3()
582
upgrade.Convert('.', format)
583
tree = workingtree.WorkingTree.open('.')
584
revision_tree = tree.branch.repository.revision_tree('dull')
585
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
586
tree.commit("Another dull commit", rev_id='dull2')
587
revision_tree = tree.branch.repository.revision_tree('dull2')
588
self.assertEqual('dull', revision_tree.inventory.root.revision)
590
def test_exposed_versioned_files_are_marked_dirty(self):
591
format = bzrdir.BzrDirMetaFormat1()
592
format.repository_format = knitrepo.RepositoryFormatKnit3()
593
repo = self.make_repository('.', format=format)
595
inv = repo.get_inventory_weave()
597
self.assertRaises(errors.OutSideTransaction,
598
inv.add_lines, 'foo', [], [])