~bzr-pqm/bzr/bzr.dev

1711.8.5 by John Arbash Meinel
Move the new locking tests into their own files, and move the helper functions into a test helper.
1
# Copyright (C) 2006 Canonical Ltd
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""Test locks across all working tree implementations"""
18
1711.8.7 by John Arbash Meinel
Renaming LockHelpers.py to lock_helpers.py
19
from bzrlib.tests.lock_helpers import TestPreventLocking, LockWrapper
1711.8.5 by John Arbash Meinel
Move the new locking tests into their own files, and move the helper functions into a test helper.
20
from bzrlib.tests.workingtree_implementations import TestCaseWithWorkingTree
21
22
23
class TestWorkingTreeLocking(TestCaseWithWorkingTree):
24
25
    def get_instrumented_tree(self):
        """Build a working tree whose lock calls are recorded in self.locks.

        The tree, its branch, the tree's control files and the branch's
        control files are each wrapped in a LockWrapper that shares the one
        self.locks list; the wrappers are labelled 't', 'b', 'tc' and 'bc'
        respectively.  self.combined_control is set to whether the tree and
        the branch expose the very same control-files object.
        """
        # TODO: jam 20060630 Not every format is guaranteed to have a
        # '_control_files' member, so ideally this would degrade
        # gracefully when it is missing.  Relying on it, however, is what
        # lets the tests check the exact lock/unlock order.
        events = []
        self.locks = events
        tree = LockWrapper(events, self.make_branch_and_tree('.'), 't')
        tree._branch = LockWrapper(events, tree._branch, 'b')

        # Look out for AllInOne: note whether both sides hand back the
        # identical control-files object before either one gets wrapped.
        self.combined_control = (
            tree._control_files is tree.branch.control_files)

        tree._control_files = LockWrapper(events, tree._control_files, 'tc')
        branch = tree.branch
        branch.control_files = LockWrapper(events, branch.control_files, 'bc')
        return tree
46
47
    def test_01_lock_read(self):
        # Read-locking must touch tree, branch, branch control files and
        # tree control files in that order; unlocking must then release
        # tree, tree control files, branch and branch control files.
        tree = self.get_instrumented_tree()
        expected = [
            ('t', 'lr', True),
            ('b', 'lr', True),
            ('bc', 'lr', True),
            ('tc', 'lr', True),
            ('t', 'ul', True),
            ('tc', 'ul', True),
            ('b', 'ul', True),
            ('bc', 'ul', True),
        ]

        # Nothing is locked before we start.
        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())

        tree.lock_read()
        try:
            self.assertTrue(tree.is_locked())
            self.assertTrue(tree.branch.is_locked())
        finally:
            tree.unlock()

        # Everything is released again afterwards.
        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())

        self.assertEqual(expected, self.locks)
71
72
    def test_02_lock_write(self):
        # Write-locking must touch tree, branch, branch control files and
        # tree control files in that order; unlocking must then release
        # tree, tree control files, branch and branch control files.
        tree = self.get_instrumented_tree()
        expected = [
            ('t', 'lw', True),
            ('b', 'lw', True),
            ('bc', 'lw', True),
            ('tc', 'lw', True),
            ('t', 'ul', True),
            ('tc', 'ul', True),
            ('b', 'ul', True),
            ('bc', 'ul', True),
        ]

        # Nothing is locked before we start.
        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())

        tree.lock_write()
        try:
            self.assertTrue(tree.is_locked())
            self.assertTrue(tree.branch.is_locked())
        finally:
            tree.unlock()

        # Everything is released again afterwards.
        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())

        self.assertEqual(expected, self.locks)
96
97
    def test_03_lock_fail_unlock_branch(self):
        # The tree's own unlock must still happen even when unlocking the
        # branch fails part-way through tree.unlock().
        tree = self.get_instrumented_tree()
        tree.branch.disable_unlock()
        expected = [
            ('t', 'lw', True),
            ('b', 'lw', True),
            ('bc', 'lw', True),
            ('tc', 'lw', True),
            ('t', 'ul', True),
            ('tc', 'ul', True),
            ('b', 'ul', False),
        ]

        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())

        tree.lock_write()
        try:
            self.assertTrue(tree.is_locked())
            self.assertTrue(tree.branch.is_locked())

            # The refused branch unlock surfaces through tree.unlock().
            self.assertRaises(TestPreventLocking, tree.unlock)

            # With separate control files the tree has been released; when
            # tree and branch share them the tree still reports locked.
            if not self.combined_control:
                self.assertFalse(tree.is_locked())
            else:
                self.assertTrue(tree.is_locked())
            self.assertTrue(tree.branch.is_locked())

            self.assertEqual(expected, self.locks)
        finally:
            # For cleanup purposes, make sure we are unlocked.  The call
            # goes through the wrapper's _other attribute (presumably the
            # wrapped branch) because the wrapper's unlock is disabled.
            tree.branch._other.unlock()
128
129
    def test_04_lock_fail_unlock_control(self):
        # The branch must still be unlocked even when the tree fails to
        # unlock its own control files.
        tree = self.get_instrumented_tree()
        tree._control_files.disable_unlock()
        expected = [
            ('t', 'lw', True),
            ('b', 'lw', True),
            ('bc', 'lw', True),
            ('tc', 'lw', True),
            ('t', 'ul', True),
            ('tc', 'ul', False),
            ('b', 'ul', True),
            ('bc', 'ul', True),
        ]

        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())

        tree.lock_write()
        try:
            self.assertTrue(tree.is_locked())
            self.assertTrue(tree.branch.is_locked())

            # The refused control-files unlock surfaces through
            # tree.unlock(), leaving the tree itself locked.
            self.assertRaises(TestPreventLocking, tree.unlock)
            self.assertTrue(tree.is_locked())

            # With separate control files the branch has been released;
            # when they are shared the branch still reports locked.
            if not self.combined_control:
                self.assertFalse(tree.branch.is_locked())
            else:
                self.assertTrue(tree.branch.is_locked())

            self.assertEqual(expected, self.locks)
        finally:
            # For cleanup purposes, make sure we are unlocked.  The call
            # goes through the wrapper's _other attribute (presumably the
            # wrapped control files) because the wrapper's unlock is
            # disabled.
            tree._control_files._other.unlock()
160
161
    def test_05_lock_read_fail_branch(self):
        # If the branch refuses a read lock, the tree must not end up
        # locked either.
        tree = self.get_instrumented_tree()
        tree.branch.disable_lock_read()
        expected = [
            ('t', 'lr', True),
            ('b', 'lr', False),
        ]

        self.assertRaises(TestPreventLocking, tree.lock_read)

        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())
        self.assertEqual(expected, self.locks)
173
174
    def test_06_lock_write_fail_branch(self):
        # If the branch refuses a write lock, the tree must not end up
        # locked either.
        tree = self.get_instrumented_tree()
        tree.branch.disable_lock_write()
        expected = [
            ('t', 'lw', True),
            ('b', 'lw', False),
        ]

        self.assertRaises(TestPreventLocking, tree.lock_write)

        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())
        self.assertEqual(expected, self.locks)
186
187
    def test_07_lock_read_fail_control(self):
        # When the tree's control files refuse a read lock, the branch
        # that was already locked must be unlocked again.
        tree = self.get_instrumented_tree()
        tree._control_files.disable_lock_read()
        expected = [
            ('t', 'lr', True),
            ('b', 'lr', True),
            ('bc', 'lr', True),
            ('tc', 'lr', False),
            ('b', 'ul', True),
            ('bc', 'ul', True),
        ]

        self.assertRaises(TestPreventLocking, tree.lock_read)

        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())
        self.assertEqual(expected, self.locks)
203
204
    def test_08_lock_write_fail_control(self):
        # When the tree's control files refuse a write lock, the branch
        # that was already locked must be unlocked again.
        tree = self.get_instrumented_tree()
        tree._control_files.disable_lock_write()
        expected = [
            ('t', 'lw', True),
            ('b', 'lw', True),
            ('bc', 'lw', True),
            ('tc', 'lw', False),
            ('b', 'ul', True),
            ('bc', 'ul', True),
        ]

        self.assertRaises(TestPreventLocking, tree.lock_write)

        self.assertFalse(tree.is_locked())
        self.assertFalse(tree.branch.is_locked())
        self.assertEqual(expected, self.locks)