~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
"""Tests for LockDir"""
18
18
 
19
19
from cStringIO import StringIO
20
 
from threading import Thread
 
20
from threading import Thread, Lock
21
21
import time
22
22
 
23
23
import bzrlib
 
24
from bzrlib import (
 
25
    osutils,
 
26
    )
24
27
from bzrlib.errors import (
25
28
        LockBreakMismatch,
26
29
        LockContention, LockError, UnlockableTransport,
27
30
        LockNotHeld, LockBroken
28
31
        )
29
 
from bzrlib.lockdir import LockDir
 
32
from bzrlib.lockdir import LockDir, _DEFAULT_TIMEOUT_SECONDS
30
33
from bzrlib.tests import TestCaseWithTransport
 
34
from bzrlib.trace import note
31
35
 
32
36
# These tests sometimes use threads to test the behaviour of lock files with
33
37
# concurrent actors.  This is not a typical (or necessarily supported) use;
42
46
class TestLockDir(TestCaseWithTransport):
43
47
    """Test LockDir operations"""
44
48
 
 
49
    def logging_report_function(self, fmt, *args):
 
50
        self._logged_reports.append((fmt, args))
 
51
 
 
52
    def setup_log_reporter(self, lock_dir):
 
53
        self._logged_reports = []
 
54
        lock_dir._report_function = self.logging_report_function
 
55
 
45
56
    def test_00_lock_creation(self):
46
57
        """Creation of lock file on a transport"""
47
58
        t = self.get_transport()
156
167
        lf1 = LockDir(t, 'test_lock')
157
168
        lf1.create()
158
169
        lf2 = LockDir(t, 'test_lock')
 
170
        self.setup_log_reporter(lf2)
159
171
        lf1.attempt_lock()
160
172
        try:
161
173
            before = time.time()
168
180
                    "took %f seconds to detect lock contention" % (after - before))
169
181
        finally:
170
182
            lf1.unlock()
 
183
        lock_base = lf2.transport.abspath(lf2.path)
 
184
        self.assertEqual(1, len(self._logged_reports))
 
185
        self.assertEqual('%s %s\n'
 
186
                         '%s\n%s\n'
 
187
                         'Will continue to try until %s\n',
 
188
                         self._logged_reports[0][0])
 
189
        args = self._logged_reports[0][1]
 
190
        self.assertEqual('Unable to obtain', args[0])
 
191
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
192
        self.assertStartsWith(args[2], 'held by ')
 
193
        self.assertStartsWith(args[3], 'locked ')
 
194
        self.assertEndsWith(args[3], ' ago')
 
195
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
171
196
 
172
197
    def test_31_lock_wait_easy(self):
173
198
        """Succeed when waiting on a lock with no contention.
175
200
        t = self.get_transport()
176
201
        lf1 = LockDir(t, 'test_lock')
177
202
        lf1.create()
 
203
        self.setup_log_reporter(lf1)
178
204
        try:
179
205
            before = time.time()
180
206
            lf1.wait_lock(timeout=0.4, poll=0.1)
182
208
            self.assertTrue(after - before <= 1.0)
183
209
        finally:
184
210
            lf1.unlock()
 
211
        self.assertEqual([], self._logged_reports)
185
212
 
186
213
    def test_32_lock_wait_succeed(self):
187
214
        """Succeed when trying to acquire a lock that gets released
201
228
        unlocker.start()
202
229
        try:
203
230
            lf2 = LockDir(t, 'test_lock')
 
231
            self.setup_log_reporter(lf2)
204
232
            before = time.time()
205
233
            # wait and then lock
206
234
            lf2.wait_lock(timeout=0.4, poll=0.1)
209
237
        finally:
210
238
            unlocker.join()
211
239
 
 
240
        # There should be only 1 report, even though it should have to
 
241
        # wait for a while
 
242
        lock_base = lf2.transport.abspath(lf2.path)
 
243
        self.assertEqual(1, len(self._logged_reports))
 
244
        self.assertEqual('%s %s\n'
 
245
                         '%s\n%s\n'
 
246
                         'Will continue to try until %s\n',
 
247
                         self._logged_reports[0][0])
 
248
        args = self._logged_reports[0][1]
 
