58
58
def __iter__(self):
59
59
raise NotImplementedError
61
def add(self, f, fileid):
61
def add(self, fileid, f):
62
62
"""Add a file object f to the store accessible from the given fileid"""
63
63
raise NotImplementedError('Children of Storage must define their method of adding entries.')
65
def add_multi(self, entries):
66
"""Add a series of file-like or string objects to the store with the given
69
:param entries: A list of tuples of id,file pairs [(id1, file1), (id2, file2), ...]
70
This could also be a generator yielding (id,file) pairs.
72
for fileid, f in entries:
65
75
def copy_multi(self, other, ids):
66
76
"""Copy texts for ids from other into self.
68
78
If an id is present in self, it is skipped. A count of copied
69
79
ids is returned, which may be less than len(ids).
81
:param other: Another Storage object
82
:param ids: A list of entry ids to be copied
83
:return: The number of entries copied
71
85
from bzrlib.progress import ProgressBar
73
87
pb.update('preparing to copy')
74
to_copy = [fileid for fileid in ids if fileid not in self]
88
to_copy = [fileid for fileid in text_ids if fileid not in self]
75
89
return self._do_copy(other, to_copy, pb)
77
91
def _do_copy(self, other, to_copy, pb):
78
92
"""This is the standard copying mechanism, just get them one at
79
93
a time from remote, and store them locally.
95
:param other: Another Storage object
96
:param to_copy: A list of entry ids to copy
97
:param pb: A ProgressBar object to display completion status.
98
:return: The number of entries copied.
100
# This should be updated to use add_multi() rather than
101
# the current methods of buffering requests.
102
# One question, is it faster to queue up 1-10 and then copy 1-10
103
# then queue up 11-20, copy 11-20
104
# or to queue up 1-10, copy 1, queue 11, copy 2, etc?
105
# sort of pipeline versus batch.
82
buffered_requests = []
83
for fileid in to_copy:
84
buffered_requests.append((other[fileid], fileid))
85
if len(buffered_requests) > self._max_buffered_requests:
86
self.add(*buffered_requests.pop(0))
107
def buffer_requests():
108
buffered_requests = []
109
for fileid in to_copy:
110
buffered_requests.append((fileid, other[fileid]))
111
if len(buffered_requests) > self._max_buffered_requests:
112
yield buffered_requests.pop(0)
114
pb.update('copy', count, len(to_copy))
116
for req in buffered_requests:
88
119
pb.update('copy', count, len(to_copy))
90
for req in buffered_requests:
93
pb.update('copy', count, len(to_copy))
121
self.add_multi(buffer_requests())
95
123
assert count == len(to_copy)
101
127
class CompressedTextStore(Storage):
102
128
"""Store that holds files indexed by unique names.
133
159
def __init__(self, basedir):
134
160
super(CompressedTextStore, self).__init__(basedir)
136
def _path(self, fileid):
162
def _check_fileid(self, fileid):
137
163
if '\\' in fileid or '/' in fileid:
138
164
raise ValueError("invalid store id %r" % fileid)
139
return self._transport.get_filename(fileid)
166
def _relpath(self, fileid):
167
self._check_fileid(fileid)
168
return fileid + '.gz'
141
170
def __repr__(self):
142
171
return "%s(%r)" % (self.__class__.__name__, self._location)
144
def add(self, f, fileid, compressed=True):
173
def add(self, fileid, f):
145
174
"""Add contents of a file into the store.
147
176
f -- An open file, or file-like object."""
148
# FIXME: Only works on files that will fit in memory
177
# TODO: implement an add_multi which can do some of its
178
# own pipelining, and possibly take advantage of
179
# transport.put_multi(). The problem is that
180
# entries potentially need to be compressed as they
181
# are received, which implies translation, which
182
# means it isn't as straightforward as we would like.
150
183
from cStringIO import StringIO
184
from bzrlib.osutils import pumpfile
152
186
mutter("add store entry %r" % (fileid))
153
187
if isinstance(f, basestring):
158
if self._transport.has(fileid) or self._transport.has(fileid + '.gz'):
159
raise BzrError("store %r already contains id %r" % (self._location, fileid))
190
fn = self._relpath(fileid)
191
if self._transport.has(fn):
192
raise BzrError("store %r already contains id %r" % (self._transport.base, fileid))
167
gf = gzip.GzipFile(mode='wb', fileobj=sio)
196
gf = gzip.GzipFile(mode='wb', fileobj=sio)
197
# if pumpfile handles files that don't fit in ram,
198
# so will this function
173
202
self._transport.put(fn, sio)
177
206
return self._copy_multi_text(other, to_copy, pb)
178
207
return super(CompressedTextStore, self)._do_copy(other, to_copy, pb)
181
209
def _copy_multi_text(self, other, to_copy, pb):
182
from shutil import copyfile
186
other_p = other._path(id)
190
if e.errno == errno.ENOENT:
191
copyfile(other_p+".gz", p+".gz")
196
pb.update('copy', count, len(to_copy))
210
# Because of _transport, we can no longer assume
211
# that they are on the same filesystem; we can, however,
212
# assume that we only need to copy the exact bytes,
213
# we don't need to process the files.
215
paths = [self._relpath(fileid) for fileid in to_copy]
216
count = self._transport.put_multi(
217
zip(paths, other._transport.get_multi(paths, pb=pb)))
197
218
assert count == len(to_copy)
202
222
def __contains__(self, fileid):
204
p = self._path(fileid)
205
return (os.access(p, os.R_OK)
206
or os.access(p + '.gz', os.R_OK))
224
fn = self._relpath(fileid)
225
return self._transport.has(fn)
208
227
# TODO: Guard against the same thing being stored twice, compressed and uncompressed
210
229
def __iter__(self):
211
for f in os.listdir(self._location):
230
# TODO: case-insensitive?
231
for f in self._transport.list_dir('.'):
212
232
if f[-3:] == '.gz':
213
# TODO: case-insensitive?
218
237
def __len__(self):
219
return len(os.listdir(self._location))
238
return len([f for f in self._transport.list_dir('.')])
221
240
def __getitem__(self, fileid):
222
241
"""Returns a file reading from a particular entry."""
223
p = self._path(fileid)
225
return gzip.GzipFile(p + '.gz', 'rb')
227
if e.errno == errno.ENOENT:
242
fn = self._relpath(fileid)
243
f = self._transport.get(fn)
244
return gzip.GzipFile(mode='rb', fileobj=f)
232
246
def total_size(self):
233
247
"""Return (count, bytes)
253
relpaths = [self._relpath(fid) for fid in self]
255
for st in self._transport.stat_multi(relpaths):
243
total += os.stat(p)[ST_SIZE]
245
total += os.stat(p + '.gz')[ST_SIZE]
247
259
return count, total
252
class ScratchFlatTextStore(CompressedTextStore):
261
class ScratchCompressedTextStore(CompressedTextStore):
253
262
"""Self-destructing test subclass of CompressedTextStore.
255
264
The Store only exists for the lifetime of the Python object.
256
265
Obviously you should not put anything precious in it.
258
267
def __init__(self):
259
super(ScratchFlatTextStore, self).__init__(tempfile.mkdtemp())
268
from transport import transport
269
super(ScratchCompressedTextStore, self).__init__(transport(tempfile.mkdtemp()))
261
271
def __del__(self):
262
for f in os.listdir(self._location):
263
fpath = os.path.join(self._location, f)
264
# needed on windows, and maybe some other filesystems
265
os.chmod(fpath, 0600)
267
os.rmdir(self._location)
272
self._transport.delete_multi(self._transport.list_dir('.'))
273
os.rmdir(self._transport.base)
268
274
mutter("%r destroyed" % self)