109
100
"""Fail to create lock on readonly transport"""
110
101
t = self.get_readonly_transport()
111
102
lf = LockDir(t, 'test_lock')
112
self.assertRaises(LockFailed, lf.create)
103
self.assertRaises(UnlockableTransport, lf.create)
114
105
def test_12_lock_readonly_transport(self):
115
106
"""Fail to lock on readonly transport"""
116
107
lf = LockDir(self.get_transport(), 'test_lock')
118
109
lf = LockDir(self.get_readonly_transport(), 'test_lock')
119
self.assertRaises(LockFailed, lf.attempt_lock)
110
self.assertRaises(UnlockableTransport, lf.attempt_lock)
121
112
def test_20_lock_contested(self):
122
113
"""Contention to get a lock"""
276
253
self.assertEndsWith(args[3], ' ago')
277
254
self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
256
def test_33_wait(self):
257
"""Succeed when waiting on a lock that gets released
259
The difference from test_32_lock_wait_succeed is that the second
260
caller does not actually acquire the lock, but just waits for it
261
to be released. This is done over a readonly transport.
263
t = self.get_transport()
264
lf1 = LockDir(t, 'test_lock')
268
def wait_and_unlock():
271
unlocker = Thread(target=wait_and_unlock)
274
lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
276
# wait but don't lock
277
lf2.wait(timeout=0.4, poll=0.1)
279
self.assertTrue(after - before <= 1.0)
279
283
def test_34_lock_write_waits(self):
280
284
"""LockDir.lock_write() will wait for the lock."""
281
# the test suite sets the default to 0 to make deadlocks fail fast.
282
# change it for this test, as we want to try a manual deadlock.
283
raise tests.TestSkipped('Timing-sensitive test')
284
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 300
285
285
t = self.get_transport()
286
286
lf1 = LockDir(t, 'test_lock')
414
411
# There should be 2 reports, because the lock changed
415
412
lock_base = lf2.transport.abspath(lf2.path)
416
413
self.assertEqual(2, len(self._logged_reports))
417
lock_url = lf2.transport.abspath(lf2.path)
418
415
self.assertEqual('%s %s\n'
420
'Will continue to try until %s, unless '
422
'If you\'re sure that it\'s not being '
423
'modified, use bzr break-lock %s',
417
'Will continue to try until %s\n',
424
418
self._logged_reports[0][0])
425
419
args = self._logged_reports[0][1]
426
420
self.assertEqual('Unable to obtain', args[0])
599
590
self.assertContainsRe(info_list[1],
600
591
r'^held by .* on host .* \[process #\d*\]$')
601
592
self.assertContainsRe(info_list[2], r'locked \d+ seconds? ago$')
603
def test_lock_without_email(self):
604
global_config = config.GlobalConfig()
605
# Intentionally has no email address
606
global_config.set_user_option('email', 'User Identity')
607
ld1 = self.get_lock()
612
def test_lock_permission(self):
613
if not osutils.supports_posix_readonly():
614
raise tests.TestSkipped('Cannot induce a permission failure')
615
ld1 = self.get_lock()
616
lock_path = ld1.transport.local_abspath('test_lock')
618
osutils.make_readonly(lock_path)
619
self.assertRaises(errors.LockFailed, ld1.attempt_lock)
621
def test_lock_by_token(self):
    """Re-locking with an existing token hands back that same token."""
    # The first holder takes the write lock and receives a token.
    ld1 = self.get_lock()
    token = ld1.lock_write()
    self.assertNotEqual(None, token)
    # A second lock object presenting that token gets the same token back
    # from lock_write().
    ld2 = self.get_lock()
    t2 = ld2.lock_write(token)
    self.assertEqual(token, t2)
629
def test_lock_with_buggy_rename(self):
630
# test that lock acquisition handles servers which pretend they
631
# renamed correctly but that actually fail
632
t = transport.get_transport('brokenrename+' + self.get_url())
633
ld1 = LockDir(t, 'test_lock')
636
ld2 = LockDir(t, 'test_lock')
637
# we should fail to lock
638
e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
639
# now the original caller should succeed in unlocking
641
# and there should be nothing left over
642
self.assertEquals([], t.list_dir('test_lock'))
644
def test_failed_lock_leaves_no_trash(self):
645
# if we fail to acquire the lock, we don't leave pending directories
646
# behind -- https://bugs.launchpad.net/bzr/+bug/109169
647
ld1 = self.get_lock()
648
ld2 = self.get_lock()
649
# should be nothing before we start
651
t = self.get_transport().clone('test_lock')
653
self.assertEquals(a, t.list_dir('.'))
655
# when held, that's all we see
658
# second guy should fail
659
self.assertRaises(errors.LockContention, ld2.attempt_lock)
663
def record_hook(self, result):
    """Hook callback: append ``result`` to ``self._calls``.

    The tests install this as a lock hook and afterwards compare
    ``self._calls`` against the results they expect.
    """
    self._calls.append(result)
666
def reset_hooks(self):
    """Swap ``lock.Lock.hooks`` for a fresh ``lock.LockHooks()`` instance.

    The previous hooks object is kept in ``self._old_hooks`` and
    ``restore_hooks`` is registered as a cleanup so that it is put back.
    """
    self._old_hooks = lock.Lock.hooks
    # Register the restore before replacing the hooks, so the saved value
    # is reinstated even if something fails right after the swap.
    self.addCleanup(self.restore_hooks)
    lock.Lock.hooks = lock.LockHooks()
671
def restore_hooks(self):
    """Put back the hooks object saved in ``self._old_hooks``.

    Assigns ``self._old_hooks`` to ``lock.Lock.hooks``; ``reset_hooks``
    registers this method as a cleanup.
    """
    lock.Lock.hooks = self._old_hooks
674
def test_LockDir_acquired_success(self):
675
# the LockDir.lock_acquired hook fires when a lock is acquired.
678
LockDir.hooks.install_named_hook('lock_acquired',
679
self.record_hook, 'record_hook')
682
self.assertEqual([], self._calls)
683
result = ld.attempt_lock()
684
lock_path = ld.transport.abspath(ld.path)
685
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
687
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
689
def test_LockDir_acquired_fail(self):
690
# the LockDir.lock_acquired hook does not fire on failure.
695
ld2 = self.get_lock()
697
# install a lock hook now, when the disk lock is locked
698
LockDir.hooks.install_named_hook('lock_acquired',
699
self.record_hook, 'record_hook')
700
self.assertRaises(errors.LockContention, ld.attempt_lock)
701
self.assertEqual([], self._calls)
703
self.assertEqual([], self._calls)
705
def test_LockDir_released_success(self):
706
# the LockDir.lock_released hook fires when a lock is acquired.
709
LockDir.hooks.install_named_hook('lock_released',
710
self.record_hook, 'record_hook')
713
self.assertEqual([], self._calls)
714
result = ld.attempt_lock()
715
self.assertEqual([], self._calls)
717
lock_path = ld.transport.abspath(ld.path)
718
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
720
def test_LockDir_released_fail(self):
721
# the LockDir.lock_released hook does not fire on failure.
726
ld2 = self.get_lock()
728
ld2.force_break(ld2.peek())
729
LockDir.hooks.install_named_hook('lock_released',
730
self.record_hook, 'record_hook')
731
self.assertRaises(LockBroken, ld.unlock)
732
self.assertEqual([], self._calls)