249
        self.assertEqual('Unable to obtain', args[0])
 
250
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
251
        self.assertStartsWith(args[2], 'held by ')
 
252
        self.assertStartsWith(args[3], 'locked ')
 
253
        self.assertEndsWith(args[3], ' ago')
 
254
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
255
 
212
256
    def test_33_wait(self):
213
257
        """Succeed when waiting on a lock that gets released
214
258
 
236
280
        finally:
237
281
            unlocker.join()
238
282
 
 
283
    def test_34_lock_write_waits(self):
 
284
        """LockDir.lock_write() will wait for the lock.""" 
 
285
        t = self.get_transport()
 
286
        lf1 = LockDir(t, 'test_lock')
 
287
        lf1.create()
 
288
        lf1.attempt_lock()
 
289
 
 
290
        def wait_and_unlock():
 
291
            time.sleep(0.1)
 
292
            lf1.unlock()
 
293
        unlocker = Thread(target=wait_and_unlock)
 
294
        unlocker.start()
 
295
        try:
 
296
            lf2 = LockDir(t, 'test_lock')
 
297
            self.setup_log_reporter(lf2)
 
298
            before = time.time()
 
299
            # wait and then lock
 
300
            lf2.lock_write()
 
301
            after = time.time()
 
302
        finally:
 
303
            unlocker.join()
 
304
 
 
305
        # There should be only 1 report, even though it should have to
 
306
        # wait for a while
 
307
        lock_base = lf2.transport.abspath(lf2.path)
 
308
        self.assertEqual(1, len(self._logged_reports))
 
309
        self.assertEqual('%s %s\n'
 
310
                         '%s\n%s\n'
 
311
                         'Will continue to try until %s\n',
 
312
                         self._logged_reports[0][0])
 
313
        args = self._logged_reports[0][1]
 
314
        self.assertEqual('Unable to obtain', args[0])
 
315
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
316
        self.assertStartsWith(args[2], 'held by ')
 
317
        self.assertStartsWith(args[3], 'locked ')
 
318
        self.assertEndsWith(args[3], ' ago')
 
319
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
320
 
 
321
    def test_35_wait_lock_changing(self):
 
322
        """LockDir.wait_lock() will report if the lock changes underneath.
 
323
        
 
324
        This is the stages we want to happen:
 
325
 
 
326
        0) Synchronization locks are created and locked.
 
327
        1) Lock1 obtains the lockdir, and releases the 'check' lock.
 
328
        2) Lock2 grabs the 'check' lock, and checks the lockdir.
 
329
           It sees the lockdir is already acquired, reports the fact, 
 
330
           and unsets the 'checked' lock.
 
331
        3) Thread1 blocks on acquiring the 'checked' lock, and then tells
 
332
           Lock1 to release and acquire the lockdir. This resets the 'check'
 
333
           lock.
 
334
        4) Lock2 acquires the 'check' lock, and checks again. It notices
 
335
           that the holder of the lock has changed, and so reports a new 
 
336
           lock holder.
 
337
        5) Thread1 blocks on the 'checked' lock, this time, it completely
 
338
           unlocks the lockdir, allowing Lock2 to acquire the lock.
 
