~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Robert Collins
  • Date: 2010-06-25 20:34:05 UTC
  • mto: This revision was merged to the branch mainline in revision 5324.
  • Revision ID: robertc@robertcollins.net-20100625203405-c74lxd3enklhaqf9
``bzrlib.osutils.get_terminal_encoding`` will now only mutter its
selection when explicitly requested; this avoids many duplicate calls
being logged when helpers, wrappers and older code that manually calls
it are executed. It is now logged deliberately by the ui setup code.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2008 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
46
46
# so won't modernize them now. - mbp 20080430
47
47
class _TestLockableFiles_mixin(object):
48
48
 
49
 
    def test_read_write(self):
50
 
        self.assertRaises(NoSuchFile,
51
 
            self.applyDeprecated,
52
 
            deprecated_in((1, 5, 0)),
53
 
            self.lockable.get, 'foo')
54
 
        self.assertRaises(NoSuchFile,
55
 
            self.applyDeprecated,
56
 
            deprecated_in((1, 5, 0)),
57
 
            self.lockable.get_utf8, 'foo')
58
 
        self.lockable.lock_write()
59
 
        try:
60
 
            unicode_string = u'bar\u1234'
61
 
            self.assertEqual(4, len(unicode_string))
62
 
            byte_string = unicode_string.encode('utf-8')
63
 
            self.assertEqual(6, len(byte_string))
64
 
            self.assertRaises(UnicodeEncodeError,
65
 
                self.applyDeprecated,
66
 
                deprecated_in((1, 6, 0)),
67
 
                self.lockable.put, 'foo',
68
 
                StringIO(unicode_string))
69
 
            self.applyDeprecated(
70
 
                deprecated_in((1, 6, 0)),
71
 
                self.lockable.put,
72
 
                'foo', StringIO(byte_string))
73
 
            byte_stream = self.applyDeprecated(
74
 
                deprecated_in((1, 5, 0)),
75
 
                self.lockable.get,
76
 
                'foo')
77
 
            self.assertEqual(byte_string, byte_stream.read())
78
 
            unicode_stream = self.applyDeprecated(
79
 
                deprecated_in((1, 5, 0)),
80
 
                self.lockable.get_utf8,
81
 
                'foo')
82
 
            self.assertEqual(unicode_string,
83
 
                unicode_stream.read())
84
 
            self.assertRaises(BzrBadParameterNotString,
85
 
                self.applyDeprecated,
86
 
                deprecated_in((1, 6, 0)),
87
 
                self.lockable.put_utf8,
88
 
                'bar',
89
 
                StringIO(unicode_string))
90
 
            self.applyDeprecated(
91
 
                deprecated_in((1, 6, 0)),
92
 
                self.lockable.put_utf8,
93
 
                'bar',
94
 
                unicode_string)
95
 
            unicode_stream = self.applyDeprecated(
96
 
                deprecated_in((1, 5, 0)),
97
 
                self.lockable.get_utf8,
98
 
                'bar')
99
 
            self.assertEqual(unicode_string,
100
 
                unicode_stream.read())
101
 
            byte_stream = self.applyDeprecated(
102
 
                deprecated_in((1, 5, 0)),
103
 
                self.lockable.get,
104
 
                'bar')
105
 
            self.assertEqual(byte_string, byte_stream.read())
106
 
            self.applyDeprecated(
107
 
                deprecated_in((1, 6, 0)),
108
 
                self.lockable.put_bytes,
109
 
                'raw', 'raw\xffbytes')
110
 
            byte_stream = self.applyDeprecated(
111
 
                deprecated_in((1, 5, 0)),
112
 
                self.lockable.get,
113
 
                'raw')
114
 
            self.assertEqual('raw\xffbytes', byte_stream.read())
115
 
        finally:
116
 
            self.lockable.unlock()
117
 
 
118
 
    def test_locks(self):
119
 
        self.lockable.lock_read()
120
 
        try:
121
 
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
122
 
                              StringIO('bar\u1234'))
123
 
        finally:
124
 
            self.lockable.unlock()
125
 
 
126
49
    def test_transactions(self):
