2318
2318
" context: %(context)s %(orig_error)s")
2321
class _DirectPackAccess(object):
2322
"""Access to data in one or more packs with less translation."""
2324
def __init__(self, index_to_packs, reload_func=None, flush_func=None):
2325
"""Create a _DirectPackAccess object.
2327
:param index_to_packs: A dict mapping index objects to the transport
2328
and file names for obtaining data.
2329
:param reload_func: A function to call if we determine that the pack
2330
files have moved and we need to reload our caches. See
2331
bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
2333
self._container_writer = None
2334
self._write_index = None
2335
self._indices = index_to_packs
2336
self._reload_func = reload_func
2337
self._flush_func = flush_func
2339
def add_raw_records(self, key_sizes, raw_data):
2340
"""Add raw knit bytes to a storage area.
2342
The data is spooled to the container writer in one bytes-record per
2345
:param sizes: An iterable of tuples containing the key and size of each
2347
:param raw_data: A bytestring containing the data.
2348
:return: A list of memos to retrieve the record later. Each memo is an
2349
opaque index memo. For _DirectPackAccess the memo is (index, pos,
2350
length), where the index field is the write_index object supplied
2351
to the PackAccess object.
2353
if type(raw_data) is not str:
2354
raise AssertionError(
2355
'data must be plain bytes was %s' % type(raw_data))
2358
for key, size in key_sizes:
2359
p_offset, p_length = self._container_writer.add_bytes_record(
2360
raw_data[offset:offset+size], [])
2362
result.append((self._write_index, p_offset, p_length))
2366
"""Flush pending writes on this access object.
2368
This will flush any buffered writes to a NewPack.
2370
if self._flush_func is not None:
2373
def get_raw_records(self, memos_for_retrieval):
2374
"""Get the raw bytes for a records.
2376
:param memos_for_retrieval: An iterable containing the (index, pos,
2377
length) memo for retrieving the bytes. The Pack access method
2378
looks up the pack to use for a given record in its index_to_pack
2380
:return: An iterator over the bytes of the records.
2382
# first pass, group into same-index requests
2384
current_index = None
2385
for (index, offset, length) in memos_for_retrieval:
2386
if current_index == index:
2387
current_list.append((offset, length))
2389
if current_index is not None:
2390
request_lists.append((current_index, current_list))
2391
current_index = index
2392
current_list = [(offset, length)]
2393
# handle the last entry
2394
if current_index is not None:
2395
request_lists.append((current_index, current_list))
2396
for index, offsets in request_lists:
2398
transport, path = self._indices[index]
2400
# A KeyError here indicates that someone has triggered an index
2401
# reload, and this index has gone missing, we need to start
2403
if self._reload_func is None:
2404
# If we don't have a _reload_func there is nothing that can
2407
raise errors.RetryWithNewPacks(index,
2408
reload_occurred=True,
2409
exc_info=sys.exc_info())
2411
reader = pack.make_readv_reader(transport, path, offsets)
2412
for names, read_func in reader.iter_records():
2413
yield read_func(None)
2414
except errors.NoSuchFile:
2415
# A NoSuchFile error indicates that a pack file has gone
2416
# missing on disk, we need to trigger a reload, and start over.
2417
if self._reload_func is None:
2419
raise errors.RetryWithNewPacks(transport.abspath(path),
2420
reload_occurred=False,
2421
exc_info=sys.exc_info())
2423
def set_writer(self, writer, index, transport_packname):
2424
"""Set a writer to use for adding data."""
2425
if index is not None:
2426
self._indices[index] = transport_packname
2427
self._container_writer = writer
2428
self._write_index = index
2430
def reload_or_raise(self, retry_exc):
2431
"""Try calling the reload function, or re-raise the original exception.
2433
This should be called after _DirectPackAccess raises a
2434
RetryWithNewPacks exception. This function will handle the common logic
2435
of determining when the error is fatal versus being temporary.
2436
It will also make sure that the original exception is raised, rather
2437
than the RetryWithNewPacks exception.
2439
If this function returns, then the calling function should retry
2440
whatever operation was being performed. Otherwise an exception will
2443
:param retry_exc: A RetryWithNewPacks exception.
2446
if self._reload_func is None:
2448
elif not self._reload_func():
2449
# The reload claimed that nothing changed
2450
if not retry_exc.reload_occurred:
2451
# If there wasn't an earlier reload, then we really were
2452
# expecting to find changes. We didn't find them, so this is a
2456
exc_class, exc_value, exc_traceback = retry_exc.exc_info
2457
raise exc_class, exc_value, exc_traceback