~bzr-pqm/bzr/bzr.dev

2475.4.1 by Martin Pool
Start adding CountedLock class to partially replace LockableFiles
# Copyright (C) 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""Tests for bzrlib.counted_lock"""
18
19
from bzrlib.counted_lock import CountedLock
20
from bzrlib.errors import (
21
    LockError,
22
    ReadOnlyError,
23
    )
24
from bzrlib.tests import TestCase
25
26
27
class DummyLock(object):
    """Lock that just records what's been done to it.

    The lock is deliberately non-reentrant: taking it while it is already
    held, or releasing it while it is not held, raises LockError.  Every
    successful operation appends its name to ``_calls`` so tests can check
    exactly which calls reached the underlying lock.
    """

    def __init__(self):
        # Names of the operations performed so far, in call order.
        self._calls = []
        # None while unlocked; 'r' or 'w' while held.
        self._lock_mode = None

    def is_locked(self):
        """Return True if the lock is currently held in any mode."""
        return self._lock_mode is not None

    def lock_read(self):
        """Take the lock in read mode; raise LockError if already held."""
        self._assert_not_locked()
        self._lock_mode = 'r'
        self._calls.append('lock_read')

    def lock_write(self):
        """Take the lock in write mode; raise LockError if already held."""
        self._assert_not_locked()
        self._lock_mode = 'w'
        self._calls.append('lock_write')

    def unlock(self):
        """Release the lock; raise LockError if it is not held."""
        self._assert_locked()
        self._lock_mode = None
        self._calls.append('unlock')

    def break_lock(self):
        """Forcibly clear the lock, whether or not it is held."""
        # No precondition check: breaking is allowed in any state.
        self._lock_mode = None
        self._calls.append('break')

    def _assert_locked(self):
        """Raise LockError unless the lock is currently held."""
        if not self.is_locked():
            raise LockError("%s is not locked" % (self,))

    def _assert_not_locked(self):
        """Raise LockError if the lock is currently held."""
        if self.is_locked():
            raise LockError("%s is already locked in mode %r" %
                (self, self._lock_mode))
64
65
66
class TestDummyLock(TestCase):
    """Check that DummyLock itself behaves as a simple non-reentrant lock."""

    def test_lock_initially_not_held(self):
        """A freshly created DummyLock reports that it is not locked."""
        lock = DummyLock()
        self.assertFalse(lock.is_locked())

    def test_lock_not_reentrant(self):
        """Taking the lock a second time raises LockError."""
        # can't take the underlying lock twice
        lock = DummyLock()
        lock.lock_read()
        self.assertRaises(LockError, lock.lock_read)

    def test_detect_underlock(self):
        """Unlocking a lock that was never taken raises LockError."""
        lock = DummyLock()
        self.assertRaises(LockError, lock.unlock)

    def test_basic_locking(self):
        """Read and write lock/unlock cycles work and are recorded in order."""
        # dummy lock works like a basic non reentrant lock
        real_lock = DummyLock()
        self.assertFalse(real_lock.is_locked())
        # lock read and unlock
        real_lock.lock_read()
        self.assertTrue(real_lock.is_locked())
        real_lock.unlock()
        self.assertFalse(real_lock.is_locked())
        # lock write and unlock
        real_lock.lock_write()
        self.assertTrue(real_lock.is_locked())
        real_lock.unlock()
        self.assertFalse(real_lock.is_locked())
        # check calls
        self.assertEqual(
            ['lock_read', 'unlock', 'lock_write', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        """break_lock releases a held lock and is recorded as 'break'."""
        lock = DummyLock()
        lock.lock_write()
        lock.break_lock()
        self.assertFalse(lock.is_locked())
        self.assertEqual(
            ['lock_write', 'break'],
            lock._calls)
109
110
111
class TestCountedLock(TestCase):
    """Tests for CountedLock wrapped around a recording DummyLock."""

    def test_lock_unlock(self):
        """Nested read locks reach the underlying lock only once."""
        # Lock and unlock a counted lock
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        self.assertFalse(counted.is_locked())
        # can lock twice, although this isn't allowed on the underlying lock
        counted.lock_read()
        counted.lock_read()
        self.assertTrue(counted.is_locked())
        # and release
        counted.unlock()
        # still held: only one of the two acquisitions has been released
        self.assertTrue(counted.is_locked())
        counted.unlock()
        self.assertFalse(counted.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_unlock_not_locked(self):
        """Unlocking a CountedLock that is not held raises LockError."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        self.assertRaises(LockError, counted.unlock)

    def test_read_lock_while_write_locked(self):
        """Read and write locks may nest inside an existing write lock."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_write()
        counted.lock_read()
        counted.lock_write()
        counted.unlock()
        counted.unlock()
        counted.unlock()
        self.assertFalse(counted.is_locked())
        # the underlying lock saw a single write lock and a single unlock
        self.assertEqual(
            ['lock_write', 'unlock'],
            real_lock._calls)

    def test_write_lock_while_read_locked(self):
        """Asking for a write lock while read-locked raises ReadOnlyError."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_read()
        # repeated on purpose: a refused attempt must not change the state,
        # as the single unlock and the recorded calls below confirm
        self.assertRaises(ReadOnlyError, counted.lock_write)
        self.assertRaises(ReadOnlyError, counted.lock_write)
        counted.unlock()
        self.assertFalse(counted.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        """break_lock clears both the counted lock and the underlying lock."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_write()
        counted.lock_write()
        self.assertTrue(real_lock.is_locked())
        counted.break_lock()
        self.assertFalse(counted.is_locked())
        self.assertFalse(real_lock.is_locked())