1
# (C) 2006 Canonical Ltd
1
# Copyright (C) 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
33
34
UnknownFormatError,
34
35
UnsupportedFormatError,
36
import bzrlib.repository as repository
37
from bzrlib.repository import RepositoryFormat
37
38
from bzrlib.tests import TestCase, TestCaseWithTransport
38
39
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
43
49
class TestDefaultFormat(TestCase):
45
51
def test_get_set_default_format(self):
52
old_default = bzrdir.format_registry.get('default')
53
private_default = old_default().repository_format.__class__
46
54
old_format = repository.RepositoryFormat.get_default_format()
47
# default is None - we cannot create a Repository independently yet
48
self.assertTrue(isinstance(old_format, repository.RepositoryFormat7))
49
repository.RepositoryFormat.set_default_format(SampleRepositoryFormat())
55
self.assertTrue(isinstance(old_format, private_default))
56
def make_sample_bzrdir():
57
my_bzrdir = bzrdir.BzrDirMetaFormat1()
58
my_bzrdir.repository_format = SampleRepositoryFormat()
60
bzrdir.format_registry.remove('default')
61
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
62
bzrdir.format_registry.set_default('sample')
50
63
# creating a repository should now create an instrumented dir.
52
65
# the default branch format is used by the meta dir format
53
66
# which is not the default bzrdir format at this point
54
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:/')
67
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
55
68
result = dir.create_repository()
56
69
self.assertEqual(result, 'A bzr repository dir')
58
repository.RepositoryFormat.set_default_format(old_format)
59
self.assertEqual(old_format, repository.RepositoryFormat.get_default_format())
71
bzrdir.format_registry.remove('default')
72
bzrdir.format_registry.remove('sample')
73
bzrdir.format_registry.register('default', old_default, '')
74
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
62
78
class SampleRepositoryFormat(repository.RepositoryFormat):
73
89
def initialize(self, a_bzrdir, shared=False):
74
90
"""Initialize a repository in a BzrDir"""
75
91
t = a_bzrdir.get_repository_transport(self)
76
t.put('format', StringIO(self.get_format_string()))
92
t.put_bytes('format', self.get_format_string())
77
93
return 'A bzr repository dir'
79
95
def is_supported(self):
97
113
t = get_transport(url)
98
114
found_format = repository.RepositoryFormat.find_format(dir)
99
115
self.failUnless(isinstance(found_format, format.__class__))
100
check_format(repository.RepositoryFormat7(), "bar")
116
check_format(weaverepo.RepositoryFormat7(), "bar")
102
118
def test_find_format_no_repository(self):
103
119
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
133
149
def test_no_ancestry_weave(self):
134
150
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
135
repo = repository.RepositoryFormat6().initialize(control)
151
repo = weaverepo.RepositoryFormat6().initialize(control)
136
152
# We no longer need to create the ancestry.weave file
137
153
# since it is *never* used.
138
154
self.assertRaises(NoSuchFile,
145
161
def test_disk_layout(self):
146
162
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
147
repo = repository.RepositoryFormat7().initialize(control)
163
repo = weaverepo.RepositoryFormat7().initialize(control)
148
164
# in case of side effects of locking.
149
165
repo.lock_write()
157
173
t = control.get_repository_transport(None)
158
174
self.assertEqualDiff('Bazaar-NG Repository format 7',
159
175
t.get('format').read())
160
self.assertEqualDiff('', t.get('lock').read())
161
176
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
162
177
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
163
178
self.assertEqualDiff('# bzr weave file v5\n'
168
183
def test_shared_disk_layout(self):
169
184
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
170
repo = repository.RepositoryFormat7().initialize(control, shared=True)
185
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
172
187
# format 'Bazaar-NG Repository format 7'
174
188
# inventory.weave == empty_weave
175
189
# empty revision-store directory
176
190
# empty weaves directory
177
191
# a 'shared-storage' marker file.
192
# lock is not present when unlocked
178
193
t = control.get_repository_transport(None)
179
194
self.assertEqualDiff('Bazaar-NG Repository format 7',
180
195
t.get('format').read())
181
self.assertEqualDiff('', t.get('lock').read())
182
196
self.assertEqualDiff('', t.get('shared-storage').read())
183
197
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
184
198
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
188
202
t.get('inventory.weave').read())
203
self.assertFalse(t.has('branch-lock'))
205
def test_creates_lockdir(self):
206
"""Make sure it appears to be controlled by a LockDir existence"""
207
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
208
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
209
t = control.get_repository_transport(None)
210
# TODO: Should check there is a 'lock' toplevel directory,
211
# regardless of contents
212
self.assertFalse(t.has('lock/held/info'))
215
self.assertTrue(t.has('lock/held/info'))
217
# unlock so we don't get a warning about failing to do so
220
def test_uses_lockdir(self):
221
"""repo format 7 actually locks on lockdir"""
222
base_url = self.get_url()
223
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
224
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
225
t = control.get_repository_transport(None)
229
# make sure the same lock is created by opening it
230
repo = repository.Repository.open(base_url)
232
self.assertTrue(t.has('lock/held/info'))
234
self.assertFalse(t.has('lock/held/info'))
190
236
def test_shared_no_tree_disk_layout(self):
191
237
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
192
repo = repository.RepositoryFormat7().initialize(control, shared=True)
238
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
193
239
repo.set_make_working_trees(False)
195
241
# format 'Bazaar-NG Repository format 7'
201
247
t = control.get_repository_transport(None)
202
248
self.assertEqualDiff('Bazaar-NG Repository format 7',
203
249
t.get('format').read())
204
self.assertEqualDiff('', t.get('lock').read())
250
## self.assertEqualDiff('', t.get('lock').read())
205
251
self.assertEqualDiff('', t.get('shared-storage').read())
206
252
self.assertEqualDiff('', t.get('no-working-trees').read())
207
253
repo.set_make_working_trees(True)
219
265
def test_disk_layout(self):
220
266
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
221
repo = repository.RepositoryFormatKnit1().initialize(control)
267
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
222
268
# in case of side effects of locking.
223
269
repo.lock_write()
226
272
# format 'Bazaar-NG Knit Repository Format 1'
273
# lock: is a directory
228
274
# inventory.weave == empty_weave
229
275
# empty revision-store directory
230
276
# empty weaves directory
231
277
t = control.get_repository_transport(None)
232
278
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
233
279
t.get('format').read())
234
self.assertEqualDiff('', t.get('lock').read())
235
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
280
# XXX: no locks left when unlocked at the moment
281
# self.assertEqualDiff('', t.get('lock').read())
236
282
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
237
# cheating and using a weave for now.
238
self.assertEqualDiff('# bzr weave file v5\n'
241
t.get('inventory.weave').read())
285
def assertHasKnit(self, t, knit_name):
    """Assert that knit_name exists on t.

    :param t: object exposing ``get(name)`` (returning something with
        ``read()``) and ``has(name)``; callers in this file pass the
        repository transport.
    :param knit_name: base name of the knit, without any extension.
    """
    index_name = knit_name + '.kndx'
    data_name = knit_name + '.knit'
    # The whole index file is compared against the knit index header line.
    self.assertEqualDiff('# bzr knit index 8\n', t.get(index_name).read())
    # Only the presence of the data file is checked, not its content.
    self.assertTrue(t.has(data_name))
