~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Alexander Belchenko
  • Date: 2007-01-30 23:05:35 UTC
  • mto: This revision was merged to the branch mainline in revision 2259.
  • Revision ID: bialix@ukr.net-20070130230535-kx1rd478rtigyc3v
standalone installer: win98 support

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for LockDir"""
18
18
 
19
19
from cStringIO import StringIO
20
 
import os
21
20
from threading import Thread, Lock
22
21
import time
23
22
 
24
23
import bzrlib
25
24
from bzrlib import (
26
25
    config,
27
 
    errors,
28
26
    osutils,
29
 
    tests,
30
 
    transport,
31
27
    )
32
28
from bzrlib.errors import (
33
 
    LockBreakMismatch,
34
 
    LockBroken,
35
 
    LockContention,
36
 
    LockError,
37
 
    LockFailed,
38
 
    LockNotHeld,
39
 
    )
40
 
from bzrlib.lockdir import LockDir
 
29
        LockBreakMismatch,
 
30
        LockContention, LockError, UnlockableTransport,
 
31
        LockNotHeld, LockBroken
 
32
        )
 
33
from bzrlib.lockdir import LockDir, _DEFAULT_TIMEOUT_SECONDS
41
34
from bzrlib.tests import TestCaseWithTransport
42
35
from bzrlib.trace import note
43
36
 
108
101
        """Fail to create lock on readonly transport"""
109
102
        t = self.get_readonly_transport()
110
103
        lf = LockDir(t, 'test_lock')
111
 
        self.assertRaises(LockFailed, lf.create)
 
104
        self.assertRaises(UnlockableTransport, lf.create)
112
105
 
113
106
    def test_12_lock_readonly_transport(self):
114
107
        """Fail to lock on readonly transport"""
115
108
        lf = LockDir(self.get_transport(), 'test_lock')
116
109
        lf.create()
117
110
        lf = LockDir(self.get_readonly_transport(), 'test_lock')
118
 
        self.assertRaises(LockFailed, lf.attempt_lock)
 
111
        self.assertRaises(UnlockableTransport, lf.attempt_lock)
119
112
 
120
113
    def test_20_lock_contested(self):
121
114
        """Contention to get a lock"""
224
217
        One thread holds on a lock and then releases it; another 
225
218
        tries to lock it.
226
219
        """
227
 
        # This test sometimes fails like this:
228
 
        # Traceback (most recent call last):
229
 
 
230
 
        #   File "/home/pqm/bzr-pqm-workdir/home/+trunk/bzrlib/tests/
231
 
        # test_lockdir.py", line 247, in test_32_lock_wait_succeed
232
 
        #     self.assertEqual(1, len(self._logged_reports))
233
 
        # AssertionError: not equal:
234
 
        # a = 1
235
 
        # b = 0
236
 
        raise tests.TestSkipped("Test fails intermittently")
237
220
        t = self.get_transport()
238
221
        lf1 = LockDir(t, 'test_lock')
239
222
        lf1.create()
271
254
        self.assertEndsWith(args[3], ' ago')
272
255
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
273
256
 
 
257
    def test_33_wait(self):
 
258
        """Succeed when waiting on a lock that gets released
 
259
 
 
260
        The difference from test_32_lock_wait_succeed is that the second 
 
261
        caller does not actually acquire the lock, but just waits for it
 
262
        to be released.  This is done over a readonly transport.
 
263
        """
 
264
        t = self.get_transport()
 
265
        lf1 = LockDir(t, 'test_lock')
 
266
        lf1.create()
 
267
        lf1.attempt_lock()
 
268
 
 
269
        def wait_and_unlock():
 
270
            time.sleep(0.1)
 
271
            lf1.unlock()
 
272
        unlocker = Thread(target=wait_and_unlock)
 
273
        unlocker.start()
 
274
        try:
 
275
            lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
 
276
            before = time.time()
 
277
            # wait but don't lock
 
278
            lf2.wait(timeout=0.4, poll=0.1)
 
279
            after = time.time()
 
280
            self.assertTrue(after - before <= 1.0)
 
281
        finally:
 
282
            unlocker.join()
 
283
 
274
284
    def test_34_lock_write_waits(self):
275
285
        """LockDir.lock_write() will wait for the lock.""" 
276
 
        # the test suite sets the default to 0 to make deadlocks fail fast.
277
 
        # change it for this test, as we want to try a manual deadlock.
278
 
        raise tests.TestSkipped('Timing-sensitive test')
279
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 300
280
286
        t = self.get_transport()
281
287
        lf1 = LockDir(t, 'test_lock')
282
288
        lf1.create()
398
404
        unlocker.start()
399
405
        try:
400
406
            # Wait and play against the other thread
401
 
            lf2.wait_lock(timeout=20.0, poll=0.01)
 
407
            lf2.wait_lock(timeout=1.0, poll=0.01)
402
408
        finally:
403
409
            unlocker.join()
404
410
        lf2.unlock()
594
600
        ld1.create()
595
601
        ld1.lock_write()
596
602
        ld1.unlock()
597
 
 
598
 
    def test_lock_permission(self):
599
 
        if not osutils.supports_posix_readonly():
600
 
            raise tests.TestSkipped('Cannot induce a permission failure')
601
 
        ld1 = self.get_lock()
602
 
        lock_path = ld1.transport.local_abspath('test_lock')
603
 
        os.mkdir(lock_path)
604
 
        osutils.make_readonly(lock_path)
605
 
        self.assertRaises(errors.LockFailed, ld1.attempt_lock)
606
 
 
607
 
    def test_lock_by_token(self):
608
 
        ld1 = self.get_lock()
609
 
        token = ld1.lock_write()
610
 
        self.assertNotEqual(None, token)
611
 
        ld2 = self.get_lock()
612
 
        t2 = ld2.lock_write(token)
613
 
        self.assertEqual(token, t2)
614
 
 
615
 
    def test_lock_with_buggy_rename(self):
616
 
        # test that lock acquisition handles servers which pretend they
617
 
        # renamed correctly but that actually fail
618
 
        t = transport.get_transport('brokenrename+' + self.get_url())
619
 
        ld1 = LockDir(t, 'test_lock')
620
 
        ld1.create()
621
 
        ld1.attempt_lock()
622
 
        ld2 = LockDir(t, 'test_lock')
623
 
        # we should fail to lock
624
 
        e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
625
 
        # now the original caller should succeed in unlocking
626
 
        ld1.unlock()
627
 
        # and there should be nothing left over
628
 
        self.assertEquals([], t.list_dir('test_lock'))
629
 
 
630
 
    def test_failed_lock_leaves_no_trash(self):
631
 
        # if we fail to acquire the lock, we don't leave pending directories
632
 
        # behind -- https://bugs.launchpad.net/bzr/+bug/109169
633
 
        ld1 = self.get_lock()
634
 
        ld2 = self.get_lock()
635
 
        # should be nothing before we start
636
 
        ld1.create()
637
 
        t = self.get_transport().clone('test_lock')
638
 
        def check_dir(a):
639
 
            self.assertEquals(a, t.list_dir('.'))
640
 
        check_dir([])
641
 
        # when held, that's all we see
642
 
        ld1.attempt_lock()
643
 
        check_dir(['held'])
644
 
        # second guy should fail
645
 
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
646
 
        # no kibble
647
 
        check_dir(['held'])