def read_records_iter_raw(self, records):
    """Read text records from data file and yield raw data.

    This unpacks enough of the text record to validate the id is
    as expected, but that's all.
    """
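    # Note (added for clarity, not in the original): "raw data" here means
    # the record bytes exactly as stored on disk; they are checked only far
    # enough to confirm the record id, never fully parsed.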
    # Set up an iterator over the external records;
    # it uses readv, so we hope it is nice and fast.
    if records:
        # Grab the disk data needed.
        if self._cache:
            # Don't check _cache if it is empty.
            needed_offsets = [(pos, size) for version_id, pos, size
                              in records
                              if version_id not in self._cache]
        else:
            needed_offsets = [(pos, size) for version_id, pos, size
                              in records]
        raw_records = self._transport.readv(self._filename, needed_offsets)

    for version_id, pos, size in records:
        if version_id in self._cache:
            # This data has already been validated.
            data = self._cache[version_id]
        else:
            pos, data = raw_records.next()
            self._cache[version_id] = data
            # Validate the header.
            df, rec = self._parse_record_header(version_id, data)
        yield version_id, data
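
def _example_offset_filtering():
    # Hedged, standalone sketch (not part of the original module): how
    # read_records_iter_raw avoids re-reading cached records. Only records
    # missing from the cache contribute (pos, size) ranges to the readv
    # call; every name and value below is made up for illustration.
    cache = {'rev-a': 'raw-bytes-for-a'}
    records = [('rev-a', 0, 110), ('rev-b', 110, 240)]
    needed_offsets = [(pos, size) for version_id, pos, size in records
                      if version_id not in cache]
    assert needed_offsets == [(110, 240)]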

def read_records_iter(self, records):
    """Read text records from data file and yield result.

    The result will be returned in whatever order is fastest to read,
    not the order requested. Also, multiple requests for the same
    record will only yield one response.

    :param records: A list of (version_id, pos, len) entries
    :return: Yields (version_id, contents, digest) in the order
        read, not the order requested
    """
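    # Illustrative example (not in the original): requesting
    # [('rev-a', 0, 110), ('rev-a', 0, 110)] yields ('rev-a', ...) only once.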
    if not records:
        return

    if self._cache:
        # Skip records we have already seen.
        yielded_records = set()
        needed_records = set()
        for record in records:
            if record[0] in self._cache:
                if record[0] in yielded_records:
                    continue
                yielded_records.add(record[0])
                data = self._cache[record[0]]
                content, digest = self._parse_record(record[0], data)
                yield (record[0], content, digest)
            else:
                needed_records.add(record)
        # The unique uncached records, sorted by file position.
        needed_records = sorted(needed_records, key=operator.itemgetter(1))
    else:
        # Get unique records, sorted by position.
        needed_records = sorted(set(records), key=operator.itemgetter(1))

    if not needed_records:
        return

    # The transport optimizes the fetching as well as it can
    # (ie, reads continuous ranges).
    readv_response = self._transport.readv(self._filename,
        [(pos, size) for version_id, pos, size in needed_records])

    # izip comes from itertools (this module targets Python 2).
    for (version_id, pos, size), (pos, data) in \
            izip(iter(needed_records), readv_response):
        content, digest = self._parse_record(version_id, data)
        self._cache[version_id] = data
        yield version_id, content, digest
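
def _example_disk_order_fetch():
    # Hedged, standalone sketch (not part of the original module): the
    # request list is deduplicated and sorted by file position so the
    # transport can coalesce neighbouring ranges into fewer, larger reads;
    # results therefore come back in read order, not request order. The
    # `requests` data below is made up for illustration.
    import operator
    requests = [('rev-c', 350, 40), ('rev-a', 0, 110), ('rev-c', 350, 40)]
    unique_in_disk_order = sorted(set(requests), key=operator.itemgetter(1))
    assert unique_in_disk_order == [('rev-a', 0, 110), ('rev-c', 350, 40)]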

def read_records(self, records):
    """Read records into a dictionary."""
    components = {}
    for record_id, content, digest in self.read_records_iter(records):
        components[record_id] = (content, digest)
    return components
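
# Hedged usage sketch (hypothetical names): read_records() is the
# non-streaming convenience wrapper around read_records_iter():
#
#   components = knit_data.read_records(index_entries)
#   content, digest = components[some_version_id]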