            revision_ids=revision_ids,
            reload_func=reload_func)

    def _pack_map_and_index_list(self, index_attribute):
        """Convert a list of packs to an index pack map and index list.

        :param index_attribute: The attribute that the desired index is found
            on.
        :return: A tuple (map, list) where map contains the dict from
            index:pack_tuple, and list contains the indices in the preferred
            access order.
        """
        indices = []
        pack_map = {}
        for pack_obj in self.packs:
            index = getattr(pack_obj, index_attribute)
            indices.append(index)
            pack_map[index] = pack_obj
        return pack_map, indices
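
    # Illustrative note (not from the original source): with two packs p1
    # and p2, _pack_map_and_index_list('revision_index') would return
    # ({p1.revision_index: p1, p2.revision_index: p2},
    #  [p1.revision_index, p2.revision_index]) -- the list preserves
    # self.packs order, which is the preferred probe order.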

    def _index_contents(self, indices, key_filter=None):
        """Get an iterable of the index contents from a pack_map.

        :param indices: The list of indices to query
        :param key_filter: An optional filter to limit the keys returned.
        """
        all_index = CombinedGraphIndex(indices)
        if key_filter is None:
            return all_index.iter_all_entries()
        else:
            return all_index.iter_entries(key_filter)
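
    # Note: key_filter, when given, is an iterable of key tuples in the
    # index keyspace -- e.g. [('rev-id-1',), ('rev-id-2',)] for a revision
    # index -- and only matching entries are yielded; a None filter walks
    # every entry in every supplied index.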

    def _copy_nodes(self, nodes, index_map, writer, write_index,
        output_lines=None):
        """Copy knit nodes between packs with no graph references.

        :param output_lines: Output full texts of copied items.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb, output_lines=output_lines)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
        output_lines=None):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # plan a readv on each source pack:
        # group by pack
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            # copy the data
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path,
                    [offset[0:2] for offset in pack_readv_requests])
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                # check the header only
                if output_lines is not None:
                    output_lines(knit._parse_record(key[-1], raw_data)[0])
                else:
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1
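
    # Data-format note: each index value here is a one-byte eol flag
    # followed by "offset length" within the source pack (see the
    # write_index.add_node call above, which stores eol_flag + "%d %d"),
    # so value[1:].split(' ') recovers the readv coordinates and value[0]
    # carries the flag forward unchanged.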

    def _copy_nodes_graph(self, index_map, writer, write_index,
        readv_group_iter, total_items, output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator of line,version_id.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for result in self._do_copy_nodes_graph(index_map, writer,
                write_index, output_lines, pb, readv_group_iter, total_items):
                yield result
        except Exception:
            # Python 2.4 does not permit try:finally: in a generator.
            pb.finished()
            raise
        else:
            pb.finished()

    def _do_copy_nodes_graph(self, index_map, writer, write_index,
        output_lines, pb, readv_group_iter, total_items):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # for line extraction when requested (inventories only)
        if output_lines:
            factory = KnitPlainFactory()
        record_index = 0
        pb.update("Copied record", record_index, total_items)
        for index, readv_vector, node_vector in readv_group_iter:
            # copy the data
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path, readv_vector)
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (key, eol_flag, references) in \
                izip(reader.iter_records(), node_vector):
                raw_data = read_func(None)
                if output_lines:
                    # read the entire thing
                    content, _ = knit._parse_record(key[-1], raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line, key
                else:
                    # check the header only
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1
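
    # Note: references[-1] is the record's compression-parent list, so an
    # empty final reference list means the record is stored as a fulltext
    # and anything else as a line delta -- hence the switch between
    # get_fulltext_content and get_linedelta_content above.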

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and setup a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_keys)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter
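
    # Note: the resulting self._text_filter is a flat list of
    # (file_id, revision_id) text keys, which _copy_text_texts later
    # checks against the combined text index before copying.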

    def _copy_inventory_texts(self):
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
            'inventory_index')
        inv_nodes = self._index_contents(inventory_indices, inv_keys)
        # copy inventory keys and adjust values
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
        # Only grab the output lines if we will be processing them
        output_lines = bool(self.revision_ids)
        inv_lines = self._copy_nodes_graph(inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index,
            readv_group_iter, total_items, output_lines=output_lines)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)
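
    # Ordering note: this method reuses self._revision_keys, which is only
    # set by _copy_revision_texts, so the call order in
    # _create_pack_from_packs (revisions, then inventories, then texts) is
    # load-bearing.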

    def _update_pack_order(self, entries, index_to_pack_map):
        """Determine how we want our packs to be ordered.

        This changes the sort order of the self.packs list so that packs unused
        by 'entries' will be at the end of the list, so that future requests
        can avoid probing them. Used packs will be at the front of the
        self.packs list, in the order of their first use in 'entries'.

        :param entries: A list of (index, ...) tuples
        :param index_to_pack_map: A mapping from index objects to pack objects.
        """
        packs = []
        seen_indexes = set()
        for entry in entries:
            index = entry[0]
            if index not in seen_indexes:
                packs.append(index_to_pack_map[index])
                seen_indexes.add(index)
        if len(packs) == len(self.packs):
            if 'pack' in debug.debug_flags:
                trace.mutter('Not changing pack list, all packs used.')
            return
        seen_packs = set(packs)
        for pack in self.packs:
            if pack not in seen_packs:
                packs.append(pack)
                seen_packs.add(pack)
        if 'pack' in debug.debug_flags:
            old_names = [p.access_tuple()[1] for p in self.packs]
            new_names = [p.access_tuple()[1] for p in packs]
            trace.mutter('Reordering packs\nfrom: %s\n to: %s',
                old_names, new_names)
        self.packs = packs
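
    # Illustrative note (not from the original source): if self.packs is
    # [p1, p2, p3] and 'entries' touches p3's index first and then p1's,
    # the list becomes [p3, p1, p2]: used packs first in first-use order,
    # unused packs last so future probes reach them least often.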

    def _copy_revision_texts(self):
        """Copy revision data to the new pack."""
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map, revision_indices = self._pack_map_and_index_list(
            'revision_index')
        revision_nodes = self._index_contents(revision_indices, revision_keys)
        revision_nodes = list(revision_nodes)
        self._update_pack_order(revision_nodes, revision_index_map)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
            self.new_pack.revision_index, readv_group_iter, total_items))
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys
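
    # Note: when self.revision_ids is unset, revision_keys stays None and
    # _index_contents falls back to iter_all_entries, i.e. every revision
    # in the source packs is copied.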

    def _get_text_nodes(self):
        text_index_map, text_indices = self._pack_map_and_index_list(
            'text_index')
        return text_index_map, self._index_contents(text_indices,
            self._text_filter)

    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # going to buffer everything anyway, we check beforehand, which
            # saves reading knit data over the wire when we know there are
            # missing records.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            missing_text_keys = set(self._text_filter) - present_text_keys
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                trace.mutter("missing keys during fetch: %r", missing_text_keys)
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        self._log_copied_texts()
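
    # Note: a text key is (file_id, revision_id), so the RevisionNotPresent
    # above is raised with the revision first and the file id second
    # (a_missing_key[1], a_missing_key[0]).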

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            trace.mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map, signature_indices = self._pack_map_and_index_list(
            'signature_index')
        signature_nodes = self._index_contents(signature_indices,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        new_pack._check_references()
        if not self._use_pack(new_pack):
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack
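
    # Progress note: the pb phases map to the steps above -- 0 open,
    # 1 revisions, 2 inventories, 3 texts, 4 signatures, 5 finish -- which
    # is why each pb.update call in this path carries its step number.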

    def _least_readv_node_readv(self, nodes):
        """Generate request groups for nodes using the least readv's.

        :param nodes: An iterable of graph index nodes.
        :return: Total node count and an iterator of the data needed to perform
            readvs to obtain the data for nodes. Each item yielded by the
            iterator is a tuple with:
            index, readv_vector, node_vector. readv_vector is a list ready to
            hand to the transport readv method, and node_vector is a list of
            (key, eol_flag, references) for the node retrieved by the
            matching readv_vector.
        """
        # group by pack so we do one readv per pack
        nodes = sorted(nodes)
        total = len(nodes)
        request_groups = {}
        for index, key, value, references in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value, references))
        result = []
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value, references in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append(
                    ((offset, length), (key, value[0], references)))
            # linear scan up the pack to maximum range combining.
            pack_readv_requests.sort()
            # split out the readv and the node data.
            pack_readv = [readv for readv, node in pack_readv_requests]
            node_vector = [node for readv, node in pack_readv_requests]
            result.append((index, pack_readv, node_vector))
        return total, result
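
    # Worked example (not from the original source): two nodes from the
    # same pack whose values parse to (offset, length) of (100, 20) and
    # (0, 50) sort into pack_readv = [(0, 50), (100, 20)], so each source
    # pack is read in a single ascending readv sweep; 'result' then holds
    # one (index, readv_vector, node_vector) triple per source pack.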

    def _revision_node_readv(self, revision_nodes):
        """Return the total revisions and the readv's to issue.

        :param revision_nodes: The revision index contents for the packs being
            incorporated into the new pack.
        :return: As per _least_readv_node_readv.
        """
        return self._least_readv_node_readv(revision_nodes)


class KnitReconcilePacker(KnitPacker):
    """A packer which regenerates indices etc as it copies.