~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Vincent Ladeuil
  • Date: 2009-04-27 16:10:10 UTC
  • mto: (4310.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4311.
  • Revision ID: v.ladeuil+lp@free.fr-20090427161010-7swfzeagf63cpixd
Fix bug #367726 by reverting some default user handling introduced
while fixing bug #256612.

* bzrlib/transport/ssh.py:
(_paramiko_auth): Explicitly use getpass.getuser() as default
user.

* bzrlib/transport/ftp/_gssapi.py:
(GSSAPIFtpTransport._create_connection): Explicitly use
getpass.getuser() as default user.

* bzrlib/transport/ftp/__init__.py:
(FtpTransport._create_connection): Explicitly use
getpass.getuser() as default user.

* bzrlib/tests/test_sftp_transport.py:
(TestUsesAuthConfig.test_sftp_is_none_if_no_config)
(TestUsesAuthConfig.test_sftp_doesnt_prompt_username): Revert to
None as the default user.

* bzrlib/tests/test_remote.py:
(TestRemoteSSHTransportAuthentication): The really offending one:
revert to None as the default user.

* bzrlib/tests/test_config.py:
(TestAuthenticationConfig.test_username_default_no_prompt): Update
test (and some PEP8).

* bzrlib/smtp_connection.py:
(SMTPConnection._authenticate): Revert to None as the default
user.

* bzrlib/plugins/launchpad/account.py:
(_get_auth_user): Revert default value handling.

* bzrlib/config.py:
(AuthenticationConfig.get_user): Fix doc-string. Leave default
value handling to callers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
39
39
    LockNotHeld,
40
40
    )
41
41
from bzrlib.lockdir import LockDir
42
 
from bzrlib.tests import (features, TestCaseWithTransport)
 
42
from bzrlib.tests import TestCaseWithTransport
43
43
from bzrlib.trace import note
44
44
 
45
45
# These tests sometimes use threads to test the behaviour of lock files with
142
142
        lf1 = LockDir(t, 'test_lock')
143
143
        lf1.create()
144
144
        lf1.attempt_lock()
145
 
        self.addCleanup(lf1.unlock)
146
145
        # lock is held, should get some info on it
147
146
        info1 = lf1.peek()
148
147
        self.assertEqual(set(info1.keys()),
162
161
        lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
163
162
        self.assertEqual(lf2.peek(), None)
164
163
        lf1.attempt_lock()
165
 
        self.addCleanup(lf1.unlock)
166
164
        info2 = lf2.peek()
167
165
        self.assertTrue(info2)
168
166
        self.assertEqual(info2['nonce'], lf1.nonce)
191
189
                    "took %f seconds to detect lock contention" % (after - before))
192
190
        finally:
193
191
            lf1.unlock()
 
192
        lock_base = lf2.transport.abspath(lf2.path)
194
193
        self.assertEqual(1, len(self._logged_reports))
195
 
        self.assertEqual(self._logged_reports[0][0],
196
 
            '%s lock %s held by %s\n'
197
 
            'at %s [process #%s], acquired %s.\n'
198
 
            'Will continue to try until %s, unless '
199
 
            'you press Ctrl-C.\n'
200
 
            'See "bzr help break-lock" for more.')
201
 
        start, lock_url, user, hostname, pid, time_ago, deadline_str = \
202
 
            self._logged_reports[0][1]
203
 
        self.assertEqual(start, u'Unable to obtain')
204
 
        self.assertEqual(user, u'jrandom@example.com')
205
 
        # skip hostname
206
 
        self.assertContainsRe(pid, r'\d+')
207
 
        self.assertContainsRe(time_ago, r'.* ago')
208
 
        self.assertContainsRe(deadline_str, r'\d{2}:\d{2}:\d{2}')
 
194
        lock_url = lf2.transport.abspath(lf2.path)
 
195
        self.assertEqual('%s %s\n'
 
196
                         '%s\n%s\n'
 
197
                         'Will continue to try until %s, unless '
 
198
                         'you press Ctrl-C\n'
 
199
                         'If you\'re sure that it\'s not being '
 
200
                         'modified, use bzr break-lock %s',
 
201
                         self._logged_reports[0][0])
 
