~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-03-28 06:58:22 UTC
  • mfrom: (2379.2.3 hpss-chroot)
  • Revision ID: pqm@pqm.ubuntu.com-20070328065822-999550a858a3ced3
(robertc) Fix chroot urls to not expose the url of the transport they are protecting, allowing regular url operations to work on them. (Robert Collins, Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
16
18
 
17
19
import bzrlib
18
 
from bzrlib import (
19
 
    errors,
20
 
    lockdir,
21
 
    osutils,
22
 
    transport,
23
 
    )
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
24
23
from bzrlib.lockable_files import LockableFiles, TransportLock
25
 
from bzrlib.tests import (
26
 
    TestCaseInTempDir,
27
 
    TestNotApplicable,
28
 
    )
29
 
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
24
from bzrlib.lockdir import LockDir
 
25
from bzrlib.tests import TestCaseInTempDir
30
26
from bzrlib.tests.test_transactions import DummyWeave
31
27
from bzrlib.transactions import (PassThroughTransaction,
32
28
                                 ReadOnlyTransaction,
33
29
                                 WriteTransaction,
34
30
                                 )
 
31
from bzrlib.transport import get_transport
35
32
 
36
33
 
37
34
# these tests are applied in each parameterized suite for LockableFiles
38
 
#
39
 
# they use an old style of parameterization, but we want to remove this class
40
 
# so won't modernize them now. - mbp 20080430
41
35
class _TestLockableFiles_mixin(object):
42
36
 
 
37
    def test_read_write(self):
 
38
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
39
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
40
        self.lockable.lock_write()
 
41
        try:
 
42
            unicode_string = u'bar\u1234'
 
43
            self.assertEqual(4, len(unicode_string))
 
44
            byte_string = unicode_string.encode('utf-8')
 
45
            self.assertEqual(6, len(byte_string))
 
46
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
 
47
                              StringIO(unicode_string))
 
48
            self.lockable.put('foo', StringIO(byte_string))
 
49
            self.assertEqual(byte_string,
 
50
                             self.lockable.get('foo').read())
 
51
            self.assertEqual(unicode_string,
 
52
                             self.lockable.get_utf8('foo').read())
 
53
            self.assertRaises(BzrBadParameterNotString,
 
54
                              self.lockable.put_utf8,
 
55
                              'bar',
 
56
                              StringIO(unicode_string)
 
57
                              )
 
58
            self.lockable.put_utf8('bar', unicode_string)
 
59
            self.assertEqual(unicode_string,
 
60
                             self.lockable.get_utf8('bar').read())
 
61
            self.assertEqual(byte_string,
 
62
                             self.lockable.get('bar').read())
 
63
            self.lockable.put_bytes('raw', 'raw\xffbytes')
 
64
            self.assertEqual('raw\xffbytes',
 
65
                             self.lockable.get('raw').read())
 
66
        finally:
 
67
            self.lockable.unlock()
 
68
 
 
69
    def test_locks(self):
 
70
        self.lockable.lock_read()
 
71
        try:
 
72
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
73
                              StringIO('bar\u1234'))
 
74
        finally:
 
75
            self.lockable.unlock()
 
76
 
43
77
    def test_transactions(self):
44
78
        self.assertIs(self.lockable.get_transaction().__class__,
45
79
                      PassThroughTransaction)
62
96
 
63
97
    def test__escape(self):
64
98
        self.assertEqual('%25', self.lockable._escape('%'))
65
 
 
 
99
        
66
100
    def test__escape_empty(self):
67
101
        self.assertEqual('', self.lockable._escape(''))
68
102
 
74
108
        except NotImplementedError:
75
109
            # this lock cannot be broken
76
110
            self.lockable.unlock()
77
 
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
 
111
            return
78
112
        l2 = self.get_lockable()
79
113
        orig_factory = bzrlib.ui.ui_factory
80
114
        # silent ui - no need for stdout
81
 
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
 
115
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
116
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
82
117
        try:
83
118
            l2.break_lock()
84
119
        finally:
90
125
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
91
126
            self.assertFalse(self.lockable.is_locked())
