~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: Robert Collins
  • Date: 2009-07-07 04:32:13 UTC
  • mto: This revision was merged to the branch mainline in revision 4524.
  • Revision ID: robertc@robertcollins.net-20090707043213-4hjjhgr40iq7gk2d
More informative assertions in xml serialisation.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2007 Canonical Ltd
 
1
# Copyright (C) 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for bzrlib.counted_lock"""
18
18
 
19
19
from bzrlib.counted_lock import CountedLock
20
20
from bzrlib.errors import (
21
21
    LockError,
 
22
    LockNotHeld,
22
23
    ReadOnlyError,
 
24
    TokenMismatch,
23
25
    )
24
26
from bzrlib.tests import TestCase
25
27
 
39
41
        self._lock_mode = 'r'
40
42
        self._calls.append('lock_read')
41
43
 
42
 
    def lock_write(self):
 
44
    def lock_write(self, token=None):
 
45
        if token is not None:
 
46
            if token == 'token':
 
47
                # already held by this caller
 
48
                return 'token'
 
49
            else:
 
50
                raise TokenMismatch()
43
51
        self._assert_not_locked()
44
52
        self._lock_mode = 'w'
45
53
        self._calls.append('lock_write')
 
54
        return 'token'
46
55
 
47
56
    def unlock(self):
48
57
        self._assert_locked()
62
71
            raise LockError("%s is already locked in mode %r" %
63
72
                (self, self._lock_mode))
64
73
 
 
74
    def validate_token(self, token):
 
75
        if token == 'token':
 
76
            # already held by this caller
 
77
            return 'token'
 
78
        elif token is None:
 
79
            return
 
80
        else:
 
81
            raise TokenMismatch(token, 'token')
 
82
 
65
83
 
66
84
class TestDummyLock(TestCase):
67
85
 
89
107
        real_lock.unlock()
90
108
        self.assertFalse(real_lock.is_locked())
91
109
        # lock write and unlock
92
 
        real_lock.lock_write()
 
110
        result = real_lock.lock_write()
 
111
        self.assertEqual('token', result)
93
112
        self.assertTrue(real_lock.is_locked())
94
113
        real_lock.unlock()
95
114
        self.assertFalse(real_lock.is_locked())
110
129
 
111
130
class TestCountedLock(TestCase):
112
131
 
113
 
    def test_lock_unlock(self):
 
132
    def test_read_lock(self):
114
133
        # Lock and unlock a counted lock
115
134
        real_lock = DummyLock()
116
135
        l = CountedLock(real_lock)
131
150
    def test_unlock_not_locked(self):
132
151
        real_lock = DummyLock()
133
152
        l = CountedLock(real_lock)
134
 
        self.assertRaises(LockError, l.unlock)
 
153
        self.assertRaises(LockNotHeld, l.unlock)
135
154
 
136
155
    def test_read_lock_while_write_locked(self):
137
156
        real_lock = DummyLock()
138
157
        l = CountedLock(real_lock)
139
158
        l.lock_write()
140
159
        l.lock_read()
141
 
        l.lock_write()
 
160
        self.assertEquals('token', l.lock_write())
142
161
        l.unlock()
143
162
        l.unlock()
144
163
        l.unlock()
159
178
            ['lock_read', 'unlock'],
160
179
            real_lock._calls)
161
180
 
 
181
    def test_write_lock_reentrant(self):
 
182
        real_lock = DummyLock()
 
183
        l = CountedLock(real_lock)
 
184
        self.assertEqual('token', l.lock_write())
 
185
        self.assertEqual('token', l.lock_write())
 
186
        l.unlock()
 
187
        l.unlock()
 
188
 
 
189
    def test_reenter_with_token(self):
 
190
        real_lock = DummyLock()
 
191
        l1 = CountedLock(real_lock)
 
192
        l2 = CountedLock(real_lock)
 
193
        token = l1.lock_write()
 
194
        self.assertEqual('token', token)
 
195
        # now imagine that we lost that connection, but we still have the
 
196
        # token...
 
197
        del l1
 
198
        # because we can supply the token, we can acquire the lock through
 
199
        # another instance
 
200
        self.assertTrue(real_lock.is_locked())
 
201
        self.assertFalse(l2.is_locked())
 
202
        self.assertEqual(token, l2.lock_write(token=token))
 
203
        self.assertTrue(l2.is_locked())
 
204
        self.assertTrue(real_lock.is_locked())
 
205
        l2.unlock()
 
206
        self.assertFalse(l2.is_locked())
 
207
        self.assertFalse(real_lock.is_locked())
 
208
 
162
209
    def test_break_lock(self):
163
210
        real_lock = DummyLock()
164
211
        l = CountedLock(real_lock)