127
50
        self.assertIs(self.lockable.get_transaction().__class__,
128
51
                      PassThroughTransaction)
161
84
        l2 = self.get_lockable()
162
85
        orig_factory = bzrlib.ui.ui_factory
163
86
        # silent ui - no need for stdout
164
 
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
165
 
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
87
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
166
88
        try:
167
89
            l2.break_lock()
168
90
        finally:
176
98
 
177
99
    def test_lock_write_returns_None_refuses_token(self):
178
100
        token = self.lockable.lock_write()
179
 
        try:
180
 
            if token is not None:
181
 
                # This test does not apply, because this lockable supports
182
 
                # tokens.
183
 
                raise TestNotApplicable("%r uses tokens" % (self.lockable,))
184
 
            self.assertRaises(errors.TokenLockingNotSupported,
185
 
                              self.lockable.lock_write, token='token')
186
 
        finally:
187
 
            self.lockable.unlock()
 
101
        self.addCleanup(self.lockable.unlock)
 
102
        if token is not None:
 
103
            # This test does not apply, because this lockable supports
 
104
            # tokens.
 
105
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
 
106
        self.assertRaises(errors.TokenLockingNotSupported,
 
107
                          self.lockable.lock_write, token='token')
188
108
 
189
109
    def test_lock_write_returns_token_when_given_token(self):
190
110
        token = self.lockable.lock_write()
191
 
        try:
192
 
            if token is None:
193
 
                # This test does not apply, because this lockable refuses
194
 
                # tokens.
195
 
                return
196
 
            new_lockable = self.get_lockable()
197
 
            token_from_new_lockable = new_lockable.lock_write(token=token)
198
 
            try:
199
 
                self.assertEqual(token, token_from_new_lockable)
200
 
            finally:
201
 
                new_lockable.unlock()
202
 
        finally:
203
 
            self.lockable.unlock()
 
111
        self.addCleanup(self.lockable.unlock)
 
112
        if token is None:
 
113
            # This test does not apply, because this lockable refuses
 
114
            # tokens.
 
115
            return
 
116
        new_lockable = self.get_lockable()
 
117
        token_from_new_lockable = new_lockable.lock_write(token=token)
 
118
        self.addCleanup(new_lockable.unlock)
 
119
        self.assertEqual(token, token_from_new_lockable)
204
120
 
205
121
    def test_lock_write_raises_on_token_mismatch(self):
206
122
        token = self.lockable.lock_write()
207
 
        try:
208
 
            if token is None:
209
 
                # This test does not apply, because this lockable refuses
210
 
                # tokens.
211
 
                return
212
 
            different_token = token + 'xxx'
213
 
            # Re-using the same lockable instance with a different token will
214
 
            # raise TokenMismatch.
215
 
            self.assertRaises(errors.TokenMismatch,
216
 
                              self.lockable.lock_write, token=different_token)
217
 
            # A seperate instance for the same lockable will also raise
218
 
            # TokenMismatch.
219
 
            # This detects the case where a caller claims to have a lock (via
220
 
            # the token) for an external resource, but doesn't (the token is
221
 
            # different).  Clients need a seperate lock object to make sure the
222
 
            # external resource is probed, whereas the existing lock object
223
 
            # might cache.
224
 
            new_lockable = self.get_lockable()
225
 
            self.assertRaises(errors.TokenMismatch,
226
 
                              new_lockable.lock_write, token=different_token)
227
 
        finally:
228
 
            self.lockable.unlock()
 
123
        self.addCleanup(self.lockable.unlock)
 
124
        if token is None:
 
125
            # This test does not apply, because this lockable refuses
 
126
            # tokens.
 
127
            return
 
128
        different_token = token + 'xxx'
 
129
        # Re-using the same lockable instance with a different token will
 
130
        # raise TokenMismatch.
 
131
        self.assertRaises(errors.TokenMismatch,
 
132
                          self.lockable.lock_write, token=different_token)
 
133
        # A separate instance for the same lockable will also raise
 
134
        # TokenMismatch.
 
135
        # This detects the case where a caller claims to have a lock (via
 
