1
# Copyright (C) 2006 Canonical Ltd
1
# Copyright (C) 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
32
from bzrlib.errors import (
33
LockContention, LockError, UnlockableTransport,
34
LockNotHeld, LockBroken
36
40
from bzrlib.lockdir import LockDir
37
41
from bzrlib.tests import TestCaseWithTransport
38
42
from bzrlib.trace import note
104
108
"""Fail to create lock on readonly transport"""
105
109
t = self.get_readonly_transport()
106
110
lf = LockDir(t, 'test_lock')
107
self.assertRaises(UnlockableTransport, lf.create)
111
self.assertRaises(LockFailed, lf.create)
109
113
def test_12_lock_readonly_transport(self):
110
114
"""Fail to lock on readonly transport"""
111
115
lf = LockDir(self.get_transport(), 'test_lock')
113
117
lf = LockDir(self.get_readonly_transport(), 'test_lock')
114
self.assertRaises(UnlockableTransport, lf.attempt_lock)
118
self.assertRaises(LockFailed, lf.attempt_lock)
116
120
def test_20_lock_contested(self):
117
121
"""Contention to get a lock"""
267
271
self.assertEndsWith(args[3], ' ago')
268
272
self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
270
def test_33_wait(self):
271
"""Succeed when waiting on a lock that gets released
273
The difference from test_32_lock_wait_succeed is that the second
274
caller does not actually acquire the lock, but just waits for it
275
to be released. This is done over a readonly transport.
277
t = self.get_transport()
278
lf1 = LockDir(t, 'test_lock')
282
def wait_and_unlock():
285
unlocker = Thread(target=wait_and_unlock)
288
lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
290
# wait but don't lock
291
lf2.wait(timeout=0.4, poll=0.1)
293
self.assertTrue(after - before <= 1.0)
297
274
def test_34_lock_write_waits(self):
298
275
"""LockDir.lock_write() will wait for the lock."""
299
276
# the test suite sets the default to 0 to make deadlocks fail fast.
300
277
# change it for this test, as we want to try a manual deadlock.
278
raise tests.TestSkipped('Timing-sensitive test')
301
279
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 300
302
280
t = self.get_transport()
303
281
lf1 = LockDir(t, 'test_lock')
624
602
lock_path = ld1.transport.local_abspath('test_lock')
625
603
os.mkdir(lock_path)
626
604
osutils.make_readonly(lock_path)
627
self.assertRaises(errors.PermissionDenied, ld1.attempt_lock)
605
self.assertRaises(errors.LockFailed, ld1.attempt_lock)
607
def test_lock_by_token(self):
608
ld1 = self.get_lock()
609
token = ld1.lock_write()
610
self.assertNotEqual(None, token)
611
ld2 = self.get_lock()
612
t2 = ld2.lock_write(token)
613
self.assertEqual(token, t2)
615
def test_lock_with_buggy_rename(self):
616
# test that lock acquisition handles servers which pretend they
617
# renamed correctly but that actually fail
618
t = transport.get_transport('brokenrename+' + self.get_url())
619
ld1 = LockDir(t, 'test_lock')
622
ld2 = LockDir(t, 'test_lock')
623
# we should fail to lock
624
e = self.assertRaises(errors.LockContention, ld2.attempt_lock)
625
# now the original caller should succeed in unlocking
627
# and there should be nothing left over
628
self.assertEquals([], t.list_dir('test_lock'))
630
def test_failed_lock_leaves_no_trash(self):
631
# if we fail to acquire the lock, we don't leave pending directories
632
# behind -- https://bugs.launchpad.net/bzr/+bug/109169
633
ld1 = self.get_lock()
634
ld2 = self.get_lock()
635
# should be nothing before we start
637
t = self.get_transport().clone('test_lock')
639
self.assertEquals(a, t.list_dir('.'))
641
# when held, that's all we see
644
# second guy should fail
645
self.assertRaises(errors.LockContention, ld2.attempt_lock)