13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests for LockDir"""
109
105
"""Fail to create lock on readonly transport"""
110
106
t = self.get_readonly_transport()
111
107
lf = LockDir(t, 'test_lock')
112
self.assertRaises(LockFailed, lf.create)
108
self.assertRaises(UnlockableTransport, lf.create)
114
110
def test_12_lock_readonly_transport(self):
115
111
"""Fail to lock on readonly transport"""
116
112
lf = LockDir(self.get_transport(), 'test_lock')
118
114
lf = LockDir(self.get_readonly_transport(), 'test_lock')
119
self.assertRaises(LockFailed, lf.attempt_lock)
115
self.assertRaises(UnlockableTransport, lf.attempt_lock)
121
117
def test_20_lock_contested(self):
122
118
"""Contention to get a lock"""
185
181
after = time.time()
186
182
# it should only take about 0.4 seconds, but we allow more time in
187
183
# case the machine is heavily loaded
188
self.assertTrue(after - before <= 8.0,
184
self.assertTrue(after - before <= 8.0,
189
185
"took %f seconds to detect lock contention" % (after - before))
192
188
lock_base = lf2.transport.abspath(lf2.path)
193
189
self.assertEqual(1, len(self._logged_reports))
194
lock_url = lf2.transport.abspath(lf2.path)
195
190
self.assertEqual('%s %s\n'
197
'Will continue to try until %s, unless '
199
'If you\'re sure that it\'s not being '
200
'modified, use bzr break-lock %s',
192
'Will continue to try until %s\n',
201
193
self._logged_reports[0][0])
202
194
args = self._logged_reports[0][1]
203
195
self.assertEqual('Unable to obtain', args[0])
277
269
self.assertContainsRe(args[4], r'\d\d:\d\d:\d\d')
279
271
def test_34_lock_write_waits(self):
280
"""LockDir.lock_write() will wait for the lock."""
272
"""LockDir.lock_write() will wait for the lock."""
281
273
# the test suite sets the default to 0 to make deadlocks fail fast.
282
274
# change it for this test, as we want to try a manual deadlock.
283
raise tests.TestSkipped('Timing-sensitive test')
284
275
bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 300
285
276
t = self.get_transport()
286
277
lf1 = LockDir(t, 'test_lock')
321
312
def test_35_wait_lock_changing(self):
322
313
"""LockDir.wait_lock() will report if the lock changes underneath.
324
315
This is the stages we want to happen:
326
317
0) Synchronization locks are created and locked.
327
318
1) Lock1 obtains the lockdir, and releases the 'check' lock.
328
319
2) Lock2 grabs the 'check' lock, and checks the lockdir.
329
It sees the lockdir is already acquired, reports the fact,
320
It sees the lockdir is already acquired, reports the fact,
330
321
and unsets the 'checked' lock.
331
322
3) Thread1 blocks on acquiring the 'checked' lock, and then tells
332
323
Lock1 to release and acquire the lockdir. This resets the 'check'
334
325
4) Lock2 acquires the 'check' lock, and checks again. It notices
335
that the holder of the lock has changed, and so reports a new
326
that the holder of the lock has changed, and so reports a new
337
328
5) Thread1 blocks on the 'checked' lock, this time, it completely
338
329
unlocks the lockdir, allowing Lock2 to acquire the lock.
341
raise tests.KnownFailure(
342
"timing dependency in lock tests (#213182)")
344
332
wait_to_check_lock = Lock()
345
333
wait_until_checked_lock = Lock()
414
402
# There should be 2 reports, because the lock changed
415
403
lock_base = lf2.transport.abspath(lf2.path)
416
404
self.assertEqual(2, len(self._logged_reports))
417
lock_url = lf2.transport.abspath(lf2.path)
418
406
self.assertEqual('%s %s\n'
420
'Will continue to try until %s, unless '
422
'If you\'re sure that it\'s not being '
423
'modified, use bzr break-lock %s',
408
'Will continue to try until %s\n',
424
409
self._logged_reports[0][0])
425
410
args = self._logged_reports[0][1]
426
411
self.assertEqual('Unable to obtain', args[0])
433
418
self.assertEqual('%s %s\n'
435
'Will continue to try until %s, unless '
437
'If you\'re sure that it\'s not being '
438
'modified, use bzr break-lock %s',
420
'Will continue to try until %s\n',
439
421
self._logged_reports[1][0])
440
422
args = self._logged_reports[1][1]
441
423
self.assertEqual('Lock owner changed for', args[0])
659
641
self.assertRaises(errors.LockContention, ld2.attempt_lock)
661
643
check_dir(['held'])
663
def record_hook(self, result):
664
self._calls.append(result)
666
def reset_hooks(self):
667
self._old_hooks = lock.Lock.hooks
668
self.addCleanup(self.restore_hooks)
669
lock.Lock.hooks = lock.LockHooks()
671
def restore_hooks(self):
672
lock.Lock.hooks = self._old_hooks
674
def test_LockDir_acquired_success(self):
675
# the LockDir.lock_acquired hook fires when a lock is acquired.
678
LockDir.hooks.install_named_hook('lock_acquired',
679
self.record_hook, 'record_hook')
682
self.assertEqual([], self._calls)
683
result = ld.attempt_lock()
684
lock_path = ld.transport.abspath(ld.path)
685
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
687
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
689
def test_LockDir_acquired_fail(self):
690
# the LockDir.lock_acquired hook does not fire on failure.
695
ld2 = self.get_lock()
697
# install a lock hook now, when the disk lock is locked
698
LockDir.hooks.install_named_hook('lock_acquired',
699
self.record_hook, 'record_hook')
700
self.assertRaises(errors.LockContention, ld.attempt_lock)
701
self.assertEqual([], self._calls)
703
self.assertEqual([], self._calls)
705
def test_LockDir_released_success(self):
706
# the LockDir.lock_released hook fires when a lock is acquired.
709
LockDir.hooks.install_named_hook('lock_released',
710
self.record_hook, 'record_hook')
713
self.assertEqual([], self._calls)
714
result = ld.attempt_lock()
715
self.assertEqual([], self._calls)
717
lock_path = ld.transport.abspath(ld.path)
718
self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
720
def test_LockDir_released_fail(self):
721
# the LockDir.lock_released hook does not fire on failure.
726
ld2 = self.get_lock()
728
ld2.force_break(ld2.peek())
729
LockDir.hooks.install_named_hook('lock_released',
730
self.record_hook, 'record_hook')
731
self.assertRaises(LockBroken, ld.unlock)
732
self.assertEqual([], self._calls)