~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-01-02 08:23:44 UTC
  • mfrom: (3140.1.9 find-branches)
  • Revision ID: pqm@pqm.ubuntu.com-20080102082344-qret383z2bdk1ud4
Optimize find_branches for standalone repositories (abentley)

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
27
27
    errors,
28
28
    osutils,
29
29
    tests,
 
30
    transport,
30
31
    )
31
32
from bzrlib.errors import (
32
 
        LockBreakMismatch,
33
 
        LockContention, LockError, UnlockableTransport,
34
 
        LockNotHeld, LockBroken
35
 
        )
 
33
    LockBreakMismatch,
 
34
    LockBroken,
 
35
    LockContention,
 
36
    LockError,
 
37
    LockFailed,
 
38
    LockNotHeld,
 
39
    )
36
40
from bzrlib.lockdir import LockDir
37
41
from bzrlib.tests import TestCaseWithTransport
38
42
from bzrlib.trace import note
104
108
        """Fail to create lock on readonly transport"""
105
109
        t = self.get_readonly_transport()
106
110
        lf = LockDir(t, 'test_lock')
107
 
        self.assertRaises(UnlockableTransport, lf.create)
 
111
        self.assertRaises(LockFailed, lf.create)
108
112
 
109
113
    def test_12_lock_readonly_transport(self):
110
114
        """Fail to lock on readonly transport"""
111
115
        lf = LockDir(self.get_transport(), 'test_lock')
112
116
        lf.create()
113
117
        lf = LockDir(self.get_readonly_transport(), 'test_lock')
114
 
        self.assertRaises(UnlockableTransport, lf.attempt_lock)
 
118
        self.assertRaises(LockFailed, lf.attempt_lock)
115
119
 
116
120
    def test_20_lock_contested(self):
117
121
        """Contention to get a lock"""
267
271
        self.assertEndsWith(args[3], ' ago')
268
272
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
269
273
 
270
 
    def test_33_wait(self):
271
 
        """Succeed when waiting on a lock that gets released
272
 
 
273
 
        The difference from test_32_lock_wait_succeed is that the second 
274
 
        caller does not actually acquire the lock, but just waits for it
275
 
        to be released.  This is done over a readonly transport.
276
 
        """
277
 
        t = self.get_transport()
278
 
        lf1 = LockDir(t, 'test_lock')
279
 
        lf1.create()
280
 
        lf1.attempt_lock()
281
 
 
282
 
        def wait_and_unlock():
283
 
            time.sleep(0.1)
284
 
            lf1.unlock()
285
 
        unlocker = Thread(target=wait_and_unlock)
286
 
        unlocker.start()
287
 
        try:
288
 
            lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
289
 
            before = time.time()
290
 
            # wait but don't lock
291
 
            lf2.wait(timeout=0.4, poll=0.1)
292
 
            after = time.time()
293
 
            self.assertTrue(after - before <= 1.0)
294
 
        finally:
295
 
            unlocker.join()
296
 
 
297
274
    def test_34_lock_write_waits(self):
298
275
        """LockDir.lock_write() will wait for the lock.""" 
299
276
        # the test suite sets the default to 0 to make deadlocks fail fast.
300
277
        # change it for this test, as we want to try a manual deadlock.
 
278
        raise tests.TestSkipped('Timing-sensitive test')
301
279
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 300
302
280
        t = self.get_transport()
303
281
        lf1 = LockDir(t, 'test_lock')
420
398
        unlocker.start()
421
399
        try:
422
400
            # Wait and play against the other thread
423
 
            lf2.wait_lock(timeout=1.0, poll=0.01)
 
401
            lf2.wait_lock(timeout=20.0, poll=0.01)
424
402
        finally:
425
403
            unlocker.join()
426
404
        lf2.unlock()
624
602
        lock_path = ld1.transport.local_abspath('test_lock')
625
603
        os.mkdir(lock_path)
626
604
        osutils.make_readonly(lock_path)
627
 
        self.assertRaises(errors.PermissionDenied, ld1.attempt_lock)
 
605
        self.assertRaises(errors.LockFailed, ld1.attempt_lock)
 
606
 
 
607
    def test_lock_by_token(self):
 
608
        ld1 = self.get_lock()
 
609
        token = ld1.lock_write()
 
610
        self.assertNotEqual(None, token)
 
611
        ld2 = self.get_lock()
 
612
        t2 = ld2.lock_write(token)
 
613
        self.assertEqual(token, t2)
 
614
 
 
615
    def test_lock_with_buggy_rename(self):
 
616
        # test that lock acquisition handles servers which pretend they
 
617
        # renamed correctly but that actually fail
 
618
        t = transport.get_transport('brokenrename+' + self.get_url())
 
619
        ld1 = LockDir(t, 'test_lock')
 
620
        ld1.create()
 
621
        ld1.attempt_lock()
 
622
        ld2 = LockDir(t, 'test_lock')
 
623
        # we should fail to lock
 
624
        e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
 
625
        # now the original caller should succeed in unlocking
 
626
        ld1.unlock()
 
627
        # and there should be nothing left over
 
628
        self.assertEquals([], t.list_dir('test_lock'))
 
629
 
 
630
    def test_failed_lock_leaves_no_trash(self):
 
631
        # if we fail to acquire the lock, we don't leave pending directories
 
632
        # behind -- https://bugs.launchpad.net/bzr/+bug/109169
 
633
        ld1 = self.get_lock()
 
634
        ld2 = self.get_lock()
 
635
        # should be nothing before we start
 
636
        ld1.create()
 
637
        t = self.get_transport().clone('test_lock')
 
638
        def check_dir(a):
 
639
            self.assertEquals(a, t.list_dir('.'))
 
640
        check_dir([])
 
641
        # when held, that's all we see
 
642
        ld1.attempt_lock()
 
643
        check_dir(['held'])
 
644
        # second guy should fail
 
645
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
 
646
        # no kibble
 
647
        check_dir(['held'])