1
# Copyright (C) 2006-2010 Canonical Ltd
1
# Copyright (C) 2006, 2007, 2008 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
142
142
lf1 = LockDir(t, 'test_lock')
144
144
lf1.attempt_lock()
145
self.addCleanup(lf1.unlock)
146
145
# lock is held, should get some info on it
147
146
info1 = lf1.peek()
148
147
self.assertEqual(set(info1.keys()),
162
161
lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
163
162
self.assertEqual(lf2.peek(), None)
164
163
lf1.attempt_lock()
165
self.addCleanup(lf1.unlock)
166
164
info2 = lf2.peek()
167
165
self.assertTrue(info2)
168
166
self.assertEqual(info2['nonce'], lf1.nonce)
197
195
self.assertEqual('%s %s\n'
199
197
'Will continue to try until %s, unless '
200
'you press Ctrl-C.\n'
201
'See "bzr help break-lock" for more.',
199
'If you\'re sure that it\'s not being '
200
'modified, use bzr break-lock %s',
202
201
self._logged_reports[0][0])
203
202
args = self._logged_reports[0][1]
204
203
self.assertEqual('Unable to obtain', args[0])
419
418
self.assertEqual('%s %s\n'
421
420
'Will continue to try until %s, unless '
422
'you press Ctrl-C.\n'
423
'See "bzr help break-lock" for more.',
422
'If you\'re sure that it\'s not being '
423
'modified, use bzr break-lock %s',
424
424
self._logged_reports[0][0])
425
425
args = self._logged_reports[0][1]
426
426
self.assertEqual('Unable to obtain', args[0])
433
433
self.assertEqual('%s %s\n'
435
435
'Will continue to try until %s, unless '
436
'you press Ctrl-C.\n'
437
'See "bzr help break-lock" for more.',
437
'If you\'re sure that it\'s not being '
438
'modified, use bzr break-lock %s',
438
439
self._logged_reports[1][0])
439
440
args = self._logged_reports[1][1]
440
441
self.assertEqual('Lock owner changed for', args[0])
468
468
lf1.attempt_lock()
469
469
t.move('test_lock', 'lock_gone_now')
470
470
self.assertRaises(LockBroken, lf1.confirm)
472
t.move('lock_gone_now', 'test_lock')
475
472
def test_43_break(self):
476
473
"""Break a lock whose caller has forgotten it"""
558
553
# do this without IO redirection to ensure it doesn't prompt.
559
554
self.assertRaises(AssertionError, ld1.break_lock)
560
555
orig_factory = bzrlib.ui.ui_factory
561
bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
556
# silent ui - no need for stdout
557
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
558
bzrlib.ui.ui_factory.stdin = StringIO("y\n")
564
561
self.assertRaises(LockBroken, ld1.unlock)
624
621
def test_lock_by_token(self):
625
622
ld1 = self.get_lock()
626
623
token = ld1.lock_write()
627
self.addCleanup(ld1.unlock)
628
624
self.assertNotEqual(None, token)
629
625
ld2 = self.get_lock()
630
626
t2 = ld2.lock_write(token)
631
self.addCleanup(ld2.unlock)
632
627
self.assertEqual(token, t2)
634
629
def test_lock_with_buggy_rename(self):
660
655
# when held, that's all we see
661
656
ld1.attempt_lock()
662
self.addCleanup(ld1.unlock)
663
657
check_dir(['held'])
664
658
# second guy should fail
665
659
self.assertRaises(errors.LockContention, ld2.attempt_lock)
667
661
check_dir(['held'])
669
def test_no_lockdir_info(self):
670
"""We can cope with empty info files."""
671
# This seems like a fairly common failure case - see
672
# <https://bugs.launchpad.net/bzr/+bug/185103> and all its dupes.
673
# Processes are often interrupted after opening the file
674
# before the actual contents are committed.
675
t = self.get_transport()
677
t.mkdir('test_lock/held')
678
t.put_bytes('test_lock/held/info', '')
679
lf = LockDir(t, 'test_lock')
681
formatted_info = lf._format_lock_info(info)
683
['lock %s' % t.abspath('test_lock'),
684
'held by <unknown> on host <unknown> [process #<unknown>]',
689
class TestLockDirHooks(TestCaseWithTransport):
692
super(TestLockDirHooks, self).setUp()
696
return LockDir(self.get_transport(), 'test_lock')
698
663
def record_hook(self, result):
699
664
self._calls.append(result)
666
def reset_hooks(self):
667
self._old_hooks = lock.Lock.hooks
668
self.addCleanup(self.restore_hooks)
669
lock.Lock.hooks = lock.LockHooks()
671
def restore_hooks(self):
672
lock.Lock.hooks = self._old_hooks
701
674
def test_LockDir_acquired_success(self):
702
675
# the LockDir.lock_acquired hook fires when a lock is acquired.
703
678
LockDir.hooks.install_named_hook('lock_acquired',
704
self.record_hook, 'record_hook')
679
self.record_hook, 'record_hook')
705
680
ld = self.get_lock()
707
682
self.assertEqual([], self._calls)
714
689
def test_LockDir_acquired_fail(self):
715
690
# the LockDir.lock_acquired hook does not fire on failure.
716
693
ld = self.get_lock()
718
695
ld2 = self.get_lock()
719
696
ld2.attempt_lock()
720
697
# install a lock hook now, when the disk lock is locked
721
698
LockDir.hooks.install_named_hook('lock_acquired',
722
self.record_hook, 'record_hook')
699
self.record_hook, 'record_hook')
723
700
self.assertRaises(errors.LockContention, ld.attempt_lock)
724
701
self.assertEqual([], self._calls)
728
705
def test_LockDir_released_success(self):
729
706
# the LockDir.lock_released hook fires when a lock is released.
730
709
LockDir.hooks.install_named_hook('lock_released',
731
self.record_hook, 'record_hook')
710
self.record_hook, 'record_hook')
732
711
ld = self.get_lock()
734
713
self.assertEqual([], self._calls)
741
720
def test_LockDir_released_fail(self):
742
721
# the LockDir.lock_released hook does not fire on failure.
743
724
ld = self.get_lock()
745
726
ld2 = self.get_lock()
746
727
ld.attempt_lock()
747
728
ld2.force_break(ld2.peek())
748
729
LockDir.hooks.install_named_hook('lock_released',
749
self.record_hook, 'record_hook')
730
self.record_hook, 'record_hook')
750
731
self.assertRaises(LockBroken, ld.unlock)
751
732
self.assertEqual([], self._calls)
753
def test_LockDir_broken_success(self):
754
# the LockDir.lock_broken hook fires when a lock is broken.
757
ld2 = self.get_lock()
758
result = ld.attempt_lock()
759
LockDir.hooks.install_named_hook('lock_broken',
760
self.record_hook, 'record_hook')
761
ld2.force_break(ld2.peek())
762
lock_path = ld.transport.abspath(ld.path)
763
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
765
def test_LockDir_broken_failure(self):
766
# the LockDir.lock_broken hook does not fire when a lock is already
770
ld2 = self.get_lock()
771
result = ld.attempt_lock()
772
holder_info = ld2.peek()
774
LockDir.hooks.install_named_hook('lock_broken',
775
self.record_hook, 'record_hook')
776
ld2.force_break(holder_info)
777
lock_path = ld.transport.abspath(ld.path)
778
self.assertEqual([], self._calls)