~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Aaron Bentley
  • Date: 2007-06-20 22:06:22 UTC
  • mto: (2520.5.2 bzr.mpbundle)
  • mto: This revision was merged to the branch mainline in revision 2631.
  • Revision ID: abentley@panoramicfeedback.com-20070620220622-9lasxr96rr0e0xvn
Use a fresh versionedfile each time

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for LockDir"""
 
18
 
 
19
from cStringIO import StringIO
 
20
import os
 
21
from threading import Thread, Lock
 
22
import time
 
23
 
 
24
import bzrlib
 
25
from bzrlib import (
 
26
    config,
 
27
    errors,
 
28
    osutils,
 
29
    tests,
 
30
    )
 
31
from bzrlib.errors import (
 
32
        LockBreakMismatch,
 
33
        LockContention, LockError, UnlockableTransport,
 
34
        LockNotHeld, LockBroken
 
35
        )
 
36
from bzrlib.lockdir import LockDir
 
37
from bzrlib.tests import TestCaseWithTransport
 
38
from bzrlib.trace import note
 
39
 
 
40
# These tests sometimes use threads to test the behaviour of lock files with
 
41
# concurrent actors.  This is not a typical (or necessarily supported) use;
 
42
# they're really meant for guarding between processes.
 
43
 
 
44
# These tests are run on the default transport provided by the test framework
 
45
# (typically a local disk transport).  That can be changed by the --transport
 
46
# option to bzr selftest.  The required properties of the transport
 
47
# implementation are tested separately.  (The main requirement is just that
 
48
# they don't allow overwriting nonempty directories.)
 
49
 
 
50
class TestLockDir(TestCaseWithTransport):
 
51
    """Test LockDir operations"""
 
52
 
 
53
    def logging_report_function(self, fmt, *args):
 
54
        self._logged_reports.append((fmt, args))
 
55
 
 
56
    def setup_log_reporter(self, lock_dir):
 
57
        self._logged_reports = []
 
58
        lock_dir._report_function = self.logging_report_function
 
59
 
 
60
    def test_00_lock_creation(self):
 
61
        """Creation of lock file on a transport"""
 
62
        t = self.get_transport()
 
63
        lf = LockDir(t, 'test_lock')
 
64
        self.assertFalse(lf.is_held)
 
65
 
 
66
    def test_01_lock_repr(self):
 
67
        """Lock string representation"""
 
68
        lf = LockDir(self.get_transport(), 'test_lock')
 
69
        r = repr(lf)
 
70
        self.assertContainsRe(r, r'^LockDir\(.*/test_lock\)$')
 
71
 
 
72
    def test_02_unlocked_peek(self):
 
73
        lf = LockDir(self.get_transport(), 'test_lock')
 
74
        self.assertEqual(lf.peek(), None)
 
75
 
 
76
    def get_lock(self):
 
77
        return LockDir(self.get_transport(), 'test_lock')
 
78
 
 
79
    def test_unlock_after_break_raises(self):
 
80
        ld = self.get_lock()
 
81
        ld2 = self.get_lock()
 
82
        ld.create()
 
83
        ld.attempt_lock()
 
84
        ld2.force_break(ld2.peek())
 
85
        self.assertRaises(LockBroken, ld.unlock)
 
86
 
 
87
    def test_03_readonly_peek(self):
 
88
        lf = LockDir(self.get_readonly_transport(), 'test_lock')
 
89
        self.assertEqual(lf.peek(), None)
 
90
 
 
91
    def test_10_lock_uncontested(self):
 
92
        """Acquire and release a lock"""
 
93
        t = self.get_transport()
 
94
        lf = LockDir(t, 'test_lock')
 
95
        lf.create()
 
96
        lf.attempt_lock()
 
97
        try:
 
98
            self.assertTrue(lf.is_held)
 
99
        finally:
 
100
            lf.unlock()
 
101
            self.assertFalse(lf.is_held)
 
102
 
 
103
    def test_11_create_readonly_transport(self):
 
104
        """Fail to create lock on readonly transport"""
 
105
        t = self.get_readonly_transport()
 
106
        lf = LockDir(t, 'test_lock')
 
107
        self.assertRaises(UnlockableTransport, lf.create)
 
108
 
 
109
    def test_12_lock_readonly_transport(self):
 
110
        """Fail to lock on readonly transport"""
 
111
        lf = LockDir(self.get_transport(), 'test_lock')
 
112
        lf.create()
 
113
        lf = LockDir(self.get_readonly_transport(), 'test_lock')
 
114
        self.assertRaises(UnlockableTransport, lf.attempt_lock)
 
115
 
 
116
    def test_20_lock_contested(self):
 
117
        """Contention to get a lock"""
 
118
        t = self.get_transport()
 
119
        lf1 = LockDir(t, 'test_lock')
 
120
        lf1.create()
 
121
        lf1.attempt_lock()
 
122
        lf2 = LockDir(t, 'test_lock')
 
123
        try:
 
124
            # locking is between LockDir instances; aliases within 
 
125
            # a single process are not detected
 
126
            lf2.attempt_lock()
 
127
            self.fail('Failed to detect lock collision')
 
128
        except LockContention, e:
 
129
            self.assertEqual(e.lock, lf2)
 
130
            self.assertContainsRe(str(e),
 
131
                    r'^Could not acquire.*test_lock.*$')
 
132
        lf1.unlock()
 
133
 
 
134
    def test_20_lock_peek(self):
 
135
        """Peek at the state of a lock"""
 
136
        t = self.get_transport()
 
137
        lf1 = LockDir(t, 'test_lock')
 
138
        lf1.create()
 
139
        lf1.attempt_lock()
 
140
        # lock is held, should get some info on it
 
141
        info1 = lf1.peek()
 
142
        self.assertEqual(set(info1.keys()),
 
143
                         set(['user', 'nonce', 'hostname', 'pid', 'start_time']))
 
144
        # should get the same info if we look at it through a different
 
145
        # instance
 
146
        info2 = LockDir(t, 'test_lock').peek()
 
147
        self.assertEqual(info1, info2)
 
148
        # locks which are never used should be not-held
 
149
        self.assertEqual(LockDir(t, 'other_lock').peek(), None)
 
150
 
 
151
    def test_21_peek_readonly(self):
 
152
        """Peek over a readonly transport"""
 
153
        t = self.get_transport()
 
154
        lf1 = LockDir(t, 'test_lock')
 
155
        lf1.create()
 
156
        lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
 
157
        self.assertEqual(lf2.peek(), None)
 
158
        lf1.attempt_lock()
 
159
        info2 = lf2.peek()
 
160
        self.assertTrue(info2)
 
161
        self.assertEqual(info2['nonce'], lf1.nonce)
 
162
 
 
163
    def test_30_lock_wait_fail(self):
 
164
        """Wait on a lock, then fail
 
