~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
    errors,
28
28
    osutils,
29
29
    tests,
 
30
    transport,
30
31
    )
31
32
from bzrlib.errors import (
32
33
        LockBreakMismatch,
267
268
        self.assertEndsWith(args[3], ' ago')
268
269
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
269
270
 
270
 
    def test_33_wait(self):
271
 
        """Succeed when waiting on a lock that gets released
272
 
 
273
 
        The difference from test_32_lock_wait_succeed is that the second 
274
 
        caller does not actually acquire the lock, but just waits for it
275
 
        to be released.  This is done over a readonly transport.
276
 
        """
277
 
        t = self.get_transport()
278
 
        lf1 = LockDir(t, 'test_lock')
279
 
        lf1.create()
280
 
        lf1.attempt_lock()
281
 
 
282
 
        def wait_and_unlock():
283
 
            time.sleep(0.1)
284
 
            lf1.unlock()
285
 
        unlocker = Thread(target=wait_and_unlock)
286
 
        unlocker.start()
287
 
        try:
288
 
            lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
289
 
            before = time.time()
290
 
            # wait but don't lock
291
 
            lf2.wait(timeout=0.4, poll=0.1)
292
 
            after = time.time()
293
 
            self.assertTrue(after - before <= 1.0)
294
 
        finally:
295
 
            unlocker.join()
296
 
 
297
271
    def test_34_lock_write_waits(self):
298
272
        """LockDir.lock_write() will wait for the lock.""" 
299
273
        # the test suite sets the default to 0 to make deadlocks fail fast.
625
599
        os.mkdir(lock_path)
626
600
        osutils.make_readonly(lock_path)
627
601
        self.assertRaises(errors.PermissionDenied, ld1.attempt_lock)
 
602
 
 
603
    def test_lock_by_token(self):
 
604
        ld1 = self.get_lock()
 
605
        token = ld1.lock_write()
 
606
        self.assertNotEqual(None, token)
 
607
        ld2 = self.get_lock()
 
608
        t2 = ld2.lock_write(token)
 
609
        self.assertEqual(token, t2)
 
610
 
 
611
    def test_lock_with_buggy_rename(self):
 
612
        # test that lock acquisition handles servers which pretend they
 
613
        # renamed correctly but that actually fail
 
614
        t = transport.get_transport('brokenrename+' + self.get_url())
 
615
        ld1 = LockDir(t, 'test_lock')
 
616
        ld1.create()
 
617
        ld1.attempt_lock()
 
618
        ld2 = LockDir(t, 'test_lock')
 
619
        # we should fail to lock
 
620
        e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
 
621
        # now the original caller should succeed in unlocking
 
622
        ld1.unlock()
 
623
        # and there should be nothing left over
 
624
        self.assertEquals([], t.list_dir('test_lock'))
 
625
 
 
626
    def test_failed_lock_leaves_no_trash(self):
 
627
        # if we fail to acquire the lock, we don't leave pending directories
 
628
        # behind -- https://bugs.launchpad.net/bzr/+bug/109169
 
629
        ld1 = self.get_lock()
 
630
        ld2 = self.get_lock()
 
631
        # should be nothing before we start
 
632
        ld1.create()
 
633
        t = self.get_transport().clone('test_lock')
 
634
        def check_dir(a):
 
635
            self.assertEquals(a, t.list_dir('.'))
 
636
        check_dir([])
 
637
        # when held, that's all we see
 
638
        ld1.attempt_lock()
 
639
        check_dir(['held'])
 
640
        # second guy should fail
 
641
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
 
642
        # no kibble
 
643
        check_dir(['held'])