class Store(object):
    """This class represents the abstract storage layout for saving information.

    Files can be added, but not modified once they are in.  Typically
    the hash is used as the name, or something else known to be unique,
    such as a UUID.
    """

    # Used by _do_copy() when buffering entries; transport-based
    # subclasses may override this.
    _max_buffered_requests = 10

    def __len__(self):
        raise NotImplementedError('Children should define their length')

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        raise NotImplementedError

    def __contains__(self, fileid):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError

    def add(self, f, fileid):
        """Add a file object f to the store accessible from the given fileid"""
        raise NotImplementedError('Children of Store must define their method of adding entries.')

    def add_multi(self, entries):
        """Add a series of file-like or string objects to the store with the given
        identities.

        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
            This could also be a generator yielding (file, id) pairs.
        """
        for f, fileid in entries:
            self.add(f, fileid)

    def has(self, fileids):
        """Return True/False for each entry in fileids.

        :param fileids: A list or generator yielding file ids.
        :return: A generator or list returning True/False for each entry.
        """
        for fileid in fileids:
            yield fileid in self

    def listable(self):
        """Return True if this store is able to be listed."""
        return hasattr(self, "__iter__")

    def get(self, fileids, permit_failure=False, pb=None):
        """Return a set of files, one for each requested entry.

        :param permit_failure: If true, return None for entries which do not
            exist.
        :return: A list or generator of file-like objects, one for each id.
        """
        for fileid in fileids:
            try:
                yield self[fileid]
            except KeyError:
                if permit_failure:
                    yield None
                else:
                    raise

    def copy_multi(self, other, ids, pb=None, permit_failure=False):
        """Copy texts for ids from other into self.

        If an id is present in self, it is skipped.  A count of copied
        ids is returned, which may be less than len(ids).

        :param other: Another Store object
        :param ids: A list of entry ids to be copied
        :param pb: A ProgressBar object; if none is given, the default will be created.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed]) The number of entries copied successfully,
            followed by a collection of entries which could not be copied
            (because they were missing).
        """
        if pb is None:
            pb = bzrlib.ui.ui_factory.progress_bar()

        # XXX: Is there any reason why we couldn't make this accept a generator
        # and build a list as it finds things to copy?
        ids = list(ids)  # Make sure we don't have a generator, since we iterate twice.
        pb.update('preparing to copy')
        to_copy = []
        for file_id, has in zip(ids, self.has(ids)):
            if not has:
                to_copy.append(file_id)
        if isinstance(self, ImmutableStore) and isinstance(other, ImmutableStore):
            # Both stores live on the filesystem: copy the files directly.
            return self.copy_multi_immutable(other, to_copy, pb,
                                             permit_failure=permit_failure)
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)

    def _do_copy(self, other, to_copy, pb, permit_failure=False):
        """This is the standard copying mechanism: get the entries one at
        a time from the remote store, and store them locally.

        :param other: Another Store object
        :param to_copy: A list of entry ids to copy
        :param pb: A ProgressBar object to display completion status.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed])
            The number of entries copied, and a collection of failed entries.
        """
        # This should be updated to use add_multi() rather than
        # the current method of buffering requests.
        # One question: is it faster to queue up 1-10 and then copy 1-10,
        # then queue up 11-20 and copy 11-20, or to queue up 1-10,
        # copy 1, queue 11, copy 2, etc?  Sort of pipeline versus batch.

        # We can't use self._transport.copy_to because we don't know
        # whether the local tree is in the same format as other.
        failed = set()

        def buffer_requests():
            count = 0
            buffered_requests = []
            for fileid in to_copy:
                try:
                    f = other[fileid]
                except KeyError:
                    if permit_failure:
                        failed.add(fileid)
                        continue
                    else:
                        raise

                buffered_requests.append((f, fileid))
                if len(buffered_requests) > self._max_buffered_requests:
                    yield buffered_requests.pop(0)
                    count += 1
                    pb.update('copy', count, len(to_copy))

            for req in buffered_requests:
                yield req
                count += 1
                pb.update('copy', count, len(to_copy))

            # Every entry was either copied or recorded as failed.
            assert count == len(to_copy) - len(failed)

        self.add_multi(buffer_requests())

        pb.clear()
        return len(to_copy) - len(failed), failed
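

# Illustrative sketch, not part of the original bzrlib module: exercising
# copy_multi() with the ImmutableMemoryStore defined later in this file.
# The helper name _demo_copy_multi is hypothetical, and running it assumes
# bzrlib's default ui_factory can supply a progress bar.
def _demo_copy_multi():
    from StringIO import StringIO
    src = ImmutableMemoryStore()
    dst = ImmutableMemoryStore()
    src.add(StringIO('one'), 'id-1')
    src.add(StringIO('two'), 'id-2')
    dst.add(StringIO('one'), 'id-1')    # already present, so it is skipped
    count, failed = dst.copy_multi(src, ['id-1', 'id-2', 'id-3'],
                                   permit_failure=True)
    assert count == 1                   # only id-2 was actually copied
    assert failed == set(['id-3'])      # id-3 was missing from src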


class ImmutableStore(Store):
    """Store that holds files indexed by unique names.

    Files can be added, but not modified once they are in.  Typically
    the hash is used as the name, or something else known to be unique,
    such as a UUID.

    >>> st = ImmutableScratchStore()

    >>> st.add(StringIO('hello'), 'aa')
    >>> 'aa' in st
    True
    >>> 'foo' in st
    False

    You are not allowed to add an id that is already present.

    Entries can be retrieved as files, which may then be read.

    >>> st.add(StringIO('goodbye'), '123123')
    >>> st['123123'].read()
    'goodbye'

    TODO: Atomic add by writing to a temporary file and renaming.

    In bzr 0.0.5 and earlier, files within the store were marked
    readonly on disk.  This is no longer done but existing stores need
    to be accommodated.
    """

    def __init__(self, basedir):
        self._basedir = basedir

    def _path(self, entry_id):
        if not isinstance(entry_id, basestring):
            raise TypeError(type(entry_id))
        if '\\' in entry_id or '/' in entry_id:
            raise ValueError("invalid store id %r" % entry_id)
        return os.path.join(self._basedir, entry_id)

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._basedir)

    def add(self, f, fileid, compressed=True):
        """Add contents of a file into the store.

        f -- An open file, or file-like object."""
        # FIXME: Only works on files that will fit in memory

        from bzrlib.atomicfile import AtomicFile

        mutter("add store entry %r" % (fileid,))
        if isinstance(f, types.StringTypes):
            content = f
        else:
            content = f.read()

        p = self._path(fileid)
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
            raise BzrError("store %r already contains id %r"
                           % (self._basedir, fileid))

        fn = p
        if compressed:
            fn = fn + '.gz'

        af = AtomicFile(fn, 'wb')
        try:
            if compressed:
                gf = gzip.GzipFile(mode='wb', fileobj=af)
                gf.write(content)
                gf.close()
            else:
                af.write(content)
            af.commit()
        finally:
            af.close()

    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
        """Copy entries by copying the store files directly, trying the
        uncompressed name first and falling back to the '.gz' form."""
        from shutil import copyfile
        count = 0
        failed = set()
        for id in to_copy:
            p = self._path(id)
            other_p = other._path(id)
            try:
                copyfile(other_p, p)
            except IOError, e:
                if e.errno == errno.ENOENT:
                    if not permit_failure:
                        copyfile(other_p + ".gz", p + ".gz")
                    else:
                        try:
                            copyfile(other_p + ".gz", p + ".gz")
                        except IOError, e:
                            if e.errno == errno.ENOENT:
                                failed.add(id)
                            else:
                                raise
                else:
                    raise
            count += 1
            pb.update('copy', count, len(to_copy))
        assert count == len(to_copy)
        pb.clear()
        return count - len(failed), failed

    def __contains__(self, fileid):
        p = self._path(fileid)
        return (os.access(p, os.R_OK)
                or os.access(p + '.gz', os.R_OK))

    # TODO: Guard against the same thing being stored twice,
    # compressed and uncompressed.

    def __iter__(self):
        for f in os.listdir(self._basedir):
            if f[-3:] == '.gz':
                # TODO: case-insensitive?
                yield f[:-3]
            else:
                yield f

    def __len__(self):
        return len(os.listdir(self._basedir))

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        p = self._path(fileid)
        try:
            return gzip.GzipFile(p + '.gz', 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise

        try:
            return file(p, 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise

        raise KeyError(fileid)

    def total_size(self):
        """Return (count, bytes).

        This is the (compressed) size stored on disk, not the size of
        the content."""
        total = 0
        count = 0
        for fid in self:
            count += 1
            p = self._path(fid)
            try:
                total += os.stat(p)[ST_SIZE]
            except OSError:
                total += os.stat(p + '.gz')[ST_SIZE]

        return count, total
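

# Illustrative sketch, not part of the original bzrlib module: a round-trip
# through an on-disk ImmutableStore.  The helper name is hypothetical; it
# assumes bzrlib.atomicfile is importable, just as add() itself does.
def _demo_immutable_store():
    import shutil
    from StringIO import StringIO
    basedir = tempfile.mkdtemp()
    try:
        st = ImmutableStore(basedir)
        st.add(StringIO('hello'), 'aa')
        assert 'aa' in st
        assert st['aa'].read() == 'hello'
        # entries are gzip-compressed on disk by default
        assert os.path.exists(os.path.join(basedir, 'aa.gz'))
        try:
            st.add(StringIO('hello again'), 'aa')
        except BzrError:
            pass                        # write-once: duplicate ids are rejected
        else:
            raise AssertionError('duplicate add was not rejected')
    finally:
        shutil.rmtree(basedir)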


class TransportStore(Store):
    """A TransportStore is a Store superclass for Stores that use Transports."""

    def __init__(self, transport):
        assert isinstance(transport, bzrlib.transport.Transport)
        super(TransportStore, self).__init__()
        self._transport = transport

    def __repr__(self):
        if self._transport is None:
            return "%s(None)" % (self.__class__.__name__)
        else:
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)

    def listable(self):
        """Return True if this store is able to be listed."""
        return self._transport.listable()
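

# Illustrative sketch, not part of the original bzrlib module: a
# TransportStore is built around a Transport, so with a LocalTransport
# (also used by CachedStore below) listability is answered by the
# transport itself.  The helper name and its parameter are hypothetical.
def _demo_transport_store(directory):
    store = TransportStore(LocalTransport(directory))
    mutter("made %r, listable=%s" % (store, store.listable()))
    return store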


class ImmutableMemoryStore(Store):
    """A memory only store."""

    def __init__(self):
        super(ImmutableMemoryStore, self).__init__()
        self._contents = {}

    def __contains__(self, fileid):
        return self._contents.has_key(fileid)

    def add(self, stream, fileid, compressed=True):
        if self._contents.has_key(fileid):
            raise StoreError("fileid %s already in the store" % fileid)
        self._contents[fileid] = stream.read()

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        if not self._contents.has_key(fileid):
            raise KeyError(fileid)
        return StringIO(self._contents[fileid])

    def _item_size(self, fileid):
        return len(self._contents[fileid])

    def __iter__(self):
        return iter(self._contents.keys())

    def total_size(self):
        """Return (count, bytes) for the store's contents.

        Unlike ImmutableStore, this is the uncompressed size of the
        content, since that is what is held in memory."""
        result = 0
        count = 0
        for fileid in self._contents:
            count += 1
            result += self._item_size(fileid)
        return count, result
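

# Illustrative sketch, not part of the original bzrlib module: the memory
# store obeys the same write-once contract as the disk stores.
def _demo_memory_store():
    from StringIO import StringIO
    st = ImmutableMemoryStore()
    st.add(StringIO('hello'), 'aa')
    assert 'aa' in st
    assert st['aa'].read() == 'hello'
    assert st.total_size() == (1, 5)    # one entry, five bytes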


class ImmutableScratchStore(ImmutableStore):
    """Self-destructing test subclass of ImmutableStore.

    The Store only exists for the lifetime of the Python object.
    Obviously you should not put anything precious in it.
    """

    def __init__(self):
        ImmutableStore.__init__(self, tempfile.mkdtemp())

    def __del__(self):
        for f in os.listdir(self._basedir):
            fpath = os.path.join(self._basedir, f)
            # needed on windows, and maybe some other filesystems
            os.chmod(fpath, 0600)
            os.remove(fpath)
        os.rmdir(self._basedir)
        mutter("%r destroyed" % self)


class CachedStore(Store):
    """A store that caches data locally, to avoid repeated downloads.

    The prefetch() method should be used to avoid server round-trips for
    impending requests.
    """

    def __init__(self, store, cache_dir):
        super(CachedStore, self).__init__()
        self.source_store = store
        # This clones the source store type with a locally bound
        # transport.  FIXME: it assumes a constructor is == cloning,
        # which is only true for a clonable store - it might be nicer
        # to actually have a clone() method or something.  RBC 20051003
        self.cache_store = store.__class__(LocalTransport(cache_dir))

    def __getitem__(self, id):
        mutter("Cache add %s" % id)
        if id not in self.cache_store:
            self.cache_store.add(self.source_store[id], id)
        return self.cache_store[id]

    def __contains__(self, fileid):
        if fileid in self.cache_store:
            return True
        if fileid in self.source_store:
            # We could copy at this time
            return True
        return False

    def get(self, fileids, permit_failure=False, pb=None):
        fileids = list(fileids)
        hasids = self.cache_store.has(fileids)
        needs = set()
        for has, fileid in zip(hasids, fileids):
            if not has:
                needs.add(fileid)
        if needs:
            self.cache_store.copy_multi(self.source_store, needs,
                                        permit_failure=permit_failure)
        return self.cache_store.get(fileids,
                                    permit_failure=permit_failure, pb=pb)

    def prefetch(self, ids):
        """Copy a series of ids into the cache, before they are used.

        For remote stores that support pipelining or async downloads, this can
        increase speed considerably.

        Failures while prefetching are ignored.
        """
        mutter("Prefetch of ids %s" % ",".join(ids))
        self.cache_store.copy_multi(self.source_store, ids,
                                    permit_failure=True)
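

# Illustrative sketch, not part of the original bzrlib module: CachedStore
# "clones" the wrapped store by calling its class with a LocalTransport
# (see the FIXME in __init__), so the source must be a transport-based
# store.  The helper and its parameters are hypothetical.
def _demo_cached_store(remote_store, cache_dir, ids):
    cached = CachedStore(remote_store, cache_dir)
    cached.prefetch(ids)            # warm the cache; failures are ignored
    return [cached[id] for id in ids if id in cached]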


def copy_all(store_from, store_to):
    """Copy all ids from one store to another."""
    # TODO: Optional progress indicator
    if not store_from.listable():
        raise UnlistableStore(store_from)
    ids = [f for f in store_from]
    store_to.copy_multi(store_from, ids)
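

# Illustrative sketch, not part of the original bzrlib module: copy_all()
# needs a listable source; two in-memory stores qualify.  As in the
# copy_multi() sketch above, this assumes the default ui_factory can
# supply a progress bar.
def _demo_copy_all():
    from StringIO import StringIO
    src = ImmutableMemoryStore()
    src.add(StringIO('alpha'), 'id-a')
    src.add(StringIO('beta'), 'id-b')
    dst = ImmutableMemoryStore()
    copy_all(src, dst)
    assert sorted(dst) == ['id-a', 'id-b']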