~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
374
374
        """
375
375
        # write all the data
376
376
        pos = self._data.add_raw_record(data)
 
377
        offset = 0
377
378
        index_entries = []
378
379
        for (version_id, options, parents, size) in records:
379
 
            index_entries.append((version_id, options, pos, size, parents))
380
 
            pos += size
 
380
            index_entries.append((version_id, options, pos+offset,
 
381
                                  size, parents))
 
382
            if self._data._do_cache:
 
383
                self._data._cache[version_id] = data[offset:offset+size]
 
384
            offset += size
381
385
        self._index.add_versions(index_entries)
382
386
 
 
387
    def enable_cache(self):
 
388
        """Start caching data for this knit"""
 
389
        self._data.enable_cache()
 
390
 
383
391
    def clear_cache(self):
384
392
        """Clear the data cache only."""
385
393
        self._data.clear_cache()
695
703
        # c = component_id, m = method, p = position, s = size, n = next
696
704
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
697
705
        record_map = {}
698
 
        for component_id, content, digest in\
699
 
            self._data.read_records_iter(records): 
 
706
        for component_id, content, digest in \
 
707
                self._data.read_records_iter(records):
700
708
            method, position, size, next = position_map[component_id]
701
709
            record_map[component_id] = method, content, digest, next
702
710
                          
1284
1292
    def __init__(self, transport, filename, mode, create=False, file_mode=None):
1285
1293
        _KnitComponentFile.__init__(self, transport, filename, mode)
1286
1294
        self._checked = False
 
1295
        # TODO: jam 20060713 conceptually, this could spill to disk
 
1296
        #       if the cached size gets larger than a certain amount
 
1297
        #       but it complicates the model a bit, so for now just use
 
1298
        #       a simple dictionary
 
1299
        self._cache = {}
 
1300
        self._do_cache = False
1287
1301
        if create:
1288
1302
            self._transport.put(self._filename, StringIO(''), mode=file_mode)
1289
1303
 
 
1304
    def enable_cache(self):
 
1305
        """Enable caching of reads."""
 
1306
        self._do_cache = True
 
1307
 
1290
1308
    def clear_cache(self):
1291
1309
        """Clear the record cache."""
1292
 
        pass
 
1310
        self._do_cache = False
 
1311
        self._cache = {}
1293
1312
 
1294
1313
    def _open_file(self):
1295
1314
        try:
1331
1350
        size, sio = self._record_to_data(version_id, digest, lines)
1332
1351
        # write to disk
1333
1352
        start_pos = self._transport.append(self._filename, sio)
 
1353
        if self._do_cache:
 
1354
            self._cache[version_id] = sio.getvalue()
1334
1355
        return start_pos, size
1335
1356
 
1336
1357
    def _parse_record_header(self, version_id, raw_data):
1369
1390
 
1370
1391
        This unpacks enough of the text record to validate the id is
1371
1392
        as expected but thats all.
1372
 
 
1373
 
        It will actively recompress currently cached records on the
1374
 
        basis that that is cheaper than I/O activity.
1375
1393
        """
1376
1394
        # setup an iterator of the external records:
1377
1395
        # uses readv so nice and fast we hope.
1378
1396
        if len(records):
1379
1397
            # grab the disk data needed.
1380
 
            raw_records = self._transport.readv(self._filename,
1381
 
                [(pos, size) for version_id, pos, size in records])
 
1398
            if self._cache:
 
1399
                # Don't check _cache if it is empty
 
1400
                needed_offsets = [(pos, size) for version_id, pos, size
 
1401
                                              in records
 
1402
                                              if version_id not in self._cache]
 
1403
            else:
 
1404
                needed_offsets = [(pos, size) for version_id, pos, size
 
1405
                                               in records]
 
1406
 
 
1407
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
1408
                
1382
1409
 
1383
1410
        for version_id, pos, size in records:
1384
 
            pos, data = raw_records.next()
1385
 
            # validate the header
1386
 
            df, rec = self._parse_record_header(version_id, data)