202
        args = self._logged_reports[0][1]
 
203
        self.assertEqual('Unable to obtain', args[0])
 
204
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
205
        self.assertStartsWith(args[2], 'held by ')
 
206
        self.assertStartsWith(args[3], 'locked ')
 
207
        self.assertEndsWith(args[3], ' ago')
 
208
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
209
209
 
210
210
    def test_31_lock_wait_easy(self):
211
211
        """Succeed when waiting on a lock with no contention.
418
418
        self.assertEqual('%s %s\n'
419
419
                         '%s\n%s\n'
420
420
                         'Will continue to try until %s, unless '
421
 
                         'you press Ctrl-C.\n'
422
 
                         'See "bzr help break-lock" for more.',
 
421
                         'you press Ctrl-C\n'
 
422
                         'If you\'re sure that it\'s not being '
 
423
                         'modified, use bzr break-lock %s',
423
424
                         self._logged_reports[0][0])
424
425
        args = self._logged_reports[0][1]
425
426
        self.assertEqual('Unable to obtain', args[0])
432
433
        self.assertEqual('%s %s\n'
433
434
                         '%s\n%s\n'
434
435
                         'Will continue to try until %s, unless '
435
 
                         'you press Ctrl-C.\n'
436
 
                         'See "bzr help break-lock" for more.',
 
436
                         'you press Ctrl-C\n'
 
437
                         'If you\'re sure that it\'s not being '
 
438
                         'modified, use bzr break-lock %s',
437
439
                         self._logged_reports[1][0])
438
440
        args = self._logged_reports[1][1]
439
441
        self.assertEqual('Lock owner changed for', args[0])
449
451
        lf1 = LockDir(t, 'test_lock')
450
452
        lf1.create()
451
453
        lf1.attempt_lock()
452
 
        self.addCleanup(lf1.unlock)
453
454
        lf1.confirm()
454
455
 
455
456
    def test_41_confirm_not_held(self):
467
468
        lf1.attempt_lock()
468
469
        t.move('test_lock', 'lock_gone_now')
469
470
        self.assertRaises(LockBroken, lf1.confirm)
470
 
        # Clean up
471
 
        t.move('lock_gone_now', 'test_lock')
472
 
        lf1.unlock()
473
471
 
474
472
    def test_43_break(self):
475
473
        """Break a lock whose caller has forgotten it"""
486
484
        lf2.force_break(holder_info)
487
485
        # now we should be able to take it
488
486
        lf2.attempt_lock()
489
 
        self.addCleanup(lf2.unlock)
490
487
        lf2.confirm()
491
488
 
492
489
    def test_44_break_already_released(self):
504
501
        lf2.force_break(holder_info)
505
502
        # now we should be able to take it
506
503
        lf2.attempt_lock()
507
 
        self.addCleanup(lf2.unlock)
508
504
        lf2.confirm()
509
505
 
510
506
    def test_45_break_mismatch(self):
557
553
        # do this without IO redirection to ensure it doesn't prompt.
558
554
        self.assertRaises(AssertionError, ld1.break_lock)
559
555
        orig_factory = bzrlib.ui.ui_factory
560
 
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
 
556
        # silent ui - no need for stdout
 
557
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
558
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
561
559
        try:
562
560
            ld2.break_lock()
563
561
            self.assertRaises(LockBroken, ld1.unlock)
564
562
        finally:
565
563
            bzrlib.ui.ui_factory = orig_factory
566
564
 
567
 
    def test_break_lock_corrupt_info(self):
568
 
        """break_lock works even if the info file is corrupt (and tells the UI
569
 
        that it is corrupt).
