1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Test locks across all branch implemenations"""
19
from bzrlib.tests.branch_implementations.test_branch import TestCaseWithBranch
20
from bzrlib.tests.LockHelpers import TestPreventLocking, LockWrapper
23
class TestBranchLocking(TestCaseWithBranch):
25
def get_instrumented_branch(self):
26
"""Get a Branch object which has been instrumented"""
27
# TODO: jam 20060630 It may be that not all formats have a
28
# 'control_files' member. So we should fail gracefully if
29
# not there. But assuming it has them lets us test the exact
32
b = LockWrapper(self.locks, self.get_branch(), 'b')
33
b.repository = LockWrapper(self.locks, b.repository, 'r')
35
rcf = b.repository.control_files
37
# Look out for branch types that reuse their control files
38
self.combined_control = bcf is rcf
40
b.control_files = LockWrapper(self.locks, b.control_files, 'bc')
41
b.repository.control_files = \
42
LockWrapper(self.locks, b.repository.control_files, 'rc')
45
def test_01_lock_read(self):
46
# Test that locking occurs in the correct order
47
b = self.get_instrumented_branch()
49
self.assertFalse(b.is_locked())
50
self.assertFalse(b.repository.is_locked())
53
self.assertTrue(b.is_locked())
54
self.assertTrue(b.repository.is_locked())
57
self.assertFalse(b.is_locked())
58
self.assertFalse(b.repository.is_locked())
60
self.assertEqual([('b', 'lr', True),
70
def test_02_lock_write(self):
71
# Test that locking occurs in the correct order
72
b = self.get_instrumented_branch()
74
self.assertFalse(b.is_locked())
75
self.assertFalse(b.repository.is_locked())
78
self.assertTrue(b.is_locked())
79
self.assertTrue(b.repository.is_locked())
82
self.assertFalse(b.is_locked())
83
self.assertFalse(b.repository.is_locked())
85
self.assertEqual([('b', 'lw', True),
95
def test_03_lock_fail_unlock_repo(self):
96
# Make sure branch.unlock() is called, even if there is a
97
# failure while unlocking the repository.
98
b = self.get_instrumented_branch()
99
b.repository.disable_unlock()
101
self.assertFalse(b.is_locked())
102
self.assertFalse(b.repository.is_locked())
105
self.assertTrue(b.is_locked())
106
self.assertTrue(b.repository.is_locked())
107
self.assertRaises(TestPreventLocking, b.unlock)
108
if self.combined_control:
109
self.assertTrue(b.is_locked())
111
self.assertFalse(b.is_locked())
112
self.assertTrue(b.repository.is_locked())
114
# We unlock the branch control files, even if
115
# we fail to unlock the repository
116
self.assertEqual([('b', 'lw', True),
126
# For cleanup purposes, make sure we are unlocked
127
b.repository._other.unlock()
129
def test_04_lock_fail_unlock_control(self):
130
# Make sure repository.unlock() is called, if we fail to unlock self
131
b = self.get_instrumented_branch()
132
b.control_files.disable_unlock()
134
self.assertFalse(b.is_locked())
135
self.assertFalse(b.repository.is_locked())
138
self.assertTrue(b.is_locked())
139
self.assertTrue(b.repository.is_locked())
140
self.assertRaises(TestPreventLocking, b.unlock)
141
self.assertTrue(b.is_locked())
142
if self.combined_control:
143
self.assertTrue(b.repository.is_locked())
145
self.assertFalse(b.repository.is_locked())
147
# We unlock the repository even if
148
# we fail to unlock the control files
149
self.assertEqual([('b', 'lw', True),
160
# For cleanup purposes, make sure we are unlocked
161
b.control_files._other.unlock()
163
def test_05_lock_read_fail_repo(self):
164
# Test that the branch is not locked if it cannot lock the repository
165
b = self.get_instrumented_branch()
166
b.repository.disable_lock_read()
168
self.assertRaises(TestPreventLocking, b.lock_read)
169
self.assertFalse(b.is_locked())
170
self.assertFalse(b.repository.is_locked())
172
self.assertEqual([('b', 'lr', True),
176
def test_06_lock_write_fail_repo(self):
177
# Test that the branch is not locked if it cannot lock the repository
178
b = self.get_instrumented_branch()
179
b.repository.disable_lock_write()
181
self.assertRaises(TestPreventLocking, b.lock_write)
182
self.assertFalse(b.is_locked())
183
self.assertFalse(b.repository.is_locked())
185
self.assertEqual([('b', 'lw', True),
189
def test_07_lock_read_fail_control(self):
190
# Test the repository is unlocked if we can't lock self
191
b = self.get_instrumented_branch()
192
b.control_files.disable_lock_read()
194
self.assertRaises(TestPreventLocking, b.lock_read)
195
self.assertFalse(b.is_locked())
196
self.assertFalse(b.repository.is_locked())
198
self.assertEqual([('b', 'lr', True),
206
def test_08_lock_write_fail_control(self):
207
# Test the repository is unlocked if we can't lock self
208
b = self.get_instrumented_branch()
209
b.control_files.disable_lock_write()
211
self.assertRaises(TestPreventLocking, b.lock_write)
212
self.assertFalse(b.is_locked())
213
self.assertFalse(b.repository.is_locked())
215
self.assertEqual([('b', 'lw', True),