        It will actively recompress currently cached records on the
        basis that this is cheaper than I/O activity.
        """
        needed_records = []
        for version_id, pos, size in records:
            if version_id not in self._records:
                needed_records.append((version_id, pos, size))

        # set up an iterator over the external records:
        # uses readv, so it should be nice and fast.
        if len(needed_records):
            # grab the disk data needed
            raw_records = self._transport.readv(self._filename,
                [(pos, size) for version_id, pos, size in needed_records])

        for version_id, pos, size in records:
            if version_id in self._records:
                # the record is cached: recompress it rather than hit the disk
                size, sio = self._record_to_data(version_id,
                    self._records[version_id][0],
                    self._records[version_id][1])
                yield version_id, sio.getvalue()
            else:
                pos, data = raw_records.next()
                # validate the header
                df, rec = self._parse_record_header(version_id, data)
                yield version_id, data
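
    # Illustrative usage sketch (an assumption, not from the original
    # source): a caller such as a knit-to-knit fetch could stream the raw,
    # still-compressed records straight into another data file. The method
    # name `read_records_iter_raw`, the `other_data` object, and the
    # (version_id, pos, size) tuples below are hypothetical.
    #
    #   records = [('rev-1', 0, 120), ('rev-2', 120, 95)]
    #   for version_id, raw_data in self.read_records_iter_raw(records):
    #       other_data.add_raw_record(raw_data)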

    def read_records_iter(self, records):
        """Read text records from the data file and yield the results.

        Each passed record is a tuple of (version_id, pos, size) and
        will be read in the given order. Yields (version_id,
        contents, digest).
        """
        if len(records) == 0:
            return
        # profiling notes:
        # 60890 calls for 4168 extractions in 5045, 683 internal.
        # 4168 calls to readv in 1411
        # 4168 calls to parse_record in 2880

        needed_records = set()
        for version_id, pos, size in records:
            if version_id not in self._records:
                needed_records.add((version_id, pos, size))

        # turn our set into a list, sorted by file position
        needed_records = sorted(needed_records, key=operator.itemgetter(1))

        if len(needed_records):
            # We take it that the transport optimizes the fetching as well
            # as possible (ie, reads continuous ranges).
            response = self._transport.readv(self._filename,
                [(pos, size) for version_id, pos, size in needed_records])

            for (record_id, pos, size), (pos, data) in \
                    izip(iter(needed_records), response):
                content, digest = self._parse_record(record_id, data)
                self._records[record_id] = (digest, content)

        for version_id, pos, size in records:
            yield (version_id, list(self._records[version_id][1]),
                   self._records[version_id][0])
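
    # Illustrative usage sketch (an assumption, not from the original
    # source): positions and sizes normally come from the knit's index; the
    # values below are made up. Results come back in the order requested,
    # even though the actual reads are issued in position-sorted order.
    #
    #   records = [('rev-2', 120, 95), ('rev-1', 0, 120)]
    #   for version_id, content, digest in self.read_records_iter(records):
    #       process(version_id, content, digest)  # process() is hypothetical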

    def read_records(self, records):
        """Read records into a dictionary."""