13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Tests for LockDir"""
142
142
lf1 = LockDir(t, 'test_lock')
144
144
lf1.attempt_lock()
145
self.addCleanup(lf1.unlock)
145
146
# lock is held, should get some info on it
146
147
info1 = lf1.peek()
147
148
self.assertEqual(set(info1.keys()),
161
162
lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
162
163
self.assertEqual(lf2.peek(), None)
163
164
lf1.attempt_lock()
165
self.addCleanup(lf1.unlock)
164
166
info2 = lf2.peek()
165
167
self.assertTrue(info2)
166
168
self.assertEqual(info2['nonce'], lf1.nonce)
468
471
lf1.attempt_lock()
469
472
t.move('test_lock', 'lock_gone_now')
470
473
self.assertRaises(LockBroken, lf1.confirm)
475
t.move('lock_gone_now', 'test_lock')
472
478
def test_43_break(self):
473
479
"""Break a lock whose caller has forgotten it"""
553
561
# do this without IO redirection to ensure it doesn't prompt.
554
562
self.assertRaises(AssertionError, ld1.break_lock)
555
563
orig_factory = bzrlib.ui.ui_factory
556
# silent ui - no need for stdout
557
bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
558
bzrlib.ui.ui_factory.stdin = StringIO("y\n")
564
bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
561
567
self.assertRaises(LockBroken, ld1.unlock)
621
627
def test_lock_by_token(self):
622
628
ld1 = self.get_lock()
623
629
token = ld1.lock_write()
630
self.addCleanup(ld1.unlock)
624
631
self.assertNotEqual(None, token)
625
632
ld2 = self.get_lock()
626
633
t2 = ld2.lock_write(token)
634
self.addCleanup(ld2.unlock)
627
635
self.assertEqual(token, t2)
629
637
def test_lock_with_buggy_rename(self):
655
663
# when held, that's all we see
656
664
ld1.attempt_lock()
665
self.addCleanup(ld1.unlock)
657
666
check_dir(['held'])
658
667
# second guy should fail
659
668
self.assertRaises(errors.LockContention, ld2.attempt_lock)
661
670
check_dir(['held'])
673
class TestLockDirHooks(TestCaseWithTransport):
676
super(TestLockDirHooks, self).setUp()
680
return LockDir(self.get_transport(), 'test_lock')
663
682
def record_hook(self, result):
664
683
self._calls.append(result)
666
def reset_hooks(self):
667
self._old_hooks = lock.Lock.hooks
668
self.addCleanup(self.restore_hooks)
669
lock.Lock.hooks = lock.LockHooks()
671
def restore_hooks(self):
672
lock.Lock.hooks = self._old_hooks
674
685
def test_LockDir_acquired_success(self):
675
686
# the LockDir.lock_acquired hook fires when a lock is acquired.
678
687
LockDir.hooks.install_named_hook('lock_acquired',
679
self.record_hook, 'record_hook')
688
self.record_hook, 'record_hook')
680
689
ld = self.get_lock()
682
691
self.assertEqual([], self._calls)
689
698
def test_LockDir_acquired_fail(self):
690
699
# the LockDir.lock_acquired hook does not fire on failure.
693
700
ld = self.get_lock()
695
702
ld2 = self.get_lock()
696
703
ld2.attempt_lock()
697
704
# install a lock hook now, when the disk lock is locked
698
705
LockDir.hooks.install_named_hook('lock_acquired',
699
self.record_hook, 'record_hook')
706
self.record_hook, 'record_hook')
700
707
self.assertRaises(errors.LockContention, ld.attempt_lock)
701
708
self.assertEqual([], self._calls)
705
712
def test_LockDir_released_success(self):
706
713
# the LockDir.lock_released hook fires when a lock is acquired.
709
714
LockDir.hooks.install_named_hook('lock_released',
710
self.record_hook, 'record_hook')
715
self.record_hook, 'record_hook')
711
716
ld = self.get_lock()
713
718
self.assertEqual([], self._calls)
720
725
def test_LockDir_released_fail(self):
721
726
# the LockDir.lock_released hook does not fire on failure.
724
727
ld = self.get_lock()
726
729
ld2 = self.get_lock()
727
730
ld.attempt_lock()
728
731
ld2.force_break(ld2.peek())
729
732
LockDir.hooks.install_named_hook('lock_released',
730
self.record_hook, 'record_hook')
733
self.record_hook, 'record_hook')
731
734
self.assertRaises(LockBroken, ld.unlock)
732
735
self.assertEqual([], self._calls)
737
def test_LockDir_broken_success(self):
738
# the LockDir.lock_broken hook fires when a lock is broken.
741
ld2 = self.get_lock()
742
result = ld.attempt_lock()
743
LockDir.hooks.install_named_hook('lock_broken',
744
self.record_hook, 'record_hook')
745
ld2.force_break(ld2.peek())
746
lock_path = ld.transport.abspath(ld.path)
747
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
749
def test_LockDir_broken_failure(self):
750
# the LockDir.lock_broken hook does not fires when a lock is already
754
ld2 = self.get_lock()
755
result = ld.attempt_lock()
756
holder_info = ld2.peek()
758
LockDir.hooks.install_named_hook('lock_broken',
759
self.record_hook, 'record_hook')
760
ld2.force_break(holder_info)
761
lock_path = ld.transport.abspath(ld.path)
762
self.assertEqual([], self._calls)