165
        
 
166
        We ask to wait up to 400ms; this should fail within at most one
 
167
        second.  (Longer times are more realistic but we don't want the test
 
168
        suite to take too long, and this should do for now.)
 
169
        """
 
170
        t = self.get_transport()
 
171
        lf1 = LockDir(t, 'test_lock')
 
172
        lf1.create()
 
173
        lf2 = LockDir(t, 'test_lock')
 
174
        self.setup_log_reporter(lf2)
 
175
        lf1.attempt_lock()
 
176
        try:
 
177
            before = time.time()
 
178
            self.assertRaises(LockContention, lf2.wait_lock,
 
179
                              timeout=0.4, poll=0.1)
 
180
            after = time.time()
 
181
            # it should only take about 0.4 seconds, but we allow more time in
 
182
            # case the machine is heavily loaded
 
183
            self.assertTrue(after - before <= 8.0, 
 
184
                    "took %f seconds to detect lock contention" % (after - before))
 
185
        finally:
 
186
            lf1.unlock()
 
187
        lock_base = lf2.transport.abspath(lf2.path)
 
188
        self.assertEqual(1, len(self._logged_reports))
 
189
        self.assertEqual('%s %s\n'
 
190
                         '%s\n%s\n'
 
191
                         'Will continue to try until %s\n',
 
192
                         self._logged_reports[0][0])
 
193
        args = self._logged_reports[0][1]
 
194
        self.assertEqual('Unable to obtain', args[0])
 
195
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
196
        self.assertStartsWith(args[2], 'held by ')
 
197
        self.assertStartsWith(args[3], 'locked ')
 
198
        self.assertEndsWith(args[3], ' ago')
 
199
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
200
 
 
201
    def test_31_lock_wait_easy(self):
 
202
        """Succeed when waiting on a lock with no contention.
 
