532
class _DirectPackAccess(object):
533
"""Access to data in one or more packs with less translation."""
535
def __init__(self, index_to_packs, reload_func=None, flush_func=None):
536
"""Create a _DirectPackAccess object.
538
:param index_to_packs: A dict mapping index objects to the transport
539
and file names for obtaining data.
540
:param reload_func: A function to call if we determine that the pack
541
files have moved and we need to reload our caches. See
542
bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
544
self._container_writer = None
545
self._write_index = None
546
self._indices = index_to_packs
547
self._reload_func = reload_func
548
self._flush_func = flush_func
550
def add_raw_records(self, key_sizes, raw_data):
551
"""Add raw knit bytes to a storage area.
553
The data is spooled to the container writer in one bytes-record per
556
:param sizes: An iterable of tuples containing the key and size of each
558
:param raw_data: A bytestring containing the data.
559
:return: A list of memos to retrieve the record later. Each memo is an
560
opaque index memo. For _DirectPackAccess the memo is (index, pos,
561
length), where the index field is the write_index object supplied
562
to the PackAccess object.
564
if type(raw_data) is not str:
565
raise AssertionError(
566
'data must be plain bytes was %s' % type(raw_data))
569
for key, size in key_sizes:
570
p_offset, p_length = self._container_writer.add_bytes_record(
571
raw_data[offset:offset+size], [])
573
result.append((self._write_index, p_offset, p_length))
577
"""Flush pending writes on this access object.
579
This will flush any buffered writes to a NewPack.
581
if self._flush_func is not None:
584
def get_raw_records(self, memos_for_retrieval):
585
"""Get the raw bytes for a records.
587
:param memos_for_retrieval: An iterable containing the (index, pos,
588
length) memo for retrieving the bytes. The Pack access method
589
looks up the pack to use for a given record in its index_to_pack
591
:return: An iterator over the bytes of the records.
593
# first pass, group into same-index requests
596
for (index, offset, length) in memos_for_retrieval:
597
if current_index == index:
598
current_list.append((offset, length))
600
if current_index is not None:
601
request_lists.append((current_index, current_list))
602
current_index = index
603
current_list = [(offset, length)]
604
# handle the last entry
605
if current_index is not None:
606
request_lists.append((current_index, current_list))
607
for index, offsets in request_lists:
609
transport, path = self._indices[index]
611
# A KeyError here indicates that someone has triggered an index
612
# reload, and this index has gone missing, we need to start
614
if self._reload_func is None:
615
# If we don't have a _reload_func there is nothing that can
618
raise errors.RetryWithNewPacks(index,
619
reload_occurred=True,
620
exc_info=sys.exc_info())
622
reader = make_readv_reader(transport, path, offsets)
623
for names, read_func in reader.iter_records():
624
yield read_func(None)
625
except errors.NoSuchFile:
626
# A NoSuchFile error indicates that a pack file has gone
627
# missing on disk, we need to trigger a reload, and start over.
628
if self._reload_func is None:
630
raise errors.RetryWithNewPacks(transport.abspath(path),
631
reload_occurred=False,
632
exc_info=sys.exc_info())
634
def set_writer(self, writer, index, transport_packname):
635
"""Set a writer to use for adding data."""
636
if index is not None:
637
self._indices[index] = transport_packname
638
self._container_writer = writer
639
self._write_index = index
641
def reload_or_raise(self, retry_exc):
642
"""Try calling the reload function, or re-raise the original exception.
644
This should be called after _DirectPackAccess raises a
645
RetryWithNewPacks exception. This function will handle the common logic
646
of determining when the error is fatal versus being temporary.
647
It will also make sure that the original exception is raised, rather
648
than the RetryWithNewPacks exception.
650
If this function returns, then the calling function should retry
651
whatever operation was being performed. Otherwise an exception will
654
:param retry_exc: A RetryWithNewPacks exception.
657
if self._reload_func is None:
659
elif not self._reload_func():
660
# The reload claimed that nothing changed
661
if not retry_exc.reload_occurred:
662
# If there wasn't an earlier reload, then we really were
663
# expecting to find changes. We didn't find them, so this is a
667
exc_class, exc_value, exc_traceback = retry_exc.exc_info
668
raise exc_class, exc_value, exc_traceback