            revision_ids=revision_ids,
            reload_func=reload_func)

    def _pack_map_and_index_list(self, index_attribute):
        """Convert a list of packs to an index pack map and index list.

        :param index_attribute: The attribute that the desired index is found
            on.
        :return: A tuple (map, list) where map contains the dict from
            index:pack_tuple, and list contains the indices in the preferred
            access order.
        """
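        # For example, with two packs p1 and p2 whose revision indices are i1
        # and i2, _pack_map_and_index_list('revision_index') returns
        # ({i1: p1, i2: p2}, [i1, i2]).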
        indices = []
        pack_map = {}
        for pack_obj in self.packs:
            index = getattr(pack_obj, index_attribute)
            indices.append(index)
            pack_map[index] = pack_obj
        return pack_map, indices

    def _index_contents(self, indices, key_filter=None):
        """Get an iterable of the index contents from a pack_map.

        :param indices: The list of indices to query
        :param key_filter: An optional filter to limit the keys returned.
        """
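        # For example, key_filter=[('rev-1',), ('rev-2',)] limits the result
        # to those two keys across every index in 'indices'.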
        all_index = CombinedGraphIndex(indices)
        if key_filter is None:
            return all_index.iter_all_entries()
        else:
            return all_index.iter_entries(key_filter)

    def _copy_nodes(self, nodes, index_map, writer, write_index,
        output_lines=None):
        """Copy knit nodes between packs with no graph references.

        :param output_lines: Output full texts of copied items.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb, output_lines=output_lines)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
        output_lines=None):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # plan a readv on each source pack:
        # group by pack
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
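                # The index value is an eol flag byte ('N' when the text has
                # no trailing newline) followed by "offset length" in
                # decimal, e.g. 'N1234 567' for a no-eol record of 567 bytes
                # starting at offset 1234.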
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            # copy the data
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path,
                    [offset[0:2] for offset in pack_readv_requests])
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                if output_lines is not None:
                    output_lines(knit._parse_record(key[-1], raw_data)[0])
                else:
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1

    def _copy_nodes_graph(self, index_map, writer, write_index,
        readv_group_iter, total_items, output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator of line,version_id.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for result in self._do_copy_nodes_graph(index_map, writer,
                write_index, output_lines, pb, readv_group_iter, total_items):
                yield result
        except Exception:
            # Python 2.4 does not permit try:finally: in a generator.
            pb.finished()
            raise
        else:
            pb.finished()

    def _do_copy_nodes_graph(self, index_map, writer, write_index,
        output_lines, pb, readv_group_iter, total_items):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # for line extraction when requested (inventories only)
        if output_lines:
            factory = KnitPlainFactory()
        record_index = 0
        pb.update("Copied record", record_index, total_items)
        for index, readv_vector, node_vector in readv_group_iter:
            # copy the data
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path, readv_vector)
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (key, eol_flag, references) in \
                izip(reader.iter_records(), node_vector):
                raw_data = read_func(None)
                if output_lines:
                    # read the entire thing
                    content, _ = knit._parse_record(key[-1], raw_data)
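                    # references[-1] holds the compression parents: an empty
                    # list marks a fulltext record, otherwise the record is a
                    # line-delta against its compression parent.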
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line, key
                else:
                    # check the header only
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and setup a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_keys)
        text_filter = []
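        # text_filter becomes a list of (file_id, revision_id) text keys,
        # e.g. [('file-1', 'rev-1'), ('file-1', 'rev-2')].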
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter

    def _copy_inventory_texts(self):
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
            'inventory_index')
        inv_nodes = self._index_contents(inventory_indices, inv_keys)
        # copy inventory keys and adjust values
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
        # Only grab the output lines if we will be processing them
        output_lines = bool(self.revision_ids)
        inv_lines = self._copy_nodes_graph(inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index,
            readv_group_iter, total_items, output_lines=output_lines)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)

    def _update_pack_order(self, entries, index_to_pack_map):
        """Determine how we want our packs to be ordered.

        This changes the sort order of the self.packs list so that packs unused
        by 'entries' will be at the end of the list, so that future requests
        can avoid probing them. Used packs will be at the front of the
        self.packs list, in the order of their first use in 'entries'.

        :param entries: A list of (index, ...) tuples
        :param index_to_pack_map: A mapping from index objects to pack objects.
        """
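        # For example, if self.packs is [p1, p2, p3] and 'entries' only names
        # p3's index, self.packs becomes [p3, p1, p2].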
        packs = []
        seen_indexes = set()
        for entry in entries:
            index = entry[0]
            if index not in seen_indexes:
                packs.append(index_to_pack_map[index])
                seen_indexes.add(index)
        if len(packs) == len(self.packs):
            if 'pack' in debug.debug_flags:
                trace.mutter('Not changing pack list, all packs used.')
            return
        seen_packs = set(packs)
        for pack in self.packs:
            if pack not in seen_packs:
                packs.append(pack)
        if 'pack' in debug.debug_flags:
            old_names = [p.access_tuple()[1] for p in self.packs]
            new_names = [p.access_tuple()[1] for p in packs]
            trace.mutter('Reordering packs\nfrom: %s\n to: %s',
                         old_names, new_names)
        self.packs = packs

    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
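        # revision_keys is a list of 1-tuples, e.g. [('rev-id',)]; None means
        # no filtering, so every revision in the source packs is copied.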
        # select revision keys
        revision_index_map, revision_indices = self._pack_map_and_index_list(
            'revision_index')
        revision_nodes = self._index_contents(revision_indices, revision_keys)
        revision_nodes = list(revision_nodes)
        self._update_pack_order(revision_nodes, revision_index_map)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
            self.new_pack.revision_index, readv_group_iter, total_items))
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys

    def _get_text_nodes(self):
        text_index_map, text_indices = self._pack_map_and_index_list(
            'text_index')
        return text_index_map, self._index_contents(text_indices,
            self._text_filter)

    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # going to buffer everything anyway, we check beforehand, which
            # saves reading knit data over the wire when we know there are
            # missing records.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            missing_text_keys = set(self._text_filter) - present_text_keys
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                trace.mutter("missing keys during fetch: %r", missing_text_keys)
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        self._log_copied_texts()

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            trace.mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map, signature_indices = self._pack_map_and_index_list(
            'signature_index')
        signature_nodes = self._index_contents(signature_indices,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        new_pack._check_references()
        if not self._use_pack(new_pack):
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack

    def _least_readv_node_readv(self, nodes):
        """Generate request groups for nodes using the least readv's.

        :param nodes: An iterable of graph index nodes.
        :return: Total node count and an iterator of the data needed to perform
            readvs to obtain the data for nodes. Each item yielded by the
            iterator is a tuple with:
            index, readv_vector, node_vector. readv_vector is a list ready to
            hand to the transport readv method, and node_vector is a list of
            (key, eol_flag, references) for the node retrieved by the
            matching readv_vector.
        """
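        # For example, two records at (offset, length) (0, 10) and (10, 20)
        # in the same pack are grouped into a single
        # (index, [(0, 10), (10, 20)], [node1, node2]) item, so each source
        # pack is read with one readv call.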
        # group by pack so we do one readv per pack
        nodes = sorted(nodes)
        total = len(nodes)
        request_groups = {}
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        result = []
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append(
                    ((offset, length), (key, value[0], references)))
            # linear scan up the pack to maximum range combining.
            pack_readv_requests.sort()
            # split out the readv and the node data.
            pack_readv = [readv for readv, node in pack_readv_requests]
            node_vector = [node for readv, node in pack_readv_requests]
            result.append((index, pack_readv, node_vector))
        return total, result

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        return self._least_readv_node_readv(revision_nodes)


class KnitReconcilePacker(KnitPacker):
    """A packer which regenerates indices etc as it copies.