203
        """
 
204
        t = self.get_transport()
 
205
        lf1 = LockDir(t, 'test_lock')
 
206
        lf1.create()
 
207
        self.setup_log_reporter(lf1)
 
208
        try:
 
209
            before = time.time()
 
210
            lf1.wait_lock(timeout=0.4, poll=0.1)
 
211
            after = time.time()
 
212
            self.assertTrue(after - before <= 1.0)
 
213
        finally:
 
214
            lf1.unlock()
 
215
        self.assertEqual([], self._logged_reports)
 
216
 
 
217
    def test_32_lock_wait_succeed(self):
 
218
        """Succeed when trying to acquire a lock that gets released
 
219
 
 
220
        One thread holds on a lock and then releases it; another 
 
221
        tries to lock it.
 
222
        """
 
223
        # This test sometimes fails like this:
 
224
        # Traceback (most recent call last):
 
225
 
 
226
        #   File "/home/pqm/bzr-pqm-workdir/home/+trunk/bzrlib/tests/
 
227
        # test_lockdir.py", line 247, in test_32_lock_wait_succeed
 
228
        #     self.assertEqual(1, len(self._logged_reports))
 
229
        # AssertionError: not equal:
 
230
        # a = 1
 
231
        # b = 0
 
232
        raise tests.TestSkipped("Test fails intermittently")
 
233
        t = self.get_transport()
 
234
        lf1 = LockDir(t, 'test_lock')
 
235
        lf1.create()
 
236
        lf1.attempt_lock()
 
237
 
 
238
        def wait_and_unlock():
 
239
            time.sleep(0.1)
 
240
            lf1.unlock()
 
241
        unlocker = Thread(target=wait_and_unlock)
 
242
        unlocker.start()
 
243
        try:
 
244
            lf2 = LockDir(t, 'test_lock')
 
245
            self.setup_log_reporter(lf2)
 
246
            before = time.time()
 
247
            # wait and then lock
 
248
            lf2.wait_lock(timeout=0.4, poll=0.1)
 
249
            after = time.time()
 
250
            self.assertTrue(after - before <= 1.0)
 
251
        finally:
 
252
            unlocker.join()
 
253
 
 
254
        # There should be only 1 report, even though it should have to
 
255
        # wait for a while
 
256
        lock_base = lf2.transport.abspath(lf2.path)
 
257
        self.assertEqual(1, len(self._logged_reports))
 
258
        self.assertEqual('%s %s\n'
 
259
                         '%s\n%s\n'
 
260
                         'Will continue to try until %s\n',
 
261
                         self._logged_reports[0][0])
 
262
        args = self._logged_reports[0][1]
 
263
        self.assertEqual('Unable to obtain', args[0])
 
264
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
265
        self.assertStartsWith(args[2], 'held by ')
 
266
        self.assertStartsWith(args[3], 'locked ')
 
267
        self.assertEndsWith(args[3], ' ago')
 
268
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
269
 
 
270
    def test_33_wait(self):
 
271
        """Succeed when waiting on a lock that gets released
 
