13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests for the Branch facility that are not interface tests.
19
For interface tests see tests/per_branch/*.py.
19
For interface tests see tests/branch_implementations/*.py.
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
65
64
def test_default_format_is_same_as_bzrdir_default(self):
    # Checks that BranchFormat's default equals the branch format reported
    # by the default BzrDirFormat.
    # XXX: it might be nice if there was only one place the default was
    # set, but at the moment that's not true -- mbp 20070814 --
    # https://bugs.launchpad.net/bzr/+bug/132376
    self.assertEqual(BranchFormat.get_default_format(),
        BzrDirFormat.get_default_format().get_branch_format())
161
160
def test_find_format(self):
162
161
# is the right format object found for a branch?
163
162
# create a branch with a few known format objects.
164
# this is not quite the same as
163
# this is not quite the same as
165
164
self.build_tree(["foo/", "bar/"])
166
165
def check_format(format, url):
167
166
dir = format._matchingbzrdir.initialize(url)
379
377
self.assertTrue(branch.repository.has_revision(revid))
382
class BzrBranch8(TestCaseWithTransport):
    """Reference-info caching tests run against BzrBranchFormat8 branches."""

    def make_branch(self, location, format=None):
        """Create a branch at location, defaulting to a format-8 branch.

        :param location: where to create the branch.
        :param format: bzrdir format to use; when None, a '1.9' bzrdir
            format whose branch format is BzrBranchFormat8 is built.
        :return: whatever TestCaseWithTransport.make_branch returns.
        """
        # Only fall back to the default when the caller gave no format, so
        # an explicitly supplied format is honoured rather than overwritten.
        if format is None:
            format = bzrdir.format_registry.make_bzrdir('1.9')
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
        return TestCaseWithTransport.make_branch(self, location, format=format)

    def create_branch_with_reference(self):
        """Return a new branch at 'branch' holding one reference entry."""
        branch = self.make_branch('branch')
        branch._set_all_reference_info({'file-id': ('path', 'location')})
        # The tests below use the result, so the branch must be returned.
        return branch

    @staticmethod
    def instrument_branch(branch, gets):
        """Wrap branch._transport.get so every call is recorded in gets.

        :param branch: object whose _transport.get is replaced in place.
        :param gets: list receiving an (args, kwargs) tuple per call.
        """
        old_get = branch._transport.get
        def get(*args, **kwargs):
            # Record the call, then delegate to the original get.
            gets.append((args, kwargs))
            return old_get(*args, **kwargs)
        branch._transport.get = get

    def test_reference_info_caching_read_locked(self):
        # While read-locked, two lookups are expected to cost one get.
        gets = []
        branch = self.create_branch_with_reference()
        # Take the lock that the cleanup below releases.
        branch.lock_read()
        self.addCleanup(branch.unlock)
        self.instrument_branch(branch, gets)
        branch.get_reference_info('file-id')
        branch.get_reference_info('file-id')
        self.assertEqual(1, len(gets))

    def test_reference_info_caching_read_unlocked(self):
        # Without a lock held, each lookup is expected to cost its own get.
        gets = []
        branch = self.create_branch_with_reference()
        self.instrument_branch(branch, gets)
        branch.get_reference_info('file-id')
        branch.get_reference_info('file-id')
        self.assertEqual(2, len(gets))

    def test_reference_info_caching_write_locked(self):
        # While write-locked, info just set is expected to be read back
        # without any get on the transport.
        gets = []
        branch = self.make_branch('branch')
        # Take the lock that the cleanup below releases.
        branch.lock_write()
        self.instrument_branch(branch, gets)
        self.addCleanup(branch.unlock)
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
        path, location = branch.get_reference_info('file-id')
        self.assertEqual(0, len(gets))
        self.assertEqual('path2', path)
        self.assertEqual('location2', location)

    def test_reference_info_caches_cleared(self):
        # A change made through a second Branch object opened on the same
        # location must be visible through the first one.
        # NOTE(review): the captured source skips line numbers around the
        # first set_reference_info call; explicit lock/unlock statements may
        # have been dropped by the extraction -- confirm against upstream.
        branch = self.make_branch('branch')
        branch.set_reference_info('file-id', 'path2', 'location2')
        doppelganger = Branch.open('branch')
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
        self.assertEqual(('path3', 'location3'),
                         branch.get_reference_info('file-id'))
443
380
class TestBranchReference(TestCaseWithTransport):
444
381
"""Tests for the branch reference facility."""
502
438
# it's still supported
503
439
a = "%d revisions pulled" % r
504
440
self.assertEqual(a, "10 revisions pulled")
508
class _StubLockable(object):
    """Helper for TestRunWithWriteLockedTarget.

    Records 'lock_write' and 'unlock' calls in a caller-supplied list and
    can be told to raise a given exception from unlock.
    """

    def __init__(self, calls, unlock_exc=None):
        """Remember the shared call log and the optional unlock error.

        :param calls: list that lock_write/unlock append their names to.
        :param unlock_exc: exception instance to raise from unlock, or None.
        """
        # Keep the caller's list itself (not a copy) so the test can inspect
        # the order of calls afterwards.
        self.calls = calls
        self.unlock_exc = unlock_exc

    def lock_write(self):
        """Log that the write lock was requested."""
        self.calls.append('lock_write')

    def unlock(self):
        """Log the unlock, then raise unlock_exc if one was configured."""
        # Log first so the call is recorded even when unlock fails.
        self.calls.append('unlock')
        if self.unlock_exc is not None:
            raise self.unlock_exc
524
class _ErrorFromCallable(Exception):
    """Helper for TestRunWithWriteLockedTarget.

    Raised by the test callable so tests can tell its failure apart from
    an unlock failure.
    """
528
class _ErrorFromUnlock(Exception):
    """Helper for TestRunWithWriteLockedTarget.

    Passed to the stub lockable as the exception its unlock should raise.
    """
532
class TestRunWithWriteLockedTarget(TestCase):
    """Tests for _run_with_write_locked_target."""

    def setUp(self):
        TestCase.setUp(self)
        # Shared, ordered log of what the stub lockable and the callables
        # did; every test asserts against it.
        self._calls = []

    def func_that_returns_ok(self):
        """Callable under test: log the call and return 'ok'."""
        self._calls.append('func called')
        return 'ok'

    def func_that_raises(self):
        """Callable under test: log the call and raise _ErrorFromCallable."""
        self._calls.append('func called')
        raise _ErrorFromCallable()

    def test_success_unlocks(self):
        # The callable's result is passed through and the target unlocked.
        lockable = _StubLockable(self._calls)
        result = _run_with_write_locked_target(
            lockable, self.func_that_returns_ok)
        self.assertEqual('ok', result)
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)

    def test_exception_unlocks_and_propagates(self):
        # An error from the callable propagates, and unlock still happens.
        lockable = _StubLockable(self._calls)
        self.assertRaises(_ErrorFromCallable,
            _run_with_write_locked_target, lockable, self.func_that_raises)
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)

    def test_callable_succeeds_but_error_during_unlock(self):
        # With a successful callable, an unlock failure is what the caller
        # sees.
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
        self.assertRaises(_ErrorFromUnlock,
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)

    def test_error_during_unlock_does_not_mask_original_error(self):
        # When both the callable and unlock fail, the callable's error wins.
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
        self.assertRaises(_ErrorFromCallable,
            _run_with_write_locked_target, lockable, self.func_that_raises)
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)