1
# Copyright (C) 2007, 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for bzrlib.counted_lock"""
19
from bzrlib.counted_lock import CountedLock
20
from bzrlib.errors import (
26
from bzrlib.tests import TestCase
29
class DummyLock(object):
30
"""Lock that just records what's been done to it."""
34
self._lock_mode = None
37
return self._lock_mode is not None
40
self._assert_not_locked()
42
self._calls.append('lock_read')
44
def lock_write(self, token=None):
45
if token not in (None, 'token'):
46
raise TokenMismatch(token, 'token')
47
self._assert_not_locked()
49
self._calls.append('lock_write')
54
self._lock_mode = None
55
self._calls.append('unlock')
58
self._lock_mode = None
59
self._calls.append('break')
61
def _assert_locked(self):
62
if not self._lock_mode:
63
raise LockError("%s is not locked" % (self,))
65
def _assert_not_locked(self):
67
raise LockError("%s is already locked in mode %r" %
68
(self, self._lock_mode))
70
def validate_token(self, token):
72
# already held by this caller
77
raise TokenMismatch(token, 'token')
80
class TestDummyLock(TestCase):
82
def test_lock_initially_not_held(self):
84
self.assertFalse(l.is_locked())
86
def test_lock_not_reentrant(self):
87
# can't take the underlying lock twice
90
self.assertRaises(LockError, l.lock_read)
92
def test_detect_underlock(self):
94
self.assertRaises(LockError, l.unlock)
96
def test_basic_locking(self):
97
# dummy lock works like a basic non reentrant lock
98
real_lock = DummyLock()
99
self.assertFalse(real_lock.is_locked())
100
# lock read and unlock
101
real_lock.lock_read()
102
self.assertTrue(real_lock.is_locked())
104
self.assertFalse(real_lock.is_locked())
105
# lock write and unlock
106
real_lock.lock_write()
107
self.assertTrue(real_lock.is_locked())
109
self.assertFalse(real_lock.is_locked())
112
['lock_read', 'unlock', 'lock_write', 'unlock'],
115
def test_break_lock(self):
119
self.assertFalse(l.is_locked())
121
['lock_write', 'break'],
125
class TestCountedLock(TestCase):
127
def test_lock_unlock(self):
128
# Lock and unlock a counted lock
129
real_lock = DummyLock()
130
l = CountedLock(real_lock)
131
self.assertFalse(l.is_locked())
132
# can lock twice, although this isn't allowed on the underlying lock
135
self.assertTrue(l.is_locked())
138
self.assertTrue(l.is_locked())
140
self.assertFalse(l.is_locked())
142
['lock_read', 'unlock'],
145
def test_unlock_not_locked(self):
146
real_lock = DummyLock()
147
l = CountedLock(real_lock)
148
self.assertRaises(LockNotHeld, l.unlock)
150
def test_read_lock_while_write_locked(self):
151
real_lock = DummyLock()
152
l = CountedLock(real_lock)
159
self.assertFalse(l.is_locked())
161
['lock_write', 'unlock'],
164
def test_write_lock_while_read_locked(self):
165
real_lock = DummyLock()
166
l = CountedLock(real_lock)
168
self.assertRaises(ReadOnlyError, l.lock_write)
169
self.assertRaises(ReadOnlyError, l.lock_write)
171
self.assertFalse(l.is_locked())
173
['lock_read', 'unlock'],
176
def test_break_lock(self):
177
real_lock = DummyLock()
178
l = CountedLock(real_lock)
181
self.assertTrue(real_lock.is_locked())
183
self.assertFalse(l.is_locked())
184
self.assertFalse(real_lock.is_locked())