import bzrlib.errors as errors
from bzrlib.errors import (
    FileExists,
    NoSuchFile,
    KnitError,
    InvalidRevisionId,
    KnitCorrupt,
    KnitHeaderError,
    RevisionNotPresent,
    RevisionAlreadyPresent,
    )
from bzrlib.tuned_gzip import GzipFile
from bzrlib.trace import mutter
from bzrlib.osutils import (
    contains_whitespace,
    contains_linebreaks,
    )
from bzrlib.symbol_versioning import DEPRECATED_PARAMETER, deprecated_passed
from bzrlib.tsort import topo_sort
import bzrlib.weave
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
# TODO: Split out code specific to this format into an associated object.

    def line_delta_iter(self, new_lines):
        """Generate line-based delta from this content to new_lines."""
        new_texts = new_lines.text()
        old_texts = self.text()
        s = KnitSequenceMatcher(None, old_texts, new_texts)
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            if tag == 'equal':
                continue
            # ofrom, oto, length, data
            yield i1, i2, j2 - j1, new_lines._lines[j1:j2]

    def line_delta(self, new_lines):
        return list(self.line_delta_iter(new_lines))
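    # For illustration (hypothetical content): if the old text is
    # ['a\n', 'b\n'] and the new text is ['a\n', 'c\n'], line_delta()
    # returns a single hunk [(1, 2, 1, [(origin, 'c\n')])], i.e.
    # "replace old lines [1:2) with this one annotated line".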
        internal representation is of the format:
        (revid, plaintext)
        """
        # TODO: jam 20070209 The tests expect this to be returned as tuples,
        #       but the code itself doesn't really depend on that.
        #       Figure out a way to not require the overhead of turning the
        #       list back into tuples.
        lines = [tuple(line.split(' ', 1)) for line in content]
        return KnitContent(lines)
    def parse_line_delta_iter(self, lines):
        return iter(self.parse_line_delta(lines))
    def parse_line_delta(self, lines, version_id):
        """Convert a line based delta into internal representation.

        line delta is in the form of:
        intstart intend intcount
        1..count lines:
        revid(utf8) newline\n
        internal representation is
        (start, end, count, [1..count tuples (revid, newline)])
        """
        result = []
        lines = iter(lines)
        next = lines.next

        cache = {}
        def cache_and_return(line):
            origin, text = line.split(' ', 1)
            return cache.setdefault(origin, origin), text

        # walk through the lines parsing.
        for header in lines:
            start, end, count = [int(n) for n in header.split(',')]
            contents = [tuple(next().split(' ', 1)) for i in xrange(count)]
            result.append((start, end, count, contents))
        return result
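    # Example of the serialized form this parses (hypothetical revision ids):
    #
    #   1,2,2
    #   rev-a replacement line one
    #   rev-a replacement line two
    #
    # i.e. a "start,end,count" header followed by count annotated lines,
    # matching what lower_line_delta() writes out below.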
    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return (line.split(' ', 1)[1] for line in lines)
    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                origin, text = next().split(' ', 1)
                yield text
    def lower_fulltext(self, content):
        """convert a fulltext content record into a serializable form.

        see parse_fulltext which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        return ['%s %s' % (o, t) for o, t in content._lines]
    def lower_line_delta(self, delta):
        """convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        # TODO: jam 20070209 We only do the caching thing to make sure that
        #       the origin is a valid utf-8 line, eventually we could remove it
        out = []
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(origin + ' ' + text
                       for origin, text in lines)
        return out
    annotated = False

    def parse_fulltext(self, content, version_id):
        """This parses an unannotated fulltext.

        Note that this is not a noop - the internal representation
        has (versionid, line) - its just a constant versionid.
        """
        return self.make(content, version_id)
    def parse_line_delta_iter(self, lines, version_id):
        cur = 0
        num_lines = len(lines)
        while cur < num_lines:
            header = lines[cur]
            cur += 1
            start, end, c = [int(n) for n in header.split(',')]
            yield start, end, c, zip([version_id] * c, lines[cur:cur+c])
            cur += c

    def parse_line_delta(self, lines, version_id):
        return list(self.parse_line_delta_iter(lines, version_id))

    def get_fulltext_content(self, lines):
        """Extract just the content lines from a fulltext."""
        return iter(lines)

    def get_linedelta_content(self, lines):
        """Extract just the content from a line delta.

        This doesn't return all of the extra information stored in a delta.
        Only the actual content lines.
        """
        lines = iter(lines)
        next = lines.next
        for header in lines:
            header = header.split(',')
            count = int(header[2])
            for i in xrange(count):
                yield next()
    def lower_fulltext(self, content):
        return content.text()

    stored and retrieved.
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
                 create=False, create_parent_dir=False, delay_create=False,
                 dir_mode=None):
        """Construct a knit at location specified by relpath.

        :param create: If not True, only open an existing knit.
        :param create_parent_dir: If True, create the parent directory if
            creating the file fails. (This is used for stores with
            hash-prefixes that may not exist yet)
        :param delay_create: The calling code is aware that the knit won't
            actually be created until the first data is stored.
        """
        if deprecated_passed(basis_knit):
            warnings.warn("KnitVersionedFile.__(): The basis_knit parameter is"
                          " deprecated as of bzr 0.9.",
                          DeprecationWarning, stacklevel=2)
        self.writable = (access_mode == 'w')
        self.delta = delta

        self._max_delta_chain = 200

        self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
            access_mode, create=create, file_mode=file_mode,
            create_parent_dir=create_parent_dir, delay_create=delay_create,
            dir_mode=dir_mode)
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
            access_mode, create=create and not len(self), file_mode=file_mode,
            create_parent_dir=create_parent_dir, delay_create=delay_create,
            dir_mode=dir_mode)
    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
                           self.transport.abspath(self.filename))
    def _check_should_delta(self, first_parents):
        """Iterate back through the parent listing, looking for a fulltext.

        This is used when we want to decide whether to add a delta or a new
        fulltext. It searches for _max_delta_chain parents. When it finds a
        fulltext parent, it sees if the total size of the deltas leading up to
        it is large enough to indicate that we want a new full text anyway.

        Return True if we should create a new delta, False if we should use a
        full text.
        """
        delta_size = 0
        fulltext_size = None
        delta_parents = first_parents
        for count in xrange(self._max_delta_chain):
            parent = delta_parents[0]
            method = self._index.get_method(parent)
            pos, size = self._index.get_position(parent)
            if method == 'fulltext':
                fulltext_size = size
                break
            delta_size += size
            delta_parents = self._index.get_parents(parent)
        else:
            # We couldn't find a fulltext, so we must create a new one
            return False

        return fulltext_size > delta_size
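    # Rough example of the trade-off above (made-up sizes): with three deltas
    # of 100, 60 and 40 bytes sitting between this text and the last fulltext
    # of 2000 bytes, fulltext_size (2000) > delta_size (200), so another delta
    # is preferred; once the accumulated deltas outweigh the fulltext, a new
    # fulltext is stored instead.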
    def _add_delta(self, version_id, parents, delta_parent, sha1, noeol, delta):
        """See VersionedFile._add_delta()."""
        self._check_add(version_id, []) # should we check the lines ?
        """See VersionedFile.copy_to()."""
        # copy the current index to a temp index to avoid racing with local
        # writes
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
                self.transport.get(self._index._filename))
        # copy the data file
        f = self._data._open_file()
        try:
            transport.put_file(name + DATA_SUFFIX, f)
        finally:
            f.close()
        # move the copied index into place
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
    def create_empty(self, name, transport, mode=None):
        return KnitVersionedFile(name, transport, factory=self.factory,
                                 delta=self.delta, create=True)
    def _fix_parents(self, version_id, new_parents):
        """Fix the parents list for version.

        This is done by appending a new version to the index
            text_map[version_id] = text
        return text_map, final_content
    def iter_lines_added_or_present_in_versions(self, version_ids=None,
                                                pb=None):
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
        if version_ids is None:
            version_ids = self.versions()
        else:
            version_ids = [osutils.safe_revision_id(v) for v in version_ids]
        if pb is None:
            pb = progress.DummyProgress()
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need version_id, position, length
        version_id_records = []
        requested_versions = set(version_ids)
        # filter for available versions
        for version_id in requested_versions:
            if not self.has_version(version_id):
                raise RevisionNotPresent(version_id, self.filename)
        # get an in-component-order queue:
        for version_id in self.versions():
            if version_id in requested_versions:
                data_pos, length = self._index.get_position(version_id)
                version_id_records.append((version_id, data_pos, length))

        total = len(version_id_records)
        for version_idx, (version_id, data, sha_value) in \
            enumerate(self._data.read_records_iter(version_id_records)):
            pb.update('Walking content.', version_idx, total)
            method = self._index.get_method(version_id)
            assert method in ('fulltext', 'line-delta')
            if method == 'fulltext':
                line_iterator = self.factory.get_fulltext_content(data)
            else:
                line_iterator = self.factory.get_linedelta_content(data)
            for line in line_iterator:
                yield line

        pb.update('Walking content.', total, total)
    def num_versions(self):
        """See VersionedFile.num_versions()."""
class _KnitComponentFile(object):
    """One of the files used to implement a knit database"""

    def __init__(self, transport, filename, mode, file_mode=None,
                 create_parent_dir=False, dir_mode=None):
        self._transport = transport
        self._filename = filename
        self._mode = mode
        self._file_mode = file_mode
        self._dir_mode = dir_mode
        self._create_parent_dir = create_parent_dir
        self._need_to_create = False

    def _full_path(self):
        """Return the full path to this file."""
        return self._transport.base + self._filename
    def write_header(self):
        if self._transport.append(self._filename, StringIO(self.HEADER),
            mode=self._file_mode):
            raise KnitCorrupt(self._filename, 'misaligned after writing header')
    def check_header(self, fp):
        line = fp.readline()
        if line == '':
            # An empty file can actually be treated as though the file doesn't
            # exist yet.
            raise errors.NoSuchFile(self._full_path())
        if line != self.HEADER:
            raise KnitHeaderError(badline=line,
                                  filename=self._transport.abspath(self._filename))

    def commit(self):
        """Commit is a nop."""
            self._history.append(version_id)
        else:
            index = self._cache[version_id][5]
        self._cache[version_id] = (version_id,
                                   options, pos, size, parents, index)
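        # Each cache entry is (version_id, options, pos, size, parents, index):
        # pos/size locate the record in the .knit data file, and index is the
        # position of version_id in self._history.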
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False, dir_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._cache = {}
        # position in _history is the 'official' index for a revision
        # but the values may have come from a newer entry.
        # so - wc -l of a knit index is != the number of unique names
        # in the knit.
        self._history = []
        try:
            fp = self._transport.get(self._filename)
            try:
                # _load_data may raise NoSuchFile if the target knit is
                # completely empty.
                _load_data(self, fp)
            finally:
                fp.close()
        except NoSuchFile:
            if mode != 'w' or not create:
                raise
            elif delay_create:
                self._need_to_create = True
            else:
                self._transport.put_bytes_non_atomic(
                    self._filename, self.HEADER, mode=self._file_mode)

    def _parse_parents(self, compressed_parents):
        """convert a list of string parent values into version ids.

        ints are looked up in the index.
        .FOO values are ghosts and converted in to FOO.

        NOTE: the function is retained here for clarity, and for possible
              use in partial index reads. However bulk processing now has
              it inlined in __init__ for inner-loop optimisation.
        """
        result = []
        for value in compressed_parents:
            if value[0] == '.':
                # uncompressed reference
                result.append(value[1:])
            else:
                # this is 15/4000ms faster than isinstance,
                # this function is called thousands of times a
                # second so small variations add up.
                assert value.__class__ is str
                result.append(self._history[int(value)])
        return result
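    # Example (hypothetical values): with self._history = ['rev-1', 'rev-2'],
    # _parse_parents(['1', '.ghost-rev']) returns ['rev-2', 'ghost-rev'] -
    # integers index into _history, dotted values are ghost references kept
    # verbatim once the leading marker is stripped.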
    def get_graph(self):
        return [(vid, idx[4]) for vid, idx in self._cache.iteritems()]
    def get_ancestry(self, versions, topo_sorted=True):
        """See VersionedFile.get_ancestry."""
        # get a graph of all the mentioned versions:
        graph = {}
        cache = self._cache
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = [p for p in cache[version][4] if p in cache]
            except KeyError:
                raise RevisionNotPresent(version, self._filename)
            # if not completed and not a ghost
            pending.update([p for p in parents if p not in graph])
            graph[version] = parents
        if not topo_sorted:
            return graph.keys()
        return topo_sort(graph.items())
    def get_ancestry_with_ghosts(self, versions):
        """See VersionedFile.get_ancestry_with_ghosts."""
        # get a graph of all the mentioned versions:
        self.check_versions_present(versions)
        cache = self._cache
        graph = {}
        pending = set(versions)
        while pending:
            version = pending.pop()
            try:
                parents = cache[version][4]
            except KeyError:
                # ghost, fake it
                graph[version] = []
            else:
                # if not completed and not a ghost
                pending.update([p for p in parents if p not in graph])
                graph[version] = parents
        return topo_sort(graph.items())
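    # Example (hypothetical graph): if 'rev-2' lists parents ['rev-1', 'ghost']
    # and only 'rev-1' and 'rev-2' are present in the index, get_ancestry()
    # trims the ghost and returns ['rev-1', 'rev-2'], while
    # get_ancestry_with_ghosts() keeps 'ghost' as a node with no parents.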
                         (version_id, options, pos, size, parents).
        """
        lines = []
        orig_history = self._history[:]
        orig_cache = self._cache.copy()

        try:
            for version_id, options, pos, size, parents in versions:
                line = "\n%s %s %s %s %s :" % (version_id,
                                               ','.join(options),
                                               pos,
                                               size,
                                               self._version_list_to_index(parents))
                assert isinstance(line, str), \
                    'content must be utf-8 encoded: %r' % (line,)
                lines.append(line)
                self._cache_version(version_id, options, pos, size, parents)
            if not self._need_to_create:
                self._transport.append_bytes(self._filename, ''.join(lines))
            else:
                sio = StringIO()
                sio.write(self.HEADER)
                sio.writelines(lines)
                sio.seek(0)
                self._transport.put_file_non_atomic(self._filename, sio,
                                    create_parent_dir=self._create_parent_dir,
                                    mode=self._file_mode,
                                    dir_mode=self._dir_mode)
                self._need_to_create = False
        except:
            # If any problems happen, restore the original values and re-raise
            self._history = orig_history
            self._cache = orig_cache
            raise
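    # A written index line therefore looks roughly like (hypothetical values):
    #
    #   rev-2 fulltext,no-eol 120 358 0 .ghost-rev :
    #
    # i.e. version id, comma-joined options, byte offset and length in the
    # data file, then the compressed parent list terminated by ':'.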
    def has_version(self, version_id):
        """True if the version is in the index."""
        return version_id in self._cache

    def get_position(self, version_id):
        """Return data position and size of specified version."""
        entry = self._cache[version_id]
        return entry[2], entry[3]
    def get_method(self, version_id):
        """Return compression method of specified version."""

    def check_versions_present(self, version_ids):
        """Check that all specified versions are present."""
        cache = self._cache
        for version_id in version_ids:
            if version_id not in cache:
                raise RevisionNotPresent(version_id, self._filename)
class _KnitData(_KnitComponentFile):
    """Contents of the knit data file"""

    HEADER = "# bzr knit data 8\n"

    def __init__(self, transport, filename, mode, create=False, file_mode=None,
                 create_parent_dir=False, delay_create=False,
                 dir_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode,
                                    file_mode=file_mode,
                                    create_parent_dir=create_parent_dir,
                                    dir_mode=dir_mode)
        self._checked = False
        # TODO: jam 20060713 conceptually, this could spill to disk
        #       if the cached size gets larger than a certain amount
        #       but it complicates the model a bit, so for now just use
        #       a simple dictionary
        self._cache = {}
        self._do_cache = False
        if create:
            if delay_create:
                self._need_to_create = create
            else:
                self._transport.put_bytes_non_atomic(self._filename, '',
                                                     mode=self._file_mode)

    def enable_cache(self):
        """Enable caching of reads."""
        self._do_cache = True

    def clear_cache(self):
        """Clear the record cache."""
        self._do_cache = False
        self._cache = {}

    def _open_file(self):
        :return: the offset in the data file raw_data was written.
        """
        assert isinstance(raw_data, str), 'data must be plain bytes'
        if not self._need_to_create:
            return self._transport.append_bytes(self._filename, raw_data)
        else:
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
                                   create_parent_dir=self._create_parent_dir,
                                   mode=self._file_mode,
                                   dir_mode=self._dir_mode)
            self._need_to_create = False
            return 0
    def add_record(self, version_id, digest, lines):
        """Write new text record to disk.  Returns the position in the
        file where it was written."""
        size, sio = self._record_to_data(version_id, digest, lines)
        # write to disk
        if not self._need_to_create:
            start_pos = self._transport.append_file(self._filename, sio)
        else:
            self._transport.put_file_non_atomic(self._filename, sio,
                                   create_parent_dir=self._create_parent_dir,
                                   mode=self._file_mode,
                                   dir_mode=self._dir_mode)
            self._need_to_create = False
            start_pos = 0
        if self._do_cache:
            self._cache[version_id] = sio.getvalue()
        return start_pos, size
    def _parse_record_header(self, version_id, raw_data):
        """Parse a record header for consistency.

        :return: the header and the decompressor stream,
                 as (stream, header_record)
        """
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
        try:
            rec = self._check_header(version_id, df.readline())
        except Exception, e:
            raise KnitCorrupt(self._filename,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        return df, rec
    def _check_header(self, version_id, line):
        rec = line.split()
        if len(rec) != 4:
            raise KnitCorrupt(self._filename,
                              'unexpected number of elements in record header')
        if rec[1] != version_id:
            raise KnitCorrupt(self._filename,
                              'unexpected version, wanted %r, got %r'
                              % (version_id, rec[1]))
        return rec
    def _parse_record(self, version_id, data):
        # profiling notes:
        # 4168 calls in 2880 217 internal
        # 4168 calls to _parse_record_header in 2121
        # 4168 calls to readlines in 330
        df = GzipFile(mode='rb', fileobj=StringIO(data))

        try:
            record_contents = df.readlines()
        except Exception, e:
            raise KnitCorrupt(self._filename,
                              "While reading {%s} got %s(%s)"
                              % (version_id, e.__class__.__name__, str(e)))
        header = record_contents.pop(0)
        rec = self._check_header(version_id, header)

        last_line = record_contents.pop()
        if len(record_contents) != int(rec[2]):
            raise KnitCorrupt(self._filename,
                              'incorrect number of lines %s != %s'
                              ' for version {%s}'
                              % (len(record_contents), int(rec[2]),
                                 version_id))
        if last_line != 'end %s\n' % rec[1]:
            raise KnitCorrupt(self._filename,
                              'unexpected version end line %r, wanted %r'
                              % (last_line, version_id))
        df.close()
        return record_contents, rec[3]
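    # For reference, the gzipped record parsed above looks roughly like
    # (hypothetical version id and digest):
    #
    #   version rev-1 2 <sha1-digest>
    #   <content line 1>
    #   <content line 2>
    #   end rev-1
    #
    # rec[2] is the line count and rec[3] the digest of the content.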
        This unpacks enough of the text record to validate the id is
        as expected but thats all.
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            if self._cache:
                # Don't check _cache if it is empty
                needed_offsets = [(pos, size) for version_id, pos, size
                                              in records
                                              if version_id not in self._cache]
            else:
                needed_offsets = [(pos, size) for version_id, pos, size
                                              in records]
            raw_records = self._transport.readv(self._filename, needed_offsets)

        for version_id, pos, size in records:
            if version_id in self._cache:
                # This data has already been validated
                data = self._cache[version_id]
            else:
                pos, data = raw_records.next()
                if self._do_cache:
                    self._cache[version_id] = data
                # validate the header
                df, rec = self._parse_record_header(version_id, data)
            yield version_id, data
    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The result will be returned in whatever is the fastest to read.
        Not by the order requested. Also, multiple requests for the same
        record will only yield 1 response.
        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
                 read, not the order requested
        """
        if not records:
            return

        if self._cache:
            # Skip records we have already seen
            yielded_records = set()
            needed_records = set()
            for record in records:
                if record[0] in self._cache:
                    if record[0] in yielded_records:
                        continue
                    yielded_records.add(record[0])
                    data = self._cache[record[0]]
                    content, digest = self._parse_record(record[0], data)
                    yield (record[0], content, digest)
                else:
                    needed_records.add(record)
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
        else:
            needed_records = sorted(set(records), key=operator.itemgetter(1))

        if not needed_records:
            return

        # The transport optimizes the fetching as well
        # (ie, reads continuous ranges.)
        readv_response = self._transport.readv(self._filename,
            [(pos, size) for version_id, pos, size in needed_records])

        for (version_id, pos, size), (pos, data) in \
                izip(iter(needed_records), readv_response):
            content, digest = self._parse_record(version_id, data)
            if self._do_cache:
                self._cache[version_id] = data
            yield version_id, content, digest
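    # Typical use (hypothetical entries): records such as
    # [('rev-1', 0, 120), ('rev-2', 120, 85)] are fetched in one readv()
    # call and yielded in on-disk order, so callers that need request order
    # should collect the results into a dict first (as read_records() does).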
    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components