~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_lock/test_lock.py

  • Committer: Alexander Belchenko
  • Date: 2006-07-30 16:43:12 UTC
  • mto: (1711.2.111 jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1906.
  • Revision ID: bialix@ukr.net-20060730164312-b025fd3ff0cee59e
rename  gpl.txt => COPYING.txt

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for OS level locks."""
18
 
 
19
 
from bzrlib import (
20
 
    errors,
21
 
    osutils,
22
 
    )
23
 
 
24
 
from bzrlib.tests import UnicodeFilenameFeature
25
 
from bzrlib.tests.per_lock import TestCaseWithLock
26
 
 
27
 
 
28
 
class TestLock(TestCaseWithLock):
29
 
 
30
 
    def setUp(self):
31
 
        super(TestLock, self).setUp()
32
 
        self.build_tree(['a-file'])
33
 
 
34
 
    def test_read_lock(self):
35
 
        """Smoke test for read locks."""
36
 
        a_lock = self.read_lock('a-file')
37
 
        self.addCleanup(a_lock.unlock)
38
 
        # The lock file should be opened for reading
39
 
        txt = a_lock.f.read()
40
 
        self.assertEqual('contents of a-file\n', txt)
41
 
 
42
 
    def test_create_if_needed_read(self):
43
 
        """We will create the file if it doesn't exist yet."""
44
 
        a_lock = self.read_lock('other-file')
45
 
        self.addCleanup(a_lock.unlock)
46
 
        txt = a_lock.f.read()
47
 
        self.assertEqual('', txt)
48
 
 
49
 
    def test_create_if_needed_write(self):
50
 
        """We will create the file if it doesn't exist yet."""
51
 
        a_lock = self.write_lock('other-file')
52
 
        self.addCleanup(a_lock.unlock)
53
 
        txt = a_lock.f.read()
54
 
        self.assertEqual('', txt)
55
 
        a_lock.f.seek(0)
56
 
        a_lock.f.write('foo\n')
57
 
        a_lock.f.seek(0)
58
 
        txt = a_lock.f.read()
59
 
        self.assertEqual('foo\n', txt)
60
 
 
61
 
    def test_readonly_file(self):
62
 
        """If the file is readonly, we can take a read lock.
63
 
 
64
 
        But we shouldn't be able to take a write lock.
65
 
        """
66
 
        osutils.make_readonly('a-file')
67
 
        # Make sure the file is read-only (on all platforms)
68
 
        self.assertRaises(IOError, open, 'a-file', 'rb+')
69
 
        a_lock = self.read_lock('a-file')
70
 
        a_lock.unlock()
71
 
 
72
 
        self.assertRaises(errors.LockFailed, self.write_lock, 'a-file')
73
 
 
74
 
    def test_write_lock(self):
75
 
        """Smoke test for write locks."""
76
 
        a_lock = self.write_lock('a-file')
77
 
        self.addCleanup(a_lock.unlock)
78
 
        # You should be able to read and write to the lock file.
79
 
        txt = a_lock.f.read()
80
 
        self.assertEqual('contents of a-file\n', txt)
81
 
        # Win32 requires that you call seek() when switching between a read
82
 
        # operation and a write operation.
83
 
        a_lock.f.seek(0, 2)
84
 
        a_lock.f.write('more content\n')
85
 
        a_lock.f.seek(0)
86
 
        txt = a_lock.f.read()
87
 
        self.assertEqual('contents of a-file\nmore content\n', txt)
88
 
 
89
 
    def test_multiple_read_locks(self):
90
 
        """You can take out more than one read lock on the same file."""
91
 
        a_lock = self.read_lock('a-file')
92
 
        self.addCleanup(a_lock.unlock)
93
 
        b_lock = self.read_lock('a-file')
94
 
        self.addCleanup(b_lock.unlock)
95
 
 
96
 
    def test_multiple_write_locks_exclude(self):
97
 
        """Taking out more than one write lock should fail."""
98
 
        a_lock = self.write_lock('a-file')
99
 
        self.addCleanup(a_lock.unlock)
100
 
        # Taking out a lock on a locked file should raise LockContention
101
 
        self.assertRaises(errors.LockContention, self.write_lock, 'a-file')
102
 
 
103
 
    def _disabled_test_read_then_write_excludes(self):
104
 
        """If a file is read-locked, taking out a write lock should fail."""
105
 
        a_lock = self.read_lock('a-file')
106
 
        self.addCleanup(a_lock.unlock)
107
 
        # Taking out a lock on a locked file should raise LockContention
108
 
        self.assertRaises(errors.LockContention, self.write_lock, 'a-file')
109
 
 
110
 
    def test_read_unlock_write(self):
111
 
        """Make sure that unlocking allows us to lock write"""
112
 
        a_lock = self.read_lock('a-file')
113
 
        a_lock.unlock()
114
 
        a_lock = self.write_lock('a-file')
115
 
        a_lock.unlock()
116
 
 
117
 
    # TODO: jam 20070319 fcntl read locks are not currently fully
118
 
    #       mutually exclusive with write locks. This will be fixed
119
 
    #       in the next release.
120
 
    def _disabled_test_write_then_read_excludes(self):
121
 
        """If a file is write-locked, taking out a read lock should fail.
122
 
 
123
 
        The file is exclusively owned by the write lock, so we shouldn't be
124
 
        able to take out a shared read lock.
125
 
        """
126
 
        a_lock = self.write_lock('a-file')
127
 
        self.addCleanup(a_lock.unlock)
128
 
        # Taking out a lock on a locked file should raise LockContention
129
 
        self.assertRaises(errors.LockContention, self.read_lock, 'a-file')
130
 
 
131
 
    # TODO: jam 20070319 fcntl write locks are not currently fully
132
 
    #       mutually exclusive with read locks. This will be fixed
133
 
    #       in the next release.
134
 
    def _disabled_test_write_unlock_read(self):
135
 
        """If we have removed the write lock, we can grab a read lock."""
136
 
        a_lock = self.write_lock('a-file')
137
 
        a_lock.unlock()
138
 
        a_lock = self.read_lock('a-file')
139
 
        a_lock.unlock()
140
 
 
141
 
    def _disabled_test_multiple_read_unlock_write(self):
142
 
        """We can only grab a write lock if all read locks are done."""
143
 
        a_lock = b_lock = c_lock = None
144
 
        try:
145
 
            a_lock = self.read_lock('a-file')
146
 
            b_lock = self.read_lock('a-file')
147
 
            self.assertRaises(errors.LockContention, self.write_lock, 'a-file')
148
 
            a_lock.unlock()
149
 
            a_lock = None
150
 
            self.assertRaises(errors.LockContention, self.write_lock, 'a-file')
151
 
            b_lock.unlock()
152
 
            b_lock = None
153
 
            c_lock = self.write_lock('a-file')
154
 
            c_lock.unlock()
155
 
            c_lock = None
156
 
        finally:
157
 
            # Cleanup as needed
158
 
            if a_lock is not None:
159
 
                a_lock.unlock()
160
 
            if b_lock is not None:
161
 
                b_lock.unlock()
162
 
            if c_lock is not None:
163
 
                c_lock.unlock()
164
 
 
165
 
 
166
 
class TestLockUnicodePath(TestCaseWithLock):
167
 
 
168
 
    _test_needs_features = [UnicodeFilenameFeature]
169
 
 
170
 
    def test_read_lock(self):
171
 
        self.build_tree([u'\u1234'])
172
 
        u_lock = self.read_lock(u'\u1234')
173
 
        self.addCleanup(u_lock.unlock)
174
 
 
175
 
    def test_write_lock(self):
176
 
        self.build_tree([u'\u1234'])
177
 
        u_lock = self.write_lock(u'\u1234')
178
 
        self.addCleanup(u_lock.unlock)