13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests for the Branch facility that are not interface tests.
19
For interface tests see tests/per_branch/*.py.
19
For interface tests see tests/branch_implementations/*.py.
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
25
from cStringIO import StringIO
28
branch as _mod_branch,
39
class TestDefaultFormat(tests.TestCase):
41
def test_default_format(self):
    """Assert that the default branch format is a BzrBranchFormat7."""
    # Keep this expectation in sync whenever the default branch format
    # is changed.
    default_format = _mod_branch.BranchFormat.get_default_format()
    self.assertIsInstance(default_format, _mod_branch.BzrBranchFormat7)
46
def test_default_format_is_same_as_bzrdir_default(self):
47
# XXX: it might be nice if there was only one place the default was
48
# set, but at the moment that's not true -- mbp 20070814 --
49
# https://bugs.launchpad.net/bzr/+bug/132376
51
_mod_branch.BranchFormat.get_default_format(),
52
bzrdir.BzrDirFormat.get_default_format().get_branch_format())
25
from StringIO import StringIO
28
from bzrlib.branch import (BzrBranch5,
30
import bzrlib.bzrdir as bzrdir
31
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
33
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
41
class TestDefaultFormat(TestCase):
54
43
def test_get_set_default_format(self):
55
# set the format and then set it back again
56
old_format = _mod_branch.BranchFormat.get_default_format()
57
_mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
44
old_format = bzrlib.branch.BranchFormat.get_default_format()
46
self.assertTrue(isinstance(old_format, bzrlib.branch.BzrBranchFormat5))
47
bzrlib.branch.BranchFormat.set_default_format(SampleBranchFormat())
59
49
# the default branch format is used by the meta dir format
60
50
# which is not the default bzrdir format at this point
61
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
51
dir = BzrDirMetaFormat1().initialize('memory:/')
62
52
result = dir.create_branch()
63
53
self.assertEqual(result, 'A branch')
65
_mod_branch.BranchFormat.set_default_format(old_format)
66
self.assertEqual(old_format,
67
_mod_branch.BranchFormat.get_default_format())
70
class TestBranchFormat5(tests.TestCaseWithTransport):
55
bzrlib.branch.BranchFormat.set_default_format(old_format)
56
self.assertEqual(old_format, bzrlib.branch.BranchFormat.get_default_format())
59
class TestBranchFormat5(TestCaseWithTransport):
71
60
"""Tests specific to branch format 5"""
73
62
def test_branch_format_5_uses_lockdir(self):
74
63
url = self.get_url()
75
bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
bdir.create_repository()
77
branch = bdir.create_branch()
64
bzrdir = BzrDirMetaFormat1().initialize(url)
65
bzrdir.create_repository()
66
branch = bzrdir.create_branch()
78
67
t = self.get_transport()
79
68
self.log("branch instance is %r" % branch)
80
self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
69
self.assert_(isinstance(branch, BzrBranch5))
81
70
self.assertIsDirectory('.', t)
82
71
self.assertIsDirectory('.bzr/branch', t)
83
72
self.assertIsDirectory('.bzr/branch/lock', t)
84
73
branch.lock_write()
85
self.addCleanup(branch.unlock)
86
self.assertIsDirectory('.bzr/branch/lock/held', t)
88
def test_set_push_location(self):
89
from bzrlib.config import (locations_config_filename,
90
ensure_config_dir_exists)
91
ensure_config_dir_exists()
92
fn = locations_config_filename()
93
# write correct newlines to locations.conf
94
# by default ConfigObj uses native line-endings for new files
95
# but uses already existing line-endings if file is not empty
98
f.write('# comment\n')
75
self.assertIsDirectory('.bzr/branch/lock/held', t)
102
branch = self.make_branch('.', format='knit')
103
branch.set_push_location('foo')
104
local_path = urlutils.local_path_from_url(branch.base[:-1])
105
self.assertFileEqual("# comment\n"
107
"push_location = foo\n"
108
"push_location:policy = norecurse\n" % local_path,
111
# TODO RBC 20051029 test getting a push location from a branch in a
112
# recursive section - that is, it appends the branch name.
115
class SampleBranchFormat(_mod_branch.BranchFormat):
80
class TestBranchEscaping(TestCaseWithTransport):
81
"""Test a branch can be correctly stored and used on a vfat-like transport
83
Makes sure we have proper escaping of invalid characters, etc.
85
It'd be better to test all operations on the FakeVFATTransportDecorator,
86
but working trees go straight to the os not through the Transport layer.
87
Therefore we build some history first in the regular way and then
88
check it's safe to access for vfat.
95
super(TestBranchEscaping, self).setUp()
96
from bzrlib.repository import RepositoryFormatKnit1
97
bzrdir = BzrDirMetaFormat1().initialize(self.get_url())
98
repo = RepositoryFormatKnit1().initialize(bzrdir)
99
branch = bzrdir.create_branch()
100
wt = bzrdir.create_workingtree()
101
self.build_tree_contents([("foo", "contents of foo")])
102
# add file with id containing weird characters
103
wt.add(['foo'], [self.FOO_ID])
104
wt.commit('this is my new commit', rev_id=self.REV_ID)
106
def test_branch_on_vfat(self):
    """Open the branch over the fake VFAT transport and read a file text."""
    from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
    # Everything below goes through the vfat decorator; it should be safe.
    vfat_url = 'vfat+' + self.get_url()
    vfat_transport = FakeVFATTransportDecorator(vfat_url)
    control_dir, _relpath = BzrDir.open_containing_from_transport(
        vfat_transport)
    opened_branch = control_dir.open_branch()
    revision_tree = opened_branch.repository.revision_tree(self.REV_ID)
    file_text = revision_tree.get_file_text(self.FOO_ID)
    self.assertEqual(file_text, 'contents of foo')
117
class SampleBranchFormat(bzrlib.branch.BranchFormat):
116
118
"""A sample format
118
this format is initializable, unsupported to aid in testing the
120
this format is initializable, unsupported to aid in testing the
119
121
open and open_downlevel routines.
173
175
format.initialize(dir)
174
176
# register a format for it.
175
_mod_branch.BranchFormat.register_format(format)
177
bzrlib.branch.BranchFormat.register_format(format)
176
178
# which branch.Open will refuse (not supported)
177
self.assertRaises(errors.UnsupportedFormatError,
178
_mod_branch.Branch.open, self.get_url())
179
self.make_branch_and_tree('foo')
179
self.assertRaises(UnsupportedFormatError, bzrlib.branch.Branch.open, self.get_url())
180
180
# but open_downlevel will work
183
bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
181
self.assertEqual(format.open(dir), bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
184
182
# unregister the format
185
_mod_branch.BranchFormat.unregister_format(format)
186
self.make_branch_and_tree('bar')
189
class TestBranch67(object):
190
"""Common tests for both branch 6 and 7 which are mostly the same."""
192
def get_format_name(self):
193
raise NotImplementedError(self.get_format_name)
195
def get_format_name_subtree(self):
196
raise NotImplementedError(self.get_format_name)
199
raise NotImplementedError(self.get_class)
201
def test_creation(self):
    """Check the class of branches obtained in three different ways."""
    # 1. Created from a meta-dir format with an explicit branch format.
    meta_format = bzrdir.BzrDirMetaFormat1()
    meta_format.set_branch_format(_mod_branch.BzrBranchFormat6())
    from_format = self.make_branch('a', format=meta_format)
    self.assertIsInstance(from_format, self.get_class())
    # 2. Created from the format name supplied by get_format_name().
    from_name = self.make_branch('b', format=self.get_format_name())
    self.assertIsInstance(from_name, self.get_class())
    # 3. Opened again from the location used in step 1.
    reopened = _mod_branch.Branch.open('a')
    self.assertIsInstance(reopened, self.get_class())
211
def test_layout(self):
    """Check which files exist under a/.bzr/branch after make_branch."""
    format_name = self.get_format_name()
    new_branch = self.make_branch('a', format=format_name)
    self.failUnlessExists('a/.bzr/branch/last-revision')
    # Neither of these files is expected to be present.
    for absent_path in ('a/.bzr/branch/revision-history',
                        'a/.bzr/branch/references'):
        self.failIfExists(absent_path)
217
def test_config(self):
    """Ensure that all configuration data is stored in the branch"""
    format_name = self.get_format_name()
    branch = self.make_branch('a', format=format_name)

    # Parent location: no separate 'parent' file, value readable back.
    parent_url = 'http://bazaar-vcs.org'
    branch.set_parent(parent_url)
    self.failIfExists('a/.bzr/branch/parent')
    self.assertEqual(parent_url, branch.get_parent())

    # Push location: read back through the branch data config.
    push_url = 'sftp://bazaar-vcs.org'
    branch.set_push_location(push_url)
    branch_config = branch.get_config()._get_branch_data_config()
    stored_push = branch_config.get_user_option('push_location')
    self.assertEqual(push_url, stored_push)

    # Bound location: no separate 'bound' file, value readable back.
    bound_url = 'ftp://bazaar-vcs.org'
    branch.set_bound_location(bound_url)
    self.failIfExists('a/.bzr/branch/bound')
    self.assertEqual(bound_url, branch.get_bound_location())
231
def test_set_revision_history(self):
232
builder = self.make_branch_builder('.', format=self.get_format_name())
233
builder.build_snapshot('foo', None,
234
[('add', ('', None, 'directory', None))],
236
builder.build_snapshot('bar', None, [], message='bar')
237
branch = builder.get_branch()
239
self.addCleanup(branch.unlock)
240
branch.set_revision_history(['foo', 'bar'])
241
branch.set_revision_history(['foo'])
242
self.assertRaises(errors.NotLefthandHistory,
243
branch.set_revision_history, ['bar'])
245
def do_checkout_test(self, lightweight=False):
246
tree = self.make_branch_and_tree('source',
247
format=self.get_format_name_subtree())
248
subtree = self.make_branch_and_tree('source/subtree',
249
format=self.get_format_name_subtree())
250
subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
251
format=self.get_format_name_subtree())
252
self.build_tree(['source/subtree/file',
253
'source/subtree/subsubtree/file'])
254
subsubtree.add('file')
256
subtree.add_reference(subsubtree)
257
tree.add_reference(subtree)
258
tree.commit('a revision')
259
subtree.commit('a subtree file')
260
subsubtree.commit('a subsubtree file')
261
tree.branch.create_checkout('target', lightweight=lightweight)
262
self.failUnlessExists('target')
263
self.failUnlessExists('target/subtree')
264
self.failUnlessExists('target/subtree/file')
265
self.failUnlessExists('target/subtree/subsubtree/file')
266
subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
268
self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
270
self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
272
def test_checkout_with_references(self):
    """Run the shared checkout scenario with a non-lightweight checkout."""
    # False is do_checkout_test's own default; spelled out for clarity.
    self.do_checkout_test(lightweight=False)
275
def test_light_checkout_with_references(self):
    """Run the shared checkout scenario with a lightweight checkout."""
    checkout_options = {'lightweight': True}
    self.do_checkout_test(**checkout_options)
278
def test_set_push(self):
279
branch = self.make_branch('source', format=self.get_format_name())
280
branch.get_config().set_user_option('push_location', 'old',
281
store=config.STORE_LOCATION)
284
warnings.append(args[0] % args[1:])
285
_warning = trace.warning
286
trace.warning = warning
288
branch.set_push_location('new')
290
trace.warning = _warning
291
self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
295
class TestBranch6(TestBranch67, tests.TestCaseWithTransport):
298
return _mod_branch.BzrBranch6
300
def get_format_name(self):
    """Return the format name used by this test class.

    :return: the string "dirstate-tags".
    """
    format_name = "dirstate-tags"
    return format_name
303
def get_format_name_subtree(self):
    """Return the subtree-capable format name used by this test class.

    :return: the string "dirstate-with-subtree".
    """
    subtree_format_name = "dirstate-with-subtree"
    return subtree_format_name
306
def test_set_stacked_on_url_errors(self):
    """Assert set_stacked_on_url(None) raises UnstackableBranchFormat."""
    format_name = self.get_format_name()
    made_branch = self.make_branch('a', format=format_name)
    expected_error = errors.UnstackableBranchFormat
    self.assertRaises(expected_error, made_branch.set_stacked_on_url, None)
311
def test_default_stacked_location(self):
    """Assert get_stacked_on_url raises UnstackableBranchFormat."""
    format_name = self.get_format_name()
    made_branch = self.make_branch('a', format=format_name)
    expected_error = errors.UnstackableBranchFormat
    self.assertRaises(expected_error, made_branch.get_stacked_on_url)
316
class TestBranch7(TestBranch67, tests.TestCaseWithTransport):
319
return _mod_branch.BzrBranch7
321
def get_format_name(self):
324
def get_format_name_subtree(self):
    """Return the subtree-capable format name used by this test class.

    :return: the string "development-subtree".
    """
    subtree_format_name = "development-subtree"
    return subtree_format_name
327
def test_set_stacked_on_url_unstackable_repo(self):
    """Assert stacking a format 7 branch in a 'dirstate-tags' repository
    raises UnstackableRepositoryFormat.
    """
    host_repo = self.make_repository('a', format='dirstate-tags')
    control_dir = host_repo.bzrdir
    branch_format = _mod_branch.BzrBranchFormat7()
    stacking_branch = branch_format.initialize(control_dir)
    stack_target = self.make_branch('b')
    expected_error = errors.UnstackableRepositoryFormat
    self.assertRaises(expected_error,
                      stacking_branch.set_stacked_on_url, stack_target.base)
335
def test_clone_stacked_on_unstackable_repo(self):
    """Clone the control dir of a format 7 branch in a 'dirstate-tags' repo.

    The test has no explicit assertion; it passes when clone() returns
    without raising.
    """
    host_repo = self.make_repository('a', format='dirstate-tags')
    control_dir = host_repo.bzrdir
    branch_format = _mod_branch.BzrBranchFormat7()
    made_branch = branch_format.initialize(control_dir)
    # Calling clone should not raise UnstackableRepositoryFormat.
    clone_result = control_dir.clone('cloned')
342
def _test_default_stacked_location(self):
    """Assert get_stacked_on_url raises NotStacked on a new branch.

    NOTE(review): the leading underscore presumably keeps test loaders
    from collecting this method -- confirm whether that is intentional.
    """
    format_name = self.get_format_name()
    made_branch = self.make_branch('a', format=format_name)
    expected_error = errors.NotStacked
    self.assertRaises(expected_error, made_branch.get_stacked_on_url)
346
def test_stack_and_unstack(self):
    """Stack branch 'a' on 'b', then unstack it, checking revision access.

    While stacked, a revision committed to 'b' must be reported by the
    repository of 'a'; after unstacking it must not be.
    """
    stacked = self.make_branch('a', format=self.get_format_name())
    base_tree = self.make_branch_and_tree(
        'b', format=self.get_format_name())

    # Stack and confirm the stacked-on URL is reported back.
    stacked.set_stacked_on_url(base_tree.branch.base)
    self.assertEqual(base_tree.branch.base, stacked.get_stacked_on_url())
    committed_revid = base_tree.commit('foo')
    self.assertTrue(stacked.repository.has_revision(committed_revid))

    # Unstack and confirm the revision is no longer reachable.
    stacked.set_stacked_on_url(None)
    self.assertRaises(errors.NotStacked, stacked.get_stacked_on_url)
    self.assertFalse(stacked.repository.has_revision(committed_revid))
357
def test_open_opens_stacked_reference(self):
    """A stacked branch re-opened via its bzrdir still sees revisions
    committed to the branch it is stacked on.
    """
    stacked = self.make_branch('a', format=self.get_format_name())
    base_tree = self.make_branch_and_tree(
        'b', format=self.get_format_name())
    stacked.set_stacked_on_url(base_tree.branch.base)
    # Drop the original object and work with a freshly opened branch.
    stacked = stacked.bzrdir.open_branch()
    committed_revid = base_tree.commit('foo')
    self.assertTrue(stacked.repository.has_revision(committed_revid))
366
class BzrBranch8(tests.TestCaseWithTransport):
368
def make_branch(self, location, format=None):
370
format = bzrdir.format_registry.make_bzrdir('1.9')
371
format.set_branch_format(_mod_branch.BzrBranchFormat8())
372
return tests.TestCaseWithTransport.make_branch(
373
self, location, format=format)
375
def create_branch_with_reference(self):
376
branch = self.make_branch('branch')
377
branch._set_all_reference_info({'file-id': ('path', 'location')})
381
def instrument_branch(branch, gets):
    """Wrap ``branch._transport.get`` so that every call is recorded.

    :param branch: object whose ``_transport.get`` attribute is replaced
        by a recording wrapper.
    :param gets: list that receives one ``(args, kwargs)`` tuple per call.
    """
    original_get = branch._transport.get

    def get(*call_args, **call_kwargs):
        # Record first, then delegate to the transport's original get.
        gets.append((call_args, call_kwargs))
        return original_get(*call_args, **call_kwargs)

    branch._transport.get = get
388
def test_reference_info_caching_read_locked(self):
390
branch = self.create_branch_with_reference()
392
self.addCleanup(branch.unlock)
393
self.instrument_branch(branch, gets)
394
branch.get_reference_info('file-id')
395
branch.get_reference_info('file-id')
396
self.assertEqual(1, len(gets))
398
def test_reference_info_caching_read_unlocked(self):
400
branch = self.create_branch_with_reference()
401
self.instrument_branch(branch, gets)
402
branch.get_reference_info('file-id')
403
branch.get_reference_info('file-id')
404
self.assertEqual(2, len(gets))
406
def test_reference_info_caching_write_locked(self):
408
branch = self.make_branch('branch')
410
self.instrument_branch(branch, gets)
411
self.addCleanup(branch.unlock)
412
branch._set_all_reference_info({'file-id': ('path2', 'location2')})
413
path, location = branch.get_reference_info('file-id')
414
self.assertEqual(0, len(gets))
415
self.assertEqual('path2', path)
416
self.assertEqual('location2', location)
418
def test_reference_info_caches_cleared(self):
419
branch = self.make_branch('branch')
421
branch.set_reference_info('file-id', 'path2', 'location2')
423
doppelganger = _mod_branch.Branch.open('branch')
424
doppelganger.set_reference_info('file-id', 'path3', 'location3')
425
self.assertEqual(('path3', 'location3'),
426
branch.get_reference_info('file-id'))
428
class TestBranchReference(tests.TestCaseWithTransport):
183
bzrlib.branch.BranchFormat.unregister_format(format)
186
class TestBranchReference(TestCaseWithTransport):
429
187
"""Tests for the branch reference facility."""
431
189
def test_create_open_reference(self):
432
190
bzrdirformat = bzrdir.BzrDirMetaFormat1()
433
t = transport.get_transport(self.get_url('.'))
191
t = get_transport(self.get_url('.'))
435
193
dir = bzrdirformat.initialize(self.get_url('repo'))
436
194
dir.create_repository()
437
195
target_branch = dir.create_branch()
438
196
t.mkdir('branch')
439
197
branch_dir = bzrdirformat.initialize(self.get_url('branch'))
440
made_branch = _mod_branch.BranchReferenceFormat().initialize(
441
branch_dir, target_branch)
198
made_branch = bzrlib.branch.BranchReferenceFormat().initialize(branch_dir, target_branch)
442
199
self.assertEqual(made_branch.base, target_branch.base)
443
200
opened_branch = branch_dir.open_branch()
444
201
self.assertEqual(opened_branch.base, target_branch.base)
446
def test_get_reference(self):
    """For a BranchReference, get_reference should return the location."""
    target_branch = self.make_branch('target')
    light_checkout = target_branch.create_checkout(
        'checkout', lightweight=True)
    expected_url = target_branch.bzrdir.root_transport.abspath('') + '/'
    # If the API for create_checkout changes to return a different kind
    # of checkout, reading this file will fail.
    self.assertFileEqual(expected_url, 'checkout/.bzr/branch/location')
    reference_format = _mod_branch.BranchReferenceFormat()
    actual_url = reference_format.get_reference(light_checkout.bzrdir)
    self.assertEqual(expected_url, actual_url)
458
class TestHooks(tests.TestCase):
    """Checks on the default contents and the type of the branch hooks."""

    def test_constructor(self):
        """Check that creating a BranchHooks instance has the right defaults."""
        hooks = _mod_branch.BranchHooks()
        # Each entry pairs a hook name with its failure-message template.
        expected_hooks = (
            ("set_rh", "set_rh not in %s"),
            ("post_push", "post_push not in %s"),
            ("post_commit", "post_commit not in %s"),
            ("pre_commit", "pre_commit not in %s"),
            ("post_pull", "post_pull not in %s"),
            ("post_uncommit", "post_uncommit not in %s"),
            ("post_change_branch_tip", "post_change_branch_tip not in %s"),
        )
        for hook_name, failure_template in expected_hooks:
            self.assertTrue(hook_name in hooks, failure_template % hooks)

    def test_installed_hooks_are_BranchHooks(self):
        """The installed hooks object should be a BranchHooks."""
        # The installed hooks are saved in self._preserved_hooks; the
        # hooks object is the second item of the entry for Branch.
        preserved_entry = self._preserved_hooks[_mod_branch.Branch]
        installed_hooks = preserved_entry[1]
        self.assertIsInstance(installed_hooks, _mod_branch.BranchHooks)
480
class TestPullResult(tests.TestCase):
482
def test_pull_result_to_int(self):
483
# to support old code, the pull result can be used as an int
484
r = _mod_branch.PullResult()
487
# this usage of results is not recommended for new code (because it
488
# doesn't describe very well what happened), but for api stability
489
# it's still supported
490
a = "%d revisions pulled" % r
491
self.assertEqual(a, "10 revisions pulled")
493
def test_report_changed(self):
494
r = _mod_branch.PullResult()
495
r.old_revid = "old-revid"
497
r.new_revid = "new-revid"
501
self.assertEqual("Now on revision 20.\n", f.getvalue())
503
def test_report_unchanged(self):
504
r = _mod_branch.PullResult()
505
r.old_revid = "same-revid"
506
r.new_revid = "same-revid"
509
self.assertEqual("No revisions to pull.\n", f.getvalue())
512
class _StubLockable(object):
513
"""Helper for TestRunWithWriteLockedTarget."""
515
def __init__(self, calls, unlock_exc=None):
517
self.unlock_exc = unlock_exc
519
def lock_write(self):
520
self.calls.append('lock_write')
523
self.calls.append('unlock')
524
if self.unlock_exc is not None:
525
raise self.unlock_exc
528
class _ErrorFromCallable(Exception):
529
"""Helper for TestRunWithWriteLockedTarget."""
532
class _ErrorFromUnlock(Exception):
533
"""Helper for TestRunWithWriteLockedTarget."""
536
class TestRunWithWriteLockedTarget(tests.TestCase):
537
"""Tests for _run_with_write_locked_target."""
540
tests.TestCase.setUp(self)
543
def func_that_returns_ok(self):
544
self._calls.append('func called')
547
def func_that_raises(self):
    """Log 'func called' on self._calls, then fail.

    :raises _ErrorFromCallable: always.
    """
    call_log = self._calls
    call_log.append('func called')
    raise _ErrorFromCallable()
551
def test_success_unlocks(self):
    """Assert the callable's result is returned and the call order is
    lock_write, callable, unlock.
    """
    stub_target = _StubLockable(self._calls)
    returned = _mod_branch._run_with_write_locked_target(
        stub_target, self.func_that_returns_ok)
    self.assertEqual('ok', returned)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)
558
def test_exception_unlocks_and_propagates(self):
    """Assert the callable's error propagates and unlock is still called."""
    stub_target = _StubLockable(self._calls)
    run_locked = _mod_branch._run_with_write_locked_target
    self.assertRaises(_ErrorFromCallable,
                      run_locked, stub_target, self.func_that_raises)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)
565
def test_callable_succeeds_but_error_during_unlock(self):
    """Assert an unlock error propagates when the callable succeeded."""
    unlock_failure = _ErrorFromUnlock()
    stub_target = _StubLockable(self._calls, unlock_exc=unlock_failure)
    run_locked = _mod_branch._run_with_write_locked_target
    self.assertRaises(_ErrorFromUnlock,
                      run_locked, stub_target, self.func_that_returns_ok)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)
572
def test_error_during_unlock_does_not_mask_original_error(self):
    """Assert the callable's error wins when unlock also raises."""
    unlock_failure = _ErrorFromUnlock()
    stub_target = _StubLockable(self._calls, unlock_exc=unlock_failure)
    run_locked = _mod_branch._run_with_write_locked_target
    self.assertRaises(_ErrorFromCallable,
                      run_locked, stub_target, self.func_that_raises)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)