~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lock.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-11-08 03:00:11 UTC
  • mfrom: (4789.2.1 bzrdirvfswarning)
  • Revision ID: pqm@pqm.ubuntu.com-20091108030011-4ifz210jwrd3ewkv
(robertc) Extend -Dhpssvfs debugging support to BzrDir._ensure_real.
        (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for OS Locks."""
 
18
 
 
19
 
 
20
 
 
21
from bzrlib import (
 
22
    debug,
 
23
    errors,
 
24
    lock,
 
25
    tests,
 
26
    )
 
27
 
 
28
 
 
29
def load_tests(standard_tests, module, loader):
    """Parameterize the tests in this module over every OS lock implementation.

    One scenario is built for each ``(name, write_lock, read_lock)`` entry in
    ``lock._lock_classes``.  Each scenario carries the ``write_lock`` and
    ``read_lock`` values that the test classes in this module expect to find
    as attributes.

    :param standard_tests: The tests the loader found in this module.
    :param module: The module being loaded (unused here; part of the
        ``load_tests`` signature).
    :param loader: The test loader; its ``suiteClass`` is used to create the
        suite that receives the multiplied tests.
    :return: Whatever ``tests.multiply_tests`` returns for the given tests,
        scenarios and fresh suite.
    """
    scenarios = [
        (name, {'write_lock': write_lock, 'read_lock': read_lock})
        for name, write_lock, read_lock in lock._lock_classes]
    suite = loader.suiteClass()
    return tests.multiply_tests(standard_tests, scenarios, suite)
 
38
 
 
39
 
 
40
class TestOSLock(tests.TestCaseInTempDir):
    """Exercise a read-lock / write-lock pair against a single lock file.

    ``read_lock`` and ``write_lock`` are called with a file name and must
    return an object with an ``unlock()`` method.  They are ``None`` here and
    are expected to be filled in per scenario by ``load_tests``.
    """

    # Set by load_tests
    read_lock = None
    write_lock = None

    def setUp(self):
        super(TestOSLock, self).setUp()
        # Every test below locks this file, so create it up front.
        self.build_tree(['a-lock-file'])

    def test_create_read_lock(self):
        """A read lock can be taken and released."""
        r_lock = self.read_lock('a-lock-file')
        r_lock.unlock()

    def test_create_write_lock(self):
        """A write lock can be taken and released."""
        w_lock = self.write_lock('a-lock-file')
        w_lock.unlock()

    def test_read_locks_share(self):
        """Two read locks can be held on the same file at once."""
        r_lock = self.read_lock('a-lock-file')
        try:
            lock2 = self.read_lock('a-lock-file')
            lock2.unlock()
        finally:
            r_lock.unlock()

    def test_write_locks_are_exclusive(self):
        """A second write lock raises LockContention while one is held."""
        w_lock = self.write_lock('a-lock-file')
        try:
            self.assertRaises(errors.LockContention,
                              self.write_lock, 'a-lock-file')
        finally:
            w_lock.unlock()

    def test_read_locks_block_write_locks(self):
        """A held read lock makes taking a write lock raise LockContention.

        The fcntl implementation only does so while the 'strict_locks' debug
        flag is set; without the flag the write lock is granted, which is
        reported as a known failure.
        """
        r_lock = self.read_lock('a-lock-file')
        try:
            if lock.have_fcntl and self.write_lock is lock._fcntl_WriteLock:
                # With the 'strict_locks' debug flag set, fcntl locks are
                # properly exclusive.
                debug.debug_flags.add('strict_locks')
                try:
                    self.assertRaises(errors.LockContention,
                                      self.write_lock, 'a-lock-file')
                finally:
                    # debug_flags lives on the debug module, so always clear
                    # the flag again -- even when the assertion above fails --
                    # to keep it from leaking out of this test.
                    debug.debug_flags.remove('strict_locks')
                # But not without it
                try:
                    w_lock = self.write_lock('a-lock-file')
                except errors.LockContention:
                    # The known failure below did not happen.
                    self.fail('Unexpected success. fcntl read locks'
                              ' do not usually block write locks')
                else:
                    w_lock.unlock()
                    self.knownFailure('fcntl read locks don\'t'
                                      ' block write locks without -Dlock')
            else:
                self.assertRaises(errors.LockContention,
                                  self.write_lock, 'a-lock-file')
        finally:
            r_lock.unlock()

    def test_write_locks_block_read_lock(self):
        """A held write lock makes taking a read lock raise LockContention.

        The fcntl implementation only does so while the 'strict_locks' debug
        flag is set; without the flag the read lock is granted, which is
        reported as a known failure.
        """
        w_lock = self.write_lock('a-lock-file')
        try:
            if lock.have_fcntl and self.read_lock is lock._fcntl_ReadLock:
                # With the 'strict_locks' debug flag set, fcntl locks are
                # properly exclusive.
                debug.debug_flags.add('strict_locks')
                try:
                    self.assertRaises(errors.LockContention,
                                      self.read_lock, 'a-lock-file')
                finally:
                    # debug_flags lives on the debug module, so always clear
                    # the flag again -- even when the assertion above fails --
                    # to keep it from leaking out of this test.
                    debug.debug_flags.remove('strict_locks')
                # But not without it
                try:
                    r_lock = self.read_lock('a-lock-file')
                except errors.LockContention:
                    # The known failure below did not happen.
                    self.fail('Unexpected success. fcntl write locks'
                              ' do not usually block read locks')
                else:
                    r_lock.unlock()
                    self.knownFailure('fcntl write locks don\'t'
                                      ' block read locks without -Dlock')
            else:
                self.assertRaises(errors.LockContention,
                                  self.read_lock, 'a-lock-file')
        finally:
            w_lock.unlock()

    def test_temporary_write_lock(self):
        """A sole read lock can be upgraded to a write lock and back."""
        r_lock = self.read_lock('a-lock-file')
        try:
            status, w_lock = r_lock.temporary_write_lock()
            self.assertTrue(status)
            # This should block another write lock
            try:
                self.assertRaises(errors.LockContention,
                                  self.write_lock, 'a-lock-file')
            finally:
                # Rebind r_lock so the outer finally unlocks the restored
                # read lock rather than the original object.
                r_lock = w_lock.restore_read_lock()
            # We should be able to take a read lock now
            r_lock2 = self.read_lock('a-lock-file')
            r_lock2.unlock()
        finally:
            r_lock.unlock()

    def test_temporary_write_lock_fails(self):
        """The upgrade reports failure while another read lock is held."""
        r_lock = self.read_lock('a-lock-file')
        try:
            r_lock2 = self.read_lock('a-lock-file')
            try:
                status, w_lock = r_lock.temporary_write_lock()
                self.assertFalse(status)
                # Taking out the lock requires unlocking and locking again, so
                # we have to replace the original object
                r_lock = w_lock
            finally:
                r_lock2.unlock()
            # We should be able to take a read lock now
            r_lock2 = self.read_lock('a-lock-file')
            r_lock2.unlock()
        finally:
            r_lock.unlock()