1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
29
import bzrlib.bzrdir as bzrdir
30
import bzrlib.errors as errors
31
from bzrlib.errors import (NotBranchError,
34
UnsupportedFormatError,
36
import bzrlib.repository as repository
37
from bzrlib.tests import TestCase, TestCaseWithTransport
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.memory import MemoryServer
40
from bzrlib import upgrade, workingtree
43
class TestDefaultFormat(TestCase):
45
def test_get_set_default_format(self):
46
private_default = repository._default_format.__class__
47
old_format = repository.RepositoryFormat.get_default_format()
48
self.assertTrue(isinstance(old_format, private_default))
49
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
50
# creating a repository should now create an instrumented dir.
52
# the default branch format is used by the meta dir format
53
# which is not the default bzrdir format at this point
54
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
55
result = dir.create_repository()
56
self.assertEqual(result, 'A bzr repository dir')
58
repository.RepositoryFormat.set_default_format(old_format)
59
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
62
class SampleRepositoryFormat(repository.RepositoryFormat):
65
this format is initializable, unsupported to aid in testing the
66
open and open(unsupported=True) routines.
69
def get_format_string(self):
70
"""See RepositoryFormat.get_format_string()."""
71
return "Sample .bzr repository format."
73
def initialize(self, a_bzrdir, shared=False):
74
"""Initialize a repository in a BzrDir"""
75
t = a_bzrdir.get_repository_transport(self)
76
t.put_bytes('format', self.get_format_string())
77
return 'A bzr repository dir'
79
def is_supported(self):
82
def open(self, a_bzrdir, _found=False):
83
return "opened repository."
86
class TestRepositoryFormat(TestCaseWithTransport):
87
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
89
def test_find_format(self):
90
# is the right format object found for a repository?
91
# create a branch with a few known format objects.
92
# this is not quite the same as
93
self.build_tree(["foo/", "bar/"])
94
def check_format(format, url):
95
dir = format._matchingbzrdir.initialize(url)
96
format.initialize(dir)
97
t = get_transport(url)
98
found_format = repository.RepositoryFormat.find_format(dir)
99
self.failUnless(isinstance(found_format, format.__class__))
100
check_format(repository.RepositoryFormat7(), "bar")
102
def test_find_format_no_repository(self):
103
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
104
self.assertRaises(errors.NoRepositoryPresent,
105
repository.RepositoryFormat.find_format,
108
def test_find_format_unknown_format(self):
109
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
110
SampleRepositoryFormat().initialize(dir)
111
self.assertRaises(UnknownFormatError,
112
repository.RepositoryFormat.find_format,
115
def test_register_unregister_format(self):
116
format = SampleRepositoryFormat()
118
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
120
format.initialize(dir)
121
# register a format for it.
122
repository.RepositoryFormat.register_format(format)
123
# which repository.Open will refuse (not supported)
124
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
125
# but open(unsupported) will work
126
self.assertEqual(format.open(dir), "opened repository.")
127
# unregister the format
128
repository.RepositoryFormat.unregister_format(format)
131
class TestFormat6(TestCaseWithTransport):
133
def test_no_ancestry_weave(self):
134
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
135
repo = repository.RepositoryFormat6().initialize(control)
136
# We no longer need to create the ancestry.weave file
137
# since it is *never* used.
138
self.assertRaises(NoSuchFile,
139
control.transport.get,
143
class TestFormat7(TestCaseWithTransport):
145
def test_disk_layout(self):
146
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
147
repo = repository.RepositoryFormat7().initialize(control)
148
# in case of side effects of locking.
152
# format 'Bazaar-NG Repository format 7'
154
# inventory.weave == empty_weave
155
# empty revision-store directory
156
# empty weaves directory
157
t = control.get_repository_transport(None)
158
self.assertEqualDiff('Bazaar-NG Repository format 7',
159
t.get('format').read())
160
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
161
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
162
self.assertEqualDiff('# bzr weave file v5\n'
165
t.get('inventory.weave').read())
167
def test_shared_disk_layout(self):
168
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
169
repo = repository.RepositoryFormat7().initialize(control, shared=True)
171
# format 'Bazaar-NG Repository format 7'
172
# inventory.weave == empty_weave
173
# empty revision-store directory
174
# empty weaves directory
175
# a 'shared-storage' marker file.
176
# lock is not present when unlocked
177
t = control.get_repository_transport(None)
178
self.assertEqualDiff('Bazaar-NG Repository format 7',
179
t.get('format').read())
180
self.assertEqualDiff('', t.get('shared-storage').read())
181
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
182
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
183
self.assertEqualDiff('# bzr weave file v5\n'
186
t.get('inventory.weave').read())
187
self.assertFalse(t.has('branch-lock'))
189
def test_creates_lockdir(self):
190
"""Make sure it appears to be controlled by a LockDir existence"""
191
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
192
repo = repository.RepositoryFormat7().initialize(control, shared=True)
193
t = control.get_repository_transport(None)
194
# TODO: Should check there is a 'lock' toplevel directory,
195
# regardless of contents
196
self.assertFalse(t.has('lock/held/info'))
199
self.assertTrue(t.has('lock/held/info'))
201
# unlock so we don't get a warning about failing to do so
204
def test_uses_lockdir(self):
205
"""repo format 7 actually locks on lockdir"""
206
base_url = self.get_url()
207
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
208
repo = repository.RepositoryFormat7().initialize(control, shared=True)
209
t = control.get_repository_transport(None)
213
# make sure the same lock is created by opening it
214
repo = repository.Repository.open(base_url)
216
self.assertTrue(t.has('lock/held/info'))
218
self.assertFalse(t.has('lock/held/info'))
220
def test_shared_no_tree_disk_layout(self):
221
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
222
repo = repository.RepositoryFormat7().initialize(control, shared=True)
223
repo.set_make_working_trees(False)
225
# format 'Bazaar-NG Repository format 7'
227
# inventory.weave == empty_weave
228
# empty revision-store directory
229
# empty weaves directory
230
# a 'shared-storage' marker file.
231
t = control.get_repository_transport(None)
232
self.assertEqualDiff('Bazaar-NG Repository format 7',
233
t.get('format').read())
234
## self.assertEqualDiff('', t.get('lock').read())
235
self.assertEqualDiff('', t.get('shared-storage').read())
236
self.assertEqualDiff('', t.get('no-working-trees').read())
237
repo.set_make_working_trees(True)
238
self.assertFalse(t.has('no-working-trees'))
239
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
240
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
241
self.assertEqualDiff('# bzr weave file v5\n'
244
t.get('inventory.weave').read())
247
class TestFormatKnit1(TestCaseWithTransport):
249
def test_disk_layout(self):
250
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
251
repo = repository.RepositoryFormatKnit1().initialize(control)
252
# in case of side effects of locking.
256
# format 'Bazaar-NG Knit Repository Format 1'
257
# lock: is a directory
258
# inventory.weave == empty_weave
259
# empty revision-store directory
260
# empty weaves directory
261
t = control.get_repository_transport(None)
262
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
263
t.get('format').read())
264
# XXX: no locks left when unlocked at the moment
265
# self.assertEqualDiff('', t.get('lock').read())
266
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
269
def assertHasKnit(self, t, knit_name):
270
"""Assert that knit_name exists on t."""
271
self.assertEqualDiff('# bzr knit index 8\n',
272
t.get(knit_name + '.kndx').read())
274
self.assertTrue(t.has(knit_name + '.knit'))
276
def check_knits(self, t):
277
"""check knit content for a repository."""
278
self.assertHasKnit(t, 'inventory')
279
self.assertHasKnit(t, 'revisions')
280
self.assertHasKnit(t, 'signatures')
282
def test_shared_disk_layout(self):
283
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
284
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
286
# format 'Bazaar-NG Knit Repository Format 1'
287
# lock: is a directory
288
# inventory.weave == empty_weave
289
# empty revision-store directory
290
# empty weaves directory
291
# a 'shared-storage' marker file.
292
t = control.get_repository_transport(None)
293
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
294
t.get('format').read())
295
# XXX: no locks left when unlocked at the moment
296
# self.assertEqualDiff('', t.get('lock').read())
297
self.assertEqualDiff('', t.get('shared-storage').read())
298
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
301
def test_shared_no_tree_disk_layout(self):
302
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
303
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
304
repo.set_make_working_trees(False)
306
# format 'Bazaar-NG Knit Repository Format 1'
308
# inventory.weave == empty_weave
309
# empty revision-store directory
310
# empty weaves directory
311
# a 'shared-storage' marker file.
312
t = control.get_repository_transport(None)
313
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
314
t.get('format').read())
315
# XXX: no locks left when unlocked at the moment
316
# self.assertEqualDiff('', t.get('lock').read())
317
self.assertEqualDiff('', t.get('shared-storage').read())
318
self.assertEqualDiff('', t.get('no-working-trees').read())
319
repo.set_make_working_trees(True)
320
self.assertFalse(t.has('no-working-trees'))
321
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
325
class InterString(repository.InterRepository):
326
"""An inter-repository optimised code path for strings.
328
This is for use during testing where we use strings as repositories
329
so that none of the default regsitered inter-repository classes will
334
def is_compatible(repo_source, repo_target):
335
"""InterString is compatible with strings-as-repos."""
336
return isinstance(repo_source, str) and isinstance(repo_target, str)
339
class TestInterRepository(TestCaseWithTransport):
341
def test_get_default_inter_repository(self):
342
# test that the InterRepository.get(repo_a, repo_b) probes
343
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
344
# true and returns a default inter_repo otherwise.
345
# This also tests that the default registered optimised interrepository
346
# classes do not barf inappropriately when a surprising repository type
348
dummy_a = "Repository 1."
349
dummy_b = "Repository 2."
350
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
352
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
353
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
354
inter_repo = repository.InterRepository.get(repo_a, repo_b)
355
self.assertEqual(repository.InterRepository,
356
inter_repo.__class__)
357
self.assertEqual(repo_a, inter_repo.source)
358
self.assertEqual(repo_b, inter_repo.target)
360
def test_register_inter_repository_class(self):
361
# test that a optimised code path provider - a
362
# InterRepository subclass can be registered and unregistered
363
# and that it is correctly selected when given a repository
364
# pair that it returns true on for the is_compatible static method
366
dummy_a = "Repository 1."
367
dummy_b = "Repository 2."
368
repository.InterRepository.register_optimiser(InterString)
370
# we should get the default for something InterString returns False
372
self.assertFalse(InterString.is_compatible(dummy_a, None))
373
self.assertGetsDefaultInterRepository(dummy_a, None)
374
# and we should get an InterString for a pair it 'likes'
375
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
376
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
377
self.assertEqual(InterString, inter_repo.__class__)
378
self.assertEqual(dummy_a, inter_repo.source)
379
self.assertEqual(dummy_b, inter_repo.target)
381
repository.InterRepository.unregister_optimiser(InterString)
382
# now we should get the default InterRepository object again.
383
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
386
class TestInterWeaveRepo(TestCaseWithTransport):
388
def test_is_compatible_and_registered(self):
389
# InterWeaveRepo is compatible when either side
390
# is a format 5/6/7 branch
391
formats = [repository.RepositoryFormat5(),
392
repository.RepositoryFormat6(),
393
repository.RepositoryFormat7()]
394
incompatible_formats = [repository.RepositoryFormat4(),
395
repository.RepositoryFormatKnit1(),
397
repo_a = self.make_repository('a')
398
repo_b = self.make_repository('b')
399
is_compatible = repository.InterWeaveRepo.is_compatible
400
for source in incompatible_formats:
401
# force incompatible left then right
402
repo_a._format = source
403
repo_b._format = formats[0]
404
self.assertFalse(is_compatible(repo_a, repo_b))
405
self.assertFalse(is_compatible(repo_b, repo_a))
406
for source in formats:
407
repo_a._format = source
408
for target in formats:
409
repo_b._format = target
410
self.assertTrue(is_compatible(repo_a, repo_b))
411
self.assertEqual(repository.InterWeaveRepo,
412
repository.InterRepository.get(repo_a,
416
class TestRepositoryConverter(TestCaseWithTransport):
418
def test_convert_empty(self):
419
t = get_transport(self.get_url('.'))
420
t.mkdir('repository')
421
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
422
repo = repository.RepositoryFormat7().initialize(repo_dir)
423
target_format = repository.RepositoryFormatKnit1()
424
converter = repository.CopyConverter(target_format)
425
pb = bzrlib.ui.ui_factory.nested_progress_bar()
427
converter.convert(repo, pb)
430
repo = repo_dir.open_repository()
431
self.assertTrue(isinstance(target_format, repo._format.__class__))
434
class TestMisc(TestCase):
436
def test_unescape_xml(self):
437
"""We get some kind of error when malformed entities are passed"""
438
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
441
class TestRepositoryFormatKnit2(TestCaseWithTransport):
443
def test_convert(self):
444
"""Ensure the upgrade adds weaves for roots"""
445
format = bzrdir.BzrDirMetaFormat1()
446
format.repository_format = repository.RepositoryFormatKnit1()
447
tree = self.make_branch_and_tree('.', format)
448
tree.commit("Dull commit", rev_id="dull")
449
revision_tree = tree.branch.repository.revision_tree('dull')
450
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
451
revision_tree.inventory.root.file_id)
452
format = bzrdir.BzrDirMetaFormat1()
453
format.repository_format = repository.RepositoryFormatKnit2()
454
upgrade.Convert('.', format)
455
tree = workingtree.WorkingTree.open('.')
456
revision_tree = tree.branch.repository.revision_tree('dull')
457
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
458
tree.commit("Another dull commit", rev_id='dull2')
459
revision_tree = tree.branch.repository.revision_tree('dull2')
460
self.assertEqual('dull', revision_tree.inventory.root.revision)