557
583
return new_pack.data_inserted() and self._data_changed
586
class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed.  (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).
    """

    # NOTE(review): the text of this class under review had lost its
    # indentation and several short lines (docstring terminators, ``try`` /
    # ``finally``, ``break``, ``pass``, the ``pb`` guard and the call that
    # receives the diagnostic messages).  They are restored here from
    # context -- confirm against the canonical file.

    def __init__(self, *args, **kwargs):
        """Forward every argument to the base packer and reset tracking."""
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        # Becomes True as soon as one inventory is found whose CHK map root
        # differs from the regenerated one; read back by _use_pack.
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream,
                        pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.

        :param source_vf: handed to vf_to_stream unchanged.
        :param keys: handed to vf_to_stream unchanged.
        :param message: label used in the progress text and handed to
            vf_to_stream.
        :param vf_to_stream: callable invoked as
            ``vf_to_stream(source_vf, keys, message, pb)``; whatever it
            returns is iterated to the end and the items are discarded.
        :param pb_offset: position reported to ``self.pb`` for this phase.
        """
        self.pb.update('scanning %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            # Only the side effects of iterating are wanted, so the records
            # are dropped one by one rather than collected.
            for unused_record in vf_to_stream(source_vf, keys, message,
                                              child_pb):
                pass
        finally:
            # Always release the nested bar, even if the stream raises.
            child_pb.finished()

    def _copy_inventory_texts(self):
        """Copy the existing CHK pages, then copy canonicalised inventories.

        The plain CHK copy of the base class runs first; afterwards every
        inventory is re-streamed through
        _get_filtered_canonicalizing_inv_stream and copied to the target.
        """
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct.  This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # few unused CHKs.
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
                             self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)

        # Now copy and fix the inventories, and any regenerated CHKs.
        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            # Adapter: binds the two CHK versioned files so the factory has
            # the four-argument shape that _copy_stream is given.
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)

        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.
        pass

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
            pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        are canonical.

        :param source_vf: versioned file the inventory records are read from.
        :param keys: inventory keys to stream; ``len(keys)`` is used as the
            progress total.
        :param message: not used by this method.
        :param pb: optional progress bar; updated once per inventory when it
            is not None.
        :param source_chk_vf: CHK store the existing inventories are
            deserialised against.
        :param target_chk_vf: CHK store the regenerated inventories are built
            in.  It is wrapped in NoDupeAddLinesDecorator first (presumably
            so pages that already exist are not added twice -- confirm).
        :returns: a generator yielding one ChunkedContentFactory per
            inventory record, carrying the original key, parents and sha1
            with the lines of the regenerated inventory.  Iterating it sets
            ``self._data_changed`` whenever a regenerated map root differs
            from the stored one.
        """
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)

        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                inv_bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, inv_bytes, record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in search_key_reg.iteritems():
                        if func == chk_inv.id_to_entry._search_key_func:
                            # Stop on the match; otherwise the name would
                            # always end up being the last registry entry.
                            break
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                stored_root = chk_inv.id_to_entry.key()
                canonical_root = canonical_inv.id_to_entry.key()
                if stored_root != canonical_root:
                    # NOTE(review): trace.mutter is assumed to be the
                    # intended sink for this message -- confirm that
                    # ``trace`` is imported by this module.
                    trace.mutter(
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
                        stored_root[0],
                        canonical_root[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                    trace.mutter(
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                           canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(record.key,
                        record.parents, record.sha1,
                        canonical_inv.to_lines())

        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content.

        :param new_pack: the pack that was just written.
        :returns: a true value only when new_pack reports inserted data and
            at least one non-canonical CHK map was seen while streaming.
        """
        return new_pack.data_inserted() and self._data_changed
