~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-05-08 19:51:48 UTC
  • mfrom: (4345.1.1 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20090508195148-cw1mw95i0qo39ggg
(vila) Fix some lock-related test failures

Show diffs side-by-side

added added

removed removed

Lines of Context:
142
142
        lf1 = LockDir(t, 'test_lock')
143
143
        lf1.create()
144
144
        lf1.attempt_lock()
 
145
        self.addCleanup(lf1.unlock)
145
146
        # lock is held, should get some info on it
146
147
        info1 = lf1.peek()
147
148
        self.assertEqual(set(info1.keys()),
161
162
        lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
162
163
        self.assertEqual(lf2.peek(), None)
163
164
        lf1.attempt_lock()
 
165
        self.addCleanup(lf1.unlock)
164
166
        info2 = lf2.peek()
165
167
        self.assertTrue(info2)
166
168
        self.assertEqual(info2['nonce'], lf1.nonce)
451
453
        lf1 = LockDir(t, 'test_lock')
452
454
        lf1.create()
453
455
        lf1.attempt_lock()
 
456
        self.addCleanup(lf1.unlock)
454
457
        lf1.confirm()
455
458
 
456
459
    def test_41_confirm_not_held(self):
468
471
        lf1.attempt_lock()
469
472
        t.move('test_lock', 'lock_gone_now')
470
473
        self.assertRaises(LockBroken, lf1.confirm)
 
474
        # Clean up
 
475
        t.move('lock_gone_now', 'test_lock')
 
476
        lf1.unlock()
471
477
 
472
478
    def test_43_break(self):
473
479
        """Break a lock whose caller has forgotten it"""
484
490
        lf2.force_break(holder_info)
485
491
        # now we should be able to take it
486
492
        lf2.attempt_lock()
 
493
        self.addCleanup(lf2.unlock)
487
494
        lf2.confirm()
488
495
 
489
496
    def test_44_break_already_released(self):
501
508
        lf2.force_break(holder_info)
502
509
        # now we should be able to take it
503
510
        lf2.attempt_lock()
 
511
        self.addCleanup(lf2.unlock)
504
512
        lf2.confirm()
505
513
 
506
514
    def test_45_break_mismatch(self):
621
629
    def test_lock_by_token(self):
622
630
        ld1 = self.get_lock()
623
631
        token = ld1.lock_write()
 
632
        self.addCleanup(ld1.unlock)
624
633
        self.assertNotEqual(None, token)
625
634
        ld2 = self.get_lock()
626
635
        t2 = ld2.lock_write(token)
 
636
        self.addCleanup(ld2.unlock)
627
637
        self.assertEqual(token, t2)
628
638
 
629
639
    def test_lock_with_buggy_rename(self):
654
664
        check_dir([])
655
665
        # when held, that's all we see
656
666
        ld1.attempt_lock()
 
667
        self.addCleanup(ld1.unlock)
657
668
        check_dir(['held'])
658
669
        # second guy should fail
659
670
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
660
671
        # no kibble
661
672
        check_dir(['held'])
662
673
 
 
674
 
 
675
class TestLockDirHooks(TestCaseWithTransport):
 
676
 
 
677
    def setUp(self):
 
678
        super(TestLockDirHooks, self).setUp()
 
679
        self._calls = []
 
680
 
 
681
    def get_lock(self):
 
682
        return LockDir(self.get_transport(), 'test_lock')
 
683
 
663
684
    def record_hook(self, result):
664
685
        self._calls.append(result)
665
686
 
666
 
    def reset_hooks(self):
667
 
        self._old_hooks = lock.Lock.hooks
668
 
        self.addCleanup(self.restore_hooks)
669
 
        lock.Lock.hooks = lock.LockHooks()
670
 
 
671
 
    def restore_hooks(self):
672
 
        lock.Lock.hooks = self._old_hooks
673
 
 
674
687
    def test_LockDir_acquired_success(self):
675
688
        # the LockDir.lock_acquired hook fires when a lock is acquired.
676
 
        self._calls = []
677
 
        self.reset_hooks()
678
689
        LockDir.hooks.install_named_hook('lock_acquired',
679
 
            self.record_hook, 'record_hook')
 
690
                                         self.record_hook, 'record_hook')
680
691
        ld = self.get_lock()
681
692
        ld.create()
682
693
        self.assertEqual([], self._calls)
688
699
 
689
700
    def test_LockDir_acquired_fail(self):
690
701
        # the LockDir.lock_acquired hook does not fire on failure.
691
 
        self._calls = []
692
 
        self.reset_hooks()
693
702
        ld = self.get_lock()
694
703
        ld.create()
695
704
        ld2 = self.get_lock()
696
705
        ld2.attempt_lock()
697
706
        # install a lock hook now, when the disk lock is locked
698
707
        LockDir.hooks.install_named_hook('lock_acquired',
699
 
            self.record_hook, 'record_hook')
 
708
                                         self.record_hook, 'record_hook')
700
709
        self.assertRaises(errors.LockContention, ld.attempt_lock)
701
710
        self.assertEqual([], self._calls)
702
711
        ld2.unlock()
704
713
 
705
714
    def test_LockDir_released_success(self):
706
715
        # the LockDir.lock_released hook fires when a lock is released.
707
 
        self._calls = []
708
 
        self.reset_hooks()
709
716
        LockDir.hooks.install_named_hook('lock_released',
710
 
            self.record_hook, 'record_hook')
 
717
                                         self.record_hook, 'record_hook')
711
718
        ld = self.get_lock()
712
719
        ld.create()
713
720
        self.assertEqual([], self._calls)
719
726
 
720
727
    def test_LockDir_released_fail(self):
721
728
        # the LockDir.lock_released hook does not fire on failure.
722
 
        self._calls = []
723
 
        self.reset_hooks()
724
729
        ld = self.get_lock()
725
730
        ld.create()
726
731
        ld2 = self.get_lock()
727
732
        ld.attempt_lock()
728
733
        ld2.force_break(ld2.peek())
729
734
        LockDir.hooks.install_named_hook('lock_released',
730
 
            self.record_hook, 'record_hook')
 
735
                                         self.record_hook, 'record_hook')
731
736
        self.assertRaises(LockBroken, ld.unlock)
732
737
        self.assertEqual([], self._calls)
 
738
 
 
739
    def test_LockDir_broken_success(self):
 
740
        # the LockDir.lock_broken hook fires when a lock is broken.
 
741
        ld = self.get_lock()
 
742
        ld.create()
 
743
        ld2 = self.get_lock()
 
744
        result = ld.attempt_lock()
 
745
        LockDir.hooks.install_named_hook('lock_broken',
 
746
                                         self.record_hook, 'record_hook')
 
747
        ld2.force_break(ld2.peek())
 
748
        lock_path = ld.transport.abspath(ld.path)
 
749
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
 
750
 
 
751
    def test_LockDir_broken_failure(self):
 
752
        # the LockDir.lock_broken hook does not fire when a lock is already
 
753
        # released.
 
754
        ld = self.get_lock()
 
755
        ld.create()
 
756
        ld2 = self.get_lock()
 
757
        result = ld.attempt_lock()
 
758
        holder_info = ld2.peek()
 
759
        ld.unlock()
 
760
        LockDir.hooks.install_named_hook('lock_broken',
 
761
                                         self.record_hook, 'record_hook')
 
762
        ld2.force_break(holder_info)
 
763
        lock_path = ld.transport.abspath(ld.path)
 
764
        self.assertEqual([], self._calls)