@@ -156 +156 @@
     def parse_fulltext(self, content, version):
+        """Convert fulltext to internal representation
+
+        fulltext content is of the format
+        revid(utf8) plaintext\n
+        internal representation is of the format:
+        (revid, plaintext)
+        """
         lines = []
         for line in content:
             origin, text = line.split(' ', 1)
-            lines.append((int(origin), text))
+            lines.append((origin.decode('utf-8'), text))
         return KnitContent(lines)
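
For illustration, a minimal standalone sketch of the annotated fulltext encoding this hunk parses; the helper name and the revision ids are hypothetical, but the split-and-decode mirrors the new code above.

    # Each stored line is "revid(utf8)<space>text"; parsing splits on the
    # first space and decodes the revid, leaving the text bytes untouched.
    def parse_fulltext_lines(content):
        lines = []
        for line in content:
            origin, text = line.split(' ', 1)
            lines.append((origin.decode('utf-8'), text))
        return lines

    stored = ['rev-1 hello\n', 'rev-2 world\n']
    assert parse_fulltext_lines(stored) == [(u'rev-1', 'hello\n'),
                                            (u'rev-2', 'world\n')]
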
@@ -163 +170 @@
     def parse_line_delta_iter(self, lines):
+        """Convert a line based delta into internal representation.
+
+        line delta is in the form of:
+        intstart intend intcount
+        1..count lines:
+        revid(utf8) newline\n
+        internal representation is
+        (start, end, count, [1..count tuples (revid, newline)])
+        """
         while lines:
             header = lines.pop(0)
             start, end, c = [int(n) for n in header.split(',')]
             contents = []
             for i in range(c):
                 origin, text = lines.pop(0).split(' ', 1)
-                contents.append((int(origin), text))
+                contents.append((origin.decode('utf-8'), text))
             yield start, end, c, contents

     def parse_line_delta(self, lines, version):
         return list(self.parse_line_delta_iter(lines))
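
A standalone sketch of the line-delta wire form parsed above, with hypothetical values: each hunk is a `start,end,count` header followed by `count` annotated lines.

    def parse_line_delta_lines(lines):
        result = []
        while lines:
            start, end, count = [int(n) for n in lines.pop(0).split(',')]
            contents = [tuple(lines.pop(0).split(' ', 1)) for i in range(count)]
            result.append((start, end, count,
                           [(o.decode('utf-8'), t) for o, t in contents]))
        return result

    # replace old lines 0..1 with two lines introduced by rev-2
    delta = ['0,1,2', 'rev-2 first\n', 'rev-2 second\n']
    assert parse_line_delta_lines(delta) == \
        [(0, 1, 2, [(u'rev-2', 'first\n'), (u'rev-2', 'second\n')])]
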
@@ -176 +192 @@
     def lower_fulltext(self, content):
-        return ['%d %s' % (o, t) for o, t in content._lines]
+        """convert a fulltext content record into a serializable form.
+
+        see parse_fulltext which this inverts.
+        """
+        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]

     def lower_line_delta(self, delta):
+        """convert a delta into a serializable form.
+
+        See parse_line_delta_iter which this inverts.
+        """
         out = []
         for start, end, c, lines in delta:
             out.append('%d,%d,%d\n' % (start, end, c))
             for origin, text in lines:
-                out.append('%d %s' % (origin, text))
+                out.append('%s %s' % (origin.encode('utf-8'), text))
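
Since the lower_*/parse_* pairs are documented as inverses, here is a quick round-trip check of the fulltext form (the annotated pairs are hypothetical, and a plain list stands in for KnitContent):

    annotated = [(u'rev-1', 'hello\n'), (u'rev-2', 'world\n')]
    # lower: (revid, text) -> "revid text"; parse: the reverse
    stored = ['%s %s' % (o.encode('utf-8'), t) for o, t in annotated]
    parsed = [(o.decode('utf-8'), t)
              for o, t in (line.split(' ', 1) for line in stored)]
    assert parsed == annotated
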
@@ -483 +512 @@
             options.append('no-eol')
             lines[-1] = lines[-1] + '\n'

-        lines = self.factory.make(lines, len(self._index))
-        if self.factory.annotated and len(ghostless_parents) > 0:
+        lines = self.factory.make(lines, version_id)
+        if self.factory.annotated and len(present_parents) > 0:
             # Merge annotations from parent texts if needed.
-            self._merge_annotations(lines, ghostless_parents)
+            self._merge_annotations(lines, present_parents)

-        if len(ghostless_parents) and delta:
+        if len(present_parents) and delta:
             # To speed the extract of texts the delta chain is limited
             # to a fixed number of deltas. This should minimize both
             # I/O and the time spent applying deltas.
             count = 0
-            delta_parents = ghostless_parents
+            delta_parents = present_parents
             while count < 25:
                 parent = delta_parents[0]
                 method = self._index.get_method(parent)
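
The comment above bounds delta chains at 25 entries to cap extraction I/O. A sketch of that policy with stand-in callables (get_method and get_parents here mimic the index lookups; they are not the real API):

    def fulltext_within_limit(version, get_method, get_parents, limit=25):
        # walk basis versions; give up once the chain exceeds the limit
        count = 0
        while count < limit:
            if get_method(version) == 'fulltext':
                return True
            version = get_parents(version)[0]   # delta basis is the first parent
            count += 1
        return False

    assert fulltext_within_limit('rev-3',
                                 lambda v: {'rev-3': 'line-delta',
                                            'rev-2': 'line-delta',
                                            'rev-1': 'fulltext'}[v],
                                 lambda v: {'rev-3': ['rev-2'],
                                            'rev-2': ['rev-1']}[v])
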
@@ -617 +646 @@
         self._check_versions_present(versions)
         return self._index.get_ancestry_with_ghosts(versions)

-    def _reannotate_line_delta(self, other, lines, new_version_id,
-                               new_version_idx):
-        """Re-annotate line-delta and return new delta."""
-        new_delta = []
-        for start, end, count, contents \
-                in self.factory.parse_line_delta_iter(lines):
-            new_lines = []
-            for origin, line in contents:
-                old_version_id = other._index.idx_to_name(origin)
-                if old_version_id == new_version_id:
-                    idx = new_version_idx
-                else:
-                    idx = self._index.lookup(old_version_id)
-                new_lines.append((idx, line))
-            new_delta.append((start, end, count, new_lines))
-        return self.factory.lower_line_delta(new_delta)
-
-    def _reannotate_fulltext(self, other, lines, new_version_id,
-                             new_version_idx):
-        """Re-annotate fulltext and return new version."""
-        content = self.factory.parse_fulltext(lines, new_version_idx)
-        new_lines = []
-        for origin, line in content.annotate_iter():
-            old_version_id = other._index.idx_to_name(origin)
-            if old_version_id == new_version_id:
-                idx = new_version_idx
-            else:
-                idx = self._index.lookup(old_version_id)
-            new_lines.append((idx, line))
-        return self.factory.lower_fulltext(KnitContent(new_lines))
-
     #@deprecated_method(zero_eight)
     def walk(self, version_ids):
         """See VersionedFile.walk."""
@@ -857 +853 @@
         """Add a version record to the index."""
         self._cache_version(version_id, options, pos, size, parents)

-        content = "%s %s %s %s %s\n" % (version_id,
+        content = "%s %s %s %s %s\n" % (version_id.encode('utf-8'),
                                         ','.join(options),
                                         pos,
                                         size,
                                         self._version_list_to_index(parents))
+        assert isinstance(content, str), 'content must be utf-8 encoded'
         self._transport.append(self._filename, StringIO(content))

     def has_version(self, version_id):
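
A sketch of the index line the hunk above writes, under its five-field format; the empty parents field is an assumption for a parentless version, since the exact output of _version_list_to_index is not shown here.

    version_id = u'rev-1'
    options = ['fulltext', 'no-eol']
    pos, size, parents_field = 0, 123, ''
    content = "%s %s %s %s %s\n" % (version_id.encode('utf-8'),
                                    ','.join(options),
                                    pos, size, parents_field)
    assert content == 'rev-1 fulltext,no-eol 0 123 \n'
    assert isinstance(content, str)    # plain bytes, ready to append
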
@@ -930 +927 @@
         return self._file

-    def add_record(self, version_id, digest, lines):
-        """Write new text record to disk. Returns the position in the
-        file where it was written."""
+    def _record_to_data(self, version_id, digest, lines):
+        """Convert version_id, digest, lines into a raw data block.
+
+        :return: (len, a StringIO instance with the raw data ready to read.)
+        """
         sio = StringIO()
         data_file = GzipFile(None, mode='wb', fileobj=sio)
-        print >>data_file, "version %s %d %s" % (version_id, len(lines), digest)
+        print >>data_file, "version %s %d %s" % (version_id.encode('utf-8'), len(lines), digest)
         data_file.writelines(lines)
-        print >>data_file, "end %s\n" % version_id
+        print >>data_file, "end %s\n" % version_id.encode('utf-8')
         data_file.close()
+        length = len(sio.getvalue())
+        sio.seek(0)
+        return length, sio
+
+    def add_raw_record(self, raw_data):
+        """Append a prepared record to the data file."""
+        assert isinstance(raw_data, str), 'data must be plain bytes'
+        start_pos = self._transport.append(self._filename, StringIO(raw_data))
+        return start_pos, len(raw_data)
+
+    def add_record(self, version_id, digest, lines):
+        """Write new text record to disk. Returns the position in the
+        file where it was written."""
+        size, sio = self._record_to_data(version_id, digest, lines)
         self._records[version_id] = (digest, lines)
-        content = sio.getvalue()
         start_pos = self._transport.append(self._filename, sio)
-        return start_pos, len(content)
-
-    def _parse_record(self, version_id, data):
-        df = GzipFile(mode='rb', fileobj=StringIO(data))
+        return start_pos, size
+
+    def _parse_record_header(self, version_id, raw_data):
+        """Parse a record header for consistency.
+
+        :return: the header and the decompressor stream.
+                 as (stream, header_record)
+        """
+        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
         rec = df.readline().split()
         if len(rec) != 4:
-            raise KnitCorrupt(self._filename, 'unexpected number of records')
-        if rec[1] != version_id:
+            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
+        if rec[1].decode('utf-8') != version_id:
             raise KnitCorrupt(self._filename,
                               'unexpected version, wanted %r, got %r' % (
                                   version_id, rec[1]))
+        return df, rec
+
+    def _parse_record(self, version_id, data):
+        df, rec = self._parse_record_header(version_id, data)
         lines = int(rec[2])
         record_contents = self._read_record_contents(df, lines)
         l = df.readline()
-        if l != 'end %s\n' % version_id:
+        if l.decode('utf-8') != 'end %s\n' % version_id:
             raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r'
                               % (l, version_id))
         return record_contents, rec[3]

     def _read_record_contents(self, df, record_lines):
         """Read and return n lines from datafile."""
         r = []
         for i in range(record_lines):
             r.append(df.readline())
         return r
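
A standalone sketch of the gzip record layout that _record_to_data produces and _parse_record unpacks; the digest value here is a placeholder, not a real sha digest.

    from gzip import GzipFile
    from StringIO import StringIO

    sio = StringIO()
    df = GzipFile(None, mode='wb', fileobj=sio)
    # header line: "version <utf8 id> <line count> <digest>"
    df.write("version %s %d %s\n" % ('rev-1', 2, 'fakesha1digest'))
    df.writelines(['hello\n', 'world\n'])
    # trailer: "end <utf8 id>" plus the blank line print >> would add
    df.write("end %s\n\n" % 'rev-1')
    df.close()
    raw = sio.getvalue()        # the bytes add_raw_record would append
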
@@ -974 +994 @@
+    def read_records_iter_raw(self, records):
+        """Read text records from data file and yield raw data.
+
+        This unpacks enough of the text record to validate the id is
+        as expected but that's all.
+
+        It will actively recompress currently cached records on the
+        basis that that is cheaper than I/O activity.
+        """
+        needed_records = []
+        for version_id, pos, size in records:
+            if version_id not in self._records:
+                needed_records.append((version_id, pos, size))
+
+        # set up an iterator of the external records:
+        # uses readv so nice and fast we hope.
+        if len(needed_records):
+            # grab the disk data needed.
+            raw_records = self._transport.readv(self._filename,
+                [(pos, size) for version_id, pos, size in needed_records])
+
+        for version_id, pos, size in records:
+            if version_id in self._records:
+                # compress a new version
+                size, sio = self._record_to_data(version_id,
+                                                 self._records[version_id][0],
+                                                 self._records[version_id][1])
+                yield version_id, sio.getvalue()
+            else:
+                pos, data = raw_records.next()
+                # validate the header
+                df, rec = self._parse_record_header(version_id, data)
+                df.close()
+                yield version_id, data
+
     def read_records_iter(self, records):
         """Read text records from data file and yield result.
@@ -1075 +1131 @@
             version_list = [i for i in full_list if (not self.target.has_version(i)
                 and i in needed_versions)]

-            records = []
+            copy_queue = []
+            copy_set = set()
+            copy_queue_records = []
             for version_id in version_list:
-                data_pos, data_size = self.source._index.get_position(version_id)
-                records.append((version_id, data_pos, data_size))
-
-            count = 0
-            for version_id, lines, digest \
-                    in self.source._data.read_records_iter(records):
                 options = self.source._index.get_options(version_id)
                 parents = self.source._index.get_parents_with_ghosts(version_id)
+                # check that it will be a consistent copy:
                 for parent in parents:
-                    # if source has the parent, we must have grabbed it first.
-                    assert (self.target.has_version(parent) or not
-                            self.source.has_version(parent))
-                if self.target.factory.annotated:
-                    # FIXME jrydberg: it should be possible to skip
-                    # re-annotating components if we know that we are
-                    # going to pull all revisions in the same order.
-                    new_version_id = version_id
-                    new_version_idx = self.target._index.num_versions()
-                    if 'fulltext' in options:
-                        lines = self.target._reannotate_fulltext(self.source, lines,
-                            new_version_id, new_version_idx)
-                    elif 'line-delta' in options:
-                        lines = self.target._reannotate_line_delta(self.source, lines,
-                            new_version_id, new_version_idx)
+                    # if source has the parent, we must:
+                    # * already have it or
+                    # * have it scheduled already
+                    # otherwise we don't care
+                    assert (self.target.has_version(parent) or
+                            parent in copy_set or
+                            not self.source.has_version(parent))
+                data_pos, data_size = self.source._index.get_position(version_id)
+                copy_queue_records.append((version_id, data_pos, data_size))
+                copy_queue.append((version_id, options, parents))
+                copy_set.add(version_id)

+            # data suck the join:
+            count = 0
+            total = len(version_list)
+            # we want the raw gzip for bulk copying, but the record validated
+            # just enough to be sure it's the right one.
+            # TODO: consider writev or write combining to reduce
+            # death of a thousand cuts feeling.
+            for (version_id, raw_data), \
+                (version_id2, options, parents) in \
+                izip(self.source._data.read_records_iter_raw(copy_queue_records),
+                     copy_queue):
+                assert version_id == version_id2, 'logic error, inconsistent results'
                 count = count + 1
-                pb.update("Joining knit", count, len(version_list))
-                pos, size = self.target._data.add_record(version_id, digest, lines)
+                pb.update("Joining knit", count, total)
+                pos, size = self.target._data.add_raw_record(raw_data)
                 self.target._index.add_version(version_id, options, pos, size, parents)
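
The copy loop relies on read_records_iter_raw yielding in the same order in which copy_queue was built; that positional pairing is what makes the izip and the version_id assertion sound. A toy version with plain lists:

    from itertools import izip

    raw_stream = [('rev-1', '<gzip bytes>'), ('rev-2', '<gzip bytes>')]
    copy_queue = [('rev-1', ['fulltext'], []),
                  ('rev-2', ['line-delta'], ['rev-1'])]
    for (version_id, raw_data), (version_id2, options, parents) in \
            izip(raw_stream, copy_queue):
        assert version_id == version_id2, 'streams out of step'
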
@@ -1113 +1172 @@
             for version in mismatched_versions:
+                # FIXME RBC 20060309 is this needed?
                 n1 = set(self.target.get_parents_with_ghosts(version))
                 n2 = set(self.source.get_parents_with_ghosts(version))
                 # write a combined record to our history preserving the current