13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Implementation of Transport that uses memory for its storage.
82
80
def clone(self, offset=None):
83
81
"""See Transport.clone()."""
84
path = urlutils.URL._combine_paths(self._cwd, offset)
82
path = self._combine_paths(self._cwd, offset)
85
83
if len(path) == 0 or path[-1] != '/':
87
85
url = self._scheme + path
88
result = self.__class__(url)
86
result = MemoryTransport(url)
89
87
result._dirs = self._dirs
90
88
result._files = self._files
91
89
result._locks = self._locks
158
156
'undefined', bytes, 0, 1,
159
157
'put_file must be given a file of bytes, not unicode.')
160
158
self._files[_abspath] = (bytes, mode)
163
160
def mkdir(self, relpath, mode=None):
164
161
"""See Transport.mkdir()."""
168
165
raise FileExists(relpath)
169
166
self._dirs[_abspath]=mode
171
def open_write_stream(self, relpath, mode=None):
172
"""See Transport.open_write_stream."""
173
self.put_bytes(relpath, "", mode)
174
result = AppendBasedFileStream(self, relpath)
175
_file_streams[self.abspath(relpath)] = result
178
168
def listable(self):
179
169
"""See Transport.listable."""
243
233
"""See Transport.stat()."""
244
234
_abspath = self._abspath(relpath)
245
235
if _abspath in self._files:
246
return MemoryStat(len(self._files[_abspath][0]), False,
236
return MemoryStat(len(self._files[_abspath][0]), False,
247
237
self._files[_abspath][1])
248
238
elif _abspath in self._dirs:
249
239
return MemoryStat(0, True, self._dirs[_abspath])
261
251
def _abspath(self, relpath):
262
252
"""Generate an internal absolute path."""
263
253
relpath = urlutils.unescape(relpath)
264
if relpath[:1] == '/':
254
if relpath.find('..') != -1:
255
raise AssertionError('relpath contains ..')
258
if relpath[0] == '/':
266
cwd_parts = self._cwd.split('/')
267
rel_parts = relpath.split('/')
269
for i in cwd_parts + rel_parts:
272
raise ValueError("illegal relpath %r under %r"
273
% (relpath, self._cwd))
275
elif i == '.' or i == '':
279
return '/' + '/'.join(r)
261
if (self._cwd == '/'):
263
return self._cwd[:-1]
264
if relpath.endswith('/'):
265
relpath = relpath[:-1]
266
if relpath.startswith('./'):
267
relpath = relpath[2:]
268
return self._cwd + relpath
282
271
class _MemoryLock(object):
283
272
"""This makes a lock."""
285
274
def __init__(self, path, transport):
275
assert isinstance(transport, MemoryTransport)
287
277
self.transport = transport
288
278
if self.path in self.transport._locks:
289
279
raise LockError('File %r already locked' % (self.path,))
290
280
self.transport._locks[self.path] = self
283
# Should this warn, or actually try to cleanup?
285
warnings.warn("MemoryLock %r not explicitly unlocked" % (self.path,))
292
288
def unlock(self):
293
289
del self.transport._locks[self.path]
294
290
self.transport = None
297
class MemoryServer(transport.Server):
293
class MemoryServer(Server):
298
294
"""Server for the MemoryTransport for testing with."""
300
def start_server(self):
297
"""See bzrlib.transport.Server.setUp."""
301
298
self._dirs = {'/':None}
304
301
self._scheme = "memory+%s:///" % id(self)
305
302
def memory_factory(url):
306
from bzrlib.transport import memory
307
result = memory.MemoryTransport(url)
303
result = MemoryTransport(url)
308
304
result._dirs = self._dirs
309
305
result._files = self._files
310
306
result._locks = self._locks
312
self._memory_factory = memory_factory
313
transport.register_transport(self._scheme, self._memory_factory)
308
register_transport(self._scheme, memory_factory)
315
def stop_server(self):
311
"""See bzrlib.transport.Server.tearDown."""
316
312
# unregister this server
317
transport.unregister_transport(self._scheme, self._memory_factory)
319
314
def get_url(self):
320
315
"""See bzrlib.transport.Server.get_url."""
321
316
return self._scheme
323
def get_bogus_url(self):
324
raise NotImplementedError
327
319
def get_test_permutations():
328
320
"""Return the permutations to be used in testing."""