@@ -1109 +1114 @@
         :param allow_missing: If some records are missing, rather than
             error, just return the data that could be generated.
         """
-        position_map = self._get_components_positions(keys,
-            allow_missing=allow_missing)
-        # key = component_id, r = record_details, i_m = index_memo, n = next
-        records = [(key, i_m) for key, (r, i_m, n)
-                               in position_map.iteritems()]
-        record_map = {}
-        for key, record, digest in \
-                self._read_records_iter(records):
-            (record_details, index_memo, next) = position_map[key]
-            record_map[key] = record, record_details, digest, next
-        return record_map
+        # This retries the whole request if anything fails. Potentially we
+        # could be a bit more selective. We could track the keys whose records
+        # we have successfully found, and then only request the new records
+        # from there. However, _get_components_positions grabs the whole build
+        # chain, which means we'll likely try to grab the same records again
+        # anyway. Also, can the build chains change as part of a pack
+        # operation? We wouldn't want to end up with a broken chain.
+        while True:
+            try:
+                position_map = self._get_components_positions(keys,
+                    allow_missing=allow_missing)
+                # key = component_id, r = record_details, i_m = index_memo,
+                # n = next
+                records = [(key, i_m) for key, (r, i_m, n)
+                                       in position_map.iteritems()]
+                record_map = {}
+                for key, record, digest in self._read_records_iter(records):
+                    (record_details, index_memo, next) = position_map[key]
+                    record_map[key] = record, record_details, digest, next
+                return record_map
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
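
The comment above spells out the trade-off: rather than tracking partial
progress, the whole request is simply re-run until it succeeds or until
reload_or_raise() decides the failure is permanent. The same shape as a
standalone sketch (run_with_retry and do_request are illustrative names,
not bzrlib API):

    from bzrlib import errors

    def run_with_retry(access, do_request):
        # Re-run the whole request. reload_or_raise() either reloads the
        # pack list and returns (so we loop and try again), or re-raises
        # the original error when retrying cannot help.
        while True:
            try:
                return do_request()
            except errors.RetryWithNewPacks, e:
                access.reload_or_raise(e)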

     def _split_by_prefix(self, keys):
         """For the given keys, split them up based on their prefix.
@@ -1160 +1176 @@
         if not self._index.has_graph:
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
+        remaining_keys = keys
+        while True:
+            try:
+                keys = set(remaining_keys)
+                for content_factory in self._get_remaining_record_stream(keys,
+                                            ordering, include_delta_closure):
+                    remaining_keys.discard(content_factory.key)
+                    yield content_factory
+                return
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
+
+    def _get_remaining_record_stream(self, keys, ordering,
+                                     include_delta_closure):
+        """This function is the 'retry' portion for get_record_stream."""
         if include_delta_closure:
             positions = self._get_components_positions(keys, allow_missing=True)
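
Because get_record_stream() is a generator, it cannot simply re-run the
whole request after a failure: records that were already yielded must not
be yielded again. Hence the split into an outer loop that tracks
remaining_keys and an inner '_get_remaining_record_stream' worker that is
handed only the keys still outstanding. The same idea in isolation
(stream_some is an illustrative stand-in for the worker, not bzrlib API):

    from bzrlib import errors

    def stream_with_retry(access, keys, stream_some):
        # Outer retry loop for a generator: keys that have already been
        # yielded are removed from 'remaining', so a retry after a pack
        # reload never emits the same record twice.
        remaining = set(keys)
        while True:
            try:
                for key, record in stream_some(set(remaining)):
                    remaining.discard(key)
                    yield record
                return
            except errors.RetryWithNewPacks, e:
                access.reload_or_raise(e)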
@@ -1455 +1487 @@
         pb = progress.DummyProgress()
         keys = set(keys)
         total = len(keys)
-        # we don't care about inclusions, the caller cares.
-        # but we need to setup a list of records to visit.
-        # we need key, position, length
-        key_records = []
-        build_details = self._index.get_build_details(keys)
-        for key, details in build_details.iteritems():
-            key_records.append((key, details[0]))
-        records_iter = enumerate(self._read_records_iter(key_records))
-        for (key_idx, (key, data, sha_value)) in records_iter:
-            pb.update('Walking content.', key_idx, total)
-            compression_parent = build_details[key][1]
-            if compression_parent is None:
-                # fulltext
-                line_iterator = self._factory.get_fulltext_content(data)
-            else:
-                # Delta
-                line_iterator = self._factory.get_linedelta_content(data)
-            # XXX: It might be more efficient to yield (key,
-            # line_iterator) in the future. However for now, this is a simpler
-            # change to integrate into the rest of the codebase. RBC 20071110
-            for line in line_iterator:
-                yield line, key
+        done = False
+        while not done:
+            try:
+                # we don't care about inclusions, the caller cares.
+                # but we need to setup a list of records to visit.
+                # we need key, position, length
+                key_records = []
+                build_details = self._index.get_build_details(keys)
+                for key, details in build_details.iteritems():
+                    key_records.append((key, details[0]))
+                records_iter = enumerate(self._read_records_iter(key_records))
+                for (key_idx, (key, data, sha_value)) in records_iter:
+                    pb.update('Walking content.', key_idx, total)
+                    compression_parent = build_details[key][1]
+                    if compression_parent is None:
+                        # fulltext
+                        line_iterator = self._factory.get_fulltext_content(data)
+                    else:
+                        # Delta
+                        line_iterator = self._factory.get_linedelta_content(data)
+                    # Now that we are yielding the data for this key, remove it
+                    # from the list
+                    keys.remove(key)
+                    # XXX: It might be more efficient to yield (key,
+                    # line_iterator) in the future. However for now, this is a
+                    # simpler change to integrate into the rest of the
+                    # codebase. RBC 20071110
+                    for line in line_iterator:
+                        yield line, key
+                done = True
+            except errors.RetryWithNewPacks, e:
+                self._access.reload_or_raise(e)
         # If there are still keys we've not yet found, we look in the fallback
         # vfs, and hope to find them there. Note that if the keys are found
         # but had no changes or no content, the fallback may not return
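
The comment above refers to the stacking support: keys that the local pack
files cannot supply are looked up in self._fallback_vfs, with each fallback
striking out whatever it manages to provide. A hedged sketch of that
pattern, not the exact code that follows in this hunk (the helper name is
made up):

    def _lines_from_fallbacks(fallback_vfs, keys):
        # Ask each fallback versionedfiles object for the keys that are
        # still unaccounted for, narrowing the set as lines come back.
        for source in fallback_vfs:
            if not keys:
                break
            found = set()
            for line, key in source.iter_lines_added_or_present_in_keys(keys):
                found.add(key)
                yield line, key
            keys.difference_update(found)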
@@ -2417 +2457 @@
 class _DirectPackAccess(object):
     """Access to data in one or more packs with less translation."""
-    def __init__(self, index_to_packs):
+    def __init__(self, index_to_packs, reload_func=None):
         """Create a _DirectPackAccess object.

         :param index_to_packs: A dict mapping index objects to the transport
             and file names for obtaining data.
+        :param reload_func: A function to call if we determine that the pack
+            files have moved and we need to reload our caches. See
+            bzrlib.repofmt.pack_repo.AggregateIndex for more details.
         """
         self._container_writer = None
         self._write_index = None
         self._indices = index_to_packs
+        self._reload_func = reload_func
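
The new reload_func hook is what turns a low-level KeyError or NoSuchFile
into a retryable condition further down. Wiring it up might look roughly
like this (the pack-collection attribute below is an assumption for the
sketch, not something shown in this diff):

    # The repository hands _DirectPackAccess a callable that refreshes its
    # pack/index caches and returns True if the set of packs actually changed.
    access = _DirectPackAccess(index_to_packs,
                               reload_func=pack_collection.reload_pack_names)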

     def add_raw_records(self, key_sizes, raw_data):
         """Add raw knit bytes to a storage area.
@@ -2477 +2521 @@
         if current_index is not None:
             request_lists.append((current_index, current_list))
         for index, offsets in request_lists:
-            transport, path = self._indices[index]
-            reader = pack.make_readv_reader(transport, path, offsets)
-            for names, read_func in reader.iter_records():
-                yield read_func(None)
+            try:
+                transport, path = self._indices[index]
+            except KeyError:
+                # A KeyError here indicates that someone has triggered an index
+                # reload, and this index has gone missing, we need to start
+                # over.
+                if self._reload_func is None:
+                    # If we don't have a _reload_func there is nothing that can
+                    # be done.
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=True,
+                                               exc_info=sys.exc_info())
+            try:
+                reader = pack.make_readv_reader(transport, path, offsets)
+                for names, read_func in reader.iter_records():
+                    yield read_func(None)
+            except errors.NoSuchFile:
+                # A NoSuchFile error indicates that a pack file has gone
+                # missing on disk, we need to trigger a reload, and start over.
+                if self._reload_func is None:
+                    raise
+                raise errors.RetryWithNewPacks(reload_occurred=False,
+                                               exc_info=sys.exc_info())
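
Note the asymmetry in reload_occurred: a KeyError means the cached index
objects have already been replaced underneath us (a reload effectively
occurred before the failure), while a NoSuchFile means the pack file
vanished on disk and the reload is still pending. A toy version of the
first translation, for illustration (lookup_index is a made-up name):

    import sys
    from bzrlib import errors

    def lookup_index(indices, index, reload_func):
        try:
            return indices[index]
        except KeyError:
            if reload_func is None:
                # Nothing to retry against; let the original error propagate.
                raise
            # Our cached index objects are already stale, so flag the retry
            # as one where a reload has effectively happened.
            raise errors.RetryWithNewPacks(reload_occurred=True,
                                           exc_info=sys.exc_info())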

     def set_writer(self, writer, index, transport_packname):
         """Set a writer to use for adding data."""
         self._container_writer = writer
         self._write_index = index
+    def reload_or_raise(self, retry_exc):
+        """Try calling the reload function, or re-raise the original exception.
+
+        This should be called after _DirectPackAccess raises a
+        RetryWithNewPacks exception. This function will handle the common logic
+        of determining when the error is fatal versus being temporary.
+        It will also make sure that the original exception is raised, rather
+        than the RetryWithNewPacks exception.
+
+        If this function returns, then the calling function should retry
+        whatever operation was being performed. Otherwise an exception will
+        be raised.
+
+        :param retry_exc: A RetryWithNewPacks exception.
+        """
+        is_error = False
+        if self._reload_func is None:
+            is_error = True
+        elif not self._reload_func():
+            # The reload claimed that nothing changed
+            if not retry_exc.reload_occurred:
+                # If there wasn't an earlier reload, then we really were
+                # expecting to find changes. We didn't find them, so this is a
+                # hard error.
+                is_error = True
+        if is_error:
+            exc_class, exc_value, exc_traceback = retry_exc.exc_info
+            raise exc_class, exc_value, exc_traceback
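
Taken together this gives callers a simple contract: if reload_or_raise()
returns, retry the whole operation; if there is no reload function, or the
reload reports no change when no reload had yet occurred, the original
exception is re-raised. For instance, with a reload function that does find
new packs, a caller's retry loop simply goes around again:

    import sys
    from bzrlib import errors

    # Demonstration under the assumption that the reload function reports a
    # change (returns True): reload_or_raise() returns and the caller retries.
    access = _DirectPackAccess(index_to_packs={}, reload_func=lambda: True)
    try:
        raise errors.RetryWithNewPacks(reload_occurred=False,
                                       exc_info=sys.exc_info())
    except errors.RetryWithNewPacks, e:
        access.reload_or_raise(e)
        # reached: retry the original request here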

 # Deprecated, use PatienceSequenceMatcher instead
 KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
@@ -2767 +2859 @@
         if len(self._knit._fallback_vfs) > 0:
             # stacked knits can't use the fast path at present.
             return self._simple_annotate(key)
-        records = self._get_build_graph(key)
-        if key in self._ghosts:
-            raise errors.RevisionNotPresent(key, self._knit)
-        self._annotate_records(records)
-        return self._annotated_lines[key]
+        while True:
+            try:
+                records = self._get_build_graph(key)
+                if key in self._ghosts:
+                    raise errors.RevisionNotPresent(key, self._knit)
+                self._annotate_records(records)
+                return self._annotated_lines[key]
+            except errors.RetryWithNewPacks, e:
+                self._knit._access.reload_or_raise(e)
+                # The cached build_details are no longer valid
+                self._all_build_details.clear()
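
The extra step in this handler matters: the annotator caches per-key build
details whose index memos point into specific pack files, and after a
reload those positions may be stale, so the cache is dropped before the
next attempt. The same retry shape as elsewhere, as a sketch (annotator
attribute names as used in this file):

    from bzrlib import errors

    def annotate_with_retry(annotator, key):
        # Reload first, then invalidate anything derived from the old pack
        # positions, then try again from scratch.
        while True:
            try:
                records = annotator._get_build_graph(key)
                annotator._annotate_records(records)
                return annotator._annotated_lines[key]
            except errors.RetryWithNewPacks, e:
                annotator._knit._access.reload_or_raise(e)
                annotator._all_build_details.clear()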

     def _simple_annotate(self, key):
         """Return annotated fulltext, rediffing from the full texts.