@@ -81,7 +82,7 @@
         CommitBuilder.__init__(self, repository, parents, config,
             timestamp=timestamp, timezone=timezone, committer=committer,
             revprops=revprops, revision_id=revision_id)
-        self._file_graph = Graph(
+        self._file_graph = graph.Graph(
             repository._pack_collection.text_index.combined_index)
 
     def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
@@ -107,7 +108,7 @@
         CommitBuilder.__init__(self, repository, parents, config,
             timestamp=timestamp, timezone=timezone, committer=committer,
             revprops=revprops, revision_id=revision_id)
-        self._file_graph = Graph(
+        self._file_graph = graph.Graph(
             repository._pack_collection.text_index.combined_index)
 
     def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
@@ -610,8 +611,9 @@
         revision_nodes = self._pack_collection._index_contents(revision_index_map, revision_keys)
         # copy revision keys and adjust values
         self.pb.update("Copying revision texts", 1)
-        list(self._copy_nodes_graph(revision_nodes, revision_index_map,
-            self.new_pack._writer, self.new_pack.revision_index))
+        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
+        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+            self.new_pack.revision_index, readv_group_iter, total_items))
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                 time.ctime(), self._pack_collection._upload_transport.base,
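
Note: the pattern in this hunk (and the two inventory/text hunks that follow) splits planning from copying: a helper first returns a total item count plus an iterator of readv request groups, and the copy loop consumes both so it can report progress without materialising every group. A minimal standalone sketch of that shape; the names (make_groups and friends) are illustrative only, not bzrlib API:

    from itertools import groupby

    def make_groups(nodes):
        # Return (total_items, lazy_iterator) so the caller can drive a
        # progress bar while still streaming one group per source pack.
        nodes = sorted(nodes)
        def group_iter():
            for pack_name, items in groupby(nodes, key=lambda node: node[0]):
                yield pack_name, list(items)
        return len(nodes), group_iter()

    total_items, group_iter = make_groups([('p1', 3), ('p2', 1), ('p1', 2)])
    for record_index, (pack_name, items) in enumerate(group_iter):
        print('Copied record %d/%d from %s' % (record_index, total_items, pack_name))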
@@ -639,7 +641,9 @@
         # XXX: Should be a helper function to allow different inv representation
         # at this point.
         self.pb.update("Copying inventory texts", 2)
-        inv_lines = self._copy_nodes_graph(inv_nodes, inventory_index_map,
-            self.new_pack._writer, self.new_pack.inventory_index, output_lines=True)
+        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+        inv_lines = self._copy_nodes_graph(inventory_index_map,
+            self.new_pack._writer, self.new_pack.inventory_index,
+            readv_group_iter, total_items, output_lines=True)
         if self.revision_ids:
             self._process_inventory_lines(inv_lines)
@@ -675,8 +679,9 @@
                     a_missing_key[0])
         # copy text keys and adjust values
         self.pb.update("Copying content texts", 3)
-        list(self._copy_nodes_graph(text_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         self._log_copied_texts()
 
     def _check_references(self):
@@ -787,60 +792,39 @@
                 pb.update("Copied record", record_index)
                 record_index += 1
 
-    def _copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines=False):
+    def _copy_nodes_graph(self, index_map, writer, write_index,
+        readv_group_iter, total_items, output_lines=False):
         """Copy knit nodes between packs.
 
         :param output_lines: Return lines present in the copied data as
             an iterator of line,version_id.
         """
         pb = ui.ui_factory.nested_progress_bar()
         try:
-            for result in self._do_copy_nodes_graph(nodes, index_map, writer,
-                write_index, output_lines, pb):
+            for result in self._do_copy_nodes_graph(index_map, writer,
+                write_index, output_lines, pb, readv_group_iter, total_items):
                 yield result
         except Exception:
             # Python 2.4 does not permit try:finally: in a generator.
             pb.finished()
             raise
         else:
             pb.finished()
 
-    def _do_copy_nodes_graph(self, nodes, index_map, writer, write_index,
-        output_lines, pb):
+    def _do_copy_nodes_graph(self, index_map, writer, write_index,
+        output_lines, pb, readv_group_iter, total_items):
         # for record verification
         knit_data = _KnitData(None)
         # for line extraction when requested (inventories only)
         if output_lines:
             factory = knit.KnitPlainFactory()
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code
-        # duplication points?
-        request_groups = {}
-        record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0], references)))
-            # linear scan up the pack
-            pack_readv_requests.sort()
+        record_index = 0
+        pb.update("Copied record", record_index, total_items)
+        for index, readv_vector, node_vector in readv_group_iter:
             # copy the data
             transport, path = index_map[index]
-            reader = pack.make_readv_reader(transport, path,
-                [offset[0:2] for offset in pack_readv_requests])
-            for (names, read_func), (_1, _2, (key, eol_flag, references)) in \
-                izip(reader.iter_records(), pack_readv_requests):
+            reader = pack.make_readv_reader(transport, path, readv_vector)
+            for (names, read_func), (key, eol_flag, references) in \
+                izip(reader.iter_records(), node_vector):
                 raw_data = read_func(None)
                 version_id = key[-1]
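
Note: both the old and new loops decode the index value the same way (see the "---- KnitGraphIndex.get_position" comments): the first byte is the eol flag and the remainder is "offset length" in decimal. A hedged sketch of just that decoding, for readers unfamiliar with the format; parse_position is an illustrative name, not bzrlib API:

    def parse_position(value):
        # value looks like 'N0 120': flag byte, then offset and length.
        eol_flag = value[0]
        bits = value[1:].split(' ')
        offset, length = int(bits[0]), int(bits[1])
        return eol_flag, offset, length

    assert parse_position('N12 345') == ('N', 12, 345)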
@@ -868,6 +852,43 @@
         return text_index_map, self._pack_collection._index_contents(text_index_map,
             self._text_filter)
 
+    def _least_readv_node_readv(self, nodes):
+        """Generate request groups for nodes using the least readv's.
+
+        :param nodes: An iterable of graph index nodes.
+        :return: Total node count and an iterator of the data needed to perform
+            readvs to obtain the data for nodes. Each item yielded by the
+            iterator is a tuple with:
+            index, readv_vector, node_vector. readv_vector is a list ready to
+            hand to the transport readv method, and node_vector is a list of
+            (key, eol_flag, references) for the node retrieved by the
+            matching readv_vector.
+        """
+        # group by pack so we do one readv per pack
+        nodes = sorted(nodes)
+        total = len(nodes)
+        request_groups = {}
+        for index, key, value, references in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value, references))
+        result = []
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value, references in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append(
+                    ((offset, length), (key, value[0], references)))
+            # linear scan up the pack to maximum range combining.
+            pack_readv_requests.sort()
+            # split out the readv and the node data.
+            pack_readv = [readv for readv, node in pack_readv_requests]
+            node_vector = [node for readv, node in pack_readv_requests]
+            result.append((index, pack_readv, node_vector))
+        return total, result
+
     def _log_copied_texts(self):
         if 'pack' in debug.debug_flags:
             mutter('%s: create_pack: file texts copied: %s%s %d items t+%6.3fs',
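
Note: the new method's behaviour can be seen in miniature below; a self-contained rendition of the same grouping, using dict.items() instead of the Python 2 iteritems():

    def least_readv_groups(nodes):
        # One readv per source index; offsets sorted so adjacent reads can
        # be combined into larger ranges by the transport layer.
        request_groups = {}
        for index, key, value, references in nodes:
            request_groups.setdefault(index, []).append((key, value, references))
        result = []
        for index, items in request_groups.items():
            requests = []
            for key, value, references in items:
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                requests.append(((offset, length), (key, value[0], references)))
            requests.sort()
            readv_vector = [readv for readv, node in requests]
            node_vector = [node for readv, node in requests]
            result.append((index, readv_vector, node_vector))
        return len(nodes), result

    total, groups = least_readv_groups([
        ('pack-a', ('rev-2',), 'N50 10', ((),)),
        ('pack-a', ('rev-1',), 'N0 50', ((),)),
    ])
    assert groups == [('pack-a', [(0, 50), (50, 10)],
                       [(('rev-1',), 'N', ((),)), (('rev-2',), 'N', ((),))])]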
@@ -886,14 +907,57 @@
             text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
         self._text_filter = text_filter
 
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        return self._least_readv_node_readv(revision_nodes)
+
     def _use_pack(self, new_pack):
         """Return True if new_pack should be used.
 
         :param new_pack: The pack that has just been created.
         :return: True if the pack should be used.
         """
         return new_pack.data_inserted()
+
+
+class OptimisingPacker(Packer):
+    """A packer which spends more time to create better disk layouts."""
+
+    def _revision_node_readv(self, revision_nodes):
+        """Return the total revisions and the readv's to issue.
+
+        This sort places revisions in topological order with the ancestors
+        after the children.
+
+        :param revision_nodes: The revision index contents for the packs being
+            incorporated into the new pack.
+        :return: As per _least_readv_node_readv.
+        """
+        # build an ancestors dict
+        ancestors = {}
+        by_key = {}
+        for index, key, value, references in revision_nodes:
+            ancestors[key] = references[0]
+            by_key[key] = (index, value, references)
+        order = tsort.topo_sort(ancestors)
+        total = len(order)
+        # Single IO is pathological, but it will work as a starting point.
+        requests = []
+        for key in reversed(order):
+            index, value, references = by_key[key]
+            # ---- KnitGraphIndex.get_position
+            bits = value[1:].split(' ')
+            offset, length = int(bits[0]), int(bits[1])
+            requests.append(
+                (index, [(offset, length)], [(key, value[0], references)]))
+        # TODO: combine requests in the same index that are in ascending order.
+        return total, requests
 
 
 class ReconcilePacker(Packer):
     """A packer which regenerates indices etc as it copies.
@@ -970,6 +1034,7 @@
         # 3) bulk copy the ok data
-        list(self._copy_nodes_graph(ok_nodes, text_index_map,
-            self.new_pack._writer, self.new_pack.text_index))
+        total_items, readv_group_iter = self._least_readv_node_readv(ok_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
         # 4) adhoc copy all the other texts.
         # We have to topologically insert all texts otherwise we can fail to
         # reconcile when parts of a single delta chain are preserved intact,
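
Note: the comment about topological insertion concerns delta chains: a text stored as a delta can only be reconstructed once its basis is present. A toy illustration of why the insertion order matters; all names here are hypothetical:

    fulltexts = {'A': 'base text'}        # A is stored in full
    deltas = {'B': ('A', ' + change')}    # B is stored as a delta against A

    def insert(key, store):
        if key in fulltexts:
            store[key] = fulltexts[key]
        else:
            basis, suffix = deltas[key]
            # raises KeyError if the basis has not been inserted yet
            store[key] = store[basis] + suffix

    store = {}
    for key in ('A', 'B'):   # chain order succeeds
        insert(key, store)
    assert store['B'] == 'base text + change'
    # iterating ('B', 'A') instead would fail on the basis lookup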
@@ -1164,17 +1229,18 @@
         self._execute_pack_operations(pack_operations)
         return True
 
-    def _execute_pack_operations(self, pack_operations):
+    def _execute_pack_operations(self, pack_operations, _packer_class=Packer):
         """Execute a series of pack operations.
 
         :param pack_operations: A list of [revision_count, packs_to_combine].
+        :param _packer_class: The class of packer to use (default: Packer).
         :return: None.
         """
         for revision_count, packs in pack_operations:
             # we may have no-ops from the setup logic
             if len(packs) == 0:
                 continue
-            Packer(self, packs, '.autopack').pack()
+            _packer_class(self, packs, '.autopack').pack()
             for pack in packs:
                 self._remove_pack_from_memory(pack)
             # record the newly available packs and stop advertising the old
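
Note: threading _packer_class through as a keyword default is plain dependency injection; pack() (next hunk) passes OptimisingPacker while autopack keeps the default. A minimal sketch with stand-in classes, not the real bzrlib objects:

    class Packer(object):
        def __init__(self, collection, packs, suffix):
            self.packs = packs
        def pack(self):
            return 'plain layout'

    class OptimisingPacker(Packer):
        def pack(self):
            return 'topologically ordered layout'

    def execute_pack_operations(pack_operations, _packer_class=Packer):
        for revision_count, packs in pack_operations:
            if len(packs) == 0:
                continue
            print(_packer_class(None, packs, '.autopack').pack())

    execute_pack_operations([[10, ['p1', 'p2']]])                    # plain layout
    execute_pack_operations([[10, ['p1', 'p2']]], OptimisingPacker)  # topologically ordered layout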
@@ -1208,10 +1277,9 @@
         pack_distribution = [1]
         pack_operations = [[0, []]]
         for pack in self.all_packs():
-            revision_count = pack.get_revision_count()
-            pack_operations[-1][0] += revision_count
+            pack_operations[-1][0] += pack.get_revision_count()
             pack_operations[-1][1].append(pack)
-        self._execute_pack_operations(pack_operations)
+        self._execute_pack_operations(pack_operations, OptimisingPacker)
 
     def plan_autopack_combinations(self, existing_packs, pack_distribution):
         """Plan a pack operation.
@@ -1830,33 +1899,46 @@
+    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
     def get_parents(self, revision_ids):
-        """See StackedParentsProvider.get_parents.
+        """See graph._StackedParentsProvider.get_parents."""
+        parent_map = self.get_parent_map(revision_ids)
+        return [parent_map.get(r, None) for r in revision_ids]
+
+    def get_parent_map(self, keys):
+        """See graph._StackedParentsProvider.get_parent_map
 
         This implementation accesses the combined revision index to provide
         parent information.
         """
         self._pack_collection.ensure_loaded()
         index = self._pack_collection.revision_index.combined_index
-        search_keys = set()
-        for revision_id in revision_ids:
-            if revision_id != _mod_revision.NULL_REVISION:
-                search_keys.add((revision_id,))
-        found_parents = {_mod_revision.NULL_REVISION:[]}
+        keys = set(keys)
+        if _mod_revision.NULL_REVISION in keys:
+            keys.discard(_mod_revision.NULL_REVISION)
+            found_parents = {_mod_revision.NULL_REVISION:()}
+        else:
+            found_parents = {}
+        search_keys = set((revision_id,) for revision_id in keys)
         for index, key, value, refs in index.iter_entries(search_keys):
             parents = refs[0]
             if not parents:
                 parents = (_mod_revision.NULL_REVISION,)
             else:
                 parents = tuple(parent[0] for parent in parents)
             found_parents[key[0]] = parents
-        result = []
-        for revision_id in revision_ids:
-            if revision_id in found_parents:
-                result.append(found_parents[revision_id])
-            else:
-                result.append(None)
-        return result
+        return found_parents
+
+    def has_revisions(self, revision_ids):
+        """See Repository.has_revisions()."""
+        revision_ids = set(revision_ids)
+        result = revision_ids.intersection(
+            set([None, _mod_revision.NULL_REVISION]))
+        revision_ids.difference_update(result)
+        index = self._pack_collection.revision_index.combined_index
+        keys = [(revision_id,) for revision_id in revision_ids]
+        result.update(node[1][0] for node in index.iter_entries(keys))
+        return result
 
     def _make_parents_provider(self):
-        return self
+        return graph.CachingParentsProvider(self)
 
     def _refresh_data(self):
         if self._write_lock_count == 1 or (
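
Note: with this hunk the repository stops acting as its own parents provider: get_parents() becomes a deprecated shim over get_parent_map(), and _make_parents_provider() wraps the repository in graph.CachingParentsProvider. A minimal sketch of that caching idea, with stand-in classes rather than the bzrlib implementations:

    class CachingParentsProvider(object):
        """Cache get_parent_map results (a sketch of bzrlib.graph's idea)."""
        def __init__(self, provider):
            self._provider = provider
            self._cache = {}
        def get_parent_map(self, keys):
            missing = [k for k in keys if k not in self._cache]
            if missing:
                self._cache.update(self._provider.get_parent_map(missing))
            return dict((k, self._cache[k]) for k in keys if k in self._cache)

    class FakeRepo(object):
        def get_parent_map(self, keys):
            data = {'rev-2': ('rev-1',), 'rev-1': ('null:',)}
            return dict((k, data[k]) for k in keys if k in data)
        def get_parents(self, revision_ids):
            # deprecated shim, mirroring the diff above
            parent_map = self.get_parent_map(revision_ids)
            return [parent_map.get(r, None) for r in revision_ids]

    provider = CachingParentsProvider(FakeRepo())
    print(provider.get_parent_map(['rev-2', 'ghost']))  # {'rev-2': ('rev-1',)}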