~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-07-06 03:15:29 UTC
  • mfrom: (1711.2.78 jam-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20060706031529-e189d8c3f42076be
(jam) allow plugins to include benchmarks

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
17
from StringIO import StringIO
18
18
 
19
19
import bzrlib
 
20
from bzrlib.branch import Branch
20
21
import bzrlib.errors as errors
21
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
22
23
from bzrlib.lockable_files import LockableFiles, TransportLock
23
 
from bzrlib import lockdir
24
24
from bzrlib.lockdir import LockDir
25
25
from bzrlib.tests import TestCaseInTempDir
26
 
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
27
26
from bzrlib.tests.test_transactions import DummyWeave
28
27
from bzrlib.transactions import (PassThroughTransaction,
29
28
                                 ReadOnlyTransaction,
44
43
            self.assertEqual(4, len(unicode_string))
45
44
            byte_string = unicode_string.encode('utf-8')
46
45
            self.assertEqual(6, len(byte_string))
47
 
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
 
46
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
48
47
                              StringIO(unicode_string))
49
48
            self.lockable.put('foo', StringIO(byte_string))
50
49
            self.assertEqual(byte_string,
57
56
                              StringIO(unicode_string)
58
57
                              )
59
58
            self.lockable.put_utf8('bar', unicode_string)
60
 
            self.assertEqual(unicode_string,
 
59
            self.assertEqual(unicode_string, 
61
60
                             self.lockable.get_utf8('bar').read())
62
 
            self.assertEqual(byte_string,
 
61
            self.assertEqual(byte_string, 
63
62
                             self.lockable.get('bar').read())
64
 
            self.lockable.put_bytes('raw', 'raw\xffbytes')
65
 
            self.assertEqual('raw\xffbytes',
66
 
                             self.lockable.get('raw').read())
67
63
        finally:
68
64
            self.lockable.unlock()
69
65
 
126
122
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
127
123
            self.assertFalse(self.lockable.is_locked())
128
124
 
129
 
    def test_lock_write_returns_None_refuses_token(self):
130
 
        token = self.lockable.lock_write()
131
 
        try:
132
 
            if token is not None:
133
 
                # This test does not apply, because this lockable supports
134
 
                # tokens.
135
 
                return
136
 
            self.assertRaises(errors.TokenLockingNotSupported,
137
 
                              self.lockable.lock_write, token='token')
138
 
        finally:
139
 
            self.lockable.unlock()
140
 
 
141
 
    def test_lock_write_returns_token_when_given_token(self):
142
 
        token = self.lockable.lock_write()
143
 
        try:
144
 
            if token is None:
145
 
                # This test does not apply, because this lockable refuses
146
 
                # tokens.
147
 
                return
148
 
            new_lockable = self.get_lockable()
149
 
            token_from_new_lockable = new_lockable.lock_write(token=token)
150
 
            try:
151
 
                self.assertEqual(token, token_from_new_lockable)
152
 
            finally:
153
 
                new_lockable.unlock()
154
 
        finally:
155
 
            self.lockable.unlock()
156
 
 
157
 
    def test_lock_write_raises_on_token_mismatch(self):
158
 
        token = self.lockable.lock_write()
159
 
        try:
160
 
            if token is None:
161
 
                # This test does not apply, because this lockable refuses
162
 
                # tokens.
163
 
                return
164
 
            different_token = token + 'xxx'
165
 
            # Re-using the same lockable instance with a different token will
166
 
            # raise TokenMismatch.
167
 
            self.assertRaises(errors.TokenMismatch,
168
 
                              self.lockable.lock_write, token=different_token)
169
 
            # A seperate instance for the same lockable will also raise
170
 
            # TokenMismatch.
171
 
            # This detects the case where a caller claims to have a lock (via
172
 
            # the token) for an external resource, but doesn't (the token is
173
 
            # different).  Clients need a seperate lock object to make sure the
174
 
            # external resource is probed, whereas the existing lock object
175
 
            # might cache.
176
 
            new_lockable = self.get_lockable()
177
 
            self.assertRaises(errors.TokenMismatch,
178
 
                              new_lockable.lock_write, token=different_token)
179
 
        finally:
180
 
            self.lockable.unlock()
181
 
 
182
 
    def test_lock_write_with_matching_token(self):
183
 
        # If the token matches, so no exception is raised by lock_write.
184
 
        token = self.lockable.lock_write()
185
 
        try:
186
 
            if token is None:
187
 
                # This test does not apply, because this lockable refuses
188
 
                # tokens.
189
 
                return
190
 
            # The same instance will accept a second lock_write if the specified
191
 
            # token matches.
192
 
            self.lockable.lock_write(token=token)
193
 
            self.lockable.unlock()
194
 
            # Calling lock_write on a new instance for the same lockable will
195
 
            # also succeed.
196
 
            new_lockable = self.get_lockable()
197
 
            new_lockable.lock_write(token=token)
198
 
            new_lockable.unlock()
199
 
        finally:
200
 
            self.lockable.unlock()
201
 
 
202
 
    def test_unlock_after_lock_write_with_token(self):
203
 
        # If lock_write did not physically acquire the lock (because it was
204
 
        # passed a token), then unlock should not physically release it.
205
 
        token = self.lockable.lock_write()
206
 
        try:
207
 
            if token is None:
208
 
                # This test does not apply, because this lockable refuses
209
 
                # tokens.
210
 
                return
211
 
            new_lockable = self.get_lockable()
212
 
            new_lockable.lock_write(token=token)
213
 
            new_lockable.unlock()
214
 
            self.assertTrue(self.lockable.get_physical_lock_status())
215
 
        finally:
216
 
            self.lockable.unlock()
217
 
 
218
 
    def test_lock_write_with_token_fails_when_unlocked(self):
219
 
        # Lock and unlock to get a superficially valid token.  This mimics a
220
 
        # likely programming error, where a caller accidentally tries to lock
221
 
        # with a token that is no longer valid (because the original lock was
222
 
        # released).
223
 
        token = self.lockable.lock_write()
224
 
        self.lockable.unlock()
225
 
        if token is None:
226
 
            # This test does not apply, because this lockable refuses
227
 
            # tokens.
228
 
            return
229
 
 
230
 
        self.assertRaises(errors.TokenMismatch,
231
 
                          self.lockable.lock_write, token=token)
232
 
 
233
 
    def test_lock_write_reenter_with_token(self):
234
 
        token = self.lockable.lock_write()
235
 
        try:
236
 
            if token is None:
237
 
                # This test does not apply, because this lockable refuses
238
 
                # tokens.
239
 
                return
240
 
            # Relock with a token.
241
 
            token_from_reentry = self.lockable.lock_write(token=token)
242
 
            try:
243
 
                self.assertEqual(token, token_from_reentry)
244
 
            finally:
245
 
                self.lockable.unlock()
246
 
        finally:
247
 
            self.lockable.unlock()
248
 
        # The lock should be unlocked on disk.  Verify that with a new lock
249
 
        # instance.
250
 
        new_lockable = self.get_lockable()
251
 
        # Calling lock_write now should work, rather than raise LockContention.
252
 
        new_lockable.lock_write()
253
 
        new_lockable.unlock()
254
 
 
255
 
    def test_second_lock_write_returns_same_token(self):
256
 
        first_token = self.lockable.lock_write()
257
 
        try:
258
 
            if first_token is None:
259
 
                # This test does not apply, because this lockable refuses
260
 
                # tokens.
261
 
                return
262
 
            # Relock the already locked lockable.  It should return the same
263
 
            # token.
264
 
            second_token = self.lockable.lock_write()
265
 
            try:
266
 
                self.assertEqual(first_token, second_token)
267
 
            finally:
268
 
                self.lockable.unlock()
269
 
        finally:
270
 
            self.lockable.unlock()
271
 
 
272
 
    def test_leave_in_place(self):
273
 
        token = self.lockable.lock_write()
274
 
        try:
275
 
            if token is None:
276
 
                # This test does not apply, because this lockable refuses
277
 
                # tokens.
278
 
                return
279
 
            self.lockable.leave_in_place()
280
 
        finally:
281
 
            self.lockable.unlock()
282
 
        # At this point, the lock is still in place on disk
283
 
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
284
 
        # But should be relockable with a token.
285
 
        self.lockable.lock_write(token=token)
286
 
        self.lockable.unlock()
287
 
 
288
 
    def test_dont_leave_in_place(self):
289
 
        token = self.lockable.lock_write()
290
 
        try:
291
 
            if token is None:
292
 
                # This test does not apply, because this lockable refuses
293
 
                # tokens.
294
 
                return
295
 
            self.lockable.leave_in_place()
296
 
        finally:
297
 
            self.lockable.unlock()
298
 
        # At this point, the lock is still in place on disk.
299
 
        # Acquire the existing lock with the token, and ask that it is removed
300
 
        # when this object unlocks, and unlock to trigger that removal.
301
 
        new_lockable = self.get_lockable()
302
 
        new_lockable.lock_write(token=token)
303
 
        new_lockable.dont_leave_in_place()
304
 
        new_lockable.unlock()
305
 
        # At this point, the lock is no longer on disk, so we can lock it.
306
 
        third_lockable = self.get_lockable()
307
 
        third_lockable.lock_write()
308
 
        third_lockable.unlock()
309
 
 
310
125
 
311
126
# This method of adapting tests to parameters is different to 
312
127
# the TestProviderAdapters used elsewhere, but seems simpler for this 
315
130
                                      _TestLockableFiles_mixin):
316
131
 
317
132
    def setUp(self):
318
 
        TestCaseInTempDir.setUp(self)
 
133
        super(TestLockableFiles_TransportLock, self).setUp()
319
134
        transport = get_transport('.')
320
135
        transport.mkdir('.bzr')
321
136
        self.sub_transport = transport.clone('.bzr')
337
152
    """LockableFile tests run with LockDir underneath"""
338
153
 
339
154
    def setUp(self):
340
 
        TestCaseInTempDir.setUp(self)
 
155
        super(TestLockableFiles_LockDir, self).setUp()
341
156
        self.transport = get_transport('.')
342
157
        self.lockable = self.get_lockable()
343
158
        # the lock creation here sets mode - test_permissions on branch 
360
175
 
361
176
    # TODO: Test the lockdir inherits the right file and directory permissions
362
177
    # from the LockableFiles.
363
 
        
364
 
 
365
 
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
366
 
                              _TestLockableFiles_mixin):
367
 
    """LockableFile tests run with RemoteLockDir on a branch."""
368
 
 
369
 
    def setUp(self):
370
 
        TestCaseWithSmartMedium.setUp(self)
371
 
        # can only get a RemoteLockDir with some RemoteObject...
372
 
        # use a branch as thats what we want. These mixin tests test the end
373
 
        # to end behaviour, so stubbing out the backend and simulating would
374
 
        # defeat the purpose. We test the protocol implementation separately
375
 
        # in test_remote and test_smart as usual.
376
 
        b = self.make_branch('foo')
377
 
        self.addCleanup(b.bzrdir.transport.disconnect)
378
 
        self.transport = get_transport('.')
379
 
        self.lockable = self.get_lockable()
380
 
 
381
 
    def get_lockable(self):
382
 
        # getting a new lockable involves opening a new instance of the branch
383
 
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
384
 
        self.addCleanup(branch.bzrdir.transport.disconnect)
385
 
        return branch.control_files