class ImmutableStore(object):
    """Store that holds files indexed by unique names.

    Files can be added, but not modified once they are in.  Typically
    the hash is used as the name, or something else known to be unique,
    such as a UUID.

    >>> st = ImmutableScratchStore()

    >>> st.add(StringIO('hello'), 'aa')
    >>> 'aa' in st
    True
    >>> 'foo' in st
    False

    You are not allowed to add an id that is already present.

    Entries can be retrieved as files, which may then be read.

    >>> st.add(StringIO('goodbye'), '123123')
    >>> st['123123'].read()
    'goodbye'

    TODO: Atomic add by writing to a temporary file and renaming.

    In bzr 0.0.5 and earlier, files within the store were marked
    readonly on disk.  This is no longer done but existing stores need
    to be accommodated.
    """
    def __init__(self, basedir):
        self._basedir = basedir

    def _path(self, entry_id):
        if not isinstance(entry_id, basestring):
            raise TypeError(type(entry_id))
        if '\\' in entry_id or '/' in entry_id:
            raise ValueError("invalid store id %r" % entry_id)
        return os.path.join(self._basedir, entry_id)

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._basedir)

    def add(self, f, fileid, compressed=True):
        """Add contents of a file into the store.

        f -- An open file, or file-like object."""
        # FIXME: Only works on files that will fit in memory

        from bzrlib.atomicfile import AtomicFile

        mutter("add store entry %r" % (fileid,))
        if isinstance(f, types.StringTypes):
            content = f
        else:
            content = f.read()

        p = self._path(fileid)
        if os.access(p, os.F_OK) or os.access(p + '.gz', os.F_OK):
            raise BzrError("store %r already contains id %r"
                           % (self._basedir, fileid))

        fn = p
        if compressed:
            fn = fn + '.gz'

        af = AtomicFile(fn, 'wb')
        try:
            if compressed:
                gf = gzip.GzipFile(mode='wb', fileobj=af)
                gf.write(content)
                gf.close()
            else:
                af.write(content)
            af.commit()
        finally:
            af.close()

    def __contains__(self, fileid):
        p = self._path(fileid)
        return (os.access(p, os.R_OK)
                or os.access(p + '.gz', os.R_OK))

    # TODO: Guard against the same thing being stored twice,
    # compressed and uncompressed

    def __iter__(self):
        for f in os.listdir(self._basedir):
            if f.endswith('.gz'):
                # TODO: case-insensitive?
                yield f[:-3]
            else:
                yield f

    def __len__(self):
        return len(os.listdir(self._basedir))

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        p = self._path(fileid)
        try:
            return gzip.GzipFile(p + '.gz', 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise
        try:
            return file(p, 'rb')
        except IOError, e:
            if e.errno != errno.ENOENT:
                raise
        raise KeyError(fileid)

    def total_size(self):
        """Return (count, bytes)

        This is the (compressed) size stored on disk, not the size of
        the content."""
        total = 0
        count = 0
        for fid in self:
            count += 1
            p = self._path(fid)
            try:
                total += os.stat(p)[ST_SIZE]
            except OSError:
                total += os.stat(p + '.gz')[ST_SIZE]
        return count, total
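
# Usage sketch (an illustrative comment, not part of the original module;
# assumes a writable directory and the StringIO import from the top of the
# file):
#
#   store = ImmutableStore('/tmp/example-store')
#   store.add(StringIO('some text'), 'id-1')   # stored gzipped as 'id-1.gz'
#   assert 'id-1' in store
#   print store['id-1'].read()                 # -> 'some text'
#
# Adding 'id-1' a second time raises BzrError, since entries are immutable
# once written.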

class Store(object):
    """This class represents the abstract storage layout for saving information.

    Files can be added, but not modified once they are in.
    """

    def __len__(self):
        raise NotImplementedError('Children should define their length')
    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        raise NotImplementedError

    def __contains__(self, fileid):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError
    def add(self, f, fileid):
        """Add a file object f to the store accessible from the given fileid"""
        raise NotImplementedError('Children of Store must define their method of adding entries.')

    def add_multi(self, entries):
        """Add a series of file-like or string objects to the store with the given
        identities.

        :param entries: A list of tuples of file,id pairs [(file1, id1), (file2, id2), ...]
                        This could also be a generator yielding (file,id) pairs.
        """
        for f, fileid in entries:
            self.add(f, fileid)

    def has(self, fileids):
        """Return True/False for each entry in fileids.

        :param fileids: A List or generator yielding file ids.
        :return: A generator or list returning True/False for each entry.
        """
        for fileid in fileids:
            if fileid in self:
                yield True
            else:
                yield False
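
    # Illustrative sketch (not original code): add_multi() and has() work in
    # terms of (file, id) pairs and sequences of ids, e.g.
    #
    #   store.add_multi([(StringIO('a'), 'id-a'), (StringIO('b'), 'id-b')])
    #   list(store.has(['id-a', 'id-b', 'missing']))  # -> [True, True, False]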
    def listable(self):
        """Return True if this store is able to be listed."""
        return hasattr(self, "__iter__")

    def get(self, fileids, permit_failure=False, pb=None):
        """Return a set of files, one for each requested entry.

        :param permit_failure: If true, return None for entries which do not
                               exist.
        :return: A list or generator of file-like objects, one for each id.
        """
        for fileid in fileids:
            try:
                yield self[fileid]
            except KeyError:
                if permit_failure:
                    yield None
                else:
                    raise
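
    # Illustrative sketch (not original code): with permit_failure=True,
    # missing ids yield None instead of raising KeyError, e.g.
    #
    #   files = list(store.get(['id-a', 'no-such-id'], permit_failure=True))
    #   # files[0] is a file-like object, files[1] is None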
    def copy_multi(self, other, ids, pb=None, permit_failure=False):
        """Copy texts for ids from other into self.

        If an id is present in self, it is skipped.  A count of copied
        ids is returned, which may be less than len(ids).

        Returns (count_copied, failed), where failed is a collection of ids
        that could not be copied.

        :param other: Another Store object
        :param ids: A list of entry ids to be copied
        :param pb: A ProgressBar object, if none is given, the default will be created.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed]) The number of entries copied successfully,
            followed by a list of entries which could not be copied (because they
            were missing)
        """
        if pb is None:
            pb = bzrlib.ui.ui_factory.progress_bar()

        # XXX: Is there any reason why we couldn't make this accept a generator
        # and build a list as it finds things to copy?
        ids = list(ids) # Make sure we don't have a generator, since we iterate 2 times
        pb.update('preparing to copy')
        to_copy = []
        for file_id, has in zip(ids, self.has(ids)):
            if not has:
                to_copy.append(file_id)
        if isinstance(other, ImmutableStore):
            return self.copy_multi_immutable(other, to_copy, pb,
                                             permit_failure=permit_failure)
        return self._do_copy(other, to_copy, pb, permit_failure=permit_failure)
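
    # Illustrative sketch (not original code): mirroring entries between two
    # stores while tolerating gaps in the source, e.g.
    #
    #   count, failed = new_store.copy_multi(old_store, ids,
    #                                        permit_failure=True)
    #   # `count` entries were copied; `failed` holds the ids that were
    #   # missing from old_store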
    def _do_copy(self, other, to_copy, pb, permit_failure=False):
        """This is the standard copying mechanism, just get them one at
        a time from remote, and store them locally.

        :param other: Another Store object
        :param to_copy: A list of entry ids to copy
        :param pb: A ProgressBar object to display completion status.
        :param permit_failure: Allow missing entries to be ignored
        :return: (n_copied, [failed])
            The number of entries copied, and a list of failed entries.
        """
        # This should be updated to use add_multi() rather than
        # the current methods of buffering requests.
        # One question, is it faster to queue up 1-10 and then copy 1-10
        # then queue up 11-20, copy 11-20
        # or to queue up 1-10, copy 1, queue 11, copy 2, etc?
        # sort of pipeline versus batch.

        # We can't use self._transport.copy_to because we don't know
        # whether the local tree is in the same format as other
        failed = set()

        def buffer_requests():
            count = 0
            buffered_requests = []
            for fileid in to_copy:
                try:
                    f = other[fileid]
                except KeyError:
                    if permit_failure:
                        failed.add(fileid)
                        continue
                    else:
                        raise

                buffered_requests.append((f, fileid))
                if len(buffered_requests) > self._max_buffered_requests:
                    yield buffered_requests.pop(0)
                    count += 1
                    pb.update('copy', count, len(to_copy))

            for req in buffered_requests:
                yield req
                count += 1
                pb.update('copy', count, len(to_copy))
            assert count == len(to_copy)

        self.add_multi(buffer_requests())

        pb.clear()
        return len(to_copy), failed

    def copy_multi_immutable(self, other, to_copy, pb, permit_failure=False):
        """Copy entries from another ImmutableStore by linking or copying
        the store files directly, rather than going through add()."""
        count = 0
        failed = set()
        for id in to_copy:
            count += 1
            other_p = other._path(id)
            p = self._path(id)
            try:
                osutils.link_or_copy(other_p, p)
            except (IOError, OSError), e:
                if e.errno == errno.ENOENT:
                    if not permit_failure:
                        osutils.link_or_copy(other_p + ".gz", p + ".gz")
                    else:
                        try:
                            osutils.link_or_copy(other_p + ".gz", p + ".gz")
                        except (IOError, OSError), e:
                            if e.errno == errno.ENOENT:
                                failed.add(id)
                            else:
                                raise
                else:
                    raise
            pb.update('copy', count, len(to_copy))
        assert count == len(to_copy)
        pb.clear()
        return count, failed

class TransportStore(Store):
    """A TransportStore is a Store superclass for Stores that use Transports."""

    _max_buffered_requests = 10

    def __init__(self, transport):
        assert isinstance(transport, bzrlib.transport.Transport)
        super(TransportStore, self).__init__()
        self._transport = transport

    def __repr__(self):
        if self._transport is None:
            return "%s(None)" % (self.__class__.__name__)
        else:
            return "%s(%r)" % (self.__class__.__name__, self._transport.base)

    def listable(self):
        """Return True if this store is able to be listed."""
        return self._transport.listable()
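
# A minimal sketch of a TransportStore subclass (hypothetical, for
# illustration only; a real subclass must still provide add, __getitem__,
# __contains__ and __iter__ on top of self._transport):
#
#   class ExampleTransportStore(TransportStore):
#       def __contains__(self, fileid):
#           return self._transport.has(fileid)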

class ImmutableMemoryStore(Store):
    """A memory only store."""

    def __init__(self):
        super(ImmutableMemoryStore, self).__init__()
        self._contents = {}

    def __contains__(self, fileid):
        return self._contents.has_key(fileid)

    def add(self, stream, fileid, compressed=True):
        if self._contents.has_key(fileid):
            raise StoreError("fileid %s already in the store" % fileid)
        self._contents[fileid] = stream.read()

    def __getitem__(self, fileid):
        """Returns a file reading from a particular entry."""
        if not self._contents.has_key(fileid):
            raise KeyError(fileid)
        return StringIO(self._contents[fileid])

    def _item_size(self, fileid):
        return len(self._contents[fileid])

    def __iter__(self):
        return iter(self._contents.keys())
    def total_size(self):
        """Return (count, bytes) for the contents of this store."""
        result = 0
        count = 0
        for fileid in self._contents:
            count += 1
            result += self._item_size(fileid)
        return count, result
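
# Usage sketch for ImmutableMemoryStore (an illustrative comment, not part
# of the original module):
#
#   store = ImmutableMemoryStore()
#   store.add(StringIO('payload'), 'id-1')
#   count, size = store.total_size()   # -> (1, 7)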

class ImmutableScratchStore(ImmutableStore):
    """Self-destructing test subclass of ImmutableStore.

    The Store only exists for the lifetime of the Python object.
    Obviously you should not put anything precious in it.
    """

    def __init__(self):
        ImmutableStore.__init__(self, tempfile.mkdtemp())

    def __del__(self):
        for f in os.listdir(self._basedir):
            fpath = os.path.join(self._basedir, f)
            # needed on windows, and maybe some other filesystems
            os.chmod(fpath, 0600)
            os.remove(fpath)
        os.rmdir(self._basedir)
        mutter("%r destroyed" % self)


class CachedStore(Store):
    """A store that caches data locally, to avoid repeated downloads.

    The prefetch method should be used to avoid server round-trips for
    every request.
    """

    def __init__(self, store, cache_dir):
        super(CachedStore, self).__init__()
        self.source_store = store
        # This clones the source store type with a locally bound
        # transport.  FIXME: it assumes a constructor is == cloning,
        # which may not hold for every clonable store - it might be
        # nicer to actually have a clone() or something.  RBC 20051003
        self.cache_store = store.__class__(LocalTransport(cache_dir))
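
    # Illustrative sketch (not original code): wrapping a remote store so
    # that repeated reads are served from a local cache directory, e.g.
    #
    #   cached = CachedStore(remote_store, '/path/to/cache-dir')
    #   f = cached['some-id']   # first access copies the entry into the cache
    #   f = cached['some-id']   # second access is served locally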
    def __getitem__(self, id):
        mutter("Cache add %s" % id)
        if id not in self.cache_store:
            self.cache_store.add(self.source_store[id], id)
        return self.cache_store[id]

    def __contains__(self, fileid):
        if fileid in self.cache_store:
            return True
        if fileid in self.source_store:
            # We could copy at this time
            return True
        return False
    def get(self, fileids, permit_failure=False, pb=None):
        fileids = list(fileids)
        hasids = self.cache_store.has(fileids)
        needs = set()
        for has, fileid in zip(hasids, fileids):
            if not has:
                needs.add(fileid)
        self.cache_store.copy_multi(self.source_store, needs,
                                    permit_failure=permit_failure)
        return self.cache_store.get(fileids,
                                    permit_failure=permit_failure, pb=pb)
    def prefetch(self, ids):
        """Copy a series of ids into the cache, before they are used.

        For remote stores that support pipelining or async downloads, this can
        increase speed considerably.

        Failures while prefetching are ignored.
        """
        mutter("Prefetch of ids %s" % ",".join(ids))
        self.cache_store.copy_multi(self.source_store, ids,
                                    permit_failure=True)
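
# Illustrative sketch (not original code): warming the cache before a batch
# of reads; missing ids are ignored because prefetch passes
# permit_failure=True:
#
#   cached.prefetch(['id-1', 'id-2', 'id-3'])
#   texts = [cached[i].read() for i in ('id-1', 'id-2', 'id-3')]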

def copy_all(store_from, store_to):
    """Copy all ids from one store to another."""
    # TODO: Optional progress indicator
    if not store_from.listable():
        raise UnlistableStore(store_from)
    ids = [f for f in store_from]
    store_to.copy_multi(store_from, ids)
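
# Illustrative sketch (not original code): migrating every entry from one
# listable store into another:
#
#   copy_all(old_store, new_store)
#   assert len(new_store) == len(old_store)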