# Imports used by this section; in the full module these appear at the top.
# StoreError is assumed to live in bzrlib.errors alongside BzrError.
import errno
import gzip
import os
import tempfile
import types
from cStringIO import StringIO
from stat import ST_MODE, ST_SIZE, S_ISDIR
from zlib import adler32

import bzrlib
import bzrlib.errors as errors
import bzrlib.transport
import bzrlib.ui
from bzrlib.errors import BzrError, StoreError, UnlistableStore
from bzrlib.trace import mutter
from bzrlib.transport.local import LocalTransport


class Store(object):
    """This class represents the abstract storage layout for saving information.

    Files can be added, but not modified once they are in.  Typically
    the hash is used as the name, or something else known to be unique,
    such as a verified revision id.
    """

    def __len__(self):
        raise NotImplementedError('Children should define their length')

    def get(self, file_id):
        """Returns a file reading from a particular entry."""
        raise NotImplementedError

    def __getitem__(self, fileid):
        """DEPRECATED. Please use .get(file_id) instead."""
        raise NotImplementedError

    def __contains__(self, fileid):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError

    def add(self, f, fileid):
        """Add a file object f to the store accessible from the given fileid."""
        raise NotImplementedError('Children of Store must define their method of adding entries.')

    def add_multi(self, entries):
        """Add a series of file-like or string objects to the store with the
        given identities.

        :param entries: A list of (file, id) tuples [(file1, id1), (file2, id2), ...].
            This could also be a generator yielding (file, id) pairs.
        """
        for f, fileid in entries:
            self.add(f, fileid)

    def has(self, fileids):
        """Return True/False for each entry in fileids.

        :param fileids: A list or generator yielding file ids.
        :return: A generator or list returning True/False for each entry.
        """
        for fileid in fileids:
            yield fileid in self
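
    # A minimal sketch of the contract children must honour (names here are
    # illustrative, not part of bzrlib): add() and __contains__ are enough
    # for the copying machinery below, with get()/__iter__ for retrieval.
    #
    #   class DictStore(Store):
    #       def __init__(self):
    #           self._texts = {}
    #       def add(self, f, fileid):
    #           self._texts[fileid] = f.read()
    #       def __contains__(self, fileid):
    #           return fileid in self._texts
    #       def get(self, fileid):
    #           return StringIO(self._texts[fileid])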

    def listable(self):
        """Return True if this store is able to be listed."""
        return hasattr(self, "__iter__")

    def copy_multi(self, other, ids, pb=None, permit_failure=False):
        """Copy texts for ids from other into self.

        If an id is present in self, it is skipped.  A count of copied
        ids is returned, which may be less than len(ids).

        :param other: Another Store object
        :param ids: A list of entry ids to be copied
        :param pb: A ProgressBar object; if none is given, the default will be created.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed]) The number of entries copied successfully,
            followed by a list of entries which could not be copied (because they
            were missing)
        """
        if pb is None:
            pb = bzrlib.ui.ui_factory.progress_bar()

        # XXX: Is there any reason why we couldn't make this accept a generator
        # and build a list as it finds things to copy?
        ids = list(ids)  # Make sure we don't have a generator, since we iterate twice
        pb.update('preparing to copy')
        to_copy = []
        for file_id, has in zip(ids, self.has(ids)):
            if not has:
                to_copy.append(file_id)
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)

    def _do_copy(self, other, to_copy, pb, permit_failure=False):
        """This is the standard copying mechanism: get entries one at a
        time from remote, and store them locally.

        :param other: Another Store object
        :param to_copy: A list of entry ids to copy
        :param pb: A ProgressBar object to display completion status.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed])
            The number of entries copied, and a list of failed entries.
        """
        # This should be updated to use add_multi() rather than
        # the current methods of buffering requests.
        # One question: is it faster to queue up 1-10 and then copy 1-10,
        # then queue up 11-20 and copy 11-20,
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
        # sort of pipeline versus batch.

        # We can't use self._transport.copy_to because we don't know
        # whether the local tree is in the same format as other.
        # _max_buffered_requests is provided by subclasses (see TransportStore).
        failed = set()

        def buffer_requests():
            count = 0
            buffered_requests = []
            for fileid in to_copy:
                try:
                    f = other.get(fileid)
                except KeyError:
                    if permit_failure:
                        failed.add(fileid)
                        continue
                    else:
                        raise

                buffered_requests.append((f, fileid))
                if len(buffered_requests) > self._max_buffered_requests:
                    yield buffered_requests.pop(0)
                    count += 1
                    pb.update('copy', count, len(to_copy))

            for req in buffered_requests:
                yield req
                count += 1
                pb.update('copy', count, len(to_copy))

            if not permit_failure:
                assert count == len(to_copy)

        self.add_multi(buffer_requests())

        pb.clear()
        return len(to_copy) - len(failed), failed


class ImmutableStore(Store):
    """Store that holds files indexed by unique names.

    Files can be added, but not modified once they are in.  Typically
    the hash is used as the name, or something else known to be unique,
    such as a verified revision id.

    >>> st = ImmutableScratchStore()

    >>> st.add(StringIO('hello'), 'aa')
    >>> 'aa' in st
    True
    >>> 'foo' in st
    False

    You are not allowed to add an id that is already present.

    Entries can be retrieved as files, which may then be read.

    >>> st.add(StringIO('goodbye'), '123123')
    >>> st['123123'].read()
    'goodbye'

    TODO: Atomic add by writing to a temporary file and renaming.

    In bzr 0.0.5 and earlier, files within the store were marked
    readonly on disk.  This is no longer done but existing stores need
    to be accommodated.
    """

    def __init__(self, basedir):
        self._basedir = basedir

    def _path(self, id):
        if '\\' in id or '/' in id:
            raise ValueError("invalid store id %r" % id)
        return os.path.join(self._basedir, id)

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._basedir)

    def add(self, f, fileid, compressed=True):
        """Add contents of a file into the store.

        f -- An open file, or file-like object."""
        # FIXME: Only works on files that will fit in memory

        from bzrlib.atomicfile import AtomicFile

        mutter("add store entry %r" % (fileid))
        if isinstance(f, types.StringTypes):
            content = f
        else:
            content = f.read()

        p = self._path(fileid)
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
            raise BzrError("store %r already contains id %r" % (self._basedir, fileid))

        fn = p
        if compressed:
            fn = fn + '.gz'

        af = AtomicFile(fn, 'wb')
        try:
            if compressed:
                gf = gzip.GzipFile(mode='wb', fileobj=af)
                gf.write(content)
                gf.close()
            else:
                af.write(content)
            af.commit()
        finally:
            af.close()

    def copy_multi(self, other, ids, pb=None, permit_failure=False):
        """Copy texts for ids from other into self.

        If an id is present in self, it is skipped.

        Returns (count_copied, failed), where failed is a collection of ids
        that could not be copied.
        """
        if pb is None:
            pb = bzrlib.ui.ui_factory.progress_bar()

        pb.update('preparing to copy')
        to_copy = [id for id in ids if id not in self]
        if isinstance(other, ImmutableStore):
            return self.copy_multi_immutable(other, to_copy, pb,
                                             permit_failure=permit_failure)
        count = 0
        failed = set()
        for id in to_copy:
            count += 1
            pb.update('copy', count, len(to_copy))
            if not permit_failure:
                self.add(other[id], id)
            else:
                try:
                    entry = other[id]
                except (KeyError, IndexError):
                    failed.add(id)
                    continue
                self.add(entry, id)
        if not permit_failure:
            assert count == len(to_copy)
        pb.clear()
        return count, failed

    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
        from shutil import copyfile
        count = 0
        failed = set()
        for id in to_copy:
            p = self._path(id)
            other_p = other._path(id)
            try:
                copyfile(other_p, p)
            except IOError, e:
                if e.errno == errno.ENOENT:
                    if not permit_failure:
                        copyfile(other_p + ".gz", p + ".gz")
                    else:
                        try:
                            copyfile(other_p + ".gz", p + ".gz")
                        except IOError, e:
                            if e.errno == errno.ENOENT:
                                failed.add(id)
                            else:
                                raise
                else:
                    raise
            count += 1
            pb.update('copy', count, len(to_copy))
        assert count == len(to_copy)
        pb.clear()
        return count, failed

    def __contains__(self, fileid):
        p = self._path(fileid)
        return (os.access(p, os.R_OK)
                or os.access(p + '.gz', os.R_OK))

    # TODO: Guard against the same thing being stored twice, compressed
    # and uncompressed

    def __iter__(self):
        for f in os.listdir(self._basedir):
            if f[-3:] == '.gz':
                # TODO: case-insensitive?
                yield f[:-3]
            else:
                yield f

    def __len__(self):
        return len(os.listdir(self._basedir))

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        p = self._path(fileid)
        try:
            return gzip.GzipFile(p + '.gz', 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise

        try:
            return file(p, 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise

        raise IndexError(fileid)

    def total_size(self):
        """Return (count, bytes)

        This is the (compressed) size stored on disk, not the size of
        the content."""
        total = 0
        count = 0
        for fid in self:
            count += 1
            p = self._path(fid)
            try:
                total += os.stat(p)[ST_SIZE]
            except OSError:
                total += os.stat(p + '.gz')[ST_SIZE]

        return count, total
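
# A sketch of copying between two disk stores; ImmutableScratchStore (defined
# at the end of this module) gives a throwaway ImmutableStore:
#
#   >>> src = ImmutableScratchStore()
#   >>> dst = ImmutableScratchStore()
#   >>> src.add(StringIO('text'), 'an-id')
#   >>> dst.copy_multi(src, ['an-id'])
#   (1, set([]))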


class TransportStore(Store):
    """A TransportStore is a Store superclass for Stores that use Transports."""

    _max_buffered_requests = 10

    def add(self, f, fileid, suffix=None):
        """Add contents of a file into the store.

        f -- A file-like object, or string
        """
        mutter("add store entry %r" % (fileid))

        if suffix is not None:
            fn = self._relpath(fileid, [suffix])
        else:
            fn = self._relpath(fileid)
        if self._transport.has(fn):
            raise BzrError("store %r already contains id %r" % (self._transport.base, fileid))

        if self._prefixed:
            try:
                self._transport.mkdir(hash_prefix(fileid))
            except errors.FileExists:
                pass

        # Template method: subclasses are assumed to implement _add() to do
        # the actual write, mirroring the _get()/get() pair below.
        self._add(fn, f)

    def _check_fileid(self, fileid):
        if not isinstance(fileid, basestring):
            raise TypeError('Fileids should be a string type: %s %r' % (type(fileid), fileid))
        if '\\' in fileid or '/' in fileid:
            raise ValueError("invalid store id %r" % fileid)
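
    # e.g. _check_fileid('a/b') raises ValueError: ids must be flat names,
    # never paths; directory fan-out is provided separately by hash_prefix().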

    def __contains__(self, fileid):
        fn = self._relpath(fileid)
        return self._transport.has(fn)

    def _get(self, filename):
        """Return a vanilla file stream for clients to read from.

        This is the body of a template method on 'get', and should be
        implemented by subclasses.
        """
        raise NotImplementedError

    def get(self, fileid):
        """Returns a file reading from a particular entry."""
        fn = self._relpath(fileid)
        try:
            return self._get(fn)
        except errors.NoSuchFile:
            raise KeyError(fileid)

    def has(self, fileids, pb=None):
        """Return True/False for each entry in fileids.

        :param fileids: A list or generator yielding file ids.
        :return: A generator or list returning True/False for each entry.
        """
        relpaths = (self._relpath(fid) for fid in fileids)
        return self._transport.has_multi(relpaths, pb=pb)

    def __init__(self, transport, prefixed=False):
        assert isinstance(transport, bzrlib.transport.Transport)
        super(TransportStore, self).__init__()
        self._transport = transport
        self._prefixed = prefixed

    def __len__(self):
        return len(list(self._iter_relpaths()))

    def _relpath(self, fileid, suffixes=[]):
        self._check_fileid(fileid)
        for suffix in suffixes:
            self._check_fileid(suffix)
        if self._prefixed:
            path = [hash_prefix(fileid) + fileid]
        else:
            path = [fileid]
        path.extend(suffixes)
        return '.'.join(path)
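
    # For a store created with prefixed=True, _relpath() buckets ids under
    # their hash prefix, e.g. _relpath('foo', ['gz']) -> '45/foo.gz'
    # (see hash_prefix() below; 0x45 is adler32('foo') & 0xff).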

    def __repr__(self):
        if self._transport is None:
            return "%s(None)" % (self.__class__.__name__)
        else:
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)

    def _iter_relpaths(self):
        """Iter the relative paths of files in the transports sub-tree."""
        transport = self._transport
        queue = list(transport.list_dir('.'))
        while queue:
            relpath = queue.pop(0)
            st = transport.stat(relpath)
            if S_ISDIR(st[ST_MODE]):
                for i, basename in enumerate(transport.list_dir(relpath)):
                    queue.insert(i, relpath + '/' + basename)
            else:
                yield relpath, st

    def listable(self):
        """Return True if this store is able to be listed."""
        return self._transport.listable()

    def total_size(self):
        """Return (count, bytes)

        This is the (compressed) size stored on disk, not the size of
        the content."""
        total = 0
        count = 0
        for relpath, st in self._iter_relpaths():
            count += 1
            total += st[ST_SIZE]

        return count, total


class ImmutableMemoryStore(Store):
    """A memory only store."""

    def __contains__(self, fileid):
        return self._contents.has_key(fileid)

    def __init__(self):
        super(ImmutableMemoryStore, self).__init__()
        self._contents = {}

    def add(self, stream, fileid, compressed=True):
        if self._contents.has_key(fileid):
            raise StoreError("fileid %s already in the store" % fileid)
        self._contents[fileid] = stream.read()

    def get(self, fileid):
        """Returns a file reading from a particular entry."""
        if not self._contents.has_key(fileid):
            raise KeyError(fileid)
        return StringIO(self._contents[fileid])

    def _item_size(self, fileid):
        return len(self._contents[fileid])

    def __iter__(self):
        return iter(self._contents.keys())

    def total_size(self):
        result = 0
        count = 0
        for fileid in self._contents:
            count += 1
            result += self._item_size(fileid)
        return count, result
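
# Example usage, as a sketch; an in-memory store is convenient in tests
# because nothing touches disk:
#
#   >>> s = ImmutableMemoryStore()
#   >>> s.add(StringIO('content'), 'f1')
#   >>> s.get('f1').read()
#   'content'
#   >>> s.total_size()
#   (1, 7)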


class CachedStore(Store):
    """A store that caches data locally, to avoid repeated downloads.

    The prefetch method should be used to avoid server round-trips for
    every piece of data.
    """

    def __init__(self, store, cache_dir):
        super(CachedStore, self).__init__()
        self.source_store = store
        # This clones the source store type with a locally bound
        # transport.  FIXME: it assumes a constructor is == cloning for a
        # clonable store - it might be nicer to actually have a clone()
        # or something.  RBC 20051003
        self.cache_store = store.__class__(LocalTransport(cache_dir))

    def get(self, id):
        mutter("Cache add %s" % id)
        if id not in self.cache_store:
            self.cache_store.add(self.source_store.get(id), id)
        return self.cache_store.get(id)

    def __contains__(self, fileid):
        if fileid in self.cache_store:
            return True
        if fileid in self.source_store:
            # We could copy at this time
            return True
        return False

    def prefetch(self, ids):
        """Copy a series of ids into the cache, before they are used.

        For remote stores that support pipelining or async downloads, this can
        increase speed considerably.

        Failures while prefetching are ignored.
        """
        mutter("Prefetch of ids %s" % ",".join(ids))
        self.cache_store.copy_multi(self.source_store, ids,
                                    permit_failure=True)
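
# Sketch of wiring a CachedStore in front of a remote store (the transport
# URL and store class here are illustrative, not real bzrlib names):
#
#   remote = SomeTextStore(get_transport('http://example.com/store/'))
#   cached = CachedStore(remote, '/var/tmp/bzr-cache')
#   cached.prefetch(ids)       # bulk download; failures are ignored
#   text = cached.get(ids[0])  # now served from the local cache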


def copy_all(store_from, store_to):
    """Copy all ids from one store to another."""
    # TODO: Optional progress indicator
    if not store_from.listable():
        raise UnlistableStore(store_from)
    ids = [f for f in store_from]
    store_to.copy_multi(store_from, ids)
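
# e.g. copy_all(source_branch.text_store, dest_branch.text_store); the
# attribute names are illustrative. Note the source must be listable, so
# unlistable remote stores need the id list supplied via copy_multi() instead.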


def hash_prefix(file_id):
    return "%02x/" % (adler32(file_id) & 0xff)


class ImmutableScratchStore(ImmutableStore):
    """Self-destructing test subclass of ImmutableStore.

    The Store only exists for the lifetime of the Python object.
    Obviously you should not put anything precious in it.
    """

    def __init__(self):
        ImmutableStore.__init__(self, tempfile.mkdtemp())

    def __del__(self):
        for f in os.listdir(self._basedir):
            fpath = os.path.join(self._basedir, f)
            # needed on windows, and maybe some other filesystems
            os.chmod(fpath, 0600)
            os.remove(fpath)
        os.rmdir(self._basedir)
        mutter("%r destroyed" % self)