272
 
 
273
        The difference from test_32_lock_wait_succeed is that the second 
 
274
        caller does not actually acquire the lock, but just waits for it
 
275
        to be released.  This is done over a readonly transport.
 
276
        """
 
277
        t = self.get_transport()
 
278
        lf1 = LockDir(t, 'test_lock')
 
279
        lf1.create()
 
280
        lf1.attempt_lock()
 
281
 
 
282
        def wait_and_unlock():
 
283
            time.sleep(0.1)
 
284
            lf1.unlock()
 
285
        unlocker = Thread(target=wait_and_unlock)
 
286
        unlocker.start()
 
287
        try:
 
288
            lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
 
289
            before = time.time()
 
290
            # wait but don't lock
 
291
            lf2.wait(timeout=0.4, poll=0.1)
 
292
            after = time.time()
 
293
            self.assertTrue(after - before <= 1.0)
 
294
        finally:
 
295
            unlocker.join()
 
296
 
 
297
    def test_34_lock_write_waits(self):
 
298
        """LockDir.lock_write() will wait for the lock.""" 
 
299
        # the test suite sets the default to 0 to make deadlocks fail fast.
 
300
        # change it for this test, as we want to try a manual deadlock.
 
301
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 300
 
302
        t = self.get_transport()
 
303
        lf1 = LockDir(t, 'test_lock')
 
304
        lf1.create()
 
305
        lf1.attempt_lock()
 
306
 
 
307
        def wait_and_unlock():
 
308
            time.sleep(0.1)
 
309
            lf1.unlock()
 
310
        unlocker = Thread(target=wait_and_unlock)
 
311
        unlocker.start()
 
312
        try:
 
313
            lf2 = LockDir(t, 'test_lock')
 
314
            self.setup_log_reporter(lf2)
 
315
            before = time.time()
 
316
            # wait and then lock
 
317
            lf2.lock_write()
 
318
            after = time.time()
 
319
        finally:
 
320
            unlocker.join()
 
321
 
 
322
        # There should be only 1 report, even though it should have to
 
323
        # wait for a while
 
324
        lock_base = lf2.transport.abspath(lf2.path)
 
325
        self.assertEqual(1, len(self._logged_reports))
 
326
        self.assertEqual('%s %s\n'
 
327
                         '%s\n%s\n'
 
328
                         'Will continue to try until %s\n',
 
329
                         self._logged_reports[0][0])
 
330
        args = self._logged_reports[0][1]
 
331
        self.assertEqual('Unable to obtain', args[0])
 
332
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
333
        self.assertStartsWith(args[2], 'held by ')
 
334
        self.assertStartsWith(args[3], 'locked ')
 
335
        self.assertEndsWith(args[3], ' ago')
 
336
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
337
 
 
338
    def test_35_wait_lock_changing(self):
 
339
        """LockDir.wait_lock() will report if the lock changes underneath.
 
340
        
 
341
        This is the stages we want to happen:
 
342
 
 
343
        0) Synchronization locks are created and locked.
 
344
        1) Lock1 obtains the lockdir, and releases the 'check' lock.
 
345
        2) Lock2 grabs the 'check' lock, and checks the lockdir.
 
346
           It sees the lockdir is already acquired, reports the fact, 
 
347
           and unsets the 'checked' lock.
 
348
        3) Thread1 blocks on acquiring the 'checked' lock, and then tells
 
349
           Lock1 to release and acquire the lockdir. This resets the 'check'
 
350
           lock.
 
351
        4) Lock2 acquires the 'check' lock, and checks again. It notices
 
352
           that the holder of the lock has changed, and so reports a new 
 
353
           lock holder.
 
354
        5) Thread1 blocks on the 'checked' lock, this time, it completely
 
355
           unlocks the lockdir, allowing Lock2 to acquire the lock.
 
