3212
class _DirectPackAccess(object):
3213
"""Access to data in one or more packs with less translation."""
3215
def __init__(self, index_to_packs, reload_func=None, flush_func=None):
3216
"""Create a _DirectPackAccess object.
3218
:param index_to_packs: A dict mapping index objects to the transport
3219
and file names for obtaining data.
3220
:param reload_func: A function to call if we determine that the pack
3221
files have moved and we need to reload our caches. See
3222
bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
3224
self._container_writer = None
3225
self._write_index = None
3226
self._indices = index_to_packs
3227
self._reload_func = reload_func
3228
self._flush_func = flush_func
3230
def add_raw_records(self, key_sizes, raw_data):
3231
"""Add raw knit bytes to a storage area.
3233
The data is spooled to the container writer in one bytes-record per
3236
:param sizes: An iterable of tuples containing the key and size of each
3238
:param raw_data: A bytestring containing the data.
3239
:return: A list of memos to retrieve the record later. Each memo is an
3240
opaque index memo. For _DirectPackAccess the memo is (index, pos,
3241
length), where the index field is the write_index object supplied
3242
to the PackAccess object.
3244
if type(raw_data) is not str:
3245
raise AssertionError(
3246
'data must be plain bytes was %s' % type(raw_data))
3249
for key, size in key_sizes:
3250
p_offset, p_length = self._container_writer.add_bytes_record(
3251
raw_data[offset:offset+size], [])
3253
result.append((self._write_index, p_offset, p_length))
3257
"""Flush pending writes on this access object.
3259
This will flush any buffered writes to a NewPack.
3261
if self._flush_func is not None:
3264
def get_raw_records(self, memos_for_retrieval):
3265
"""Get the raw bytes for a records.
3267
:param memos_for_retrieval: An iterable containing the (index, pos,
3268
length) memo for retrieving the bytes. The Pack access method
3269
looks up the pack to use for a given record in its index_to_pack
3271
:return: An iterator over the bytes of the records.
3273
# first pass, group into same-index requests
3275
current_index = None
3276
for (index, offset, length) in memos_for_retrieval:
3277
if current_index == index:
3278
current_list.append((offset, length))
3280
if current_index is not None:
3281
request_lists.append((current_index, current_list))
3282
current_index = index
3283
current_list = [(offset, length)]
3284
# handle the last entry
3285
if current_index is not None:
3286
request_lists.append((current_index, current_list))
3287
for index, offsets in request_lists:
3289
transport, path = self._indices[index]
3291
# A KeyError here indicates that someone has triggered an index
3292
# reload, and this index has gone missing, we need to start
3294
if self._reload_func is None:
3295
# If we don't have a _reload_func there is nothing that can
3298
raise errors.RetryWithNewPacks(index,
3299
reload_occurred=True,
3300
exc_info=sys.exc_info())
3302
reader = pack.make_readv_reader(transport, path, offsets)
3303
for names, read_func in reader.iter_records():
3304
yield read_func(None)
3305
except errors.NoSuchFile:
3306
# A NoSuchFile error indicates that a pack file has gone
3307
# missing on disk, we need to trigger a reload, and start over.
3308
if self._reload_func is None:
3310
raise errors.RetryWithNewPacks(transport.abspath(path),
3311
reload_occurred=False,
3312
exc_info=sys.exc_info())
3314
def set_writer(self, writer, index, transport_packname):
3315
"""Set a writer to use for adding data."""
3316
if index is not None:
3317
self._indices[index] = transport_packname
3318
self._container_writer = writer
3319
self._write_index = index
3321
def reload_or_raise(self, retry_exc):
3322
"""Try calling the reload function, or re-raise the original exception.
3324
This should be called after _DirectPackAccess raises a
3325
RetryWithNewPacks exception. This function will handle the common logic
3326
of determining when the error is fatal versus being temporary.
3327
It will also make sure that the original exception is raised, rather
3328
than the RetryWithNewPacks exception.
3330
If this function returns, then the calling function should retry
3331
whatever operation was being performed. Otherwise an exception will
3334
:param retry_exc: A RetryWithNewPacks exception.
3337
if self._reload_func is None:
3339
elif not self._reload_func():
3340
# The reload claimed that nothing changed
3341
if not retry_exc.reload_occurred:
3342
# If there wasn't an earlier reload, then we really were
3343
# expecting to find changes. We didn't find them, so this is a
3347
exc_class, exc_value, exc_traceback = retry_exc.exc_info
3348
raise exc_class, exc_value, exc_traceback
3215
3351
def annotate_knit(knit, revision_id):
3216
3352
"""Annotate a knit with no cached annotations.