~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/knit.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-11-27 08:26:50 UTC
  • mfrom: (3860.1.1 153786-retry)
  • Revision ID: pqm@pqm.ubuntu.com-20081127082650-adzra5ok5apue0gl
  • Message: (mbp, for jam) retry when pack operations fail

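Every hunk below applies the same pattern: an operation that reads from pack
files is wrapped in a loop that catches errors.RetryWithNewPacks and asks the
pack access object to reload the pack listing before trying again. As a
minimal sketch, with a hypothetical helper name (the real code inlines this
loop at each call site, as the hunks show):

    from bzrlib import errors

    def _with_pack_retry(access, operation):
        # Hypothetical helper: loop until the operation succeeds, or until
        # reload_or_raise decides the failure is permanent and re-raises
        # the original exception instead of returning.
        while True:
            try:
                return operation()
            except errors.RetryWithNewPacks, e:
                # Either reloads the pack listing (so the retry sees the
                # repacked files) or re-raises the underlying error.
                access.reload_or_raise(e)
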
@@ -64,6 +64,7 @@
 from itertools import izip, chain
 import operator
 import os
+import sys

 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """

@@ -707,7 +708,7 @@
     """

     def __init__(self, index, data_access, max_delta_chain=200,
-        annotated=False):
+                 annotated=False, reload_func=None):
         """Create a KnitVersionedFiles with index and data_access.

         :param index: The index for the knit data.
@@ -717,6 +718,9 @@
             insertion. Set to 0 to prohibit the use of deltas.
         :param annotated: Set to True to cause annotations to be calculated and
             stored during insertion.
+        :param reload_func: A function that can be called if we think we need
+            to reload the pack listing and try again. See
+            'bzrlib.repofmt.pack_repo.AggregateIndex' for the signature.
         """
         self._index = index
         self._access = data_access
@@ -726,6 +730,7 @@
         else:
             self._factory = KnitPlainFactory()
         self._fallback_vfs = []
+        self._reload_func = reload_func

     def __repr__(self):
         return "%s(%r, %r)" % (

@@ -1109,17 +1114,28 @@
         :param allow_missing: If some records are missing, rather than
             error, just return the data that could be generated.
         """
-        position_map = self._get_components_positions(keys,
-            allow_missing=allow_missing)
-        # key = component_id, r = record_details, i_m = index_memo, n = next
-        records = [(key, i_m) for key, (r, i_m, n)
-                             in position_map.iteritems()]
-        record_map = {}
-        for key, record, digest in \
-                self._read_records_iter(records):
-            (record_details, index_memo, next) = position_map[key]
-            record_map[key] = record, record_details, digest, next
-        return record_map
+        # This retries the whole request if anything fails. Potentially we
+        # could be a bit more selective. We could track the keys whose records
+        # we have successfully found, and then only request the new records
+        # from there. However, _get_components_positions grabs the whole build
+        # chain, which means we'll likely try to grab the same records again
+        # anyway. Also, can the build chains change as part of a pack
+        # operation? We wouldn't want to end up with a broken chain.
+        while True:
+            try:
+                position_map = self._get_components_positions(keys,
+                    allow_missing=allow_missing)
+                # key = component_id, r = record_details, i_m = index_memo,
+                # n = next
+                records = [(key, i_m) for key, (r, i_m, n)
+                                       in position_map.iteritems()]
+                record_map = {}
+                for key, record, digest in self._read_records_iter(records):
+                    (record_details, index_memo, next) = position_map[key]
+                    record_map[key] = record, record_details, digest, next
+                return record_map
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)

     def _split_by_prefix(self, keys):
         """For the given keys, split them up based on their prefix.

@@ -1160,6 +1176,22 @@
         if not self._index.has_graph:
             # Cannot topologically order when no graph has been stored.
             ordering = 'unordered'
+
+        remaining_keys = keys
+        while True:
+            try:
+                keys = set(remaining_keys)
+                for content_factory in self._get_remaining_record_stream(keys,
+                                            ordering, include_delta_closure):
+                    remaining_keys.discard(content_factory.key)
+                    yield content_factory
+                return
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+
+    def _get_remaining_record_stream(self, keys, ordering,
+                                     include_delta_closure):
+        """This function is the 'retry' portion for get_record_stream."""
         if include_delta_closure:
             positions = self._get_components_positions(keys, allow_missing=True)
         else:

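The subtle part of this hunk is restarting a generator without yielding
duplicates: every key already handed to the caller is discarded from
remaining_keys, so after a RetryWithNewPacks the inner stream is rebuilt with
only the keys not yet seen. A simplified sketch of that mechanic (hypothetical
names; fetch_stream is assumed to yield objects with a .key attribute):

    def _stream_with_retry(access, keys, fetch_stream):
        remaining_keys = set(keys)
        while True:
            try:
                for record in fetch_stream(set(remaining_keys)):
                    # Once a record has been yielded, never request it again.
                    remaining_keys.discard(record.key)
                    yield record
                return
            except errors.RetryWithNewPacks, e:
                access.reload_or_raise(e)
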
@@ -1457,30 +1489,39 @@
             pb = progress.DummyProgress()
         keys = set(keys)
         total = len(keys)
-        # we don't care about inclusions, the caller cares.
-        # but we need to setup a list of records to visit.
-        # we need key, position, length
-        key_records = []
-        build_details = self._index.get_build_details(keys)
-        for key, details in build_details.iteritems():
-            if key in keys:
-                key_records.append((key, details[0]))
-                keys.remove(key)
-        records_iter = enumerate(self._read_records_iter(key_records))
-        for (key_idx, (key, data, sha_value)) in records_iter:
-            pb.update('Walking content.', key_idx, total)
-            compression_parent = build_details[key][1]
-            if compression_parent is None:
-                # fulltext
-                line_iterator = self._factory.get_fulltext_content(data)
-            else:
-                # Delta
-                line_iterator = self._factory.get_linedelta_content(data)
-            # XXX: It might be more efficient to yield (key,
-            # line_iterator) in the future. However for now, this is a simpler
-            # change to integrate into the rest of the codebase. RBC 20071110
-            for line in line_iterator:
-                yield line, key
+        done = False
+        while not done:
+            try:
+                # we don't care about inclusions, the caller cares.
+                # but we need to setup a list of records to visit.
+                # we need key, position, length
+                key_records = []
+                build_details = self._index.get_build_details(keys)
+                for key, details in build_details.iteritems():
+                    if key in keys:
+                        key_records.append((key, details[0]))
+                records_iter = enumerate(self._read_records_iter(key_records))
+                for (key_idx, (key, data, sha_value)) in records_iter:
+                    pb.update('Walking content.', key_idx, total)
+                    compression_parent = build_details[key][1]
+                    if compression_parent is None:
+                        # fulltext
+                        line_iterator = self._factory.get_fulltext_content(data)
+                    else:
+                        # Delta
+                        line_iterator = self._factory.get_linedelta_content(data)
+                    # Now that we are yielding the data for this key, remove it
+                    # from the list
+                    keys.remove(key)
+                    # XXX: It might be more efficient to yield (key,
+                    # line_iterator) in the future. However for now, this is a
+                    # simpler change to integrate into the rest of the
+                    # codebase. RBC 20071110
+                    for line in line_iterator:
+                        yield line, key
+                done = True
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
         # If there are still keys we've not yet found, we look in the fallback
         # vfs, and hope to find them there.  Note that if the keys are found
         # but had no changes or no content, the fallback may not return

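Note the ordering change inside the loop: keys.remove(key) now happens only
once the data for a key is actually being yielded, not when the record list is
built. If _read_records_iter fails partway through, every key whose lines were
never produced is still in the set, so the retry pass requests exactly those.
Side by side (excerpted from the hunk above):

    # Before: the key is dropped as soon as it is scheduled, so a failure
    # after this point would lose it from the retry set.
    key_records.append((key, details[0]))
    keys.remove(key)

    # After: the key is dropped only when its content is being yielded, so
    # a retry re-requests anything not yet delivered.
    keys.remove(key)
    for line in line_iterator:
        yield line, key
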
@@ -1893,7 +1934,6 @@
                 extra information about the content which needs to be passed to
                 Factory.parse_record
         """
-        prefixes = self._partition_keys(keys)
         parent_map = self.get_parent_map(keys)
         result = {}
         for key in keys:

@@ -2419,15 +2459,19 @@
 class _DirectPackAccess(object):
     """Access to data in one or more packs with less translation."""

-    def __init__(self, index_to_packs):
+    def __init__(self, index_to_packs, reload_func=None):
         """Create a _DirectPackAccess object.

         :param index_to_packs: A dict mapping index objects to the transport
             and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
         """
         self._container_writer = None
         self._write_index = None
         self._indices = index_to_packs
+        self._reload_func = reload_func

     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.

@@ -2479,10 +2523,29 @@
         if current_index is not None:
             request_lists.append((current_index, current_list))
         for index, offsets in request_lists:
-            transport, path = self._indices[index]
-            reader = pack.make_readv_reader(transport, path, offsets)
-            for names, read_func in reader.iter_records():
-                yield read_func(None)
+            try:
+                transport, path = self._indices[index]
+            except KeyError:
+                # A KeyError here indicates that someone has triggered an index
+                # reload, and this index has gone missing; we need to start
+                # over.
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that can
+                    # be done.
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=True,
+                                               exc_info=sys.exc_info())
+            try:
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk; we need to trigger a reload, and start over.
+                if self._reload_func is None:
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=False,
+                                               exc_info=sys.exc_info())

     def set_writer(self, writer, index, transport_packname):
         """Set a writer to use for adding data."""

@@ -2491,6 +2554,35 @@
         self._container_writer = writer
         self._write_index = index

+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
+        be raised.
+
+        :param retry_exc: A RetryWithNewPacks exception.
+        """
+        is_error = False
+        if self._reload_func is None:
+            is_error = True
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
+                # hard error.
+                is_error = True
+        if is_error:
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback
+

 # Deprecated, use PatienceSequenceMatcher instead
 KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher

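The decision logic in reload_or_raise is worth restating: retrying only makes
sense if the pack listing can still change. The same rule as a hypothetical
predicate (True means re-raise the original error):

    def _should_reraise(reload_func, reload_occurred):
        if reload_func is None:
            # No way to refresh the pack listing: the failure is permanent.
            return True
        if not reload_func() and not reload_occurred:
            # The reload found nothing new, and no reload had happened
            # before the failure either, so retrying cannot help.
            return True
        # The pack listing changed (or already had): the caller should retry.
        return False
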
@@ -2769,11 +2861,17 @@
         if len(self._knit._fallback_vfs) > 0:
             # stacked knits can't use the fast path at present.
             return self._simple_annotate(key)
-        records = self._get_build_graph(key)
-        if key in self._ghosts:
-            raise errors.RevisionNotPresent(key, self._knit)
-        self._annotate_records(records)
-        return self._annotated_lines[key]
+        while True:
+            try:
+                records = self._get_build_graph(key)
+                if key in self._ghosts:
+                    raise errors.RevisionNotPresent(key, self._knit)
+                self._annotate_records(records)
+                return self._annotated_lines[key]
+            except errors.RetryWithNewPacks, e:
+                self._knit._access.reload_or_raise(e)
+                # The cached build_details are no longer valid
+                self._all_build_details.clear()

     def _simple_annotate(self, key):
         """Return annotated fulltext, rediffing from the full texts.
