1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test locks across all branch implemenations"""
19
from bzrlib.tests.lock_helpers import TestPreventLocking, LockWrapper
20
from bzrlib.tests.workingtree_implementations import TestCaseWithWorkingTree
23
class TestWorkingTreeLocking(TestCaseWithWorkingTree):
25
def get_instrumented_tree(self):
    """Get a WorkingTree object which has been instrumented.

    The tree, its branch, the tree's control files and the branch's
    control files are each wrapped in a LockWrapper that shares the
    self.locks list, tagged 't', 'b', 'tc' and 'bc' respectively.

    Also sets self.combined_control to True when the tree and the branch
    use the very same control files object.

    :return: the LockWrapper around the newly created working tree.
    """
    # TODO: jam 20060630 It may be that not all formats have a
    # '_control_files' member. So we should fail gracefully if
    # not there. But assuming it has them lets us test the exact
    # lock/unlock order.

    # Start every instrumented tree with its own empty event log.
    self.locks = []
    wt = self.make_branch_and_tree('.')
    wt = LockWrapper(self.locks, wt, 't')
    wt._branch = LockWrapper(self.locks, wt._branch, 'b')

    tcf = wt._control_files
    bcf = wt.branch.control_files

    # Look out for AllInOne
    self.combined_control = tcf is bcf

    wt._control_files = LockWrapper(self.locks, wt._control_files, 'tc')
    wt.branch.control_files = \
        LockWrapper(self.locks, wt.branch.control_files, 'bc')
    # Callers use the result as the tree under test.
    return wt
47
def test_01_lock_read(self):
48
# Test that locking occurs in the correct order
49
wt = self.get_instrumented_tree()
51
self.assertFalse(wt.is_locked())
52
self.assertFalse(wt.branch.is_locked())
55
self.assertTrue(wt.is_locked())
56
self.assertTrue(wt.branch.is_locked())
59
self.assertFalse(wt.is_locked())
60
self.assertFalse(wt.branch.is_locked())
62
self.assertEqual([('t', 'lr', True),
72
def test_02_lock_write(self):
73
# Test that locking occurs in the correct order
74
wt = self.get_instrumented_tree()
76
self.assertFalse(wt.is_locked())
77
self.assertFalse(wt.branch.is_locked())
80
self.assertTrue(wt.is_locked())
81
self.assertTrue(wt.branch.is_locked())
84
self.assertFalse(wt.is_locked())
85
self.assertFalse(wt.branch.is_locked())
87
self.assertEqual([('t', 'lw', True),
97
def test_03_lock_fail_unlock_branch(self):
98
# Make sure tree.unlock() is called, even if there is a
99
# failure while unlocking the branch.
100
wt = self.get_instrumented_tree()
101
wt.branch.disable_unlock()
103
self.assertFalse(wt.is_locked())
104
self.assertFalse(wt.branch.is_locked())
107
self.assertTrue(wt.is_locked())
108
self.assertTrue(wt.branch.is_locked())
109
self.assertRaises(TestPreventLocking, wt.unlock)
110
if self.combined_control:
111
self.assertTrue(wt.is_locked())
113
self.assertFalse(wt.is_locked())
114
self.assertTrue(wt.branch.is_locked())
116
self.assertEqual([('t', 'lw', True),
126
# For cleanup purposes, make sure we are unlocked
127
wt.branch._other.unlock()
129
def test_04_lock_fail_unlock_control(self):
130
# Make sure that branch is unlocked, even if we fail to unlock self
131
wt = self.get_instrumented_tree()
132
wt._control_files.disable_unlock()
134
self.assertFalse(wt.is_locked())
135
self.assertFalse(wt.branch.is_locked())
138
self.assertTrue(wt.is_locked())
139
self.assertTrue(wt.branch.is_locked())
140
self.assertRaises(TestPreventLocking, wt.unlock)
141
self.assertTrue(wt.is_locked())
142
if self.combined_control:
143
self.assertTrue(wt.branch.is_locked())
145
self.assertFalse(wt.branch.is_locked())
147
self.assertEqual([('t', 'lw', True),
158
# For cleanup purposes, make sure we are unlocked
159
wt._control_files._other.unlock()
161
def test_05_lock_read_fail_branch(self):
162
# Test that the tree is not locked if it cannot lock the branch
163
wt = self.get_instrumented_tree()
164
wt.branch.disable_lock_read()
166
self.assertRaises(TestPreventLocking, wt.lock_read)
167
self.assertFalse(wt.is_locked())
168
self.assertFalse(wt.branch.is_locked())
170
self.assertEqual([('t', 'lr', True),
174
def test_06_lock_write_fail_branch(self):
175
# Test that the tree is not locked if it cannot lock the branch
176
wt = self.get_instrumented_tree()
177
wt.branch.disable_lock_write()
179
self.assertRaises(TestPreventLocking, wt.lock_write)
180
self.assertFalse(wt.is_locked())
181
self.assertFalse(wt.branch.is_locked())
183
self.assertEqual([('t', 'lw', True),
187
def test_07_lock_read_fail_control(self):
188
# We should unlock the branch if we can't lock the tree
189
wt = self.get_instrumented_tree()
190
wt._control_files.disable_lock_read()
192
self.assertRaises(TestPreventLocking, wt.lock_read)
193
self.assertFalse(wt.is_locked())
194
self.assertFalse(wt.branch.is_locked())
196
self.assertEqual([('t', 'lr', True),
204
def test_08_lock_write_fail_control(self):
205
# The branch must not be left locked if we can't write-lock the tree's control files
206
wt = self.get_instrumented_tree()
207
wt._control_files.disable_lock_write()
209
self.assertRaises(TestPreventLocking, wt.lock_write)
210
self.assertFalse(wt.is_locked())
211
self.assertFalse(wt.branch.is_locked())
213
self.assertEqual([('t', 'lw', True),