3213
class _DirectPackAccess(object):
3214
"""Access to data in one or more packs with less translation."""
3216
def __init__(self, index_to_packs, reload_func=None, flush_func=None):
3217
"""Create a _DirectPackAccess object.
3219
:param index_to_packs: A dict mapping index objects to the transport
3220
and file names for obtaining data.
3221
:param reload_func: A function to call if we determine that the pack
3222
files have moved and we need to reload our caches. See
3223
bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
3225
self._container_writer = None
3226
self._write_index = None
3227
self._indices = index_to_packs
3228
self._reload_func = reload_func
3229
self._flush_func = flush_func
3231
def add_raw_records(self, key_sizes, raw_data):
3232
"""Add raw knit bytes to a storage area.
3234
The data is spooled to the container writer in one bytes-record per
3237
:param sizes: An iterable of tuples containing the key and size of each
3239
:param raw_data: A bytestring containing the data.
3240
:return: A list of memos to retrieve the record later. Each memo is an
3241
opaque index memo. For _DirectPackAccess the memo is (index, pos,
3242
length), where the index field is the write_index object supplied
3243
to the PackAccess object.
3245
if type(raw_data) is not str:
3246
raise AssertionError(
3247
'data must be plain bytes was %s' % type(raw_data))
3250
for key, size in key_sizes:
3251
p_offset, p_length = self._container_writer.add_bytes_record(
3252
raw_data[offset:offset+size], [])
3254
result.append((self._write_index, p_offset, p_length))
3258
"""Flush pending writes on this access object.
3260
This will flush any buffered writes to a NewPack.
3262
if self._flush_func is not None:
3265
def get_raw_records(self, memos_for_retrieval):
3266
"""Get the raw bytes for a records.
3268
:param memos_for_retrieval: An iterable containing the (index, pos,
3269
length) memo for retrieving the bytes. The Pack access method
3270
looks up the pack to use for a given record in its index_to_pack
3272
:return: An iterator over the bytes of the records.
3274
# first pass, group into same-index requests
3276
current_index = None
3277
for (index, offset, length) in memos_for_retrieval:
3278
if current_index == index:
3279
current_list.append((offset, length))
3281
if current_index is not None:
3282
request_lists.append((current_index, current_list))
3283
current_index = index
3284
current_list = [(offset, length)]
3285
# handle the last entry
3286
if current_index is not None:
3287
request_lists.append((current_index, current_list))
3288
for index, offsets in request_lists:
3290
transport, path = self._indices[index]
3292
# A KeyError here indicates that someone has triggered an index
3293
# reload, and this index has gone missing, we need to start
3295
if self._reload_func is None:
3296
# If we don't have a _reload_func there is nothing that can
3299
raise errors.RetryWithNewPacks(index,
3300
reload_occurred=True,
3301
exc_info=sys.exc_info())
3303
reader = pack.make_readv_reader(transport, path, offsets)
3304
for names, read_func in reader.iter_records():
3305
yield read_func(None)
3306
except errors.NoSuchFile:
3307
# A NoSuchFile error indicates that a pack file has gone
3308
# missing on disk, we need to trigger a reload, and start over.
3309
if self._reload_func is None:
3311
raise errors.RetryWithNewPacks(transport.abspath(path),
3312
reload_occurred=False,
3313
exc_info=sys.exc_info())
3315
def set_writer(self, writer, index, transport_packname):
3316
"""Set a writer to use for adding data."""
3317
if index is not None:
3318
self._indices[index] = transport_packname
3319
self._container_writer = writer
3320
self._write_index = index
3322
def reload_or_raise(self, retry_exc):
3323
"""Try calling the reload function, or re-raise the original exception.
3325
This should be called after _DirectPackAccess raises a
3326
RetryWithNewPacks exception. This function will handle the common logic
3327
of determining when the error is fatal versus being temporary.
3328
It will also make sure that the original exception is raised, rather
3329
than the RetryWithNewPacks exception.
3331
If this function returns, then the calling function should retry
3332
whatever operation was being performed. Otherwise an exception will
3335
:param retry_exc: A RetryWithNewPacks exception.
3338
if self._reload_func is None:
3340
elif not self._reload_func():
3341
# The reload claimed that nothing changed
3342
if not retry_exc.reload_occurred:
3343
# If there wasn't an earlier reload, then we really were
3344
# expecting to find changes. We didn't find them, so this is a
3348
exc_class, exc_value, exc_traceback = retry_exc.exc_info
3349
raise exc_class, exc_value, exc_traceback
3352
3213
def annotate_knit(knit, revision_id):
3353
3214
"""Annotate a knit with no cached annotations.