92
127
 
93
 
    def test_lock_write_returns_None_refuses_token(self):
94
 
        token = self.lockable.lock_write()
95
 
        self.addCleanup(self.lockable.unlock)
96
 
        if token is not None:
97
 
            # This test does not apply, because this lockable supports
98
 
            # tokens.
99
 
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
100
 
        self.assertRaises(errors.TokenLockingNotSupported,
101
 
                          self.lockable.lock_write, token='token')
102
 
 
103
 
    def test_lock_write_returns_token_when_given_token(self):
104
 
        token = self.lockable.lock_write()
105
 
        self.addCleanup(self.lockable.unlock)
106
 
        if token is None:
107
 
            # This test does not apply, because this lockable refuses
108
 
            # tokens.
109
 
            return
110
 
        new_lockable = self.get_lockable()
111
 
        token_from_new_lockable = new_lockable.lock_write(token=token)
112
 
        self.addCleanup(new_lockable.unlock)
113
 
        self.assertEqual(token, token_from_new_lockable)
114
 
 
115
 
    def test_lock_write_raises_on_token_mismatch(self):
116
 
        token = self.lockable.lock_write()
117
 
        self.addCleanup(self.lockable.unlock)
118
 
        if token is None:
119
 
            # This test does not apply, because this lockable refuses
120
 
            # tokens.
121
 
            return
122
 
        different_token = token + 'xxx'
123
 
        # Re-using the same lockable instance with a different token will
124
 
        # raise TokenMismatch.
125
 
        self.assertRaises(errors.TokenMismatch,
126
 
                          self.lockable.lock_write, token=different_token)
127
 
        # A separate instance for the same lockable will also raise
128
 
        # TokenMismatch.
129
 
        # This detects the case where a caller claims to have a lock (via
130
 
        # the token) for an external resource, but doesn't (the token is
131
 
        # different).  Clients need a separate lock object to make sure the
132
 
        # external resource is probed, whereas the existing lock object
133
 
        # might cache.
134
 
        new_lockable = self.get_lockable()
135
 
        self.assertRaises(errors.TokenMismatch,
136
 
                          new_lockable.lock_write, token=different_token)
137
 
 
138
 
    def test_lock_write_with_matching_token(self):
139
 
        # If the token matches, no exception is raised by lock_write.
140
 
        token = self.lockable.lock_write()
141
 
        self.addCleanup(self.lockable.unlock)
142
 
        if token is None:
143
 
            # This test does not apply, because this lockable refuses
144
 
            # tokens.
145
 
            return
146
 
        # The same instance will accept a second lock_write if the specified
147
 
        # token matches.
148
 
        self.lockable.lock_write(token=token)
149
 
        self.lockable.unlock()
150
 
        # Calling lock_write on a new instance for the same lockable will
151
 
        # also succeed.
152
 
        new_lockable = self.get_lockable()
153
 
        new_lockable.lock_write(token=token)
154
 
        new_lockable.unlock()
155
 
 
156
 
    def test_unlock_after_lock_write_with_token(self):
157
 
        # If lock_write did not physically acquire the lock (because it was
158
 
        # passed a token), then unlock should not physically release it.
159
 
        token = self.lockable.lock_write()
160
 
        self.addCleanup(self.lockable.unlock)
161
 
        if token is None:
162
 
            # This test does not apply, because this lockable refuses
163
 
            # tokens.
164
 
            return
165
 
        new_lockable = self.get_lockable()
166
 
        new_lockable.lock_write(token=token)
167
 
        new_lockable.unlock()
168
 
        self.assertTrue(self.lockable.get_physical_lock_status())
169
 
 
170
 
    def test_lock_write_with_token_fails_when_unlocked(self):
171
 
        # Lock and unlock to get a superficially valid token.  This mimics a
172
 
        # likely programming error, where a caller accidentally tries to lock
173
 
        # with a token that is no longer valid (because the original lock was
174
 
        # released).
175
 
        token = self.lockable.lock_write()
176
 
        self.lockable.unlock()
177
 
        if token is None:
178
 
            # This test does not apply, because this lockable refuses
