~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: Robert Collins
  • Date: 2007-11-09 17:50:31 UTC
  • mto: This revision was merged to the branch mainline in revision 2988.
  • Revision ID: robertc@robertcollins.net-20071109175031-agaiy6530rvbprmb
Change (without backwards compatibility) the
iter_lines_added_or_present_in_versions VersionedFile API to yield the
text version that each line is being returned from. This is useful for
reconcile in determining what inventories reference what texts.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for bzrlib.counted_lock"""
 
18
 
 
19
from bzrlib.counted_lock import CountedLock
 
20
from bzrlib.errors import (
 
21
    LockError,
 
22
    ReadOnlyError,
 
23
    )
 
24
from bzrlib.tests import TestCase
 
25
 
 
26
 
 
27
class DummyLock(object):
 
28
    """Lock that just records what's been done to it."""
 
29
 
 
30
    def __init__(self):
 
31
        self._calls = []
 
32
        self._lock_mode = None
 
33
 
 
34
    def is_locked(self):
 
35
        return self._lock_mode is not None
 
36
 
 
37
    def lock_read(self):
 
38
        self._assert_not_locked()
 
39
        self._lock_mode = 'r'
 
40
        self._calls.append('lock_read')
 
41
 
 
42
    def lock_write(self):
 
43
        self._assert_not_locked()
 
44
        self._lock_mode = 'w'
 
45
        self._calls.append('lock_write')
 
46
 
 
47
    def unlock(self):
 
48
        self._assert_locked()
 
49
        self._lock_mode = None
 
50
        self._calls.append('unlock')
 
51
 
 
52
    def break_lock(self):
 
53
        self._lock_mode = None
 
54
        self._calls.append('break')
 
55
 
 
56
    def _assert_locked(self):
 
57
        if not self._lock_mode:
 
58
            raise LockError("%s is not locked" % (self,))
 
59
 
 
60
    def _assert_not_locked(self):
 
61
        if self._lock_mode:
 
62
            raise LockError("%s is already locked in mode %r" %
 
63
                (self, self._lock_mode))
 
64
 
 
65
 
 
66
class TestDummyLock(TestCase):
 
67
 
 
68
    def test_lock_initially_not_held(self):
 
69
        l = DummyLock()
 
70
        self.assertFalse(l.is_locked())
 
71
 
 
72
    def test_lock_not_reentrant(self):
 
73
        # can't take the underlying lock twice
 
74
        l = DummyLock()
 
75
        l.lock_read()
 
76
        self.assertRaises(LockError, l.lock_read)
 
77
 
 
78
    def test_detect_underlock(self):
 
79
        l = DummyLock()
 
80
        self.assertRaises(LockError, l.unlock)
 
81
 
 
82
    def test_basic_locking(self):
 
83
        # dummy lock works like a basic non reentrant lock
 
84
        real_lock = DummyLock()
 
85
        self.assertFalse(real_lock.is_locked())
 
86
        # lock read and unlock
 
87
        real_lock.lock_read()
 
88
        self.assertTrue(real_lock.is_locked())
 
89
        real_lock.unlock()
 
90
        self.assertFalse(real_lock.is_locked())
 
91
        # lock write and unlock
 
92
        real_lock.lock_write()
 
93
        self.assertTrue(real_lock.is_locked())
 
94
        real_lock.unlock()
 
95
        self.assertFalse(real_lock.is_locked())
 
96
        # check calls
 
97
        self.assertEqual(
 
98
            ['lock_read', 'unlock', 'lock_write', 'unlock'],
 
99
            real_lock._calls)
 
100
 
 
101
    def test_break_lock(self):
 
102
        l = DummyLock()
 
103
        l.lock_write()
 
104
        l.break_lock()
 
105
        self.assertFalse(l.is_locked())
 
106
        self.assertEqual(
 
107
            ['lock_write', 'break'],
 
108
            l._calls)
 
109
 
 
110
 
 
111
class TestCountedLock(TestCase):
 
112
 
 
113
    def test_lock_unlock(self):
 
114
        # Lock and unlock a counted lock
 
115
        real_lock = DummyLock()
 
116
        l = CountedLock(real_lock)
 
117
        self.assertFalse(l.is_locked())
 
118
        # can lock twice, although this isn't allowed on the underlying lock
 
119
        l.lock_read()
 
120
        l.lock_read()
 
121
        self.assertTrue(l.is_locked())
 
122
        # and release
 
123
        l.unlock()
 
124
        self.assertTrue(l.is_locked())
 
125
        l.unlock()
 
126
        self.assertFalse(l.is_locked())
 
127
        self.assertEquals(
 
128
            ['lock_read', 'unlock'],
 
129
            real_lock._calls)
 
130
 
 
131
    def test_unlock_not_locked(self):
 
132
        real_lock = DummyLock()
 
133
        l = CountedLock(real_lock)
 
134
        self.assertRaises(LockError, l.unlock)
 
135
 
 
136
    def test_read_lock_while_write_locked(self):
 
137
        real_lock = DummyLock()
 
138
        l = CountedLock(real_lock)
 
139
        l.lock_write()
 
140
        l.lock_read()
 
141
        l.lock_write()
 
142
        l.unlock()
 
143
        l.unlock()
 
144
        l.unlock()
 
145
        self.assertFalse(l.is_locked())
 
146
        self.assertEquals(
 
147
            ['lock_write', 'unlock'],
 
148
            real_lock._calls)
 
149
 
 
150
    def test_write_lock_while_read_locked(self):
 
151
        real_lock = DummyLock()
 
152
        l = CountedLock(real_lock)
 
153
        l.lock_read()
 
154
        self.assertRaises(ReadOnlyError, l.lock_write)
 
155
        self.assertRaises(ReadOnlyError, l.lock_write)
 
156
        l.unlock()
 
157
        self.assertFalse(l.is_locked())
 
158
        self.assertEquals(
 
159
            ['lock_read', 'unlock'],
 
160
            real_lock._calls)
 
161
 
 
162
    def test_break_lock(self):
 
163
        real_lock = DummyLock()
 
164
        l = CountedLock(real_lock)
 
165
        l.lock_write()
 
166
        l.lock_write()
 
167
        self.assertTrue(real_lock.is_locked())
 
168
        l.break_lock()
 
169
        self.assertFalse(l.is_locked())
 
170
        self.assertFalse(real_lock.is_locked())