236
242
self.repo.signatures._index._add_callback = self.signature_index.add_callback
237
243
self.repo.texts._index._add_callback = self.text_index.add_callback
245
# NOTE(review): this block is a corrupted extraction — bare original-file line
# numbers (246, 250, ...) are interleaved with the code, indentation is
# stripped, and the jumps in those numbers (246->250, 257->259, 261->263,
# 265->267) show whole statements are missing: the initialisation of
# id_roots / p_id_roots / id_roots_set, the id_roots.append(...) branch body,
# the p_id_map root-key extraction, and the yield that makes _filter_inv_stream
# a generator are all absent here. Recover the original file before editing;
# this text is not runnable as-is. Comments below describe only what the
# visible lines do.
def _get_filtered_inv_stream(self, source_vf, keys):
246
"""Filter the texts of inventories, to find the chk pages."""
250
# Dedup set for parent_id->basename map root keys (the matching
# id_roots_set it is used alongside is initialised in a missing line).
p_id_roots_set = set()
251
# Inner generator wrapping the record stream: inspects each inventory
# record as it passes through and collects chk root keys as a side effect.
def _filter_inv_stream(stream):
252
for idx, record in enumerate(stream):
253
# Commented-out progress reporting left by the original author.
### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
254
bytes = record.get_bytes_as('fulltext')
255
# Deserialise the inventory text to get at its chk map roots.
# NOTE(review): first argument None — presumably the serialiser/
# repo slot of deserialise is unused here; confirm against
# the inventory.CHKInventory API.
chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
256
# Root key of the id->entry chk map for this inventory.
key = chk_inv.id_to_entry.key()
257
# First time we see this root, remember it (the append into
# id_roots is in a missing line, per the 257->259 number jump).
if key not in id_roots_set:
259
id_roots_set.add(key)
260
p_id_map = chk_inv.parent_id_basename_to_file_id
261
# Older inventories may lack a parent_id,basename->file_id map.
if p_id_map is not None:
263
# NOTE(review): 'key' here should presumably be p_id_map.key(),
# assigned in the missing line 262 — confirm against the
# original source; as shown it would reuse the id_to_entry key.
if key not in p_id_roots_set:
264
p_id_roots_set.add(key)
265
p_id_roots.append(key)
267
# 'gc-optimal' ordering with include_delta_closure=True, matching the
# other get_record_stream calls in this file.
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
268
# Returns the (lazy) filtered stream plus the two root-key lists, which
# are filled in only as the stream is consumed.
return _filter_inv_stream(stream), id_roots, p_id_roots
270
# NOTE(review): corrupted extraction — interleaved original-file line numbers
# and large gaps (286->288, 292->294, 308->313, 321->323, 327->329, ...) mean
# key statements are missing: the cur_keys / next_keys / counter setup, the
# setdefault(...) value and next_keys bookkeeping, the yield statements that
# make _get_referenced_stream a generator, and the closing arguments of the
# two truncated get_record_stream calls. Recover the original file before
# editing. Comments below describe only the visible lines.
def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots, pb=None):
271
# We want to stream the keys from 'id_roots', and things they
272
# reference, and then stream things from p_id_roots and things they
273
# reference, and then any remaining keys that we didn't get to.
275
# We also group referenced texts together, so if one root references a
276
# text with prefix 'a', and another root references a node with prefix
277
# 'a', we want to yield those nodes before we yield the nodes for 'b'
278
# This keeps 'similar' nodes together
280
# Note: We probably actually want multiple streams here, to help the
281
# client understand that the different levels won't compress well
283
# Test the difference between using one Group per level, and
284
# using 1 Group per prefix. (so '' (root) would get a group, then
285
# all the references to search-key 'a' would get a group, etc.)
286
# Tracks which of the requested chk keys have not yet been emitted via
# a root-reachable walk; leftovers are streamed 'unordered' at the end.
remaining_keys = set(keys)
288
# Breadth-first walk over chk nodes starting from root_keys, grouping
# child references by search prefix (cur_keys is initialised in a
# missing line; the yields are also missing from this view).
def _get_referenced_stream(root_keys):
291
keys_by_search_prefix = {}
292
# Don't re-emit these keys in the final 'unordered' sweep.
remaining_keys.difference_update(cur_keys)
294
# NOTE(review): call truncated mid-argument-list in this extraction.
stream = source_vf.get_record_stream(cur_keys, 'as-requested',
297
for record in stream:
298
bytes = record.get_bytes_as('fulltext')
299
# We don't care about search_key_func for this code,
300
# because we only care about external references.
301
node = chk_map._deserialise(bytes, record.key,
302
search_key_func=None)
303
common_base = node._search_prefix
304
# Only internal nodes reference further chk pages; leaf nodes end
# the walk on this branch.
if isinstance(node, chk_map.InternalNode):
305
for prefix, value in node._items.iteritems():
306
# chk child references are key tuples.
assert isinstance(value, tuple)
307
if value not in next_keys:
308
# NOTE(review): setdefault's default value and the append of
# 'value' are in missing lines.
keys_by_search_prefix.setdefault(prefix,
313
# 'counter' is initialised in a missing line; presumably a
# one-element list used as a mutable closure counter — confirm.
pb.update('chk node', counter[0])
316
# Double check that we won't be emitting any keys twice
317
next_keys = next_keys.intersection(remaining_keys)
319
# Sorted prefix order keeps 'similar' nodes adjacent in the output,
# per the grouping comment at the top of this method.
for prefix in sorted(keys_by_search_prefix):
320
cur_keys.extend(keys_by_search_prefix[prefix])
321
# First walk everything reachable from the id_to_entry roots...
for stream in _get_referenced_stream(id_roots):
323
# ...then from the parent_id,basename->file_id roots (loop bodies —
# presumably 'yield stream' — are missing from this extraction).
for stream in _get_referenced_stream(p_id_roots):
326
# Any chk keys never reached from a root are reported...
trace.note('There were %d keys in the chk index, which'
327
' were not referenced from inventories',
329
# ...and streamed without ordering guarantees (call truncated here).
stream = source_vf.get_record_stream(remaining_keys, 'unordered',
239
333
def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
240
334
reload_func=None):
241
335
"""Execute a series of pack operations.
302
402
is_locked=self.repo.is_locked),
303
403
access=target_access,
304
404
delta=source_vf._delta)
305
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
306
target_vf.insert_record_stream(stream)
406
child_pb = ui.ui_factory.nested_progress_bar()
409
if vf_name == 'inventories':
410
stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
412
elif vf_name == 'chk_bytes':
413
for stream in self._get_chk_stream(source_vf, keys,
414
id_roots, p_id_roots,
416
target_vf.insert_record_stream(stream)
421
substream = source_vf.get_record_stream(keys, 'gc-optimal', True)
422
for idx, record in enumerate(substream):
423
child_pb.update(vf_name, idx, len(keys))
426
target_vf.insert_record_stream(stream)
307
429
new_pack._check_references() # shouldn't be needed