570
 
        """
571
 
        ld = self.get_lock()
572
 
        ld2 = self.get_lock()
573
 
        ld.create()
574
 
        ld.lock_write()
575
 
        ld.transport.put_bytes_non_atomic('test_lock/held/info', '\0')
576
 
        class LoggingUIFactory(bzrlib.ui.SilentUIFactory):
577
 
            def __init__(self):
578
 
                self.prompts = []
579
 
            def get_boolean(self, prompt):
580
 
                self.prompts.append(('boolean', prompt))
581
 
                return True
582
 
        ui = LoggingUIFactory()
583
 
        orig_factory = bzrlib.ui.ui_factory
584
 
        bzrlib.ui.ui_factory = ui
585
 
        try:
586
 
            ld2.break_lock()
587
 
            self.assertLength(1, ui.prompts)
588
 
            self.assertEqual('boolean', ui.prompts[0][0])
589
 
            self.assertStartsWith(ui.prompts[0][1], 'Break (corrupt LockDir')
590
 
            self.assertRaises(LockBroken, ld.unlock)
591
 
        finally:
592
 
            bzrlib.ui.ui_factory = orig_factory
593
 
 
594
 
    def test_break_lock_missing_info(self):
595
 
        """break_lock works even if the info file is missing (and tells the UI
596
 
        that it is corrupt).
597
 
        """
598
 
        ld = self.get_lock()
599
 
        ld2 = self.get_lock()
600
 
        ld.create()
601
 
        ld.lock_write()
602
 
        ld.transport.delete('test_lock/held/info')
603
 
        class LoggingUIFactory(bzrlib.ui.SilentUIFactory):
604
 
            def __init__(self):
605
 
                self.prompts = []
606
 
            def get_boolean(self, prompt):
607
 
                self.prompts.append(('boolean', prompt))
608
 
                return True
609
 
        ui = LoggingUIFactory()
610
 
        orig_factory = bzrlib.ui.ui_factory
611
 
        bzrlib.ui.ui_factory = ui
612
 
        try:
613
 
            ld2.break_lock()
614
 
            self.assertRaises(LockBroken, ld.unlock)
615
 
            self.assertLength(0, ui.prompts)
616
 
        finally:
617
 
            bzrlib.ui.ui_factory = orig_factory
618
 
        # Suppress warnings due to ld not being unlocked
619
 
        # XXX: if lock_broken hook was invoked in this case, this hack would
620
 
        # not be necessary.  - Andrew Bennetts, 2010-09-06.
621
 
        del self._lock_actions[:]
622
 
 
623
565
    def test_create_missing_base_directory(self):
624
566
        """If LockDir.path doesn't exist, it can be created
625
567
 
652
594
            info_list = ld1._format_lock_info(ld1.peek())
653
595
        finally:
654
596
            ld1.unlock()
655
 
        self.assertEqual(info_list[0], u'jrandom@example.com')
656
 
        # info_list[1] is hostname. we skip this.
657
 
        self.assertContainsRe(info_list[2], '^\d+$') # pid
658
 
        self.assertContainsRe(info_list[3], r'^\d+ seconds? ago$') # time_ago
 
597
        self.assertEqual('lock %s' % (ld1.transport.abspath(ld1.path),),
 
598
                         info_list[0])
 
599
        self.assertContainsRe(info_list[1],
 
600
                              r'^held by .* on host .* \[process #\d*\]$')
 
601
        self.assertContainsRe(info_list[2], r'locked \d+ seconds? ago$')
659
602
 
660
603
    def test_lock_without_email(self):
661
604
        global_config = config.GlobalConfig()
667
610
        ld1.unlock()
668
611
 
669
612
    def test_lock_permission(self):
670
 
        self.requireFeature(features.not_running_as_root)
671
613
        if not osutils.supports_posix_readonly():
672
614
            raise tests.TestSkipped('Cannot induce a permission failure')
673
615
        ld1 = self.get_lock()
679
621
    def test_lock_by_token(self):
680
622
        ld1 = self.get_lock()
681
623
        token = ld1.lock_write()
682
 
        self.addCleanup(ld1.unlock)
683
624
        self.assertNotEqual(None, token)
684
625
        ld2 = self.get_lock()
685
626
        t2 = ld2.lock_write(token)
686
 
        self.addCleanup(ld2.unlock)
