1
# Copyright (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16
"""Implementation of Transport that uses memory for its storage."""
22
from cStringIO import StringIO
24
from bzrlib.trace import mutter
25
from bzrlib.errors import TransportError, NoSuchFile, FileExists
26
from bzrlib.transport import Transport, register_transport, Server
28
class MemoryStat(object):
30
def __init__(self, size, is_dir, perms):
35
self.st_mode = S_IFREG | perms
37
self.st_mode = S_IFDIR | perms
40
class MemoryTransport(Transport):
41
"""This is the transport agent for local filesystem access."""
43
def __init__(self, url=""):
44
"""Set the 'base' path where files will be stored."""
49
super(MemoryTransport, self).__init__(url)
50
self._cwd = url[url.find(':') + 1:]
54
def clone(self, offset=None):
55
"""See Transport.clone()."""
58
segments = offset.split('/')
59
cwdsegments = self._cwd.split('/')[:-1]
61
segment = segments.pop(0)
65
if len(cwdsegments) > 1:
68
cwdsegments.append(segment)
69
url = self.base[:self.base.find(':') + 1] + '/'.join(cwdsegments) + '/'
70
result = MemoryTransport(url)
71
result._dirs = self._dirs
72
result._files = self._files
75
def abspath(self, relpath):
76
"""See Transport.abspath()."""
77
return self.base[:-1] + self._abspath(relpath)
79
def append(self, relpath, f):
80
"""See Transport.append()."""
81
_abspath = self._abspath(relpath)
82
self._check_parent(_abspath)
83
orig_content, orig_mode = self._files.get(_abspath, ("", None))
84
self._files[_abspath] = (orig_content + f.read(), orig_mode)
86
def _check_parent(self, _abspath):
87
dir = os.path.dirname(_abspath)
89
if not dir in self._dirs:
90
raise NoSuchFile(_abspath)
92
def has(self, relpath):
93
"""See Transport.has()."""
94
_abspath = self._abspath(relpath)
95
return _abspath in self._files or _abspath in self._dirs
97
def delete(self, relpath):
98
"""See Transport.delete()."""
99
_abspath = self._abspath(relpath)
100
if not _abspath in self._files:
101
raise NoSuchFile(relpath)
102
del self._files[_abspath]
104
def get(self, relpath):
105
"""See Transport.get()."""
106
_abspath = self._abspath(relpath)
107
if not _abspath in self._files:
108
raise NoSuchFile(relpath)
109
return StringIO(self._files[_abspath][0])
111
def put(self, relpath, f, mode=None):
112
"""See Transport.put()."""
113
_abspath = self._abspath(relpath)
114
self._check_parent(_abspath)
115
self._files[_abspath] = (f.read(), mode)
117
def mkdir(self, relpath, mode=None):
118
"""See Transport.mkdir()."""
119
_abspath = self._abspath(relpath)
120
self._check_parent(_abspath)
121
if _abspath in self._dirs:
122
raise FileExists(relpath)
123
self._dirs[_abspath]=mode
126
"""See Transport.listable."""
129
def iter_files_recursive(self):
130
for file in self._files:
131
if file.startswith(self._cwd):
132
yield file[len(self._cwd):]
134
def list_dir(self, relpath):
135
"""See Transport.list_dir()."""
136
_abspath = self._abspath(relpath)
137
if _abspath != '/' and _abspath not in self._dirs:
138
raise NoSuchFile(relpath)
140
for path in self._files:
141
if (path.startswith(_abspath) and
142
path[len(_abspath) + 1:].find('/') == -1 and
143
len(path) > len(_abspath)):
144
result.append(path[len(_abspath) + 1:])
145
for path in self._dirs:
146
if (path.startswith(_abspath) and
147
path[len(_abspath) + 1:].find('/') == -1 and
148
len(path) > len(_abspath)):
149
result.append(path[len(_abspath) + 1:])
152
def stat(self, relpath):
153
"""See Transport.stat()."""
154
_abspath = self._abspath(relpath)
155
if _abspath in self._files:
156
return MemoryStat(len(self._files[_abspath][0]), False,
157
self._files[_abspath][1])
159
return MemoryStat(0, True, None)
160
elif _abspath in self._dirs:
161
return MemoryStat(0, True, self._dirs[_abspath])
163
raise NoSuchFile(relpath)
165
# def lock_read(self, relpath):
168
# def lock_write(self, relpath):
171
def _abspath(self, relpath):
172
"""Generate an internal absolute path."""
173
if relpath.find('..') != -1:
174
raise AssertionError('relpath contains ..')
176
return self._cwd[:-1]
177
if relpath.endswith('/'):
178
relpath = relpath[:-1]
179
return self._cwd + relpath
182
class MemoryServer(Server):
183
"""Server for the MemoryTransport for testing with."""
186
"""See bzrlib.transport.Server.setUp."""
187
self._scheme = "memory+%s:" % id(self)
188
register_transport(self._scheme, MemoryTransport)
191
"""See bzrlib.transport.Server.tearDown."""
192
# unregister this server
195
"""See bzrlib.transport.Server.get_url."""
199
def get_test_permutations():
200
"""Return the permutations to be used in testing."""
201
return [(MemoryTransport, MemoryServer),