179
 
            # tokens.
180
 
            return
181
 
 
182
 
        self.assertRaises(errors.TokenMismatch,
183
 
                          self.lockable.lock_write, token=token)
184
 
 
185
 
    def test_lock_write_reenter_with_token(self):
186
 
        token = self.lockable.lock_write()
187
 
        try:
188
 
            if token is None:
189
 
                # This test does not apply, because this lockable refuses
190
 
                # tokens.
191
 
                return
192
 
            # Relock with a token.
193
 
            token_from_reentry = self.lockable.lock_write(token=token)
194
 
            try:
195
 
                self.assertEqual(token, token_from_reentry)
196
 
            finally:
197
 
                self.lockable.unlock()
198
 
        finally:
199
 
            self.lockable.unlock()
200
 
        # The lock should be unlocked on disk.  Verify that with a new lock
201
 
        # instance.
202
 
        new_lockable = self.get_lockable()
203
 
        # Calling lock_write now should work, rather than raise LockContention.
204
 
        new_lockable.lock_write()
205
 
        new_lockable.unlock()
206
 
 
207
 
    def test_second_lock_write_returns_same_token(self):
208
 
        first_token = self.lockable.lock_write()
209
 
        try:
210
 
            if first_token is None:
211
 
                # This test does not apply, because this lockable refuses
212
 
                # tokens.
213
 
                return
214
 
            # Relock the already locked lockable.  It should return the same
215
 
            # token.
216
 
            second_token = self.lockable.lock_write()
217
 
            try:
218
 
                self.assertEqual(first_token, second_token)
219
 
            finally:
220
 
                self.lockable.unlock()
221
 
        finally:
222
 
            self.lockable.unlock()
223
 
 
224
 
    def test_leave_in_place(self):
225
 
        token = self.lockable.lock_write()
226
 
        try:
227
 
            if token is None:
228
 
                # This test does not apply, because this lockable refuses
229
 
                # tokens.
230
 
                return
231
 
            self.lockable.leave_in_place()
232
 
        finally:
233
 
            self.lockable.unlock()
234
 
        # At this point, the lock is still in place on disk
235
 
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
236
 
        # But should be relockable with a token.
237
 
        self.lockable.lock_write(token=token)
238
 
        self.lockable.unlock()
239
 
        # Cleanup: we should still be able to get the lock, but we restore the
240
 
        # behavior to clearing the lock when unlocking.
241
 
        self.lockable.lock_write(token=token)
242
 
        self.lockable.dont_leave_in_place()
243
 
        self.lockable.unlock()
244
 
 
245
 
    def test_dont_leave_in_place(self):
246
 
        token = self.lockable.lock_write()
247
 
        try:
248
 
            if token is None:
249
 
                # This test does not apply, because this lockable refuses
250
 
                # tokens.
251
 
                return
252
 
            self.lockable.leave_in_place()
253
 
        finally:
254
 
            self.lockable.unlock()
255
 
        # At this point, the lock is still in place on disk.
256
 
        # Acquire the existing lock with the token, and ask that it is removed
257
 
        # when this object unlocks, and unlock to trigger that removal.
258
 
        new_lockable = self.get_lockable()
259
 
        new_lockable.lock_write(token=token)
260
 
        new_lockable.dont_leave_in_place()
261
 
        new_lockable.unlock()
262
 
        # At this point, the lock is no longer on disk, so we can lock it.
263
 
        third_lockable = self.get_lockable()
264
 
        third_lockable.lock_write()
265
 
        third_lockable.unlock()
266
 
 
267
 
 
268
 
# This method of adapting tests to parameters is different to
269
 
# the TestProviderAdapters used elsewhere, but seems simpler for this
270
 
# case.
 
128
 
 
129
# This method of adapting tests to parameters is different to 
 
130
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
131
# case.  
271
132
class TestLockableFiles_TransportLock(TestCaseInTempDir,
272
133
                                      _TestLockableFiles_mixin):
273
134
 
274
135
    def setUp(self):
275
136
        super(TestLockableFiles_TransportLock, self).setUp()
