~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: v.ladeuil+lp at free
  • Date: 2007-05-18 18:20:31 UTC
  • mto: (2485.8.44 bzr.connection.sharing)
  • mto: This revision was merged to the branch mainline in revision 2646.
  • Revision ID: v.ladeuil+lp@free.fr-20070518182031-gbg2cgidv5l20x9p
Takes Robert's comments into account.

* bzrlib/transport/ftp.py:
(FtpTransport.__init__): Write a better explanation.

* bzrlib/tests/test_init.py:
(InstrumentedTransport): Just make hooks a class attribute.
(InstrumentedTransport._get_FTP): Run hook directly in the for
loop.
(TransportHooks.run_hook, TransportHooks.uninstall_hook): Not
needed. The hooks should be cleaned up by the test itself.
(TestInit.setUp.cleanup): Reset to default hooks.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
    errors,
28
28
    osutils,
29
29
    tests,
30
 
    transport,
31
30
    )
32
31
from bzrlib.errors import (
33
32
        LockBreakMismatch,
221
220
        One thread holds on a lock and then releases it; another 
222
221
        tries to lock it.
223
222
        """
224
 
        # This test sometimes fails like this:
225
 
        # Traceback (most recent call last):
226
 
 
227
 
        #   File "/home/pqm/bzr-pqm-workdir/home/+trunk/bzrlib/tests/
228
 
        # test_lockdir.py", line 247, in test_32_lock_wait_succeed
229
 
        #     self.assertEqual(1, len(self._logged_reports))
230
 
        # AssertionError: not equal:
231
 
        # a = 1
232
 
        # b = 0
233
 
        raise tests.TestSkipped("Test fails intermittently")
234
223
        t = self.get_transport()
235
224
        lf1 = LockDir(t, 'test_lock')
236
225
        lf1.create()
268
257
        self.assertEndsWith(args[3], ' ago')
269
258
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
270
259
 
 
260
    def test_33_wait(self):
 
261
        """Succeed when waiting on a lock that gets released
 
262
 
 
263
        The difference from test_32_lock_wait_succeed is that the second 
 
264
        caller does not actually acquire the lock, but just waits for it
 
265
        to be released.  This is done over a readonly transport.
 
266
        """
 
267
        t = self.get_transport()
 
268
        lf1 = LockDir(t, 'test_lock')
 
269
        lf1.create()
 
270
        lf1.attempt_lock()
 
271
 
 
272
        def wait_and_unlock():
 
273
            time.sleep(0.1)
 
274
            lf1.unlock()
 
275
        unlocker = Thread(target=wait_and_unlock)
 
276
        unlocker.start()
 
277
        try:
 
278
            lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
 
279
            before = time.time()
 
280
            # wait but don't lock
 
281
            lf2.wait(timeout=0.4, poll=0.1)
 
282
            after = time.time()
 
283
            self.assertTrue(after - before <= 1.0)
 
284
        finally:
 
285
            unlocker.join()
 
286
 
271
287
    def test_34_lock_write_waits(self):
272
288
        """LockDir.lock_write() will wait for the lock.""" 
273
289
        # the test suite sets the default to 0 to make deadlocks fail fast.
599
615
        os.mkdir(lock_path)
600
616
        osutils.make_readonly(lock_path)
601
617
        self.assertRaises(errors.PermissionDenied, ld1.attempt_lock)
602
 
 
603
 
    def test_lock_by_token(self):
604
 
        ld1 = self.get_lock()
605
 
        token = ld1.lock_write()
606
 
        self.assertNotEqual(None, token)
607
 
        ld2 = self.get_lock()
608
 
        t2 = ld2.lock_write(token)
609
 
        self.assertEqual(token, t2)
610
 
 
611
 
    def test_lock_with_buggy_rename(self):
612
 
        # test that lock acquisition handles servers which pretend they
613
 
        # renamed correctly but that actually fail
614
 
        t = transport.get_transport('brokenrename+' + self.get_url())
615
 
        ld1 = LockDir(t, 'test_lock')
616
 
        ld1.create()
617
 
        ld1.attempt_lock()
618
 
        ld2 = LockDir(t, 'test_lock')
619
 
        # we should fail to lock
620
 
        e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
621
 
        # now the original caller should succeed in unlocking
622
 
        ld1.unlock()
623
 
        # and there should be nothing left over
624
 
        self.assertEquals([], t.list_dir('test_lock'))
625
 
 
626
 
    def test_failed_lock_leaves_no_trash(self):
627
 
        # if we fail to acquire the lock, we don't leave pending directories
628
 
        # behind -- https://bugs.launchpad.net/bzr/+bug/109169
629
 
        ld1 = self.get_lock()
630
 
        ld2 = self.get_lock()
631
 
        # should be nothing before we start
632
 
        ld1.create()
633
 
        t = self.get_transport().clone('test_lock')
634
 
        def check_dir(a):
635
 
            self.assertEquals(a, t.list_dir('.'))
636
 
        check_dir([])
637
 
        # when held, that's all we see
638
 
        ld1.attempt_lock()
639
 
        check_dir(['held'])
640
 
        # second guy should fail
641
 
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
642
 
        # no kibble
643
 
        check_dir(['held'])