@@ -806,5 +807,34 @@
     versioned_files.writer.end()
 
 
+def _get_total_build_size(self, keys, positions):
+    """Determine the total bytes to build these keys.
+
+    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
+    don't inherit from a common base.)
+
+    :param keys: Keys that we want to build
+    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
+        as returned by _get_components_positions)
+    :return: Number of bytes to build those keys
+    """
+    all_build_index_memos = {}
+    build_keys = keys
+    while build_keys:
+        next_keys = set()
+        for key in build_keys:
+            # This is mostly for the 'stacked' case
+            # Where we will be getting the data from a fallback
+            if key not in positions:
+                continue
+            _, index_memo, compression_parent = positions[key]
+            all_build_index_memos[key] = index_memo
+            if compression_parent not in all_build_index_memos:
+                next_keys.add(compression_parent)
+        build_keys = next_keys
+    return sum([index_memo[2] for index_memo
+                in all_build_index_memos.itervalues()])
+
+
 class KnitVersionedFiles(VersionedFiles):
     """Storage for many versioned files using knit compression.
@@ -1182,9 +1212,12 @@
                 records = [(key, i_m) for key, (r, i_m, n)
                                        in position_map.iteritems()]
+                # Sort by the index memo, so that we request records from the
+                # same pack file together, and in forward-sorted order
+                records.sort(key=operator.itemgetter(1))
                 raw_record_map = {}
                 for key, data in self._read_records_iter_unchecked(records):
                     (record_details, index_memo, next) = position_map[key]
                     raw_record_map[key] = data, record_details, next
                 return raw_record_map
             except errors.RetryWithNewPacks, e:
                 self._access.reload_or_raise(e)
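
As an aside, the sort above works because index memos compare as plain tuples, so records from the same pack file group together and are read in forward offset order. A standalone sketch with invented memo values:

import operator

# Invented (key, index_memo) pairs; an index memo is roughly
# (pack_identifier, offset, length), so tuple comparison groups records
# by pack and then by offset within the pack.
records = [
    (('f1', 'rev-2'), ('pack-b', 300, 40)),
    (('f2', 'rev-1'), ('pack-a', 0, 120)),
    (('f1', 'rev-1'), ('pack-a', 500, 80)),
]
records.sort(key=operator.itemgetter(1))
# records now reads pack-a@0, pack-a@500, pack-b@300: one forward pass
# through each pack file instead of scattered seeks.
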
@@ -1192,19 +1225,33 @@
-    def _split_by_prefix(self, keys):
+    @classmethod
+    def _split_by_prefix(cls, keys):
         """For the given keys, split them up based on their prefix.
 
         To keep memory pressure somewhat under control, split the
         requests back into per-file-id requests, otherwise "bzr co"
         extracts the full tree into memory before writing it to disk.
         This should be revisited if _get_content_maps() can ever cross
         file-id boundaries.
 
+        The keys for a given file_id are kept in the same relative order.
+        Ordering between file_ids is not, though prefix_order will return the
+        order that the key was first seen.
+
         :param keys: An iterable of key tuples
-        :return: A dict of {prefix: [key_list]}
+        :return: (split_map, prefix_order)
+            split_map       A dictionary mapping prefix => keys
+            prefix_order    The order that we saw the various prefixes
         """
         split_by_prefix = {}
+        prefix_order = []
         for key in keys:
             if len(key) == 1:
-                split_by_prefix.setdefault('', []).append(key)
+                prefix = ''
             else:
-                split_by_prefix.setdefault(key[0], []).append(key)
-        return split_by_prefix
+                prefix = key[0]
+
+            if prefix in split_by_prefix:
+                split_by_prefix[prefix].append(key)
+            else:
+                split_by_prefix[prefix] = [key]
+                prefix_order.append(prefix)
+        return split_by_prefix, prefix_order
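
A hypothetical call, to show the shape of the return value; the keys are invented (file_id, revision_id) tuples:

keys = [
    ('file-a', 'rev-1'),
    ('file-b', 'rev-1'),
    ('file-a', 'rev-2'),
]
split_map, prefix_order = KnitVersionedFiles._split_by_prefix(keys)
# split_map == {'file-a': [('file-a', 'rev-1'), ('file-a', 'rev-2')],
#               'file-b': [('file-b', 'rev-1')]}
# prefix_order == ['file-a', 'file-b']    (order each prefix was first seen)
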
@@ -1211,3 +1258,49 @@
 
+    def _group_keys_for_io(self, keys, non_local_keys, positions,
+                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
+        """For the given keys, group them into 'best-sized' requests.
+
+        The idea is to avoid making 1 request per file, but to never try to
+        unpack an entire 1.5GB source tree in a single pass. Also when
+        possible, we should try to group requests to the same pack file
+        together.
+
+        :return: list of (keys, non_local) tuples that indicate what keys
+            should be fetched next.
+        """
+        # TODO: Ideally we would group on 2 factors. We want to extract texts
+        #       from the same pack file together, and we want to extract all
+        #       the texts for a given build-chain together. Ultimately it
+        #       probably needs a better global view.
+        total_keys = len(keys)
+        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
+        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
+        cur_keys = []
+        cur_non_local = set()
+        cur_size = 0
+        result = []
+        sizes = []
+        for prefix in prefix_order:
+            keys = prefix_split_keys[prefix]
+            non_local = prefix_split_non_local_keys.get(prefix, [])
+
+            this_size = self._index._get_total_build_size(keys, positions)
+            cur_size += this_size
+            cur_keys.extend(keys)
+            cur_non_local.update(non_local)
+            if cur_size > _min_buffer_size:
+                result.append((cur_keys, cur_non_local))
+                sizes.append(cur_size)
+                cur_keys = []
+                cur_non_local = set()
+                cur_size = 0
+        if cur_keys:
+            result.append((cur_keys, cur_non_local))
+            sizes.append(cur_size)
+        trace.mutter('Collapsed %d keys into %d requests w/ %d file_ids'
+                     ' w/ sizes: %s', total_keys, len(result),
+                     len(prefix_split_keys), sizes)
+        return result
+
     def get_record_stream(self, keys, ordering, include_delta_closure):
         """Get a stream of records for keys.
@@ -1334,11 +1427,9 @@
             # XXX: get_content_maps performs its own index queries; allow state
             # to be passed in.
             non_local_keys = needed_from_fallback - absent_keys
-            prefix_split_keys = self._split_by_prefix(present_keys)
-            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
-            for prefix, keys in prefix_split_keys.iteritems():
-                non_local = prefix_split_non_local_keys.get(prefix, [])
-                non_local = set(non_local)
-                generator = _VFContentMapGenerator(self, keys, non_local,
+            for keys, non_local_keys in self._group_keys_for_io(present_keys,
+                                                                non_local_keys,
+                                                                positions):
+                generator = _VFContentMapGenerator(self, keys, non_local_keys,
                     global_map)
                 for record in generator.get_record_stream():
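
For orientation, a sketch of how a caller typically consumes the resulting stream. knit_vf and the keys are placeholders; the rest follows the public VersionedFiles record-stream API.

# knit_vf: a KnitVersionedFiles instance obtained elsewhere (placeholder).
keys = [('file-a', 'rev-1'), ('file-b', 'rev-1')]
for record in knit_vf.get_record_stream(keys, 'unordered',
                                         include_delta_closure=True):
    if record.storage_kind == 'absent':
        # The key was not present in this store or its fallbacks.
        continue
    text = record.get_bytes_as('fulltext')
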