~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Robert Collins
  • Date: 2007-07-15 15:40:37 UTC
  • mto: (2592.3.33 repository)
  • mto: This revision was merged to the branch mainline in revision 2624.
  • Revision ID: robertc@robertcollins.net-20070715154037-3ar8g89decddc9su
Make GraphIndex accept nodes as key, value, references, so that the method
signature is closer to what a simple key->value index delivers. Also
change the behaviour when the reference list count is zero to accept
key, value as nodes, and emit key, value to make it identical in that case
to a simple key->value index. This may not be a good idea, but for now it
seems ok.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
from StringIO import StringIO
18
18
 
19
19
import bzrlib
20
 
from bzrlib import (
21
 
    errors,
22
 
    lockdir,
23
 
    osutils,
24
 
    )
 
20
import bzrlib.errors as errors
25
21
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
26
22
from bzrlib.lockable_files import LockableFiles, TransportLock
27
 
from bzrlib.symbol_versioning import (
28
 
    deprecated_in,
29
 
    )
30
 
from bzrlib.tests import (
31
 
    TestCaseInTempDir,
32
 
    TestNotApplicable,
33
 
    )
 
23
from bzrlib import lockdir
 
24
from bzrlib.lockdir import LockDir
 
25
from bzrlib.tests import TestCaseInTempDir
34
26
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
35
27
from bzrlib.tests.test_transactions import DummyWeave
36
28
from bzrlib.transactions import (PassThroughTransaction,
41
33
 
42
34
 
43
35
# these tests are applied in each parameterized suite for LockableFiles
44
 
#
45
 
# they use an old style of parameterization, but we want to remove this class
46
 
# so won't modernize them now. - mbp 20080430
47
36
class _TestLockableFiles_mixin(object):
48
37
 
 
38
    def test_read_write(self):
 
39
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
40
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
41
        self.lockable.lock_write()
 
42
        try:
 
43
            unicode_string = u'bar\u1234'
 
44
            self.assertEqual(4, len(unicode_string))
 
45
            byte_string = unicode_string.encode('utf-8')
 
46
            self.assertEqual(6, len(byte_string))
 
47
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
 
48
                              StringIO(unicode_string))
 
49
            self.lockable.put('foo', StringIO(byte_string))
 
50
            self.assertEqual(byte_string,
 
51
                             self.lockable.get('foo').read())
 
52
            self.assertEqual(unicode_string,
 
53
                             self.lockable.get_utf8('foo').read())
 
54
            self.assertRaises(BzrBadParameterNotString,
 
55
                              self.lockable.put_utf8,
 
56
                              'bar',
 
57
                              StringIO(unicode_string)
 
58
                              )
 
59
            self.lockable.put_utf8('bar', unicode_string)
 
60
            self.assertEqual(unicode_string,
 
61
                             self.lockable.get_utf8('bar').read())
 
62
            self.assertEqual(byte_string,
 
63
                             self.lockable.get('bar').read())
 
64
            self.lockable.put_bytes('raw', 'raw\xffbytes')
 
65
            self.assertEqual('raw\xffbytes',
 
66
                             self.lockable.get('raw').read())
 
67
        finally:
 
68
            self.lockable.unlock()
 
69
 
 
70
    def test_locks(self):
 
71
        self.lockable.lock_read()
 
72
        try:
 
73
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
74
                              StringIO('bar\u1234'))
 
75
        finally:
 
76
            self.lockable.unlock()
 
77
 
49
78
    def test_transactions(self):
50
79
        self.assertIs(self.lockable.get_transaction().__class__,
51
80
                      PassThroughTransaction)
68
97
 
69
98
    def test__escape(self):
70
99
        self.assertEqual('%25', self.lockable._escape('%'))
71
 
 
 
100
        
72
101
    def test__escape_empty(self):
73
102
        self.assertEqual('', self.lockable._escape(''))
74
103
 
80
109
        except NotImplementedError:
81
110
            # this lock cannot be broken
82
111
            self.lockable.unlock()
83
 
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
 
112
            return
84
113
        l2 = self.get_lockable()
85
114
        orig_factory = bzrlib.ui.ui_factory
86
115
        # silent ui - no need for stdout
87
 
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
 
116
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
117
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
88
118
        try:
89
119
            l2.break_lock()
90
120
        finally:
98
128
 
99
129
    def test_lock_write_returns_None_refuses_token(self):
100
130
        token = self.lockable.lock_write()
101
 
        self.addCleanup(self.lockable.unlock)
102
 
        if token is not None:
103
 
            # This test does not apply, because this lockable supports
104
 
            # tokens.
105
 
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
106
 
        self.assertRaises(errors.TokenLockingNotSupported,
107
 
                          self.lockable.lock_write, token='token')
 
