~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-27 22:29:55 UTC
  • mto: (3735.39.2 clean)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090327222955-utifmfm888zerixt
Implement apply_delta_to_source which doesn't have to malloc another string.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
142
142
        lf1 = LockDir(t, 'test_lock')
143
143
        lf1.create()
144
144
        lf1.attempt_lock()
145
 
        self.addCleanup(lf1.unlock)
146
145
        # lock is held, should get some info on it
147
146
        info1 = lf1.peek()
148
147
        self.assertEqual(set(info1.keys()),
162
161
        lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
163
162
        self.assertEqual(lf2.peek(), None)
164
163
        lf1.attempt_lock()
165
 
        self.addCleanup(lf1.unlock)
166
164
        info2 = lf2.peek()
167
165
        self.assertTrue(info2)
168
166
        self.assertEqual(info2['nonce'], lf1.nonce)
197
195
        self.assertEqual('%s %s\n'
198
196
                         '%s\n%s\n'
199
197
                         'Will continue to try until %s, unless '
200
 
                         'you press Ctrl-C.\n'
201
 
                         'See "bzr help break-lock" for more.',
 
198
                         'you press Ctrl-C\n'
 
199
                         'If you\'re sure that it\'s not being '
 
200
                         'modified, use bzr break-lock %s',
202
201
                         self._logged_reports[0][0])
203
202
        args = self._logged_reports[0][1]
204
203
        self.assertEqual('Unable to obtain', args[0])
419
418
        self.assertEqual('%s %s\n'
420
419
                         '%s\n%s\n'
421
420
                         'Will continue to try until %s, unless '
422
 
                         'you press Ctrl-C.\n'
423
 
                         'See "bzr help break-lock" for more.',
 
421
                         'you press Ctrl-C\n'
 
422
                         'If you\'re sure that it\'s not being '
 
423
                         'modified, use bzr break-lock %s',
424
424
                         self._logged_reports[0][0])
425
425
        args = self._logged_reports[0][1]
426
426
        self.assertEqual('Unable to obtain', args[0])
433
433
        self.assertEqual('%s %s\n'
434
434
                         '%s\n%s\n'
435
435
                         'Will continue to try until %s, unless '
436
 
                         'you press Ctrl-C.\n'
437
 
                         'See "bzr help break-lock" for more.',
 
436
                         'you press Ctrl-C\n'
 
437
                         'If you\'re sure that it\'s not being '
 
438
                         'modified, use bzr break-lock %s',
438
439
                         self._logged_reports[1][0])
439
440
        args = self._logged_reports[1][1]
440
441
        self.assertEqual('Lock owner changed for', args[0])
450
451
        lf1 = LockDir(t, 'test_lock')
451
452
        lf1.create()
452
453
        lf1.attempt_lock()
453
 
        self.addCleanup(lf1.unlock)
454
454
        lf1.confirm()
455
455
 
456
456
    def test_41_confirm_not_held(self):
468
468
        lf1.attempt_lock()
469
469
        t.move('test_lock', 'lock_gone_now')
470
470
        self.assertRaises(LockBroken, lf1.confirm)
471
 
        # Clean up
472
 
        t.move('lock_gone_now', 'test_lock')
473
 
        lf1.unlock()
474
471
 
475
472
    def test_43_break(self):
476
473
        """Break a lock whose caller has forgotten it"""
487
484
        lf2.force_break(holder_info)
488
485
        # now we should be able to take it
489
486
        lf2.attempt_lock()
490
 
        self.addCleanup(lf2.unlock)
491
487
        lf2.confirm()
492
488
 
493
489
    def test_44_break_already_released(self):
505
501
        lf2.force_break(holder_info)
506
502
        # now we should be able to take it
507
503
        lf2.attempt_lock()
508
 
        self.addCleanup(lf2.unlock)
509
504
        lf2.confirm()
510
505
 
511
506
    def test_45_break_mismatch(self):
558
553
        # do this without IO redirection to ensure it doesn't prompt.
559
554
        self.assertRaises(AssertionError, ld1.break_lock)
560
555
        orig_factory = bzrlib.ui.ui_factory
561
 
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
 
556
        # silent ui - no need for stdout
 
557
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
558
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
562
559
        try:
563
560
            ld2.break_lock()
564
561
            self.assertRaises(LockBroken, ld1.unlock)
624
621
    def test_lock_by_token(self):
625
622
        ld1 = self.get_lock()
626
623
        token = ld1.lock_write()
627
 
        self.addCleanup(ld1.unlock)
628
624
        self.assertNotEqual(None, token)
629
625
        ld2 = self.get_lock()
630
626
        t2 = ld2.lock_write(token)
631
 
        self.addCleanup(ld2.unlock)
632
627
        self.assertEqual(token, t2)
633
628
 
634
629
    def test_lock_with_buggy_rename(self):
659
654
        check_dir([])
660
655
        # when held, that's all we see
661
656
        ld1.attempt_lock()
662
 
        self.addCleanup(ld1.unlock)
663
657
        check_dir(['held'])
664
658
        # second guy should fail
665
659
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
666
660
        # no kibble
667
661
        check_dir(['held'])
668
662
 
669
 
    def test_no_lockdir_info(self):
670
 
        """We can cope with empty info files."""
671
 
        # This seems like a fairly common failure case - see
672
 
        # <https://bugs.launchpad.net/bzr/+bug/185103> and all its dupes.
673
 
        # Processes are often interrupted after opening the file
674
 
        # before the actual contents are committed.
675
 
        t = self.get_transport()
676
 
        t.mkdir('test_lock')
677
 
        t.mkdir('test_lock/held')
678
 
        t.put_bytes('test_lock/held/info', '')
679
 
        lf = LockDir(t, 'test_lock')
680
 
        info = lf.peek()
681
 
        formatted_info = lf._format_lock_info(info)
682
 
        self.assertEquals(
683
 
            ['lock %s' % t.abspath('test_lock'),
684
 
             'held by <unknown> on host <unknown> [process #<unknown>]',
685
 
             'locked (unknown)'],
686
 
            formatted_info)
687
 
 
688
 
 
689
 
class TestLockDirHooks(TestCaseWithTransport):
690
 
 
691
 
    def setUp(self):
692
 
        super(TestLockDirHooks, self).setUp()
693
 
        self._calls = []
694
 
 
695
 
    def get_lock(self):
696
 
        return LockDir(self.get_transport(), 'test_lock')
697
 
 
698
663
    def record_hook(self, result):
699
664
        self._calls.append(result)
700
665
 
 
666
    def reset_hooks(self):
 
667
        self._old_hooks = lock.Lock.hooks
 
668
        self.addCleanup(self.restore_hooks)
 
669
        lock.Lock.hooks = lock.LockHooks()
 
670
 
 
671
    def restore_hooks(self):
 
672
        lock.Lock.hooks = self._old_hooks
 
673
 
701
674
    def test_LockDir_acquired_success(self):
702
675
        # the LockDir.lock_acquired hook fires when a lock is acquired.
 
676
        self._calls = []
 
677
        self.reset_hooks()
703
678
        LockDir.hooks.install_named_hook('lock_acquired',
704
 
                                         self.record_hook, 'record_hook')
 
679
            self.record_hook, 'record_hook')
