25
25
from StringIO import StringIO
28
branch as _mod_branch,
35
from bzrlib.branch import (
39
BranchReferenceFormat,
44
_run_with_write_locked_target,
28
from bzrlib.branch import (BzrBranch5,
30
import bzrlib.bzrdir as bzrdir
46
31
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
47
32
BzrDir, BzrDirFormat)
48
33
from bzrlib.errors import (NotBranchError,
49
34
UnknownFormatError,
51
35
UnsupportedFormatError,
54
38
from bzrlib.tests import TestCase, TestCaseWithTransport
55
39
from bzrlib.transport import get_transport
58
41
class TestDefaultFormat(TestCase):
60
def test_default_format(self):
61
# update this if you change the default branch format
62
self.assertIsInstance(BranchFormat.get_default_format(),
65
def test_default_format_is_same_as_bzrdir_default(self):
66
# XXX: it might be nice if there was only one place the default was
67
# set, but at the moment that's not true -- mbp 20070814 --
68
# https://bugs.launchpad.net/bzr/+bug/132376
69
self.assertEqual(BranchFormat.get_default_format(),
70
BzrDirFormat.get_default_format().get_branch_format())
72
43
def test_get_set_default_format(self):
73
# set the format and then set it back again
74
old_format = BranchFormat.get_default_format()
75
BranchFormat.set_default_format(SampleBranchFormat())
44
old_format = bzrlib.branch.BranchFormat.get_default_format()
46
self.assertTrue(isinstance(old_format, bzrlib.branch.BzrBranchFormat5))
47
bzrlib.branch.BranchFormat.set_default_format(SampleBranchFormat())
77
49
# the default branch format is used by the meta dir format
78
50
# which is not the default bzrdir format at this point
79
dir = BzrDirMetaFormat1().initialize('memory:///')
51
dir = BzrDirMetaFormat1().initialize('memory:/')
80
52
result = dir.create_branch()
81
53
self.assertEqual(result, 'A branch')
83
BranchFormat.set_default_format(old_format)
84
self.assertEqual(old_format, BranchFormat.get_default_format())
55
bzrlib.branch.BranchFormat.set_default_format(old_format)
56
self.assertEqual(old_format, bzrlib.branch.BranchFormat.get_default_format())
87
59
class TestBranchFormat5(TestCaseWithTransport):
107
def test_set_push_location(self):
108
from bzrlib.config import (locations_config_filename,
109
ensure_config_dir_exists)
110
ensure_config_dir_exists()
111
fn = locations_config_filename()
112
# write correct newlines to locations.conf
113
# by default ConfigObj uses native line-endings for new files
114
# but uses already existing line-endings if file is not empty
117
f.write('# comment\n')
121
branch = self.make_branch('.', format='knit')
122
branch.set_push_location('foo')
123
local_path = urlutils.local_path_from_url(branch.base[:-1])
124
self.assertFileEqual("# comment\n"
126
"push_location = foo\n"
127
"push_location:policy = norecurse\n" % local_path,
130
# TODO RBC 20051029 test getting a push location from a branch in a
131
# recursive section - that is, it appends the branch name.
134
class SampleBranchFormat(BranchFormat):
80
class TestBranchEscaping(TestCaseWithTransport):
81
"""Test a branch can be correctly stored and used on a vfat-like transport
83
Makes sure we have proper escaping of invalid characters, etc.
85
It'd be better to test all operations on the FakeVFATTransportDecorator,
86
but working trees go straight to the os not through the Transport layer.
87
Therefore we build some history first in the regular way and then
88
check it's safe to access for vfat.
95
super(TestBranchEscaping, self).setUp()
96
from bzrlib.repository import RepositoryFormatKnit1
97
bzrdir = BzrDirMetaFormat1().initialize(self.get_url())
98
repo = RepositoryFormatKnit1().initialize(bzrdir)
99
branch = bzrdir.create_branch()
100
wt = bzrdir.create_workingtree()
101
self.build_tree_contents([("foo", "contents of foo")])
102
# add file with id containing wierd characters
103
wt.add(['foo'], [self.FOO_ID])
104
wt.commit('this is my new commit', rev_id=self.REV_ID)
106
def test_branch_on_vfat(self):
107
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
108
# now access over vfat; should be safe
109
transport = FakeVFATTransportDecorator('vfat+' + self.get_url())
110
bzrdir, junk = BzrDir.open_containing_from_transport(transport)
111
branch = bzrdir.open_branch()
112
revtree = branch.repository.revision_tree(self.REV_ID)
113
contents = revtree.get_file_text(self.FOO_ID)
114
self.assertEqual(contents, 'contents of foo')
117
class SampleBranchFormat(bzrlib.branch.BranchFormat):
135
118
"""A sample format
137
120
this format is initializable, unsupported to aid in testing the
192
175
format.initialize(dir)
193
176
# register a format for it.
194
BranchFormat.register_format(format)
177
bzrlib.branch.BranchFormat.register_format(format)
195
178
# which branch.Open will refuse (not supported)
196
self.assertRaises(UnsupportedFormatError, Branch.open, self.get_url())
197
self.make_branch_and_tree('foo')
179
self.assertRaises(UnsupportedFormatError, bzrlib.branch.Branch.open, self.get_url())
198
180
# but open_downlevel will work
199
181
self.assertEqual(format.open(dir), bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
200
182
# unregister the format
201
BranchFormat.unregister_format(format)
202
self.make_branch_and_tree('bar')
205
class TestBranch67(object):
206
"""Common tests for both branch 6 and 7 which are mostly the same."""
208
def get_format_name(self):
209
raise NotImplementedError(self.get_format_name)
211
def get_format_name_subtree(self):
212
raise NotImplementedError(self.get_format_name)
215
raise NotImplementedError(self.get_class)
217
def test_creation(self):
218
format = BzrDirMetaFormat1()
219
format.set_branch_format(_mod_branch.BzrBranchFormat6())
220
branch = self.make_branch('a', format=format)
221
self.assertIsInstance(branch, self.get_class())
222
branch = self.make_branch('b', format=self.get_format_name())
223
self.assertIsInstance(branch, self.get_class())
224
branch = _mod_branch.Branch.open('a')
225
self.assertIsInstance(branch, self.get_class())
227
def test_layout(self):
228
branch = self.make_branch('a', format=self.get_format_name())
229
self.failUnlessExists('a/.bzr/branch/last-revision')
230
self.failIfExists('a/.bzr/branch/revision-history')
232
def test_config(self):
233
"""Ensure that all configuration data is stored in the branch"""
234
branch = self.make_branch('a', format=self.get_format_name())
235
branch.set_parent('http://bazaar-vcs.org')
236
self.failIfExists('a/.bzr/branch/parent')
237
self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
238
branch.set_push_location('sftp://bazaar-vcs.org')
239
config = branch.get_config()._get_branch_data_config()
240
self.assertEqual('sftp://bazaar-vcs.org',
241
config.get_user_option('push_location'))
242
branch.set_bound_location('ftp://bazaar-vcs.org')
243
self.failIfExists('a/.bzr/branch/bound')
244
self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
246
def test_set_revision_history(self):
247
builder = self.make_branch_builder('.', format=self.get_format_name())
248
builder.build_snapshot('foo', None,
249
[('add', ('', None, 'directory', None))],
251
builder.build_snapshot('bar', None, [], message='bar')
252
branch = builder.get_branch()
254
self.addCleanup(branch.unlock)
255
branch.set_revision_history(['foo', 'bar'])
256
branch.set_revision_history(['foo'])
257
self.assertRaises(errors.NotLefthandHistory,
258
branch.set_revision_history, ['bar'])
260
def do_checkout_test(self, lightweight=False):
261
tree = self.make_branch_and_tree('source',
262
format=self.get_format_name_subtree())
263
subtree = self.make_branch_and_tree('source/subtree',
264
format=self.get_format_name_subtree())
265
subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
266
format=self.get_format_name_subtree())
267
self.build_tree(['source/subtree/file',
268
'source/subtree/subsubtree/file'])
269
subsubtree.add('file')
271
subtree.add_reference(subsubtree)
272
tree.add_reference(subtree)
273
tree.commit('a revision')
274
subtree.commit('a subtree file')
275
subsubtree.commit('a subsubtree file')
276
tree.branch.create_checkout('target', lightweight=lightweight)
277
self.failUnlessExists('target')
278
self.failUnlessExists('target/subtree')
279
self.failUnlessExists('target/subtree/file')
280
self.failUnlessExists('target/subtree/subsubtree/file')
281
subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
283
self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
285
self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
287
def test_checkout_with_references(self):
288
self.do_checkout_test()
290
def test_light_checkout_with_references(self):
291
self.do_checkout_test(lightweight=True)
293
def test_set_push(self):
294
branch = self.make_branch('source', format=self.get_format_name())
295
branch.get_config().set_user_option('push_location', 'old',
296
store=config.STORE_LOCATION)
299
warnings.append(args[0] % args[1:])
300
_warning = trace.warning
301
trace.warning = warning
303
branch.set_push_location('new')
305
trace.warning = _warning
306
self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
310
class TestBranch6(TestBranch67, TestCaseWithTransport):
313
return _mod_branch.BzrBranch6
315
def get_format_name(self):
316
return "dirstate-tags"
318
def get_format_name_subtree(self):
319
return "dirstate-with-subtree"
321
def test_set_stacked_on_url_errors(self):
322
branch = self.make_branch('a', format=self.get_format_name())
323
self.assertRaises(errors.UnstackableBranchFormat,
324
branch.set_stacked_on_url, None)
326
def test_default_stacked_location(self):
327
branch = self.make_branch('a', format=self.get_format_name())
328
self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
331
class TestBranch7(TestBranch67, TestCaseWithTransport):
334
return _mod_branch.BzrBranch7
336
def get_format_name(self):
339
def get_format_name_subtree(self):
340
return "development-subtree"
342
def test_set_stacked_on_url_unstackable_repo(self):
343
repo = self.make_repository('a', format='dirstate-tags')
344
control = repo.bzrdir
345
branch = _mod_branch.BzrBranchFormat7().initialize(control)
346
target = self.make_branch('b')
347
self.assertRaises(errors.UnstackableRepositoryFormat,
348
branch.set_stacked_on_url, target.base)
350
def test_clone_stacked_on_unstackable_repo(self):
351
repo = self.make_repository('a', format='dirstate-tags')
352
control = repo.bzrdir
353
branch = _mod_branch.BzrBranchFormat7().initialize(control)
354
# Calling clone should not raise UnstackableRepositoryFormat.
355
cloned_bzrdir = control.clone('cloned')
357
def _test_default_stacked_location(self):
358
branch = self.make_branch('a', format=self.get_format_name())
359
self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
361
def test_stack_and_unstack(self):
362
branch = self.make_branch('a', format=self.get_format_name())
363
target = self.make_branch_and_tree('b', format=self.get_format_name())
364
branch.set_stacked_on_url(target.branch.base)
365
self.assertEqual(target.branch.base, branch.get_stacked_on_url())
366
revid = target.commit('foo')
367
self.assertTrue(branch.repository.has_revision(revid))
368
branch.set_stacked_on_url(None)
369
self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
370
self.assertFalse(branch.repository.has_revision(revid))
372
def test_open_opens_stacked_reference(self):
373
branch = self.make_branch('a', format=self.get_format_name())
374
target = self.make_branch_and_tree('b', format=self.get_format_name())
375
branch.set_stacked_on_url(target.branch.base)
376
branch = branch.bzrdir.open_branch()
377
revid = target.commit('foo')
378
self.assertTrue(branch.repository.has_revision(revid))
183
bzrlib.branch.BranchFormat.unregister_format(format)
381
186
class TestBranchReference(TestCaseWithTransport):
390
195
target_branch = dir.create_branch()
391
196
t.mkdir('branch')
392
197
branch_dir = bzrdirformat.initialize(self.get_url('branch'))
393
made_branch = BranchReferenceFormat().initialize(branch_dir, target_branch)
198
made_branch = bzrlib.branch.BranchReferenceFormat().initialize(branch_dir, target_branch)
394
199
self.assertEqual(made_branch.base, target_branch.base)
395
200
opened_branch = branch_dir.open_branch()
396
201
self.assertEqual(opened_branch.base, target_branch.base)
398
def test_get_reference(self):
399
"""For a BranchReference, get_reference should reutrn the location."""
400
branch = self.make_branch('target')
401
checkout = branch.create_checkout('checkout', lightweight=True)
402
reference_url = branch.bzrdir.root_transport.abspath('') + '/'
403
# if the api for create_checkout changes to return different checkout types
404
# then this file read will fail.
405
self.assertFileEqual(reference_url, 'checkout/.bzr/branch/location')
406
self.assertEqual(reference_url,
407
_mod_branch.BranchReferenceFormat().get_reference(checkout.bzrdir))
410
class TestHooks(TestCase):
412
def test_constructor(self):
413
"""Check that creating a BranchHooks instance has the right defaults."""
414
hooks = BranchHooks()
415
self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
416
self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
417
self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
418
self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
419
self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
420
self.assertTrue("post_uncommit" in hooks, "post_uncommit not in %s" % hooks)
421
self.assertTrue("post_change_branch_tip" in hooks,
422
"post_change_branch_tip not in %s" % hooks)
424
def test_installed_hooks_are_BranchHooks(self):
425
"""The installed hooks object should be a BranchHooks."""
426
# the installed hooks are saved in self._preserved_hooks.
427
self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
430
class TestPullResult(TestCase):
432
def test_pull_result_to_int(self):
433
# to support old code, the pull result can be used as an int
437
# this usage of results is not recommended for new code (because it
438
# doesn't describe very well what happened), but for api stability
439
# it's still supported
440
a = "%d revisions pulled" % r
441
self.assertEqual(a, "10 revisions pulled")
445
class _StubLockable(object):
446
"""Helper for TestRunWithWriteLockedTarget."""
448
def __init__(self, calls, unlock_exc=None):
450
self.unlock_exc = unlock_exc
452
def lock_write(self):
453
self.calls.append('lock_write')
456
self.calls.append('unlock')
457
if self.unlock_exc is not None:
458
raise self.unlock_exc
461
class _ErrorFromCallable(Exception):
462
"""Helper for TestRunWithWriteLockedTarget."""
465
class _ErrorFromUnlock(Exception):
466
"""Helper for TestRunWithWriteLockedTarget."""
469
class TestRunWithWriteLockedTarget(TestCase):
470
"""Tests for _run_with_write_locked_target."""
475
def func_that_returns_ok(self):
476
self._calls.append('func called')
479
def func_that_raises(self):
480
self._calls.append('func called')
481
raise _ErrorFromCallable()
483
def test_success_unlocks(self):
484
lockable = _StubLockable(self._calls)
485
result = _run_with_write_locked_target(
486
lockable, self.func_that_returns_ok)
487
self.assertEqual('ok', result)
488
self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
490
def test_exception_unlocks_and_propagates(self):
491
lockable = _StubLockable(self._calls)
492
self.assertRaises(_ErrorFromCallable,
493
_run_with_write_locked_target, lockable, self.func_that_raises)
494
self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
496
def test_callable_succeeds_but_error_during_unlock(self):
497
lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
498
self.assertRaises(_ErrorFromUnlock,
499
_run_with_write_locked_target, lockable, self.func_that_returns_ok)
500
self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
502
def test_error_during_unlock_does_not_mask_original_error(self):
503
lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
504
self.assertRaises(_ErrorFromCallable,
505
_run_with_write_locked_target, lockable, self.func_that_raises)
506
self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)