~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/memory.py

  • Committer: Martin Pool
  • Date: 2007-07-13 04:22:17 UTC
  • mto: This revision was merged to the branch mainline in revision 2618.
  • Revision ID: mbp@sourcefrog.net-20070713042217-mnkwb9przs8x2de0
Move bencode tests from within their module into our test suite

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Implementation of Transport that uses memory for its storage.
18
18
 
20
20
so this is primarily useful for testing.
21
21
"""
22
22
 
23
 
from __future__ import absolute_import
24
 
 
25
23
import os
26
24
import errno
 
25
import re
27
26
from stat import S_IFREG, S_IFDIR
28
27
from cStringIO import StringIO
 
28
import warnings
29
29
 
30
 
from bzrlib import (
31
 
    transport,
32
 
    urlutils,
33
 
    )
34
30
from bzrlib.errors import (
35
31
    FileExists,
36
32
    LockError,
37
33
    InProcessTransport,
38
34
    NoSuchFile,
 
35
    TransportError,
39
36
    )
 
37
from bzrlib.trace import mutter
40
38
from bzrlib.transport import (
41
 
    AppendBasedFileStream,
42
 
    _file_streams,
43
39
    LateReadError,
 
40
    register_transport,
 
41
    Server,
 
42
    Transport,
44
43
    )
 
44
import bzrlib.urlutils as urlutils
45
45
 
46
46
 
47
47
 
59
59
            self.st_mode = S_IFDIR | perms
60
60
 
61
61
 
62
 
class MemoryTransport(transport.Transport):
 
62
class MemoryTransport(Transport):
63
63
    """This is an in memory file system for transient data storage."""
64
64
 
65
65
    def __init__(self, url=""):
79
79
 
80
80
    def clone(self, offset=None):
81
81
        """See Transport.clone()."""
82
 
        path = urlutils.URL._combine_paths(self._cwd, offset)
 
82
        path = self._combine_paths(self._cwd, offset)
83
83
        if len(path) == 0 or path[-1] != '/':
84
84
            path += '/'
85
85
        url = self._scheme + path
86
 
        result = self.__class__(url)
 
86
        result = MemoryTransport(url)
87
87
        result._dirs = self._dirs
88
88
        result._files = self._files
89
89
        result._locks = self._locks
148
148
        """See Transport.put_file()."""
149
149
        _abspath = self._abspath(relpath)
150
150
        self._check_parent(_abspath)
151
 
        raw_bytes = f.read()
152
 
        self._files[_abspath] = (raw_bytes, mode)
153
 
        return len(raw_bytes)
 
151
        bytes = f.read()
 
152
        if type(bytes) is not str:
 
153
            # Although not strictly correct, we raise UnicodeEncodeError to be
 
154
            # compatible with other transports.
 
155
            raise UnicodeEncodeError(
 
156
                'undefined', bytes, 0, 1,
 
157
                'put_file must be given a file of bytes, not unicode.')
 
158
        self._files[_abspath] = (bytes, mode)
154
159
 
155
160
    def mkdir(self, relpath, mode=None):
156
161
        """See Transport.mkdir()."""
160
165
            raise FileExists(relpath)
161
166
        self._dirs[_abspath]=mode
162
167
 
163
 
    def open_write_stream(self, relpath, mode=None):
164
 
        """See Transport.open_write_stream."""
165
 
        self.put_bytes(relpath, "", mode)
166
 
        result = AppendBasedFileStream(self, relpath)
167
 
        _file_streams[self.abspath(relpath)] = result
168
 
        return result
169
 
 
170
168
    def listable(self):
171
169
        """See Transport.listable."""
172
170
        return True
175
173
        for file in self._files:
176
174
            if file.startswith(self._cwd):
177
175
                yield urlutils.escape(file[len(self._cwd):])
178
 
 
 
176
    
179
177
    def list_dir(self, relpath):
180
178
        """See Transport.list_dir()."""
181
179
        _abspath = self._abspath(relpath)
214
212
                    del container[path]
215
213
        do_renames(self._files)
216
214
        do_renames(self._dirs)
217
 
 
 
215
    
218
216
    def rmdir(self, relpath):
219
217
        """See Transport.rmdir."""
220
218
        _abspath = self._abspath(relpath)
235
233
        """See Transport.stat()."""
236
234
        _abspath = self._abspath(relpath)
237
235
        if _abspath in self._files:
238
 
            return MemoryStat(len(self._files[_abspath][0]), False,
 
236
            return MemoryStat(len(self._files[_abspath][0]), False, 
239
237
                              self._files[_abspath][1])
240
238
        elif _abspath in self._dirs:
241
239
            return MemoryStat(0, True, self._dirs[_abspath])
253
251
    def _abspath(self, relpath):
254
252
        """Generate an internal absolute path."""
255
253
        relpath = urlutils.unescape(relpath)
256
 
        if relpath[:1] == '/':
 
254
        if relpath.find('..') != -1:
 
255
            raise AssertionError('relpath contains ..')
 
256
        if relpath == '':
 
257
            return '/'
 
258
        if relpath[0] == '/':
257
259
            return relpath
258
 
        cwd_parts = self._cwd.split('/')
259
 
        rel_parts = relpath.split('/')
260
 
        r = []
261
 
        for i in cwd_parts + rel_parts:
262
 
            if i == '..':
263
 
                if not r:
264
 
                    raise ValueError("illegal relpath %r under %r"
265
 
                        % (relpath, self._cwd))
266
 
                r = r[:-1]
267
 
            elif i == '.' or i == '':
268
 
                pass
269
 
            else:
270
 
                r.append(i)
271
 
        return '/' + '/'.join(r)
 
260
        if relpath == '.':
 
261
            if (self._cwd == '/'):
 
262
                return self._cwd
 
263
            return self._cwd[:-1]
 
264
        if relpath.endswith('/'):
 
265
            relpath = relpath[:-1]
 
266
        if relpath.startswith('./'):
 
267
            relpath = relpath[2:]
 
268
        return self._cwd + relpath
272
269
 
273
270
 
274
271
class _MemoryLock(object):
275
272
    """This makes a lock."""
276
273
 
277
274
    def __init__(self, path, transport):
 
275
        assert isinstance(transport, MemoryTransport)
278
276
        self.path = path
279
277
        self.transport = transport
280
278
        if self.path in self.transport._locks:
281
279
            raise LockError('File %r already locked' % (self.path,))
282
280
        self.transport._locks[self.path] = self
283
281
 
 
282
    def __del__(self):
 
283
        # Should this warn, or actually try to cleanup?
 
284
        if self.transport:
 
285
            warnings.warn("MemoryLock %r not explicitly unlocked" % (self.path,))
 
286
            self.unlock()
 
287
 
284
288
    def unlock(self):
285
289
        del self.transport._locks[self.path]
286
290
        self.transport = None
287
291
 
288
292
 
289
 
class MemoryServer(transport.Server):
 
293
class MemoryServer(Server):
290
294
    """Server for the MemoryTransport for testing with."""
291
295
 
292
 
    def start_server(self):
 
296
    def setUp(self):
 
297
        """See bzrlib.transport.Server.setUp."""
293
298
        self._dirs = {'/':None}
294
299
        self._files = {}
295
300
        self._locks = {}
296
301
        self._scheme = "memory+%s:///" % id(self)
297
302
        def memory_factory(url):
298
 
            from bzrlib.transport import memory
299
 
            result = memory.MemoryTransport(url)
 
303
            result = MemoryTransport(url)
300
304
            result._dirs = self._dirs
301
305
            result._files = self._files
302
306
            result._locks = self._locks
303
307
            return result
304
 
        self._memory_factory = memory_factory
305
 
        transport.register_transport(self._scheme, self._memory_factory)
 
308
        register_transport(self._scheme, memory_factory)
306
309
 
307
 
    def stop_server(self):
 
310
    def tearDown(self):
 
311
        """See bzrlib.transport.Server.tearDown."""
308
312
        # unregister this server
309
 
        transport.unregister_transport(self._scheme, self._memory_factory)
310
313
 
311
314
    def get_url(self):
312
315
        """See bzrlib.transport.Server.get_url."""
313
316
        return self._scheme
314
317
 
315
 
    def get_bogus_url(self):
316
 
        raise NotImplementedError
317
 
 
318
318
 
319
319
def get_test_permutations():
320
320
    """Return the permutations to be used in testing."""