42
42
bcf = b.control_files
43
43
rcf = getattr(b.repository, 'control_files', None)
46
"This tests depends on being able to instrument "
47
"repository.control_files, but %r doesn't have control_files."
50
# Look out for branch types that reuse their control files
51
self.combined_control = bcf is rcf
53
b.control_files = LockWrapper(self.locks, b.control_files, 'bc')
54
b.repository.control_files = \
55
LockWrapper(self.locks, b.repository.control_files, 'rc')
45
self.combined_branch = False
47
# Look out for branch types that reuse their control files
48
self.combined_control = bcf is rcf
50
b.control_files = LockWrapper(self.locks, b.control_files, 'bc')
51
except AttributeError:
52
# RemoteBranch seems to trigger this.
53
raise TestSkipped("Could not instrument branch control files.")
54
if self.combined_control:
55
# instrument the repository control files too to ensure it's worked
56
# with correctly. When they are not shared, we trust the repository
57
# API and only instrument the repository itself.
58
b.repository.control_files = \
59
LockWrapper(self.locks, b.repository.control_files, 'rc')
58
62
def test_01_lock_read(self):
70
74
self.assertFalse(b.is_locked())
71
75
self.assertFalse(b.repository.is_locked())
73
self.assertEqual([('b', 'lr', True),
77
if self.combined_control:
78
self.assertEqual([('b', 'lr', True),
88
self.assertEqual([('b', 'lr', True),
83
96
def test_02_lock_write(self):
84
97
# Test that locking occurs in the correct order
95
108
self.assertFalse(b.is_locked())
96
109
self.assertFalse(b.repository.is_locked())
98
self.assertEqual([('b', 'lw', True),
111
if self.combined_control:
112
self.assertEqual([('b', 'lw', True),
122
self.assertEqual([('b', 'lw', True),
108
130
def test_03_lock_fail_unlock_repo(self):
109
131
# Make sure branch.unlock() is called, even if there is a
127
149
# We unlock the branch control files, even if
128
150
# we fail to unlock the repository
129
self.assertEqual([('b', 'lw', True),
151
if self.combined_control:
152
self.assertEqual([('b', 'lw', True),
161
self.assertEqual([('b', 'lw', True),
139
170
# For cleanup purposes, make sure we are unlocked
160
191
# We unlock the repository even if
161
192
# we fail to unlock the control files
162
self.assertEqual([('b', 'lw', True),
193
if self.combined_control:
194
self.assertEqual([('b', 'lw', True),
204
self.assertEqual([('b', 'lw', True),
173
213
# For cleanup purposes, make sure we are unlocked
208
248
self.assertFalse(b.is_locked())
209
249
self.assertFalse(b.repository.is_locked())
211
self.assertEqual([('b', 'lr', True),
251
if self.combined_control:
252
self.assertEqual([('b', 'lr', True),
260
self.assertEqual([('b', 'lr', True),
219
266
def test_08_lock_write_fail_control(self):
220
267
# Test the repository is unlocked if we can't lock self
224
271
self.assertRaises(TestPreventLocking, b.lock_write)
225
272
self.assertFalse(b.is_locked())
226
273
self.assertFalse(b.repository.is_locked())
228
self.assertEqual([('b', 'lw', True),
274
if self.combined_control:
275
self.assertEqual([('b', 'lw', True),
283
self.assertEqual([('b', 'lw', True),
236
289
def test_lock_write_returns_None_refuses_token(self):
237
290
branch = self.make_branch('b')
398
451
except NotImplementedError:
399
452
# This branch doesn't support this API.
401
branch.repository.leave_lock_in_place()
402
repo_token = branch.repository.lock_write()
403
branch.repository.unlock()
455
branch.repository.leave_lock_in_place()
456
except NotImplementedError:
457
# This repo doesn't support leaving locks around,
458
# assume it is essentially lock-free.
461
repo_token = branch.repository.lock_write()
462
branch.repository.unlock()
406
465
# Reacquire the lock (with a different branch object) by using the
408
467
new_branch = branch.bzrdir.open_branch()
409
# We have to explicitly lock the repository first.
410
new_branch.repository.lock_write(token=repo_token)
468
if repo_token is not None:
469
# We have to explicitly lock the repository first.
470
new_branch.repository.lock_write(token=repo_token)
411
471
new_branch.lock_write(token=token)
412
# Now we don't need our own repository lock anymore (the branch is
413
# holding it for us).
414
new_branch.repository.unlock()
472
if repo_token is not None:
473
# Now we don't need our own repository lock anymore (the branch is
474
# holding it for us).
475
new_branch.repository.unlock()
415
476
# Call dont_leave_lock_in_place, so that the lock will be released by
416
477
# this instance, even though the lock wasn't originally acquired by it.
417
478
new_branch.dont_leave_lock_in_place()
418
new_branch.repository.dont_leave_lock_in_place()
479
if repo_token is not None:
480
new_branch.repository.dont_leave_lock_in_place()
419
481
new_branch.unlock()
420
482
# Now the branch (and repository) is unlocked. Test this by locking it
421
483
# without tokens.
437
499
branch = branch.bzrdir.open_branch()
438
500
branch.lock_write()
440
# Now the branch.repository is locked, so we can't lock it with a new
441
# repository without a token.
502
# The branch should have asked the repository to lock.
503
self.assertTrue(branch.repository.is_write_locked())
504
# Does the repository type actually lock?
505
if not branch.repository.get_physical_lock_status():
506
# The test was successfully applied, so it was applicable.
508
# Now the branch.repository is physically locked, so we can't lock
509
# it with a new repository instance.
442
510
new_repo = branch.bzrdir.open_repository()
443
511
self.assertRaises(errors.LockContention, new_repo.lock_write)
444
512
# We can call lock_write on the original repository object though,