@@ -203 +208 @@
         see parse_fulltext which this inverts.
         """
-        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
+        encode_utf8 = cache_utf8.encode
+        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
 
     def lower_line_delta(self, delta):
         """convert a delta into a serializable form.
 
         See parse_line_delta which this inverts.
         """
+        encode_utf8 = cache_utf8.encode
         out = []
         for start, end, c, lines in delta:
             out.append('%d,%d,%d\n' % (start, end, c))
-            for origin, text in lines:
-                out.append('%s %s' % (origin.encode('utf-8'), text))
+            out.extend(encode_utf8(origin) + ' ' + text
+                       for origin, text in lines)
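Both serializers now route origins through cache_utf8.encode instead of calling .encode('utf-8') on each one; binding it to the local name encode_utf8 also hoists the attribute lookup out of the loop. Origins are version ids that recur on many lines, so a memoizing encoder pays the encoding cost once per distinct id. A minimal sketch of the idea, assuming cache_utf8.encode is a dict-backed memoizer (its real internals are not shown here):

    # Sketch of a memoizing UTF-8 encoder in the spirit of
    # cache_utf8.encode; the real module may differ in detail.
    _unicode_to_utf8 = {}

    def encode(unicode_str, _cache=_unicode_to_utf8):
        """Return unicode_str encoded as UTF-8, computing each value once."""
        try:
            return _cache[unicode_str]
        except KeyError:
            encoded = _cache[unicode_str] = unicode_str.encode('utf-8')
            return encoded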
@@ -1370 +1402 @@
         This unpacks enough of the text record to validate the id is
         as expected but that's all.
-
-        It will actively recompress currently cached records on the
-        basis that that is cheaper than I/O activity.
         """
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
         if len(records):
             # grab the disk data needed.
-            raw_records = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in records])
+            if self._cache:
+                # Don't check _cache if it is empty
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              in records
+                                              if version_id not in self._cache]
+            else:
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              in records]
+
+            raw_records = self._transport.readv(self._filename, needed_offsets)
 
         for version_id, pos, size in records:
-            pos, data = raw_records.next()
-            # validate the header
-            df, rec = self._parse_record_header(version_id, data)
+            if version_id in self._cache:
+                # This data has already been validated
+                data = self._cache[version_id]
+            else:
+                pos, data = raw_records.next()
+                if self._do_cache:
+                    self._cache[version_id] = data
+
+                # validate the header
+                df, rec = self._parse_record_header(version_id, data)
             yield version_id, data
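The raw reader gains a cache fast path: byte ranges for records already held in self._cache are dropped from the readv request, and cached data is yielded without re-validating the header, since it was validated when first stored (the old docstring paragraph about recompressing cached records no longer applies). A runnable illustration of the offset filtering, with made-up values:

    # rev-1 and rev-3 are already cached, so only rev-2's byte range
    # is requested from the transport.
    records = [('rev-1', 0, 100), ('rev-2', 100, 80), ('rev-3', 180, 120)]
    _cache = {'rev-1': '<raw record>', 'rev-3': '<raw record>'}

    needed_offsets = [(pos, size) for version_id, pos, size in records
                                  if version_id not in _cache]
    assert needed_offsets == [(100, 80)]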
@@ -1390 +1435 @@
     def read_records_iter(self, records):
         """Read text records from data file and yield result.
 
-        Each passed record is a tuple of (version_id, pos, len) and
-        will be read in the given order. Yields (version_id,
-        contents, digest).
+        The result will be returned in whatever order is fastest to
+        read, not the order requested. Also, multiple requests for the
+        same record will only yield one response.
+        :param records: A list of (version_id, pos, len) entries
+        :return: Yields (version_id, contents, digest) in the order
+                 read, not the order requested
         """
-        if len(records) == 0:
-            return
-
-        # 60890  calls for 4168 extractions in 5045, 683 internal.
-        # 4168   calls to readv              in 1411
-        # 4168   calls to parse_record       in 2880
-
-        # Get unique records, sorted by position
-        needed_records = sorted(set(records), key=operator.itemgetter(1))
-
-        # We take it that the transport optimizes the fetching as good
-        # as possible (ie, reads continuous ranges.)
-        response = self._transport.readv(self._filename,
+        if not records:
+            return
+
+        if self._cache:
+            # Skip records we have already seen
+            yielded_records = set()
+            needed_records = set()
+            for record in records:
+                if record[0] in self._cache:
+                    if record[0] in yielded_records:
+                        continue
+                    yielded_records.add(record[0])
+                    data = self._cache[record[0]]
+                    content, digest = self._parse_record(record[0], data)
+                    yield (record[0], content, digest)
+                else:
+                    needed_records.add(record)
+            needed_records = sorted(needed_records, key=operator.itemgetter(1))
+        else:
+            needed_records = sorted(set(records), key=operator.itemgetter(1))
+
+        if not needed_records:
+            return
+
+        # The transport optimizes the fetching as well
+        # (ie, reads continuous ranges.)
+        readv_response = self._transport.readv(self._filename,
             [(pos, size) for version_id, pos, size in needed_records])
 
-        record_map = {}
-        for (record_id, pos, size), (pos, data) in \
-            izip(iter(needed_records), response):
-            content, digest = self._parse_record(record_id, data)
-            record_map[record_id] = (digest, content)
-
-        for version_id, pos, size in records:
-            digest, content = record_map[version_id]
+        for (version_id, pos, size), (pos, data) in \
+                izip(iter(needed_records), readv_response):
+            content, digest = self._parse_record(version_id, data)
+            if self._do_cache:
+                self._cache[version_id] = data
             yield version_id, content, digest
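The rewritten iterator serves whatever it can straight from self._cache, deduplicating repeated ids via yielded_records, and only the remainder goes to a single readv call; this is what changes the ordering contract from request order to read order. In both branches the outstanding requests are coalesced the same way, unique and sorted by file position, so the transport can merge adjacent ranges. A runnable illustration of that coalescing step (tuples are made up):

    import operator

    # Unique (version_id, pos, size) requests, sorted by file position
    # so the readv ranges are as contiguous as possible.
    records = [('rev-3', 180, 120), ('rev-1', 0, 100), ('rev-3', 180, 120)]
    needed_records = sorted(set(records), key=operator.itemgetter(1))
    assert needed_records == [('rev-1', 0, 100), ('rev-3', 180, 120)]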
@@ -1422 +1481 @@
     def read_records(self, records):
         """Read records into a dictionary."""
         components = {}
-        for record_id, content, digest in self.read_records_iter(records):
+        for record_id, content, digest in \
+                self.read_records_iter(records):
             components[record_id] = (content, digest)
         return components
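This last hunk is only a line wrap, but it is worth noting why read_records needs no other change: collecting into a dictionary keyed by record id absorbs both the new read-order yielding and the duplicate suppression. A self-contained sketch with a hypothetical stand-in class (FakeKnitData is not part of the source):

    # FakeKnitData mimics read_records_iter yielding each distinct
    # record once, in arbitrary order.
    class FakeKnitData(object):
        def read_records_iter(self, records):
            for version_id, pos, size in set(records):
                yield version_id, ['line\n'], 'digest-of-%s' % version_id

        def read_records(self, records):
            """Read records into a dictionary."""
            components = {}
            for record_id, content, digest in self.read_records_iter(records):
                components[record_id] = (content, digest)
            return components

    result = FakeKnitData().read_records([('rev-1', 0, 10), ('rev-1', 0, 10)])
    assert result == {'rev-1': (['line\n'], 'digest-of-rev-1')}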