276
 
        t = transport.get_transport_from_path('.')
277
 
        t.mkdir('.bzr')
278
 
        self.sub_transport = t.clone('.bzr')
 
137
        transport = get_transport('.')
 
138
        transport.mkdir('.bzr')
 
139
        self.sub_transport = transport.clone('.bzr')
279
140
        self.lockable = self.get_lockable()
280
141
        self.lockable.create_lock()
281
142
 
282
 
    def stop_server(self):
283
 
        super(TestLockableFiles_TransportLock, self).stop_server()
 
143
    def tearDown(self):
 
144
        super(TestLockableFiles_TransportLock, self).tearDown()
284
145
        # free the subtransport so that we do not get a 5 second
285
146
        # timeout due to the SFTP connection cache.
286
 
        try:
287
 
            del self.sub_transport
288
 
        except AttributeError:
289
 
            pass
 
147
        del self.sub_transport
290
148
 
291
149
    def get_lockable(self):
292
150
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
293
 
 
 
151
        
294
152
 
295
153
class TestLockableFiles_LockDir(TestCaseInTempDir,
296
 
                                _TestLockableFiles_mixin):
 
154
                              _TestLockableFiles_mixin):
297
155
    """LockableFile tests run with LockDir underneath"""
298
156
 
299
157
    def setUp(self):
300
158
        super(TestLockableFiles_LockDir, self).setUp()
301
 
        self.transport = transport.get_transport_from_path('.')
 
159
        self.transport = get_transport('.')
302
160
        self.lockable = self.get_lockable()
303
 
        # the lock creation here sets mode - test_permissions on branch
304
 
        # tests that implicitly, but it might be a good idea to factor
 
161
        # the lock creation here sets mode - test_permissions on branch 
 
162
        # tests that implicitly, but it might be a good idea to factor 
305
163
        # out the mode checking logic and have it applied to lockable files
306
164
        # directly. RBC 20060418
307
165
        self.lockable.create_lock()
308
166
 
309
167
    def get_lockable(self):
310
 
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
 
168
        return LockableFiles(self.transport, 'my-lock', LockDir)
311
169
 
312
170
    def test_lock_created(self):
313
171
        self.assertTrue(self.transport.has('my-lock'))
317
175
        self.assertFalse(self.transport.has('my-lock/held/info'))
318
176
        self.assertTrue(self.transport.has('my-lock'))
319
177
 
320
 
    def test__file_modes(self):
321
 
        self.transport.mkdir('readonly')
322
 
        osutils.make_readonly('readonly')
323
 
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
324
 
                                 lockdir.LockDir)
325
 
        # The directory mode should be read-write-execute for the current user
326
 
        self.assertEqual(00700, lockable._dir_mode & 00700)
327
 
        # Files should be read-write for the current user
328
 
        self.assertEqual(00600, lockable._file_mode & 00700)
329
 
 
330
 
 
331
 
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
332
 
                                      _TestLockableFiles_mixin):
333
 
    """LockableFile tests run with RemoteLockDir on a branch."""
334
 
 
335
 
    def setUp(self):
336
 
        super(TestLockableFiles_RemoteLockDir, self).setUp()
337
 
        # can only get a RemoteLockDir with some RemoteObject...
338
 
        # use a branch as that's what we want. These mixin tests test the end
339
 
        # to end behaviour, so stubbing out the backend and simulating would
340
 
        # defeat the purpose. We test the protocol implementation separately
341
 
        # in test_remote and test_smart as usual.
342
 
        b = self.make_branch('foo')
343
 
        self.addCleanup(b.bzrdir.transport.disconnect)
344
 
        self.transport = transport.get_transport_from_path('.')
345
 
        self.lockable = self.get_lockable()
346
 
 
347
 
    def get_lockable(self):
348
 
        # getting a new lockable involves opening a new instance of the branch
349
 
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
350
 
        self.addCleanup(branch.bzrdir.transport.disconnect)
351
 
        return branch.control_files
 
178
 
 
179
    # TODO: Test the lockdir inherits the right file and directory permissions
 
180
    # from the LockableFiles.