~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Matthäus G. Chajdas
  • Date: 2010-10-12 01:18:01 UTC
  • mto: (5484.1.1 trunk)
  • mto: This revision was merged to the branch mainline in revision 5485.
  • Revision ID: dev@anteru.net-20101012011801-thahmhfxdzz0j6d4
Remove spaces.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007, 2008, 2010 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
39
39
    LockNotHeld,
40
40
    )
41
41
from bzrlib.lockdir import LockDir
42
 
from bzrlib.tests import TestCaseWithTransport
 
42
from bzrlib.tests import (features, TestCaseWithTransport)
43
43
from bzrlib.trace import note
44
44
 
45
45
# These tests sometimes use threads to test the behaviour of lock files with
191
191
                    "took %f seconds to detect lock contention" % (after - before))
192
192
        finally:
193
193
            lf1.unlock()
194
 
        lock_base = lf2.transport.abspath(lf2.path)
195
194
        self.assertEqual(1, len(self._logged_reports))
196
 
        lock_url = lf2.transport.abspath(lf2.path)
197
 
        self.assertEqual('%s %s\n'
198
 
                         '%s\n%s\n'
199
 
                         'Will continue to try until %s, unless '
200
 
                         'you press Ctrl-C.\n'
201
 
                         'See "bzr help break-lock" for more.',
202
 
                         self._logged_reports[0][0])
203
 
        args = self._logged_reports[0][1]
204
 
        self.assertEqual('Unable to obtain', args[0])
205
 
        self.assertEqual('lock %s' % (lock_base,), args[1])
206
 
        self.assertStartsWith(args[2], 'held by ')
207
 
        self.assertStartsWith(args[3], 'locked ')
208
 
        self.assertEndsWith(args[3], ' ago')
209
 
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
195
        self.assertEqual(self._logged_reports[0][0],
 
196
            '%s lock %s held by %s\n'
 
197
            'at %s [process #%s], acquired %s.\n'
 
198
            'Will continue to try until %s, unless '
 
199
            'you press Ctrl-C.\n'
 
200
            'See "bzr help break-lock" for more.')
 
201
        start, lock_url, user, hostname, pid, time_ago, deadline_str = \
 
202
            self._logged_reports[0][1]
 
203
        self.assertEqual(start, u'Unable to obtain')
 
204
        self.assertEqual(user, u'jrandom@example.com')
 
205
        # skip hostname
 
206
        self.assertContainsRe(pid, r'\d+')
 
207
        self.assertContainsRe(time_ago, r'.* ago')
 
208
        self.assertContainsRe(deadline_str, r'\d{2}:\d{2}:\d{2}')
210
209
 
211
210
    def test_31_lock_wait_easy(self):
212
211
        """Succeed when waiting on a lock with no contention.
565
564
        finally:
566
565
            bzrlib.ui.ui_factory = orig_factory
567
566
 
 
567
    def test_break_lock_corrupt_info(self):
 
568
        """break_lock works even if the info file is corrupt (and tells the UI
 
569
        that it is corrupt).
 
570
        """
 
571
        ld = self.get_lock()
 
572
        ld2 = self.get_lock()
 
573
        ld.create()
 
574
        ld.lock_write()
 
575
        ld.transport.put_bytes_non_atomic('test_lock/held/info', '\0')
 
576
        class LoggingUIFactory(bzrlib.ui.SilentUIFactory):
 
577
            def __init__(self):
 
578
                self.prompts = []
 
579
            def get_boolean(self, prompt):
 
580
                self.prompts.append(('boolean', prompt))
 
581
                return True
 
582
        ui = LoggingUIFactory()
 
583
        orig_factory = bzrlib.ui.ui_factory
 
584
        bzrlib.ui.ui_factory = ui
 
585
        try:
 
586
            ld2.break_lock()
 
587
            self.assertLength(1, ui.prompts)
 
588
            self.assertEqual('boolean', ui.prompts[0][0])
 
589
            self.assertStartsWith(ui.prompts[0][1], 'Break (corrupt LockDir')
 
590
            self.assertRaises(LockBroken, ld.unlock)
 
591
        finally:
 
592
            bzrlib.ui.ui_factory = orig_factory
 
593
 
 
594
    def test_break_lock_missing_info(self):
 
595
        """break_lock works even if the info file is missing (and tells the UI
 
596
        that it is corrupt).
 
597
        """
 
598
        ld = self.get_lock()
 
599
        ld2 = self.get_lock()
 
600
        ld.create()
 
601
        ld.lock_write()
 
602
        ld.transport.delete('test_lock/held/info')
 
603
        class LoggingUIFactory(bzrlib.ui.SilentUIFactory):
 
604
            def __init__(self):
 
605
                self.prompts = []
 
606
            def get_boolean(self, prompt):
 
607
                self.prompts.append(('boolean', prompt))
 
608
                return True
 
609
        ui = LoggingUIFactory()
 
610
        orig_factory = bzrlib.ui.ui_factory
 
611
        bzrlib.ui.ui_factory = ui
 
612
        try:
 
613
            ld2.break_lock()
 
614
            self.assertRaises(LockBroken, ld.unlock)
 
615
            self.assertLength(0, ui.prompts)
 
616
        finally:
 
617
            bzrlib.ui.ui_factory = orig_factory
 
618
        # Suppress warnings due to ld not being unlocked
 
619
        # XXX: if lock_broken hook was invoked in this case, this hack would
 
620
        # not be necessary.  - Andrew Bennetts, 2010-09-06.
 
621
        del self._lock_actions[:]
 
622
 
568
623
    def test_create_missing_base_directory(self):
569
624
        """If LockDir.path doesn't exist, it can be created