356
        """
 
357
 
 
358
        wait_to_check_lock = Lock()
 
359
        wait_until_checked_lock = Lock()
 
360
 
 
361
        wait_to_check_lock.acquire()
 
362
        wait_until_checked_lock.acquire()
 
363
        note('locked check and checked locks')
 
364
 
 
365
        class LockDir1(LockDir):
 
366
            """Use the synchronization points for the first lock."""
 
367
 
 
368
            def attempt_lock(self):
 
369
                # Once we have acquired the lock, it is okay for
 
370
                # the other lock to check it
 
371
                try:
 
372
                    return super(LockDir1, self).attempt_lock()
 
373
                finally:
 
374
                    note('lock1: releasing check lock')
 
375
                    wait_to_check_lock.release()
 
376
 
 
377
        class LockDir2(LockDir):
 
378
            """Use the synchronization points for the second lock."""
 
379
 
 
380
            def attempt_lock(self):
 
381
                note('lock2: waiting for check lock')
 
382
                wait_to_check_lock.acquire()
 
383
                note('lock2: acquired check lock')
 
384
                try:
 
385
                    return super(LockDir2, self).attempt_lock()
 
386
                finally:
 
387
                    note('lock2: releasing checked lock')
 
388
                    wait_until_checked_lock.release()
 
389
 
 
390
        t = self.get_transport()
 
391
        lf1 = LockDir1(t, 'test_lock')
 
392
        lf1.create()
 
393
 
 
394
        lf2 = LockDir2(t, 'test_lock')
 
395
        self.setup_log_reporter(lf2)
 
396
 
 
397
        def wait_and_switch():
 
398
            lf1.attempt_lock()
 
399
            # Block until lock2 has had a chance to check
 
400
            note('lock1: waiting 1 for checked lock')
 
401
            wait_until_checked_lock.acquire()
 
402
            note('lock1: acquired for checked lock')
 
403
            note('lock1: released lockdir')
 
404
            lf1.unlock()
 
405
            note('lock1: acquiring lockdir')
 
406
            # Create a new nonce, so the lock looks different.
 
407
            lf1.nonce = osutils.rand_chars(20)
 
408
            lf1.lock_write()
 
409
            note('lock1: acquired lockdir')
 
410
 
 
411
            # Block until lock2 has peeked again
 
412
            note('lock1: waiting 2 for checked lock')
 
413
            wait_until_checked_lock.acquire()
 
414
            note('lock1: acquired for checked lock')
 
415
            # Now unlock, and let lock 2 grab the lock
 
416
            lf1.unlock()
 
417
            wait_to_check_lock.release()
 
418
 
 
419
        unlocker = Thread(target=wait_and_switch)
 
420
        unlocker.start()
 
421
        try:
 
422
            # Wait and play against the other thread
 
423
            lf2.wait_lock(timeout=1.0, poll=0.01)
 
424
        finally:
 
425
            unlocker.join()
 
426
        lf2.unlock()
 
427
 
 
428
        # There should be 2 reports, because the lock changed
 
429
        lock_base = lf2.transport.abspath(lf2.path)
 
430
        self.assertEqual(2, len(self._logged_reports))
 
431
 
 
432
        self.assertEqual('%s %s\n'
 
433
                         '%s\n%s\n'
 
434
                         'Will continue to try until %s\n',
 
435
                         self._logged_reports[0][0])
 
436
        args = self._logged_reports[0][1]
 
437
        self.assertEqual('Unable to obtain', args[0])
 
438
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
439
        self.assertStartsWith(args[2], 'held by ')
 
440
        self.assertStartsWith(args[3], 'locked ')
 
441
        self.assertEndsWith(args[3], ' ago')
 
442
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
443
 
 
444
        self.assertEqual('%s %s\n'
 
445
                         '%s\n%s\n'
 
446
                         'Will continue to try until %s\n',
 
447
                         self._logged_reports[1][0])
 
448
        args = self._logged_reports[1][1]
 
449
        self.assertEqual('Lock owner changed for', args[0])
 
450
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
451
        self.assertStartsWith(args[2], 'held by ')
 
452
        self.assertStartsWith(args[3], 'locked ')
 
453
        self.assertEndsWith(args[3], ' ago')
 
454
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
455
 
 
456
    def test_40_confirm_easy(self):
 
457
        """Confirm a lock that's already held"""
 
