class Store(object):
    """This class represents the abstract storage layout for saving information."""


class ImmutableStore(object):
    """Store that holds files indexed by unique names.

    Files can be added, but not modified once they are in.  Typically
    the hash is used as the name, or something else known to be unique.

    >>> st = ImmutableScratchStore()

    >>> st.add(StringIO('hello'), 'aa')

    You are not allowed to add an id that is already present.

    Entries can be retrieved as files, which may then be read.

    >>> st.add(StringIO('goodbye'), '123123')
    >>> st['123123'].read()
    'goodbye'

    TODO: Atomic add by writing to a temporary file and renaming.

    In bzr 0.0.5 and earlier, files within the store were marked
    readonly on disk.  This is no longer done but existing stores need
    to be accommodated.
    """
    def __len__(self):
        raise NotImplementedError('Children should define their length')

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        raise NotImplementedError

    def __contains__(self, fileid):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError
    def add(self, f, fileid):
        """Add a file object f to the store accessible from the given fileid"""
        raise NotImplementedError('Children of Store must define their method of adding entries.')
    def add_multi(self, entries):
        """Add a series of file-like or string objects to the store with the given
        identities.

        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
                        This could also be a generator yielding (file,id) pairs.
        """
        for f, fileid in entries:
            self.add(f, fileid)
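    # Illustrative sketch (not part of the original module): add_multi() just
    # feeds each (file, id) pair to add().  `st` and the ids are hypothetical
    # examples with a concrete Store subclass:
    #
    #   st.add_multi([(StringIO('one'), 'id-one'),
    #                 (StringIO('two'), 'id-two')])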
    def has(self, fileids):
        """Return True/False for each entry in fileids.

        :param fileids: A List or generator yielding file ids.
        :return: A generator or list returning True/False for each entry.
        """
        for fileid in fileids:
            if fileid in self:
                yield True
            else:
                yield False
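    # Illustrative sketch (not part of the original module): has() yields one
    # boolean per requested id, in order, so callers usually consume it lazily
    # or turn it into a list.  `st` and the ids are hypothetical:
    #
    #   present = list(st.has(['id-one', 'missing-id']))
    #   # -> e.g. [True, False]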
    def __init__(self, basedir):
        self._basedir = basedir

    def _path(self, id):
        if '\\' in id or '/' in id:
            raise ValueError("invalid store id %r" % id)
        return os.path.join(self._basedir, id)

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._basedir)
    def add(self, f, fileid, compressed=True):
        """Add contents of a file into the store.

        f -- An open file, or file-like object."""
        # FIXME: Only works on files that will fit in memory

        from bzrlib.atomicfile import AtomicFile

        mutter("add store entry %r" % (fileid,))
        if isinstance(f, types.StringTypes):
            content = f
        else:
            content = f.read()

        p = self._path(fileid)
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
            raise BzrError("store %r already contains id %r" % (self._basedir, fileid))

        fn = p
        if compressed:
            fn = fn + '.gz'

        af = AtomicFile(fn, 'wb')
        try:
            if compressed:
                # gzip the content on its way into the atomic file
                gf = gzip.GzipFile(mode='wb', fileobj=af)
                gf.write(content)
                gf.close()
            else:
                af.write(content)
            af.commit()
        finally:
            af.close()
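    # Illustrative sketch (not part of the original module): with the default
    # compressed=True an added text ends up on disk as '<basedir>/<fileid>.gz',
    # and adding the same id twice is rejected.  `st` and the id are
    # hypothetical:
    #
    #   st.add(StringIO('some text'), 'some-file-id')
    #   st.add(StringIO('other text'), 'some-file-id')   # raises BzrError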
    def listable(self):
        """Return True if this store is able to be listed."""
        return hasattr(self, "__iter__")
    def get(self, fileids, permit_failure=False, pb=None):
        """Return a set of files, one for each requested entry.

        :param permit_failure: If true, return None for entries which do not
                               exist.
        :return: A list or generator of file-like objects, one for each id.
        """
        for fileid in fileids:
            try:
                yield self[fileid]
            except KeyError:
                if permit_failure:
                    yield None
                else:
                    raise
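    # Illustrative sketch (not part of the original module): with
    # permit_failure=True, missing entries come back as None instead of
    # raising, so the caller can filter them out.  `st` and the ids are
    # hypothetical:
    #
    #   for f in st.get(['id-one', 'missing-id'], permit_failure=True):
    #       if f is not None:
    #           text = f.read()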
    def copy_multi(self, other, ids, pb=None, permit_failure=False):
        """Copy texts for ids from other into self.

        If an id is present in self, it is skipped.  A count of copied
        ids is returned, which may be less than len(ids).

        :param other: Another Store object
        :param ids: A list of entry ids to be copied
        :param pb: A ProgressBar object, if none is given, the default will be created.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed]) The number of entries copied successfully,
            followed by a list of entries which could not be copied (because they
            were missing).
        """
        if pb is None:
            pb = bzrlib.ui.ui_factory.progress_bar()

        # XXX: Is there any reason why we couldn't make this accept a generator
        # and build a list as it finds things to copy?
        ids = list(ids) # Make sure we don't have a generator, since we iterate 2 times
        pb.update('preparing to copy')
        to_copy = []
        for file_id, has in zip(ids, self.has(ids)):
            if not has:
                to_copy.append(file_id)
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)
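    # Illustrative sketch (not part of the original module): copy_multi()
    # filters out ids already present in the destination, then hands the rest
    # to _do_copy().  `local_store` and `remote_store` are hypothetical Store
    # instances:
    #
    #   copied, failed = local_store.copy_multi(remote_store,
    #                                           ['id-one', 'id-two'],
    #                                           permit_failure=True)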
    def _do_copy(self, other, to_copy, pb, permit_failure=False):
        """This is the standard copying mechanism, just get them one at
        a time from remote, and store them locally.

        :param other: Another Store object
        :param to_copy: A list of entry ids to copy
        :param pb: A ProgressBar object to display completion status.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed])
            The number of entries copied, and a list of failed entries.
        """
        # This should be updated to use add_multi() rather than
        # the current methods of buffering requests.
        # One question, is it faster to queue up 1-10 and then copy 1-10
        # then queue up 11-20, copy 11-20
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
        # sort of pipeline versus batch.

        # We can't use self._transport.copy_to because we don't know
        # whether the local tree is in the same format as other
        failed = set()

        def buffer_requests():
            count = 0
            buffered_requests = []
            for fileid in to_copy:
                try:
                    f = other[fileid]
                except KeyError:
                    if permit_failure:
                        failed.add(fileid)
                        continue
                    else:
                        raise

                buffered_requests.append((f, fileid))
                if len(buffered_requests) > self._max_buffered_requests:
                    yield buffered_requests.pop(0)
                    count += 1
                    pb.update('copy', count, len(to_copy))

            for req in buffered_requests:
                yield req
                count += 1
                pb.update('copy', count, len(to_copy))

            assert count == len(to_copy)

        self.add_multi(buffer_requests())
        return len(to_copy), failed
    def copy_multi(self, other, ids, permit_failure=False):
        """Copy texts for ids from other into self.

        If an id is present in self, it is skipped.

        Returns (count_copied, failed), where failed is a collection of ids
        that could not be copied.
        """
        pb = bzrlib.ui.ui_factory.progress_bar()
        to_copy = [id for id in ids if id not in self]
        if isinstance(other, ImmutableStore):
            return self.copy_multi_immutable(other, to_copy, pb)
        count = 0
        failed = set()
        for id in to_copy:
            count += 1
            pb.update('copy', count, len(to_copy))
            if not permit_failure:
                self.add(other[id], id)
            else:
                try:
                    self.add(other[id], id)
                except KeyError:
                    failed.add(id)
        assert count == len(to_copy)
        return count, failed
    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
        from shutil import copyfile
        count = 0
        failed = set()
        for id in to_copy:
            p = self._path(id)
            other_p = other._path(id)
            try:
                copyfile(other_p, p)
            except IOError, e:
                if e.errno == errno.ENOENT:
                    if not permit_failure:
                        copyfile(other_p+".gz", p+".gz")
                    else:
                        try:
                            copyfile(other_p+".gz", p+".gz")
                        except IOError, e:
                            if e.errno == errno.ENOENT:
                                failed.add(id)
                            else:
                                raise
                else:
                    raise
            count += 1
            pb.update('copy', count, len(to_copy))
        assert count == len(to_copy)
        return count, failed
class TransportStore(Store):
    """A TransportStore is a Store superclass for Stores that use Transports."""

    _max_buffered_requests = 10
    def __contains__(self, fileid):
        p = self._path(fileid)
        return (os.access(p, os.R_OK)
                or os.access(p + '.gz', os.R_OK))

    # TODO: Guard against the same thing being stored twice, compressed and uncompressed
    def __iter__(self):
        for f in os.listdir(self._basedir):
            if f.endswith('.gz'):
                # TODO: case-insensitive?
                yield f[:-3]
            else:
                yield f

    def __len__(self):
        return len(os.listdir(self._basedir))
    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        fn = self._relpath(fileid)
        try:
            return self._transport.get(fn)
        except errors.NoSuchFile:
            raise KeyError(fileid)
    def __init__(self, transport):
        assert isinstance(transport, bzrlib.transport.Transport)
        super(TransportStore, self).__init__()
        self._transport = transport

    def __repr__(self):
        if self._transport is None:
            return "%s(None)" % (self.__class__.__name__)
        else:
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)
    def _iter_relpaths(self):
        """Iter the relative paths of files in the transport's sub-tree."""
        transport = self._transport
        queue = list(transport.list_dir('.'))
        while queue:
            relpath = queue.pop(0)
            st = transport.stat(relpath)
            if S_ISDIR(st[ST_MODE]):
                for i, basename in enumerate(transport.list_dir(relpath)):
                    queue.insert(i, relpath+'/'+basename)
            else:
                yield relpath, st

    def listable(self):
        """Return True if this store is able to be listed."""
        return self._transport.listable()
class ImmutableMemoryStore(Store):
    """A memory only store."""

    def __contains__(self, fileid):
        return self._contents.has_key(fileid)
    def __getitem__(self, fileid):
        p = self._path(fileid)
        try:
            return gzip.GzipFile(p + '.gz', 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise
        try:
            return open(p, 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise
        raise IndexError(fileid)
    def total_size(self):
        """Return (count, bytes)

        This is the (compressed) size stored on disk, not the size of
        the content."""
        count = 0
        total = 0
        for fileid in self:
            count += 1
            p = self._path(fileid)
            if os.access(p, os.F_OK):
                total += os.stat(p)[ST_SIZE]
            else:
                total += os.stat(p + '.gz')[ST_SIZE]
        return count, total
class ImmutableScratchStore(ImmutableStore):
    """Self-destructing test subclass of ImmutableStore.

    The Store only exists for the lifetime of the Python object.
    Obviously you should not put anything precious in it.
    """
    def __init__(self):
        super(ImmutableMemoryStore, self).__init__()
        self._contents = {}

    def add(self, stream, fileid, compressed=True):
        if self._contents.has_key(fileid):
            raise StoreError("fileid %s already in the store" % fileid)
        self._contents[fileid] = stream.read()
    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        if not self._contents.has_key(fileid):
            raise IndexError(fileid)
        return StringIO(self._contents[fileid])
    def _item_size(self, fileid):
        return len(self._contents[fileid])

    def __iter__(self):
        return iter(self._contents.keys())
    def total_size(self):
        count = 0
        result = 0
        for fileid in self._contents:
            count += 1
            result += self._item_size(fileid)
        return count, result
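    # Illustrative sketch (not part of the original module): an
    # ImmutableMemoryStore keeps its contents in a dict, which makes it handy
    # for tests.  The id used here is hypothetical:
    #
    #   st = ImmutableMemoryStore()
    #   st.add(StringIO('hello'), 'aa')
    #   st['aa'].read()        # -> 'hello'
    #   st.total_size()        # -> (1, 5)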
class CachedStore(Store):
    """A store that caches data locally, to avoid repeated downloads.

    The prefetch method should be used to avoid server round-trips for
    data that is known to be needed soon.
    """
def __init__(self, store, cache_dir):
279
super(CachedStore, self).__init__()
280
self.source_store = store
281
# This clones the source store type with a locally bound
282
# transport. FIXME: it assumes a constructor is == cloning.
283
# clonable store - it might be nicer to actually have a clone()
284
# or something. RBC 20051003
285
self.cache_store = store.__class__(LocalTransport(cache_dir))
    def __getitem__(self, id):
        mutter("Cache add %s" % id)
        if id not in self.cache_store:
            self.cache_store.add(self.source_store[id], id)
        return self.cache_store[id]
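    # Illustrative sketch (not part of the original module): the first lookup
    # of an id pulls it from the source store into the cache; later lookups
    # are served locally.  `remote_store` and the cache path are hypothetical:
    #
    #   cached = CachedStore(remote_store, '/tmp/bzr-cache')
    #   f = cached['some-file-id']     # fetched from remote_store, then cached
    #   f = cached['some-file-id']     # served from the local cache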
    def __contains__(self, fileid):
        if fileid in self.cache_store:
            return True
        if fileid in self.source_store:
            # We could copy at this time
            return True
        return False
def get(self, fileids, permit_failure=False, pb=None):
302
fileids = list(fileids)
303
hasids = self.cache_store.has(fileids)
305
for has, fileid in zip(hasids, fileids):
309
self.cache_store.copy_multi(self.source_store, needs,
310
permit_failure=permit_failure)
311
return self.cache_store.get(fileids,
312
permit_failure=permit_failure, pb=pb)
    def prefetch(self, ids):
        """Copy a series of ids into the cache, before they are used.

        For remote stores that support pipelining or async downloads, this can
        increase speed considerably.

        Failures while prefetching are ignored.
        """
        mutter("Prefetch of ids %s" % ",".join(ids))
        self.cache_store.copy_multi(self.source_store, ids,
                                    permit_failure=True)
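    # Illustrative sketch (not part of the original module): a caller that
    # knows which texts it will read next can warm the cache up front and then
    # use normal item access.  The ids are hypothetical:
    #
    #   cached.prefetch(['id-one', 'id-two'])
    #   text = cached['id-one'].read()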
def copy_all(store_from, store_to):
    """Copy all ids from one store to another."""
    # TODO: Optional progress indicator
    if not store_from.listable():
        raise UnlistableStore(store_from)
    ids = [f for f in store_from]
    store_to.copy_multi(store_from, ids)
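# Illustrative sketch (not part of the original module): copy_all() only works
# when the source store can be listed.  `old_store` and `new_store` are
# hypothetical store instances:
#
#   copy_all(old_store, new_store)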
def hash_prefix(file_id):
    return "%02x/" % (adler32(file_id) & 0xff)
    # ImmutableScratchStore creates its backing directory with mkdtemp() and
    # deletes it again when the object goes away.
    def __init__(self):
        ImmutableStore.__init__(self, tempfile.mkdtemp())

    def __del__(self):
        for f in os.listdir(self._basedir):
            fpath = os.path.join(self._basedir, f)
            # needed on windows, and maybe some other filesystems
            os.chmod(fpath, 0600)
            os.remove(fpath)
        os.rmdir(self._basedir)
        mutter("%r destroyed" % self)