        fulltext_size = None
        for count in xrange(self._max_delta_chain):
            try:
                # Note that this only looks in the index of this particular
                # KnitVersionedFiles, not in the fallbacks. This ensures that
                # we won't store a delta spanning physical repository
                # boundaries.
                build_details = self._index.get_build_details([parent])
                parent_details = build_details[parent]
            except (RevisionNotPresent, KeyError), e:
                # Some basis is not locally present: always fulltext
                return False
            index_memo, compression_parent, _, _ = parent_details
            _, _, size = index_memo
            if compression_parent is None:
                fulltext_size = size
                break
            delta_size += size
            # We don't explicitly check for presence because this is in an
            # inner loop, and if it's missing it'll fail anyhow.
            parent = compression_parent
        else:
            # We couldn't find a fulltext, so we must create a new one
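        # Worked illustration (assumed sizes, not taken from the original
        # source): the loop above walks back along the compression-parent
        # chain, adding each delta's stored size to delta_size until it hits
        # a fulltext, whose size is captured in fulltext_size. With deltas of
        # 700 and 900 bytes built on a 1200-byte fulltext, delta_size ends up
        # at 1600 and fulltext_size at 1200, and the method can then weigh
        # the cost of reading the whole chain against storing a new fulltext.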

        keys = list(keys)
        multiple_versions = len(keys) != 1
        record_map = self._get_record_map(keys, allow_missing=True)

        text_map = {}
        content_map = {}
        final_content = {}
        if nonlocal_keys is None:
            nonlocal_keys = set()
        else:
            nonlocal_keys = frozenset(nonlocal_keys)
        missing_keys = set(nonlocal_keys)
        for source in self._fallback_vfs:
            if not missing_keys:
                break
            for record in source.get_record_stream(missing_keys,
                'unordered', True):
                if record.storage_kind == 'absent':
                    continue
                missing_keys.remove(record.key)
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
                text_map[record.key] = lines
                content_map[record.key] = PlainKnitContent(lines, record.key)
                if record.key in keys:
                    final_content[record.key] = content_map[record.key]
        for key in keys:
            if key in nonlocal_keys:
                # already handled
                continue
            components = []
            cursor = key
            while cursor is not None:
                try:
                    record, record_details, digest, next = record_map[cursor]
                except KeyError:
                    raise RevisionNotPresent(cursor, self)
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    # no need to plan further back
                    components.append((cursor, None, None, None))
                    break
                cursor = next

            for (component_id, record, record_details,

            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
            text_map[key] = text
        return text_map, final_content
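        # Shape of the return value (hypothetical keys, for illustration):
        # text_map maps each requested key to its list of lines, while
        # final_content maps it to the KnitContent it was built from, e.g.
        #   text_map == {('file-id', 'rev-1'): ['line one\n', 'line two\n']}
        #   final_content == {('file-id', 'rev-1'): <KnitContent instance>}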

    def get_parent_map(self, keys):
        """Get a map of the graph parents of keys.

        :param keys: The keys to look up parents for.
        :return: A mapping from keys to parents. Absent keys are absent from
            the mapping.
        """
        return self._get_parent_map_with_sources(keys)[0]

    def _get_parent_map_with_sources(self, keys):
        """Get a map of the parents of keys.

        :param keys: The keys to look up parents for.
        :return: A tuple. The first element is a mapping from keys to parents.
            Absent keys are absent from the mapping. The second element is a
            list with the locations each key was found in. The first element
            is the in-this-knit parents, the second the first fallback source,
            and so on.
        """
        result = {}
        sources = [self._index] + self._fallback_vfs
        source_results = []
        missing = set(keys)
        for source in sources:
            if not missing:
                break
            new_result = source.get_parent_map(missing)
            source_results.append(new_result)
            result.update(new_result)
            missing.difference_update(set(new_result))
        return result, source_results
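        # Illustrative call (hypothetical keys, one fallback source, kvf being
        # a KnitVersionedFiles instance):
        #   result, sources = kvf._get_parent_map_with_sources(
        #       [('rev-2',), ('rev-9',)])
        # could yield
        #   result == {('rev-2',): (('rev-1',),)}
        #   sources == [{('rev-2',): (('rev-1',),)}, {}]
        # meaning 'rev-2' was answered by this knit's own index, the fallback
        # was then asked only about 'rev-9', and it knew nothing about it.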

    def _get_record_map(self, keys, allow_missing=False):
        """Produce a dictionary of knit records.

        :return: {key:(record, record_details, digest, next)}
            next
                build-parent of the version, i.e. the leftmost ancestor.
                Will be None if the record is not a delta.
        :param keys: The keys to build a map for
        :param allow_missing: If some records are missing, rather than
            error, just return the data that could be generated.
        """
        # This retries the whole request if anything fails. Potentially we
        # could be a bit more selective. We could track the keys whose records
        # we have successfully found, and then only request the new records
        # from there. However, _get_components_positions grabs the whole build
        # chain, which means we'll likely try to grab the same records again
        # anyway. Also, can the build chains change as part of a pack
        # operation? We wouldn't want to end up with a broken chain.
        while True:
            try:
                position_map = self._get_components_positions(keys,
                    allow_missing=allow_missing)
                # key = component_id, r = record_details, i_m = index_memo,
                # n = next
                records = [(key, i_m) for key, (r, i_m, n)
                           in position_map.iteritems()]
                record_map = {}
                for key, record, digest in self._read_records_iter(records):
                    (record_details, index_memo, next) = position_map[key]
                    record_map[key] = record, record_details, digest, next
                return record_map
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)

    def _split_by_prefix(self, keys):
        """For the given keys, split them up based on their prefix.

        To keep memory pressure somewhat under control, split the
        requests back into per-file-id requests, otherwise "bzr co"
        extracts the full tree into memory before writing it to disk.
        This should be revisited if _get_content_maps() can ever cross
        file-id boundaries.

        :param keys: An iterable of key tuples
        :return: A dict of {prefix: [key_list]}
        """
        split_by_prefix = {}
        for key in keys:
            if len(key) == 1:
                split_by_prefix.setdefault('', []).append(key)
            else:
                split_by_prefix.setdefault(key[0], []).append(key)
        return split_by_prefix
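        # Illustrative example (hypothetical keys): prefixed keys are grouped
        # by their file id, so
        #   kvf._split_by_prefix([('f1', 'r1'), ('f2', 'r1'), ('f1', 'r2')])
        # returns
        #   {'f1': [('f1', 'r1'), ('f1', 'r2')], 'f2': [('f2', 'r1')]}
        # whereas length-one keys (an unprefixed knit) all collect under ''.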

    def get_record_stream(self, keys, ordering, include_delta_closure):
        """Get a stream of records for keys.

                    try:
                        chain.append(positions[chain[-1]][2])
                    except KeyError:
                        # missing basis component
                        needed_from_fallback.add(chain[-1])
                        result = True
                        break
                for chain_key in chain[:-1]:
                    reconstructable_keys[chain_key] = result
                if not result:
                    needed_from_fallback.add(key)
        # Double index lookups here : need a unified api ?
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
        if ordering == 'topological':
            # Global topological sort
            present_keys = tsort.topo_sort(global_map)
            # Now group by source:
            source_keys = []
            current_source = None
            for key in present_keys:
                for parent_map in parent_maps:
                    if key in parent_map:
                        key_source = parent_map
                        break
                if current_source is not key_source:
                    source_keys.append((key_source, []))
                    current_source = key_source
                source_keys[-1][1].append(key)
        else:
            if ordering != 'unordered':
                raise AssertionError('valid values for ordering are:'
                    ' "unordered" or "topological" not: %r'
                    % (ordering,))
            # Just group by source; remote sources first.
            present_keys = []
            source_keys = []
            for parent_map in reversed(parent_maps):
                source_keys.append((parent_map, []))
                for key in parent_map:
                    present_keys.append(key)
                    source_keys[-1][1].append(key)
        # We have been requested to return these records in an order that
        # suits us. So we ask the index to give us an optimally sorted
        # order.
        for source, sub_keys in source_keys:
            if source is parent_maps[0]:
                # Only sort the keys for this VF
                self._index._sort_keys_by_io(sub_keys, positions)
        absent_keys = keys - set(global_map)
        for key in absent_keys:
            yield AbsentContentFactory(key)
        # restrict our view to the keys we can answer.
        # XXX: Memory: TODO: batch data here to cap buffered data at (say) 1MB.
        # XXX: At that point we need to consider the impact of double reads by
        # utilising components multiple times.
        if include_delta_closure:
            # XXX: get_content_maps performs its own index queries; allow state
            # to be passed in.
            non_local_keys = needed_from_fallback - absent_keys
            prefix_split_keys = self._split_by_prefix(present_keys)
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
            for prefix, keys in prefix_split_keys.iteritems():
                non_local = prefix_split_non_local_keys.get(prefix, [])
                non_local = set(non_local)
                text_map, _ = self._get_content_maps(keys, non_local)
                for key in keys:
                    lines = text_map.pop(key)
                    yield ChunkedContentFactory(key, global_map[key], None,
                        lines)
        else:
            for source, keys in source_keys:
                if source is parent_maps[0]:
                    # this KnitVersionedFiles
                    records = [(key, positions[key][1]) for key in keys]
                    for key, raw_data, sha1 in self._read_records_iter_raw(records):
                        (record_details, index_memo, _) = positions[key]
                        yield KnitContentFactory(key, global_map[key],
                            record_details, sha1, raw_data, self._factory.annotated, None)
                else:
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
                    for record in vf.get_record_stream(keys, ordering,
                        include_delta_closure):
                        yield record

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
        missing = set(keys)
        record_map = self._get_record_map(missing, allow_missing=True)
        result = {}
        for key, details in record_map.iteritems():
            if key not in missing:
                continue
            # record entry 2 is the 'digest'.
            result[key] = details[2]
        missing.difference_update(set(result))
        for source in self._fallback_vfs:
            if not missing:
                break
            new_result = source.get_sha1s(missing)
            result.update(new_result)
            missing.difference_update(set(new_result))
        return result

    def insert_record_stream(self, stream):
        """Insert a record stream into this container.

        # can't generate annotations from new deltas until their basis parent
        # is present anyway, so we get away with not needing an index that
        # includes the new keys.
        #
        # See <http://launchpad.net/bugs/300177> about ordering of compression
        # parents in the records - to be conservative, we insist that all
        # parents must be present to avoid expanding to a fulltext.
        #
        # key = basis_parent, value = index entry to add
        buffered_index_entries = {}
        for record in stream:
            parents = record.parents
            if record.storage_kind in delta_types:
                # TODO: eventually the record itself should track
                #       compression_parent
                compression_parent = parents[0]
            else:
                compression_parent = None
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise RevisionNotPresent([record.key], self)
            elif ((record.storage_kind in knit_types)
                and (compression_parent is None
                    or not self._fallback_vfs
                    or self._index.has_key(compression_parent)
                    or not self.has_key(compression_parent))):
                # we can insert the knit record literally if either it has no
                # compression parent OR we already have its basis in this kvf
                # OR the basis is not present even in the fallbacks. In the
                # last case it will either turn up later in the stream and all
                # will be well, or it won't turn up at all and we'll raise an
                # error at the end.
                #
                # TODO: self.has_key is somewhat redundant with
                # self._index.has_key; we really want something that directly
                # asks if it's only present in the fallbacks. -- mbp 20081119
                if record.storage_kind not in native_types:
                    adapter_key = (record.storage_kind, "knit-delta-gz")

                index_entry = (record.key, options, access_memo, parents)
                buffered = False
                if 'fulltext' not in options:
                    # Not a fulltext, so we need to make sure the compression
                    # parent will also be present.
                    # Note that pack backed knits don't need to buffer here
                    # because they buffer all writes to the transaction level,
                    # but we don't expose that difference at the index level. If
                    # the query here has sufficient cost to show up in
                    # profiling we should do that.
                    #
                    # They're required to be physically in this
                    # KnitVersionedFiles, not in a fallback.
                    if not self._index.has_key(compression_parent):
                        pending = buffered_index_entries.setdefault(
                            compression_parent, [])
                        pending.append(index_entry)
                        buffered = True
                if not buffered:
                    self._index.add_records([index_entry])
            elif record.storage_kind == 'chunked':
                self.add_lines(record.key, parents,
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
            elif record.storage_kind == 'fulltext':
                self.add_lines(record.key, parents,
                    split_lines(record.get_bytes_as('fulltext')))
            else:
                # Not a fulltext, and not suitable for direct insertion as a
                # delta, either because it's not the right format, or this
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
                # 0) or because it depends on a base only present in the
                # fallback kvfs.
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                lines = split_lines(adapter.get_bytes(

        is an iterator).

        NOTES:
         * Lines are normalised by the underlying store: they will all have \\n
           terminators.
         * Lines are returned in arbitrary order.
         * If a requested key did not change any lines (or didn't have any
           lines), it may not be mentioned at all in the result.

        :return: An iterator over (line, key).
        """
        if pb is None:
            pb = progress.DummyProgress()
        keys = set(keys)
        total = len(keys)
        done = False
        while not done:
            try:
                # we don't care about inclusions, the caller cares.
                # but we need to setup a list of records to visit.
                # we need key, position, length
                key_records = []
                build_details = self._index.get_build_details(keys)
                for key, details in build_details.iteritems():
                    key_records.append((key, details[0]))
                records_iter = enumerate(self._read_records_iter(key_records))
                for (key_idx, (key, data, sha_value)) in records_iter:
                    pb.update('Walking content.', key_idx, total)
                    compression_parent = build_details[key][1]
                    if compression_parent is None:
                        line_iterator = self._factory.get_fulltext_content(data)
                    else:
                        line_iterator = self._factory.get_linedelta_content(data)
                    # Now that we are yielding the data for this key, remove it
                    # from the list
                    keys.remove(key)
                    # XXX: It might be more efficient to yield (key,
                    # line_iterator) in the future. However for now, this is a
                    # simpler change to integrate into the rest of the
                    # codebase. RBC 20071110
                    for line in line_iterator:
                        yield line, key
                done = True
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)
        # If there are still keys we've not yet found, we look in the fallback
        # vfs, and hope to find them there. Note that if the keys are found
        # but had no changes or no content, the fallback may not return
        # anything.
        if keys and not self._fallback_vfs:
            # XXX: strictly the second parameter is meant to be the file id
            # but it's not easily accessible here.
            raise RevisionNotPresent(keys, repr(self))
        for source in self._fallback_vfs:
            if not keys:
                break
            source_keys = set()
            for line, key in source.iter_lines_added_or_present_in_keys(keys):
                source_keys.add(key)
                yield line, key
            keys.difference_update(source_keys)
        pb.update('Walking content.', total, total)

    def _make_line_delta(self, delta_seq, new_content):

        self._mode = 'r'

    def _sort_keys_by_io(self, keys, positions):
        """Figure out an optimal order to read the records for the given keys.

        Sort keys, grouped by index and sorted by position.

        :param keys: A list of keys whose records we want to read. This will be
            sorted 'in-place'.
        :param positions: A dict, such as the one returned by
            _get_components_positions()
        """
        def get_sort_key(key):
            index_memo = positions[key][1]
            # Group by prefix and position. index_memo[0] is the key, so it is
            # (file_id, revision_id) and we don't want to sort on revision_id,
            # index_memo[1] is the position, and index_memo[2] is the size,
            # which doesn't matter for the sort
            return index_memo[0][:-1], index_memo[1]
        return keys.sort(key=get_sort_key)
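        # Illustrative ordering (hypothetical keys and positions): given
        # positions placing ('f1', 'r1') at offset 0, ('f1', 'r2') at 120 and
        # ('f2', 'r1') at 40, the list
        #   [('f1', 'r2'), ('f2', 'r1'), ('f1', 'r1')]
        # is rearranged to
        #   [('f1', 'r1'), ('f1', 'r2'), ('f2', 'r1')]
        # i.e. grouped by prefix first, then by byte position. list.sort()
        # works in place, so the return value is None and callers use the
        # mutated list.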

    def _split_key(self, key):
        """Split key into a prefix and suffix."""
        return key[:-1], key[-1]

        self._check_read()
        return [node[1] for node in self._graph_index.iter_all_entries()]

    missing_keys = _mod_index._missing_keys_from_parent_map

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2][1:].split(' ')
        return node[0], int(bits[0]), int(bits[1])

    def _sort_keys_by_io(self, keys, positions):
        """Figure out an optimal order to read the records for the given keys.

        Sort keys, grouped by index and sorted by position.

        :param keys: A list of keys whose records we want to read. This will be
            sorted 'in-place'.
        :param positions: A dict, such as the one returned by
            _get_components_positions()
        """
        def get_index_memo(key):
            # index_memo is at offset [1]. It is made up of (GraphIndex,
            # position, size). GI is an object, which will be unique for each
            # pack file. This causes us to group by pack file, then sort by
            # position. Size doesn't matter, but it isn't worth breaking up the
            # sort.
            return positions[key][1]
        return keys.sort(key=get_index_memo)
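        # Same idea as the _KndxIndex variant above, but the grouping key is
        # the GraphIndex object itself, so requests cluster per pack file
        # before being ordered by offset within that pack. The sort is again
        # in place; the None return value is not meant to be used.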


class _KnitKeyAccess(object):
    """Access to records in .knit files."""

        if current_index is not None:
            request_lists.append((current_index, current_list))
        for index, offsets in request_lists:
            try:
                transport, path = self._indices[index]
            except KeyError:
                # A KeyError here indicates that someone has triggered an index
                # reload, and this index has gone missing, we need to start
                # over.
                if self._reload_func is None:
                    # If we don't have a _reload_func there is nothing that can
                    # be done here, so we fail.
                    raise
                raise errors.RetryWithNewPacks(index,
                    reload_occurred=True,
                    exc_info=sys.exc_info())
            try:
                reader = pack.make_readv_reader(transport, path, offsets)
                for names, read_func in reader.iter_records():
                    yield read_func(None)
            except errors.NoSuchFile:
                # A NoSuchFile error indicates that a pack file has gone
                # missing on disk, we need to trigger a reload, and start over.
                if self._reload_func is None:
                    raise
                raise errors.RetryWithNewPacks(transport.abspath(path),
                    reload_occurred=False,
                    exc_info=sys.exc_info())

    def set_writer(self, writer, index, transport_packname):
        """Set a writer to use for adding data."""
        if index is not None:
            self._indices[index] = transport_packname
        self._container_writer = writer
        self._write_index = index

    def reload_or_raise(self, retry_exc):
        """Try calling the reload function, or re-raise the original exception.

        This should be called after _DirectPackAccess raises a
        RetryWithNewPacks exception. This function will handle the common logic
        of determining when the error is fatal versus being temporary.
        It will also make sure that the original exception is raised, rather
        than the RetryWithNewPacks exception.

        If this function returns, then the calling function should retry
        whatever operation was being performed. Otherwise an exception will
        be raised.

        :param retry_exc: A RetryWithNewPacks exception.
        """
        is_error = False
        if self._reload_func is None:
            is_error = True
        elif not self._reload_func():
            # The reload claimed that nothing changed
            if not retry_exc.reload_occurred:
                # If there wasn't an earlier reload, then we really were
                # expecting to find changes. We didn't find them, so this is a
                # hard error.
                is_error = True
        if is_error:
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
            raise exc_class, exc_value, exc_traceback
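        # Sketch of the intended calling pattern (mirroring how this module
        # uses it in _get_record_map and the annotator; do_pack_read is a
        # hypothetical stand-in for whatever read was attempted):
        #   while True:
        #       try:
        #           return do_pack_read()
        #       except errors.RetryWithNewPacks, e:
        #           access.reload_or_raise(e)
        #           # reload_or_raise either re-raised, or we retry the read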

# Deprecated, use PatienceSequenceMatcher instead
KnitSequenceMatcher = patiencediff.PatienceSequenceMatcher


        :param key: The key to annotate.
        """
        if len(self._knit._fallback_vfs) > 0:
            # stacked knits can't use the fast path at present.
            return self._simple_annotate(key)
        while True:
            try:
                records = self._get_build_graph(key)
                if key in self._ghosts:
                    raise errors.RevisionNotPresent(key, self._knit)
                self._annotate_records(records)
                return self._annotated_lines[key]
            except errors.RetryWithNewPacks, e:
                self._knit._access.reload_or_raise(e)
                # The cached build_details are no longer valid
                self._all_build_details.clear()

    def _simple_annotate(self, key):
        """Return annotated fulltext, rediffing from the full texts.

        This is slow but makes no assumptions about the repository
        being able to produce line deltas.
        """
        # TODO: this code generates a parent maps of present ancestors; it
        # could be split out into a separate method, and probably should use
        # iter_ancestry instead. -- mbp and robertc 20080704
        graph = _mod_graph.Graph(self._knit)
        head_cache = _mod_graph.FrozenHeadsCache(graph)
        search = graph._make_breadth_first_searcher([key])
        keys = set()
        while True:
            try:
                present, ghosts = search.next_with_ghosts()
            except StopIteration:
                break
            keys.update(present)
        parent_map = self._knit.get_parent_map(keys)
        parent_cache = {}
        reannotate = annotate.reannotate
        for record in self._knit.get_record_stream(keys, 'topological', True):
            key = record.key
            fulltext = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            parents = parent_map[key]
            if parents is not None:
                parent_lines = [parent_cache[parent] for parent in parent_map[key]]
            else:
                parent_lines = []
            parent_cache[key] = list(
                reannotate(parent_lines, fulltext, key, None, head_cache))
        try:
            return parent_cache[key]
        except KeyError, e:
            raise errors.RevisionNotPresent(key, self._knit)