~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/memory.py

(jelmer) Use the absolute_import feature everywhere in bzrlib,
 and add a source test to make sure it's used everywhere. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
so this is primarily useful for testing.
21
21
"""
22
22
 
 
23
from __future__ import absolute_import
 
24
 
23
25
import os
24
26
import errno
25
 
import re
26
27
from stat import S_IFREG, S_IFDIR
27
28
from cStringIO import StringIO
28
 
import warnings
29
29
 
 
30
from bzrlib import (
 
31
    transport,
 
32
    urlutils,
 
33
    )
30
34
from bzrlib.errors import (
31
35
    FileExists,
32
36
    LockError,
33
37
    InProcessTransport,
34
38
    NoSuchFile,
35
 
    TransportError,
36
39
    )
37
 
from bzrlib.trace import mutter
38
40
from bzrlib.transport import (
39
41
    AppendBasedFileStream,
40
42
    _file_streams,
41
43
    LateReadError,
42
 
    register_transport,
43
 
    Server,
44
 
    Transport,
45
44
    )
46
 
import bzrlib.urlutils as urlutils
47
45
 
48
46
 
49
47
 
61
59
            self.st_mode = S_IFDIR | perms
62
60
 
63
61
 
64
 
class MemoryTransport(Transport):
 
62
class MemoryTransport(transport.Transport):
65
63
    """This is an in memory file system for transient data storage."""
66
64
 
67
65
    def __init__(self, url=""):
81
79
 
82
80
    def clone(self, offset=None):
83
81
        """See Transport.clone()."""
84
 
        path = self._combine_paths(self._cwd, offset)
 
82
        path = urlutils.URL._combine_paths(self._cwd, offset)
85
83
        if len(path) == 0 or path[-1] != '/':
86
84
            path += '/'
87
85
        url = self._scheme + path
289
287
            raise LockError('File %r already locked' % (self.path,))
290
288
        self.transport._locks[self.path] = self
291
289
 
292
 
    def __del__(self):
293
 
        # Should this warn, or actually try to cleanup?
294
 
        if self.transport:
295
 
            warnings.warn("MemoryLock %r not explicitly unlocked" % (self.path,))
296
 
            self.unlock()
297
 
 
298
290
    def unlock(self):
299
291
        del self.transport._locks[self.path]
300
292
        self.transport = None
301
293
 
302
294
 
303
 
class MemoryServer(Server):
 
295
class MemoryServer(transport.Server):
304
296
    """Server for the MemoryTransport for testing with."""
305
297
 
306
 
    def setUp(self):
307
 
        """See bzrlib.transport.Server.setUp."""
 
298
    def start_server(self):
308
299
        self._dirs = {'/':None}
309
300
        self._files = {}
310
301
        self._locks = {}
311
302
        self._scheme = "memory+%s:///" % id(self)
312
303
        def memory_factory(url):
313
 
            result = MemoryTransport(url)
 
304
            from bzrlib.transport import memory
 
305
            result = memory.MemoryTransport(url)
314
306
            result._dirs = self._dirs
315
307
            result._files = self._files
316
308
            result._locks = self._locks
317
309
            return result
318
 
        register_transport(self._scheme, memory_factory)
 
310
        self._memory_factory = memory_factory
 
311
        transport.register_transport(self._scheme, self._memory_factory)
319
312
 
320
 
    def tearDown(self):
321
 
        """See bzrlib.transport.Server.tearDown."""
 
313
    def stop_server(self):
322
314
        # unregister this server
 
315
        transport.unregister_transport(self._scheme, self._memory_factory)
323
316
 
324
317
    def get_url(self):
325
318
        """See bzrlib.transport.Server.get_url."""
326
319
        return self._scheme
327
320
 
 
321
    def get_bogus_url(self):
 
322
        raise NotImplementedError
 
323
 
328
324
 
329
325
def get_test_permutations():
330
326
    """Return the permutations to be used in testing."""