~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/lockdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-04-07 07:52:50 UTC
  • mfrom: (3340.1.1 208418-1.4)
  • Revision ID: pqm@pqm.ubuntu.com-20080407075250-phs53xnslo8boaeo
Return the correct knit serialisation method in _StreamAccess.
        (Andrew Bennetts, Martin Pool, Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
88
88
>>> t = MemoryTransport()
89
89
>>> l = LockDir(t, 'sample-lock')
90
90
>>> l.create()
91
 
>>> l.wait_lock()
 
91
>>> token = l.wait_lock()
92
92
>>> # do something here
93
93
>>> l.unlock()
94
94
 
95
95
"""
96
96
 
 
97
 
 
98
# TODO: We sometimes have the problem that our attempt to rename '1234' to
 
99
# 'held' fails because the transport server moves into an existing directory,
 
100
# rather than failing the rename.  If we made the info file name the same as
 
101
# the locked directory name we would avoid this problem because moving into
 
102
# the held directory would implicitly clash.  However this would not mesh with
 
103
# the existing locking code and needs a new format of the containing object.
 
104
# -- robertc, mbp 20070628
 
105
 
97
106
import os
98
107
import time
99
108
from cStringIO import StringIO
100
109
 
101
110
from bzrlib import (
 
111
    debug,
102
112
    errors,
103
113
    )
104
114
import bzrlib.config
108
118
        LockBreakMismatch,
109
119
        LockBroken,
110
120
        LockContention,
 
121
        LockFailed,
111
122
        LockNotHeld,
112
123
        NoSuchFile,
113
124
        PathError,
114
125
        ResourceBusy,
 
126
        TransportError,
115
127
        UnlockableTransport,
116
128
        )
117
129
from bzrlib.trace import mutter, note
182
194
        This is typically only called when the object/directory containing the 
183
195
        directory is first created.  The lock is not held when it's created.
184
196
        """
185
 
        if self.transport.is_readonly():
186
 
            raise UnlockableTransport(self.transport)
187
 
        self.transport.mkdir(self.path, mode=mode)
188
 
 
189
 
    def attempt_lock(self):
190
 
        """Take the lock; fail if it's already held.
 
197
        self._trace("create lock directory")
 
198
        try:
 
199
            self.transport.mkdir(self.path, mode=mode)
 
200
        except (TransportError, PathError), e:
 
201
            raise LockFailed(self, e)
 
202
 
 
203
 
 
204
    def _attempt_lock(self):
 
205
        """Make the pending directory and attempt to rename into place.
191
206
        
192
 
        If you wish to block until the lock can be obtained, call wait_lock()
193
 
        instead.
 
207
        If the rename succeeds, we read back the info file to check that we
 
208
        really got the lock.
 
209
 
 
210
        If we fail to acquire the lock, this method is responsible for
 
211
        cleaning up the pending directory if possible.  (But it doesn't do
 
212
        that yet.)
 
213
 
 
214
        :returns: The nonce of the lock, if it was successfully acquired.
 
215
 
 
216
        :raises LockContention: If the lock is held by someone else.  The exception
 
217
            contains the info of the current holder of the lock.
194
218
        """
195
 
        if self._fake_read_lock:
196
 
            raise LockContention(self)
197
 
        if self.transport.is_readonly():
198
 
            raise UnlockableTransport(self.transport)
199
 
        try:
200
 
            tmpname = '%s/pending.%s.tmp' % (self.path, rand_chars(20))
201
 
            try:
202
 
                self.transport.mkdir(tmpname)
203
 
            except NoSuchFile:
204
 
                # This may raise a FileExists exception
205
 
                # which is okay, it will be caught later and determined
206
 
                # to be a LockContention.
207
 
                self.create(mode=self._dir_modebits)
208
 
                
209
 
                # After creating the lock directory, try again
210
 
                self.transport.mkdir(tmpname)
211
 
 
212
 
            self.nonce = rand_chars(20)
213
 
            info_bytes = self._prepare_info()
214
 
            # We use put_file_non_atomic because we just created a new unique
215
 
            # directory so we don't have to worry about files existing there.
216
 
            # We'll rename the whole directory into place to get atomic
217
 
            # properties
218
 
            self.transport.put_bytes_non_atomic(tmpname + self.__INFO_NAME,
219
 
                                                info_bytes)
220
 
 
 
219
        self._trace("lock_write...")
 
220
        start_time = time.time()
 
221
        try:
 
222
            tmpname = self._create_pending_dir()
 
223
        except (errors.TransportError, PathError), e:
 
224
            self._trace("... failed to create pending dir, %s", e)
 
225
            raise LockFailed(self, e)
 
226
        try:
221
227
            self.transport.rename(tmpname, self._held_dir)
222
 
            self._lock_held = True
223
 
            self.confirm()
224
 
        except errors.PermissionDenied:
 
228
        except (errors.TransportError, PathError, DirectoryNotEmpty,
 
229
                FileExists, ResourceBusy), e:
 
230
            self._trace("... contention, %s", e)
 
231
            self._remove_pending_dir(tmpname)
 
232
            raise LockContention(self)
 
233
        except Exception, e:
 
234
            self._trace("... lock failed, %s", e)
 
235
            self._remove_pending_dir(tmpname)
225
236
            raise
226
 
        except (PathError, DirectoryNotEmpty, FileExists, ResourceBusy), e:
227
 
            mutter("contention on %r: %s", self, e)
 
237
        # We must check we really got the lock, because Launchpad's sftp
 
238
        # server at one time had a bug were the rename would successfully
 
239
        # move the new directory into the existing directory, which was
 
240
        # incorrect.  It's possible some other servers or filesystems will
 
241
        # have a similar bug allowing someone to think they got the lock
 
242
        # when it's already held.
 
243
        info = self.peek()
 
244
        self._trace("after locking, info=%r", info)
 
245
        if info['nonce'] != self.nonce:
 
246
            self._trace("rename succeeded, "
 
247
                "but lock is still held by someone else")
228
248
            raise LockContention(self)
 
249
        self._lock_held = True
 
250
        self._trace("... lock succeeded after %dms",
 
251
                (time.time() - start_time) * 1000)
 
252
        return self.nonce
 
253
 
 
254
    def _remove_pending_dir(self, tmpname):
 
255
        """Remove the pending directory
 
256
 
 
257
        This is called if we failed to rename into place, so that the pending 
 
258
        dirs don't clutter up the lockdir.
 
259
        """
 
260
        self._trace("remove %s", tmpname)
 
261
        try:
 
262
            self.transport.delete(tmpname + self.__INFO_NAME)
 
263
            self.transport.rmdir(tmpname)
 
264
        except PathError, e:
 
265
            note("error removing pending lock: %s", e)
 
266
 
 
267
    def _create_pending_dir(self):
 
268
        tmpname = '%s/%s.tmp' % (self.path, rand_chars(10))
 
269
        try:
 
270
            self.transport.mkdir(tmpname)
 
271
        except NoSuchFile:
 
272
            # This may raise a FileExists exception
 
273
            # which is okay, it will be caught later and determined
 
274
            # to be a LockContention.
 
275
            self._trace("lock directory does not exist, creating it")
 
276
            self.create(mode=self._dir_modebits)
 
277
            # After creating the lock directory, try again
 
278
            self.transport.mkdir(tmpname)
 
279
        self.nonce = rand_chars(20)
 
280
        info_bytes = self._prepare_info()
 
281
        # We use put_file_non_atomic because we just created a new unique
 
282
        # directory so we don't have to worry about files existing there.
 
283
        # We'll rename the whole directory into place to get atomic
 
284
        # properties
 
285
        self.transport.put_bytes_non_atomic(tmpname + self.__INFO_NAME,
 
286
                                            info_bytes)
 
287
        return tmpname
229
288
 
230
289
    def unlock(self):
231
290
        """Release a held lock
241
300
        else:
242
301
            # rename before deleting, because we can't atomically remove the
243
302
            # whole tree
 
303
            start_time = time.time()
 
304
            self._trace("unlocking")
244
305
            tmpname = '%s/releasing.%s.tmp' % (self.path, rand_chars(20))
245
306
            # gotta own it to unlock
246
307
            self.confirm()
247
308
            self.transport.rename(self._held_dir, tmpname)
248
309
            self._lock_held = False
249
310
            self.transport.delete(tmpname + self.__INFO_NAME)
250
 
            self.transport.rmdir(tmpname)
 
311
            try:
 
312
                self.transport.rmdir(tmpname)
 
313
            except DirectoryNotEmpty, e:
 
314
                # There might have been junk left over by a rename that moved
 
315
                # another locker within the 'held' directory.  do a slower
 
316
                # deletion where we list the directory and remove everything
 
317
                # within it.
 
318
                #
 
319
                # Maybe this should be broader to allow for ftp servers with
 
320
                # non-specific error messages?
 
321
                self._trace("doing recursive deletion of non-empty directory "
 
322
                        "%s", tmpname)
 
323
                self.transport.delete_tree(tmpname)
 
324
            self._trace("... unlock succeeded after %dms",
 
325
                    (time.time() - start_time) * 1000)
251
326
 
252
327
    def break_lock(self):
253
328
        """Break a lock not held by this instance of LockDir.
341
416
        """
342
417
        try:
343
418
            info = self._read_info_file(self._held_info_path)
 
419
            self._trace("peek -> held")
344
420
            assert isinstance(info, dict), \
345
421
                    "bad parse result %r" % info
346
422
            return info
347
423
        except NoSuchFile, e:
348
 
            return None
 
424
            self._trace("peek -> not held")
349
425
 
350
426
    def _prepare_info(self):
351
427
        """Write information about a pending lock to a temporary file.
368
444
    def _parse_info(self, info_file):
369
445
        return read_stanza(info_file.readlines()).as_dict()
370
446
 
371
 
    def wait_lock(self, timeout=None, poll=None):
 
447
    def attempt_lock(self):
 
448
        """Take the lock; fail if it's already held.
 
449
        
 
450
        If you wish to block until the lock can be obtained, call wait_lock()
 
451
        instead.
 
452
 
 
453
        :return: The lock token.
 
454
        :raises LockContention: if the lock is held by someone else.
 
455
        """
 
456
        if self._fake_read_lock:
 
457
            raise LockContention(self)
 
458
        return self._attempt_lock()
 
459
 
 
460
    def wait_lock(self, timeout=None, poll=None, max_attempts=None):
372
461
        """Wait a certain period for a lock.
373
462
 
374
463
        If the lock can be acquired within the bounded time, it
376
465
        is raised.  Either way, this function should return within
377
466
        approximately `timeout` seconds.  (It may be a bit more if
378
467
        a transport operation takes a long time to complete.)
 
468
 
 
469
        :param timeout: Approximate maximum amount of time to wait for the
 
470
        lock, in seconds.
 
471
         
 
472
        :param poll: Delay in seconds between retrying the lock.
 
473
 
 
474
        :param max_attempts: Maximum number of times to try to lock.
 
475
 
 
476
        :return: The lock token.
379
477
        """
380
478
        if timeout is None:
381
479
            timeout = _DEFAULT_TIMEOUT_SECONDS
382
480
        if poll is None:
383
481
            poll = _DEFAULT_POLL_SECONDS
384
 
 
385
 
        # XXX: the transport interface doesn't let us guard 
386
 
        # against operations there taking a long time.
 
482
        # XXX: the transport interface doesn't let us guard against operations
 
483
        # there taking a long time, so the total elapsed time or poll interval
 
484
        # may be more than was requested.
387
485
        deadline = time.time() + timeout
388
486
        deadline_str = None
389
487
        last_info = None
 
488
        attempt_count = 0
390
489
        while True:
 
490
            attempt_count += 1
391
491
            try:
392
 
                self.attempt_lock()
393
 
                return
 
492
                return self.attempt_lock()
394
493
            except LockContention:
 
494
                # possibly report the blockage, then try again
395
495
                pass
 
496
            # TODO: In a few cases, we find out that there's contention by
 
497
            # reading the held info and observing that it's not ours.  In
 
498
            # those cases it's a bit redundant to read it again.  However,
 
499
            # the normal case (??) is that the rename fails and so we
 
500
            # don't know who holds the lock.  For simplicity we peek
 
501
            # always.
396
502
            new_info = self.peek()
397
 
            mutter('last_info: %s, new info: %s', last_info, new_info)
398
503
            if new_info is not None and new_info != last_info:
399
504
                if last_info is None:
400
505
                    start = 'Unable to obtain'
415
520
                                      formatted_info[2],
416
521
                                      deadline_str)
417
522
 
 
523
            if (max_attempts is not None) and (attempt_count >= max_attempts):
 
524
                self._trace("exceeded %d attempts")
 
525
                raise LockContention(self)
418
526
            if time.time() + poll < deadline:
 
527
                self._trace("waiting %ss", poll)
419
528
                time.sleep(poll)
420
529
            else:
 
530
                self._trace("timeout after waiting %ss", timeout)
421
531
                raise LockContention(self)
422
532
    
423
533
    def leave_in_place(self):
450
560
            self._locked_via_token = True
451
561
            return token
452
562
        else:
453
 
            self.wait_lock()
454
 
            return self.peek().get('nonce')
 
563
            return self.wait_lock()
455
564
 
456
565
    def lock_read(self):
457
566
        """Compatibility-mode shared lock.
468
577
            raise LockContention(self)
469
578
        self._fake_read_lock = True
470
579
 
471
 
    def wait(self, timeout=20, poll=0.5):
472
 
        """Wait a certain period for a lock to be released."""
473
 
        # XXX: the transport interface doesn't let us guard 
474
 
        # against operations there taking a long time.
475
 
        deadline = time.time() + timeout
476
 
        while True:
477
 
            if self.peek():
478
 
                return
479
 
            if time.time() + poll < deadline:
480
 
                time.sleep(poll)
481
 
            else:
482
 
                raise LockContention(self)
483
 
 
484
580
    def _format_lock_info(self, info):
485
581
        """Turn the contents of peek() into something for the user"""
486
582
        lock_url = self.transport.abspath(self.path)
501
597
                lock_token = info.get('nonce')
502
598
            if token != lock_token:
503
599
                raise errors.TokenMismatch(token, lock_token)
 
600
            else:
 
601
                self._trace("revalidated by token %r", token)
504
602
 
 
603
    def _trace(self, format, *args):
 
604
        if 'lock' not in debug.debug_flags:
 
605
            return
 
606
        mutter(str(self) + ": " + (format % args))