687
627
        self.assertEqual(token, t2)
688
628
 
689
629
    def test_lock_with_buggy_rename(self):
714
654
        check_dir([])
715
655
        # when held, that's all we see
716
656
        ld1.attempt_lock()
717
 
        self.addCleanup(ld1.unlock)
718
657
        check_dir(['held'])
719
658
        # second guy should fail
720
659
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
721
660
        # no kibble
722
661
        check_dir(['held'])
723
662
 
724
 
    def test_no_lockdir_info(self):
725
 
        """We can cope with empty info files."""
726
 
        # This seems like a fairly common failure case - see
727
 
        # <https://bugs.launchpad.net/bzr/+bug/185103> and all its dupes.
728
 
        # Processes are often interrupted after opening the file
729
 
        # before the actual contents are committed.
730
 
        t = self.get_transport()
731
 
        t.mkdir('test_lock')
732
 
        t.mkdir('test_lock/held')
733
 
        t.put_bytes('test_lock/held/info', '')
734
 
        lf = LockDir(t, 'test_lock')
735
 
        info = lf.peek()
736
 
        formatted_info = lf._format_lock_info(info)
737
 
        self.assertEquals(
738
 
            ['<unknown>', '<unknown>', '<unknown>', '(unknown)'],
739
 
            formatted_info)
740
 
 
741
 
    def test_corrupt_lockdir_info(self):
742
 
        """We can cope with corrupt (and thus unparseable) info files."""
743
 
        # This seems like a fairly common failure case too - see
744
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/619872> for instance.
745
 
        # In particular some systems tend to fill recently created files with
746
 
        # nul bytes after recovering from a system crash.
747
 
        t = self.get_transport()
748
 
        t.mkdir('test_lock')
749
 
        t.mkdir('test_lock/held')
750
 
        t.put_bytes('test_lock/held/info', '\0')
751
 
        lf = LockDir(t, 'test_lock')
752
 
        self.assertRaises(errors.LockCorrupt, lf.peek)
753
 
        # Currently attempt_lock gives LockContention, but LockCorrupt would be
754
 
        # a reasonable result too.
755
 
        self.assertRaises(
756
 
            (errors.LockCorrupt, errors.LockContention), lf.attempt_lock)
757
 
        self.assertRaises(errors.LockCorrupt, lf.validate_token, 'fake token')
758
 
 
759
 
    def test_missing_lockdir_info(self):
760
 
        """We can cope with absent info files."""
761
 
        t = self.get_transport()
762
 
        t.mkdir('test_lock')
763
 
        t.mkdir('test_lock/held')
764
 
        lf = LockDir(t, 'test_lock')
765
 
        # In this case we expect the 'not held' result from peek, because peek
766
 
        # cannot be expected to notice that there is a 'held' directory with no
767
 
        # 'info' file.
768
 
        self.assertEqual(None, lf.peek())
769
 
        # And lock/unlock may work or give LockContention (but not any other
770
 
        # error).
771
 
        try:
772
 
            lf.attempt_lock()
773
 
        except LockContention:
774
 
            # LockContention is ok, and expected on Windows
775
 
            pass
776
 
        else:
777
 
            # no error is ok, and expected on POSIX (because POSIX allows
778
 
            # os.rename over an empty directory).
779
 
            lf.unlock()
780
 
        # Currently raises TokenMismatch, but LockCorrupt would be reasonable
781
 
        # too.
782
 
        self.assertRaises(
783
 
            (errors.TokenMismatch, errors.LockCorrupt),
784
 
            lf.validate_token, 'fake token')
785
 
 
786
 
 
787
 
class TestLockDirHooks(TestCaseWithTransport):
788
 
 
789
 
    def setUp(self):
790
 
        super(TestLockDirHooks, self).setUp()
791
 
        self._calls = []
792
 
 
793
 
    def get_lock(self):
794
 
        return LockDir(self.get_transport(), 'test_lock')
795
 
 
796
663
    def record_hook(self, result):
797
664
        self._calls.append(result)
798
665
 
 
666
    def reset_hooks(self):
 