136
        # the token) for an external resource, but doesn't (the token is
 
137
        # different).  Clients need a separate lock object to make sure the
 
138
        # external resource is probed, whereas the existing lock object
 
139
        # might cache.
 
140
        new_lockable = self.get_lockable()
 
141
        self.assertRaises(errors.TokenMismatch,
 
142
                          new_lockable.lock_write, token=different_token)
229
143
 
230
144
    def test_lock_write_with_matching_token(self):
231
145
        # If the token matches, so no exception is raised by lock_write.
232
146
        token = self.lockable.lock_write()
233
 
        try:
234
 
            if token is None:
235
 
                # This test does not apply, because this lockable refuses
236
 
                # tokens.
237
 
                return
238
 
            # The same instance will accept a second lock_write if the specified
239
 
            # token matches.
240
 
            self.lockable.lock_write(token=token)
241
 
            self.lockable.unlock()
242
 
            # Calling lock_write on a new instance for the same lockable will
243
 
            # also succeed.
244
 
            new_lockable = self.get_lockable()
245
 
            new_lockable.lock_write(token=token)
246
 
            new_lockable.unlock()
247
 
        finally:
248
 
            self.lockable.unlock()
 
147
        self.addCleanup(self.lockable.unlock)
 
148
        if token is None:
 
149
            # This test does not apply, because this lockable refuses
 
150
            # tokens.
 
151
            return
 
152
        # The same instance will accept a second lock_write if the specified
 
153
        # token matches.
 
154
        self.lockable.lock_write(token=token)
 
155
        self.lockable.unlock()
 
156
        # Calling lock_write on a new instance for the same lockable will
 
157
        # also succeed.
 
158
        new_lockable = self.get_lockable()
 
159
        new_lockable.lock_write(token=token)
 
160
        new_lockable.unlock()
249
161
 
250
162
    def test_unlock_after_lock_write_with_token(self):
251
163
        # If lock_write did not physically acquire the lock (because it was
252
164
        # passed a token), then unlock should not physically release it.
253
165
        token = self.lockable.lock_write()
254
 
        try:
255
 
            if token is None:
256
 
                # This test does not apply, because this lockable refuses
257
 
                # tokens.
258
 
                return
259
 
            new_lockable = self.get_lockable()
260
 
            new_lockable.lock_write(token=token)
261
 
            new_lockable.unlock()
262
 
            self.assertTrue(self.lockable.get_physical_lock_status())
263
 
        finally:
264
 
            self.lockable.unlock()
 
166
        self.addCleanup(self.lockable.unlock)
 
167
        if token is None:
 
168
            # This test does not apply, because this lockable refuses
 
169
            # tokens.
 
170
            return
 
171
        new_lockable = self.get_lockable()
 
172
        new_lockable.lock_write(token=token)
 
173
        new_lockable.unlock()
 
174
        self.assertTrue(self.lockable.get_physical_lock_status())
265
175
 
266
176
    def test_lock_write_with_token_fails_when_unlocked(self):
267
177
        # Lock and unlock to get a superficially valid token.  This mimics a
332
242
        # But should be relockable with a token.
333
243
        self.lockable.lock_write(token=token)
334
244
        self.lockable.unlock()
 
245
        # Cleanup: we should still be able to get the lock, but we restore the
 
246
        # behavior to clearing the lock when unlocking.
 
247
        self.lockable.lock_write(token=token)
 
248
        self.lockable.dont_leave_in_place()
 
249
        self.lockable.unlock()
335
250
 
336
251
    def test_dont_leave_in_place(self):
337
252
        token = self.lockable.lock_write()
370
285
        self.lockable = self.get_lockable()
371
286
        self.lockable.create_lock()
372
287
 
373
 
    def tearDown(self):
374
 
        super(TestLockableFiles_TransportLock, self).tearDown()
 
288
    def stop_server(self):
 
289
        super(TestLockableFiles_TransportLock, self).stop_server()
375
290
        # free the subtransport so that we do not get a 5 second
376
291
        # timeout due to the SFTP connection cache.
377
292
        try: