25
25
from StringIO import StringIO
28
branch as _mod_branch,
35
from bzrlib.branch import (
39
BranchReferenceFormat,
45
_run_with_write_locked_target,
47
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
28
import bzrlib.bzrdir as bzrdir
49
29
from bzrlib.errors import (NotBranchError,
50
30
UnknownFormatError,
52
31
UnsupportedFormatError,
55
34
from bzrlib.tests import TestCase, TestCaseWithTransport
56
35
from bzrlib.transport import get_transport
59
37
class TestDefaultFormat(TestCase):
61
def test_default_format(self):
62
# update this if you change the default branch format
63
self.assertIsInstance(BranchFormat.get_default_format(),
66
def test_default_format_is_same_as_bzrdir_default(self):
    """The default branch format equals the default bzrdir's branch format."""
    # XXX: it might be nice if there was only one place the default was
    # set, but at the moment that's not true -- mbp 20070814 --
    # https://bugs.launchpad.net/bzr/+bug/132376
    branch_default = BranchFormat.get_default_format()
    bzrdir_branch_format = BzrDirFormat.get_default_format().get_branch_format()
    self.assertEqual(branch_default, bzrdir_branch_format)
73
39
def test_get_set_default_format(self):
74
# set the format and then set it back again
75
old_format = BranchFormat.get_default_format()
76
BranchFormat.set_default_format(SampleBranchFormat())
40
old_format = bzrlib.branch.BranchFormat.get_default_format()
42
self.assertTrue(isinstance(old_format, bzrlib.branch.BzrBranchFormat5))
43
bzrlib.branch.BranchFormat.set_default_format(SampleBranchFormat())
78
45
# the default branch format is used by the meta dir format
79
46
# which is not the default bzrdir format at this point
80
dir = BzrDirMetaFormat1().initialize('memory:///')
47
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:/')
81
48
result = dir.create_branch()
82
49
self.assertEqual(result, 'A branch')
84
BranchFormat.set_default_format(old_format)
85
self.assertEqual(old_format, BranchFormat.get_default_format())
88
class TestBranchFormat5(TestCaseWithTransport):
89
"""Tests specific to branch format 5"""
91
def test_branch_format_5_uses_lockdir(self):
93
bzrdir = BzrDirMetaFormat1().initialize(url)
94
bzrdir.create_repository()
95
branch = bzrdir.create_branch()
96
t = self.get_transport()
97
self.log("branch instance is %r" % branch)
98
self.assert_(isinstance(branch, BzrBranch5))
99
self.assertIsDirectory('.', t)
100
self.assertIsDirectory('.bzr/branch', t)
101
self.assertIsDirectory('.bzr/branch/lock', t)
104
self.assertIsDirectory('.bzr/branch/lock/held', t)
108
def test_set_push_location(self):
109
from bzrlib.config import (locations_config_filename,
110
ensure_config_dir_exists)
111
ensure_config_dir_exists()
112
fn = locations_config_filename()
113
# write correct newlines to locations.conf
114
# by default ConfigObj uses native line-endings for new files
115
# but uses already existing line-endings if file is not empty
118
f.write('# comment\n')
122
branch = self.make_branch('.', format='knit')
123
branch.set_push_location('foo')
124
local_path = urlutils.local_path_from_url(branch.base[:-1])
125
self.assertFileEqual("# comment\n"
127
"push_location = foo\n"
128
"push_location:policy = norecurse\n" % local_path,
131
# TODO RBC 20051029 test getting a push location from a branch in a
132
# recursive section - that is, it appends the branch name.
135
class SampleBranchFormat(BranchFormat):
51
bzrlib.branch.BranchFormat.set_default_format(old_format)
52
self.assertEqual(old_format, bzrlib.branch.BranchFormat.get_default_format())
55
class SampleBranchFormat(bzrlib.branch.BranchFormat):
136
56
"""A sample format
138
this format is initializable, unsupported to aid in testing the
58
this format is initializable, unsupported to aid in testing the
139
59
open and open_downlevel routines.
193
113
format.initialize(dir)
194
114
# register a format for it.
195
BranchFormat.register_format(format)
115
bzrlib.branch.BranchFormat.register_format(format)
196
116
# which branch.Open will refuse (not supported)
197
self.assertRaises(UnsupportedFormatError, Branch.open, self.get_url())
198
self.make_branch_and_tree('foo')
117
self.assertRaises(UnsupportedFormatError, bzrlib.branch.Branch.open, self.get_url())
199
118
# but open_downlevel will work
200
119
self.assertEqual(format.open(dir), bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
201
120
# unregister the format
202
BranchFormat.unregister_format(format)
203
self.make_branch_and_tree('bar')
206
class TestBranch67(object):
207
"""Common tests for both branch 6 and 7 which are mostly the same."""
209
def get_format_name(self):
    """Hook for subclasses: always raises NotImplementedError here.

    The exception argument is this bound method, so the failure names
    the hook that was not overridden.
    """
    raise NotImplementedError(self.get_format_name)
212
def get_format_name_subtree(self):
    """Hook for subclasses: always raises NotImplementedError here.

    The exception argument is this bound method (not ``get_format_name``),
    so the failure names the hook that was actually left unimplemented.
    """
    raise NotImplementedError(self.get_format_name_subtree)
216
raise NotImplementedError(self.get_class)
218
def test_creation(self):
    """Branches made or opened in three ways are instances of get_class()."""
    expected_class = self.get_class

    # 1. Created from an explicit meta-dir format carrying BzrBranchFormat6.
    meta_format = BzrDirMetaFormat1()
    meta_format.set_branch_format(_mod_branch.BzrBranchFormat6())
    branch = self.make_branch('a', format=meta_format)
    self.assertIsInstance(branch, expected_class())

    # 2. Created by the format name supplied by the subclass.
    branch = self.make_branch('b', format=self.get_format_name())
    self.assertIsInstance(branch, expected_class())

    # 3. Re-opened from disk.
    branch = _mod_branch.Branch.open('a')
    self.assertIsInstance(branch, expected_class())
228
def test_layout(self):
    """A new branch has 'last-revision' but no history or references file."""
    self.make_branch('a', format=self.get_format_name())
    self.failUnlessExists('a/.bzr/branch/last-revision')
    self.failIfExists('a/.bzr/branch/revision-history')
    self.failIfExists('a/.bzr/branch/references')
234
def test_config(self):
    """Ensure that all configuration data is stored in the branch"""
    branch = self.make_branch('a', format=self.get_format_name())

    # Parent location: readable back, but no separate 'parent' file.
    branch.set_parent('http://bazaar-vcs.org')
    self.failIfExists('a/.bzr/branch/parent')
    self.assertEqual('http://bazaar-vcs.org', branch.get_parent())

    # Push location: visible through the branch data config.
    branch.set_push_location('sftp://bazaar-vcs.org')
    branch_data_config = branch.get_config()._get_branch_data_config()
    self.assertEqual('sftp://bazaar-vcs.org',
                     branch_data_config.get_user_option('push_location'))

    # Bound location: readable back, but no separate 'bound' file.
    branch.set_bound_location('ftp://bazaar-vcs.org')
    self.failIfExists('a/.bzr/branch/bound')
    self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
248
def test_set_revision_history(self):
249
builder = self.make_branch_builder('.', format=self.get_format_name())
250
builder.build_snapshot('foo', None,
251
[('add', ('', None, 'directory', None))],
253
builder.build_snapshot('bar', None, [], message='bar')
254
branch = builder.get_branch()
256
self.addCleanup(branch.unlock)
257
branch.set_revision_history(['foo', 'bar'])
258
branch.set_revision_history(['foo'])
259
self.assertRaises(errors.NotLefthandHistory,
260
branch.set_revision_history, ['bar'])
262
def do_checkout_test(self, lightweight=False):
263
tree = self.make_branch_and_tree('source',
264
format=self.get_format_name_subtree())
265
subtree = self.make_branch_and_tree('source/subtree',
266
format=self.get_format_name_subtree())
267
subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
268
format=self.get_format_name_subtree())
269
self.build_tree(['source/subtree/file',
270
'source/subtree/subsubtree/file'])
271
subsubtree.add('file')
273
subtree.add_reference(subsubtree)
274
tree.add_reference(subtree)
275
tree.commit('a revision')
276
subtree.commit('a subtree file')
277
subsubtree.commit('a subsubtree file')
278
tree.branch.create_checkout('target', lightweight=lightweight)
279
self.failUnlessExists('target')
280
self.failUnlessExists('target/subtree')
281
self.failUnlessExists('target/subtree/file')
282
self.failUnlessExists('target/subtree/subsubtree/file')
283
subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
285
self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
287
self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
289
def test_checkout_with_references(self):
    """Run the shared checkout scenario with its default (heavyweight) mode."""
    self.do_checkout_test()
292
def test_light_checkout_with_references(self):
    """Run the shared checkout scenario with lightweight=True."""
    self.do_checkout_test(lightweight=True)
295
def test_set_push(self):
296
branch = self.make_branch('source', format=self.get_format_name())
297
branch.get_config().set_user_option('push_location', 'old',
298
store=config.STORE_LOCATION)
301
warnings.append(args[0] % args[1:])
302
_warning = trace.warning
303
trace.warning = warning
305
branch.set_push_location('new')
307
trace.warning = _warning
308
self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
312
class TestBranch6(TestBranch67, TestCaseWithTransport):
315
return _mod_branch.BzrBranch6
317
def get_format_name(self):
    """Return the format name used by these tests: "dirstate-tags"."""
    return "dirstate-tags"
320
def get_format_name_subtree(self):
    """Return the subtree format name: "dirstate-with-subtree"."""
    return "dirstate-with-subtree"
323
def test_set_stacked_on_url_errors(self):
    """set_stacked_on_url(None) raises UnstackableBranchFormat."""
    branch = self.make_branch('a', format=self.get_format_name())
    self.assertRaises(errors.UnstackableBranchFormat,
                      branch.set_stacked_on_url, None)
328
def test_default_stacked_location(self):
    """get_stacked_on_url() raises UnstackableBranchFormat."""
    branch = self.make_branch('a', format=self.get_format_name())
    self.assertRaises(errors.UnstackableBranchFormat,
                      branch.get_stacked_on_url)
333
class TestBranch7(TestBranch67, TestCaseWithTransport):
336
return _mod_branch.BzrBranch7
338
def get_format_name(self):
341
def get_format_name_subtree(self):
    """Return the subtree format name: "development-subtree"."""
    return "development-subtree"
344
def test_set_stacked_on_url_unstackable_repo(self):
    """Stacking a format-7 branch in a 'dirstate-tags' repo is rejected.

    The branch is initialized with BzrBranchFormat7 inside a repository
    created as 'dirstate-tags'; set_stacked_on_url must then raise
    UnstackableRepositoryFormat.
    """
    repo = self.make_repository('a', format='dirstate-tags')
    control = repo.bzrdir
    branch = _mod_branch.BzrBranchFormat7().initialize(control)
    target = self.make_branch('b')
    self.assertRaises(errors.UnstackableRepositoryFormat,
                      branch.set_stacked_on_url, target.base)
352
def test_clone_stacked_on_unstackable_repo(self):
    """Clone a bzrdir holding a format-7 branch in a 'dirstate-tags' repo.

    The test has no assertions: it passes as long as clone() returns
    without raising.
    """
    repo = self.make_repository('a', format='dirstate-tags')
    control = repo.bzrdir
    _mod_branch.BzrBranchFormat7().initialize(control)
    # Calling clone should not raise UnstackableRepositoryFormat.
    control.clone('cloned')
359
def _test_default_stacked_location(self):
    """get_stacked_on_url() on a fresh branch raises NotStacked.

    The leading underscore in the name keeps the test loader from
    collecting this method.
    """
    branch = self.make_branch('a', format=self.get_format_name())
    self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
363
def test_stack_and_unstack(self):
    """Stacking exposes the target's revisions; unstacking hides them."""
    format_name = self.get_format_name()
    branch = self.make_branch('a', format=format_name)
    target = self.make_branch_and_tree('b', format=format_name)

    # While stacked, a revision committed to the target is visible
    # through the stacked branch's repository.
    branch.set_stacked_on_url(target.branch.base)
    self.assertEqual(target.branch.base, branch.get_stacked_on_url())
    revid = target.commit('foo')
    self.assertTrue(branch.repository.has_revision(revid))

    # After unstacking, the branch reports NotStacked and no longer
    # sees that revision.
    branch.set_stacked_on_url(None)
    self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
    self.assertFalse(branch.repository.has_revision(revid))
374
def test_open_opens_stacked_reference(self):
    """A re-opened stacked branch still sees the target's revisions."""
    format_name = self.get_format_name()
    branch = self.make_branch('a', format=format_name)
    target = self.make_branch_and_tree('b', format=format_name)
    branch.set_stacked_on_url(target.branch.base)
    # Re-open from the bzrdir so the stacking is read back rather than
    # carried over from the object that set it.
    branch = branch.bzrdir.open_branch()
    revid = target.commit('foo')
    self.assertTrue(branch.repository.has_revision(revid))
383
class BzrBranch8(TestCaseWithTransport):
385
def make_branch(self, location, format=None):
387
format = bzrdir.format_registry.make_bzrdir('1.9')
388
format.set_branch_format(_mod_branch.BzrBranchFormat8())
389
return TestCaseWithTransport.make_branch(self, location, format=format)
391
def create_branch_with_reference(self):
392
branch = self.make_branch('branch')
393
branch._set_all_reference_info({'file-id': ('path', 'location')})
397
def instrument_branch(branch, gets):
    """Record every ``branch._transport.get`` call in ``gets``.

    Replaces ``branch._transport.get`` with a wrapper that appends the
    ``(args, kwargs)`` of each call to ``gets`` and then delegates to the
    original ``get``, returning its result unchanged.
    """
    original_get = branch._transport.get

    def get(*args, **kwargs):
        gets.append((args, kwargs))
        return original_get(*args, **kwargs)

    branch._transport.get = get
404
def test_reference_info_caching_read_locked(self):
406
branch = self.create_branch_with_reference()
408
self.addCleanup(branch.unlock)
409
self.instrument_branch(branch, gets)
410
branch.get_reference_info('file-id')
411
branch.get_reference_info('file-id')
412
self.assertEqual(1, len(gets))
414
def test_reference_info_caching_read_unlocked(self):
416
branch = self.create_branch_with_reference()
417
self.instrument_branch(branch, gets)
418
branch.get_reference_info('file-id')
419
branch.get_reference_info('file-id')
420
self.assertEqual(2, len(gets))
422
def test_reference_info_caching_write_locked(self):
424
branch = self.make_branch('branch')
426
self.instrument_branch(branch, gets)
427
self.addCleanup(branch.unlock)
428
branch._set_all_reference_info({'file-id': ('path2', 'location2')})
429
path, location = branch.get_reference_info('file-id')
430
self.assertEqual(0, len(gets))
431
self.assertEqual('path2', path)
432
self.assertEqual('location2', location)
434
def test_reference_info_caches_cleared(self):
435
branch = self.make_branch('branch')
437
branch.set_reference_info('file-id', 'path2', 'location2')
439
doppelganger = Branch.open('branch')
440
doppelganger.set_reference_info('file-id', 'path3', 'location3')
441
self.assertEqual(('path3', 'location3'),
442
branch.get_reference_info('file-id'))
121
bzrlib.branch.BranchFormat.unregister_format(format)
444
124
class TestBranchReference(TestCaseWithTransport):
445
125
"""Tests for the branch reference facility."""
453
133
target_branch = dir.create_branch()
454
134
t.mkdir('branch')
455
135
branch_dir = bzrdirformat.initialize(self.get_url('branch'))
456
made_branch = BranchReferenceFormat().initialize(branch_dir, target_branch)
136
made_branch = bzrlib.branch.BranchReferenceFormat().initialize(branch_dir, target_branch)
457
137
self.assertEqual(made_branch.base, target_branch.base)
458
138
opened_branch = branch_dir.open_branch()
459
139
self.assertEqual(opened_branch.base, target_branch.base)
461
def test_get_reference(self):
    """For a BranchReference, get_reference should return the location."""
    branch = self.make_branch('target')
    checkout = branch.create_checkout('checkout', lightweight=True)
    reference_url = branch.bzrdir.root_transport.abspath('') + '/'
    # if the api for create_checkout changes to return different checkout types
    # then this file read will fail.
    self.assertFileEqual(reference_url, 'checkout/.bzr/branch/location')
    reference_format = _mod_branch.BranchReferenceFormat()
    self.assertEqual(reference_url,
                     reference_format.get_reference(checkout.bzrdir))
473
class TestHooks(TestCase):
475
def test_constructor(self):
    """Check that creating a BranchHooks instance has the right defaults."""
    hooks = BranchHooks()
    expected_hook_names = (
        "set_rh",
        "post_push",
        "post_commit",
        "pre_commit",
        "post_pull",
        "post_uncommit",
        "post_change_branch_tip",
    )
    for hook_name in expected_hook_names:
        # Same failure message shape as one assertion per hook name.
        self.assertTrue(hook_name in hooks,
                        "%s not in %s" % (hook_name, hooks))
487
def test_installed_hooks_are_BranchHooks(self):
488
"""The installed hooks object should be a BranchHooks."""
489
# the installed hooks are saved in self._preserved_hooks.
490
self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
494
class TestPullResult(TestCase):
496
def test_pull_result_to_int(self):
497
# to support old code, the pull result can be used as an int
501
# this usage of results is not recommended for new code (because it
502
# doesn't describe very well what happened), but for api stability
503
# it's still supported
504
a = "%d revisions pulled" % r
505
self.assertEqual(a, "10 revisions pulled")
509
class _StubLockable(object):
510
"""Helper for TestRunWithWriteLockedTarget."""
512
def __init__(self, calls, unlock_exc=None):
514
self.unlock_exc = unlock_exc
516
def lock_write(self):
517
self.calls.append('lock_write')
520
self.calls.append('unlock')
521
if self.unlock_exc is not None:
522
raise self.unlock_exc
525
class _ErrorFromCallable(Exception):
526
"""Helper for TestRunWithWriteLockedTarget."""
529
class _ErrorFromUnlock(Exception):
530
"""Helper for TestRunWithWriteLockedTarget."""
533
class TestRunWithWriteLockedTarget(TestCase):
534
"""Tests for _run_with_write_locked_target."""
540
def func_that_returns_ok(self):
541
self._calls.append('func called')
544
def func_that_raises(self):
    """Log 'func called' in self._calls, then raise _ErrorFromCallable."""
    self._calls.append('func called')
    raise _ErrorFromCallable()
548
def test_success_unlocks(self):
    """A successful callable returns its result; the target is unlocked."""
    lockable = _StubLockable(self._calls)
    result = _run_with_write_locked_target(
        lockable, self.func_that_returns_ok)
    self.assertEqual('ok', result)
    # Lock, call, unlock -- in that order.
    self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
555
def test_exception_unlocks_and_propagates(self):
    """The callable's error propagates and the target is still unlocked."""
    lockable = _StubLockable(self._calls)
    self.assertRaises(_ErrorFromCallable,
                      _run_with_write_locked_target,
                      lockable, self.func_that_raises)
    self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
561
def test_callable_succeeds_but_error_during_unlock(self):
    """If only unlock fails, the unlock error is the one that is raised."""
    lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
    self.assertRaises(_ErrorFromUnlock,
                      _run_with_write_locked_target,
                      lockable, self.func_that_returns_ok)
    self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
567
def test_error_during_unlock_does_not_mask_original_error(self):
    """When callable and unlock both fail, the callable's error wins."""
    lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
    self.assertRaises(_ErrorFromCallable,
                      _run_with_write_locked_target,
                      lockable, self.func_that_raises)
    # unlock was still attempted even though the callable had failed.
    self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)