1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
import bzrlib.repository as repository
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib import upgrade, workingtree
44
class TestDefaultFormat(TestCase):
46
def test_get_set_default_format(self):
47
private_default = repository._default_format.__class__
48
old_format = repository.RepositoryFormat.get_default_format()
49
self.assertTrue(isinstance(old_format, private_default))
50
self.applyDeprecated(symbol_versioning.zero_fourteen,
51
repository.RepositoryFormat.set_default_format,
52
SampleRepositoryFormat())
53
# creating a repository should now create an instrumented dir.
55
# the default branch format is used by the meta dir format
56
# which is not the default bzrdir format at this point
57
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
58
result = dir.create_repository()
59
self.assertEqual(result, 'A bzr repository dir')
61
self.applyDeprecated(symbol_versioning.zero_fourteen,
62
repository.RepositoryFormat.set_default_format, old_format)
63
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
66
class SampleRepositoryFormat(repository.RepositoryFormat):
69
this format is initializable, unsupported to aid in testing the
70
open and open(unsupported=True) routines.
73
def get_format_string(self):
74
"""See RepositoryFormat.get_format_string()."""
75
return "Sample .bzr repository format."
77
def initialize(self, a_bzrdir, shared=False):
78
"""Initialize a repository in a BzrDir"""
79
t = a_bzrdir.get_repository_transport(self)
80
t.put_bytes('format', self.get_format_string())
81
return 'A bzr repository dir'
83
def is_supported(self):
86
def open(self, a_bzrdir, _found=False):
87
return "opened repository."
90
class TestRepositoryFormat(TestCaseWithTransport):
91
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
93
def test_find_format(self):
94
# is the right format object found for a repository?
95
# create a branch with a few known format objects.
96
# this is not quite the same as
97
self.build_tree(["foo/", "bar/"])
98
def check_format(format, url):
99
dir = format._matchingbzrdir.initialize(url)
100
format.initialize(dir)
101
t = get_transport(url)
102
found_format = repository.RepositoryFormat.find_format(dir)
103
self.failUnless(isinstance(found_format, format.__class__))
104
check_format(repository.RepositoryFormat7(), "bar")
106
def test_find_format_no_repository(self):
107
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
108
self.assertRaises(errors.NoRepositoryPresent,
109
repository.RepositoryFormat.find_format,
112
def test_find_format_unknown_format(self):
113
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
114
SampleRepositoryFormat().initialize(dir)
115
self.assertRaises(UnknownFormatError,
116
repository.RepositoryFormat.find_format,
119
def test_register_unregister_format(self):
120
format = SampleRepositoryFormat()
122
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
124
format.initialize(dir)
125
# register a format for it.
126
repository.RepositoryFormat.register_format(format)
127
# which repository.Open will refuse (not supported)
128
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
129
# but open(unsupported) will work
130
self.assertEqual(format.open(dir), "opened repository.")
131
# unregister the format
132
repository.RepositoryFormat.unregister_format(format)
135
class TestFormat6(TestCaseWithTransport):
137
def test_no_ancestry_weave(self):
138
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
139
repo = repository.RepositoryFormat6().initialize(control)
140
# We no longer need to create the ancestry.weave file
141
# since it is *never* used.
142
self.assertRaises(NoSuchFile,
143
control.transport.get,
147
class TestFormat7(TestCaseWithTransport):
149
def test_disk_layout(self):
150
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
151
repo = repository.RepositoryFormat7().initialize(control)
152
# in case of side effects of locking.
156
# format 'Bazaar-NG Repository format 7'
158
# inventory.weave == empty_weave
159
# empty revision-store directory
160
# empty weaves directory
161
t = control.get_repository_transport(None)
162
self.assertEqualDiff('Bazaar-NG Repository format 7',
163
t.get('format').read())
164
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
165
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
166
self.assertEqualDiff('# bzr weave file v5\n'
169
t.get('inventory.weave').read())
171
def test_shared_disk_layout(self):
172
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
173
repo = repository.RepositoryFormat7().initialize(control, shared=True)
175
# format 'Bazaar-NG Repository format 7'
176
# inventory.weave == empty_weave
177
# empty revision-store directory
178
# empty weaves directory
179
# a 'shared-storage' marker file.
180
# lock is not present when unlocked
181
t = control.get_repository_transport(None)
182
self.assertEqualDiff('Bazaar-NG Repository format 7',
183
t.get('format').read())
184
self.assertEqualDiff('', t.get('shared-storage').read())
185
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
186
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
187
self.assertEqualDiff('# bzr weave file v5\n'
190
t.get('inventory.weave').read())
191
self.assertFalse(t.has('branch-lock'))
193
def test_creates_lockdir(self):
194
"""Make sure it appears to be controlled by a LockDir existence"""
195
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
196
repo = repository.RepositoryFormat7().initialize(control, shared=True)
197
t = control.get_repository_transport(None)
198
# TODO: Should check there is a 'lock' toplevel directory,
199
# regardless of contents
200
self.assertFalse(t.has('lock/held/info'))
203
self.assertTrue(t.has('lock/held/info'))
205
# unlock so we don't get a warning about failing to do so
208
def test_uses_lockdir(self):
209
"""repo format 7 actually locks on lockdir"""
210
base_url = self.get_url()
211
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
212
repo = repository.RepositoryFormat7().initialize(control, shared=True)
213
t = control.get_repository_transport(None)
217
# make sure the same lock is created by opening it
218
repo = repository.Repository.open(base_url)
220
self.assertTrue(t.has('lock/held/info'))
222
self.assertFalse(t.has('lock/held/info'))
224
def test_shared_no_tree_disk_layout(self):
225
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
226
repo = repository.RepositoryFormat7().initialize(control, shared=True)
227
repo.set_make_working_trees(False)
229
# format 'Bazaar-NG Repository format 7'
231
# inventory.weave == empty_weave
232
# empty revision-store directory
233
# empty weaves directory
234
# a 'shared-storage' marker file.
235
t = control.get_repository_transport(None)
236
self.assertEqualDiff('Bazaar-NG Repository format 7',
237
t.get('format').read())
238
## self.assertEqualDiff('', t.get('lock').read())
239
self.assertEqualDiff('', t.get('shared-storage').read())
240
self.assertEqualDiff('', t.get('no-working-trees').read())
241
repo.set_make_working_trees(True)
242
self.assertFalse(t.has('no-working-trees'))
243
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
244
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
245
self.assertEqualDiff('# bzr weave file v5\n'
248
t.get('inventory.weave').read())
251
class TestFormatKnit1(TestCaseWithTransport):
253
def test_disk_layout(self):
254
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
255
repo = repository.RepositoryFormatKnit1().initialize(control)
256
# in case of side effects of locking.
260
# format 'Bazaar-NG Knit Repository Format 1'
261
# lock: is a directory
262
# inventory.weave == empty_weave
263
# empty revision-store directory
264
# empty weaves directory
265
t = control.get_repository_transport(None)
266
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
267
t.get('format').read())
268
# XXX: no locks left when unlocked at the moment
269
# self.assertEqualDiff('', t.get('lock').read())
270
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
273
def assertHasKnit(self, t, knit_name):
274
"""Assert that knit_name exists on t."""
275
self.assertEqualDiff('# bzr knit index 8\n',
276
t.get(knit_name + '.kndx').read())
278
self.assertTrue(t.has(knit_name + '.knit'))
280
def check_knits(self, t):
281
"""check knit content for a repository."""
282
self.assertHasKnit(t, 'inventory')
283
self.assertHasKnit(t, 'revisions')
284
self.assertHasKnit(t, 'signatures')
286
def test_shared_disk_layout(self):
287
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
288
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
290
# format 'Bazaar-NG Knit Repository Format 1'
291
# lock: is a directory
292
# inventory.weave == empty_weave
293
# empty revision-store directory
294
# empty weaves directory
295
# a 'shared-storage' marker file.
296
t = control.get_repository_transport(None)
297
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
298
t.get('format').read())
299
# XXX: no locks left when unlocked at the moment
300
# self.assertEqualDiff('', t.get('lock').read())
301
self.assertEqualDiff('', t.get('shared-storage').read())
302
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
305
def test_shared_no_tree_disk_layout(self):
306
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
307
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
308
repo.set_make_working_trees(False)
310
# format 'Bazaar-NG Knit Repository Format 1'
312
# inventory.weave == empty_weave
313
# empty revision-store directory
314
# empty weaves directory
315
# a 'shared-storage' marker file.
316
t = control.get_repository_transport(None)
317
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
318
t.get('format').read())
319
# XXX: no locks left when unlocked at the moment
320
# self.assertEqualDiff('', t.get('lock').read())
321
self.assertEqualDiff('', t.get('shared-storage').read())
322
self.assertEqualDiff('', t.get('no-working-trees').read())
323
repo.set_make_working_trees(True)
324
self.assertFalse(t.has('no-working-trees'))
325
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
329
class InterString(repository.InterRepository):
330
"""An inter-repository optimised code path for strings.
332
This is for use during testing where we use strings as repositories
333
so that none of the default regsitered inter-repository classes will
338
def is_compatible(repo_source, repo_target):
339
"""InterString is compatible with strings-as-repos."""
340
return isinstance(repo_source, str) and isinstance(repo_target, str)
343
class TestInterRepository(TestCaseWithTransport):
345
def test_get_default_inter_repository(self):
346
# test that the InterRepository.get(repo_a, repo_b) probes
347
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
348
# true and returns a default inter_repo otherwise.
349
# This also tests that the default registered optimised interrepository
350
# classes do not barf inappropriately when a surprising repository type
352
dummy_a = "Repository 1."
353
dummy_b = "Repository 2."
354
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
356
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
357
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
358
inter_repo = repository.InterRepository.get(repo_a, repo_b)
359
self.assertEqual(repository.InterRepository,
360
inter_repo.__class__)
361
self.assertEqual(repo_a, inter_repo.source)
362
self.assertEqual(repo_b, inter_repo.target)
364
def test_register_inter_repository_class(self):
365
# test that a optimised code path provider - a
366
# InterRepository subclass can be registered and unregistered
367
# and that it is correctly selected when given a repository
368
# pair that it returns true on for the is_compatible static method
370
dummy_a = "Repository 1."
371
dummy_b = "Repository 2."
372
repository.InterRepository.register_optimiser(InterString)
374
# we should get the default for something InterString returns False
376
self.assertFalse(InterString.is_compatible(dummy_a, None))
377
self.assertGetsDefaultInterRepository(dummy_a, None)
378
# and we should get an InterString for a pair it 'likes'
379
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
380
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
381
self.assertEqual(InterString, inter_repo.__class__)
382
self.assertEqual(dummy_a, inter_repo.source)
383
self.assertEqual(dummy_b, inter_repo.target)
385
repository.InterRepository.unregister_optimiser(InterString)
386
# now we should get the default InterRepository object again.
387
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
390
class TestInterWeaveRepo(TestCaseWithTransport):
392
def test_is_compatible_and_registered(self):
393
# InterWeaveRepo is compatible when either side
394
# is a format 5/6/7 branch
395
formats = [repository.RepositoryFormat5(),
396
repository.RepositoryFormat6(),
397
repository.RepositoryFormat7()]
398
incompatible_formats = [repository.RepositoryFormat4(),
399
repository.RepositoryFormatKnit1(),
401
repo_a = self.make_repository('a')
402
repo_b = self.make_repository('b')
403
is_compatible = repository.InterWeaveRepo.is_compatible
404
for source in incompatible_formats:
405
# force incompatible left then right
406
repo_a._format = source
407
repo_b._format = formats[0]
408
self.assertFalse(is_compatible(repo_a, repo_b))
409
self.assertFalse(is_compatible(repo_b, repo_a))
410
for source in formats:
411
repo_a._format = source
412
for target in formats:
413
repo_b._format = target
414
self.assertTrue(is_compatible(repo_a, repo_b))
415
self.assertEqual(repository.InterWeaveRepo,
416
repository.InterRepository.get(repo_a,
420
class TestRepositoryConverter(TestCaseWithTransport):
422
def test_convert_empty(self):
423
t = get_transport(self.get_url('.'))
424
t.mkdir('repository')
425
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
426
repo = repository.RepositoryFormat7().initialize(repo_dir)
427
target_format = repository.RepositoryFormatKnit1()
428
converter = repository.CopyConverter(target_format)
429
pb = bzrlib.ui.ui_factory.nested_progress_bar()
431
converter.convert(repo, pb)
434
repo = repo_dir.open_repository()
435
self.assertTrue(isinstance(target_format, repo._format.__class__))
438
class TestMisc(TestCase):
440
def test_unescape_xml(self):
441
"""We get some kind of error when malformed entities are passed"""
442
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
445
class TestRepositoryFormatKnit2(TestCaseWithTransport):
447
def test_convert(self):
448
"""Ensure the upgrade adds weaves for roots"""
449
format = bzrdir.BzrDirMetaFormat1()
450
format.repository_format = repository.RepositoryFormatKnit1()
451
tree = self.make_branch_and_tree('.', format)
452
tree.commit("Dull commit", rev_id="dull")
453
revision_tree = tree.branch.repository.revision_tree('dull')
454
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
455
revision_tree.inventory.root.file_id)
456
format = bzrdir.BzrDirMetaFormat1()
457
format.repository_format = repository.RepositoryFormatKnit2()
458
upgrade.Convert('.', format)
459
tree = workingtree.WorkingTree.open('.')
460
revision_tree = tree.branch.repository.revision_tree('dull')
461
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
462
tree.commit("Another dull commit", rev_id='dull2')
463
revision_tree = tree.branch.repository.revision_tree('dull2')
464
self.assertEqual('dull', revision_tree.inventory.root.revision)