~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-25 04:20:12 UTC
  • mfrom: (3468.3.4 controlfiles)
  • Revision ID: pqm@pqm.ubuntu.com-20090325042012-23a6pm0mraw7g2kg
(mbp) better CountedLock handling of tokens,
        and bzrdir takes more responsibility for default file modes

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2007, 2008 Canonical Ltd
 
1
# Copyright (C) 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
42
42
        self._calls.append('lock_read')
43
43
 
44
44
    def lock_write(self, token=None):
45
 
        if token not in (None, 'token'):
46
 
            raise TokenMismatch(token, 'token')
 
45
        if token is not None:
 
46
            if token == 'token':
 
47
                # already held by this caller
 
48
                return 'token'
 
49
            else:
 
50
                raise TokenMismatch()
47
51
        self._assert_not_locked()
48
52
        self._lock_mode = 'w'
49
53
        self._calls.append('lock_write')
103
107
        real_lock.unlock()
104
108
        self.assertFalse(real_lock.is_locked())
105
109
        # lock write and unlock
106
 
        real_lock.lock_write()
 
110
        result = real_lock.lock_write()
 
111
        self.assertEqual('token', result)
107
112
        self.assertTrue(real_lock.is_locked())
108
113
        real_lock.unlock()
109
114
        self.assertFalse(real_lock.is_locked())
124
129
 
125
130
class TestCountedLock(TestCase):
126
131
 
127
 
    def test_lock_unlock(self):
 
132
    def test_read_lock(self):
128
133
        # Lock and unlock a counted lock
129
134
        real_lock = DummyLock()
130
135
        l = CountedLock(real_lock)
152
157
        l = CountedLock(real_lock)
153
158
        l.lock_write()
154
159
        l.lock_read()
155
 
        l.lock_write()
 
160
        self.assertEquals('token', l.lock_write())
156
161
        l.unlock()
157
162
        l.unlock()
158
163
        l.unlock()
173
178
            ['lock_read', 'unlock'],
174
179
            real_lock._calls)
175
180
 
 
181
    def test_write_lock_reentrant(self):
 
182
        real_lock = DummyLock()
 
183
        l = CountedLock(real_lock)
 
184
        self.assertEqual('token', l.lock_write())
 
185
        self.assertEqual('token', l.lock_write())
 
186
        l.unlock()
 
187
        l.unlock()
 
188
 
 
189
    def test_reenter_with_token(self):
 
190
        real_lock = DummyLock()
 
191
        l1 = CountedLock(real_lock)
 
192
        l2 = CountedLock(real_lock)
 
193
        token = l1.lock_write()
 
194
        self.assertEqual('token', token)
 
195
        # now imagine that we lost that connection, but we still have the
 
196
        # token...
 
197
        del l1
 
198
        # because we can supply the token, we can acquire the lock through
 
199
        # another instance
 
200
        self.assertTrue(real_lock.is_locked())
 
201
        self.assertFalse(l2.is_locked())
 
202
        self.assertEqual(token, l2.lock_write(token=token))
 
203
        self.assertTrue(l2.is_locked())
 
204
        self.assertTrue(real_lock.is_locked())
 
205
        l2.unlock()
 
206
        self.assertFalse(l2.is_locked())
 
207
        self.assertFalse(real_lock.is_locked())
 
208
 
176
209
    def test_break_lock(self):
177
210
        real_lock = DummyLock()
178
211
        l = CountedLock(real_lock)