2475.4.1
by Martin Pool
Start adding CountedLock class to partially replace LockableFiles |
1 |
# Copyright (C) 2007 Canonical Ltd
|
2 |
#
|
|
3 |
# This program is free software; you can redistribute it and/or modify
|
|
4 |
# it under the terms of the GNU General Public License as published by
|
|
5 |
# the Free Software Foundation; either version 2 of the License, or
|
|
6 |
# (at your option) any later version.
|
|
7 |
#
|
|
8 |
# This program is distributed in the hope that it will be useful,
|
|
9 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
10 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
11 |
# GNU General Public License for more details.
|
|
12 |
#
|
|
13 |
# You should have received a copy of the GNU General Public License
|
|
14 |
# along with this program; if not, write to the Free Software
|
|
15 |
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
16 |
||
17 |
"""Tests for bzrlib.counted_lock"""
|
|
18 |
||
19 |
from bzrlib.counted_lock import CountedLock |
|
20 |
from bzrlib.errors import ( |
|
21 |
LockError, |
|
22 |
ReadOnlyError, |
|
23 |
)
|
|
24 |
from bzrlib.tests import TestCase |
|
25 |
||
26 |
||
27 |
class DummyLock(object):
    """Lock that just records what's been done to it.

    This is a non-reentrant in-memory stand-in for a real lock.  Every
    successful operation is appended to ``_calls`` so tests can check
    exactly which operations reached the underlying lock.
    """

    def __init__(self):
        # Names of the operations performed so far, in order.
        self._calls = []
        # None when unlocked, otherwise 'r' (read) or 'w' (write).
        self._lock_mode = None

    def is_locked(self):
        """Return True if the lock is currently held in any mode."""
        return self._lock_mode is not None

    def lock_read(self):
        """Take the lock in read mode.

        :raises LockError: if the lock is already held.
        """
        # The check comes first, so a refused attempt is not recorded.
        self._assert_not_locked()
        self._lock_mode = 'r'
        self._calls.append('lock_read')

    def lock_write(self):
        """Take the lock in write mode.

        :raises LockError: if the lock is already held.
        """
        self._assert_not_locked()
        self._lock_mode = 'w'
        self._calls.append('lock_write')

    def unlock(self):
        """Release the lock.

        :raises LockError: if the lock is not currently held.
        """
        self._assert_locked()
        self._lock_mode = None
        self._calls.append('unlock')

    def break_lock(self):
        """Forcibly clear the lock, whether or not it is held."""
        # Deliberately unchecked: breaking an unheld lock is not an error.
        self._lock_mode = None
        self._calls.append('break')

    def _assert_locked(self):
        """Raise LockError unless the lock is held."""
        # Use is_locked() so there is a single definition of "locked".
        if not self.is_locked():
            raise LockError("%s is not locked" % (self,))

    def _assert_not_locked(self):
        """Raise LockError if the lock is already held."""
        if self.is_locked():
            raise LockError("%s is already locked in mode %r" %
                            (self, self._lock_mode))
|
64 |
||
65 |
||
66 |
class TestDummyLock(TestCase):
    # Sanity checks for the DummyLock test double itself.

    def test_lock_initially_not_held(self):
        # A freshly constructed lock reports itself as not locked.
        lock = DummyLock()
        self.assertFalse(lock.is_locked())

    def test_lock_not_reentrant(self):
        # Taking the underlying lock a second time must fail.
        lock = DummyLock()
        lock.lock_read()
        self.assertRaises(LockError, lock.lock_read)

    def test_detect_underlock(self):
        # Unlocking a lock that was never taken is an error.
        lock = DummyLock()
        self.assertRaises(LockError, lock.unlock)

    def test_basic_locking(self):
        # The dummy behaves like a plain non-reentrant lock.
        dummy = DummyLock()
        self.assertFalse(dummy.is_locked())

        # Read-lock, then release.
        dummy.lock_read()
        self.assertTrue(dummy.is_locked())
        dummy.unlock()
        self.assertFalse(dummy.is_locked())

        # Write-lock, then release.
        dummy.lock_write()
        self.assertTrue(dummy.is_locked())
        dummy.unlock()
        self.assertFalse(dummy.is_locked())

        # Every operation above must have been recorded, in order.
        expected_calls = ['lock_read', 'unlock', 'lock_write', 'unlock']
        self.assertEqual(expected_calls, dummy._calls)

    def test_break_lock(self):
        # Breaking a held lock releases it and is recorded as 'break'.
        lock = DummyLock()
        lock.lock_write()
        lock.break_lock()
        self.assertFalse(lock.is_locked())
        expected_calls = ['lock_write', 'break']
        self.assertEqual(expected_calls, lock._calls)
|
109 |
||
110 |
||
111 |
class TestCountedLock(TestCase):
    # Tests for CountedLock wrapped around a recording DummyLock, so each
    # test can check which calls actually reached the underlying lock.

    def test_lock_unlock(self):
        # Lock and unlock a counted lock
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        self.assertFalse(counted.is_locked())
        # can lock twice, although this isn't allowed on the underlying lock
        counted.lock_read()
        counted.lock_read()
        self.assertTrue(counted.is_locked())
        # and release
        counted.unlock()
        # still held: only one of the two acquisitions has been released
        self.assertTrue(counted.is_locked())
        counted.unlock()
        self.assertFalse(counted.is_locked())
        # the underlying lock was taken and released exactly once
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_unlock_not_locked(self):
        # unlocking a counted lock that was never taken is an error
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        self.assertRaises(LockError, counted.unlock)

    def test_read_lock_while_write_locked(self):
        # read and write locks can both be nested inside a write lock
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_write()
        counted.lock_read()
        counted.lock_write()
        # one unlock per acquisition
        counted.unlock()
        counted.unlock()
        counted.unlock()
        self.assertFalse(counted.is_locked())
        # only the outermost write lock reached the underlying lock
        self.assertEqual(
            ['lock_write', 'unlock'],
            real_lock._calls)

    def test_write_lock_while_read_locked(self):
        # a read lock cannot be upgraded to a write lock
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_read()
        self.assertRaises(ReadOnlyError, counted.lock_write)
        self.assertRaises(ReadOnlyError, counted.lock_write)
        # the refused attempts were not counted: a single unlock releases
        counted.unlock()
        self.assertFalse(counted.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        # breaking a nested lock releases both wrapper and underlying lock
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_write()
        counted.lock_write()
        self.assertTrue(real_lock.is_locked())
        counted.break_lock()
        self.assertFalse(counted.is_locked())
        self.assertFalse(real_lock.is_locked())