727
813
Sets self._text_filter appropriately.
729
raise NotImplementedError(self._copy_inventory_texts)
815
# select inventory keys
816
inv_keys = self._revision_keys # currently the same keyspace, and note that
817
# querying for keys here could introduce a bug where an inventory item
818
# is missed, so do not change it to query separately without cross
819
# checking like the text key check below.
820
inventory_index_map, inventory_indices = self._pack_map_and_index_list(
822
inv_nodes = self._index_contents(inventory_indices, inv_keys)
823
# copy inventory keys and adjust values
824
# XXX: Should be a helper function to allow different inv representation
826
self.pb.update("Copying inventory texts", 2)
827
total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
828
# Only grab the output lines if we will be processing them
829
output_lines = bool(self.revision_ids)
830
inv_lines = self._copy_nodes_graph(inventory_index_map,
831
self.new_pack._writer, self.new_pack.inventory_index,
832
readv_group_iter, total_items, output_lines=output_lines)
833
if self.revision_ids:
834
self._process_inventory_lines(inv_lines)
836
# eat the iterator to cause it to execute.
838
self._text_filter = None
839
if 'pack' in debug.debug_flags:
840
mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
841
time.ctime(), self._pack_collection._upload_transport.base,
842
self.new_pack.random_name,
843
self.new_pack.inventory_index.key_count(),
844
time.time() - self.new_pack.start_time)
731
846
def _copy_text_texts(self):
732
raise NotImplementedError(self._copy_text_texts)
848
text_index_map, text_nodes = self._get_text_nodes()
849
if self._text_filter is not None:
850
# We could return the keys copied as part of the return value from
851
# _copy_nodes_graph but this doesn't work all that well with the
852
# need to get line output too, so we check separately, and as we're
853
# going to buffer everything anyway, we check beforehand, which
854
# saves reading knit data over the wire when we know there are
856
text_nodes = set(text_nodes)
857
present_text_keys = set(_node[1] for _node in text_nodes)
858
missing_text_keys = set(self._text_filter) - present_text_keys
859
if missing_text_keys:
860
# TODO: raise a specific error that can handle many missing
862
mutter("missing keys during fetch: %r", missing_text_keys)
863
a_missing_key = missing_text_keys.pop()
864
raise errors.RevisionNotPresent(a_missing_key[1],
866
# copy text keys and adjust values
867
self.pb.update("Copying content texts", 3)
868
total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
869
list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
870
self.new_pack.text_index, readv_group_iter, total_items))
871
self._log_copied_texts()
734
873
def _create_pack_from_packs(self):
735
raise NotImplementedError(self._create_pack_from_packs)
874
self.pb.update("Opening pack", 0, 5)
875
self.new_pack = self.open_pack()
876
new_pack = self.new_pack
877
# buffer data - we won't be reading-back during the pack creation and
878
# this makes a significant difference on sftp pushes.
879
new_pack.set_write_cache_size(1024*1024)
880
if 'pack' in debug.debug_flags:
881
plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
882
for a_pack in self.packs]
883
if self.revision_ids is not None:
884
rev_count = len(self.revision_ids)
887
mutter('%s: create_pack: creating pack from source packs: '
888
'%s%s %s revisions wanted %s t=0',
889
time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
890
plain_pack_list, rev_count)
891
self._copy_revision_texts()
892
self._copy_inventory_texts()
893
self._copy_text_texts()
894
# select signature keys
895
signature_filter = self._revision_keys # same keyspace
896
signature_index_map, signature_indices = self._pack_map_and_index_list(
898
signature_nodes = self._index_contents(signature_indices,
900
# copy signature keys and adjust values
901
self.pb.update("Copying signature texts", 4)
902
self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
903
new_pack.signature_index)
904
if 'pack' in debug.debug_flags:
905
mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
906
time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
907
new_pack.signature_index.key_count(),
908
time.time() - new_pack.start_time)
910
# NB XXX: how to check CHK references are present? perhaps by yielding
911
# the items? How should that interact with stacked repos?
912
if new_pack.chk_index is not None:
914
if 'pack' in debug.debug_flags:
915
mutter('%s: create_pack: chk content copied: %s%s %d items t+%6.3fs',
916
time.ctime(), self._pack_collection._upload_transport.base,
917
new_pack.random_name,
918
new_pack.chk_index.key_count(),
919
time.time() - new_pack.start_time)
920
new_pack._check_references()
921
if not self._use_pack(new_pack):
924
self.pb.update("Finishing pack", 5)
926
self._pack_collection.allocate(new_pack)
929
def _copy_chks(self, refs=None):
930
# XXX: Todo, recursive follow-pointers facility when fetching some
932
chk_index_map, chk_indices = self._pack_map_and_index_list(
934
chk_nodes = self._index_contents(chk_indices, refs)
936
# TODO: This isn't strictly tasteful as we are accessing some private
937
# variables (_serializer). Perhaps a better way would be to have
938
# Repository._deserialise_chk_node()
939
search_key_func = chk_map.search_key_registry.get(
940
self._pack_collection.repo._serializer.search_key_name)
941
def accumlate_refs(lines):
942
# XXX: move to a generic location
944
bytes = ''.join(lines)
945
node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
946
new_refs.update(node.refs())
947
self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
948
self.new_pack.chk_index, output_lines=accumlate_refs)
951
def _copy_nodes(self, nodes, index_map, writer, write_index,
    output_lines=None):
    """Copy knit nodes between packs with no graph references.

    This is a thin wrapper that owns the progress bar for the copy and
    delegates the real work to ``_do_copy_nodes``.

    :param nodes: The index nodes to copy; passed through unchanged.
    :param index_map: Passed through to ``_do_copy_nodes``, which looks up
        the source pack for each node's index in it.
    :param writer: The pack writer the raw records are appended to.
    :param write_index: The index that receives an entry for each copied
        record.
    :param output_lines: Output full texts of copied items. When not None
        it is called by ``_do_copy_nodes`` with the parsed content of each
        copied record.
    :return: Whatever ``_do_copy_nodes`` returns.
    """
    pb = ui.ui_factory.nested_progress_bar()
    try:
        return self._do_copy_nodes(nodes, index_map, writer,
            write_index, pb, output_lines=output_lines)
    finally:
        # Always release the nested progress bar, even if the copy fails.
        pb.finished()
