~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/workingtree_implementations/test_locking.py

  • Committer: John Arbash Meinel
  • Date: 2006-07-13 18:38:58 UTC
  • mfrom: (1863 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1869.
  • Revision ID: john@arbash-meinel.com-20060713183858-ebf4aa1f9ef8bb6e
[merge] bzr.dev 1863

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Test locks across all branch implemenations"""
 
18
 
 
19
from bzrlib.tests.lock_helpers import TestPreventLocking, LockWrapper
 
20
from bzrlib.tests.workingtree_implementations import TestCaseWithWorkingTree
 
21
 
 
22
 
 
23
class TestWorkingTreeLocking(TestCaseWithWorkingTree):
 
24
 
 
25
    def get_instrumented_tree(self):
 
26
        """Get a WorkingTree object which has been instrumented"""
 
27
        # TODO: jam 20060630 It may be that not all formats have a 
 
28
        # '_control_files' member. So we should fail gracefully if
 
29
        # not there. But assuming it has them lets us test the exact 
 
30
        # lock/unlock order.
 
31
        self.locks = []
 
32
        wt = self.make_branch_and_tree('.')
 
33
        wt = LockWrapper(self.locks, wt, 't')
 
34
        wt._branch = LockWrapper(self.locks, wt._branch, 'b')
 
35
 
 
36
        tcf = wt._control_files
 
37
        bcf = wt.branch.control_files
 
38
 
 
39
        # Look out for AllInOne
 
40
        self.combined_control = tcf is bcf
 
41
 
 
42
        wt._control_files = LockWrapper(self.locks, wt._control_files, 'tc')
 
43
        wt.branch.control_files = \
 
44
            LockWrapper(self.locks, wt.branch.control_files, 'bc')
 
45
        return wt
 
46
 
 
47
    def test_01_lock_read(self):
 
48
        # Test that locking occurs in the correct order
 
49
        wt = self.get_instrumented_tree()
 
50
 
 
51
        self.assertFalse(wt.is_locked())
 
52
        self.assertFalse(wt.branch.is_locked())
 
53
        wt.lock_read()
 
54
        try:
 
55
            self.assertTrue(wt.is_locked())
 
56
            self.assertTrue(wt.branch.is_locked())
 
57
        finally:
 
58
            wt.unlock()
 
59
        self.assertFalse(wt.is_locked())
 
60
        self.assertFalse(wt.branch.is_locked())
 
61
 
 
62
        self.assertEqual([('t', 'lr', True),
 
63
                          ('b', 'lr', True),
 
64
                          ('bc', 'lr', True),
 
65
                          ('tc', 'lr', True),
 
66
                          ('t', 'ul', True),
 
67
                          ('tc', 'ul', True),
 
68
                          ('b', 'ul', True),
 
69
                          ('bc', 'ul', True),
 
70
                         ], self.locks)
 
71
 
 
72
    def test_02_lock_write(self):
 
73
        # Test that locking occurs in the correct order
 
74
        wt = self.get_instrumented_tree()
 
75
 
 
76
        self.assertFalse(wt.is_locked())
 
77
        self.assertFalse(wt.branch.is_locked())
 
78
        wt.lock_write()
 
79
        try:
 
80
            self.assertTrue(wt.is_locked())
 
81
            self.assertTrue(wt.branch.is_locked())
 
82
        finally:
 
83
            wt.unlock()
 
84
        self.assertFalse(wt.is_locked())
 
85
        self.assertFalse(wt.branch.is_locked())
 
86
 
 
87
        self.assertEqual([('t', 'lw', True),
 
88
                          ('b', 'lw', True),
 
89
                          ('bc', 'lw', True),
 
90
                          ('tc', 'lw', True),
 
91
                          ('t', 'ul', True),
 
92
                          ('tc', 'ul', True),
 
93
                          ('b', 'ul', True),
 
94
                          ('bc', 'ul', True),
 
95
                         ], self.locks)
 
96
 
 
97
    def test_03_lock_fail_unlock_branch(self):
 
98
        # Make sure tree.unlock() is called, even if there is a
 
99
        # failure while unlocking the branch.
 
100
        wt = self.get_instrumented_tree()
 
101
        wt.branch.disable_unlock()
 
102
 
 
103
        self.assertFalse(wt.is_locked())
 
104
        self.assertFalse(wt.branch.is_locked())
 
105
        wt.lock_write()
 
106
        try:
 
107
            self.assertTrue(wt.is_locked())
 
108
            self.assertTrue(wt.branch.is_locked())
 
109
            self.assertRaises(TestPreventLocking, wt.unlock)
 
110
            if self.combined_control:
 
111
                self.assertTrue(wt.is_locked())
 
112
            else:
 
113
                self.assertFalse(wt.is_locked())
 
114
            self.assertTrue(wt.branch.is_locked())
 
115
 
 
116
            self.assertEqual([('t', 'lw', True),
 
117
                              ('b', 'lw', True),
 
118
                              ('bc', 'lw', True),
 
119
                              ('tc', 'lw', True),
 
120
                              ('t', 'ul', True),
 
121
                              ('tc', 'ul', True),
 
122
                              ('b', 'ul', False), 
 
123
                             ], self.locks)
 
124
 
 
125
        finally:
 
126
            # For cleanup purposes, make sure we are unlocked
 
127
            wt.branch._other.unlock()
 
128
 
 
129
    def test_04_lock_fail_unlock_control(self):
 
130
        # Make sure that branch is unlocked, even if we fail to unlock self
 
131
        wt = self.get_instrumented_tree()
 
132
        wt._control_files.disable_unlock()
 
133
 
 
134
        self.assertFalse(wt.is_locked())
 
135
        self.assertFalse(wt.branch.is_locked())
 
136
        wt.lock_write()
 
137
        try:
 
138
            self.assertTrue(wt.is_locked())
 
139
            self.assertTrue(wt.branch.is_locked())
 
140
            self.assertRaises(TestPreventLocking, wt.unlock)
 
141
            self.assertTrue(wt.is_locked())
 
142
            if self.combined_control:
 
143
                self.assertTrue(wt.branch.is_locked())
 
144
            else:
 
145
                self.assertFalse(wt.branch.is_locked())
 
146
 
 
147
            self.assertEqual([('t', 'lw', True),
 
148
                              ('b', 'lw', True),
 
149
                              ('bc', 'lw', True),
 
150
                              ('tc', 'lw', True),
 
151
                              ('t', 'ul', True),
 
152
                              ('tc', 'ul', False),
 
153
                              ('b', 'ul', True), 
 
154
                              ('bc', 'ul', True), 
 
155
                             ], self.locks)
 
156
 
 
157
        finally:
 
158
            # For cleanup purposes, make sure we are unlocked
 
159
            wt._control_files._other.unlock()
 
160
 
 
161
    def test_05_lock_read_fail_branch(self):
 
162
        # Test that the tree is not locked if it cannot lock the branch
 
163
        wt = self.get_instrumented_tree()
 
164
        wt.branch.disable_lock_read()
 
165
 
 
166
        self.assertRaises(TestPreventLocking, wt.lock_read)
 
167
        self.assertFalse(wt.is_locked())
 
168
        self.assertFalse(wt.branch.is_locked())
 
169
 
 
170
        self.assertEqual([('t', 'lr', True),
 
171
                          ('b', 'lr', False), 
 
172
                         ], self.locks)
 
173
 
 
174
    def test_06_lock_write_fail_branch(self):
 
175
        # Test that the tree is not locked if it cannot lock the branch
 
176
        wt = self.get_instrumented_tree()
 
177
        wt.branch.disable_lock_write()
 
178
 
 
179
        self.assertRaises(TestPreventLocking, wt.lock_write)
 
180
        self.assertFalse(wt.is_locked())
 
181
        self.assertFalse(wt.branch.is_locked())
 
182
 
 
183
        self.assertEqual([('t', 'lw', True),
 
184
                          ('b', 'lw', False), 
 
185
                         ], self.locks)
 
186
 
 
187
    def test_07_lock_read_fail_control(self):
 
188
        # We should unlock the branch if we can't lock the tree
 
189
        wt = self.get_instrumented_tree()
 
190
        wt._control_files.disable_lock_read()
 
191
 
 
192
        self.assertRaises(TestPreventLocking, wt.lock_read)
 
193
        self.assertFalse(wt.is_locked())
 
194
        self.assertFalse(wt.branch.is_locked())
 
195
 
 
196
        self.assertEqual([('t', 'lr', True),
 
197
                          ('b', 'lr', True),
 
198
                          ('bc', 'lr', True),
 
199
                          ('tc', 'lr', False),
 
200
                          ('b', 'ul', True),
 
201
                          ('bc', 'ul', True)
 
202
                         ], self.locks)
 
203
 
 
204
    def test_08_lock_write_fail_control(self):
 
205
        # We shouldn't try to lock the repo if we can't lock the branch
 
206
        wt = self.get_instrumented_tree()
 
207
        wt._control_files.disable_lock_write()
 
208
 
 
209
        self.assertRaises(TestPreventLocking, wt.lock_write)
 
210
        self.assertFalse(wt.is_locked())
 
211
        self.assertFalse(wt.branch.is_locked())
 
212
 
 
213
        self.assertEqual([('t', 'lw', True),
 
214
                          ('b', 'lw', True),
 
215
                          ('bc', 'lw', True),
 
216
                          ('tc', 'lw', False),
 
217
                          ('b', 'ul', True),
 
218
                          ('bc', 'ul', True)
 
219
                         ], self.locks)