        contents, digest).
        """


class ContinuousRange:
    """A run of records that sit back-to-back in the knit data file."""

    def __init__(self, rec_id, pos, size):
        self.end_pos = pos + size
        self.versions = [(rec_id, pos, size)]

    def add(self, rec_id, pos, size):
        if self.end_pos != pos:
            raise AssertionError('record at %d does not continue the range '
                                 'ending at %d' % (pos, self.end_pos))
        self.end_pos = pos + size
        self.versions.append((rec_id, pos, size))

    def split(self, fp):
        """Read each record of this range from fp, yielding (rec_id, data)."""
        for rec_id, pos, size in self.versions:
            yield rec_id, fp.read(size)
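
# Usage sketch (not from this file; _coalesce_ranges is a hypothetical
# helper): records sorted by position can be folded into ContinuousRange
# objects so that each contiguous run is read with a single request.
def _coalesce_ranges(records):
    """Yield ContinuousRange objects for a position-sorted record list."""
    current = None
    for rec_id, pos, size in records:
        if current is not None and current.end_pos == pos:
            current.add(rec_id, pos, size)
        else:
            if current is not None:
                yield current
            current = ContinuousRange(rec_id, pos, size)
    if current is not None:
        yield current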

        # We take it that the transport optimizes the fetching as well
        # as possible (i.e., reads continuous ranges).
        response = self._transport.readv(self._filename,
            [(pos, size) for version_id, pos, size in records])

        for (record_id, pos, size), (pos, data) in izip(iter(records), response):
            content, digest = self._parse_record(record_id, data)
            yield record_id, content, digest
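        # Worked example (hypothetical values): given
        #     records = [('v1', 0, 100), ('v2', 100, 80)]
        # readv is asked for the offsets [(0, 100), (100, 80)] and is assumed
        # to yield one (offset, data) pair per requested offset, in request
        # order, so the izip above pairs every record with its raw bytes.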

        needed_records = []
        for version_id, pos, size in records:
            if version_id not in self._records:
                needed_records.append((version_id, pos, size))

        if len(needed_records):
            # We take it that the transport optimizes the fetching as well
            # as possible (i.e., reads continuous ranges).
            response = self._transport.readv(self._filename,
                [(pos, size) for version_id, pos, size in needed_records])

            for (record_id, pos, size), (pos, data) in izip(iter(needed_records), response):
                content, digest = self._parse_record(record_id, data)
                self._records[record_id] = (digest, content)

        for version_id, pos, size in records:
            yield version_id, copy(self._records[version_id][1]), copy(self._records[version_id][0])
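        # self._records caches each parsed record as
        #     record_id -> (digest, content)
        # and copies are yielded so a caller can mutate what it gets back
        # without corrupting the cached entry.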

    def read_records(self, records):
        """Read records into a dictionary."""

        if not version_ids:
            return 0

        if pb is None:
            from bzrlib.progress import DummyProgress
            pb = DummyProgress()

        version_ids = list(version_ids)
        if None in version_ids:
            version_ids.remove(None)

        self.source_ancestry = set(self.source.get_ancestry(version_ids))
        this_versions = set(self.target._index.get_versions())
        needed_versions = self.source_ancestry - this_versions
        cross_check_versions = self.source_ancestry.intersection(this_versions)
        mismatched_versions = set()
        for version in cross_check_versions:
            # scan to include needed parents.
            n1 = set(self.target.get_parents_with_ghosts(version))
            n2 = set(self.source.get_parents_with_ghosts(version))
            # FIXME TEST this check for cycles being introduced works
            # the logic is we have a cycle if in our graph we are an
            # ancestor of any of the n2 revisions.
            for parent in n2:
                parent_ancestors = self.source.get_ancestry(parent)
                if version in parent_ancestors:
                    raise errors.GraphCycleError([parent, version])
            # ensure this parent will be available later.
            new_parents = n2.difference(n1)
            needed_versions.update(new_parents.difference(this_versions))
            mismatched_versions.add(version)
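        # Worked example (hypothetical revision ids): the target already has
        # 'rev-x' with parent 'rev-a', while the source records 'rev-x' with
        # parents 'rev-a' and 'rev-b'.  If 'rev-x' appears in the source-side
        # ancestry of 'rev-b', copying that parent link would make 'rev-x' an
        # ancestor of its own parent, so GraphCycleError is raised.  Otherwise
        # 'rev-b' is added to needed_versions (if the target does not already
        # have it) and 'rev-x' is remembered in mismatched_versions for the
        # fix-up pass below.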

        if not needed_versions and not cross_check_versions:
            return 0
        full_list = topo_sort(self.source.get_graph())

        version_list = [i for i in full_list if (not self.target.has_version(i)
                        and i in needed_versions)]

        records = []
        for version_id in version_list:
            data_pos, data_size = self.source._index.get_position(version_id)
            records.append((version_id, data_pos, data_size))

        count = 0
        for version_id, lines, digest \
                in self.source._data.read_records_iter(records):
            options = self.source._index.get_options(version_id)
            parents = self.source._index.get_parents_with_ghosts(version_id)

            for parent in parents:
                # if source has the parent, we must have grabbed it first.
                assert (self.target.has_version(parent) or not
                        self.source.has_version(parent))

            if self.target.factory.annotated:
                # FIXME jrydberg: it should be possible to skip
                # re-annotating components if we know that we are
                # going to pull all revisions in the same order.
                new_version_id = version_id
                new_version_idx = self.target._index.num_versions()
                # a knit record is stored either as a full text or as a
                # line delta against a parent.
                if 'fulltext' in options:
                    lines = self.target._reannotate_fulltext(self.source, lines,
                        new_version_id, new_version_idx)
                elif 'line-delta' in options:
                    lines = self.target._reannotate_line_delta(self.source, lines,
                        new_version_id, new_version_idx)

            count = count + 1
            pb.update("Joining knit", count, len(version_list))
            pos, size = self.target._data.add_record(version_id, digest, lines)
            self.target._index.add_version(version_id, options, pos, size, parents)

        for version in mismatched_versions:
            n1 = set(self.target.get_parents_with_ghosts(version))
            n2 = set(self.source.get_parents_with_ghosts(version))
            # write a combined record to our history preserving the current
            # parents as first in the list
            new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
            self.target.fix_parents(version, new_parents)
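        # Worked example (hypothetical ids): if the target lists the parents
        # of 'rev-x' as ('rev-a', 'rev-b') and the source lists them as
        # ('rev-a', 'rev-c'), the combined record written back is
        # ('rev-a', 'rev-b', 'rev-c'): the target's existing parents keep
        # their order and only the source-only parents are appended.
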
InterVersionedFile.register_optimiser(InterKnit)
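# With InterKnit registered as an optimiser, knit-to-knit joins take the fast
# path above.  A usage sketch, assuming the usual Inter* lookup API
# (InterVersionedFile.get picking the first compatible registered optimiser):
#
#     inter = InterVersionedFile.get(source_knit, target_knit)
#     inter.join(version_ids=None)
#
# which would dispatch to InterKnit's join for two KnitVersionedFiles.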