1
# Copyright (C) 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for bzrlib.counted_lock"""
19
from bzrlib.counted_lock import CountedLock
20
from bzrlib.errors import (
26
from bzrlib.tests import TestCase
29
class DummyLock(object):
30
"""Lock that just records what's been done to it."""
34
self._lock_mode = None
37
return self._lock_mode is not None
40
self._assert_not_locked()
42
self._calls.append('lock_read')
44
def lock_write(self, token=None):
47
# already held by this caller
51
self._assert_not_locked()
53
self._calls.append('lock_write')
58
self._lock_mode = None
59
self._calls.append('unlock')
62
self._lock_mode = None
63
self._calls.append('break')
65
def _assert_locked(self):
66
if not self._lock_mode:
67
raise LockError("%s is not locked" % (self,))
69
def _assert_not_locked(self):
71
raise LockError("%s is already locked in mode %r" %
72
(self, self._lock_mode))
74
def validate_token(self, token):
76
# already held by this caller
81
raise TokenMismatch(token, 'token')
84
class TestDummyLock(TestCase):
86
def test_lock_initially_not_held(self):
88
self.assertFalse(l.is_locked())
90
def test_lock_not_reentrant(self):
91
# can't take the underlying lock twice
94
self.assertRaises(LockError, l.lock_read)
96
def test_detect_underlock(self):
98
self.assertRaises(LockError, l.unlock)
100
def test_basic_locking(self):
101
# dummy lock works like a basic non reentrant lock
102
real_lock = DummyLock()
103
self.assertFalse(real_lock.is_locked())
104
# lock read and unlock
105
real_lock.lock_read()
106
self.assertTrue(real_lock.is_locked())
108
self.assertFalse(real_lock.is_locked())
109
# lock write and unlock
110
result = real_lock.lock_write()
111
self.assertEqual('token', result)
112
self.assertTrue(real_lock.is_locked())
114
self.assertFalse(real_lock.is_locked())
117
['lock_read', 'unlock', 'lock_write', 'unlock'],
120
def test_break_lock(self):
124
self.assertFalse(l.is_locked())
126
['lock_write', 'break'],
130
class TestCountedLock(TestCase):
132
def test_read_lock(self):
133
# Lock and unlock a counted lock
134
real_lock = DummyLock()
135
l = CountedLock(real_lock)
136
self.assertFalse(l.is_locked())
137
# can lock twice, although this isn't allowed on the underlying lock
140
self.assertTrue(l.is_locked())
143
self.assertTrue(l.is_locked())
145
self.assertFalse(l.is_locked())
147
['lock_read', 'unlock'],
150
def test_unlock_not_locked(self):
151
real_lock = DummyLock()
152
l = CountedLock(real_lock)
153
self.assertRaises(LockNotHeld, l.unlock)
155
def test_read_lock_while_write_locked(self):
156
real_lock = DummyLock()
157
l = CountedLock(real_lock)
160
self.assertEquals('token', l.lock_write())
164
self.assertFalse(l.is_locked())
166
['lock_write', 'unlock'],
169
def test_write_lock_while_read_locked(self):
170
real_lock = DummyLock()
171
l = CountedLock(real_lock)
173
self.assertRaises(ReadOnlyError, l.lock_write)
174
self.assertRaises(ReadOnlyError, l.lock_write)
176
self.assertFalse(l.is_locked())
178
['lock_read', 'unlock'],
181
def test_write_lock_reentrant(self):
182
real_lock = DummyLock()
183
l = CountedLock(real_lock)
184
self.assertEqual('token', l.lock_write())
185
self.assertEqual('token', l.lock_write())
189
def test_reenter_with_token(self):
190
real_lock = DummyLock()
191
l1 = CountedLock(real_lock)
192
l2 = CountedLock(real_lock)
193
token = l1.lock_write()
194
self.assertEqual('token', token)
195
# now imagine that we lost that connection, but we still have the
198
# because we can supply the token, we can acquire the lock through
200
self.assertTrue(real_lock.is_locked())
201
self.assertFalse(l2.is_locked())
202
self.assertEqual(token, l2.lock_write(token=token))
203
self.assertTrue(l2.is_locked())
204
self.assertTrue(real_lock.is_locked())
206
self.assertFalse(l2.is_locked())
207
self.assertFalse(real_lock.is_locked())
209
def test_break_lock(self):
210
real_lock = DummyLock()
211
l = CountedLock(real_lock)
214
self.assertTrue(real_lock.is_locked())
216
self.assertFalse(l.is_locked())
217
self.assertFalse(real_lock.is_locked())
219
# TODO: test get_physical_lock_status