705
680
        ld = self.get_lock()
706
681
        ld.create()
707
682
        self.assertEqual([], self._calls)
713
688
 
714
689
    def test_LockDir_acquired_fail(self):
715
690
        # the LockDir.lock_acquired hook does not fire on failure.
 
691
        self._calls = []
 
692
        self.reset_hooks()
716
693
        ld = self.get_lock()
717
694
        ld.create()
718
695
        ld2 = self.get_lock()
719
696
        ld2.attempt_lock()
720
697
        # install a lock hook now, when the disk lock is locked
721
698
        LockDir.hooks.install_named_hook('lock_acquired',
722
 
                                         self.record_hook, 'record_hook')
 
699
            self.record_hook, 'record_hook')
723
700
        self.assertRaises(errors.LockContention, ld.attempt_lock)
724
701
        self.assertEqual([], self._calls)
725
702
        ld2.unlock()
727
704
 
728
705
    def test_LockDir_released_success(self):
729
706
        # the LockDir.lock_released hook fires when a lock is released.
 
707
        self._calls = []
 
708
        self.reset_hooks()
730
709
        LockDir.hooks.install_named_hook('lock_released',
731
 
                                         self.record_hook, 'record_hook')
 
710
            self.record_hook, 'record_hook')
732
711
        ld = self.get_lock()
733
712
        ld.create()
734
713
        self.assertEqual([], self._calls)
740
719
 
741
720
    def test_LockDir_released_fail(self):
742
721
        # the LockDir.lock_released hook does not fire on failure.
 
722
        self._calls = []
 
723
        self.reset_hooks()
743
724
        ld = self.get_lock()
744
725
        ld.create()
745
726
        ld2 = self.get_lock()
746
727
        ld.attempt_lock()
747
728
        ld2.force_break(ld2.peek())
748
729
        LockDir.hooks.install_named_hook('lock_released',
749
 
                                         self.record_hook, 'record_hook')
 
730
            self.record_hook, 'record_hook')
750
731
        self.assertRaises(LockBroken, ld.unlock)
751
732
        self.assertEqual([], self._calls)
752
 
 
753
 
    def test_LockDir_broken_success(self):
754
 
        # the LockDir.lock_broken hook fires when a lock is broken.
755
 
        ld = self.get_lock()
756
 
        ld.create()
757
 
        ld2 = self.get_lock()
758
 
        result = ld.attempt_lock()
759
 
        LockDir.hooks.install_named_hook('lock_broken',
760
 
                                         self.record_hook, 'record_hook')
761
 
        ld2.force_break(ld2.peek())
762
 
        lock_path = ld.transport.abspath(ld.path)
763
 
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
764
 
 
765
 
    def test_LockDir_broken_failure(self):
766
 
        # the LockDir.lock_broken hook does not fire when a lock is already
767
 
        # released.
768
 
        ld = self.get_lock()
769
 
        ld.create()
770
 
        ld2 = self.get_lock()
771
 
        result = ld.attempt_lock()
772
 
        holder_info = ld2.peek()
773
 
        ld.unlock()
774
 
        LockDir.hooks.install_named_hook('lock_broken',
775
 
                                         self.record_hook, 'record_hook')
776
 
        ld2.force_break(holder_info)
777
 
        lock_path = ld.transport.abspath(ld.path)
778
 
        self.assertEqual([], self._calls)