~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

  • Committer: John Arbash Meinel
  • Date: 2006-06-30 19:58:02 UTC
  • mto: This revision was merged to the branch mainline in revision 1846.
  • Revision ID: john@arbash-meinel.com-20060630195802-c6e78240f04c9d52
Branch.lock_read/lock_write/unlock should handle failures.
Because of the multi-phase nature of lock and unlock, we should make sure we
clean up the locks properly. This is now tested for Branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
                           NoSuchFile,
28
28
                           UninitializableFormat,
29
29
                           NotBranchError,
 
30
                           TestPreventLocking,
30
31
                           )
 
32
from bzrlib.lock import LockWrapper
31
33
from bzrlib.osutils import getcwd
32
34
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
33
35
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
561
563
            return
562
564
        self.assertEqual(self.branch_format,
563
565
                         branch.BranchFormat.find_format(opened_control))
 
566
 
 
567
 
 
568
class TestBranchLocking(TestCaseWithBranch):
 
569
 
 
570
    def get_instrumented_branch(self):
 
571
        """Get a Branch object which has been instrumented"""
 
572
        # TODO: jam 20060630 It may be that not all formats have a 
 
573
        # 'control_files' member. So we should fail gracefully if
 
574
        # not there. But assuming it has them lets us test the exact 
 
575
        # lock/unlock order.
 
576
        self.locks = []
 
577
        b = LockWrapper(self.locks, self.get_branch(), 'b')
 
578
        b.repository = LockWrapper(self.locks, b.repository, 'r')
 
579
        bcf = b.control_files
 
580
        rcf = b.repository.control_files
 
581
 
 
582
        # Look out for branch types that reuse their control files
 
583
        self.combined_control = bcf is rcf
 
584
 
 
585
        b.control_files = LockWrapper(self.locks, b.control_files, 'bc')
 
586
        b.repository.control_files = \
 
587
            LockWrapper(self.locks, b.repository.control_files, 'rc')
 
588
        return b
 
589
 
 
590
    def test_01_lock_read(self):
 
591
        # Test that locking occurs in the correct order
 
592
        b = self.get_instrumented_branch()
 
593
 
 
594
        self.assertFalse(b.is_locked())
 
595
        self.assertFalse(b.repository.is_locked())
 
596
        b.lock_read()
 
597
        try:
 
598
            self.assertTrue(b.is_locked())
 
599
            self.assertTrue(b.repository.is_locked())
 
600
        finally:
 
601
            b.unlock()
 
602
        self.assertFalse(b.is_locked())
 
603
        self.assertFalse(b.repository.is_locked())
 
604
 
 
605
        self.assertEqual([('b', 'lr', True), ('bc', 'lr', True),
 
606
                          ('r', 'lr', True), ('rc', 'lr', True),
 
607
                          ('b', 'ul', True), ('r', 'ul', True),
 
608
                          ('rc', 'ul', True), ('bc', 'ul', True),
 
609
                         ], self.locks)
 
610
 
 
611
    def test_02_lock_write(self):
 
612
        # Test that locking occurs in the correct order
 
613
        b = self.get_instrumented_branch()
 
614
 
 
615
        self.assertFalse(b.is_locked())
 
616
        self.assertFalse(b.repository.is_locked())
 
617
        b.lock_write()
 
618
        try:
 
619
            self.assertTrue(b.is_locked())
 
620
            self.assertTrue(b.repository.is_locked())
 
621
        finally:
 
622
            b.unlock()
 
623
        self.assertFalse(b.is_locked())
 
624
        self.assertFalse(b.repository.is_locked())
 
625
 
 
626
        self.assertEqual([('b', 'lw', True), ('bc', 'lw', True),
 
627
                          ('r', 'lw', True), ('rc', 'lw', True),
 
628
                          ('b', 'ul', True), ('r', 'ul', True),
 
629
                          ('rc', 'ul', True), ('bc', 'ul', True),
 
630
                         ], self.locks)
 
631
 
 
632
    def test_03_lock_fail_unlock_repo(self):
 