560
691
class GCRepositoryPackCollection(RepositoryPackCollection):
562
693
pack_factory = GCPack
563
694
resumed_pack_factory = ResumedGCPack
696
def _check_new_inventories(self):
    """Detect missing inventories or chk root entries for the new revisions
    in this write group.

    :returns: list of strs, summarising any problems found.  If the list is
        empty no problems were found.
    """
    # NOTE(review): the initialisers of ``problems`` and ``text_keys``, the
    # trailing arguments of three calls, the empty loop bodies and the early
    # returns were absent from the text under review and are restored here
    # from context -- confirm against the canonical file.  In particular the
    # early return after "inventories missing" is inferred.
    #
    # Ensure that all revisions added in this write group have:
    #   - corresponding inventories,
    #   - chk root entries for those inventories,
    #   - and any present parent inventories have their chk root
    #     entries too.
    # And all this should be independent of any fallback repository.
    problems = []
    key_deps = self.repo.revisions._index._key_dependencies
    new_revisions_keys = key_deps.get_new_keys()
    no_fallback_inv_index = self.repo.inventories._index
    no_fallback_chk_bytes_index = self.repo.chk_bytes._index
    no_fallback_texts_index = self.repo.texts._index
    inv_parent_map = no_fallback_inv_index.get_parent_map(
        new_revisions_keys)
    # Are any inventories for corresponding to the new revisions missing?
    corresponding_invs = set(inv_parent_map)
    missing_corresponding = set(new_revisions_keys)
    missing_corresponding.difference_update(corresponding_invs)
    if missing_corresponding:
        problems.append("inventories missing for revisions %s" %
            (sorted(missing_corresponding),))
        return problems
    # Are any chk root entries missing for any inventories?  This includes
    # any present parent inventories, which may be used when calculating
    # deltas for streaming.
    all_inv_keys = set(corresponding_invs)
    for parent_inv_keys in inv_parent_map.itervalues():
        all_inv_keys.update(parent_inv_keys)
    # Filter out ghost parents.
    all_inv_keys.intersection_update(
        no_fallback_inv_index.get_parent_map(all_inv_keys))
    # Keys that are present only because they are parents of a new
    # inventory, not because they correspond to a new revision.
    parent_invs_only_keys = all_inv_keys.symmetric_difference(
        corresponding_invs)
    inv_ids = [key[-1] for key in all_inv_keys]
    parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
    root_key_info = _build_interesting_key_sets(
        self.repo, inv_ids, parent_invs_only_ids)
    expected_chk_roots = root_key_info.all_keys()
    present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
        expected_chk_roots)
    missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
    if missing_chk_roots:
        problems.append("missing referenced chk root keys: %s"
            % (sorted(missing_chk_roots),))
        # Don't bother checking any further.
        return problems
    # Find all interesting chk_bytes records, and make sure they are
    # present, as well as the text keys they reference.
    chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
    chk_bytes_no_fallbacks._search_key_func = \
        self.repo.chk_bytes._search_key_func
    chk_diff = chk_map.iter_interesting_nodes(
        chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
        root_key_info.uninteresting_root_keys)
    text_keys = set()
    try:
        # The records themselves are not needed: the loop only drives the
        # iterator, which is handed text_keys to work with.
        for record in _filter_text_keys(chk_diff, text_keys,
                                        chk_map._bytes_to_text_key):
            pass
    except errors.NoSuchRevision:
        # XXX: It would be nice if we could give a more precise error here.
        problems.append("missing chk node(s) for id_to_entry maps")
    chk_diff = chk_map.iter_interesting_nodes(
        chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
        root_key_info.uninteresting_pid_root_keys)
    try:
        # Walk the second map purely to surface missing nodes.
        for interesting_rec, interesting_map in chk_diff:
            pass
    except errors.NoSuchRevision:
        problems.append(
            "missing chk node(s) for parent_id_basename_to_file_id maps")
    present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
    missing_text_keys = text_keys.difference(present_text_keys)
    if missing_text_keys:
        problems.append("missing text keys: %r"
            % (sorted(missing_text_keys),))
    return problems
565
782
def _execute_pack_operations(self, pack_operations,
566
783
_packer_class=GCCHKPacker,
567
784
reload_func=None):