1
# Copyright (C) 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for bzrlib.counted_lock"""
19
from bzrlib.counted_lock import CountedLock
20
from bzrlib.errors import (
24
from bzrlib.tests import TestCase
27
class DummyLock(object):
28
"""Lock that just records what's been done to it."""
32
self._lock_mode = None
35
return self._lock_mode is not None
38
self._assert_not_locked()
40
self._calls.append('lock_read')
43
self._assert_not_locked()
45
self._calls.append('lock_write')
49
self._lock_mode = None
50
self._calls.append('unlock')
53
self._lock_mode = None
54
self._calls.append('break')
56
def _assert_locked(self):
57
if not self._lock_mode:
58
raise LockError("%s is not locked" % (self,))
60
def _assert_not_locked(self):
62
raise LockError("%s is already locked in mode %r" %
63
(self, self._lock_mode))
66
class TestDummyLock(TestCase):
68
def test_lock_initially_not_held(self):
70
self.assertFalse(l.is_locked())
72
def test_lock_not_reentrant(self):
73
# can't take the underlying lock twice
76
self.assertRaises(LockError, l.lock_read)
78
def test_detect_underlock(self):
80
self.assertRaises(LockError, l.unlock)
82
def test_basic_locking(self):
83
# dummy lock works like a basic non reentrant lock
84
real_lock = DummyLock()
85
self.assertFalse(real_lock.is_locked())
86
# lock read and unlock
88
self.assertTrue(real_lock.is_locked())
90
self.assertFalse(real_lock.is_locked())
91
# lock write and unlock
92
real_lock.lock_write()
93
self.assertTrue(real_lock.is_locked())
95
self.assertFalse(real_lock.is_locked())
98
['lock_read', 'unlock', 'lock_write', 'unlock'],
101
def test_break_lock(self):
105
self.assertFalse(l.is_locked())
107
['lock_write', 'break'],
111
class TestCountedLock(TestCase):
113
def test_lock_unlock(self):
114
# Lock and unlock a counted lock
115
real_lock = DummyLock()
116
l = CountedLock(real_lock)
117
self.assertFalse(l.is_locked())
118
# can lock twice, although this isn't allowed on the underlying lock
121
self.assertTrue(l.is_locked())
124
self.assertTrue(l.is_locked())
126
self.assertFalse(l.is_locked())
128
['lock_read', 'unlock'],
131
def test_unlock_not_locked(self):
132
real_lock = DummyLock()
133
l = CountedLock(real_lock)
134
self.assertRaises(LockError, l.unlock)
136
def test_read_lock_while_write_locked(self):
137
real_lock = DummyLock()
138
l = CountedLock(real_lock)
145
self.assertFalse(l.is_locked())
147
['lock_write', 'unlock'],
150
def test_write_lock_while_read_locked(self):
151
real_lock = DummyLock()
152
l = CountedLock(real_lock)
154
self.assertRaises(ReadOnlyError, l.lock_write)
155
self.assertRaises(ReadOnlyError, l.lock_write)
157
self.assertFalse(l.is_locked())
159
['lock_read', 'unlock'],
162
def test_break_lock(self):
163
real_lock = DummyLock()
164
l = CountedLock(real_lock)
167
self.assertTrue(real_lock.is_locked())
169
self.assertFalse(l.is_locked())
170
self.assertFalse(real_lock.is_locked())