~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/memory.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-29 22:03:03 UTC
  • mfrom: (5416.2.6 jam-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20100929220303-cr95h8iwtggco721
(mbp) Add 'break-lock --force'

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
27
27
from cStringIO import StringIO
28
28
import warnings
29
29
 
 
30
from bzrlib import (
 
31
    transport,
 
32
    urlutils,
 
33
    )
30
34
from bzrlib.errors import (
31
35
    FileExists,
32
36
    LockError,
39
43
    AppendBasedFileStream,
40
44
    _file_streams,
41
45
    LateReadError,
42
 
    register_transport,
43
 
    Server,
44
 
    Transport,
45
46
    )
46
 
import bzrlib.urlutils as urlutils
47
47
 
48
48
 
49
49
 
61
61
            self.st_mode = S_IFDIR | perms
62
62
 
63
63
 
64
 
class MemoryTransport(Transport):
 
64
class MemoryTransport(transport.Transport):
65
65
    """This is an in memory file system for transient data storage."""
66
66
 
67
67
    def __init__(self, url=""):
85
85
        if len(path) == 0 or path[-1] != '/':
86
86
            path += '/'
87
87
        url = self._scheme + path
88
 
        result = MemoryTransport(url)
 
88
        result = self.__class__(url)
89
89
        result._dirs = self._dirs
90
90
        result._files = self._files
91
91
        result._locks = self._locks
300
300
        self.transport = None
301
301
 
302
302
 
303
 
class MemoryServer(Server):
 
303
class MemoryServer(transport.Server):
304
304
    """Server for the MemoryTransport for testing with."""
305
305
 
306
 
    def setUp(self):
307
 
        """See bzrlib.transport.Server.setUp."""
 
306
    def start_server(self):
308
307
        self._dirs = {'/':None}
309
308
        self._files = {}
310
309
        self._locks = {}
311
310
        self._scheme = "memory+%s:///" % id(self)
312
311
        def memory_factory(url):
313
 
            result = MemoryTransport(url)
 
312
            from bzrlib.transport import memory
 
313
            result = memory.MemoryTransport(url)
314
314
            result._dirs = self._dirs
315
315
            result._files = self._files
316
316
            result._locks = self._locks
317
317
            return result
318
 
        register_transport(self._scheme, memory_factory)
 
318
        self._memory_factory = memory_factory
 
319
        transport.register_transport(self._scheme, self._memory_factory)
319
320
 
320
 
    def tearDown(self):
321
 
        """See bzrlib.transport.Server.tearDown."""
 
321
    def stop_server(self):
322
322
        # unregister this server
 
323
        transport.unregister_transport(self._scheme, self._memory_factory)
323
324
 
324
325
    def get_url(self):
325
326
        """See bzrlib.transport.Server.get_url."""
326
327
        return self._scheme
327
328
 
 
329
    def get_bogus_url(self):
 
330
        raise NotImplementedError
 
331
 
328
332
 
329
333
def get_test_permutations():
330
334
    """Return the permutations to be used in testing."""