~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: Martin Pool
  • Date: 2005-08-18 07:51:58 UTC
  • Revision ID: mbp@sourcefrog.net-20050818075157-5f69075fa843d558
- add space to store revision-id in weave files

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for bzrlib.counted_lock"""
18
 
 
19
 
from bzrlib.counted_lock import CountedLock
20
 
from bzrlib.errors import (
21
 
    LockError,
22
 
    ReadOnlyError,
23
 
    )
24
 
from bzrlib.tests import TestCase
25
 
 
26
 
 
27
 
class DummyLock(object):
    """Fake non-reentrant lock that logs every operation performed on it.

    The mode currently held is kept in ``_lock_mode`` ('r', 'w', or None
    when unheld) and the name of each successful operation is appended to
    ``_calls`` so tests can inspect the exact sequence afterwards.
    """

    def __init__(self):
        # Nothing is held and nothing has been recorded yet.
        self._lock_mode = None
        self._calls = []

    def is_locked(self):
        """Return True while a read or write lock is held."""
        return self._lock_mode is not None

    def lock_read(self):
        """Take the lock in read mode; raises LockError if already held."""
        self._acquire('r', 'lock_read')

    def lock_write(self):
        """Take the lock in write mode; raises LockError if already held."""
        self._acquire('w', 'lock_write')

    def unlock(self):
        """Release the lock; raises LockError if it is not held."""
        self._assert_locked()
        self._release('unlock')

    def break_lock(self):
        """Forcibly clear the lock without checking that it was held."""
        self._release('break')

    def _acquire(self, mode, call_name):
        # Refuse to re-enter, then remember the new mode and log the call.
        self._assert_not_locked()
        self._lock_mode = mode
        self._calls.append(call_name)

    def _release(self, call_name):
        # Drop whatever mode was held and log how it was dropped.
        self._lock_mode = None
        self._calls.append(call_name)

    def _assert_locked(self):
        if self._lock_mode:
            return
        raise LockError("%s is not locked" % (self,))

    def _assert_not_locked(self):
        held_mode = self._lock_mode
        if not held_mode:
            return
        raise LockError("%s is already locked in mode %r" %
            (self, held_mode))
64
 
 
65
 
 
66
 
class TestDummyLock(TestCase):
    """Sanity checks for the DummyLock helper used by the other tests."""

    def test_lock_initially_not_held(self):
        """A freshly created DummyLock reports itself as unlocked."""
        lock = DummyLock()
        self.assertFalse(lock.is_locked())

    def test_lock_not_reentrant(self):
        """Taking the underlying lock a second time raises LockError."""
        lock = DummyLock()
        lock.lock_read()
        self.assertRaises(LockError, lock.lock_read)

    def test_detect_underlock(self):
        """Unlocking a lock that was never taken raises LockError."""
        lock = DummyLock()
        self.assertRaises(LockError, lock.unlock)

    def test_basic_locking(self):
        """DummyLock behaves like a plain non-reentrant lock."""
        real_lock = DummyLock()
        self.assertFalse(real_lock.is_locked())
        # One full take/release cycle in read mode, then one in write mode.
        for take in (real_lock.lock_read, real_lock.lock_write):
            take()
            self.assertTrue(real_lock.is_locked())
            real_lock.unlock()
            self.assertFalse(real_lock.is_locked())
        # Every operation must have been recorded, in order.
        expected_calls = ['lock_read', 'unlock', 'lock_write', 'unlock']
        self.assertEqual(expected_calls, real_lock._calls)

    def test_break_lock(self):
        """break_lock releases a held lock and is recorded as 'break'."""
        lock = DummyLock()
        lock.lock_write()
        lock.break_lock()
        self.assertFalse(lock.is_locked())
        expected_calls = ['lock_write', 'break']
        self.assertEqual(expected_calls, lock._calls)
109
 
 
110
 
 
111
 
class TestCountedLock(TestCase):
    """Tests for CountedLock wrapped around a recording DummyLock.

    Each test inspects ``real_lock._calls`` (where relevant) to check which
    operations the CountedLock forwarded to the underlying lock.
    """

    def test_lock_unlock(self):
        """Nested read locks reach the real lock only once."""
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertFalse(l.is_locked())
        # can lock twice, although this isn't allowed on the underlying lock
        l.lock_read()
        l.lock_read()
        self.assertTrue(l.is_locked())
        # and release: the first unlock only undoes the inner lock_read
        l.unlock()
        self.assertTrue(l.is_locked())
        l.unlock()
        self.assertFalse(l.is_locked())
        # assertEqual, not the deprecated assertEquals alias
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_unlock_not_locked(self):
        """Unlocking a CountedLock that is not held raises LockError."""
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertRaises(LockError, l.unlock)

    def test_read_lock_while_write_locked(self):
        """Read and write locks may be nested inside a held write lock."""
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_write()
        l.lock_read()
        l.lock_write()
        # three acquisitions need three releases
        l.unlock()
        l.unlock()
        l.unlock()
        self.assertFalse(l.is_locked())
        # only the outermost write lock was passed to the real lock
        self.assertEqual(
            ['lock_write', 'unlock'],
            real_lock._calls)

    def test_write_lock_while_read_locked(self):
        """Asking for a write lock while read-locked raises ReadOnlyError."""
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_read()
        # repeated on purpose: a failed upgrade must leave the lock usable
        self.assertRaises(ReadOnlyError, l.lock_write)
        self.assertRaises(ReadOnlyError, l.lock_write)
        # a single unlock releases it, so the failures were not counted
        l.unlock()
        self.assertFalse(l.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        """break_lock clears a doubly-held lock and the real lock too."""
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_write()
        l.lock_write()
        self.assertTrue(real_lock.is_locked())
        l.break_lock()
        self.assertFalse(l.is_locked())
        self.assertFalse(real_lock.is_locked())