633
        # Make sure branch.unlock() is called, even if there is a
 
634
        # failure while unlocking the repository.
 
635
        b = self.get_instrumented_branch()
 
636
        b.repository.disable_unlock()
 
637
 
 
638
        self.assertFalse(b.is_locked())
 
639
        self.assertFalse(b.repository.is_locked())
 
640
        b.lock_write()
 
641
        try:
 
642
            self.assertTrue(b.is_locked())
 
643
            self.assertTrue(b.repository.is_locked())
 
644
            self.assertRaises(TestPreventLocking, b.unlock)
 
645
            if self.combined_control:
 
646
                self.assertTrue(b.is_locked())
 
647
            else:
 
648
                self.assertFalse(b.is_locked())
 
649
            self.assertTrue(b.repository.is_locked())
 
650
 
 
651
            # We unlock the branch control files, even if 
 
652
            # we fail to unlock the repository
 
653
            self.assertEqual([('b', 'lw', True), ('bc', 'lw', True),
 
654
                              ('r', 'lw', True), ('rc', 'lw', True),
 
655
                              ('b', 'ul', True), ('r', 'ul', False), 
 
656
                              ('bc', 'ul', True),
 
657
                             ], self.locks)
 
658
 
 
659
        finally:
 
660
            # For cleanup purposes, make sure we are unlocked
 
661
            b.repository._other.unlock()
 
662
 
 
663
    def test_04_lock_read_fail_repo(self):
 
664
        # Test that the branch is unlocked if it cannot lock the repository
 
665
        b = self.get_instrumented_branch()
 
666
        b.repository.disable_lock_read()
 
667
 
 
668
        self.assertRaises(TestPreventLocking, b.lock_read)
 
669
        self.assertFalse(b.is_locked())
 
670
        self.assertFalse(b.repository.is_locked())
 
671
 
 
672
        self.assertEqual([('b', 'lr', True), ('bc', 'lr', True),
 
673
                          ('r', 'lr', False), 
 
674
                          ('bc', 'ul', True),
 
675
                         ], self.locks)
 
676
 
 
677
    def test_05_lock_write_fail_repo(self):
 
678
        # Test that the branch is unlocked if it cannot lock the repository
 
679
        b = self.get_instrumented_branch()
 
680
        b.repository.disable_lock_write()
 
681
 
 
682
        self.assertRaises(TestPreventLocking, b.lock_write)
 
683
        self.assertFalse(b.is_locked())
 
684
        self.assertFalse(b.repository.is_locked())
 
685
 
 
686
        self.assertEqual([('b', 'lw', True), ('bc', 'lw', True),
 
687
                          ('r', 'lw', False), 
 
688
                          ('bc', 'ul', True),
 
689
                         ], self.locks)
 
690
 
 
691
    def test_06_lock_read_fail_control(self):
 
692
        # We shouldn't try to lock the repo if we can't lock the branch
 
693
        b = self.get_instrumented_branch()
 
694
        b.control_files.disable_lock_read()
 
695
 
 
696
        self.assertRaises(TestPreventLocking, b.lock_read)
 
697
        self.assertFalse(b.is_locked())
 
698
        self.assertFalse(b.repository.is_locked())
 
699
 
 
700
        self.assertEqual([('b', 'lr', True), ('bc', 'lr', False),
 
701
                         ], self.locks)
 
702
 
 
703
    def test_07_lock_write_fail_control(self):
 
704
        # We shouldn't try to lock the repo if we can't lock the branch
 
705
        b = self.get_instrumented_branch()
 
706
        b.control_files.disable_lock_write()
 
707
 
 
708
        self.assertRaises(TestPreventLocking, b.lock_write)
 
709
        self.assertFalse(b.is_locked())
 
710
        self.assertFalse(b.repository.is_locked())
 
711
 
 
712
        self.assertEqual([('b', 'lw', True), ('bc', 'lw', False),
 
713
                         ], self.locks)
 
714
 
 
715
 
 
716