1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
29
import bzrlib.bzrdir as bzrdir
30
import bzrlib.errors as errors
31
from bzrlib.errors import (NotBranchError,
34
UnsupportedFormatError,
36
import bzrlib.repository as repository
37
from bzrlib.tests import TestCase, TestCaseWithTransport
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib import upgrade, workingtree
44
class TestDefaultFormat(TestCase):
46
def test_get_set_default_format(self):
47
private_default = repository._default_format.__class__
48
old_format = repository.RepositoryFormat.get_default_format()
49
self.assertTrue(isinstance(old_format, private_default))
50
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
51
# creating a repository should now create an instrumented dir.
53
# the default branch format is used by the meta dir format
54
# which is not the default bzrdir format at this point
55
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
56
result = dir.create_repository()
57
self.assertEqual(result, 'A bzr repository dir')
59
repository.RepositoryFormat.set_default_format(old_format)
60
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
63
class SampleRepositoryFormat(repository.RepositoryFormat):
66
this format is initializable, unsupported to aid in testing the
67
open and open(unsupported=True) routines.
70
def get_format_string(self):
71
"""See RepositoryFormat.get_format_string()."""
72
return "Sample .bzr repository format."
74
def initialize(self, a_bzrdir, shared=False):
75
"""Initialize a repository in a BzrDir"""
76
t = a_bzrdir.get_repository_transport(self)
77
t.put_bytes('format', self.get_format_string())
78
return 'A bzr repository dir'
80
def is_supported(self):
83
def open(self, a_bzrdir, _found=False):
84
return "opened repository."
87
class TestRepositoryFormat(TestCaseWithTransport):
88
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
90
def test_find_format(self):
91
# is the right format object found for a repository?
92
# create a branch with a few known format objects.
93
# this is not quite the same as
94
self.build_tree(["foo/", "bar/"])
95
def check_format(format, url):
96
dir = format._matchingbzrdir.initialize(url)
97
format.initialize(dir)
98
t = get_transport(url)
99
found_format = repository.RepositoryFormat.find_format(dir)
100
self.failUnless(isinstance(found_format, format.__class__))
101
check_format(repository.RepositoryFormat7(), "bar")
103
def test_find_format_no_repository(self):
104
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
105
self.assertRaises(errors.NoRepositoryPresent,
106
repository.RepositoryFormat.find_format,
109
def test_find_format_unknown_format(self):
110
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
111
SampleRepositoryFormat().initialize(dir)
112
self.assertRaises(UnknownFormatError,
113
repository.RepositoryFormat.find_format,
116
def test_register_unregister_format(self):
117
format = SampleRepositoryFormat()
119
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
121
format.initialize(dir)
122
# register a format for it.
123
repository.RepositoryFormat.register_format(format)
124
# which repository.Open will refuse (not supported)
125
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
126
# but open(unsupported) will work
127
self.assertEqual(format.open(dir), "opened repository.")
128
# unregister the format
129
repository.RepositoryFormat.unregister_format(format)
132
class TestFormat6(TestCaseWithTransport):
134
def test_no_ancestry_weave(self):
135
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
136
repo = repository.RepositoryFormat6().initialize(control)
137
# We no longer need to create the ancestry.weave file
138
# since it is *never* used.
139
self.assertRaises(NoSuchFile,
140
control.transport.get,
144
class TestFormat7(TestCaseWithTransport):
146
def test_disk_layout(self):
147
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
148
repo = repository.RepositoryFormat7().initialize(control)
149
# in case of side effects of locking.
153
# format 'Bazaar-NG Repository format 7'
155
# inventory.weave == empty_weave
156
# empty revision-store directory
157
# empty weaves directory
158
t = control.get_repository_transport(None)
159
self.assertEqualDiff('Bazaar-NG Repository format 7',
160
t.get('format').read())
161
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
162
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
163
self.assertEqualDiff('# bzr weave file v5\n'
166
t.get('inventory.weave').read())
168
def test_shared_disk_layout(self):
169
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
170
repo = repository.RepositoryFormat7().initialize(control, shared=True)
172
# format 'Bazaar-NG Repository format 7'
173
# inventory.weave == empty_weave
174
# empty revision-store directory
175
# empty weaves directory
176
# a 'shared-storage' marker file.
177
# lock is not present when unlocked
178
t = control.get_repository_transport(None)
179
self.assertEqualDiff('Bazaar-NG Repository format 7',
180
t.get('format').read())
181
self.assertEqualDiff('', t.get('shared-storage').read())
182
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
183
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
184
self.assertEqualDiff('# bzr weave file v5\n'
187
t.get('inventory.weave').read())
188
self.assertFalse(t.has('branch-lock'))
190
def test_creates_lockdir(self):
191
"""Make sure it appears to be controlled by a LockDir existence"""
192
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
193
repo = repository.RepositoryFormat7().initialize(control, shared=True)
194
t = control.get_repository_transport(None)
195
# TODO: Should check there is a 'lock' toplevel directory,
196
# regardless of contents
197
self.assertFalse(t.has('lock/held/info'))
200
self.assertTrue(t.has('lock/held/info'))
202
# unlock so we don't get a warning about failing to do so
205
def test_uses_lockdir(self):
206
"""repo format 7 actually locks on lockdir"""
207
base_url = self.get_url()
208
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
209
repo = repository.RepositoryFormat7().initialize(control, shared=True)
210
t = control.get_repository_transport(None)
214
# make sure the same lock is created by opening it
215
repo = repository.Repository.open(base_url)
217
self.assertTrue(t.has('lock/held/info'))
219
self.assertFalse(t.has('lock/held/info'))
221
def test_shared_no_tree_disk_layout(self):
222
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
223
repo = repository.RepositoryFormat7().initialize(control, shared=True)
224
repo.set_make_working_trees(False)
226
# format 'Bazaar-NG Repository format 7'
228
# inventory.weave == empty_weave
229
# empty revision-store directory
230
# empty weaves directory
231
# a 'shared-storage' marker file.
232
t = control.get_repository_transport(None)
233
self.assertEqualDiff('Bazaar-NG Repository format 7',
234
t.get('format').read())
235
## self.assertEqualDiff('', t.get('lock').read())
236
self.assertEqualDiff('', t.get('shared-storage').read())
237
self.assertEqualDiff('', t.get('no-working-trees').read())
238
repo.set_make_working_trees(True)
239
self.assertFalse(t.has('no-working-trees'))
240
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
241
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
242
self.assertEqualDiff('# bzr weave file v5\n'
245
t.get('inventory.weave').read())
248
class TestFormatKnit1(TestCaseWithTransport):
250
def test_disk_layout(self):
251
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
252
repo = repository.RepositoryFormatKnit1().initialize(control)
253
# in case of side effects of locking.
257
# format 'Bazaar-NG Knit Repository Format 1'
258
# lock: is a directory
259
# inventory.weave == empty_weave
260
# empty revision-store directory
261
# empty weaves directory
262
t = control.get_repository_transport(None)
263
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
264
t.get('format').read())
265
# XXX: no locks left when unlocked at the moment
266
# self.assertEqualDiff('', t.get('lock').read())
267
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
270
def assertHasKnit(self, t, knit_name):
271
"""Assert that knit_name exists on t."""
272
self.assertEqualDiff('# bzr knit index 8\n',
273
t.get(knit_name + '.kndx').read())
275
self.assertTrue(t.has(knit_name + '.knit'))
277
def check_knits(self, t):
278
"""check knit content for a repository."""
279
self.assertHasKnit(t, 'inventory')
280
self.assertHasKnit(t, 'revisions')
281
self.assertHasKnit(t, 'signatures')
283
def test_shared_disk_layout(self):
284
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
285
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
287
# format 'Bazaar-NG Knit Repository Format 1'
288
# lock: is a directory
289
# inventory.weave == empty_weave
290
# empty revision-store directory
291
# empty weaves directory
292
# a 'shared-storage' marker file.
293
t = control.get_repository_transport(None)
294
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
295
t.get('format').read())
296
# XXX: no locks left when unlocked at the moment
297
# self.assertEqualDiff('', t.get('lock').read())
298
self.assertEqualDiff('', t.get('shared-storage').read())
299
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
302
def test_shared_no_tree_disk_layout(self):
303
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
304
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
305
repo.set_make_working_trees(False)
307
# format 'Bazaar-NG Knit Repository Format 1'
309
# inventory.weave == empty_weave
310
# empty revision-store directory
311
# empty weaves directory
312
# a 'shared-storage' marker file.
313
t = control.get_repository_transport(None)
314
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
315
t.get('format').read())
316
# XXX: no locks left when unlocked at the moment
317
# self.assertEqualDiff('', t.get('lock').read())
318
self.assertEqualDiff('', t.get('shared-storage').read())
319
self.assertEqualDiff('', t.get('no-working-trees').read())
320
repo.set_make_working_trees(True)
321
self.assertFalse(t.has('no-working-trees'))
322
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
326
class InterString(repository.InterRepository):
327
"""An inter-repository optimised code path for strings.
329
This is for use during testing where we use strings as repositories
330
so that none of the default regsitered inter-repository classes will
335
def is_compatible(repo_source, repo_target):
336
"""InterString is compatible with strings-as-repos."""
337
return isinstance(repo_source, str) and isinstance(repo_target, str)
340
class TestInterRepository(TestCaseWithTransport):
342
def test_get_default_inter_repository(self):
343
# test that the InterRepository.get(repo_a, repo_b) probes
344
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
345
# true and returns a default inter_repo otherwise.
346
# This also tests that the default registered optimised interrepository
347
# classes do not barf inappropriately when a surprising repository type
349
dummy_a = "Repository 1."
350
dummy_b = "Repository 2."
351
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
353
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
354
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
355
inter_repo = repository.InterRepository.get(repo_a, repo_b)
356
self.assertEqual(repository.InterRepository,
357
inter_repo.__class__)
358
self.assertEqual(repo_a, inter_repo.source)
359
self.assertEqual(repo_b, inter_repo.target)
361
def test_register_inter_repository_class(self):
362
# test that a optimised code path provider - a
363
# InterRepository subclass can be registered and unregistered
364
# and that it is correctly selected when given a repository
365
# pair that it returns true on for the is_compatible static method
367
dummy_a = "Repository 1."
368
dummy_b = "Repository 2."
369
repository.InterRepository.register_optimiser(InterString)
371
# we should get the default for something InterString returns False
373
self.assertFalse(InterString.is_compatible(dummy_a, None))
374
self.assertGetsDefaultInterRepository(dummy_a, None)
375
# and we should get an InterString for a pair it 'likes'
376
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
377
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
378
self.assertEqual(InterString, inter_repo.__class__)
379
self.assertEqual(dummy_a, inter_repo.source)
380
self.assertEqual(dummy_b, inter_repo.target)
382
repository.InterRepository.unregister_optimiser(InterString)
383
# now we should get the default InterRepository object again.
384
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
387
class TestInterWeaveRepo(TestCaseWithTransport):
389
def test_is_compatible_and_registered(self):
390
# InterWeaveRepo is compatible when either side
391
# is a format 5/6/7 branch
392
formats = [repository.RepositoryFormat5(),
393
repository.RepositoryFormat6(),
394
repository.RepositoryFormat7()]
395
incompatible_formats = [repository.RepositoryFormat4(),
396
repository.RepositoryFormatKnit1(),
398
repo_a = self.make_repository('a')
399
repo_b = self.make_repository('b')
400
is_compatible = repository.InterWeaveRepo.is_compatible
401
for source in incompatible_formats:
402
# force incompatible left then right
403
repo_a._format = source
404
repo_b._format = formats[0]
405
self.assertFalse(is_compatible(repo_a, repo_b))
406
self.assertFalse(is_compatible(repo_b, repo_a))
407
for source in formats:
408
repo_a._format = source
409
for target in formats:
410
repo_b._format = target
411
self.assertTrue(is_compatible(repo_a, repo_b))
412
self.assertEqual(repository.InterWeaveRepo,
413
repository.InterRepository.get(repo_a,
417
class TestRepositoryConverter(TestCaseWithTransport):
419
def test_convert_empty(self):
420
t = get_transport(self.get_url('.'))
421
t.mkdir('repository')
422
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
423
repo = repository.RepositoryFormat7().initialize(repo_dir)
424
target_format = repository.RepositoryFormatKnit1()
425
converter = repository.CopyConverter(target_format)
426
pb = bzrlib.ui.ui_factory.nested_progress_bar()
428
converter.convert(repo, pb)
431
repo = repo_dir.open_repository()
432
self.assertTrue(isinstance(target_format, repo._format.__class__))
435
class TestMisc(TestCase):
437
def test_unescape_xml(self):
438
"""We get some kind of error when malformed entities are passed"""
439
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
442
class TestRepositoryFormatKnit2(TestCaseWithTransport):
444
def test_convert(self):
445
"""Ensure the upgrade adds weaves for roots"""
446
format = bzrdir.BzrDirMetaFormat1()
447
format.repository_format = repository.RepositoryFormatKnit1()
448
tree = self.make_branch_and_tree('.', format)
449
tree.commit("Dull commit", rev_id="dull")
450
revision_tree = tree.branch.repository.revision_tree('dull')
451
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
452
revision_tree.inventory.root.file_id)
453
format = bzrdir.BzrDirMetaFormat1()
454
format.repository_format = repository.RepositoryFormatKnit2()
455
upgrade.Convert('.', format)
456
tree = workingtree.WorkingTree.open('.')
457
revision_tree = tree.branch.repository.revision_tree('dull')
458
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
459
tree.commit("Another dull commit", rev_id='dull2')
460
revision_tree = tree.branch.repository.revision_tree('dull2')
461
self.assertEqual('dull', revision_tree.inventory.root.revision)