~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: Martin Pool
  • Date: 2005-07-21 21:32:13 UTC
  • Revision ID: mbp@sourcefrog.net-20050721213213-c6ac0e8b06eaad0f
- bzr update-hashes shows some stats on what it did

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007, 2008 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for bzrlib.counted_lock"""
18
 
 
19
 
from bzrlib.counted_lock import CountedLock
20
 
from bzrlib.errors import (
21
 
    LockError,
22
 
    LockNotHeld,
23
 
    ReadOnlyError,
24
 
    TokenMismatch,
25
 
    )
26
 
from bzrlib.tests import TestCase
27
 
 
28
 
 
29
 
class DummyLock(object):
    """Non-reentrant stand-in lock that logs every operation performed on it.

    The current mode is kept in ``_lock_mode`` (``None`` when unlocked,
    ``'r'`` or ``'w'`` when held) and each successful operation appends its
    name to ``_calls`` so tests can inspect the exact call sequence.
    """

    def __init__(self):
        # Start unlocked, with an empty history.
        self._lock_mode = None
        self._calls = []

    def is_locked(self):
        """Return True when the lock is currently held in any mode."""
        return not (self._lock_mode is None)

    def lock_read(self):
        """Take the lock for reading; raises LockError if already held."""
        self._assert_not_locked()
        self._calls.append('lock_read')
        self._lock_mode = 'r'

    def lock_write(self, token=None):
        """Take the lock for writing and return the lock token.

        :param token: Either None or the string 'token'; anything else
            raises TokenMismatch before the lock state is examined.
        :return: The string 'token'.
        """
        acceptable = (None, 'token')
        if token not in acceptable:
            raise TokenMismatch(token, 'token')
        self._assert_not_locked()
        self._calls.append('lock_write')
        self._lock_mode = 'w'
        return 'token'

    def unlock(self):
        """Release the lock; raises LockError if it is not held."""
        self._assert_locked()
        self._calls.append('unlock')
        self._lock_mode = None

    def break_lock(self):
        """Forcibly clear the lock, whether or not it is held."""
        self._calls.append('break')
        self._lock_mode = None

    def _assert_locked(self):
        """Raise LockError unless the lock is currently held."""
        if self._lock_mode:
            return
        raise LockError("%s is not locked" % (self,))

    def _assert_not_locked(self):
        """Raise LockError if the lock is currently held."""
        if not self._lock_mode:
            return
        raise LockError("%s is already locked in mode %r" %
            (self, self._lock_mode))

    def validate_token(self, token):
        """Check a caller-supplied token against this lock's token.

        :return: 'token' when the caller already holds the matching token,
            None when no token was supplied.
        :raises TokenMismatch: for any other token value.
        """
        if token == 'token':
            # already held by this caller
            return 'token'
        if token is None:
            return None
        raise TokenMismatch(token, 'token')
78
 
 
79
 
 
80
 
class TestDummyLock(TestCase):
    """Sanity checks for the DummyLock helper used by the CountedLock tests."""

    def test_lock_initially_not_held(self):
        # A freshly built dummy lock reports itself as unlocked.
        lock = DummyLock()
        self.assertFalse(lock.is_locked())

    def test_lock_not_reentrant(self):
        # The underlying lock refuses to be taken a second time.
        lock = DummyLock()
        lock.lock_read()
        self.assertRaises(LockError, lock.lock_read)

    def test_detect_underlock(self):
        # Unlocking a lock that was never taken is an error.
        lock = DummyLock()
        self.assertRaises(LockError, lock.unlock)

    def test_basic_locking(self):
        # The dummy lock behaves like a plain non-reentrant lock.
        lock = DummyLock()
        self.assertFalse(lock.is_locked())
        # read-lock / unlock cycle
        lock.lock_read()
        self.assertTrue(lock.is_locked())
        lock.unlock()
        self.assertFalse(lock.is_locked())
        # write-lock / unlock cycle
        lock.lock_write()
        self.assertTrue(lock.is_locked())
        lock.unlock()
        self.assertFalse(lock.is_locked())
        # every operation must have been recorded, in order
        expected_calls = ['lock_read', 'unlock', 'lock_write', 'unlock']
        self.assertEqual(expected_calls, lock._calls)

    def test_break_lock(self):
        # Breaking a held lock releases it and is recorded as 'break'.
        lock = DummyLock()
        lock.lock_write()
        lock.break_lock()
        self.assertFalse(lock.is_locked())
        expected_calls = ['lock_write', 'break']
        self.assertEqual(expected_calls, lock._calls)
123
 
 
124
 
 
125
 
class TestCountedLock(TestCase):
    """Tests for CountedLock wrapped around a recording DummyLock.

    Each test inspects ``real_lock._calls`` (or ``real_lock.is_locked()``)
    to check what the CountedLock actually forwarded to the lock it wraps.
    """

    def test_lock_unlock(self):
        # Lock and unlock a counted lock
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertFalse(l.is_locked())
        # can lock twice, although this isn't allowed on the underlying lock
        l.lock_read()
        l.lock_read()
        self.assertTrue(l.is_locked())
        # and release: the first unlock leaves it held, the second frees it
        l.unlock()
        self.assertTrue(l.is_locked())
        l.unlock()
        self.assertFalse(l.is_locked())
        # the underlying lock saw exactly one lock/unlock pair
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_unlock_not_locked(self):
        # Unlocking a counted lock that was never taken raises LockNotHeld.
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertRaises(LockNotHeld, l.unlock)

    def test_read_lock_while_write_locked(self):
        # Read and write locks may be nested inside an outer write lock.
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_write()
        l.lock_read()
        l.lock_write()
        l.unlock()
        l.unlock()
        l.unlock()
        self.assertFalse(l.is_locked())
        # only the outermost acquire/release reached the underlying lock
        self.assertEqual(
            ['lock_write', 'unlock'],
            real_lock._calls)

    def test_write_lock_while_read_locked(self):
        # A write lock cannot be taken while only a read lock is held.
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_read()
        self.assertRaises(ReadOnlyError, l.lock_write)
        # repeat the attempt: it must raise again
        self.assertRaises(ReadOnlyError, l.lock_write)
        # a single unlock releases the lock despite the failed attempts
        l.unlock()
        self.assertFalse(l.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        # Breaking a doubly-held counted lock releases both the wrapper
        # and the underlying lock.
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_write()
        l.lock_write()
        self.assertTrue(real_lock.is_locked())
        l.break_lock()
        self.assertFalse(l.is_locked())
        self.assertFalse(real_lock.is_locked())