        # write all the data
        pos = self._data.add_raw_record(data)
        offset = 0
        index_entries = []
        for (version_id, options, parents, size) in records:
            index_entries.append((version_id, options, pos+offset,
                                  size, parents))
            if self._data._do_cache:
                self._data._cache[version_id] = data[offset:offset+size]
            offset += size
        self._index.add_versions(index_entries)
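
    # enable_cache()/clear_cache() simply forward to self._data, which owns
    # the record cache itself (see the data-file class further down).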
    def enable_cache(self):
        """Start caching data for this knit"""
        self._data.enable_cache()

    def clear_cache(self):
        """Clear the data cache only."""
        self._data.clear_cache()
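
    # A minimal usage sketch (hypothetical caller code, not part of this
    # file), assuming `knit` is one of these knit objects and `version_ids`
    # lists the versions about to be extracted in bulk:
    #
    #   knit.enable_cache()
    #   try:
    #       texts = [knit.get_lines(v) for v in version_ids]
    #   finally:
    #       knit.clear_cache()
    #
    # Scoping the cache like this releases the cached raw records as soon as
    # the bulk read is done.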

# ... (intervening code omitted; the methods below belong to the class that
# manages the knit data file itself) ...

    def __init__(self, transport, filename, mode, create=False, file_mode=None):
        _KnitComponentFile.__init__(self, transport, filename, mode)
        self._checked = False
        # TODO: jam 20060713 conceptually, this could spill to disk
        #       if the cached size gets larger than a certain amount
        #       but it complicates the model a bit, so for now just use
        #       a simple dictionary
        self._cache = {}
        self._do_cache = False
        if create:
            self._transport.put(self._filename, StringIO(''), mode=file_mode)
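
    # _cache maps a version_id to the raw record bytes exactly as they are
    # stored in the data file; _do_cache says whether reads and writes should
    # populate it.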
    def enable_cache(self):
        """Enable caching of reads."""
        self._do_cache = True

    def clear_cache(self):
        """Clear the record cache."""
        self._do_cache = False
        self._cache = {}

    def _open_file(self):
        # ... (body omitted; the excerpt resumes inside the method that
        # appends a new record to the data file) ...

        size, sio = self._record_to_data(version_id, digest, lines)
        # write to disk
        start_pos = self._transport.append(self._filename, sio)
        if self._do_cache:
            self._cache[version_id] = sio.getvalue()
        return start_pos, size
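
    # _parse_record_header is used below to sanity-check records fetched from
    # the transport; records served from the cache were stored by this class
    # itself, so the header check is skipped for them.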
    def _parse_record_header(self, version_id, raw_data):
        # ... (body omitted; the excerpt resumes inside the docstring of the
        # iterator that yields raw record data) ...

        This unpacks enough of the text record to validate the id is
        as expected but that's all.
        """
        # setup an iterator of the external records:
        # uses readv so nice and fast we hope.
        if len(records):
            # grab the disk data needed.
            if self._cache:
                # Don't check _cache if it is empty
                needed_offsets = [(pos, size) for version_id, pos, size
                                                  in records
                                                  if version_id not in self._cache]
            else:
                needed_offsets = [(pos, size) for version_id, pos, size
                                                  in records]

            raw_records = self._transport.readv(self._filename, needed_offsets)
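
        # Only the offsets for versions missing from the cache were requested
        # above; the loop below assumes readv yields results in the order they
        # were asked for.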
        for version_id, pos, size in records:
            if version_id in self._cache:
                # This data has already been validated
                data = self._cache[version_id]
            else:
                pos, data = raw_records.next()
                if self._do_cache:
                    self._cache[version_id] = data
                # validate the header
                df, rec = self._parse_record_header(version_id, data)
            yield version_id, data
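
    # Unlike the raw iterator above, read_records_iter() parses each record
    # and collects (digest, content) pairs keyed by version id.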
    def read_records_iter(self, records):
        # ... (docstring omitted) ...

        # 4168 calls to parse_record in 2880

        # Get unique records, sorted by position
        have_records = []
        if self._cache:
            needed_records = set()
            for record in records:
                if record[0] in self._cache:
                    have_records.append(record[0])
                else:
                    needed_records.add(record)
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
        else:
            needed_records = sorted(set(records), key=operator.itemgetter(1))
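
        # Versions already in the cache are set aside in have_records and
        # parsed from memory further down; only the rest are read from disk.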

        # We take it that the transport optimizes the fetching as well
        # as possible (ie, reads continuous ranges.)
        response = self._transport.readv(self._filename,
            [(pos, size) for version_id, pos, size in needed_records])

        record_map = {}
        for version_id in have_records:
            data = self._cache[version_id]
            content, digest = self._parse_record(version_id, data)
            record_map[version_id] = (digest, content)

        for (record_id, pos, size), (pos, data) in \
                izip(iter(needed_records), response):
            content, digest = self._parse_record(record_id, data)
            record_map[record_id] = (digest, content)
            if self._do_cache:
                self._cache[record_id] = data
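
        # record_map lets the results be handed back in the order the caller
        # asked for, independent of the (position-sorted) order the data was
        # actually read in.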

        for version_id, pos, size in records:
            digest, content = record_map[version_id]