~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-07-14 04:04:50 UTC
  • mto: This revision was merged to the branch mainline in revision 1885.
  • Revision ID: john@arbash-meinel.com-20060714040450-8a2022f094a2b73c
Allow Versioned files to do caching if explicitly asked, and implement for Knit
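The new behaviour is strictly opt-in: nothing is cached until a caller asks for it, and clear_cache() both drops the cached records and turns caching back off. A minimal usage sketch, assuming an already-constructed KnitVersionedFile named kvf and a list version_ids (these variable names are illustrative, not part of this change):

    # Hypothetical caller: cache raw records for a burst of reads,
    # then release the memory and disable caching again.
    kvf.enable_cache()
    try:
        for version_id in version_ids:
            # Record data read once is kept in memory, so later reads that
            # need the same components avoid extra transport I/O.
            lines = kvf.get_lines(version_id)
    finally:
        kvf.clear_cache()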

=== modified file 'bzrlib/knit.py'
@@ -374,12 +374,20 @@
         """
         # write all the data
         pos = self._data.add_raw_record(data)
+        offset = 0
         index_entries = []
         for (version_id, options, parents, size) in records:
-            index_entries.append((version_id, options, pos, size, parents))
-            pos += size
+            index_entries.append((version_id, options, pos+offset,
+                                  size, parents))
+            if self._data._do_cache:
+                self._data._cache[version_id] = data[offset:offset+size]
+            offset += size
         self._index.add_versions(index_entries)
 
+    def enable_cache(self):
+        """Start caching data for this knit"""
+        self._data.enable_cache()
+
     def clear_cache(self):
         """Clear the data cache only."""
         self._data.clear_cache()
@@ -1284,12 +1292,23 @@
     def __init__(self, transport, filename, mode, create=False, file_mode=None):
         _KnitComponentFile.__init__(self, transport, filename, mode)
         self._checked = False
+        # TODO: jam 20060713 conceptually, this could spill to disk
+        #       if the cached size gets larger than a certain amount
+        #       but it complicates the model a bit, so for now just use
+        #       a simple dictionary
+        self._cache = {}
+        self._do_cache = False
         if create:
             self._transport.put(self._filename, StringIO(''), mode=file_mode)
 
+    def enable_cache(self):
+        """Enable caching of reads."""
+        self._do_cache = True
+
     def clear_cache(self):
         """Clear the record cache."""
-        pass
+        self._do_cache = False
+        self._cache = {}
 
     def _open_file(self):
         try:
@@ -1331,6 +1350,8 @@
         size, sio = self._record_to_data(version_id, digest, lines)
         # write to disk
         start_pos = self._transport.append(self._filename, sio)
+        if self._do_cache:
+            self._cache[version_id] = sio.getvalue()
         return start_pos, size
 
     def _parse_record_header(self, version_id, raw_data):
@@ -1369,22 +1390,35 @@
 
         This unpacks enough of the text record to validate the id is
         as expected but thats all.
-
-        It will actively recompress currently cached records on the
-        basis that that is cheaper than I/O activity.
         """
         # setup an iterator of the external records:
         # uses readv so nice and fast we hope.
         if len(records):
             # grab the disk data needed.
-            raw_records = self._transport.readv(self._filename,
-                [(pos, size) for version_id, pos, size in records])
+            if self._cache:
+                # Don't check _cache if it is empty
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                              in records
+                                              if version_id not in self._cache]
+            else:
+                needed_offsets = [(pos, size) for version_id, pos, size
+                                               in records]
+
+            raw_records = self._transport.readv(self._filename, needed_offsets)
+
 
         for version_id, pos, size in records:
-            pos, data = raw_records.next()
-            # validate the header
-            df, rec = self._parse_record_header(version_id, data)
-            df.close()
+            if version_id in self._cache:
+                # This data has already been validated
+                data = self._cache[version_id]
+            else:
+                pos, data = raw_records.next()
+                if self._do_cache:
+                    self._cache[version_id] = data
+
+                # validate the header
+                df, rec = self._parse_record_header(version_id, data)
+                df.close()
             yield version_id, data
 
     def read_records_iter(self, records):
@@ -1402,18 +1436,40 @@
         # 4168   calls to parse_record       in 2880
 
         # Get unique records, sorted by position
-        needed_records = sorted(set(records), key=operator.itemgetter(1))
+        have_records = []
+        if self._cache:
+            needed_records = set()
+            for record in records:
+                if record[0] in self._cache:
+                    have_records.append(record[0])
+                else:
+                    needed_records.add(record)
+            needed_records = sorted(needed_records, key=operator.itemgetter(1))
+        else:
+            needed_records = sorted(set(records), key=operator.itemgetter(1))
 
         # We take it that the transport optimizes the fetching as good
         # as possible (ie, reads continuous ranges.)
-        response = self._transport.readv(self._filename,
-            [(pos, size) for version_id, pos, size in needed_records])
+        if needed_records:
+            response = self._transport.readv(self._filename,
+                [(pos, size) for version_id, pos, size in needed_records])
+        else:
+            response = []
 
         record_map = {}
+
+        if have_records:
+            for version_id in have_records:
+                data = self._cache[version_id]
+                content, digest = self._parse_record(version_id, data)
+                record_map[version_id] = (digest, content)
+
         for (record_id, pos, size), (pos, data) in \
             izip(iter(needed_records), response):
             content, digest = self._parse_record(record_id, data)
             record_map[record_id] = (digest, content)
+            if self._do_cache:
+                self._cache[record_id] = data
 
         for version_id, pos, size in records:
             digest, content = record_map[version_id]
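Both read paths above change in the same way: serve whatever the dictionary cache already holds, readv() only the missing offsets, and (when caching is enabled) remember what was just read. A standalone sketch of that partition-then-fetch shape, using hypothetical names rather than the bzrlib API:

    def fetch_records(requested, cache, read_many, do_cache=True):
        """Return {key: data} for (key, pos, size) requests, reading only cache misses.

        read_many stands in for transport.readv: it takes a list of
        (pos, size) pairs and returns the data blocks in the same order.
        """
        result = {}
        needed = []
        for key, pos, size in requested:
            if key in cache:
                result[key] = cache[key]          # already validated and cached
            else:
                needed.append((key, pos, size))
        blocks = read_many([(pos, size) for _, pos, size in needed]) if needed else []
        for (key, _, _), data in zip(needed, blocks):
            if do_cache:
                cache[key] = data                 # remember for later requests
            result[key] = data
        return result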