14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test locks across all branch implementations"""
17
"""Tests for the (un)lock interfaces on all working tree implementations."""
19
from bzrlib.tests.lock_helpers import TestPreventLocking, LockWrapper
19
import bzrlib.branch as branch
20
import bzrlib.errors as errors
20
21
from bzrlib.tests.workingtree_implementations import TestCaseWithWorkingTree
23
24
class TestWorkingTreeLocking(TestCaseWithWorkingTree):
25
def get_instrumented_tree(self):
26
"""Get a WorkingTree object which has been instrumented"""
27
# TODO: jam 20060630 It may be that not all formats have a
28
# '_control_files' member. So we should fail gracefully if
29
# not there. But assuming it has them lets us test the exact
26
def test_trivial_lock_read_unlock(self):
27
"""Locking and unlocking should work trivially."""
32
28
wt = self.make_branch_and_tree('.')
33
wt = LockWrapper(self.locks, wt, 't')
34
wt._branch = LockWrapper(self.locks, wt._branch, 'b')
36
tcf = wt._control_files
37
bcf = wt.branch.control_files
39
# Look out for AllInOne
40
self.combined_control = tcf is bcf
42
wt._control_files = LockWrapper(self.locks, wt._control_files, 'tc')
43
wt.branch.control_files = \
44
LockWrapper(self.locks, wt.branch.control_files, 'bc')
47
def test_01_lock_read(self):
48
# Test that locking occurs in the correct order
49
wt = self.get_instrumented_tree()
51
30
self.assertFalse(wt.is_locked())
52
31
self.assertFalse(wt.branch.is_locked())
84
53
self.assertFalse(wt.is_locked())
85
54
self.assertFalse(wt.branch.is_locked())
87
self.assertEqual([('t', 'lw', True),
97
def test_03_lock_fail_unlock_branch(self):
98
# Make sure tree.unlock() is called, even if there is a
99
# failure while unlocking the branch.
100
wt = self.get_instrumented_tree()
101
wt.branch.disable_unlock()
103
self.assertFalse(wt.is_locked())
104
self.assertFalse(wt.branch.is_locked())
107
self.assertTrue(wt.is_locked())
108
self.assertTrue(wt.branch.is_locked())
109
self.assertRaises(TestPreventLocking, wt.unlock)
110
if self.combined_control:
111
self.assertTrue(wt.is_locked())
113
self.assertFalse(wt.is_locked())
114
self.assertTrue(wt.branch.is_locked())
116
self.assertEqual([('t', 'lw', True),
126
# For cleanup purposes, make sure we are unlocked
127
wt.branch._other.unlock()
129
def test_04_lock_fail_unlock_control(self):
130
# Make sure that branch is unlocked, even if we fail to unlock self
131
wt = self.get_instrumented_tree()
132
wt._control_files.disable_unlock()
134
self.assertFalse(wt.is_locked())
135
self.assertFalse(wt.branch.is_locked())
138
self.assertTrue(wt.is_locked())
139
self.assertTrue(wt.branch.is_locked())
140
self.assertRaises(TestPreventLocking, wt.unlock)
141
self.assertTrue(wt.is_locked())
142
if self.combined_control:
143
self.assertTrue(wt.branch.is_locked())
145
self.assertFalse(wt.branch.is_locked())
147
self.assertEqual([('t', 'lw', True),
158
# For cleanup purposes, make sure we are unlocked
159
wt._control_files._other.unlock()
161
def test_05_lock_read_fail_branch(self):
162
# Test that the tree is not locked if it cannot lock the branch
163
wt = self.get_instrumented_tree()
164
wt.branch.disable_lock_read()
166
self.assertRaises(TestPreventLocking, wt.lock_read)
167
self.assertFalse(wt.is_locked())
168
self.assertFalse(wt.branch.is_locked())
170
self.assertEqual([('t', 'lr', True),
174
def test_06_lock_write_fail_branch(self):
175
# Test that the tree is not locked if it cannot lock the branch
176
wt = self.get_instrumented_tree()
177
wt.branch.disable_lock_write()
179
self.assertRaises(TestPreventLocking, wt.lock_write)
180
self.assertFalse(wt.is_locked())
181
self.assertFalse(wt.branch.is_locked())
183
self.assertEqual([('t', 'lw', True),
187
def test_07_lock_read_fail_control(self):
188
# We should unlock the branch if we can't lock the tree
189
wt = self.get_instrumented_tree()
190
wt._control_files.disable_lock_read()
192
self.assertRaises(TestPreventLocking, wt.lock_read)
193
self.assertFalse(wt.is_locked())
194
self.assertFalse(wt.branch.is_locked())
196
self.assertEqual([('t', 'lr', True),
204
def test_08_lock_write_fail_control(self):
205
# The branch must not be left locked if we can't write-lock the tree
206
wt = self.get_instrumented_tree()
207
wt._control_files.disable_lock_write()
209
self.assertRaises(TestPreventLocking, wt.lock_write)
210
self.assertFalse(wt.is_locked())
211
self.assertFalse(wt.branch.is_locked())
213
self.assertEqual([('t', 'lw', True),
56
def test_unlock_branch_failures(self):
57
"""If the branch unlock fails the tree must still unlock."""
58
# The public interface for WorkingTree requires a branch, but
59
# does not require that the working tree use the branch - it is
60
# implementation specific how the WorkingTree, Branch, and Repository
62
# in order to test that implementations which *do* unlock via the branch
63
# do so correctly, we unlock the branch after locking the working tree.
64
# The next unlock on working tree should trigger a LockNotHeld exception
65
# from the branch object, which must be exposed to the caller. To meet
66
# our object model - where locking a tree locks its branch, and
67
# unlocking a branch does not unlock a working tree, *even* for
68
# all-in-one implementations like bzr 0.6, git, and hg, implementations
69
# must have some separate counter for each object, so our explicit
70
# unlock should trigger some error on all implementations, and
71
# requiring that to be LockNotHeld seems reasonable.
73
# we use this approach rather than decorating the Branch, because the
74
# public interface of WorkingTree does not permit altering the branch
75
# object - and we cannot tell which attribute might allow us to
76
# backdoor-in and change it reliably. For implementation specific tests
77
# we can do such skullduggery, but not for interface specific tests.
79
wt = self.make_branch_and_tree('.')
81
self.assertFalse(wt.is_locked())
82
self.assertFalse(wt.branch.is_locked())
84
self.assertTrue(wt.is_locked())
85
self.assertTrue(wt.branch.is_locked())
87
# manually unlock the branch, preparing a LockNotHeld error.
89
# the branch *may* still be locked here, if it's an all-in-one
90
# implementation because there is a single lock object with three
91
# references on it, and unlocking the branch only drops this by two
92
self.assertRaises(errors.LockNotHeld, wt.unlock)
93
# but now, the tree must be unlocked
94
self.assertFalse(wt.is_locked())
96
self.assertFalse(wt.branch.is_locked())
98
def test_failing_to_lock_branch_does_not_lock(self):
99
"""If the branch cannot be locked, don't lock the tree."""
100
# Many implementations treat read-locks as non-blocking, but some
101
# treat them as blocking with writes. Accordingly, we test this by
102
# opening the branch twice, and locking the branch for write in the
103
# second instance. Our lock contract requires separate instances to
104
# mutually exclude if a lock is exclusive at all: If we get no error
105
# locking, the test still passes.
106
wt = self.make_branch_and_tree('.')
107
branch_copy = branch.Branch.open('.')
108
branch_copy.lock_write()
112
except errors.LockError:
113
# any error here means the locks are exclusive in some
115
self.assertFalse(wt.is_locked())
116
self.assertFalse(wt.branch.is_locked())
119
# no error - the branch allows read locks while writes
120
# are taken, just pass.
125
def test_failing_to_lock_write_branch_does_not_lock(self):
126
"""If the branch cannot be write locked, don't lock the tree."""
127
# all implementations of branch are required to treat write
128
# locks as blocking (compare to repositories which are not required
130
# Accordingly we test this by opening the branch twice, and locking the
131
# branch for write in the second instance. Our lock contract requires
132
# separate instances to mutually exclude.
133
wt = self.make_branch_and_tree('.')
134
branch_copy = branch.Branch.open('.')
135
branch_copy.lock_write()
138
self.assertRaises(errors.LockError, wt.lock_write)
139
self.assertFalse(wt.is_locked())
140
self.assertFalse(wt.branch.is_locked())