1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# Copyright (C) 2005, 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Helper functions/classes for testing locking"""
from bzrlib import errors
from bzrlib.decorators import only_raises
class TestPreventLocking(errors.LockError):
"""A test exception for forcing locking failure: %(message)s"""
class LockWrapper(object):
"""A wrapper which lets us set locking ability.
This also lets us record what objects were locked in what order,
to ensure that locking happens correctly.
"""
def __init__(self, sequence, other, other_id):
"""Wrap a locking policy around a given object.
:param sequence: A list object where we should record actions
:param other: The object to control policy on
:param other_id: Something to identify the object by
"""
self.__dict__['_sequence'] = sequence
self.__dict__['_other'] = other
self.__dict__['_other_id'] = other_id
self.__dict__['_allow_write'] = True
self.__dict__['_allow_read'] = True
self.__dict__['_allow_unlock'] = True
def __eq__(self, other):
# Branch objects look for controlfiles == repo.controlfiles.
if type(other) is LockWrapper:
return self._other == other._other
return False
def __getattr__(self, attr):
return getattr(self._other, attr)
def __setattr__(self, attr, val):
return setattr(self._other, attr, val)
def lock_read(self):
self._sequence.append((self._other_id, 'lr', self._allow_read))
if self._allow_read:
return self._other.lock_read()
raise TestPreventLocking('lock_read disabled')
def lock_write(self, token=None):
self._sequence.append((self._other_id, 'lw', self._allow_write))
if self._allow_write:
return self._other.lock_write()
raise TestPreventLocking('lock_write disabled')
@only_raises(errors.LockNotHeld, errors.LockBroken)
def unlock(self):
self._sequence.append((self._other_id, 'ul', self._allow_unlock))
if self._allow_unlock:
return self._other.unlock()
raise TestPreventLocking('unlock disabled')
def disable_lock_read(self):
"""Make a lock_read call fail"""
self.__dict__['_allow_read'] = False
def disable_unlock(self):
"""Make an unlock call fail"""
self.__dict__['_allow_unlock'] = False
def disable_lock_write(self):
"""Make a lock_write call fail"""
self.__dict__['_allow_write'] = False
|