292
def check_knits(self, t):
    """Check knit content for a repository.

    Runs ``assertHasKnit`` on ``t`` for each of the inventory, revisions
    and signatures knits, in that order.

    :param t: passed through unchanged to ``assertHasKnit``.
    """
    for knit_name in ('inventory', 'revisions', 'signatures'):
        self.assertHasKnit(t, knit_name)
243
298
def test_shared_disk_layout(self):
244
299
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
245
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
300
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
247
302
# format 'Bazaar-NG Knit Repository Format 1'
303
# lock: is a directory
249
304
# inventory.weave == empty_weave
250
305
# empty revision-store directory
251
306
# empty weaves directory
253
308
t = control.get_repository_transport(None)
254
309
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
255
310
t.get('format').read())
256
self.assertEqualDiff('', t.get('lock').read())
311
# XXX: no locks left when unlocked at the moment
312
# self.assertEqualDiff('', t.get('lock').read())
257
313
self.assertEqualDiff('', t.get('shared-storage').read())
258
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
259
314
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
260
# cheating and using a weave for now.
261
self.assertEqualDiff('# bzr weave file v5\n'
264
t.get('inventory.weave').read())
266
317
def test_shared_no_tree_disk_layout(self):
267
318
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
268
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
319
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
269
320
repo.set_make_working_trees(False)
271
322
# format 'Bazaar-NG Knit Repository Format 1'
277
328
t = control.get_repository_transport(None)
278
329
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
279
330
t.get('format').read())
280
self.assertEqualDiff('', t.get('lock').read())
331
# XXX: no locks left when unlocked at the moment
332
# self.assertEqualDiff('', t.get('lock').read())
281
333
self.assertEqualDiff('', t.get('shared-storage').read())
282
334
self.assertEqualDiff('', t.get('no-working-trees').read())
283
335
repo.set_make_working_trees(True)
284
336
self.assertFalse(t.has('no-working-trees'))
285
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
286
337
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
287
# cheating and using a weave for now.
288
self.assertEqualDiff('# bzr weave file v5\n'
291
t.get('inventory.weave').read())
294
class InterString(repository.InterRepository):
295
"""An inter-repository optimised code path for strings.
297
This is for use during testing where we use strings as repositories
341
class DummyRepository(object):
342
"""A dummy repository for testing."""
346
def supports_rich_root(self):
350
class InterDummy(repository.InterRepository):
351
"""An inter-repository optimised code path for DummyRepository.
353
This is for use during testing where we use DummyRepository as repositories
298
354
so that none of the default registered inter-repository classes will
303
359
def is_compatible(repo_source, repo_target):
304
"""InterString is compatible with strings-as-repos."""
305
return isinstance(repo_source, str) and isinstance(repo_target, str)
360
"""InterDummy is compatible with DummyRepository."""
361
return (isinstance(repo_source, DummyRepository) and
362
isinstance(repo_target, DummyRepository))
308
365
class TestInterRepository(TestCaseWithTransport):
314
371
# This also tests that the default registered optimised interrepository
315
372
# classes do not barf inappropriately when a surprising repository type
316
373
# is handed to them.
317
dummy_a = "Repository 1."
318
dummy_b = "Repository 2."
374
dummy_a = DummyRepository()
375
dummy_b = DummyRepository()
319
376
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
321
378
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
322
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
379
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default.
381
The effective default is now InterSameDataRepository because there is
382
no actual sane default in the presence of incompatible data models.
323
384
inter_repo = repository.InterRepository.get(repo_a, repo_b)
324
self.assertEqual(repository.InterRepository,
385
self.assertEqual(repository.InterSameDataRepository,
325
386
inter_repo.__class__)
326
387
self.assertEqual(repo_a, inter_repo.source)
327
388
self.assertEqual(repo_b, inter_repo.target)
332
393
# and that it is correctly selected when given a repository
333
394
# pair that it returns true on for the is_compatible static method
335
dummy_a = "Repository 1."
336
dummy_b = "Repository 2."
337
repository.InterRepository.register_optimiser(InterString)
396
dummy_a = DummyRepository()
397
dummy_b = DummyRepository()
398
repo = self.make_repository('.')
399
# hack dummies to look like repo somewhat.
400
dummy_a._serializer = repo._serializer
401
dummy_b._serializer = repo._serializer
402
repository.InterRepository.register_optimiser(InterDummy)
339
# we should get the default for something InterString returns False
404
# we should get the default for something InterDummy returns False
341
self.assertFalse(InterString.is_compatible(dummy_a, None))
342
self.assertGetsDefaultInterRepository(dummy_a, None)
343
# and we should get an InterString for a pair it 'likes'
344
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
406
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
407
self.assertGetsDefaultInterRepository(dummy_a, repo)
408
# and we should get an InterDummy for a pair it 'likes'
409
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
345
410
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
346
self.assertEqual(InterString, inter_repo.__class__)
411
self.assertEqual(InterDummy, inter_repo.__class__)
347
412
self.assertEqual(dummy_a, inter_repo.source)
348
413
self.assertEqual(dummy_b, inter_repo.target)
350
repository.InterRepository.unregister_optimiser(InterString)
415
repository.InterRepository.unregister_optimiser(InterDummy)
351
416
# now we should get the default InterRepository object again.
352
417
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
357
422
def test_is_compatible_and_registered(self):
358
423
# InterWeaveRepo is compatible when either side
359
424
# is a format 5/6/7 branch
360
formats = [repository.RepositoryFormat5(),
361
repository.RepositoryFormat6(),
362
repository.RepositoryFormat7()]
363
incompatible_formats = [repository.RepositoryFormat4(),
364
repository.RepositoryFormatKnit1(),
425
from bzrlib.repofmt import knitrepo, weaverepo
426
formats = [weaverepo.RepositoryFormat5(),
427
weaverepo.RepositoryFormat6(),
428
weaverepo.RepositoryFormat7()]
429
incompatible_formats = [weaverepo.RepositoryFormat4(),
430
knitrepo.RepositoryFormatKnit1(),
366
432
repo_a = self.make_repository('a')
367
433
repo_b = self.make_repository('b')
388
454
t = get_transport(self.get_url('.'))
389
455
t.mkdir('repository')
390
456
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
391
repo = repository.RepositoryFormat7().initialize(repo_dir)
392
target_format = repository.RepositoryFormatKnit1()
393
pb = bzrlib.ui.ui_factory.progress_bar()
457
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
458
target_format = knitrepo.RepositoryFormatKnit1()
395
459
converter = repository.CopyConverter(target_format)
396
converter.convert(repo, pb)
460
pb = bzrlib.ui.ui_factory.nested_progress_bar()
462
converter.convert(repo, pb)
397
465
repo = repo_dir.open_repository()
398
466
self.assertTrue(isinstance(target_format, repo._format.__class__))
469
class TestMisc(TestCase):
    """Miscellaneous tests for helpers of the repository module."""

    def test_unescape_xml(self):
        """We get some kind of error when malformed entities are passed"""
        # The input contains the entity-like sequence '&bar;'; the test
        # pins the failure mode for it to KeyError.
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
476
class TestRepositoryFormatKnit3(TestCaseWithTransport):
    """Tests for converting a repository to RepositoryFormatKnit3."""

    def test_convert(self):
        """Ensure the upgrade adds weaves for roots"""
        # Build a tree whose repository uses the Knit1 format and commit once.
        knit1_format = bzrdir.BzrDirMetaFormat1()
        knit1_format.repository_format = knitrepo.RepositoryFormatKnit1()
        tree = self.make_branch_and_tree('.', knit1_format)
        tree.commit("Dull commit", rev_id="dull")
        revision_tree = tree.branch.repository.revision_tree('dull')
        # Before the upgrade, reading the root's file lines raises NoSuchFile.
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
                          revision_tree.inventory.root.file_id)

        # Upgrade the same location in place to the Knit3 format.
        knit3_format = bzrdir.BzrDirMetaFormat1()
        knit3_format.repository_format = knitrepo.RepositoryFormatKnit3()
        upgrade.Convert('.', knit3_format)

        tree = workingtree.WorkingTree.open('.')
        revision_tree = tree.branch.repository.revision_tree('dull')
        # After the upgrade the same lookup must complete without raising.
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)

        # A further commit does not touch the root, so the root entry of the
        # new revision tree is still expected to report revision 'dull'.
        tree.commit("Another dull commit", rev_id='dull2')
        revision_tree = tree.branch.repository.revision_tree('dull2')
        self.assertEqual('dull', revision_tree.inventory.root.revision)