        return self._file

    def _record_to_data(self, version_id, digest, lines):
        """Convert version_id, digest, lines into a raw data block.

        :return: (len, a StringIO instance with the raw data ready to read.)
        """
        sio = StringIO()
        data_file = GzipFile(None, mode='wb', fileobj=sio)
        print >>data_file, "version %s %d %s" % (version_id, len(lines), digest)
        data_file.writelines(lines)
        print >>data_file, "end %s\n" % version_id
        data_file.close()

        content = sio.getvalue()
        sio.seek(0)
        return len(content), sio
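
    # For reference, the decompressed body of a record written above looks
    # like this (a sketch; the angle-bracketed fields are placeholders):
    #
    #   version <version_id> <number-of-lines> <digest>
    #   <line 1>
    #   ...
    #   <line n>
    #   end <version_id>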

    def add_raw_record(self, raw_data):
        """Append a prepared record to the data file."""
        start_pos = self._transport.append(self._filename, StringIO(raw_data))
        return start_pos, len(raw_data)

    def add_record(self, version_id, digest, lines):
        """Write new text record to disk. Returns the position in the
        file where it was written."""
        size, sio = self._record_to_data(version_id, digest, lines)
        self._records[version_id] = (digest, lines)
        start_pos = self._transport.append(self._filename, sio)
        return start_pos, size
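
    # Minimal usage sketch (variable names here are hypothetical): add_record
    # compresses and appends a new text, while add_raw_record appends bytes
    # already in record form, e.g. when copying records between knits:
    #
    #   pos, size = data.add_record(version_id, digest, lines)
    #   pos, size = data.add_raw_record(raw_bytes)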

    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the decompressor stream and the header record,
                 as (stream, header_record).
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        rec = df.readline().split()
        if len(rec) != 4:
            raise KnitCorrupt(self._filename,
                              'unexpected number of elements in record header')
        if rec[1] != version_id:
            raise KnitCorrupt(self._filename,
                              'unexpected version, wanted %r, got %r' % (
                                  version_id, rec[1]))
        return df, rec

    def _parse_record(self, version_id, data):
        df, rec = self._parse_record_header(version_id, data)
        lines = int(rec[2])
        record_contents = self._read_record_contents(df, lines)
        l = df.readline()
        if l != 'end %s\n' % version_id:
            raise KnitCorrupt(self._filename,
                              'unexpected version end line %r, wanted %r'
                              % (l, version_id))
        return record_contents, rec[3]
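
    # _parse_record is the inverse of _record_to_data: it validates the
    # header, reads back exactly the stored lines, checks the end marker,
    # and returns (lines, digest) for the caller to verify.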

    def _read_record_contents(self, df, record_lines):
        """Read and return record_lines lines from the decompressed stream."""
        r = []
        for i in range(record_lines):
            r.append(df.readline())
        return r

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected, but that's all.

        It will actively recompress currently cached records on the
        basis that that is cheaper than I/O activity.
        """
        needed_records = []
        for version_id, pos, size in records:
            if version_id not in self._records:
                needed_records.append((version_id, pos, size))

        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(needed_records):
            # grab the disk data needed.
            raw_records = self._transport.readv(self._filename,
                [(pos, size) for version_id, pos, size in needed_records])

        for version_id, pos, size in records:
            if version_id in self._records:
                # compress a new version
                size, sio = self._record_to_data(version_id,
                                                 self._records[version_id][0],
                                                 self._records[version_id][1])
                yield version_id, sio.getvalue()
            else:
                pos, data = raw_records.next()
                # validate the header
                df, rec = self._parse_record_header(version_id, data)
                yield version_id, data
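
    # Consumer sketch (names hypothetical): the raw blobs yielded here are
    # complete record data, so they can be appended to another knit as-is:
    #
    #   for version_id, raw in source_data.read_records_iter_raw(wanted):
    #       pos, size = target_data.add_raw_record(raw)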

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

            version_list = [i for i in full_list if (not self.target.has_version(i)
                            and i in needed_versions)]

            copy_queue_records = []
            copy_queue = []
            copy_set = set()
            for version_id in version_list:
                options = self.source._index.get_options(version_id)
                parents = self.source._index.get_parents_with_ghosts(version_id)
                # check that this will be a consistent copy:
                for parent in parents:
                    # if source has the parent, we must:
                    # * already have it or
                    # * have it scheduled already
                    # otherwise we don't care
                    assert (self.target.has_version(parent) or
                            parent in copy_set or
                            not self.source.has_version(parent))
                data_pos, data_size = self.source._index.get_position(version_id)
                copy_queue_records.append((version_id, data_pos, data_size))
                copy_queue.append((version_id, options, parents))
                copy_set.add(version_id)
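
            # Note: copy_queue_records and copy_queue are appended to in
            # lockstep above, so the izip below pairs each raw record with
            # its matching options and parents.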
            # data suck the join:
            count = 0
            total = len(version_list)
            # we want the raw gzip for bulk copying, but the record validated
            # just enough to be sure it's the right one.
            # TODO: consider writev or write combining to reduce
            #       death of a thousand cuts feeling.
            for (version_id, raw_data), \
                (version_id2, options, parents) in \
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
                     copy_queue):
                assert version_id == version_id2, 'logic error, inconsistent results'
                count = count + 1
                pb.update("Joining knit", count, total)
                pos, size = self.target._data.add_raw_record(raw_data)
                self.target._index.add_version(version_id, options, pos, size, parents)

            for version in mismatched_versions:
                # FIXME RBC 20060309 is this needed?
                n1 = set(self.target.get_parents_with_ghosts(version))
                n2 = set(self.source.get_parents_with_ghosts(version))
                # write a combined record to our history preserving the current