964
def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
966
# for record verification
967
knit = KnitVersionedFiles(None, None)
968
# plan a readv on each source pack:
970
nodes = sorted(nodes)
971
# how to map this into knit.py - or knit.py into this?
972
# we don't want the typical knit logic, we want grouping by pack
973
# at this point - perhaps a helper library for the following code
974
# duplication points?
976
for index, key, value in nodes:
977
if index not in request_groups:
978
request_groups[index] = []
979
request_groups[index].append((key, value))
981
pb.update("Copied record", record_index, len(nodes))
982
for index, items in request_groups.iteritems():
983
pack_readv_requests = []
984
for key, value in items:
985
# ---- KnitGraphIndex.get_position
986
bits = value[1:].split(' ')
987
offset, length = int(bits[0]), int(bits[1])
988
pack_readv_requests.append((offset, length, (key, value[0])))
989
# linear scan up the pack
990
pack_readv_requests.sort()
992
pack_obj = index_map[index]
993
transport, path = pack_obj.access_tuple()
995
reader = pack.make_readv_reader(transport, path,
996
[offset[0:2] for offset in pack_readv_requests])
997
except errors.NoSuchFile:
998
if self._reload_func is not None:
1001
for (names, read_func), (_1, _2, (key, eol_flag)) in \
1002
izip(reader.iter_records(), pack_readv_requests):
1003
raw_data = read_func(None)
1004
# check the header only
1005
if output_lines is not None:
1006
output_lines(knit._parse_record(key[-1], raw_data)[0])
1008
df, _ = knit._parse_record_header(key, raw_data)
1010
pos, size = writer.add_bytes_record(raw_data, names)
1011
write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
1012
pb.update("Copied record", record_index)
1015
def _copy_nodes_graph(self, index_map, writer, write_index,
    readv_group_iter, total_items, output_lines=False):
    """Copy knit nodes between packs.

    This is a generator: nothing is copied until the caller iterates it,
    so callers that do not need the output must still consume it.

    :param index_map: Passed through to ``_do_copy_nodes_graph``, which
        looks up the source pack for each index in it.
    :param writer: The pack writer the raw records are appended to.
    :param write_index: The index that receives an entry for each copied
        record.
    :param readv_group_iter: Iterable of (index, readv_vector, node_vector)
        groups, as produced by ``_least_readv_node_readv``.
    :param total_items: Total number of records, used for progress
        reporting.
    :param output_lines: Return lines present in the copied data as
        an iterator of line,version_id.
    """
    pb = ui.ui_factory.nested_progress_bar()
    try:
        for result in self._do_copy_nodes_graph(index_map, writer,
            write_index, output_lines, pb, readv_group_iter, total_items):
            yield result
    except Exception:
        # Python 2.4 does not permit try:finally: in a generator.
        pb.finished()
        raise
    else:
        pb.finished()
