~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-08-28 03:56:39 UTC
  • mfrom: (2747.1.1 revspec)
  • Revision ID: pqm@pqm.ubuntu.com-20070828035639-q7qmg7gafaevlvku
(Lukas Lalinsky) Accept ..\ as a path in revision specifiers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for bzrlib.counted_lock"""
18
18
 
19
19
from bzrlib.counted_lock import CountedLock
20
20
from bzrlib.errors import (
21
21
    LockError,
22
 
    LockNotHeld,
23
22
    ReadOnlyError,
24
 
    TokenMismatch,
25
23
    )
26
24
from bzrlib.tests import TestCase
27
25
 
41
39
        self._lock_mode = 'r'
42
40
        self._calls.append('lock_read')
43
41
 
44
 
    def lock_write(self, token=None):
45
 
        if token is not None:
46
 
            if token == 'token':
47
 
                # already held by this caller
48
 
                return 'token'
49
 
            else:
50
 
                raise TokenMismatch()
 
42
    def lock_write(self):
51
43
        self._assert_not_locked()
52
44
        self._lock_mode = 'w'
53
45
        self._calls.append('lock_write')
54
 
        return 'token'
55
46
 
56
47
    def unlock(self):
57
48
        self._assert_locked()
71
62
            raise LockError("%s is already locked in mode %r" %
72
63
                (self, self._lock_mode))
73
64
 
74
 
    def validate_token(self, token):
75
 
        if token == 'token':
76
 
            # already held by this caller
77
 
            return 'token'
78
 
        elif token is None:
79
 
            return
80
 
        else:
81
 
            raise TokenMismatch(token, 'token')
82
 
 
83
65
 
84
66
class TestDummyLock(TestCase):
85
67
 
107
89
        real_lock.unlock()
108
90
        self.assertFalse(real_lock.is_locked())
109
91
        # lock write and unlock
110
 
        result = real_lock.lock_write()
111
 
        self.assertEqual('token', result)
 
92
        real_lock.lock_write()
112
93
        self.assertTrue(real_lock.is_locked())
113
94
        real_lock.unlock()
114
95
        self.assertFalse(real_lock.is_locked())
129
110
 
130
111
class TestCountedLock(TestCase):
131
112
 
132
 
    def test_read_lock(self):
 
113
    def test_lock_unlock(self):
133
114
        # Lock and unlock a counted lock
134
115
        real_lock = DummyLock()
135
116
        l = CountedLock(real_lock)
150
131
    def test_unlock_not_locked(self):
151
132
        real_lock = DummyLock()
152
133
        l = CountedLock(real_lock)
153
 
        self.assertRaises(LockNotHeld, l.unlock)
 
134
        self.assertRaises(LockError, l.unlock)
154
135
 
155
136
    def test_read_lock_while_write_locked(self):
156
137
        real_lock = DummyLock()
157
138
        l = CountedLock(real_lock)
158
139
        l.lock_write()
159
140
        l.lock_read()
160
 
        self.assertEquals('token', l.lock_write())
 
141
        l.lock_write()
161
142
        l.unlock()
162
143
        l.unlock()
163
144
        l.unlock()
178
159
            ['lock_read', 'unlock'],
179
160
            real_lock._calls)
180
161
 
181
 
    def test_write_lock_reentrant(self):
182
 
        real_lock = DummyLock()
183
 
        l = CountedLock(real_lock)
184
 
        self.assertEqual('token', l.lock_write())
185
 
        self.assertEqual('token', l.lock_write())
186
 
        l.unlock()
187
 
        l.unlock()
188
 
 
189
 
    def test_reenter_with_token(self):
190
 
        real_lock = DummyLock()
191
 
        l1 = CountedLock(real_lock)
192
 
        l2 = CountedLock(real_lock)
193
 
        token = l1.lock_write()
194
 
        self.assertEqual('token', token)
195
 
        # now imagine that we lost that connection, but we still have the
196
 
        # token...
197
 
        del l1
198
 
        # because we can supply the token, we can acquire the lock through
199
 
        # another instance
200
 
        self.assertTrue(real_lock.is_locked())
201
 
        self.assertFalse(l2.is_locked())
202
 
        self.assertEqual(token, l2.lock_write(token=token))
203
 
        self.assertTrue(l2.is_locked())
204
 
        self.assertTrue(real_lock.is_locked())
205
 
        l2.unlock()
206
 
        self.assertFalse(l2.is_locked())
207
 
        self.assertFalse(real_lock.is_locked())
208
 
 
209
162
    def test_break_lock(self):
210
163
        real_lock = DummyLock()
211
164
        l = CountedLock(real_lock)
215
168
        l.break_lock()
216
169
        self.assertFalse(l.is_locked())
217
170
        self.assertFalse(real_lock.is_locked())
218
 
 
219
 
    # TODO: test get_physical_lock_status