~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: John Arbash Meinel
  • Date: 2007-04-28 15:04:17 UTC
  • mfrom: (2466 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2566.
  • Revision ID: john@arbash-meinel.com-20070428150417-trp3pi0pzd411pu4
[merge] bzr.dev 2466

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
from StringIO import StringIO
18
18
 
19
19
import bzrlib
20
 
from bzrlib.branch import Branch
21
20
import bzrlib.errors as errors
22
21
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
23
22
from bzrlib.lockable_files import LockableFiles, TransportLock
 
23
from bzrlib import lockdir
24
24
from bzrlib.lockdir import LockDir
25
25
from bzrlib.tests import TestCaseInTempDir
 
26
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
26
27
from bzrlib.tests.test_transactions import DummyWeave
27
28
from bzrlib.transactions import (PassThroughTransaction,
28
29
                                 ReadOnlyTransaction,
43
44
            self.assertEqual(4, len(unicode_string))
44
45
            byte_string = unicode_string.encode('utf-8')
45
46
            self.assertEqual(6, len(byte_string))
46
 
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
 
47
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
47
48
                              StringIO(unicode_string))
48
49
            self.lockable.put('foo', StringIO(byte_string))
49
50
            self.assertEqual(byte_string,
125
126
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
126
127
            self.assertFalse(self.lockable.is_locked())
127
128
 
 
129
    def test_lock_write_returns_None_refuses_token(self):
 
130
        token = self.lockable.lock_write()
 
131
        try:
 
132
            if token is not None:
 
133
                # This test does not apply, because this lockable supports
 
134
                # tokens.
 
135
                return
 
136
            self.assertRaises(errors.TokenLockingNotSupported,
 
137
                              self.lockable.lock_write, token='token')
 
138
        finally:
 
139
            self.lockable.unlock()
 
140
 
 
141
    def test_lock_write_returns_token_when_given_token(self):
 
142
        token = self.lockable.lock_write()
 
143
        try:
 
144
            if token is None:
 
145
                # This test does not apply, because this lockable refuses
 
146
                # tokens.
 
147
                return
 
148
            new_lockable = self.get_lockable()
 
149
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
150
            try:
 
151
                self.assertEqual(token, token_from_new_lockable)
 
152
            finally:
 
153
                new_lockable.unlock()
 
154
        finally:
 
155
            self.lockable.unlock()
 
156
 
 
157
    def test_lock_write_raises_on_token_mismatch(self):
 
158
        token = self.lockable.lock_write()
 
159
        try:
 
160
            if token is None:
 
161
                # This test does not apply, because this lockable refuses
 
162
                # tokens.
 
163
                return
 
164
            different_token = token + 'xxx'
 
165
            # Re-using the same lockable instance with a different token will
 
166
            # raise TokenMismatch.
 
167
            self.assertRaises(errors.TokenMismatch,
 
168
                              self.lockable.lock_write, token=different_token)
 
169
            # A separate instance for the same lockable will also raise
 
170
            # TokenMismatch.
 
171
            # This detects the case where a caller claims to have a lock (via
 
172
            # the token) for an external resource, but doesn't (the token is
 
173
            # different).  Clients need a separate lock object to make sure the
 
174
            # external resource is probed, whereas the existing lock object
 
175
            # might cache.
 
176
            new_lockable = self.get_lockable()
 
177
            self.assertRaises(errors.TokenMismatch,
 
178
                              new_lockable.lock_write, token=different_token)
 
179
        finally:
 
180
            self.lockable.unlock()
 
181
 
 
182
    def test_lock_write_with_matching_token(self):
 
183
        # If the token matches, no exception is raised by lock_write.
 
184
        token = self.lockable.lock_write()
 
185
        try:
 
186
            if token is None:
 
187
                # This test does not apply, because this lockable refuses
 
188
                # tokens.
 
189
                return
 
190
            # The same instance will accept a second lock_write if the specified
 
191
            # token matches.
 
192
            self.lockable.lock_write(token=token)
 
193
            self.lockable.unlock()
 
194
            # Calling lock_write on a new instance for the same lockable will
 
195
            # also succeed.
 
196
            new_lockable = self.get_lockable()
 
197
            new_lockable.lock_write(token=token)
 
198
            new_lockable.unlock()
 
199
        finally:
 
200
            self.lockable.unlock()
 
201
 
 
202
    def test_unlock_after_lock_write_with_token(self):
 
203
        # If lock_write did not physically acquire the lock (because it was
 
204
        # passed a token), then unlock should not physically release it.
 
205
        token = self.lockable.lock_write()
 
206
        try:
 
207
            if token is None:
 
208
                # This test does not apply, because this lockable refuses
 
209
                # tokens.
 
210
                return
 
211
            new_lockable = self.get_lockable()
 
212
            new_lockable.lock_write(token=token)
 
213
            new_lockable.unlock()
 
214
            self.assertTrue(self.lockable.get_physical_lock_status())
 
215
        finally:
 
216
            self.lockable.unlock()
 
217
 
 
218
    def test_lock_write_with_token_fails_when_unlocked(self):
 
219
        # Lock and unlock to get a superficially valid token.  This mimics a
 
220
        # likely programming error, where a caller accidentally tries to lock
 
221
        # with a token that is no longer valid (because the original lock was
 
222
        # released).
 
223
        token = self.lockable.lock_write()
 
224
        self.lockable.unlock()
 
225
        if token is None:
 
226
            # This test does not apply, because this lockable refuses
 
227
            # tokens.
 
228
            return
 
229
 
 
230
        self.assertRaises(errors.TokenMismatch,
 
231
                          self.lockable.lock_write, token=token)
 
232
 
 
233
    def test_lock_write_reenter_with_token(self):
 
234
        token = self.lockable.lock_write()
 
235
        try:
 
236
            if token is None:
 
237
                # This test does not apply, because this lockable refuses
 
238
                # tokens.
 
239
                return
 
240
            # Relock with a token.
 
241
            token_from_reentry = self.lockable.lock_write(token=token)
 
242
            try:
 
243
                self.assertEqual(token, token_from_reentry)
 
244
            finally:
 
245
                self.lockable.unlock()
 
246
        finally:
 
247
            self.lockable.unlock()
 
248
        # The lock should be unlocked on disk.  Verify that with a new lock
 
249
        # instance.
 
250
        new_lockable = self.get_lockable()
 
251
        # Calling lock_write now should work, rather than raise LockContention.
 
252
        new_lockable.lock_write()
 
253
        new_lockable.unlock()
 
254
 
 
255
    def test_second_lock_write_returns_same_token(self):
 
256
        first_token = self.lockable.lock_write()
 
257
        try:
 
258
            if first_token is None:
 
259
                # This test does not apply, because this lockable refuses
 
260
                # tokens.
 
261
                return
 
262
            # Relock the already locked lockable.  It should return the same
 
263
            # token.
 
264
            second_token = self.lockable.lock_write()
 
265
            try:
 
266
                self.assertEqual(first_token, second_token)
 
267
            finally:
 
268
                self.lockable.unlock()
 
269
        finally:
 
270
            self.lockable.unlock()
 
271
 
 
272
    def test_leave_in_place(self):
 
273
        token = self.lockable.lock_write()
 
274
        try:
 
275
            if token is None:
 
276
                # This test does not apply, because this lockable refuses
 
277
                # tokens.
 
278
                return
 
279
            self.lockable.leave_in_place()
 
280
        finally:
 
281
            self.lockable.unlock()
 
282
        # At this point, the lock is still in place on disk
 
283
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
 
284
        # But should be relockable with a token.
 
285
        self.lockable.lock_write(token=token)
 
286
        self.lockable.unlock()
 
287
 
 
288
    def test_dont_leave_in_place(self):
 
289
        token = self.lockable.lock_write()
 
290
        try:
 
291
            if token is None:
 
292
                # This test does not apply, because this lockable refuses
 
293
                # tokens.
 
294
                return
 
295
            self.lockable.leave_in_place()
 
296
        finally:
 
297
            self.lockable.unlock()
 
298
        # At this point, the lock is still in place on disk.
 
299
        # Acquire the existing lock with the token, and ask that it is removed
 
300
        # when this object unlocks, and unlock to trigger that removal.
 
301
        new_lockable = self.get_lockable()
 
302
        new_lockable.lock_write(token=token)
 
303
        new_lockable.dont_leave_in_place()
 
304
        new_lockable.unlock()
 
305
        # At this point, the lock is no longer on disk, so we can lock it.
 
306
        third_lockable = self.get_lockable()
 
307
        third_lockable.lock_write()
 
308
        third_lockable.unlock()
 
309
 
128
310
 
129
311
# This method of adapting tests to parameters is different to 
130
312
# the TestProviderAdapters used elsewhere, but seems simpler for this 
133
315
                                      _TestLockableFiles_mixin):
134
316
 
135
317
    def setUp(self):
136
 
        super(TestLockableFiles_TransportLock, self).setUp()
 
318
        TestCaseInTempDir.setUp(self)
137
319
        transport = get_transport('.')
138
320
        transport.mkdir('.bzr')
139
321
        self.sub_transport = transport.clone('.bzr')
155
337
    """LockableFile tests run with LockDir underneath"""
156
338
 
157
339
    def setUp(self):
158
 
        super(TestLockableFiles_LockDir, self).setUp()
 
340
        TestCaseInTempDir.setUp(self)
159
341
        self.transport = get_transport('.')
160
342
        self.lockable = self.get_lockable()
161
343
        # the lock creation here sets mode - test_permissions on branch 
178
360
 
179
361
    # TODO: Test the lockdir inherits the right file and directory permissions
180
362
    # from the LockableFiles.
 
363
        
 
364
 
 
365
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
 
366
                              _TestLockableFiles_mixin):
 
367
    """LockableFile tests run with RemoteLockDir on a branch."""
 
368
 
 
369
    def setUp(self):
 
370
        TestCaseWithSmartMedium.setUp(self)
 
371
        # can only get a RemoteLockDir with some RemoteObject...
 
372
        # use a branch as that's what we want. These mixin tests test the end
 
373
        # to end behaviour, so stubbing out the backend and simulating would
 
374
        # defeat the purpose. We test the protocol implementation separately
 
375
        # in test_remote and test_smart as usual.
 
376
        b = self.make_branch('foo')
 
377
        self.addCleanup(b.bzrdir.transport.disconnect)
 
378
        self.transport = get_transport('.')
 
379
        self.lockable = self.get_lockable()
 
380
 
 
381
    def get_lockable(self):
 
382
        # getting a new lockable involves opening a new instance of the branch
 
383
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
 
384
        self.addCleanup(branch.bzrdir.transport.disconnect)
 
385
        return branch.control_files