1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.memory import MemoryServer
48
class TestDefaultFormat(TestCase):
50
def test_get_set_default_format(self):
51
old_format = RepositoryFormat.get_default_format()
52
test_format = SampleRepositoryFormat()
53
RepositoryFormat.register_format(test_format)
55
self.applyDeprecated(symbol_versioning.zero_fourteen,
56
RepositoryFormat.set_default_format,
58
# creating a repository should now create an instrumented dir.
60
# the default branch format is used by the meta dir format
61
# which is not the default bzrdir format at this point
62
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
63
result = dir.create_repository()
64
self.assertEqual(result, 'A bzr repository dir')
66
self.applyDeprecated(symbol_versioning.zero_fourteen,
67
RepositoryFormat.set_default_format, old_format)
69
RepositoryFormat.unregister_format(test_format)
70
self.assertEqual(old_format, RepositoryFormat.get_default_format())
73
class SampleRepositoryFormat(repository.RepositoryFormat):
76
this format is initializable, unsupported to aid in testing the
77
open and open(unsupported=True) routines.
80
def get_format_string(self):
81
"""See RepositoryFormat.get_format_string()."""
82
return "Sample .bzr repository format."
84
def initialize(self, a_bzrdir, shared=False):
85
"""Initialize a repository in a BzrDir"""
86
t = a_bzrdir.get_repository_transport(self)
87
t.put_bytes('format', self.get_format_string())
88
return 'A bzr repository dir'
90
def is_supported(self):
93
def open(self, a_bzrdir, _found=False):
94
return "opened repository."
97
class TestRepositoryFormat(TestCaseWithTransport):
98
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
100
def test_find_format(self):
101
# is the right format object found for a repository?
102
# create a branch with a few known format objects.
103
# this is not quite the same as
104
self.build_tree(["foo/", "bar/"])
105
def check_format(format, url):
106
dir = format._matchingbzrdir.initialize(url)
107
format.initialize(dir)
108
t = get_transport(url)
109
found_format = repository.RepositoryFormat.find_format(dir)
110
self.failUnless(isinstance(found_format, format.__class__))
111
check_format(repository.RepositoryFormat7(), "bar")
113
def test_find_format_no_repository(self):
114
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
115
self.assertRaises(errors.NoRepositoryPresent,
116
repository.RepositoryFormat.find_format,
119
def test_find_format_unknown_format(self):
120
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
121
SampleRepositoryFormat().initialize(dir)
122
self.assertRaises(UnknownFormatError,
123
repository.RepositoryFormat.find_format,
126
def test_register_unregister_format(self):
127
format = SampleRepositoryFormat()
129
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
131
format.initialize(dir)
132
# register a format for it.
133
repository.RepositoryFormat.register_format(format)
134
# which repository.Open will refuse (not supported)
135
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
136
# but open(unsupported) will work
137
self.assertEqual(format.open(dir), "opened repository.")
138
# unregister the format
139
repository.RepositoryFormat.unregister_format(format)
142
class TestFormat6(TestCaseWithTransport):
144
def test_no_ancestry_weave(self):
145
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
146
repo = repository.RepositoryFormat6().initialize(control)
147
# We no longer need to create the ancestry.weave file
148
# since it is *never* used.
149
self.assertRaises(NoSuchFile,
150
control.transport.get,
154
class TestFormat7(TestCaseWithTransport):
156
def test_disk_layout(self):
157
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
158
repo = repository.RepositoryFormat7().initialize(control)
159
# in case of side effects of locking.
163
# format 'Bazaar-NG Repository format 7'
165
# inventory.weave == empty_weave
166
# empty revision-store directory
167
# empty weaves directory
168
t = control.get_repository_transport(None)
169
self.assertEqualDiff('Bazaar-NG Repository format 7',
170
t.get('format').read())
171
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
172
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
173
self.assertEqualDiff('# bzr weave file v5\n'
176
t.get('inventory.weave').read())
178
def test_shared_disk_layout(self):
179
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
180
repo = repository.RepositoryFormat7().initialize(control, shared=True)
182
# format 'Bazaar-NG Repository format 7'
183
# inventory.weave == empty_weave
184
# empty revision-store directory
185
# empty weaves directory
186
# a 'shared-storage' marker file.
187
# lock is not present when unlocked
188
t = control.get_repository_transport(None)
189
self.assertEqualDiff('Bazaar-NG Repository format 7',
190
t.get('format').read())
191
self.assertEqualDiff('', t.get('shared-storage').read())
192
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
193
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
194
self.assertEqualDiff('# bzr weave file v5\n'
197
t.get('inventory.weave').read())
198
self.assertFalse(t.has('branch-lock'))
200
def test_creates_lockdir(self):
201
"""Make sure it appears to be controlled by a LockDir existence"""
202
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
203
repo = repository.RepositoryFormat7().initialize(control, shared=True)
204
t = control.get_repository_transport(None)
205
# TODO: Should check there is a 'lock' toplevel directory,
206
# regardless of contents
207
self.assertFalse(t.has('lock/held/info'))
210
self.assertTrue(t.has('lock/held/info'))
212
# unlock so we don't get a warning about failing to do so
215
def test_uses_lockdir(self):
216
"""repo format 7 actually locks on lockdir"""
217
base_url = self.get_url()
218
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
219
repo = repository.RepositoryFormat7().initialize(control, shared=True)
220
t = control.get_repository_transport(None)
224
# make sure the same lock is created by opening it
225
repo = repository.Repository.open(base_url)
227
self.assertTrue(t.has('lock/held/info'))
229
self.assertFalse(t.has('lock/held/info'))
231
def test_shared_no_tree_disk_layout(self):
232
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
233
repo = repository.RepositoryFormat7().initialize(control, shared=True)
234
repo.set_make_working_trees(False)
236
# format 'Bazaar-NG Repository format 7'
238
# inventory.weave == empty_weave
239
# empty revision-store directory
240
# empty weaves directory
241
# a 'shared-storage' marker file.
242
t = control.get_repository_transport(None)
243
self.assertEqualDiff('Bazaar-NG Repository format 7',
244
t.get('format').read())
245
## self.assertEqualDiff('', t.get('lock').read())
246
self.assertEqualDiff('', t.get('shared-storage').read())
247
self.assertEqualDiff('', t.get('no-working-trees').read())
248
repo.set_make_working_trees(True)
249
self.assertFalse(t.has('no-working-trees'))
250
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
251
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
252
self.assertEqualDiff('# bzr weave file v5\n'
255
t.get('inventory.weave').read())
258
class TestFormatKnit1(TestCaseWithTransport):
260
def test_disk_layout(self):
261
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
262
repo = repository.RepositoryFormatKnit1().initialize(control)
263
# in case of side effects of locking.
267
# format 'Bazaar-NG Knit Repository Format 1'
268
# lock: is a directory
269
# inventory.weave == empty_weave
270
# empty revision-store directory
271
# empty weaves directory
272
t = control.get_repository_transport(None)
273
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
274
t.get('format').read())
275
# XXX: no locks left when unlocked at the moment
276
# self.assertEqualDiff('', t.get('lock').read())
277
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
280
def assertHasKnit(self, t, knit_name):
281
"""Assert that knit_name exists on t."""
282
self.assertEqualDiff('# bzr knit index 8\n',
283
t.get(knit_name + '.kndx').read())
285
self.assertTrue(t.has(knit_name + '.knit'))
287
def check_knits(self, t):
288
"""check knit content for a repository."""
289
self.assertHasKnit(t, 'inventory')
290
self.assertHasKnit(t, 'revisions')
291
self.assertHasKnit(t, 'signatures')
293
def test_shared_disk_layout(self):
294
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
295
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
297
# format 'Bazaar-NG Knit Repository Format 1'
298
# lock: is a directory
299
# inventory.weave == empty_weave
300
# empty revision-store directory
301
# empty weaves directory
302
# a 'shared-storage' marker file.
303
t = control.get_repository_transport(None)
304
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
305
t.get('format').read())
306
# XXX: no locks left when unlocked at the moment
307
# self.assertEqualDiff('', t.get('lock').read())
308
self.assertEqualDiff('', t.get('shared-storage').read())
309
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
312
def test_shared_no_tree_disk_layout(self):
313
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
314
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
315
repo.set_make_working_trees(False)
317
# format 'Bazaar-NG Knit Repository Format 1'
319
# inventory.weave == empty_weave
320
# empty revision-store directory
321
# empty weaves directory
322
# a 'shared-storage' marker file.
323
t = control.get_repository_transport(None)
324
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
325
t.get('format').read())
326
# XXX: no locks left when unlocked at the moment
327
# self.assertEqualDiff('', t.get('lock').read())
328
self.assertEqualDiff('', t.get('shared-storage').read())
329
self.assertEqualDiff('', t.get('no-working-trees').read())
330
repo.set_make_working_trees(True)
331
self.assertFalse(t.has('no-working-trees'))
332
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
336
class InterString(repository.InterRepository):
337
"""An inter-repository optimised code path for strings.
339
This is for use during testing where we use strings as repositories
340
so that none of the default regsitered inter-repository classes will
345
def is_compatible(repo_source, repo_target):
346
"""InterString is compatible with strings-as-repos."""
347
return isinstance(repo_source, str) and isinstance(repo_target, str)
350
class TestInterRepository(TestCaseWithTransport):
352
def test_get_default_inter_repository(self):
353
# test that the InterRepository.get(repo_a, repo_b) probes
354
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
355
# true and returns a default inter_repo otherwise.
356
# This also tests that the default registered optimised interrepository
357
# classes do not barf inappropriately when a surprising repository type
359
dummy_a = "Repository 1."
360
dummy_b = "Repository 2."
361
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
363
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
364
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
365
inter_repo = repository.InterRepository.get(repo_a, repo_b)
366
self.assertEqual(repository.InterRepository,
367
inter_repo.__class__)
368
self.assertEqual(repo_a, inter_repo.source)
369
self.assertEqual(repo_b, inter_repo.target)
371
def test_register_inter_repository_class(self):
372
# test that a optimised code path provider - a
373
# InterRepository subclass can be registered and unregistered
374
# and that it is correctly selected when given a repository
375
# pair that it returns true on for the is_compatible static method
377
dummy_a = "Repository 1."
378
dummy_b = "Repository 2."
379
repository.InterRepository.register_optimiser(InterString)
381
# we should get the default for something InterString returns False
383
self.assertFalse(InterString.is_compatible(dummy_a, None))
384
self.assertGetsDefaultInterRepository(dummy_a, None)
385
# and we should get an InterString for a pair it 'likes'
386
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
387
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
388
self.assertEqual(InterString, inter_repo.__class__)
389
self.assertEqual(dummy_a, inter_repo.source)
390
self.assertEqual(dummy_b, inter_repo.target)
392
repository.InterRepository.unregister_optimiser(InterString)
393
# now we should get the default InterRepository object again.
394
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
397
class TestInterWeaveRepo(TestCaseWithTransport):
399
def test_is_compatible_and_registered(self):
400
# InterWeaveRepo is compatible when either side
401
# is a format 5/6/7 branch
402
formats = [repository.RepositoryFormat5(),
403
repository.RepositoryFormat6(),
404
repository.RepositoryFormat7()]
405
incompatible_formats = [repository.RepositoryFormat4(),
406
repository.RepositoryFormatKnit1(),
408
repo_a = self.make_repository('a')
409
repo_b = self.make_repository('b')
410
is_compatible = repository.InterWeaveRepo.is_compatible
411
for source in incompatible_formats:
412
# force incompatible left then right
413
repo_a._format = source
414
repo_b._format = formats[0]
415
self.assertFalse(is_compatible(repo_a, repo_b))
416
self.assertFalse(is_compatible(repo_b, repo_a))
417
for source in formats:
418
repo_a._format = source
419
for target in formats:
420
repo_b._format = target
421
self.assertTrue(is_compatible(repo_a, repo_b))
422
self.assertEqual(repository.InterWeaveRepo,
423
repository.InterRepository.get(repo_a,
427
class TestRepositoryConverter(TestCaseWithTransport):
429
def test_convert_empty(self):
430
t = get_transport(self.get_url('.'))
431
t.mkdir('repository')
432
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
433
repo = repository.RepositoryFormat7().initialize(repo_dir)
434
target_format = repository.RepositoryFormatKnit1()
435
converter = repository.CopyConverter(target_format)
436
pb = bzrlib.ui.ui_factory.nested_progress_bar()
438
converter.convert(repo, pb)
441
repo = repo_dir.open_repository()
442
self.assertTrue(isinstance(target_format, repo._format.__class__))
445
class TestMisc(TestCase):
447
def test_unescape_xml(self):
448
"""We get some kind of error when malformed entities are passed"""
449
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
452
class TestRepositoryFormatKnit2(TestCaseWithTransport):
454
def test_convert(self):
455
"""Ensure the upgrade adds weaves for roots"""
456
format = bzrdir.BzrDirMetaFormat1()
457
format.repository_format = repository.RepositoryFormatKnit1()
458
tree = self.make_branch_and_tree('.', format)
459
tree.commit("Dull commit", rev_id="dull")
460
revision_tree = tree.branch.repository.revision_tree('dull')
461
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
462
revision_tree.inventory.root.file_id)
463
format = bzrdir.BzrDirMetaFormat1()
464
format.repository_format = repository.RepositoryFormatKnit2()
465
upgrade.Convert('.', format)
466
tree = workingtree.WorkingTree.open('.')
467
revision_tree = tree.branch.repository.revision_tree('dull')
468
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
469
tree.commit("Another dull commit", rev_id='dull2')
470
revision_tree = tree.branch.repository.revision_tree('dull2')
471
self.assertEqual('dull', revision_tree.inventory.root.revision)