~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-06-28 07:08:27 UTC
  • mfrom: (2553.1.3 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20070628070827-h5s313dg5tnag9vj
(robertc) Show the names of commit hooks during commit.

Show diffs side-by-side

added

removed

Lines of Context:
27
27
    errors,
28
28
    osutils,
29
29
    tests,
30
 
    transport,
31
30
    )
32
31
from bzrlib.errors import (
33
32
        LockBreakMismatch,
268
267
        self.assertEndsWith(args[3], ' ago')
269
268
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
270
269
 
 
270
    def test_33_wait(self):
 
271
        """Succeed when waiting on a lock that gets released
 
272
 
 
273
        The difference from test_32_lock_wait_succeed is that the second 
 
274
        caller does not actually acquire the lock, but just waits for it
 
275
        to be released.  This is done over a readonly transport.
 
276
        """
 
277
        t = self.get_transport()
 
278
        lf1 = LockDir(t, 'test_lock')
 
279
        lf1.create()
 
280
        lf1.attempt_lock()
 
281
 
 
282
        def wait_and_unlock():
 
283
            time.sleep(0.1)
 
284
            lf1.unlock()
 
285
        unlocker = Thread(target=wait_and_unlock)
 
286
        unlocker.start()
 
287
        try:
 
288
            lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
 
289
            before = time.time()
 
290
            # wait but don't lock
 
291
            lf2.wait(timeout=0.4, poll=0.1)
 
292
            after = time.time()
 
293
            self.assertTrue(after - before <= 1.0)
 
294
        finally:
 
295
            unlocker.join()
 
296
 
271
297
    def test_34_lock_write_waits(self):
272
298
        """LockDir.lock_write() will wait for the lock.""" 
273
299
        # the test suite sets the default to 0 to make deadlocks fail fast.
599
625
        os.mkdir(lock_path)
600
626
        osutils.make_readonly(lock_path)
601
627
        self.assertRaises(errors.PermissionDenied, ld1.attempt_lock)
602
 
 
603
 
    def test_lock_by_token(self):
604
 
        ld1 = self.get_lock()
605
 
        token = ld1.lock_write()
606
 
        self.assertNotEqual(None, token)
607
 
        ld2 = self.get_lock()
608
 
        t2 = ld2.lock_write(token)
609
 
        self.assertEqual(token, t2)
610
 
 
611
 
    def test_lock_with_buggy_rename(self):
612
 
        # test that lock acquisition handles servers which pretend they
613
 
        # renamed correctly but that actually fail
614
 
        t = transport.get_transport('brokenrename+' + self.get_url())
615
 
        ld1 = LockDir(t, 'test_lock')
616
 
        ld1.create()
617
 
        ld1.attempt_lock()
618
 
        ld2 = LockDir(t, 'test_lock')
619
 
        # we should fail to lock
620
 
        e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
621
 
        # now the original caller should succeed in unlocking
622
 
        ld1.unlock()
623
 
        # and there should be nothing left over
624
 
        self.assertEquals([], t.list_dir('test_lock'))
625
 
 
626
 
    def test_failed_lock_leaves_no_trash(self):
627
 
        # if we fail to acquire the lock, we don't leave pending directories
628
 
        # behind -- https://bugs.launchpad.net/bzr/+bug/109169
629
 
        ld1 = self.get_lock()
630
 
        ld2 = self.get_lock()
631
 
        # should be nothing before we start
632
 
        ld1.create()
633
 
        t = self.get_transport().clone('test_lock')
634
 
        def check_dir(a):
635
 
            self.assertEquals(a, t.list_dir('.'))
636
 
        check_dir([])
637
 
        # when held, that's all we see
638
 
        ld1.attempt_lock()
639
 
        check_dir(['held'])
640
 
        # second guy should fail
641
 
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
642
 
        # no kibble
643
 
        check_dir(['held'])