~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/lockdir.py

Merge in the branch with the extracted lock_write token changes, resolving conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright (C) 2006 Canonical Ltd
2
 
 
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
96
96
 
97
97
import os
98
98
import time
99
 
from warnings import warn
100
 
from StringIO import StringIO
 
99
from cStringIO import StringIO
101
100
 
 
101
from bzrlib import (
 
102
    errors,
 
103
    )
102
104
import bzrlib.config
103
105
from bzrlib.errors import (
104
106
        DirectoryNotEmpty,
106
108
        LockBreakMismatch,
107
109
        LockBroken,
108
110
        LockContention,
109
 
        LockError,
110
111
        LockNotHeld,
111
112
        NoSuchFile,
112
113
        PathError,
113
114
        ResourceBusy,
114
115
        UnlockableTransport,
115
116
        )
116
 
from bzrlib.trace import mutter
 
117
from bzrlib.trace import mutter, note
117
118
from bzrlib.transport import Transport
118
 
from bzrlib.osutils import rand_chars
119
 
from bzrlib.rio import RioWriter, read_stanza, Stanza
 
119
from bzrlib.osutils import rand_chars, format_delta
 
120
from bzrlib.rio import read_stanza, Stanza
 
121
import bzrlib.ui
 
122
 
120
123
 
121
124
# XXX: At the moment there is no consideration of thread safety on LockDir
122
125
# objects.  This should perhaps be updated - e.g. if two threads try to take a
132
135
# TODO: Make sure to pass the right file and directory mode bits to all
133
136
# files/dirs created.
134
137
 
 
138
 
135
139
_DEFAULT_TIMEOUT_SECONDS = 300
136
 
_DEFAULT_POLL_SECONDS = 0.5
 
140
_DEFAULT_POLL_SECONDS = 1.0
 
141
 
137
142
 
138
143
class LockDir(object):
139
144
    """Write-lock guarding access to data."""
155
160
        self.transport = transport
156
161
        self.path = path
157
162
        self._lock_held = False
 
163
        self._locked_via_token = False
158
164
        self._fake_read_lock = False
159
165
        self._held_dir = path + '/held'
160
166
        self._held_info_path = self._held_dir + self.__INFO_NAME
161
167
        self._file_modebits = file_modebits
162
168
        self._dir_modebits = dir_modebits
163
 
        self.nonce = rand_chars(20)
 
169
 
 
170
        self._report_function = note
164
171
 
165
172
    def __repr__(self):