1387
 
            df.close()
 
1411
            if version_id in self._cache:
 
1412
                # This data has already been validated
 
1413
                data = self._cache[version_id]
 
1414
            else:
 
1415
                pos, data = raw_records.next()
 
1416
                if self._do_cache:
 
1417
                    self._cache[version_id] = data
 
1418
 
 
1419
                # validate the header
 
1420
                df, rec = self._parse_record_header(version_id, data)
 
1421
                df.close()
1388
1422
            yield version_id, data
1389
1423
 
1390
1424
    def read_records_iter(self, records):
1391
1425
        """Read text records from data file and yield result.
1392
1426
 
1393
 
        Each passed record is a tuple of (version_id, pos, len) and
1394
 
        will be read in the given order.  Yields (version_id,
1395
 
        contents, digest).
 
1427
        The result will be returned in whatever is the fastest to read.
 
1428
        Not by the order requested. Also, multiple requests for the same
 
1429
        record will only yield 1 response.
 
1430
        :param records: A list of (version_id, pos, len) entries
 
1431
        :return: Yields (version_id, contents, digest) in the order
 
1432
                 read, not the order requested
1396
1433
        """
1397
 
        if len(records) == 0:
1398
 
            return
1399
 
        # profiling notes:
1400
 
        # 60890  calls for 4168 extractions in 5045, 683 internal.
1401
 
        # 4168   calls to readv              in 1411
1402
 
        # 4168   calls to parse_record       in 2880
1403
 
 
1404
 
        # Get unique records, sorted by position
1405
 
        needed_records = sorted(set(records), key=operator.itemgetter(1))
1406
 
 
1407
 
        # We take it that the transport optimizes the fetching as good
1408
 
        # as possible (ie, reads continuous ranges.)
1409
 
        response = self._transport.readv(self._filename,
 
1434
        if not records:
 
1435
            return
 
1436
 
 
1437
        if self._cache:
 
1438
            # Skip records we have alread seen
 
1439
            yielded_records = set()
 
1440
            needed_records = set()
 
1441
            for record in records:
 
1442
                if record[0] in self._cache:
 
1443
                    if record[0] in yielded_records:
 
1444
                        continue
 
1445
                    yielded_records.add(record[0])
 
1446
                    data = self._cache[record[0]]
 
1447
                    content, digest = self._parse_record(record[0], data)
 
1448
                    yield (record[0], content, digest)
 
1449
                else:
 
1450
                    needed_records.add(record)
 
1451
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
 
1452
        else:
 
1453
            needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1454
 
 
1455
        if not needed_records:
 
1456
            return
 
1457
 
 
1458
        # The transport optimizes the fetching as well 
 
1459
        # (ie, reads continuous ranges.)
 
1460
        readv_response = self._transport.readv(self._filename,
1410
1461
            [(pos, size) for version_id, pos, size in needed_records])
1411
1462
 
1412
 
        record_map = {}
1413
 
        for (record_id, pos, size), (pos, data) in \
1414
 
            izip(iter(needed_records), response):
1415
 
            content, digest = self._parse_record(record_id, data)
1416
 
            record_map[record_id] = (digest, content)
1417
 
 
1418
 
        for version_id, pos, size in records:
1419
 
            digest, content = record_map[version_id]
 
1463
        for (version_id, pos, size), (pos, data) in \
 
1464
                izip(iter(needed_records), readv_response):
 
1465
            content, digest = self._parse_record(version_id, data)
 
1466
            if self._do_cache:
 
1467
                self._cache[version_id] = data
1420
1468
            yield version_id, content, digest
1421
1469
 
1422
1470
    def read_records(self, records):
1423
1471
        """Read records into a dictionary."""
1424
1472
        components = {}
1425
 
        for record_id, content, digest in self.read_records_iter(records):
 
1473
        for record_id, content, digest in \
 
1474
                self.read_records_iter(records):
1426
1475
            components[record_id] = (content, digest)
1427
1476
        return components
1428
1477