339
        """
 
340
 
 
341
        wait_to_check_lock = Lock()
 
342
        wait_until_checked_lock = Lock()
 
343
 
 
344
        wait_to_check_lock.acquire()
 
345
        wait_until_checked_lock.acquire()
 
346
        note('locked check and checked locks')
 
347
 
 
348
        class LockDir1(LockDir):
 
349
            """Use the synchronization points for the first lock."""
 
350
 
 
351
            def attempt_lock(self):
 
352
                # Once we have acquired the lock, it is okay for
 
353
                # the other lock to check it
 
354
                try:
 
355
                    return super(LockDir1, self).attempt_lock()
 
356
                finally:
 
357
                    note('lock1: releasing check lock')
 
358
                    wait_to_check_lock.release()
 
359
 
 
360
        class LockDir2(LockDir):
 
361
            """Use the synchronization points for the second lock."""
 
362
 
 
363
            def attempt_lock(self):
 
364
                note('lock2: waiting for check lock')
 
365
                wait_to_check_lock.acquire()
 
366
                note('lock2: acquired check lock')
 
367
                try:
 
368
                    return super(LockDir2, self).attempt_lock()
 
369
                finally:
 
370
                    note('lock2: releasing checked lock')
 
371
                    wait_until_checked_lock.release()
 
372
 
 
373
        t = self.get_transport()
 
374
        lf1 = LockDir1(t, 'test_lock')
 
375
        lf1.create()
 
376
 
 
377
        lf2 = LockDir2(t, 'test_lock')
 
378
        self.setup_log_reporter(lf2)
 
379
 
 
380
        def wait_and_switch():
 
381
            lf1.attempt_lock()
 
382
            # Block until lock2 has had a chance to check
 
383
            note('lock1: waiting 1 for checked lock')
 
384
            wait_until_checked_lock.acquire()
 
385
            note('lock1: acquired for checked lock')
 
386
            note('lock1: released lockdir')
 
387
            lf1.unlock()
 
388
            note('lock1: acquiring lockdir')
 
389
            # Create a new nonce, so the lock looks different.
 
390
            lf1.nonce = osutils.rand_chars(20)
 
391
            lf1.lock_write()
 
392
            note('lock1: acquired lockdir')
 
393
 
 
394
            # Block until lock2 has peeked again
 
395
            note('lock1: waiting 2 for checked lock')
 
396
            wait_until_checked_lock.acquire()
 
397
            note('lock1: acquired for checked lock')
 
398
            # Now unlock, and let lock 2 grab the lock
 
399
            lf1.unlock()
 
400
            wait_to_check_lock.release()
 
401
 
 
402
        unlocker = Thread(target=wait_and_switch)
 
403
        unlocker.start()
 
404
        try:
 
405
            # Wait and play against the other thread
 
406
            lf2.wait_lock(timeout=1.0, poll=0.01)
 
407
        finally:
 
408
            unlocker.join()
 
409
        lf2.unlock()
 
410
 
 
411
        # There should be 2 reports, because the lock changed
 
412
        lock_base = lf2.transport.abspath(lf2.path)
 
413
        self.assertEqual(2, len(self._logged_reports))
 
414
 
 
415
        self.assertEqual('%s %s\n'
 
416
                         '%s\n%s\n'
 
417
                         'Will continue to try until %s\n',
 
418
                         self._logged_reports[0][0])
 
419
        args = self._logged_reports[0][1]
 
420
        self.assertEqual('Unable to obtain', args[0])
 
421
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
422
        self.assertStartsWith(args[2], 'held by ')
 
423
        self.assertStartsWith(args[3], 'locked ')
 
424
        self.assertEndsWith(args[3], ' ago')
 
425
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
426
 
 
427
        self.assertEqual('%s %s\n'
 
428
                         '%s\n%s\n'
 
429
                         'Will continue to try until %s\n',
 
430
                         self._logged_reports[1][0])
 
431
        args = self._logged_reports[1][1]
 
432
        self.assertEqual('Lock owner changed for', args[0])
 
433
        self.assertEqual('lock %s' % (lock_base,), args[1])
 
434
        self.assertStartsWith(args[2], 'held by ')
 
435
        self.assertStartsWith(args[3], 'locked ')
 
436
        self.assertEndsWith(args[3], ' ago')
 
437
        self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
 
438
 
239
439
    def test_40_confirm_easy(self):
240
440
        """Confirm a lock that's already held"""
241
441
        t = self.get_transport()
376
576
 
377
577
        lf1.unlock()
378
578
        self.failIf(t.has('test_lock/held/info'))
 
579
 
 
580
    def test__format_lock_info(self):
 
581
        ld1 = self.get_lock()
 
582
        ld1.create()
 
583
        ld1.lock_write()
 
584
        try:
 
585
            info_list = ld1._format_lock_info(ld1.peek())
 
586
        finally:
 
587
            ld1.unlock()
 
588
        self.assertEqual('lock %s' % (ld1.transport.abspath(ld1.path),),
 
589
                         info_list[0])
 
590
        self.assertContainsRe(info_list[1],
 
591
                              r'^held by .* on host .* \[process #\d*\]$')
 
592
        self.assertContainsRe(info_list[2], r'locked \d+ seconds? ago$')