~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/knit.py

  • Committer: Martin Pool
  • Date: 2008-11-27 07:25:52 UTC
  • mfrom: (3789.2.15 pack_retry_153786)
  • mto: This revision was merged to the branch mainline in revision 3865.
  • Revision ID: mbp@sourcefrog.net-20081127072552-st8jnmahi0iy3lrt
Merge fix to retry when packs have changed

@@ -64,6 +64,7 @@
 from itertools import izip, chain
 import operator
 import os
+import sys

 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
@@ -707,7 +708,7 @@
     """

     def __init__(self, index, data_access, max_delta_chain=200,
-        annotated=False):
+                 annotated=False, reload_func=None):
         """Create a KnitVersionedFiles with index and data_access.

         :param index: The index for the knit data.
@@ -717,6 +718,9 @@
             insertion. Set to 0 to prohibit the use of deltas.
         :param annotated: Set to True to cause annotations to be calculated and
             stored during insertion.
+        :param reload_func: A function that can be called if we think we need
+            to reload the pack listing and try again. See
+            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
         """
         self._index = index
         self._access = data_access
@@ -726,6 +730,7 @@
         else:
             self._factory = KnitPlainFactory()
         self._fallback_vfs = []
+        self._reload_func = reload_func

     def __repr__(self):
         return "%s(%r, %r)" % (
@@ -1109,17 +1114,28 @@
         :param allow_missing: If some records are missing, rather than
             error, just return the data that could be generated.
         """
-        position_map = self._get_components_positions(keys,
-            allow_missing=allow_missing)
-        # key = component_id, r = record_details, i_m = index_memo, n = next
-        records = [(key, i_m) for key, (r, i_m, n)
-                             in position_map.iteritems()]
-        record_map = {}
-        for key, record, digest in \
-                self._read_records_iter(records):
-            (record_details, index_memo, next) = position_map[key]
-            record_map[key] = record, record_details, digest, next
-        return record_map
+        # This retries the whole request if anything fails. Potentially we
+        # could be a bit more selective. We could track the keys whose records
+        # we have successfully found, and then only request the new records
+        # from there. However, _get_components_positions grabs the whole build
+        # chain, which means we'll likely try to grab the same records again
+        # anyway. Also, can the build chains change as part of a pack
+        # operation? We wouldn't want to end up with a broken chain.
+        while True:
+            try:
+                position_map = self._get_components_positions(keys,
+                    allow_missing=allow_missing)
+                # key = component_id, r = record_details, i_m = index_memo,
+                # n = next
+                records = [(key, i_m) for key, (r, i_m, n)
+                                       in position_map.iteritems()]
+                record_map = {}
+                for key, record, digest in self._read_records_iter(records):
+                    (record_details, index_memo, next) = position_map[key]
+                    record_map[key] = record, record_details, digest, next
+                return record_map
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)

     def _split_by_prefix(self, keys):
         """For the given keys, split them up based on their prefix.
@@ -1160,6 +1176,22 @@
         if not self._index.has_graph:
             # Cannot topologically order when no graph has been stored.
             ordering = 'unordered'
+
+        remaining_keys = keys
+        while True:
+            try:
+                keys = set(remaining_keys)
+                for content_factory in self._get_remaining_record_stream(keys,
+                                            ordering, include_delta_closure):
+                    remaining_keys.discard(content_factory.key)
+                    yield content_factory
+                return
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+
+    def _get_remaining_record_stream(self, keys, ordering,
+                                     include_delta_closure):
+        """This function is the 'retry' portion for get_record_stream."""
         if include_delta_closure:
             positions = self._get_components_positions(keys, allow_missing=True)
         else:
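One subtlety in the generator version above: by the time a retry happens, get_record_stream may already have handed some factories to the caller, and it must not yield them again. Discarding each key from remaining_keys as it is yielded means a retry only re-requests the leftovers. A stand-in sketch (assumed names; RetryWithNewPacks is the stand-in class from the previous sketch, and keys must behave as a set for discard to work):

    # 'fetch' plays the role of _get_remaining_record_stream and may
    # raise RetryWithNewPacks partway through the stream.
    def stream_with_retry(keys, fetch, access):
        remaining = set(keys)
        while True:
            try:
                for factory in fetch(set(remaining)):
                    # Mark the key done before the caller sees it, so a
                    # later retry never yields the same record twice.
                    remaining.discard(factory.key)
                    yield factory
                return
            except RetryWithNewPacks, e:
                access.reload_or_raise(e)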
@@ -1455,30 +1487,39 @@
             pb = progress.DummyProgress()
         keys = set(keys)
         total = len(keys)
-        # we don't care about inclusions, the caller cares.
-        # but we need to setup a list of records to visit.
-        # we need key, position, length
-        key_records = []
-        build_details = self._index.get_build_details(keys)
-        for key, details in build_details.iteritems():
-            if key in keys:
-                key_records.append((key, details[0]))
-                keys.remove(key)
-        records_iter = enumerate(self._read_records_iter(key_records))
-        for (key_idx, (key, data, sha_value)) in records_iter:
-            pb.update('Walking content.', key_idx, total)
-            compression_parent = build_details[key][1]
-            if compression_parent is None:
-                # fulltext
-                line_iterator = self._factory.get_fulltext_content(data)
-            else:
-                # Delta
-                line_iterator = self._factory.get_linedelta_content(data)
-            # XXX: It might be more efficient to yield (key,
-            # line_iterator) in the future. However for now, this is a simpler
-            # change to integrate into the rest of the codebase. RBC 20071110
-            for line in line_iterator:
-                yield line, key
+        done = False
+        while not done:
+            try:
+                # we don't care about inclusions, the caller cares.
+                # but we need to setup a list of records to visit.
+                # we need key, position, length
+                key_records = []
+                build_details = self._index.get_build_details(keys)
+                for key, details in build_details.iteritems():
+                    if key in keys:
+                        key_records.append((key, details[0]))
+                records_iter = enumerate(self._read_records_iter(key_records))
+                for (key_idx, (key, data, sha_value)) in records_iter:
+                    pb.update('Walking content.', key_idx, total)
+                    compression_parent = build_details[key][1]
+                    if compression_parent is None:
+                        # fulltext
+                        line_iterator = self._factory.get_fulltext_content(data)
+                    else:
+                        # Delta
+                        line_iterator = self._factory.get_linedelta_content(data)
+                    # Now that we are yielding the data for this key, remove it
+                    # from the list
+                    keys.remove(key)
+                    # XXX: It might be more efficient to yield (key,
+                    # line_iterator) in the future. However for now, this is a
+                    # simpler change to integrate into the rest of the
+                    # codebase. RBC 20071110
+                    for line in line_iterator:
+                        yield line, key
+                done = True
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
         # If there are still keys we've not yet found, we look in the fallback
         # vfs, and hope to find them there.  Note that if the keys are found
         # but had no changes or no content, the fallback may not return
@@ -1891,7 +1932,6 @@
                 extra information about the content which needs to be passed to
                 Factory.parse_record
         """
-        prefixes = self._partition_keys(keys)
         parent_map = self.get_parent_map(keys)
         result = {}
         for key in keys:
@@ -2417,15 +2457,19 @@
 class _DirectPackAccess(object):
     """Access to data in one or more packs with less translation."""

-    def __init__(self, index_to_packs):
+    def __init__(self, index_to_packs, reload_func=None):
         """Create a _DirectPackAccess object.

         :param index_to_packs: A dict mapping index objects to the transport
             and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
         """
         self._container_writer = None
         self._write_index = None
         self._indices = index_to_packs
+        self._reload_func = reload_func

     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.
@@ -2477,10 +2521,29 @@
         if current_index is not None:
             request_lists.append((current_index, current_list))
         for index, offsets in request_lists:
-            transport, path = self._indices[index]
-            reader = pack.make_readv_reader(transport, path, offsets)
-            for names, read_func in reader.iter_records():
-                yield read_func(None)
+            try:
+                transport, path = self._indices[index]
+            except KeyError:
+                # A KeyError here indicates that someone has triggered an
+                # index reload, and this index has gone missing; we need to
+                # start over.
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that
+                    # can be done
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=True,
+                                               exc_info=sys.exc_info())
+            try:
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk; we need to trigger a reload, and start over.
+                if self._reload_func is None:
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=False,
+                                               exc_info=sys.exc_info())

     def set_writer(self, writer, index, transport_packname):
         """Set a writer to use for adding data."""
@@ -2489,6 +2552,35 @@
         self._container_writer = writer
         self._write_index = index

+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
+        be raised.
+
+        :param retry_exc: A RetryWithNewPacks exception.
+        """
+        is_error = False
+        if self._reload_func is None:
+            is_error = True
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
+                # hard error
+                is_error = True
+        if is_error:
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback
+

 # Deprecated, use PatienceSequenceMatcher instead
 KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
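reload_or_raise compresses to one rule: retry only if a reload function exists and either the reload reports a changed pack listing or a reload had already occurred before the exception was raised. A self-contained restatement with all four cases checked (a stand-in function, not bzrlib code):

    # Inverse of reload_or_raise's is_error flag: True means the caller
    # should loop and retry, False means the original error is re-raised.
    def should_retry(reload_func, reload_occurred):
        if reload_func is None:
            return False        # no way to recover
        if reload_func():
            return True         # pack listing changed; retrying may work
        # The reload found nothing new: acceptable only if a reload had
        # already happened before the exception was raised.
        return reload_occurred

    assert not should_retry(None, False)
    assert should_retry(lambda: True, False)
    assert not should_retry(lambda: False, False)   # hard error
    assert should_retry(lambda: False, True)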
@@ -2767,11 +2859,17 @@
         if len(self._knit._fallback_vfs) > 0:
             # stacked knits can't use the fast path at present.
             return self._simple_annotate(key)
-        records = self._get_build_graph(key)
-        if key in self._ghosts:
-            raise errors.RevisionNotPresent(key, self._knit)
-        self._annotate_records(records)
-        return self._annotated_lines[key]
+        while True:
+            try:
+                records = self._get_build_graph(key)
+                if key in self._ghosts:
+                    raise errors.RevisionNotPresent(key, self._knit)
+                self._annotate_records(records)
+                return self._annotated_lines[key]
+            except errors.RetryWithNewPacks, e:
+                self._knit._access.reload_or_raise(e)
+                # The cached build_details are no longer valid
+                self._all_build_details.clear()

     def _simple_annotate(self, key):
         """Return annotated fulltext, rediffing from the full texts.