166
173
        return '%s(%s%s)' % (self.__class__.__name__,
191
198
            raise UnlockableTransport(self.transport)
192
199
        try:
193
200
            tmpname = '%s/pending.%s.tmp' % (self.path, rand_chars(20))
194
 
            self.transport.mkdir(tmpname)
195
 
            sio = StringIO()
196
 
            self._prepare_info(sio)
197
 
            sio.seek(0)
198
 
            # append will create a new file; we use append rather than put
199
 
            # because we don't want to write to a temporary file and rename
200
 
            # into place, because that's going to happen to the whole
201
 
            # directory
202
 
            self.transport.append(tmpname + self.__INFO_NAME, sio)
 
201
            try:
 
202
                self.transport.mkdir(tmpname)
 
203
            except NoSuchFile:
 
204
                # This may raise a FileExists exception
 
205
                # which is okay, it will be caught later and determined
 
206
                # to be a LockContention.
 
207
                self.create(mode=self._dir_modebits)
 
208
                
 
209
                # After creating the lock directory, try again
 
210
                self.transport.mkdir(tmpname)
 
211
 
 
212
            self.nonce = rand_chars(20)
 
213
            info_bytes = self._prepare_info()
 
214
            # We use put_file_non_atomic because we just created a new unique
 
215
            # directory so we don't have to worry about files existing there.
 
216
            # We'll rename the whole directory into place to get atomic
 
217
            # properties
 
218
            self.transport.put_bytes_non_atomic(tmpname + self.__INFO_NAME,
 
219
                                                info_bytes)
 
220
 
203
221
            self.transport.rename(tmpname, self._held_dir)
204
222
            self._lock_held = True
205
223
            self.confirm()
 
224
        except errors.PermissionDenied:
 
225
            raise
206
226
        except (PathError, DirectoryNotEmpty, FileExists, ResourceBusy), e:
207
227
            mutter("contention on %r: %s", self, e)
208
228
            raise LockContention(self)
215
235
            return
216
236
        if not self._lock_held:
217
237
            raise LockNotHeld(self)
218
 
        # rename before deleting, because we can't atomically remove the whole
219
 
        # tree
220
 
        tmpname = '%s/releasing.%s.tmp' % (self.path, rand_chars(20))
221
 
        # gotta own it to unlock
222
 
        self.confirm()
223
 
        self.transport.rename(self._held_dir, tmpname)
224
 
        self._lock_held = False
225
 
        self.transport.delete(tmpname + self.__INFO_NAME)
226
 
        self.transport.rmdir(tmpname)
 
238
        if self._locked_via_token:
 
239
            self._locked_via_token = False
 
240
            self._lock_held = False
 
241
        else:
 
242
            # rename before deleting, because we can't atomically remove the
 
243
            # whole tree
 
244
            tmpname = '%s/releasing.%s.tmp' % (self.path, rand_chars(20))
 
245
            # gotta own it to unlock
 
246
            self.confirm()
 
247
            self.transport.rename(self._held_dir, tmpname)
 
248
            self._lock_held = False
 
249
            self.transport.delete(tmpname + self.__INFO_NAME)
 
250
            self.transport.rmdir(tmpname)
227
251
 
228
252
    def break_lock(self):
229
253
        """Break a lock not held by this instance of LockDir.
235
259
        self._check_not_locked()
236
260
        holder_info = self.peek()
237
261
        if holder_info is not None:
238
 
            if bzrlib.ui.ui_factory.get_boolean(
239
 
                "Break lock %s held by %s@%s [process #%s]" % (
240
 
                    self.transport,
241
 
                    holder_info["user"],
242
 
                    holder_info["hostname"],
243
 
                    holder_info["pid"])):
 
262
            lock_info = '\n'.join(self._format_lock_info(holder_info))
 
263
            if bzrlib.ui.ui_factory.get_boolean("Break %s" % lock_info):
244
264
                self.force_break(holder_info)
245
265
        
246
266
    def force_break(self, dead_holder_info):
327
347
        except NoSuchFile, e:
328
348
            return None
329
349
 
330
 
    def _prepare_info(self, outf):
 
350
    def _prepare_info(self):
331
351
        """Write information about a pending lock to a temporary file.
332
352
        """
333
353
        import socket
334
354
        # XXX: is creating this here inefficient?
335
355
        config = bzrlib.config.GlobalConfig()
 
356
        try:
 
357
            user = config.user_email()
 
358
        except errors.NoEmailInUsername:
 
359
            user = config.username()
336
360
        s = Stanza(hostname=socket.gethostname(),
337
361
                   pid=str(os.getpid()),
338
362
                   start_time=str(int(time.time())),
339
363
                   nonce=self.nonce,
340
 
                   user=config.user_email(),
 
364
                   user=user,
341
365
                   )
342
 
        RioWriter(outf).write_stanza(s)
 
366
        return s.to_string()
343
367
 
344
368
    def _parse_info(self, info_file):
345
369
        return read_stanza(info_file.readlines()).as_dict()
346
370
 
347
 
    def wait_lock(self, timeout=_DEFAULT_TIMEOUT_SECONDS,
348
 
                  poll=_DEFAULT_POLL_SECONDS):
 
371
    def wait_lock(self, timeout=None, poll=None):
349
372
        """Wait a certain period for a lock.
350
373
 
351
374
        If the lock can be acquired within the bounded time, it
354
377
        approximately `timeout` seconds.  (It may be a bit more if
355
378
        a transport operation takes a long time to complete.)
356
379
        """
 
380
        if timeout is None:
 
381
            timeout = _DEFAULT_TIMEOUT_SECONDS
 
382
        if poll is None:
 
383
            poll = _DEFAULT_POLL_SECONDS
 
384
 
357
385
        # XXX: the transport interface doesn't let us guard 
358
386
        # against operations there taking a long time.
359
387
        deadline = time.time() + timeout
 
388
        deadline_str = None
 
389
        last_info = None
360
390
        while True:
361
391
            try:
362
392
                self.attempt_lock()
363
393
                return
364
394
            except LockContention:
365
395
                pass
 
396
            new_info = self.peek()
 
397
            mutter('last_info: %s, new info: %s', last_info, new_info)
 
398
            if new_info is not None and new_info != last_info:
 
399
                if last_info is None:
 
400
                    start = 'Unable to obtain'
 
401
                else:
 
402
                    start = 'Lock owner changed for'
 
403
                last_info = new_info
 
404
                formatted_info = self._format_lock_info(new_info)
 
405
                if deadline_str is None:
 
406
                    deadline_str = time.strftime('%H:%M:%S',
 
407
                                                 time.localtime(deadline))
 
408
                self._report_function('%s %s\n'
 
409
                                      '%s\n' # held by
 
410
                                      '%s\n' # locked ... ago
 
411
                                      'Will continue to try until %s\n',
 
412
                                      start,
 
413
                                      formatted_info[0],
 
414
                                      formatted_info[1],
 
415
                                      formatted_info[2],
 
416
                                      deadline_str)
 
417
 
366
418
            if time.time() + poll < deadline:
367
419
                time.sleep(poll)
368
420
            else:
369
421
                raise LockContention(self)
370
 
 
371
 
    def lock_write(self):
372
 
        """Wait for and acquire the lock."""
373
 
        self.attempt_lock()
 
422
    
 
423
    def leave_in_place(self):
 
424
        self._locked_via_token = True
 
425
 
 
426
    def dont_leave_in_place(self):
 
427
        self._locked_via_token = False
 
428
 
 
429
    def lock_write(self, token=None):
 
430
        """Wait for and acquire the lock.
 
431
        
 
432
        :param token: if this is already locked, then lock_write will fail
 
433
            unless the token matches the existing lock.
 
434
        :returns: a token if this instance supports tokens, otherwise None.
 
435
        :raises TokenLockingNotSupported: when a token is given but this
 
436
            instance doesn't support using token locks.
 
437
        :raises MismatchedToken: if the specified token doesn't match the token
 
438
            of the existing lock.
 
439
 
 
440
        A token should be passed in if you know that you have locked the object
 
441
        some other way, and need to synchronise this object's state with that
 
442
        fact.
 
443
         
 
444
        XXX: docstring duplicated from LockableFiles.lock_write.
 
445
        """
 
446
        if token is not None:
 
447
            self.validate_token(token)
 
448
            self.nonce = token
 
449
            self._lock_held = True
 
450
            self._locked_via_token = True
 
451
            return token
 
452
        else:
 
453
            self.wait_lock()
 
454
            return self.peek().get('nonce')
374
455
 
375
456
    def lock_read(self):
376
457
        """Compatibility-mode shared lock.
400
481
            else:
401
482
                raise LockContention(self)
402
483
 
 
484
    def _format_lock_info(self, info):
 
485
        """Turn the contents of peek() into something for the user"""
 
486
        lock_url = self.transport.abspath(self.path)
 
487
        delta = time.time() - int(info['start_time'])
 
488
        return [
 
489
            'lock %s' % (lock_url,),
 
490
            'held by %(user)s on host %(hostname)s [process #%(pid)s]' % info,
 
491
            'locked %s' % (format_delta(delta),),
 
492
            ]
 
493
 
 
494
    def validate_token(self, token):
 
495
        if token is not None:
 
496
            info = self.peek()
 
497
            if info is None:
 
498
                # Lock isn't held
 
499
                lock_token = None
 
500
            else:
 
501
                lock_token = info.get('nonce')
 
502
            if token != lock_token:
 
503
                raise errors.TokenMismatch(token, lock_token)
 
504