        :param allow_missing: If some records are missing, rather than
            error, just return the data that could be generated.
        """
        # This retries the whole request if anything fails. Potentially we
        # could be a bit more selective. We could track the keys whose records
        # we have successfully found, and then only request the new records
        # from there. However, _get_components_positions grabs the whole build
        # chain, which means we'll likely try to grab the same records again
        # anyway. Also, can the build chains change as part of a pack
        # operation? We wouldn't want to end up with a broken chain.
        while True:
            try:
                position_map = self._get_components_positions(keys,
                    allow_missing=allow_missing)
                # key = component_id, r = record_details, i_m = index_memo,
                # n = next
                records = [(key, i_m) for key, (r, i_m, n)
                                       in position_map.iteritems()]
                record_map = {}
                for key, record, digest in self._read_records_iter(records):
                    (record_details, index_memo, next) = position_map[key]
                    record_map[key] = record, record_details, digest, next
                return record_map
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
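        # A hedged sketch of the "more selective" retry mused about in the
        # comment above; illustrative only, not shipped code ('found' is a
        # hypothetical name). Keys whose records were already read would be
        # dropped from the re-request:
        #
        #   found = {}
        #   while True:
        #       try:
        #           position_map = self._get_components_positions(
        #               [k for k in keys if k not in found],
        #               allow_missing=allow_missing)
        #           records = [(key, i_m) for key, (r, i_m, n)
        #                                  in position_map.iteritems()]
        #           for key, record, digest in self._read_records_iter(records):
        #               r, i_m, n = position_map[key]
        #               found[key] = record, r, digest, n
        #           return found
        #       except errors.RetryWithNewPacks, e:
        #           self._access.reload_or_raise(e)
        #
        # As the comment notes, _get_components_positions pulls whole build
        # chains, so the saving would usually be small.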

    def _split_by_prefix(self, keys):
        """For the given keys, split them up based on their prefix.
        """

        if not self._index.has_graph:
            # Cannot topological order when no graph has been stored.
            ordering = 'unordered'

        remaining_keys = keys
        while True:
            try:
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                        ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
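        # Design note: remaining_keys.discard() above means a retry restarted
        # by reload_or_raise() only re-requests records that have not yet been
        # yielded, so consumers never see a duplicate. A hedged usage sketch
        # ('vf' and the key are hypothetical names):
        #
        #   for factory in vf.get_record_stream([('file-id', 'rev-1')],
        #                                       'unordered', False):
        #       text = factory.get_bytes_as('fulltext')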

    def _get_remaining_record_stream(self, keys, ordering,
                                     include_delta_closure):
        """This function is the 'retry' portion for get_record_stream."""
        if include_delta_closure:
            positions = self._get_components_positions(keys, allow_missing=True)

        pb = progress.DummyProgress()
        keys = set(keys)
        total = len(keys)
        done = False
        while not done:
            try:
                # we don't care about inclusions, the caller cares.
                # but we need to setup a list of records to visit.
                # we need key, position, length
                key_records = []
                build_details = self._index.get_build_details(keys)
                for key, details in build_details.iteritems():
                    if key in keys:
                        key_records.append((key, details[0]))
                records_iter = enumerate(self._read_records_iter(key_records))
                for (key_idx, (key, data, sha_value)) in records_iter:
                    pb.update('Walking content.', key_idx, total)
                    compression_parent = build_details[key][1]
                    if compression_parent is None:
                        # fulltext
                        line_iterator = self._factory.get_fulltext_content(data)
                    else:
                        # delta
                        line_iterator = self._factory.get_linedelta_content(data)
                    # Now that we are yielding the data for this key, remove it
                    # from the list
                    keys.remove(key)
                    # XXX: It might be more efficient to yield (key,
                    # line_iterator) in the future. However for now, this is a
                    # simpler change to integrate into the rest of the
                    # codebase. RBC 20071110
                    for line in line_iterator:
                        yield line, key
                done = True
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
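        # Design note: as a generator this method cannot simply return a value
        # from the try block the way _get_record_map does, hence the done
        # flag; keys.remove() above also ensures a retry only re-reads keys
        # whose lines have not yet been yielded.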
        # If there are still keys we've not yet found, we look in the fallback
        # vfs, and hope to find them there. Note that if the keys are found
        # but had no changes or no content, the fallback may not return
        # anything.


class _DirectPackAccess(object):
    """Access to data in one or more packs with less translation."""

    def __init__(self, index_to_packs, reload_func=None):
        """Create a _DirectPackAccess object.

        :param index_to_packs: A dict mapping index objects to the transport
            and file names for obtaining data.
        :param reload_func: A function to call if we determine that the pack
            files have moved and we need to reload our caches. See
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
        """
        self._container_writer = None
        self._write_index = None
        self._indices = index_to_packs
        self._reload_func = reload_func
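        # Hedged construction sketch (names are illustrative): each index maps
        # to the (transport, name) of its .pack file, and reload_func (for
        # example a pack collection's reload_pack_names) returns True if
        # anything changed:
        #
        #   access = _DirectPackAccess(
        #       {rev_index: (pack_transport, 'a1b2c3.pack')},
        #       reload_func=collection.reload_pack_names)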

    def add_raw_records(self, key_sizes, raw_data):
        """Add raw knit bytes to a storage area.
        """
        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done here, so we fail the request instead of just
                    # returning no data.
                    raise errors.RetryWithNewPacks(reload_occurred=True,
                                                   exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise errors.RetryWithNewPacks(reload_occurred=False,
                                                   exc_info=sys.exc_info())
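            # Note on the flags: reload_occurred=True (the KeyError above)
            # records that an index reload already happened under us, while
            # reload_occurred=False (NoSuchFile) means the pack vanished
            # before any reload was seen; reload_or_raise() uses the flag to
            # decide whether a reload that reports "nothing changed" is a
            # hard error.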

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error.
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback
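        # The caller-side pattern this enables, as used by get_record_stream
        # and annotate elsewhere in this file ('operation' is a hypothetical
        # placeholder):
        #
        #   while True:
        #       try:
        #           return operation()
        #       except errors.RetryWithNewPacks, e:
        #           self._access.reload_or_raise(e)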


# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher
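
# A hedged migration sketch for the deprecated alias above:
# PatienceSequenceMatcher follows the difflib.SequenceMatcher interface, so
# call sites can switch directly (toy inputs are illustrative):
#
#   from bzrlib import patiencediff
#   matcher = patiencediff.PatienceSequenceMatcher(
#       None, ['a\n', 'b\n'], ['a\n', 'c\n'])
#   for opcode in matcher.get_opcodes():
#       print opcode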

        if len(self._knit._fallback_vfs) > 0:
            # stacked knits can't use the fast path at present.
            return self._simple_annotate(key)
        while True:
            try:
                records = self._get_build_graph(key)
                if key in self._ghosts:
                    raise errors.RevisionNotPresent(key, self._knit)
                self._annotate_records(records)
                return self._annotated_lines[key]
            except errors.RetryWithNewPacks, e:
                self._knit._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()
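                # Note: reload_or_raise() re-raises on the fatal path, so the
                # cache clear above only runs when a retry is actually about
                # to happen.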

    def _simple_annotate(self, key):
        """Return annotated fulltext, rediffing from the full texts.
        """