131
        try:
 
132
            if token is not None:
 
133
                # This test does not apply, because this lockable supports
 
134
                # tokens.
 
135
                return
 
136
            self.assertRaises(errors.TokenLockingNotSupported,
 
137
                              self.lockable.lock_write, token='token')
 
138
        finally:
 
139
            self.lockable.unlock()
108
140
 
109
141
    def test_lock_write_returns_token_when_given_token(self):
110
142
        token = self.lockable.lock_write()
111
 
        self.addCleanup(self.lockable.unlock)
112
 
        if token is None:
113
 
            # This test does not apply, because this lockable refuses
114
 
            # tokens.
115
 
            return
116
 
        new_lockable = self.get_lockable()
117
 
        token_from_new_lockable = new_lockable.lock_write(token=token)
118
 
        self.addCleanup(new_lockable.unlock)
119
 
        self.assertEqual(token, token_from_new_lockable)
 
143
        try:
 
144
            if token is None:
 
145
                # This test does not apply, because this lockable refuses
 
146
                # tokens.
 
147
                return
 
148
            new_lockable = self.get_lockable()
 
149
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
150
            try:
 
151
                self.assertEqual(token, token_from_new_lockable)
 
152
            finally:
 
153
                new_lockable.unlock()
 
154
        finally:
 
155
            self.lockable.unlock()
120
156
 
121
157
    def test_lock_write_raises_on_token_mismatch(self):
122
158
        token = self.lockable.lock_write()
123
 
        self.addCleanup(self.lockable.unlock)
124
 
        if token is None:
125
 
            # This test does not apply, because this lockable refuses
126
 
            # tokens.
127
 
            return
128
 
        different_token = token + 'xxx'
129
 
        # Re-using the same lockable instance with a different token will
130
 
        # raise TokenMismatch.
131
 
        self.assertRaises(errors.TokenMismatch,
132
 
                          self.lockable.lock_write, token=different_token)
133
 
        # A separate instance for the same lockable will also raise
134
 
        # TokenMismatch.
135
 
        # This detects the case where a caller claims to have a lock (via
136
 
        # the token) for an external resource, but doesn't (the token is
137
 
        # different).  Clients need a separate lock object to make sure the
138
 
        # external resource is probed, whereas the existing lock object
139
 
        # might cache.
140
 
        new_lockable = self.get_lockable()
141
 
        self.assertRaises(errors.TokenMismatch,
142
 
                          new_lockable.lock_write, token=different_token)
 
159
        try:
 
160
            if token is None:
 
161
                # This test does not apply, because this lockable refuses
 
162
                # tokens.
 
163
                return
 
164
            different_token = token + 'xxx'
 
165
            # Re-using the same lockable instance with a different token will
 
166
            # raise TokenMismatch.
 
167
            self.assertRaises(errors.TokenMismatch,
 
168
                              self.lockable.lock_write, token=different_token)
 
169
            # A separate instance for the same lockable will also raise
 
170
            # TokenMismatch.
 
171
            # This detects the case where a caller claims to have a lock (via
 
172
            # the token) for an external resource, but doesn't (the token is
 
173
            # different).  Clients need a separate lock object to make sure the
 
174
            # external resource is probed, whereas the existing lock object
 
175
            # might cache.
 
176
            new_lockable = self.get_lockable()
 
177
            self.assertRaises(errors.TokenMismatch,
 
178
                              new_lockable.lock_write, token=different_token)
 
179
        finally:
 
180
            self.lockable.unlock()
143
181
 
144
182
    def test_lock_write_with_matching_token(self):
145
183
        # If the token matches, no exception is raised by lock_write.
146
184
        token = self.lockable.lock_write()
147
 
        self.addCleanup(self.lockable.unlock)
148
 
        if token is None:
149
 
            # This test does not apply, because this lockable refuses
150
 
            # tokens.
151
 
            return
152
 
        # The same instance will accept a second lock_write if the specified
153
 
        # token matches.
154
 
        self.lockable.lock_write(token=token)
155
 
        self.lockable.unlock()
156
 
        # Calling lock_write on a new instance for the same lockable will
157
 
        # also succeed.
158
 
        new_lockable = self.get_lockable()
159
 
        new_lockable.lock_write(token=token)
160
 
        new_lockable.unlock()
 
185
        try:
 
186
            if token is None:
 
187
                # This test does not apply, because this lockable refuses
 
188
                # tokens.
 
189
                return
 
190
            # The same instance will accept a second lock_write if the specified
 
191
            # token matches.
 
192
            self.lockable.lock_write(token=token)
 
193
            self.lockable.unlock()
 
194
            # Calling lock_write on a new instance for the same lockable will
 
195
            # also succeed.
 