570
625
 
597
652
            info_list = ld1._format_lock_info(ld1.peek())
598
653
        finally:
599
654
            ld1.unlock()
600
 
        self.assertEqual('lock %s' % (ld1.transport.abspath(ld1.path),),
601
 
                         info_list[0])
602
 
        self.assertContainsRe(info_list[1],
603
 
                              r'^held by .* on host .* \[process #\d*\]$')
604
 
        self.assertContainsRe(info_list[2], r'locked \d+ seconds? ago$')
 
655
        self.assertEqual(info_list[0], u'jrandom@example.com')
 
656
        # info_list[1] is hostname. we skip this.
 
657
        self.assertContainsRe(info_list[2], '^\d+$') # pid
 
658
        self.assertContainsRe(info_list[3], r'^\d+ seconds? ago$') # time_ago
605
659
 
606
660
    def test_lock_without_email(self):
607
661
        global_config = config.GlobalConfig()
613
667
        ld1.unlock()
614
668
 
615
669
    def test_lock_permission(self):
 
670
        self.requireFeature(features.not_running_as_root)
616
671
        if not osutils.supports_posix_readonly():
617
672
            raise tests.TestSkipped('Cannot induce a permission failure')
618
673
        ld1 = self.get_lock()
666
721
        # no kibble
667
722
        check_dir(['held'])
668
723
 
 
724
    def test_no_lockdir_info(self):
 
725
        """We can cope with empty info files."""
 
726
        # This seems like a fairly common failure case - see
 
727
        # <https://bugs.launchpad.net/bzr/+bug/185103> and all its dupes.
 
728
        # Processes are often interrupted after opening the file
 
729
        # before the actual contents are committed.
 
730
        t = self.get_transport()
 
731
        t.mkdir('test_lock')
 
732
        t.mkdir('test_lock/held')
 
733
        t.put_bytes('test_lock/held/info', '')
 
734
        lf = LockDir(t, 'test_lock')
 
735
        info = lf.peek()
 
736
        formatted_info = lf._format_lock_info(info)
 
737
        self.assertEquals(
 
738
            ['<unknown>', '<unknown>', '<unknown>', '(unknown)'],
 
739
            formatted_info)
 
740
 
 
741
    def test_corrupt_lockdir_info(self):
 
742
        """We can cope with corrupt (and thus unparseable) info files."""
 
743
        # This seems like a fairly common failure case too - see
 
744
        # <https://bugs.edge.launchpad.net/bzr/+bug/619872> for instance.
 
745
        # In particular some systems tend to fill recently created files with
 
746
        # nul bytes after recovering from a system crash.
 
747
        t = self.get_transport()
 
748
        t.mkdir('test_lock')
 
749
        t.mkdir('test_lock/held')
 
750
        t.put_bytes('test_lock/held/info', '\0')
 
751
        lf = LockDir(t, 'test_lock')
 
752
        self.assertRaises(errors.LockCorrupt, lf.peek)
 
753
        # Currently attempt_lock gives LockContention, but LockCorrupt would be
 
754
        # a reasonable result too.
 
755
        self.assertRaises(
 
756
            (errors.LockCorrupt, errors.LockContention), lf.attempt_lock)
 
757
        self.assertRaises(errors.LockCorrupt, lf.validate_token, 'fake token')
 
758
 
 
759
    def test_missing_lockdir_info(self):
 
760
        """We can cope with absent info files."""
 
761
        t = self.get_transport()
 
762
        t.mkdir('test_lock')
 
763
        t.mkdir('test_lock/held')
 
764
        lf = LockDir(t, 'test_lock')
 
765
        # In this case we expect the 'not held' result from peek, because peek
 
766
        # cannot be expected to notice that there is a 'held' directory with no
 
767
        # 'info' file.
 
768
        self.assertEqual(None, lf.peek())
 
769
        # And lock/unlock may work or give LockContention (but not any other
 
770
        # error).
 
771
        try:
 
772
            lf.attempt_lock()
 
773
        except LockContention:
 
774
            # LockContention is ok, and expected on Windows
 
775
            pass
 
776
        else:
 
777
            # no error is ok, and expected on POSIX (because POSIX allows
 
778
            # os.rename over an empty directory).
 
779
            lf.unlock()
 
780
        # Currently raises TokenMismatch, but LockCorrupt would be reasonable
 
781
        # too.
 
782
        self.assertRaises(
 
783
            (errors.TokenMismatch, errors.LockCorrupt),
 
784
            lf.validate_token, 'fake token')
 
785
 
669
786
 
670
787
class TestLockDirHooks(TestCaseWithTransport):
671
788