458
        t = self.get_transport()
 
459
        lf1 = LockDir(t, 'test_lock')
 
460
        lf1.create()
 
461
        lf1.attempt_lock()
 
462
        lf1.confirm()
 
463
 
 
464
    def test_41_confirm_not_held(self):
 
465
        """Confirm a lock that's already held"""
 
466
        t = self.get_transport()
 
467
        lf1 = LockDir(t, 'test_lock')
 
468
        lf1.create()
 
469
        self.assertRaises(LockNotHeld, lf1.confirm)
 
470
 
 
471
    def test_42_confirm_broken_manually(self):
 
472
        """Confirm a lock broken by hand"""
 
473
        t = self.get_transport()
 
474
        lf1 = LockDir(t, 'test_lock')
 
475
        lf1.create()
 
476
        lf1.attempt_lock()
 
477
        t.move('test_lock', 'lock_gone_now')
 
478
        self.assertRaises(LockBroken, lf1.confirm)
 
479
 
 
480
    def test_43_break(self):
 
481
        """Break a lock whose caller has forgotten it"""
 
482
        t = self.get_transport()
 
483
        lf1 = LockDir(t, 'test_lock')
 
484
        lf1.create()
 
485
        lf1.attempt_lock()
 
486
        # we incorrectly discard the lock object without unlocking it
 
487
        del lf1
 
488
        # someone else sees it's still locked
 
489
        lf2 = LockDir(t, 'test_lock')
 
490
        holder_info = lf2.peek()
 
491
        self.assertTrue(holder_info)
 
492
        lf2.force_break(holder_info)
 
493
        # now we should be able to take it
 
494
        lf2.attempt_lock()
 
495
        lf2.confirm()
 
496
 
 
497
    def test_44_break_already_released(self):
 
498
        """Lock break races with regular release"""
 
499
        t = self.get_transport()
 
500
        lf1 = LockDir(t, 'test_lock')
 
501
        lf1.create()
 
502
        lf1.attempt_lock()
 
503
        # someone else sees it's still locked
 
504
        lf2 = LockDir(t, 'test_lock')
 
505
        holder_info = lf2.peek()
 
506
        # in the interim the lock is released
 
507
        lf1.unlock()
 
508
        # break should succeed
 
509
        lf2.force_break(holder_info)
 
510
        # now we should be able to take it
 
511
        lf2.attempt_lock()
 
512
        lf2.confirm()
 
513
 
 
514
    def test_45_break_mismatch(self):
 
515
        """Lock break races with someone else acquiring it"""
 
516
        t = self.get_transport()
 
517
        lf1 = LockDir(t, 'test_lock')
 
518
        lf1.create()
 
519
        lf1.attempt_lock()
 
520
        # someone else sees it's still locked
 
521
        lf2 = LockDir(t, 'test_lock')
 
522
        holder_info = lf2.peek()
 
523
        # in the interim the lock is released
 
524
        lf1.unlock()
 
525
        lf3 = LockDir(t, 'test_lock')
 
526
        lf3.attempt_lock()
 
527
        # break should now *fail*
 
528
        self.assertRaises(LockBreakMismatch, lf2.force_break,
 
529
                          holder_info)
 
530
        lf3.unlock()
 
531
 
 
532
    def test_46_fake_read_lock(self):
 
533
        t = self.get_transport()
 
534
        lf1 = LockDir(t, 'test_lock')
 
535
        lf1.create()
 
536
        lf1.lock_read()
 
537
        lf1.unlock()
 
538
 
 
539
    def test_50_lockdir_representation(self):
 
