~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/local.py

  • Committer: Vincent Ladeuil
  • Date: 2007-07-22 15:44:59 UTC
  • mto: This revision was merged to the branch mainline in revision 2646.
  • Revision ID: v.ladeuil+lp@free.fr-20070722154459-520ws2gnifghkpgy
From review comments, use a private scheme for testing.

* bzrlib/transport/__init__.py:
(_unregister_urlparse_netloc_protocol): New function.

* bzrlib/tests/transport_util.py:
(InstrumentedTransport.__init__): Use a dedicated scheme.
(TestCaseWithConnectionHookedTransport.setUp): Reworked to
register the new transport.
(TestCaseWithConnectionHookedTransport.get_url): Use our dedicated
scheme.
(TestCaseWithConnectionHookedTransport.install_hooks,
TestCaseWithConnectionHookedTransport.reset_hooks): Registering the
transport is setUp's job.

Show diffs side-by-side

added added

removed removed

Lines of Context:
33
33
    osutils,
34
34
    urlutils,
35
35
    symbol_versioning,
36
 
    transport,
37
36
    )
38
37
from bzrlib.trace import mutter
39
38
from bzrlib.transport import LateReadError
66
65
        super(LocalTransport, self).__init__(base)
67
66
        self._local_base = urlutils.local_path_from_url(base)
68
67
 
 
68
    def should_cache(self):
 
69
        return False
 
70
 
69
71
    def clone(self, offset=None):
70
72
        """Return a new LocalTransport with root at self.base + offset
71
73
        Because the local filesystem does not require a connection, 
125
127
            abspath = u'.'
126
128
 
127
129
        return urlutils.file_relpath(
128
 
            urlutils.strip_trailing_slash(self.base),
 
130
            urlutils.strip_trailing_slash(self.base), 
129
131
            urlutils.strip_trailing_slash(abspath))
130
132
 
131
133
    def has(self, relpath):
136
138
 
137
139
        :param relpath: The relative path to the file
138
140
        """
139
 
        canonical_url = self.abspath(relpath)
140
 
        if canonical_url in transport._file_streams:
141
 
            transport._file_streams[canonical_url].flush()
142
141
        try:
143
142
            path = self._abspath(relpath)
144
143
            return open(path, 'rb')
302
301
        """Create a directory at the given path."""
303
302
        self._mkdir(self._abspath(relpath), mode=mode)
304
303
 
305
 
    def open_write_stream(self, relpath, mode=None):
306
 
        """See Transport.open_write_stream."""
307
 
        # initialise the file
308
 
        self.put_bytes_non_atomic(relpath, "", mode=mode)
309
 
        handle = open(self._abspath(relpath), 'wb')
310
 
        transport._file_streams[self.abspath(relpath)] = handle
311
 
        return transport.FileFileStream(self, relpath, handle)
312
 
 
313
304
    def _get_append_file(self, relpath, mode=None):
314
305
        """Call os.open() for the given relpath"""
315
306
        file_abspath = self._abspath(relpath)