1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
26
from StringIO import StringIO
29
import bzrlib.bzrdir as bzrdir
30
import bzrlib.errors as errors
31
from bzrlib.errors import (NotBranchError,
34
UnsupportedFormatError,
36
import bzrlib.repository as repository
37
from bzrlib.tests import TestCase, TestCaseWithTransport
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
43
class TestDefaultFormat(TestCase):
45
def test_get_set_default_format(self):
46
old_format = repository.RepositoryFormat.get_default_format()
47
self.assertTrue(isinstance(old_format, repository.RepositoryFormatKnit1))
48
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
49
# creating a repository should now create an instrumented dir.
51
# the default branch format is used by the meta dir format
52
# which is not the default bzrdir format at this point
53
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
54
result = dir.create_repository()
55
self.assertEqual(result, 'A bzr repository dir')
57
repository.RepositoryFormat.set_default_format(old_format)
58
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
61
class SampleRepositoryFormat(repository.RepositoryFormat):
64
this format is initializable, unsupported to aid in testing the
65
open and open(unsupported=True) routines.
68
def get_format_string(self):
69
"""See RepositoryFormat.get_format_string()."""
70
return "Sample .bzr repository format."
72
def initialize(self, a_bzrdir, shared=False):
73
"""Initialize a repository in a BzrDir"""
74
t = a_bzrdir.get_repository_transport(self)
75
t.put('format', StringIO(self.get_format_string()))
76
return 'A bzr repository dir'
78
def is_supported(self):
81
def open(self, a_bzrdir, _found=False):
82
return "opened repository."
85
class TestRepositoryFormat(TestCaseWithTransport):
86
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
88
def test_find_format(self):
89
# is the right format object found for a repository?
90
# create a branch with a few known format objects.
91
# this is not quite the same as
92
self.build_tree(["foo/", "bar/"])
93
def check_format(format, url):
94
dir = format._matchingbzrdir.initialize(url)
95
format.initialize(dir)
96
t = get_transport(url)
97
found_format = repository.RepositoryFormat.find_format(dir)
98
self.failUnless(isinstance(found_format, format.__class__))
99
check_format(repository.RepositoryFormat7(), "bar")
101
def test_find_format_no_repository(self):
102
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
103
self.assertRaises(errors.NoRepositoryPresent,
104
repository.RepositoryFormat.find_format,
107
def test_find_format_unknown_format(self):
108
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
109
SampleRepositoryFormat().initialize(dir)
110
self.assertRaises(UnknownFormatError,
111
repository.RepositoryFormat.find_format,
114
def test_register_unregister_format(self):
115
format = SampleRepositoryFormat()
117
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
119
format.initialize(dir)
120
# register a format for it.
121
repository.RepositoryFormat.register_format(format)
122
# which repository.Open will refuse (not supported)
123
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
124
# but open(unsupported) will work
125
self.assertEqual(format.open(dir), "opened repository.")
126
# unregister the format
127
repository.RepositoryFormat.unregister_format(format)
130
class TestFormat6(TestCaseWithTransport):
132
def test_no_ancestry_weave(self):
133
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
134
repo = repository.RepositoryFormat6().initialize(control)
135
# We no longer need to create the ancestry.weave file
136
# since it is *never* used.
137
self.assertRaises(NoSuchFile,
138
control.transport.get,
142
class TestFormat7(TestCaseWithTransport):
144
def test_disk_layout(self):
145
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
146
repo = repository.RepositoryFormat7().initialize(control)
147
# in case of side effects of locking.
151
# format 'Bazaar-NG Repository format 7'
153
# inventory.weave == empty_weave
154
# empty revision-store directory
155
# empty weaves directory
156
t = control.get_repository_transport(None)
157
self.assertEqualDiff('Bazaar-NG Repository format 7',
158
t.get('format').read())
159
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
160
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
161
self.assertEqualDiff('# bzr weave file v5\n'
164
t.get('inventory.weave').read())
166
def test_shared_disk_layout(self):
167
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
168
repo = repository.RepositoryFormat7().initialize(control, shared=True)
170
# format 'Bazaar-NG Repository format 7'
171
# inventory.weave == empty_weave
172
# empty revision-store directory
173
# empty weaves directory
174
# a 'shared-storage' marker file.
175
# lock is not present when unlocked
176
t = control.get_repository_transport(None)
177
self.assertEqualDiff('Bazaar-NG Repository format 7',
178
t.get('format').read())
179
self.assertEqualDiff('', t.get('shared-storage').read())
180
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
181
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
182
self.assertEqualDiff('# bzr weave file v5\n'
185
t.get('inventory.weave').read())
186
self.assertFalse(t.has('branch-lock'))
188
def test_creates_lockdir(self):
189
"""Make sure it appears to be controlled by a LockDir existence"""
190
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
191
repo = repository.RepositoryFormat7().initialize(control, shared=True)
192
t = control.get_repository_transport(None)
193
# TODO: Should check there is a 'lock' toplevel directory,
194
# regardless of contents
195
self.assertFalse(t.has('lock/held/info'))
198
self.assertTrue(t.has('lock/held/info'))
200
# unlock so we don't get a warning about failing to do so
203
def test_uses_lockdir(self):
204
"""repo format 7 actually locks on lockdir"""
205
base_url = self.get_url()
206
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
207
repo = repository.RepositoryFormat7().initialize(control, shared=True)
208
t = control.get_repository_transport(None)
212
# make sure the same lock is created by opening it
213
repo = repository.Repository.open(base_url)
215
self.assertTrue(t.has('lock/held/info'))
217
self.assertFalse(t.has('lock/held/info'))
219
def test_shared_no_tree_disk_layout(self):
220
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
221
repo = repository.RepositoryFormat7().initialize(control, shared=True)
222
repo.set_make_working_trees(False)
224
# format 'Bazaar-NG Repository format 7'
226
# inventory.weave == empty_weave
227
# empty revision-store directory
228
# empty weaves directory
229
# a 'shared-storage' marker file.
230
t = control.get_repository_transport(None)
231
self.assertEqualDiff('Bazaar-NG Repository format 7',
232
t.get('format').read())
233
## self.assertEqualDiff('', t.get('lock').read())
234
self.assertEqualDiff('', t.get('shared-storage').read())
235
self.assertEqualDiff('', t.get('no-working-trees').read())
236
repo.set_make_working_trees(True)
237
self.assertFalse(t.has('no-working-trees'))
238
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
239
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
240
self.assertEqualDiff('# bzr weave file v5\n'
243
t.get('inventory.weave').read())
246
class TestFormatKnit1(TestCaseWithTransport):
248
def test_disk_layout(self):
249
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
250
repo = repository.RepositoryFormatKnit1().initialize(control)
251
# in case of side effects of locking.
255
# format 'Bazaar-NG Knit Repository Format 1'
256
# lock: is a directory
257
# inventory.weave == empty_weave
258
# empty revision-store directory
259
# empty weaves directory
260
t = control.get_repository_transport(None)
261
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
262
t.get('format').read())
263
# XXX: no locks left when unlocked at the moment
264
# self.assertEqualDiff('', t.get('lock').read())
265
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
268
def assertHasKnit(self, t, knit_name):
269
"""Assert that knit_name exists on t."""
270
self.assertEqualDiff('# bzr knit index 8\n',
271
t.get(knit_name + '.kndx').read())
273
self.assertTrue(t.has(knit_name + '.knit'))
275
def check_knits(self, t):
276
"""check knit content for a repository."""
277
self.assertHasKnit(t, 'inventory')
278
self.assertHasKnit(t, 'revisions')
279
self.assertHasKnit(t, 'signatures')
281
def test_shared_disk_layout(self):
282
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
283
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
285
# format 'Bazaar-NG Knit Repository Format 1'
286
# lock: is a directory
287
# inventory.weave == empty_weave
288
# empty revision-store directory
289
# empty weaves directory
290
# a 'shared-storage' marker file.
291
t = control.get_repository_transport(None)
292
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
293
t.get('format').read())
294
# XXX: no locks left when unlocked at the moment
295
# self.assertEqualDiff('', t.get('lock').read())
296
self.assertEqualDiff('', t.get('shared-storage').read())
297
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
300
def test_shared_no_tree_disk_layout(self):
301
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
302
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
303
repo.set_make_working_trees(False)
305
# format 'Bazaar-NG Knit Repository Format 1'
307
# inventory.weave == empty_weave
308
# empty revision-store directory
309
# empty weaves directory
310
# a 'shared-storage' marker file.
311
t = control.get_repository_transport(None)
312
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
313
t.get('format').read())
314
# XXX: no locks left when unlocked at the moment
315
# self.assertEqualDiff('', t.get('lock').read())
316
self.assertEqualDiff('', t.get('shared-storage').read())
317
self.assertEqualDiff('', t.get('no-working-trees').read())
318
repo.set_make_working_trees(True)
319
self.assertFalse(t.has('no-working-trees'))
320
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
324
class InterString(repository.InterRepository):
325
"""An inter-repository optimised code path for strings.
327
This is for use during testing where we use strings as repositories
328
so that none of the default regsitered inter-repository classes will
333
def is_compatible(repo_source, repo_target):
334
"""InterString is compatible with strings-as-repos."""
335
return isinstance(repo_source, str) and isinstance(repo_target, str)
338
class TestInterRepository(TestCaseWithTransport):
340
def test_get_default_inter_repository(self):
341
# test that the InterRepository.get(repo_a, repo_b) probes
342
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
343
# true and returns a default inter_repo otherwise.
344
# This also tests that the default registered optimised interrepository
345
# classes do not barf inappropriately when a surprising repository type
347
dummy_a = "Repository 1."
348
dummy_b = "Repository 2."
349
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
351
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
352
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
353
inter_repo = repository.InterRepository.get(repo_a, repo_b)
354
self.assertEqual(repository.InterRepository,
355
inter_repo.__class__)
356
self.assertEqual(repo_a, inter_repo.source)
357
self.assertEqual(repo_b, inter_repo.target)
359
def test_register_inter_repository_class(self):
360
# test that a optimised code path provider - a
361
# InterRepository subclass can be registered and unregistered
362
# and that it is correctly selected when given a repository
363
# pair that it returns true on for the is_compatible static method
365
dummy_a = "Repository 1."
366
dummy_b = "Repository 2."
367
repository.InterRepository.register_optimiser(InterString)
369
# we should get the default for something InterString returns False
371
self.assertFalse(InterString.is_compatible(dummy_a, None))
372
self.assertGetsDefaultInterRepository(dummy_a, None)
373
# and we should get an InterString for a pair it 'likes'
374
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
375
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
376
self.assertEqual(InterString, inter_repo.__class__)
377
self.assertEqual(dummy_a, inter_repo.source)
378
self.assertEqual(dummy_b, inter_repo.target)
380
repository.InterRepository.unregister_optimiser(InterString)
381
# now we should get the default InterRepository object again.
382
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
385
class TestInterWeaveRepo(TestCaseWithTransport):
387
def test_is_compatible_and_registered(self):
388
# InterWeaveRepo is compatible when either side
389
# is a format 5/6/7 branch
390
formats = [repository.RepositoryFormat5(),
391
repository.RepositoryFormat6(),
392
repository.RepositoryFormat7()]
393
incompatible_formats = [repository.RepositoryFormat4(),
394
repository.RepositoryFormatKnit1(),
396
repo_a = self.make_repository('a')
397
repo_b = self.make_repository('b')
398
is_compatible = repository.InterWeaveRepo.is_compatible
399
for source in incompatible_formats:
400
# force incompatible left then right
401
repo_a._format = source
402
repo_b._format = formats[0]
403
self.assertFalse(is_compatible(repo_a, repo_b))
404
self.assertFalse(is_compatible(repo_b, repo_a))
405
for source in formats:
406
repo_a._format = source
407
for target in formats:
408
repo_b._format = target
409
self.assertTrue(is_compatible(repo_a, repo_b))
410
self.assertEqual(repository.InterWeaveRepo,
411
repository.InterRepository.get(repo_a,
415
class TestRepositoryConverter(TestCaseWithTransport):
417
def test_convert_empty(self):
418
t = get_transport(self.get_url('.'))
419
t.mkdir('repository')
420
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
421
repo = repository.RepositoryFormat7().initialize(repo_dir)
422
target_format = repository.RepositoryFormatKnit1()
423
converter = repository.CopyConverter(target_format)
424
pb = bzrlib.ui.ui_factory.nested_progress_bar()
426
converter.convert(repo, pb)
429
repo = repo_dir.open_repository()
430
self.assertTrue(isinstance(target_format, repo._format.__class__))