~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/pack.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-05 23:59:29 UTC
  • mfrom: (5757.5.1 noknit)
  • mto: This revision was merged to the branch mainline in revision 5801.
  • Revision ID: jelmer@samba.org-20110405235929-e3e0111ttdehg16r
Merge moving of _DirectPackAccess.

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
from cStringIO import StringIO
24
24
import re
25
 
import sys
26
25
 
27
26
from bzrlib import errors
28
27
 
529
528
            break
530
529
 
531
530
 
532
 
class _DirectPackAccess(object):
533
 
    """Access to data in one or more packs with less translation."""
534
 
 
535
 
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
536
 
        """Create a _DirectPackAccess object.
537
 
 
538
 
        :param index_to_packs: A dict mapping index objects to the transport
539
 
            and file names for obtaining data.
540
 
        :param reload_func: A function to call if we determine that the pack
541
 
            files have moved and we need to reload our caches. See
542
 
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
543
 
        """
544
 
        self._container_writer = None
545
 
        self._write_index = None
546
 
        self._indices = index_to_packs
547
 
        self._reload_func = reload_func
548
 
        self._flush_func = flush_func
549
 
 
550
 
    def add_raw_records(self, key_sizes, raw_data):
551
 
        """Add raw knit bytes to a storage area.
552
 
 
553
 
        The data is spooled to the container writer in one bytes-record per
554
 
        raw data item.
555
 
 
556
 
        :param sizes: An iterable of tuples containing the key and size of each
557
 
            raw data segment.
558
 
        :param raw_data: A bytestring containing the data.
559
 
        :return: A list of memos to retrieve the record later. Each memo is an
560
 
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
561
 
            length), where the index field is the write_index object supplied
562
 
            to the PackAccess object.
563
 
        """
564
 
        if type(raw_data) is not str:
565
 
            raise AssertionError(
566
 
                'data must be plain bytes was %s' % type(raw_data))
567
 
        result = []
568
 
        offset = 0
569
 
        for key, size in key_sizes:
570
 
            p_offset, p_length = self._container_writer.add_bytes_record(
571
 
                raw_data[offset:offset+size], [])
572
 
            offset += size
573
 
            result.append((self._write_index, p_offset, p_length))
574
 
        return result
575
 
 
576
 
    def flush(self):
577
 
        """Flush pending writes on this access object.
578
 
 
579
 
        This will flush any buffered writes to a NewPack.
580
 
        """
581
 
        if self._flush_func is not None:
582
 
            self._flush_func()
583
 
 
584
 
    def get_raw_records(self, memos_for_retrieval):
585
 
        """Get the raw bytes for a records.
586
 
 
587
 
        :param memos_for_retrieval: An iterable containing the (index, pos,
588
 
            length) memo for retrieving the bytes. The Pack access method
589
 
            looks up the pack to use for a given record in its index_to_pack
590
 
            map.
591
 
        :return: An iterator over the bytes of the records.
592
 
        """
593
 
        # first pass, group into same-index requests
594
 
        request_lists = []
595
 
        current_index = None
596
 
        for (index, offset, length) in memos_for_retrieval:
597
 
            if current_index == index:
598
 
                current_list.append((offset, length))
599
 
            else:
600
 
                if current_index is not None:
601
 
                    request_lists.append((current_index, current_list))
602
 
                current_index = index
603
 
                current_list = [(offset, length)]
604
 
        # handle the last entry
605
 
        if current_index is not None:
606
 
            request_lists.append((current_index, current_list))
607
 
        for index, offsets in request_lists:
608
 
            try:
609
 
                transport, path = self._indices[index]
610
 
            except KeyError:
611
 
                # A KeyError here indicates that someone has triggered an index
612
 
                # reload, and this index has gone missing, we need to start
613
 
                # over.
614
 
                if self._reload_func is None:
615
 
                    # If we don't have a _reload_func there is nothing that can
616
 
                    # be done
617
 
                    raise
618
 
                raise errors.RetryWithNewPacks(index,
619
 
                                               reload_occurred=True,
620
 
                                               exc_info=sys.exc_info())
621
 
            try:
622
 
                reader = make_readv_reader(transport, path, offsets)
623
 
                for names, read_func in reader.iter_records():
624
 
                    yield read_func(None)
625
 
            except errors.NoSuchFile:
626
 
                # A NoSuchFile error indicates that a pack file has gone
627
 
                # missing on disk, we need to trigger a reload, and start over.
628
 
                if self._reload_func is None:
629
 
                    raise
630
 
                raise errors.RetryWithNewPacks(transport.abspath(path),
631
 
                                               reload_occurred=False,
632
 
                                               exc_info=sys.exc_info())
633
 
 
634
 
    def set_writer(self, writer, index, transport_packname):
635
 
        """Set a writer to use for adding data."""
636
 
        if index is not None:
637
 
            self._indices[index] = transport_packname
638
 
        self._container_writer = writer
639
 
        self._write_index = index
640
 
 
641
 
    def reload_or_raise(self, retry_exc):
642
 
        """Try calling the reload function, or re-raise the original exception.
643
 
 
644
 
        This should be called after _DirectPackAccess raises a
645
 
        RetryWithNewPacks exception. This function will handle the common logic
646
 
        of determining when the error is fatal versus being temporary.
647
 
        It will also make sure that the original exception is raised, rather
648
 
        than the RetryWithNewPacks exception.
649
 
 
650
 
        If this function returns, then the calling function should retry
651
 
        whatever operation was being performed. Otherwise an exception will
652
 
        be raised.
653
 
 
654
 
        :param retry_exc: A RetryWithNewPacks exception.
655
 
        """
656
 
        is_error = False
657
 
        if self._reload_func is None:
658
 
            is_error = True
659
 
        elif not self._reload_func():
660
 
            # The reload claimed that nothing changed
661
 
            if not retry_exc.reload_occurred:
662
 
                # If there wasn't an earlier reload, then we really were
663
 
                # expecting to find changes. We didn't find them, so this is a
664
 
                # hard error
665
 
                is_error = True
666
 
        if is_error:
667
 
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
668
 
            raise exc_class, exc_value, exc_traceback
669
 
 
670
 
 
671
531