1034
def _do_copy_nodes_graph(self, index_map, writer, write_index,
1035
output_lines, pb, readv_group_iter, total_items):
1036
# for record verification
1037
knit = KnitVersionedFiles(None, None)
1038
# for line extraction when requested (inventories only)
1040
factory = KnitPlainFactory()
1042
pb.update("Copied record", record_index, total_items)
1043
for index, readv_vector, node_vector in readv_group_iter:
1045
pack_obj = index_map[index]
1046
transport, path = pack_obj.access_tuple()
1048
reader = pack.make_readv_reader(transport, path, readv_vector)
1049
except errors.NoSuchFile:
1050
if self._reload_func is not None:
1053
for (names, read_func), (key, eol_flag, references) in \
1054
izip(reader.iter_records(), node_vector):
1055
raw_data = read_func(None)
1057
# read the entire thing
1058
content, _ = knit._parse_record(key[-1], raw_data)
1059
if len(references[-1]) == 0:
1060
line_iterator = factory.get_fulltext_content(content)
1062
line_iterator = factory.get_linedelta_content(content)
1063
for line in line_iterator:
1066
# check the header only
1067
df, _ = knit._parse_record_header(key, raw_data)
1069
pos, size = writer.add_bytes_record(raw_data, names)
1070
write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
1071
pb.update("Copied record", record_index)
1074
def _get_text_nodes(self):
1075
text_index_map, text_indices = self._pack_map_and_index_list(
1077
return text_index_map, self._index_contents(text_indices,
1080
def _least_readv_node_readv(self, nodes):
1081
"""Generate request groups for nodes using the least readv's.
1083
:param nodes: An iterable of graph index nodes.
1084
:return: Total node count and an iterator of the data needed to perform
1085
readvs to obtain the data for nodes. Each item yielded by the
1086
iterator is a tuple with:
1087
index, readv_vector, node_vector. readv_vector is a list ready to
1088
hand to the transport readv method, and node_vector is a list of
1089
(key, eol_flag, references) for the node retrieved by the
1090
matching readv_vector.
1092
# group by pack so we do one readv per pack
1093
nodes = sorted(nodes)
1096
for index, key, value, references in nodes:
1097
if index not in request_groups:
1098
request_groups[index] = []
1099
request_groups[index].append((key, value, references))
1101
for index, items in request_groups.iteritems():
1102
pack_readv_requests = []
1103
for key, value, references in items:
1104
# ---- KnitGraphIndex.get_position
1105
bits = value[1:].split(' ')
1106
offset, length = int(bits[0]), int(bits[1])
1107
pack_readv_requests.append(
1108
((offset, length), (key, value[0], references)))
1109
# linear scan up the pack to maximum range combining.
1110
pack_readv_requests.sort()
1111
# split out the readv and the node data.
1112
pack_readv = [readv for readv, node in pack_readv_requests]
1113
node_vector = [node for readv, node in pack_readv_requests]
1114
result.append((index, pack_readv, node_vector))
1115
return total, result
737
1117
def _log_copied_texts(self):
738
1118
if 'pack' in debug.debug_flags: