956
970
contents, digest).
959
class ContinuousRange:
960
def __init__(self, rec_id, pos, size):
962
self.end_pos = pos + size
963
self.versions = [(rec_id, pos, size)]
965
def add(self, rec_id, pos, size):
966
if self.end_pos != pos:
968
self.end_pos = pos + size
969
self.versions.append((rec_id, pos, size))
973
for rec_id, pos, size in self.versions:
974
yield rec_id, fp.read(size)
976
# We take it that the transport optimizes the fetching as good
977
# as possible (ie, reads continous ranges.)
978
response = self._transport.readv(self._filename,
979
[(pos, size) for version_id, pos, size in records])
981
for (record_id, pos, size), (pos, data) in izip(iter(records), response):
982
content, digest = self._parse_record(record_id, data)
983
yield record_id, content, digest
974
for version_id, pos, size in records:
975
if version_id not in self._records:
976
needed_records.append((version_id, pos, size))
978
if len(needed_records):
979
# We take it that the transport optimizes the fetching as good
980
# as possible (ie, reads continous ranges.)
981
response = self._transport.readv(self._filename,
982
[(pos, size) for version_id, pos, size in needed_records])
984
for (record_id, pos, size), (pos, data) in izip(iter(records), response):
985
content, digest = self._parse_record(record_id, data)
986
self._records[record_id] = (digest, content)
988
for version_id, pos, size in records:
989
yield version_id, copy(self._records[version_id][1]), copy(self._records[version_id][0])
985
991
def read_records(self, records):
986
992
"""Read records into a dictionary."""
1021
1027
if not version_ids:
1025
from bzrlib.progress import DummyProgress
1026
pb = DummyProgress()
1028
version_ids = list(version_ids)
1029
if None in version_ids:
1030
version_ids.remove(None)
1032
self.source_ancestry = set(self.source.get_ancestry(version_ids))
1033
this_versions = set(self.target._index.get_versions())
1034
needed_versions = self.source_ancestry - this_versions
1035
cross_check_versions = self.source_ancestry.intersection(this_versions)
1036
mismatched_versions = set()
1037
for version in cross_check_versions:
1038
# scan to include needed parents.
1039
n1 = set(self.target.get_parents_with_ghosts(version))
1040
n2 = set(self.source.get_parents_with_ghosts(version))
1042
# FIXME TEST this check for cycles being introduced works
1043
# the logic is we have a cycle if in our graph we are an
1044
# ancestor of any of the n2 revisions.
1050
parent_ancestors = self.source.get_ancestry(parent)
1051
if version in parent_ancestors:
1052
raise errors.GraphCycleError([parent, version])
1053
# ensure this parent will be available later.
1054
new_parents = n2.difference(n1)
1055
needed_versions.update(new_parents.difference(this_versions))
1056
mismatched_versions.add(version)
1058
if not needed_versions and not cross_check_versions:
1060
full_list = topo_sort(self.source.get_graph())
1062
version_list = [i for i in full_list if (not self.target.has_version(i)
1063
and i in needed_versions)]
1066
for version_id in version_list:
1067
data_pos, data_size = self.source._index.get_position(version_id)
1068
records.append((version_id, data_pos, data_size))
1071
for version_id, lines, digest \
1072
in self.source._data.read_records_iter(records):
1073
options = self.source._index.get_options(version_id)
1074
parents = self.source._index.get_parents_with_ghosts(version_id)
1076
for parent in parents:
1077
# if source has the parent, we must hav grabbed it first.
1078
assert (self.target.has_version(parent) or not
1079
self.source.has_version(parent))
1081
if self.target.factory.annotated:
1082
# FIXME jrydberg: it should be possible to skip
1083
# re-annotating components if we know that we are
1084
# going to pull all revisions in the same order.
1085
new_version_id = version_id
1086
new_version_idx = self.target._index.num_versions()
1087
if 'fulltext' in options:
1088
lines = self.target._reannotate_fulltext(self.source, lines,
1089
new_version_id, new_version_idx)
1090
elif 'line-delta' in options:
1091
lines = self.target._reannotate_line_delta(self.source, lines,
1092
new_version_id, new_version_idx)
1095
pb.update("Joining knit", count, len(version_list))
1097
pos, size = self.target._data.add_record(version_id, digest, lines)
1098
self.target._index.add_version(version_id, options, pos, size, parents)
1100
for version in mismatched_versions:
1101
n1 = set(self.target.get_parents_with_ghosts(version))
1102
n2 = set(self.source.get_parents_with_ghosts(version))
1103
# write a combined record to our history preserving the current
1104
# parents as first in the list
1105
new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
1106
self.target.fix_parents(version, new_parents)
1030
pb = bzrlib.ui.ui_factory.nested_progress_bar()
1032
version_ids = list(version_ids)
1033
if None in version_ids:
1034
version_ids.remove(None)
1036
self.source_ancestry = set(self.source.get_ancestry(version_ids))
1037
this_versions = set(self.target._index.get_versions())
1038
needed_versions = self.source_ancestry - this_versions
1039
cross_check_versions = self.source_ancestry.intersection(this_versions)
1040
mismatched_versions = set()
1041
for version in cross_check_versions:
1042
# scan to include needed parents.
1043
n1 = set(self.target.get_parents_with_ghosts(version))
1044
n2 = set(self.source.get_parents_with_ghosts(version))
1046
# FIXME TEST this check for cycles being introduced works
1047
# the logic is we have a cycle if in our graph we are an
1048
# ancestor of any of the n2 revisions.
1054
parent_ancestors = self.source.get_ancestry(parent)
1055
if version in parent_ancestors:
1056
raise errors.GraphCycleError([parent, version])
1057
# ensure this parent will be available later.
1058
new_parents = n2.difference(n1)
1059
needed_versions.update(new_parents.difference(this_versions))
1060
mismatched_versions.add(version)
1062
if not needed_versions and not cross_check_versions:
1064
full_list = topo_sort(self.source.get_graph())
1066
version_list = [i for i in full_list if (not self.target.has_version(i)
1067
and i in needed_versions)]
1070
for version_id in version_list:
1071
data_pos, data_size = self.source._index.get_position(version_id)
1072
records.append((version_id, data_pos, data_size))
1075
for version_id, lines, digest \
1076
in self.source._data.read_records_iter(records):
1077
options = self.source._index.get_options(version_id)
1078
parents = self.source._index.get_parents_with_ghosts(version_id)
1080
for parent in parents:
1081
# if source has the parent, we must hav grabbed it first.
1082
assert (self.target.has_version(parent) or not
1083
self.source.has_version(parent))
1085
if self.target.factory.annotated:
1086
# FIXME jrydberg: it should be possible to skip
1087
# re-annotating components if we know that we are
1088
# going to pull all revisions in the same order.
1089
new_version_id = version_id
1090
new_version_idx = self.target._index.num_versions()
1091
if 'fulltext' in options:
1092
lines = self.target._reannotate_fulltext(self.source, lines,
1093
new_version_id, new_version_idx)
1094
elif 'line-delta' in options:
1095
lines = self.target._reannotate_line_delta(self.source, lines,
1096
new_version_id, new_version_idx)
1099
pb.update("Joining knit", count, len(version_list))
1101
pos, size = self.target._data.add_record(version_id, digest, lines)
1102
self.target._index.add_version(version_id, options, pos, size, parents)
1104
for version in mismatched_versions:
1105
n1 = set(self.target.get_parents_with_ghosts(version))
1106
n2 = set(self.source.get_parents_with_ghosts(version))
1107
# write a combined record to our history preserving the current
1108
# parents as first in the list
1109
new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
1110
self.target.fix_parents(version, new_parents)
1111
1117
InterVersionedFile.register_optimiser(InterKnit)