667
        self._old_hooks = lock.Lock.hooks
 
668
        self.addCleanup(self.restore_hooks)
 
669
        lock.Lock.hooks = lock.LockHooks()
 
670
 
 
671
    def restore_hooks(self):
 
672
        lock.Lock.hooks = self._old_hooks
 
673
 
799
674
    def test_LockDir_acquired_success(self):
800
675
        # the LockDir.lock_acquired hook fires when a lock is acquired.
 
676
        self._calls = []
 
677
        self.reset_hooks()
801
678
        LockDir.hooks.install_named_hook('lock_acquired',
802
 
                                         self.record_hook, 'record_hook')
 
679
            self.record_hook, 'record_hook')
803
680
        ld = self.get_lock()
804
681
        ld.create()
805
682
        self.assertEqual([], self._calls)
811
688
 
812
689
    def test_LockDir_acquired_fail(self):
813
690
        # the LockDir.lock_acquired hook does not fire on failure.
 
691
        self._calls = []
 
692
        self.reset_hooks()
814
693
        ld = self.get_lock()
815
694
        ld.create()
816
695
        ld2 = self.get_lock()
817
696
        ld2.attempt_lock()
818
697
        # install a lock hook now, when the disk lock is locked
819
698
        LockDir.hooks.install_named_hook('lock_acquired',
820
 
                                         self.record_hook, 'record_hook')
 
699
            self.record_hook, 'record_hook')
821
700
        self.assertRaises(errors.LockContention, ld.attempt_lock)
822
701
        self.assertEqual([], self._calls)
823
702
        ld2.unlock()
825
704
 
826
705
    def test_LockDir_released_success(self):
827
706
        # the LockDir.lock_released hook fires when a lock is released.
 
707
        self._calls = []
 
708
        self.reset_hooks()
828
709
        LockDir.hooks.install_named_hook('lock_released',
829
 
                                         self.record_hook, 'record_hook')
 
710
            self.record_hook, 'record_hook')
830
711
        ld = self.get_lock()
831
712
        ld.create()
832
713
        self.assertEqual([], self._calls)
838
719
 
839
720
    def test_LockDir_released_fail(self):
840
721
        # the LockDir.lock_released hook does not fire on failure.
 
722
        self._calls = []
 
723
        self.reset_hooks()
841
724
        ld = self.get_lock()
842
725
        ld.create()
843
726
        ld2 = self.get_lock()
844
727
        ld.attempt_lock()
845
728
        ld2.force_break(ld2.peek())
846
729
        LockDir.hooks.install_named_hook('lock_released',
847
 
                                         self.record_hook, 'record_hook')
 
730
            self.record_hook, 'record_hook')
848
731
        self.assertRaises(LockBroken, ld.unlock)
849
732
        self.assertEqual([], self._calls)
850
 
 
851
 
    def test_LockDir_broken_success(self):
852
 
        # the LockDir.lock_broken hook fires when a lock is broken.
853
 
        ld = self.get_lock()
854
 
        ld.create()
855
 
        ld2 = self.get_lock()
856
 
        result = ld.attempt_lock()
857
 
        LockDir.hooks.install_named_hook('lock_broken',
858
 
                                         self.record_hook, 'record_hook')
859
 
        ld2.force_break(ld2.peek())
860
 
        lock_path = ld.transport.abspath(ld.path)
861
 
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
862
 
 
863
 
    def test_LockDir_broken_failure(self):
864
 
        # the LockDir.lock_broken hook does not fire when a lock is already
865
 
        # released.
866
 
        ld = self.get_lock()
867
 
        ld.create()
868
 
        ld2 = self.get_lock()
869
 
        result = ld.attempt_lock()
870
 
        holder_info = ld2.peek()
871
 
        ld.unlock()
872
 
        LockDir.hooks.install_named_hook('lock_broken',
873
 
                                         self.record_hook, 'record_hook')
874
 
        ld2.force_break(holder_info)
875
 
        lock_path = ld.transport.abspath(ld.path)
876
 
        self.assertEqual([], self._calls)