540
        """Check the on-disk representation of LockDirs is as expected.
 
541
 
 
542
        There should always be a top-level directory named by the lock.
 
543
        When the lock is held, there should be a lockname/held directory 
 
544
        containing an info file.
 
545
        """
 
546
        t = self.get_transport()
 
547
        lf1 = LockDir(t, 'test_lock')
 
548
        lf1.create()
 
549
        self.assertTrue(t.has('test_lock'))
 
550
        lf1.lock_write()
 
551
        self.assertTrue(t.has('test_lock/held/info'))
 
552
        lf1.unlock()
 
553
        self.assertFalse(t.has('test_lock/held/info'))
 
554
 
 
555
    def test_break_lock(self):
 
556
        # the ui based break_lock routine should Just Work (tm)
 
557
        ld1 = self.get_lock()
 
558
        ld2 = self.get_lock()
 
559
        ld1.create()
 
560
        ld1.lock_write()
 
561
        # do this without IO redirection to ensure it doesn't prompt.
 
562
        self.assertRaises(AssertionError, ld1.break_lock)
 
563
        orig_factory = bzrlib.ui.ui_factory
 
564
        # silent ui - no need for stdout
 
565
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
566
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
567
        try:
 
568
            ld2.break_lock()
 
569
            self.assertRaises(LockBroken, ld1.unlock)
 
570
        finally:
 
571
            bzrlib.ui.ui_factory = orig_factory
 
572
 
 
573
    def test_create_missing_base_directory(self):
 
574
        """If LockDir.path doesn't exist, it can be created
 
575
 
 
576
        Some people manually remove the entire lock/ directory trying
 
577
        to unlock a stuck repository/branch/etc. Rather than failing
 
578
        after that, just create the lock directory when needed.
 
579
        """
 
580
        t = self.get_transport()
 
581
        lf1 = LockDir(t, 'test_lock')
 
582
 
 
583
        lf1.create()
 
584
        self.failUnless(t.has('test_lock'))
 
585
 
 
586
        t.rmdir('test_lock')
 
587
        self.failIf(t.has('test_lock'))
 
588
 
 
589
        # This will create 'test_lock' if it needs to
 
590
        lf1.lock_write()
 
591
        self.failUnless(t.has('test_lock'))
 
592
        self.failUnless(t.has('test_lock/held/info'))
 
593
 
 
594
        lf1.unlock()
 
595
        self.failIf(t.has('test_lock/held/info'))
 
596
 
 
597
    def test__format_lock_info(self):
 
598
        ld1 = self.get_lock()
 
599
        ld1.create()
 
600
        ld1.lock_write()
 
601
        try:
 
602
            info_list = ld1._format_lock_info(ld1.peek())
 
603
        finally:
 
604
            ld1.unlock()
 
605
        self.assertEqual('lock %s' % (ld1.transport.abspath(ld1.path),),
 
606
                         info_list[0])
 
607
        self.assertContainsRe(info_list[1],
 
608
                              r'^held by .* on host .* \[process #\d*\]$')
 
609
        self.assertContainsRe(info_list[2], r'locked \d+ seconds? ago$')
 
610
 
 
611
    def test_lock_without_email(self):
 
612
        global_config = config.GlobalConfig()
 
613
        # Intentionally has no email address
 
614
        global_config.set_user_option('email', 'User Identity')
 
615
        ld1 = self.get_lock()
 
616
        ld1.create()
 
617
        ld1.lock_write()
 
618
        ld1.unlock()
 
619
 
 
620
    def test_lock_permission(self):
 
621
        if not osutils.supports_posix_readonly():
 
622
            raise tests.TestSkipped('Cannot induce a permission failure')
 
623
        ld1 = self.get_lock()
 
624
        lock_path = ld1.transport.local_abspath('test_lock')
 
625
        os.mkdir(lock_path)
 
626
        osutils.make_readonly(lock_path)
 
627
        self.assertRaises(errors.PermissionDenied, ld1.attempt_lock)