13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests for the Branch facility that are not interface tests.
19
For interface tests see tests/per_branch/*.py.
19
For interface tests see tests/branch_implementations/*.py.
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
25
from cStringIO import StringIO
28
branch as _mod_branch,
39
class TestDefaultFormat(tests.TestCase):
41
def test_default_format(self):
    # Pins the class of the default branch format; update the expected
    # class below whenever the default branch format is changed.
    default_format = _mod_branch.BranchFormat.get_default_format()
    self.assertIsInstance(default_format, _mod_branch.BzrBranchFormat7)
46
def test_default_format_is_same_as_bzrdir_default(self):
47
# XXX: it might be nice if there was only one place the default was
48
# set, but at the moment that's not true -- mbp 20070814 --
49
# https://bugs.launchpad.net/bzr/+bug/132376
51
_mod_branch.BranchFormat.get_default_format(),
52
bzrdir.BzrDirFormat.get_default_format().get_branch_format())
25
from StringIO import StringIO
28
from bzrlib.branch import (BzrBranch5,
30
import bzrlib.bzrdir as bzrdir
31
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
33
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
41
class TestDefaultFormat(TestCase):
54
43
def test_get_set_default_format(self):
55
# set the format and then set it back again
56
old_format = _mod_branch.BranchFormat.get_default_format()
57
_mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
44
old_format = bzrlib.branch.BranchFormat.get_default_format()
46
self.assertTrue(isinstance(old_format, bzrlib.branch.BzrBranchFormat5))
47
bzrlib.branch.BranchFormat.set_default_format(SampleBranchFormat())
59
49
# the default branch format is used by the meta dir format
60
50
# which is not the default bzrdir format at this point
61
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
51
dir = BzrDirMetaFormat1().initialize('memory:///')
62
52
result = dir.create_branch()
63
53
self.assertEqual(result, 'A branch')
65
_mod_branch.BranchFormat.set_default_format(old_format)
66
self.assertEqual(old_format,
67
_mod_branch.BranchFormat.get_default_format())
70
class TestBranchFormat5(tests.TestCaseWithTransport):
55
bzrlib.branch.BranchFormat.set_default_format(old_format)
56
self.assertEqual(old_format, bzrlib.branch.BranchFormat.get_default_format())
59
class TestBranchFormat5(TestCaseWithTransport):
71
60
"""Tests specific to branch format 5"""
73
62
def test_branch_format_5_uses_lockdir(self):
74
63
url = self.get_url()
75
bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
bdir.create_repository()
77
branch = bdir.create_branch()
64
bzrdir = BzrDirMetaFormat1().initialize(url)
65
bzrdir.create_repository()
66
branch = bzrdir.create_branch()
78
67
t = self.get_transport()
79
68
self.log("branch instance is %r" % branch)
80
self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
69
self.assert_(isinstance(branch, BzrBranch5))
81
70
self.assertIsDirectory('.', t)
82
71
self.assertIsDirectory('.bzr/branch', t)
83
72
self.assertIsDirectory('.bzr/branch/lock', t)
84
73
branch.lock_write()
85
self.addCleanup(branch.unlock)
86
self.assertIsDirectory('.bzr/branch/lock/held', t)
88
def test_set_push_location(self):
89
conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
91
branch = self.make_branch('.', format='knit')
92
branch.set_push_location('foo')
93
local_path = urlutils.local_path_from_url(branch.base[:-1])
94
self.assertFileEqual("# comment\n"
96
"push_location = foo\n"
97
"push_location:policy = norecurse\n" % local_path,
98
config.locations_config_filename())
100
# TODO RBC 20051029 test getting a push location from a branch in a
101
# recursive section - that is, it appends the branch name.
104
class SampleBranchFormat(_mod_branch.BranchFormat):
75
self.assertIsDirectory('.bzr/branch/lock/held', t)
80
class TestBranchEscaping(TestCaseWithTransport):
81
"""Test a branch can be correctly stored and used on a vfat-like transport
83
Makes sure we have proper escaping of invalid characters, etc.
85
It'd be better to test all operations on the FakeVFATTransportDecorator,
86
but working trees go straight to the os not through the Transport layer.
87
Therefore we build some history first in the regular way and then
88
check it's safe to access for vfat.
95
super(TestBranchEscaping, self).setUp()
96
from bzrlib.repository import RepositoryFormatKnit1
97
bzrdir = BzrDirMetaFormat1().initialize(self.get_url())
98
repo = RepositoryFormatKnit1().initialize(bzrdir)
99
branch = bzrdir.create_branch()
100
wt = bzrdir.create_workingtree()
101
self.build_tree_contents([("foo", "contents of foo")])
102
# add file with id containing weird characters
103
wt.add(['foo'], [self.FOO_ID])
104
wt.commit('this is my new commit', rev_id=self.REV_ID)
106
def test_branch_on_vfat(self):
107
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
108
# now access over vfat; should be safe
109
transport = FakeVFATTransportDecorator('vfat+' + self.get_url())
110
bzrdir, junk = BzrDir.open_containing_from_transport(transport)
111
branch = bzrdir.open_branch()
112
revtree = branch.repository.revision_tree(self.REV_ID)
113
contents = revtree.get_file_text(self.FOO_ID)
114
self.assertEqual(contents, 'contents of foo')
117
class SampleBranchFormat(bzrlib.branch.BranchFormat):
105
118
"""A sample format
107
this format is initializable, unsupported to aid in testing the
120
this format is initializable, unsupported to aid in testing the
108
121
open and open_downlevel routines.
194
175
format.initialize(dir)
195
176
# register a format for it.
196
_mod_branch.BranchFormat.register_format(format)
177
bzrlib.branch.BranchFormat.register_format(format)
197
178
# which branch.Open will refuse (not supported)
198
self.assertRaises(errors.UnsupportedFormatError,
199
_mod_branch.Branch.open, self.get_url())
200
self.make_branch_and_tree('foo')
179
self.assertRaises(UnsupportedFormatError, bzrlib.branch.Branch.open, self.get_url())
201
180
# but open_downlevel will work
204
bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
181
self.assertEqual(format.open(dir), bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
205
182
# unregister the format
206
_mod_branch.BranchFormat.unregister_format(format)
207
self.make_branch_and_tree('bar')
210
#Used by TestMetaDirBranchFormatFactory
211
FakeLazyFormat = None
214
class TestMetaDirBranchFormatFactory(tests.TestCase):
216
def test_get_format_string_does_not_load(self):
    """The factory reports the format string it was constructed with."""
    # Module and member names are both None here, so the only thing the
    # factory has been given is the format string itself.
    format_string = "yo"
    lazy_factory = _mod_branch.MetaDirBranchFormatFactory(
        format_string, None, None)
    self.assertEqual(format_string, lazy_factory.get_format_string())
221
def test_call_loads(self):
222
# __call__ is used by the network_format_registry interface to get a
224
global FakeLazyFormat
226
factory = _mod_branch.MetaDirBranchFormatFactory(None,
227
"bzrlib.tests.test_branch", "FakeLazyFormat")
228
self.assertRaises(AttributeError, factory)
230
def test_call_returns_call_of_referenced_object(self):
    # Rebind the module-level FakeLazyFormat to a callable, then check that
    # calling the factory hands back that callable's return value.
    global FakeLazyFormat
    FakeLazyFormat = lambda: 'called'
    lazy_factory = _mod_branch.MetaDirBranchFormatFactory(
        None, "bzrlib.tests.test_branch", "FakeLazyFormat")
    produced = lazy_factory()
    self.assertEqual('called', produced)
238
class TestBranch67(object):
239
"""Common tests for both branch 6 and 7 which are mostly the same."""
241
def get_format_name(self):
    """Return the format name under test; this base version always raises.

    :raises NotImplementedError: always, carrying this bound method as its
        argument.
    """
    unimplemented_hook = self.get_format_name
    raise NotImplementedError(unimplemented_hook)
244
def get_format_name_subtree(self):
    """Return the subtree-capable format name; this base version raises.

    :raises NotImplementedError: always.  The exception carries *this*
        method (not ``get_format_name``) so that the failure names the hook
        that was actually left unimplemented.
    """
    raise NotImplementedError(self.get_format_name_subtree)
248
raise NotImplementedError(self.get_class)
250
def test_creation(self):
    # A branch made from an explicit metadir format object, one made from
    # the format name, and one re-opened from disk must all be instances
    # of the class returned by get_class().
    meta_format = bzrdir.BzrDirMetaFormat1()
    meta_format.set_branch_format(_mod_branch.BzrBranchFormat6())
    made_from_object = self.make_branch('a', format=meta_format)
    self.assertIsInstance(made_from_object, self.get_class())
    made_from_name = self.make_branch('b', format=self.get_format_name())
    self.assertIsInstance(made_from_name, self.get_class())
    reopened = _mod_branch.Branch.open('a')
    self.assertIsInstance(reopened, self.get_class())
260
def test_layout(self):
    # Checks which files exist under the new branch's control directory:
    # 'last-revision' must be there, the other two must not.
    self.make_branch('a', format=self.get_format_name())
    control_dir = 'a/.bzr/branch/'
    self.failUnlessExists(control_dir + 'last-revision')
    for absent_name in ('revision-history', 'references'):
        self.failIfExists(control_dir + absent_name)
266
def test_config(self):
    """Ensure that all configuration data is stored in the branch"""
    parent_url = 'http://bazaar-vcs.org'
    push_url = 'sftp://bazaar-vcs.org'
    bound_url = 'ftp://bazaar-vcs.org'
    branch = self.make_branch('a', format=self.get_format_name())

    # Parent location: readable back, but no separate 'parent' file.
    branch.set_parent(parent_url)
    self.failIfExists('a/.bzr/branch/parent')
    self.assertEqual(parent_url, branch.get_parent())

    # Push location: must be visible through the branch data config.
    branch.set_push_location(push_url)
    branch_data_config = branch.get_config()._get_branch_data_config()
    self.assertEqual(
        push_url, branch_data_config.get_user_option('push_location'))

    # Bound location: readable back, but no separate 'bound' file.
    branch.set_bound_location(bound_url)
    self.failIfExists('a/.bzr/branch/bound')
    self.assertEqual(bound_url, branch.get_bound_location())
280
def test_set_revision_history(self):
281
builder = self.make_branch_builder('.', format=self.get_format_name())
282
builder.build_snapshot('foo', None,
283
[('add', ('', None, 'directory', None))],
285
builder.build_snapshot('bar', None, [], message='bar')
286
branch = builder.get_branch()
288
self.addCleanup(branch.unlock)
289
branch.set_revision_history(['foo', 'bar'])
290
branch.set_revision_history(['foo'])
291
self.assertRaises(errors.NotLefthandHistory,
292
branch.set_revision_history, ['bar'])
294
def do_checkout_test(self, lightweight=False):
295
tree = self.make_branch_and_tree('source',
296
format=self.get_format_name_subtree())
297
subtree = self.make_branch_and_tree('source/subtree',
298
format=self.get_format_name_subtree())
299
subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
300
format=self.get_format_name_subtree())
301
self.build_tree(['source/subtree/file',
302
'source/subtree/subsubtree/file'])
303
subsubtree.add('file')
305
subtree.add_reference(subsubtree)
306
tree.add_reference(subtree)
307
tree.commit('a revision')
308
subtree.commit('a subtree file')
309
subsubtree.commit('a subsubtree file')
310
tree.branch.create_checkout('target', lightweight=lightweight)
311
self.failUnlessExists('target')
312
self.failUnlessExists('target/subtree')
313
self.failUnlessExists('target/subtree/file')
314
self.failUnlessExists('target/subtree/subsubtree/file')
315
subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
317
self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
319
self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
321
def test_checkout_with_references(self):
    # Heavyweight variant of the shared checkout scenario; False is the
    # default that do_checkout_test declares for ``lightweight``.
    self.do_checkout_test(lightweight=False)
324
def test_light_checkout_with_references(self):
    # Lightweight variant of the shared checkout scenario.
    use_lightweight = True
    self.do_checkout_test(lightweight=use_lightweight)
327
def test_set_push(self):
328
branch = self.make_branch('source', format=self.get_format_name())
329
branch.get_config().set_user_option('push_location', 'old',
330
store=config.STORE_LOCATION)
333
warnings.append(args[0] % args[1:])
334
_warning = trace.warning
335
trace.warning = warning
337
branch.set_push_location('new')
339
trace.warning = _warning
340
self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
344
class TestBranch6(TestBranch67, tests.TestCaseWithTransport):
347
return _mod_branch.BzrBranch6
349
def get_format_name(self):
    """Return the format name used by this test class's branches."""
    format_name = "dirstate-tags"
    return format_name
352
def get_format_name_subtree(self):
    """Return the format name used when the tests need subtree support."""
    subtree_format_name = "dirstate-with-subtree"
    return subtree_format_name
355
def test_set_stacked_on_url_errors(self):
    # Setting a stacked-on URL (even None) on a branch of this format is
    # expected to raise UnstackableBranchFormat.
    unstackable = self.make_branch('a', format=self.get_format_name())
    set_url = unstackable.set_stacked_on_url
    self.assertRaises(errors.UnstackableBranchFormat, set_url, None)
360
def test_default_stacked_location(self):
    # Merely asking for the stacked-on URL is expected to raise
    # UnstackableBranchFormat for a branch of this format.
    unstackable = self.make_branch('a', format=self.get_format_name())
    get_url = unstackable.get_stacked_on_url
    self.assertRaises(errors.UnstackableBranchFormat, get_url)
365
class TestBranch7(TestBranch67, tests.TestCaseWithTransport):
368
return _mod_branch.BzrBranch7
370
def get_format_name(self):
373
def get_format_name_subtree(self):
    """Return the format name used when the tests need subtree support."""
    subtree_format_name = "development-subtree"
    return subtree_format_name
376
def test_set_stacked_on_url_unstackable_repo(self):
    # A format-7 branch initialized inside a 'dirstate-tags' repository is
    # expected to refuse stacking with UnstackableRepositoryFormat.
    control = self.make_repository('a', format='dirstate-tags').bzrdir
    branch = _mod_branch.BzrBranchFormat7().initialize(control)
    stack_target = self.make_branch('b')
    self.assertRaises(
        errors.UnstackableRepositoryFormat,
        branch.set_stacked_on_url,
        stack_target.base)
384
def test_clone_stacked_on_unstackable_repo(self):
    # Set up a format-7 branch inside a 'dirstate-tags' repository; only
    # the side effect of creating it matters, so the result is not kept.
    control = self.make_repository('a', format='dirstate-tags').bzrdir
    _mod_branch.BzrBranchFormat7().initialize(control)
    # Calling clone should not raise UnstackableRepositoryFormat.
    control.clone('cloned')
391
def _test_default_stacked_location(self):
    # NOTE(review): the leading underscore presumably keeps the test loader
    # from collecting this method -- confirm whether that is intentional.
    fresh_branch = self.make_branch('a', format=self.get_format_name())
    get_url = fresh_branch.get_stacked_on_url
    self.assertRaises(errors.NotStacked, get_url)
395
def test_stack_and_unstack(self):
    format_name = self.get_format_name()
    stacked = self.make_branch('a', format=format_name)
    fallback_tree = self.make_branch_and_tree(
        'b', format=self.get_format_name())

    # Stack 'a' on 'b': a revision committed to 'b' becomes visible
    # through the repository of 'a'.
    stacked.set_stacked_on_url(fallback_tree.branch.base)
    self.assertEqual(
        fallback_tree.branch.base, stacked.get_stacked_on_url())
    revid = fallback_tree.commit('foo')
    self.assertTrue(stacked.repository.has_revision(revid))

    # Unstack again: the URL query raises and the revision is gone.
    stacked.set_stacked_on_url(None)
    self.assertRaises(errors.NotStacked, stacked.get_stacked_on_url)
    self.assertFalse(stacked.repository.has_revision(revid))
406
def test_open_opens_stacked_reference(self):
    original = self.make_branch('a', format=self.get_format_name())
    fallback_tree = self.make_branch_and_tree(
        'b', format=self.get_format_name())
    original.set_stacked_on_url(fallback_tree.branch.base)
    # Work with a freshly opened branch object from here on, so the check
    # below exercises what open_branch() sets up.
    reopened = original.bzrdir.open_branch()
    revid = fallback_tree.commit('foo')
    self.assertTrue(reopened.repository.has_revision(revid))
415
class BzrBranch8(tests.TestCaseWithTransport):
417
def make_branch(self, location, format=None):
419
format = bzrdir.format_registry.make_bzrdir('1.9')
420
format.set_branch_format(_mod_branch.BzrBranchFormat8())
421
return tests.TestCaseWithTransport.make_branch(
422
self, location, format=format)
424
def create_branch_with_reference(self):
425
branch = self.make_branch('branch')
426
branch._set_all_reference_info({'file-id': ('path', 'location')})
430
def instrument_branch(branch, gets):
    """Record every ``branch._transport.get`` call in ``gets``.

    The transport's ``get`` attribute is replaced by a wrapper that appends
    the ``(args, kwargs)`` of each call to ``gets`` and then delegates to
    the original ``get``, returning its result.

    :param branch: object whose ``_transport.get`` is wrapped in place.
    :param gets: list that receives one ``(args, kwargs)`` tuple per call.
    """
    original_get = branch._transport.get

    def recording_get(*args, **kwargs):
        call_record = (args, kwargs)
        gets.append(call_record)
        return original_get(*args, **kwargs)

    branch._transport.get = recording_get
437
def test_reference_info_caching_read_locked(self):
439
branch = self.create_branch_with_reference()
441
self.addCleanup(branch.unlock)
442
self.instrument_branch(branch, gets)
443
branch.get_reference_info('file-id')
444
branch.get_reference_info('file-id')
445
self.assertEqual(1, len(gets))
447
def test_reference_info_caching_read_unlocked(self):
449
branch = self.create_branch_with_reference()
450
self.instrument_branch(branch, gets)
451
branch.get_reference_info('file-id')
452
branch.get_reference_info('file-id')
453
self.assertEqual(2, len(gets))
455
def test_reference_info_caching_write_locked(self):
457
branch = self.make_branch('branch')
459
self.instrument_branch(branch, gets)
460
self.addCleanup(branch.unlock)
461
branch._set_all_reference_info({'file-id': ('path2', 'location2')})
462
path, location = branch.get_reference_info('file-id')
463
self.assertEqual(0, len(gets))
464
self.assertEqual('path2', path)
465
self.assertEqual('location2', location)
467
def test_reference_info_caches_cleared(self):
468
branch = self.make_branch('branch')
470
branch.set_reference_info('file-id', 'path2', 'location2')
472
doppelganger = _mod_branch.Branch.open('branch')
473
doppelganger.set_reference_info('file-id', 'path3', 'location3')
474
self.assertEqual(('path3', 'location3'),
475
branch.get_reference_info('file-id'))
477
class TestBranchReference(tests.TestCaseWithTransport):
183
bzrlib.branch.BranchFormat.unregister_format(format)
186
class TestBranchReference(TestCaseWithTransport):
478
187
"""Tests for the branch reference facility."""
480
189
def test_create_open_reference(self):
481
190
bzrdirformat = bzrdir.BzrDirMetaFormat1()
482
t = transport.get_transport(self.get_url('.'))
191
t = get_transport(self.get_url('.'))
484
193
dir = bzrdirformat.initialize(self.get_url('repo'))
485
194
dir.create_repository()
486
195
target_branch = dir.create_branch()
487
196
t.mkdir('branch')
488
197
branch_dir = bzrdirformat.initialize(self.get_url('branch'))
489
made_branch = _mod_branch.BranchReferenceFormat().initialize(
490
branch_dir, target_branch=target_branch)
198
made_branch = bzrlib.branch.BranchReferenceFormat().initialize(branch_dir, target_branch)
491
199
self.assertEqual(made_branch.base, target_branch.base)
492
200
opened_branch = branch_dir.open_branch()
493
201
self.assertEqual(opened_branch.base, target_branch.base)
495
def test_get_reference(self):
    """For a BranchReference, get_reference should return the location."""
    target = self.make_branch('target')
    light_checkout = target.create_checkout('checkout', lightweight=True)
    expected_url = target.bzrdir.root_transport.abspath('') + '/'
    # If create_checkout ever starts returning a different kind of
    # checkout, reading the location file below will fail.
    self.assertFileEqual(expected_url, 'checkout/.bzr/branch/location')
    reference_format = _mod_branch.BranchReferenceFormat()
    self.assertEqual(
        expected_url, reference_format.get_reference(light_checkout.bzrdir))
507
class TestHooks(tests.TestCaseWithTransport):
509
def test_constructor(self):
    """Check that creating a BranchHooks instance has the right defaults."""
    hooks = _mod_branch.BranchHooks()
    # Checked in this order, so the first missing name is the one reported.
    expected_hook_names = (
        "set_rh",
        "post_push",
        "post_commit",
        "pre_commit",
        "post_pull",
        "post_uncommit",
        "post_change_branch_tip",
        "post_branch_init",
        "post_switch",
    )
    for hook_name in expected_hook_names:
        # '%' binds tighter than '+', so only the hooks object is
        # interpolated into the message.
        self.assertTrue(hook_name in hooks,
                        hook_name + " not in %s" % hooks)
526
def test_installed_hooks_are_BranchHooks(self):
    """The installed hooks object should be a BranchHooks."""
    # The installed hooks are saved in self._preserved_hooks; the entry for
    # Branch is indexed at position 1 to reach the hooks object.
    preserved_entry = self._preserved_hooks[_mod_branch.Branch]
    installed_hooks = preserved_entry[1]
    self.assertIsInstance(installed_hooks, _mod_branch.BranchHooks)
532
def test_post_branch_init_hook(self):
534
_mod_branch.Branch.hooks.install_named_hook('post_branch_init',
536
self.assertLength(0, calls)
537
branch = self.make_branch('a')
538
self.assertLength(1, calls)
540
self.assertIsInstance(params, _mod_branch.BranchInitHookParams)
541
self.assertTrue(hasattr(params, 'bzrdir'))
542
self.assertTrue(hasattr(params, 'branch'))
544
def test_post_branch_init_hook_repr(self):
546
_mod_branch.Branch.hooks.install_named_hook('post_branch_init',
547
lambda params: param_reprs.append(repr(params)), None)
548
branch = self.make_branch('a')
549
self.assertLength(1, param_reprs)
550
param_repr = param_reprs[0]
551
self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
553
def test_post_switch_hook(self):
554
from bzrlib import switch
556
_mod_branch.Branch.hooks.install_named_hook('post_switch',
558
tree = self.make_branch_and_tree('branch-1')
559
self.build_tree(['branch-1/file-1'])
562
to_branch = tree.bzrdir.sprout('branch-2').open_branch()
563
self.build_tree(['branch-1/file-2'])
565
tree.remove('file-1')
567
checkout = tree.branch.create_checkout('checkout')
568
self.assertLength(0, calls)
569
switch.switch(checkout.bzrdir, to_branch)
570
self.assertLength(1, calls)
572
self.assertIsInstance(params, _mod_branch.SwitchHookParams)
573
self.assertTrue(hasattr(params, 'to_branch'))
574
self.assertTrue(hasattr(params, 'revision_id'))
577
class TestBranchOptions(tests.TestCaseWithTransport):
580
super(TestBranchOptions, self).setUp()
581
self.branch = self.make_branch('.')
582
self.config = self.branch.get_config()
584
def check_append_revisions_only(self, expected_value, value=None):
    """Optionally store ``value``, then check how the branch interprets it.

    :param expected_value: what ``_get_append_revisions_only()`` on the
        branch must return.
    :param value: when not None, written to the ``append_revisions_only``
        user option before checking; None leaves the config untouched.
    """
    should_store = value is not None
    if should_store:
        self.config.set_user_option('append_revisions_only', value)
    interpreted = self.branch._get_append_revisions_only()
    self.assertEqual(expected_value, interpreted)
591
def test_valid_append_revisions_only(self):
    # With nothing configured the option is absent and is interpreted as
    # None.  assertEqual is used rather than the deprecated assertEquals
    # alias, matching the rest of this file.
    self.assertEqual(None,
                     self.config.get_user_option('append_revisions_only'))
    self.check_append_revisions_only(None)
    self.check_append_revisions_only(False, 'False')
    self.check_append_revisions_only(True, 'True')
    # The following values will cause compatibility problems on projects
    # using older bzr versions (<2.2) but are accepted
    self.check_append_revisions_only(False, 'false')
    self.check_append_revisions_only(True, 'true')
602
def test_invalid_append_revisions_only(self):
603
"""Ensure warning is noted on invalid settings"""
606
self.warnings.append(args[0] % args[1:])
607
self.overrideAttr(trace, 'warning', warning)
608
self.check_append_revisions_only(None, 'not-a-bool')
609
self.assertLength(1, self.warnings)
611
'Value "not-a-bool" is not a boolean for "append_revisions_only"',
615
class TestPullResult(tests.TestCase):
617
def test_pull_result_to_int(self):
618
# to support old code, the pull result can be used as an int
619
r = _mod_branch.PullResult()
622
# this usage of results is not recommended for new code (because it
623
# doesn't describe very well what happened), but for api stability
624
# it's still supported
625
a = "%d revisions pulled" % r
626
self.assertEqual(a, "10 revisions pulled")
628
def test_report_changed(self):
629
r = _mod_branch.PullResult()
630
r.old_revid = "old-revid"
632
r.new_revid = "new-revid"
636
self.assertEqual("Now on revision 20.\n", f.getvalue())
638
def test_report_unchanged(self):
639
r = _mod_branch.PullResult()
640
r.old_revid = "same-revid"
641
r.new_revid = "same-revid"
644
self.assertEqual("No revisions to pull.\n", f.getvalue())
647
class _StubLockable(object):
648
"""Helper for TestRunWithWriteLockedTarget."""
650
def __init__(self, calls, unlock_exc=None):
652
self.unlock_exc = unlock_exc
654
def lock_write(self):
655
self.calls.append('lock_write')
658
self.calls.append('unlock')
659
if self.unlock_exc is not None:
660
raise self.unlock_exc
663
class _ErrorFromCallable(Exception):
664
"""Helper for TestRunWithWriteLockedTarget."""
667
class _ErrorFromUnlock(Exception):
668
"""Helper for TestRunWithWriteLockedTarget."""
671
class TestRunWithWriteLockedTarget(tests.TestCase):
672
"""Tests for _run_with_write_locked_target."""
675
tests.TestCase.setUp(self)
678
def func_that_returns_ok(self):
679
self._calls.append('func called')
682
def func_that_raises(self):
    """Log the call in ``self._calls``, then raise ``_ErrorFromCallable``."""
    call_log = self._calls
    call_log.append('func called')
    raise _ErrorFromCallable()
686
def test_success_unlocks(self):
    # Success path: the callable's result is passed through and the target
    # is locked before, and unlocked after, the call.
    target = _StubLockable(self._calls)
    returned = _mod_branch._run_with_write_locked_target(
        target, self.func_that_returns_ok)
    self.assertEqual('ok', returned)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)
693
def test_exception_unlocks_and_propagates(self):
    # The callable's error must reach the caller, and the target must
    # still have been unlocked.
    target = _StubLockable(self._calls)
    self.assertRaises(
        _ErrorFromCallable,
        _mod_branch._run_with_write_locked_target,
        target,
        self.func_that_raises)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)
700
def test_callable_succeeds_but_error_during_unlock(self):
    # When only the unlock fails, that unlock error is what the caller
    # sees.
    unlock_failure = _ErrorFromUnlock()
    target = _StubLockable(self._calls, unlock_exc=unlock_failure)
    self.assertRaises(
        _ErrorFromUnlock,
        _mod_branch._run_with_write_locked_target,
        target,
        self.func_that_returns_ok)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)
707
def test_error_during_unlock_does_not_mask_original_error(self):
    # Both the callable and the unlock fail; the caller must see the
    # callable's error, not the unlock error.
    unlock_failure = _ErrorFromUnlock()
    target = _StubLockable(self._calls, unlock_exc=unlock_failure)
    self.assertRaises(
        _ErrorFromCallable,
        _mod_branch._run_with_write_locked_target,
        target,
        self.func_that_raises)
    expected_calls = ['lock_write', 'func called', 'unlock']
    self.assertEqual(expected_calls, self._calls)