~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-27 05:35:00 UTC
  • mfrom: (4570 +trunk)
  • mto: (4634.6.29 2.0)
  • mto: This revision was merged to the branch mainline in revision 4680.
  • Revision ID: andrew.bennetts@canonical.com-20090727053500-q76zsn2dx33jhmj5
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for LockDir"""
18
18
 
142
142
        lf1 = LockDir(t, 'test_lock')
143
143
        lf1.create()
144
144
        lf1.attempt_lock()
 
145
        self.addCleanup(lf1.unlock)
145
146
        # lock is held, should get some info on it
146
147
        info1 = lf1.peek()
147
148
        self.assertEqual(set(info1.keys()),
161
162
        lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
162
163
        self.assertEqual(lf2.peek(), None)
163
164
        lf1.attempt_lock()
 
165
        self.addCleanup(lf1.unlock)
164
166
        info2 = lf2.peek()
165
167
        self.assertTrue(info2)
166
168
        self.assertEqual(info2['nonce'], lf1.nonce)
451
453
        lf1 = LockDir(t, 'test_lock')
452
454
        lf1.create()
453
455
        lf1.attempt_lock()
 
456
        self.addCleanup(lf1.unlock)
454
457
        lf1.confirm()
455
458
 
456
459
    def test_41_confirm_not_held(self):
468
471
        lf1.attempt_lock()
469
472
        t.move('test_lock', 'lock_gone_now')
470
473
        self.assertRaises(LockBroken, lf1.confirm)
 
474
        # Clean up
 
475
        t.move('lock_gone_now', 'test_lock')
 
476
        lf1.unlock()
471
477
 
472
478
    def test_43_break(self):
473
479
        """Break a lock whose caller has forgotten it"""
484
490
        lf2.force_break(holder_info)
485
491
        # now we should be able to take it
486
492
        lf2.attempt_lock()
 
493
        self.addCleanup(lf2.unlock)
487
494
        lf2.confirm()
488
495
 
489
496
    def test_44_break_already_released(self):
501
508
        lf2.force_break(holder_info)
502
509
        # now we should be able to take it
503
510
        lf2.attempt_lock()
 
511
        self.addCleanup(lf2.unlock)
504
512
        lf2.confirm()
505
513
 
506
514
    def test_45_break_mismatch(self):
553
561
        # do this without IO redirection to ensure it doesn't prompt.
554
562
        self.assertRaises(AssertionError, ld1.break_lock)
555
563
        orig_factory = bzrlib.ui.ui_factory
556
 
        # silent ui - no need for stdout
557
 
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
558
 
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
564
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
559
565
        try:
560
566
            ld2.break_lock()
561
567
            self.assertRaises(LockBroken, ld1.unlock)
621
627
    def test_lock_by_token(self):
622
628
        ld1 = self.get_lock()
623
629
        token = ld1.lock_write()
 
630
        self.addCleanup(ld1.unlock)
624
631
        self.assertNotEqual(None, token)
625
632
        ld2 = self.get_lock()
626
633
        t2 = ld2.lock_write(token)
 
634
        self.addCleanup(ld2.unlock)
627
635
        self.assertEqual(token, t2)
628
636
 
629
637
    def test_lock_with_buggy_rename(self):
654
662
        check_dir([])
655
663
        # when held, that's all we see
656
664
        ld1.attempt_lock()
 
665
        self.addCleanup(ld1.unlock)
657
666
        check_dir(['held'])
658
667
        # second guy should fail
659
668
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
660
669
        # no kibble
661
670
        check_dir(['held'])
662
671
 
 
672
 
 
673
class TestLockDirHooks(TestCaseWithTransport):
 
674
 
 
675
    def setUp(self):
 
676
        super(TestLockDirHooks, self).setUp()
 
677
        self._calls = []
 
678
 
 
679
    def get_lock(self):
 
680
        return LockDir(self.get_transport(), 'test_lock')
 
681
 
663
682
    def record_hook(self, result):
664
683
        self._calls.append(result)
665
684
 
666
 
    def reset_hooks(self):
667
 
        self._old_hooks = lock.Lock.hooks
668
 
        self.addCleanup(self.restore_hooks)
669
 
        lock.Lock.hooks = lock.LockHooks()
670
 
 
671
 
    def restore_hooks(self):
672
 
        lock.Lock.hooks = self._old_hooks
673
 
 
674
685
    def test_LockDir_acquired_success(self):
675
686
        # the LockDir.lock_acquired hook fires when a lock is acquired.
676
 
        self._calls = []
677
 
        self.reset_hooks()
678
687
        LockDir.hooks.install_named_hook('lock_acquired',
679
 
            self.record_hook, 'record_hook')
 
688
                                         self.record_hook, 'record_hook')
680
689
        ld = self.get_lock()
681
690
        ld.create()
682
691
        self.assertEqual([], self._calls)
688
697
 
689
698
    def test_LockDir_acquired_fail(self):
690
699
        # the LockDir.lock_acquired hook does not fire on failure.
691
 
        self._calls = []
692
 
        self.reset_hooks()
693
700
        ld = self.get_lock()
694
701
        ld.create()
695
702
        ld2 = self.get_lock()
696
703
        ld2.attempt_lock()
697
704
        # install a lock hook now, when the disk lock is locked
698
705
        LockDir.hooks.install_named_hook('lock_acquired',
699
 
            self.record_hook, 'record_hook')
 
706
                                         self.record_hook, 'record_hook')
700
707
        self.assertRaises(errors.LockContention, ld.attempt_lock)
701
708
        self.assertEqual([], self._calls)
702
709
        ld2.unlock()
704
711
 
705
712
    def test_LockDir_released_success(self):
706
713
        # the LockDir.lock_released hook fires when a lock is acquired.
707
 
        self._calls = []
708
 
        self.reset_hooks()
709
714
        LockDir.hooks.install_named_hook('lock_released',
710
 
            self.record_hook, 'record_hook')
 
715
                                         self.record_hook, 'record_hook')
711
716
        ld = self.get_lock()
712
717
        ld.create()
713
718
        self.assertEqual([], self._calls)
719
724
 
720
725
    def test_LockDir_released_fail(self):
721
726
        # the LockDir.lock_released hook does not fire on failure.
722
 
        self._calls = []
723
 
        self.reset_hooks()
724
727
        ld = self.get_lock()
725
728
        ld.create()
726
729
        ld2 = self.get_lock()
727
730
        ld.attempt_lock()
728
731
        ld2.force_break(ld2.peek())
729
732
        LockDir.hooks.install_named_hook('lock_released',
730
 
            self.record_hook, 'record_hook')
 
733
                                         self.record_hook, 'record_hook')
731
734
        self.assertRaises(LockBroken, ld.unlock)
732
735
        self.assertEqual([], self._calls)
 
736
 
 
737
    def test_LockDir_broken_success(self):
 
738
        # the LockDir.lock_broken hook fires when a lock is broken.
 
739
        ld = self.get_lock()
 
740
        ld.create()
 
741
        ld2 = self.get_lock()
 
742
        result = ld.attempt_lock()
 
743
        LockDir.hooks.install_named_hook('lock_broken',
 
744
                                         self.record_hook, 'record_hook')
 
745
        ld2.force_break(ld2.peek())
 
746
        lock_path = ld.transport.abspath(ld.path)
 
747
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
 
748
 
 
749
    def test_LockDir_broken_failure(self):
 
750
        # the LockDir.lock_broken hook does not fires when a lock is already
 
751
        # released.
 
752
        ld = self.get_lock()
 
753
        ld.create()
 
754
        ld2 = self.get_lock()
 
755
        result = ld.attempt_lock()
 
756
        holder_info = ld2.peek()
 
757
        ld.unlock()
 
758
        LockDir.hooks.install_named_hook('lock_broken',
 
759
                                         self.record_hook, 'record_hook')
 
760
        ld2.force_break(holder_info)
 
761
        lock_path = ld.transport.abspath(ld.path)
 
762
        self.assertEqual([], self._calls)