196
            new_lockable = self.get_lockable()
 
197
            new_lockable.lock_write(token=token)
 
198
            new_lockable.unlock()
 
199
        finally:
 
200
            self.lockable.unlock()
161
201
 
162
202
    def test_unlock_after_lock_write_with_token(self):
163
203
        # If lock_write did not physically acquire the lock (because it was
164
204
        # passed a token), then unlock should not physically release it.
165
205
        token = self.lockable.lock_write()
166
 
        self.addCleanup(self.lockable.unlock)
167
 
        if token is None:
168
 
            # This test does not apply, because this lockable refuses
169
 
            # tokens.
170
 
            return
171
 
        new_lockable = self.get_lockable()
172
 
        new_lockable.lock_write(token=token)
173
 
        new_lockable.unlock()
174
 
        self.assertTrue(self.lockable.get_physical_lock_status())
 
206
        try:
 
207
            if token is None:
 
208
                # This test does not apply, because this lockable refuses
 
209
                # tokens.
 
210
                return
 
211
            new_lockable = self.get_lockable()
 
212
            new_lockable.lock_write(token=token)
 
213
            new_lockable.unlock()
 
214
            self.assertTrue(self.lockable.get_physical_lock_status())
 
215
        finally:
 
216
            self.lockable.unlock()
175
217
 
176
218
    def test_lock_write_with_token_fails_when_unlocked(self):
177
219
        # Lock and unlock to get a superficially valid token.  This mimics a
242
284
        # But should be relockable with a token.
243
285
        self.lockable.lock_write(token=token)
244
286
        self.lockable.unlock()
245
 
        # Cleanup: we should still be able to get the lock, but we restore the
246
 
        # behavior to clearing the lock when unlocking.
247
 
        self.lockable.lock_write(token=token)
248
 
        self.lockable.dont_leave_in_place()
249
 
        self.lockable.unlock()
250
287
 
251
288
    def test_dont_leave_in_place(self):
252
289
        token = self.lockable.lock_write()
271
308
        third_lockable.unlock()
272
309
 
273
310
 
274
 
# This method of adapting tests to parameters is different to
275
 
# the TestProviderAdapters used elsewhere, but seems simpler for this
276
 
# case.
 
311
# This method of adapting tests to parameters is different to 
 
312
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
313
# case.  
277
314
class TestLockableFiles_TransportLock(TestCaseInTempDir,
278
315
                                      _TestLockableFiles_mixin):
279
316
 
289
326
        super(TestLockableFiles_TransportLock, self).tearDown()
290
327
        # free the subtransport so that we do not get a 5 second
291
328
        # timeout due to the SFTP connection cache.
292
 
        try:
293
 
            del self.sub_transport
294
 
        except AttributeError:
295
 
            pass
 
329
        del self.sub_transport
296
330
 
297
331
    def get_lockable(self):
298
332
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
299
 
 
 
333
        
300
334
 
301
335
class TestLockableFiles_LockDir(TestCaseInTempDir,
302
336
                              _TestLockableFiles_mixin):
306
340
        TestCaseInTempDir.setUp(self)
307
341
        self.transport = get_transport('.')
308
342
        self.lockable = self.get_lockable()
309
 
        # the lock creation here sets mode - test_permissions on branch
310
 
        # tests that implicitly, but it might be a good idea to factor
 
343
        # the lock creation here sets mode - test_permissions on branch 
 
344
        # tests that implicitly, but it might be a good idea to factor 
311
345
        # out the mode checking logic and have it applied to lockable files
312
346
        # directly. RBC 20060418
313
347
        self.lockable.create_lock()
314
348
 
315
349
    def get_lockable(self):
316
 
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
 
350
        return LockableFiles(self.transport, 'my-lock', LockDir)
317
351
 
318
352
    def test_lock_created(self):
319
353
        self.assertTrue(self.transport.has('my-lock'))
323
357
        self.assertFalse(self.transport.has('my-lock/held/info'))
324
358
        self.assertTrue(self.transport.has('my-lock'))
325
359
 
326
 
    def test__file_modes(self):
327
 
        self.transport.mkdir('readonly')
328
 
        osutils.make_readonly('readonly')
329
 
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
330
 
                                 lockdir.LockDir)
331
 
        # The directory mode should be read-write-execute for the current user
332
 
        self.assertEqual(00700, lockable._dir_mode & 00700)
333
 
        # Files should be read-write for the current user
334
 
        self.assertEqual(00600, lockable._file_mode & 00700)
335
360
 
 
361
    # TODO: Test the lockdir inherits the right file and directory permissions
 
362
    # from the LockableFiles.
 
363
        
336
364
 
337
365
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
338
366
                              _TestLockableFiles_mixin):