        see parse_fulltext which this inverts.
        """
        encode_utf8 = cache_utf8.encode
        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
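
    # Illustrative sketch, not part of the original source: each stored
    # line is serialized as its utf-8 encoded origin, one space, then the
    # raw text, so for a hypothetical content object with
    #   content._lines == [(u'rev-1', 'hello\n')]
    # lower_fulltext(content) returns ['rev-1 hello\n'], which
    # parse_fulltext presumably inverts by splitting each line on the
    # first space.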

    def lower_line_delta(self, delta):
        """convert a delta into a serializable form.

        See parse_line_delta which this inverts.
        """
        out = []
        encode_utf8 = cache_utf8.encode
        for start, end, c, lines in delta:
            out.append('%d,%d,%d\n' % (start, end, c))
            out.extend(encode_utf8(origin) + ' ' + text
                       for origin, text in lines)
        return out
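
    # Illustrative sketch, not part of the original source: a single hunk
    # replacing lines [2, 3) with two new lines serializes as a
    # 'start,end,count' header line followed by origin-prefixed text lines:
    #
    #   lower_line_delta([(2, 3, 2, [(u'rev-9', 'new a\n'),
    #                                (u'rev-9', 'new b\n')])])
    #   => ['2,3,2\n', 'rev-9 new a\n', 'rev-9 new b\n']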

    def read_records_iter_raw(self, records):
        """Read text records from data file and yield raw data.

        This unpacks enough of the text record to validate the id is
        as expected, but that's all.

        It will actively recompress currently cached records on the
        basis that doing so is cheaper than I/O activity.
        """
        # set up an iterator over the external records:
        # uses readv, so we hope it's nice and fast.
        if len(records):
            # grab the disk data needed.
            if self._cache:
                # Don't check _cache if it is empty
                needed_offsets = [(pos, size) for version_id, pos, size
                                  in records
                                  if version_id not in self._cache]
            else:
                needed_offsets = [(pos, size) for version_id, pos, size
                                  in records]
            raw_records = self._transport.readv(self._filename,
                                                needed_offsets)

        for version_id, pos, size in records:
            if version_id in self._cache:
                # This data has already been validated
                data = self._cache[version_id]
            else:
                pos, data = raw_records.next()
                self._cache[version_id] = data
                # validate the header
                df, rec = self._parse_record_header(version_id, data)
            yield version_id, data
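
    # Usage sketch with assumed inputs, not part of the original source:
    # records is a list of (version_id, pos, size) tuples and the raw,
    # undecoded record bytes come back in the requested order, served
    # from self._cache when possible and via a single readv otherwise.
    #
    #   for version_id, data in self.read_records_iter_raw(
    #           [('rev-1', 0, 120), ('rev-2', 120, 95)]):
    #       ...  # data is the raw record, header-validated against
    #            # version_id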

    def read_records_iter(self, records):
        """Read text records from data file and yield result.

        The results are returned in whatever order is fastest to read,
        not the order requested. Also, multiple requests for the same
        record will only yield one response.

        :param records: A list of (version_id, pos, len) entries
        :return: Yields (version_id, contents, digest) in the order
                 read, not the order requested
        """
        if len(records) == 0:
            return

        # 60890 calls for 4168 extractions in 5045, 683 internal.
        # 4168 calls to readv in 1411
        # 4168 calls to parse_record in 2880

        if self._cache:
            # Skip records we have already seen
            yielded_records = set()
            needed_records = set()
            for record in records:
                if record[0] in self._cache:
                    if record[0] in yielded_records:
                        continue
                    yielded_records.add(record[0])
                    data = self._cache[record[0]]
                    content, digest = self._parse_record(record[0], data)
                    yield (record[0], content, digest)
                else:
                    needed_records.add(record)
            needed_records = sorted(needed_records,
                                    key=operator.itemgetter(1))
        else:
            # Get unique records, sorted by position
            needed_records = sorted(set(records),
                                    key=operator.itemgetter(1))

        if not needed_records:
            return

        # The transport optimizes the fetching as well as possible
        # (ie, reads continuous ranges.)
        readv_response = self._transport.readv(self._filename,
            [(pos, size) for version_id, pos, size in needed_records])

        for (version_id, pos, size), (pos, data) in \
                izip(iter(needed_records), readv_response):
            content, digest = self._parse_record(version_id, data)
            self._cache[version_id] = data
            yield version_id, content, digest
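
    # Usage sketch, not part of the original source: unlike the raw
    # variant, results arrive parsed and deduplicated, in whatever order
    # is fastest to read rather than the order requested.
    #
    #   for version_id, content, digest in self.read_records_iter(records):
    #       ...  # content and digest come from self._parse_record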

    def read_records(self, records):
        """Read records into a dictionary."""
        components = {}
        for record_id, content, digest in \
                self.read_records_iter(records):
            components[record_id] = (content, digest)
        return components
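
    # Usage sketch, not part of the original source: read_records simply
    # materializes read_records_iter into a dict, so duplicate requests
    # collapse to a single entry keyed by version id.
    #
    #   components = self.read_records(records)
    #   content, digest = components['rev-1']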