~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
70
70
import warnings
71
71
 
72
72
import bzrlib
 
73
from bzrlib.lazy_import import lazy_import
 
74
lazy_import(globals(), """
 
75
from bzrlib import (
 
76
    pack,
 
77
    )
 
78
""")
73
79
from bzrlib import (
74
80
    cache_utf8,
75
81
    errors,
339
345
def make_empty_knit(transport, relpath):
340
346
    """Construct a empty knit at the specified location."""
341
347
    k = KnitVersionedFile(transport, relpath, 'w', KnitPlainFactory)
342
 
    k._data._open_file()
343
348
 
344
349
 
345
350
class KnitVersionedFile(VersionedFile):
360
365
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
361
366
                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
362
367
                 create=False, create_parent_dir=False, delay_create=False,
363
 
                 dir_mode=None, index=None):
 
368
                 dir_mode=None, index=None, access_method=None):
364
369
        """Construct a knit at location specified by relpath.
365
370
        
366
371
        :param create: If not True, only open an existing knit.
394
399
                dir_mode=dir_mode)
395
400
        else:
396
401
            self._index = index
397
 
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
398
 
            access_mode, create=create and not len(self), file_mode=file_mode,
399
 
            create_parent_dir=create_parent_dir, delay_create=delay_create,
400
 
            dir_mode=dir_mode)
 
402
        if access_method is None:
 
403
            _access = _KnitAccess(transport, relpath + DATA_SUFFIX, file_mode, dir_mode,
 
404
                ((create and not len(self)) and delay_create), create_parent_dir)
 
405
        else:
 
406
            _access = access_method
 
407
        if create and not len(self) and not delay_create:
 
408
            _access.create()
 
409
        self._data = _KnitData(_access)
401
410
 
402
411
    def __repr__(self):
403
412
        return '%s(%s)' % (self.__class__.__name__, 
420
429
        for count in xrange(self._max_delta_chain):
421
430
            parent = delta_parents[0]
422
431
            method = self._index.get_method(parent)
423
 
            pos, size = self._index.get_position(parent)
 
432
            index, pos, size = self._index.get_position(parent)
424
433
            if method == 'fulltext':
425
434
                fulltext_size = size
426
435
                break
484
493
        options.append('line-delta')
485
494
        store_lines = self.factory.lower_line_delta(delta)
486
495
 
487
 
        where, size = self._data.add_record(version_id, digest, store_lines)
488
 
        self._index.add_version(version_id, options, where, size, parents)
 
496
        access_memo = self._data.add_record(version_id, digest, store_lines)
 
497
        self._index.add_version(version_id, options, access_memo, parents)
489
498
 
490
499
    def _add_raw_records(self, records, data):
491
500
        """Add all the records 'records' with data pre-joined in 'data'.
496
505
                     the preceding records sizes.
497
506
        """
498
507
        # write all the data
499
 
        pos = self._data.add_raw_record(data)
 
508
        raw_record_sizes = [record[3] for record in records]
 
509
        positions = self._data.add_raw_records(raw_record_sizes, data)
500
510
        offset = 0
501
511
        index_entries = []
502
 
        for (version_id, options, parents, size) in records:
503
 
            index_entries.append((version_id, options, pos+offset,
504
 
                                  size, parents))
 
512
        for (version_id, options, parents, size), access_memo in zip(
 
513
            records, positions):
 
514
            index_entries.append((version_id, options, access_memo, parents))
505
515
            if self._data._do_cache:
506
516
                self._data._cache[version_id] = data[offset:offset+size]
507
517
            offset += size
545
555
        current_values = self._index._cache[version_id]
546
556
        assert set(current_values[4]).difference(set(new_parents)) == set()
547
557
        self._index.add_version(version_id,
548
 
                                current_values[1], 
549
 
                                current_values[2],
550
 
                                current_values[3],
 
558
                                current_values[1],
 
559
                                (None, current_values[2], current_values[3]),
551
560
                                new_parents)
552
561
 
553
562
    def _extract_blocks(self, version_id, source, target):
568
577
            parent = parents[0]
569
578
        else:
570
579
            parent = None
571
 
        data_pos, data_size = self._index.get_position(version_id)
572
 
        data, sha1 = self._data.read_records(((version_id, data_pos, data_size),))[version_id]
 
580
        index_memo = self._index.get_position(version_id)
 
581
        data, sha1 = self._data.read_records(((version_id, index_memo),))[version_id]
573
582
        noeol = 'no-eol' in self._index.get_options(version_id)
574
583
        if 'fulltext' == self._index.get_method(version_id):
575
584
            new_content = self.factory.parse_fulltext(data, version_id)
693
702
                    next = None
694
703
                else:
695
704
                    next = self.get_parents(cursor)[0]
696
 
                data_pos, data_size = self._index.get_position(cursor)
697
 
                component_data[cursor] = (method, data_pos, data_size, next)
 
705
                index_memo = self._index.get_position(cursor)
 
706
                component_data[cursor] = (method, index_memo, next)
698
707
                cursor = next
699
708
        return component_data
700
709
       
798
807
            options.append('fulltext')
799
808
            store_lines = self.factory.lower_fulltext(lines)
800
809
 
801
 
        where, size = self._data.add_record(version_id, digest, store_lines)
802
 
        self._index.add_version(version_id, options, where, size, parents)
 
810
        access_memo = self._data.add_record(version_id, digest, store_lines)
 
811
        self._index.add_version(version_id, options, access_memo, parents)
803
812
        return lines
804
813
 
805
814
    def check(self, progress_bar=None):
827
836
        If the method is fulltext, next will be None.
828
837
        """
829
838
        position_map = self._get_components_positions(version_ids)
830
 
        # c = component_id, m = method, p = position, s = size, n = next
831
 
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
 
839
        # c = component_id, m = method, i_m = index_memo, n = next
 
840
        records = [(c, i_m) for c, (m, i_m, n) in position_map.iteritems()]
832
841
        record_map = {}
833
842
        for component_id, content, digest in \
834
843
                self._data.read_records_iter(records):
835
 
            method, position, size, next = position_map[component_id]
 
844
            method, index_memo, next = position_map[component_id]
836
845
            record_map[component_id] = method, content, digest, next
837
846
                          
838
847
        return record_map
930
939
        # get a in-component-order queue:
931
940
        for version_id in self.versions():
932
941
            if version_id in requested_versions:
933
 
                data_pos, length = self._index.get_position(version_id)
934
 
                version_id_records.append((version_id, data_pos, length))
 
942
                index_memo = self._index.get_position(version_id)
 
943
                version_id_records.append((version_id, index_memo))
935
944
 
936
945
        total = len(version_id_records)
937
946
        for version_idx, (version_id, data, sha_value) in \
1073
1082
            raise KnitHeaderError(badline=line,
1074
1083
                              filename=self._transport.abspath(self._filename))
1075
1084
 
1076
 
    def commit(self):
1077
 
        """Commit is a nop."""
1078
 
 
1079
1085
    def __repr__(self):
1080
1086
        return '%s(%s)' % (self.__class__.__name__, self._filename)
1081
1087
 
1268
1274
                result_list.append('.' + version)
1269
1275
        return ' '.join(result_list)
1270
1276
 
1271
 
    def add_version(self, version_id, options, pos, size, parents):
 
1277
    def add_version(self, version_id, options, index_memo, parents):
1272
1278
        """Add a version record to the index."""
1273
 
        self.add_versions(((version_id, options, pos, size, parents),))
 
1279
        self.add_versions(((version_id, options, index_memo, parents),))
1274
1280
 
1275
1281
    def add_versions(self, versions):
1276
1282
        """Add multiple versions to the index.
1283
1289
        orig_cache = self._cache.copy()
1284
1290
 
1285
1291
        try:
1286
 
            for version_id, options, pos, size, parents in versions:
 
1292
            for version_id, options, (index, pos, size), parents in versions:
1287
1293
                line = "\n%s %s %s %s %s :" % (version_id,
1288
1294
                                               ','.join(options),
1289
1295
                                               pos,
1316
1322
        return version_id in self._cache
1317
1323
 
1318
1324
    def get_position(self, version_id):
1319
 
        """Return data position and size of specified version."""
 
1325
        """Return details needed to access the version.
 
1326
        
 
1327
        .kndx indices do not support split-out data, so return None for the 
 
1328
        index field.
 
1329
 
 
1330
        :return: a tuple (None, data position, size) to hand to the access
 
1331
            logic to get the record.
 
1332
        """
1320
1333
        entry = self._cache[version_id]
1321
 
        return entry[2], entry[3]
 
1334
        return None, entry[2], entry[3]
1322
1335
 
1323
1336
    def get_method(self, version_id):
1324
1337
        """Return compression method of specified version."""
1386
1399
        if self._parents:
1387
1400
            for node in self._graph_index.iter_entries(keys):
1388
1401
                yield node
1389
 
                found_keys.add(node[0])
 
1402
                found_keys.add(node[1])
1390
1403
        else:
1391
1404
            # adapt parentless index to the rest of the code.
1392
1405
            for node in self._graph_index.iter_entries(keys):
1393
 
                yield node[0], node[1], ()
1394
 
                found_keys.add(node[0])
 
1406
                yield node[0], node[1], node[2], ()
 
1407
                found_keys.add(node[1])
1395
1408
        if check_present:
1396
1409
            missing_keys = keys.difference(found_keys)
1397
1410
            if missing_keys:
1399
1412
 
1400
1413
    def _present_keys(self, version_ids):
1401
1414
        return set([
1402
 
            node[0] for node in self._get_entries(version_ids)])
 
1415
            node[1] for node in self._get_entries(version_ids)])
1403
1416
 
1404
1417
    def _parentless_ancestry(self, versions):
1405
1418
        """Honour the get_ancestry API for parentless knit indices."""
1427
1440
            new_nodes = self._get_entries(this_iteration)
1428
1441
            found = set()
1429
1442
            pending = set()
1430
 
            for (key, value, node_refs) in new_nodes:
 
1443
            for (index, key, value, node_refs) in new_nodes:
1431
1444
                # dont ask for ghosties - otherwise
1432
1445
                # we we can end up looping with pending
1433
1446
                # being entirely ghosted.
1464
1477
            this_iteration = pending
1465
1478
            new_nodes = self._get_entries(this_iteration)
1466
1479
            pending = set()
1467
 
            for (key, value, node_refs) in new_nodes:
 
1480
            for (index, key, value, node_refs) in new_nodes:
1468
1481
                graph[key] = node_refs[0]
1469
1482
                # queue parents 
1470
1483
                for parent in graph[key]:
1488
1501
        if not self._parents:
1489
1502
            return [(key, ()) for key in self.get_versions()]
1490
1503
        result = []
1491
 
        for key, value, refs in self._graph_index.iter_all_entries():
 
1504
        for index, key, value, refs in self._graph_index.iter_all_entries():
1492
1505
            result.append((key[0], tuple([ref[0] for ref in refs[0]])))
1493
1506
        return result
1494
1507
 
1506
1519
            all_parents = set()
1507
1520
            present_parents = set()
1508
1521
            for node in all_nodes:
1509
 
                all_parents.update(node[2][0])
 
1522
                all_parents.update(node[3][0])
1510
1523
                # any node we are querying must be present
1511
 
                present_parents.add(node[0])
 
1524
                present_parents.add(node[1])
1512
1525
            unknown_parents = all_parents.difference(present_parents)
1513
1526
            present_parents.update(self._present_keys(unknown_parents))
1514
1527
            for node in all_nodes:
1515
1528
                parents = []
1516
 
                for parent in node[2][0]:
 
1529
                for parent in node[3][0]:
1517
1530
                    if parent in present_parents:
1518
1531
                        parents.append(parent[0])
1519
 
                yield node[0][0], tuple(parents)
 
1532
                yield node[1][0], tuple(parents)
1520
1533
        else:
1521
1534
            for node in self._get_entries(self._version_ids_to_keys(version_ids)):
1522
 
                yield node[0][0], ()
 
1535
                yield node[1][0], ()
1523
1536
 
1524
1537
    def num_versions(self):
1525
1538
        return len(list(self._graph_index.iter_all_entries()))
1528
1541
 
1529
1542
    def get_versions(self):
1530
1543
        """Get all the versions in the file. not topologically sorted."""
1531
 
        return [node[0][0] for node in self._graph_index.iter_all_entries()]
 
1544
        return [node[1][0] for node in self._graph_index.iter_all_entries()]
1532
1545
    
1533
1546
    def has_version(self, version_id):
1534
1547
        """True if the version is in the index."""
1538
1551
        return tuple(key[0] for key in keys)
1539
1552
 
1540
1553
    def get_position(self, version_id):
1541
 
        """Return data position and size of specified version."""
1542
 
        bits = self._get_node(version_id)[1][1:].split(' ')
1543
 
        return int(bits[0]), int(bits[1])
 
1554
        """Return details needed to access the version.
 
1555
        
 
1556
        :return: a tuple (index, data position, size) to hand to the access
 
1557
            logic to get the record.
 
1558
        """
 
1559
        node = self._get_node(version_id)
 
1560
        bits = node[2][1:].split(' ')
 
1561
        return node[0], int(bits[0]), int(bits[1])
1544
1562
 
1545
1563
    def get_method(self, version_id):
1546
1564
        """Return compression method of specified version."""
1547
1565
        if not self._deltas:
1548
1566
            return 'fulltext'
1549
 
        return self._parent_compression(self._get_node(version_id)[2][1])
 
1567
        return self._parent_compression(self._get_node(version_id)[3][1])
1550
1568
 
1551
1569
    def _parent_compression(self, reference_list):
1552
1570
        # use the second reference list to decide if this is delta'd or not.
1567
1585
        if not self._deltas:
1568
1586
            options = ['fulltext']
1569
1587
        else:
1570
 
            options = [self._parent_compression(node[2][1])]
1571
 
        if node[1][0] == 'N':
 
1588
            options = [self._parent_compression(node[3][1])]
 
1589
        if node[2][0] == 'N':
1572
1590
            options.append('no-eol')
1573
1591
        return options
1574
1592
 
1586
1604
            check_present=True))
1587
1605
        if not self._parents:
1588
1606
            return ()
1589
 
        return self._keys_to_version_ids(nodes[0][2][0])
 
1607
        return self._keys_to_version_ids(nodes[0][3][0])
1590
1608
 
1591
1609
    def check_versions_present(self, version_ids):
1592
1610
        """Check that all specified versions are present."""
1596
1614
        if missing:
1597
1615
            raise RevisionNotPresent(missing.pop(), self)
1598
1616
 
1599
 
    def add_version(self, version_id, options, pos, size, parents):
 
1617
    def add_version(self, version_id, options, access_memo, parents):
1600
1618
        """Add a version record to the index."""
1601
 
        return self.add_versions(((version_id, options, pos, size, parents),))
 
1619
        return self.add_versions(((version_id, options, access_memo, parents),))
1602
1620
 
1603
1621
    def add_versions(self, versions):
1604
1622
        """Add multiple versions to the index.
1618
1636
        # check for dups
1619
1637
 
1620
1638
        keys = {}
1621
 
        for (version_id, options, pos, size, parents) in versions:
1622
 
            # index keys are tuples:
 
1639
        for (version_id, options, access_memo, parents) in versions:
 
1640
            index, pos, size = access_memo
1623
1641
            key = (version_id, )
1624
1642
            parents = tuple((parent, ) for parent in parents)
1625
1643
            if 'no-eol' in options:
1645
1663
                node_refs = ()
1646
1664
            keys[key] = (value, node_refs)
1647
1665
        present_nodes = self._get_entries(keys)
1648
 
        for (key, value, node_refs) in present_nodes:
 
1666
        for (index, key, value, node_refs) in present_nodes:
1649
1667
            if (value, node_refs) != keys[key]:
1650
1668
                raise KnitCorrupt(self, "inconsistent details in add_versions"
1651
1669
                    ": %s %s" % ((value, node_refs), keys[key]))
1661
1679
        
1662
1680
    def _version_ids_to_keys(self, version_ids):
1663
1681
        return set((version_id, ) for version_id in version_ids)
1664
 
        
1665
 
 
1666
 
class _KnitData(_KnitComponentFile):
1667
 
    """Contents of the knit data file"""
1668
 
 
1669
 
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
1670
 
                 create_parent_dir=False, delay_create=False,
1671
 
                 dir_mode=None):
1672
 
        _KnitComponentFile.__init__(self, transport, filename, mode,
1673
 
                                    file_mode=file_mode,
1674
 
                                    create_parent_dir=create_parent_dir,
1675
 
                                    dir_mode=dir_mode)
 
1682
 
 
1683
 
 
1684
class _KnitAccess(object):
 
1685
    """Access to knit records in a .knit file."""
 
1686
 
 
1687
    def __init__(self, transport, filename, _file_mode, _dir_mode,
 
1688
        _need_to_create, _create_parent_dir):
 
1689
        """Create a _KnitAccess for accessing and inserting data.
 
1690
 
 
1691
        :param transport: The transport the .knit is located on.
 
1692
        :param filename: The filename of the .knit.
 
1693
        """
 
1694
        self._transport = transport
 
1695
        self._filename = filename
 
1696
        self._file_mode = _file_mode
 
1697
        self._dir_mode = _dir_mode
 
1698
        self._need_to_create = _need_to_create
 
1699
        self._create_parent_dir = _create_parent_dir
 
1700
 
 
1701
    def add_raw_records(self, sizes, raw_data):
 
1702
        """Add raw knit bytes to a storage area.
 
1703
 
 
1704
        The data is spooled to whereever the access method is storing data.
 
1705
 
 
1706
        :param sizes: An iterable containing the size of each raw data segment.
 
1707
        :param raw_data: A bytestring containing the data.
 
1708
        :return: A list of memos to retrieve the record later. Each memo is a
 
1709
            tuple - (index, pos, length), where the index field is always None
 
1710
            for the .knit access method.
 
1711
        """
 
1712
        assert type(raw_data) == str, \
 
1713
            'data must be plain bytes was %s' % type(raw_data)
 
1714
        if not self._need_to_create:
 
1715
            base = self._transport.append_bytes(self._filename, raw_data)
 
1716
        else:
 
1717
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
 
1718
                                   create_parent_dir=self._create_parent_dir,
 
1719
                                   mode=self._file_mode,
 
1720
                                   dir_mode=self._dir_mode)
 
1721
            self._need_to_create = False
 
1722
            base = 0
 
1723
        result = []
 
1724
        for size in sizes:
 
1725
            result.append((None, base, size))
 
1726
            base += size
 
1727
        return result
 
1728
 
 
1729
    def create(self):
 
1730
        """IFF this data access has its own storage area, initialise it.
 
1731
 
 
1732
        :return: None.
 
1733
        """
 
1734
        self._transport.put_bytes_non_atomic(self._filename, '',
 
1735
                                             mode=self._file_mode)
 
1736
 
 
1737
    def open_file(self):
 
1738
        """IFF this data access can be represented as a single file, open it.
 
1739
 
 
1740
        For knits that are not mapped to a single file on disk this will
 
1741
        always return None.
 
1742
 
 
1743
        :return: None or a file handle.
 
1744
        """
 
1745
        try:
 
1746
            return self._transport.get(self._filename)
 
1747
        except NoSuchFile:
 
1748
            pass
 
1749
        return None
 
1750
 
 
1751
    def get_raw_records(self, memos_for_retrieval):
 
1752
        """Get the raw bytes for a records.
 
1753
 
 
1754
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
1755
            length) memo for retrieving the bytes. The .knit method ignores
 
1756
            the index as there is always only a single file.
 
1757
        :return: An iterator over the bytes of the records.
 
1758
        """
 
1759
        read_vector = [(pos, size) for (index, pos, size) in memos_for_retrieval]
 
1760
        for pos, data in self._transport.readv(self._filename, read_vector):
 
1761
            yield data
 
1762
 
 
1763
 
 
1764
class _PackAccess(object):
 
1765
    """Access to knit records via a collection of packs."""
 
1766
 
 
1767
    def __init__(self, index_to_packs, writer=None):
 
1768
        """Create a _PackAccess object.
 
1769
 
 
1770
        :param index_to_packs: A dict mapping index objects to the transport
 
1771
            and file names for obtaining data.
 
1772
        :param writer: A tuple (pack.ContainerWriter, write_index) which
 
1773
            contains the pack to write, and the index that reads from it will
 
1774
            be associated with.
 
1775
        """
 
1776
        if writer:
 
1777
            self.container_writer = writer[0]
 
1778
            self.write_index = writer[1]
 
1779
        else:
 
1780
            self.container_writer = None
 
1781
            self.write_index = None
 
1782
        self.indices = index_to_packs
 
1783
 
 
1784
    def add_raw_records(self, sizes, raw_data):
 
1785
        """Add raw knit bytes to a storage area.
 
1786
 
 
1787
        The data is spooled to the container writer in one bytes-record per
 
1788
        raw data item.
 
1789
 
 
1790
        :param sizes: An iterable containing the size of each raw data segment.
 
1791
        :param raw_data: A bytestring containing the data.
 
1792
        :return: A list of memos to retrieve the record later. Each memo is a
 
1793
            tuple - (index, pos, length), where the index field is the 
 
1794
            write_index object supplied to the PackAccess object.
 
1795
        """
 
1796
        assert type(raw_data) == str, \
 
1797
            'data must be plain bytes was %s' % type(raw_data)
 
1798
        result = []
 
1799
        offset = 0
 
1800
        for size in sizes:
 
1801
            p_offset, p_length = self.container_writer.add_bytes_record(
 
1802
                raw_data[offset:offset+size], [])
 
1803
            offset += size
 
1804
            result.append((self.write_index, p_offset, p_length))
 
1805
        return result
 
1806
 
 
1807
    def create(self):
 
1808
        """Pack based knits do not get individually created."""
 
1809
 
 
1810
    def get_raw_records(self, memos_for_retrieval):
 
1811
        """Get the raw bytes for a records.
 
1812
 
 
1813
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
1814
            length) memo for retrieving the bytes. The Pack access method
 
1815
            looks up the pack to use for a given record in its index_to_pack
 
1816
            map.
 
1817
        :return: An iterator over the bytes of the records.
 
1818
        """
 
1819
        # first pass, group into same-index requests
 
1820
        request_lists = []
 
1821
        current_index = None
 
1822
        for (index, offset, length) in memos_for_retrieval:
 
1823
            if current_index == index:
 
1824
                current_list.append((offset, length))
 
1825
            else:
 
1826
                if current_index is not None:
 
1827
                    request_lists.append((current_index, current_list))
 
1828
                current_index = index
 
1829
                current_list = [(offset, length)]
 
1830
        # handle the last entry
 
1831
        if current_index is not None:
 
1832
            request_lists.append((current_index, current_list))
 
1833
        for index, offsets in request_lists:
 
1834
            transport, path = self.indices[index]
 
1835
            reader = pack.make_readv_reader(transport, path, offsets)
 
1836
            for names, read_func in reader.iter_records():
 
1837
                yield read_func(None)
 
1838
 
 
1839
    def open_file(self):
 
1840
        """Pack based knits have no single file."""
 
1841
        return None
 
1842
 
 
1843
    def set_writer(self, writer, index, (transport, packname)):
 
1844
        """Set a writer to use for adding data."""
 
1845
        self.indices[index] = (transport, packname)
 
1846
        self.container_writer = writer
 
1847
        self.write_index = index
 
1848
 
 
1849
 
 
1850
class _KnitData(object):
 
1851
    """Manage extraction of data from a KnitAccess, caching and decompressing.
 
1852
    
 
1853
    The KnitData class provides the logic for parsing and using knit records,
 
1854
    making use of an access method for the low level read and write operations.
 
1855
    """
 
1856
 
 
1857
    def __init__(self, access):
 
1858
        """Create a KnitData object.
 
1859
 
 
1860
        :param access: The access method to use. Access methods such as
 
1861
            _KnitAccess manage the insertion of raw records and the subsequent
 
1862
            retrieval of the same.
 
1863
        """
 
1864
        self._access = access
1676
1865
        self._checked = False
1677
1866
        # TODO: jam 20060713 conceptually, this could spill to disk
1678
1867
        #       if the cached size gets larger than a certain amount
1680
1869
        #       a simple dictionary
1681
1870
        self._cache = {}
1682
1871
        self._do_cache = False
1683
 
        if create:
1684
 
            if delay_create:
1685
 
                self._need_to_create = create
1686
 
            else:
1687
 
                self._transport.put_bytes_non_atomic(self._filename, '',
1688
 
                                                     mode=self._file_mode)
1689
1872
 
1690
1873
    def enable_cache(self):
1691
1874
        """Enable caching of reads."""
1697
1880
        self._cache = {}
1698
1881
 
1699
1882
    def _open_file(self):
1700
 
        try:
1701
 
            return self._transport.get(self._filename)
1702
 
        except NoSuchFile:
1703
 
            pass
1704
 
        return None
 
1883
        return self._access.open_file()
1705
1884
 
1706
1885
    def _record_to_data(self, version_id, digest, lines):
1707
1886
        """Convert version_id, digest, lines into a raw data block.
1724
1903
        sio.seek(0)
1725
1904
        return length, sio
1726
1905
 
1727
 
    def add_raw_record(self, raw_data):
 
1906
    def add_raw_records(self, sizes, raw_data):
1728
1907
        """Append a prepared record to the data file.
1729
1908
        
1730
 
        :return: the offset in the data file raw_data was written.
 
1909
        :param sizes: An iterable containing the size of each raw data segment.
 
1910
        :param raw_data: A bytestring containing the data.
 
1911
        :return: a list of index data for the way the data was stored.
 
1912
            See the access method add_raw_records documentation for more
 
1913
            details.
1731
1914
        """
1732
 
        assert isinstance(raw_data, str), 'data must be plain bytes'
1733
 
        if not self._need_to_create:
1734
 
            return self._transport.append_bytes(self._filename, raw_data)
1735
 
        else:
1736
 
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
1737
 
                                   create_parent_dir=self._create_parent_dir,
1738
 
                                   mode=self._file_mode,
1739
 
                                   dir_mode=self._dir_mode)
1740
 
            self._need_to_create = False
1741
 
            return 0
 
1915
        return self._access.add_raw_records(sizes, raw_data)
1742
1916
        
1743
1917
    def add_record(self, version_id, digest, lines):
1744
 
        """Write new text record to disk.  Returns the position in the
1745
 
        file where it was written."""
 
1918
        """Write new text record to disk. 
 
1919
        
 
1920
        Returns index data for retrieving it later, as per add_raw_records.
 
1921
        """
1746
1922
        size, sio = self._record_to_data(version_id, digest, lines)
1747
 
        # write to disk
1748
 
        if not self._need_to_create:
1749
 
            start_pos = self._transport.append_file(self._filename, sio)
1750
 
        else:
1751
 
            self._transport.put_file_non_atomic(self._filename, sio,
1752
 
                               create_parent_dir=self._create_parent_dir,
1753
 
                               mode=self._file_mode,
1754
 
                               dir_mode=self._dir_mode)
1755
 
            self._need_to_create = False
1756
 
            start_pos = 0
 
1923
        result = self.add_raw_records([size], sio.getvalue())
1757
1924
        if self._do_cache:
1758
1925
            self._cache[version_id] = sio.getvalue()
1759
 
        return start_pos, size
 
1926
        return result[0]
1760
1927
 
1761
1928
    def _parse_record_header(self, version_id, raw_data):
1762
1929
        """Parse a record header for consistency.
1768
1935
        try:
1769
1936
            rec = self._check_header(version_id, df.readline())
1770
1937
        except Exception, e:
1771
 
            raise KnitCorrupt(self._filename,
 
1938
            raise KnitCorrupt(self._access,
1772
1939
                              "While reading {%s} got %s(%s)"
1773
1940
                              % (version_id, e.__class__.__name__, str(e)))
1774
1941
        return df, rec
1776
1943
    def _check_header(self, version_id, line):
1777
1944
        rec = line.split()
1778
1945
        if len(rec) != 4:
1779
 
            raise KnitCorrupt(self._filename,
 
1946
            raise KnitCorrupt(self._access,
1780
1947
                              'unexpected number of elements in record header')
1781
1948
        if rec[1] != version_id:
1782
 
            raise KnitCorrupt(self._filename,
 
1949
            raise KnitCorrupt(self._access,
1783
1950
                              'unexpected version, wanted %r, got %r'
1784
1951
                              % (version_id, rec[1]))
1785
1952
        return rec
1794
1961
        try:
1795
1962
            record_contents = df.readlines()
1796
1963
        except Exception, e:
1797
 
            raise KnitCorrupt(self._filename,
 
1964
            raise KnitCorrupt(self._access,
1798
1965
                              "While reading {%s} got %s(%s)"
1799
1966
                              % (version_id, e.__class__.__name__, str(e)))
1800
1967
        header = record_contents.pop(0)
1802
1969
 
1803
1970
        last_line = record_contents.pop()
1804
1971
        if len(record_contents) != int(rec[2]):
1805
 
            raise KnitCorrupt(self._filename,
 
1972
            raise KnitCorrupt(self._access,
1806
1973
                              'incorrect number of lines %s != %s'
1807
1974
                              ' for version {%s}'
1808
1975
                              % (len(record_contents), int(rec[2]),
1809
1976
                                 version_id))
1810
1977
        if last_line != 'end %s\n' % rec[1]:
1811
 
            raise KnitCorrupt(self._filename,
 
1978
            raise KnitCorrupt(self._access,
1812
1979
                              'unexpected version end line %r, wanted %r' 
1813
1980
                              % (last_line, version_id))
1814
1981
        df.close()
1826
1993
            # grab the disk data needed.
1827
1994
            if self._cache:
1828
1995
                # Don't check _cache if it is empty
1829
 
                needed_offsets = [(pos, size) for version_id, pos, size
 
1996
                needed_offsets = [index_memo for version_id, index_memo
1830
1997
                                              in records
1831
1998
                                              if version_id not in self._cache]
1832
1999
            else:
1833
 
                needed_offsets = [(pos, size) for version_id, pos, size
 
2000
                needed_offsets = [index_memo for version_id, index_memo
1834
2001
                                               in records]
1835
2002
 
1836
 
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
2003
            raw_records = self._access.get_raw_records(needed_offsets)
1837
2004
 
1838
 
        for version_id, pos, size in records:
 
2005
        for version_id, index_memo in records:
1839
2006
            if version_id in self._cache:
1840
2007
                # This data has already been validated
1841
2008
                data = self._cache[version_id]
1842
2009
            else:
1843
 
                pos, data = raw_records.next()
 
2010
                data = raw_records.next()
1844
2011
                if self._do_cache:
1845
2012
                    self._cache[version_id] = data
1846
2013
 
1885
2052
 
1886
2053
        # The transport optimizes the fetching as well 
1887
2054
        # (ie, reads continuous ranges.)
1888
 
        readv_response = self._transport.readv(self._filename,
1889
 
            [(pos, size) for version_id, pos, size in needed_records])
 
2055
        raw_data = self._access.get_raw_records(
 
2056
            [index_memo for version_id, index_memo in needed_records])
1890
2057
 
1891
 
        for (version_id, pos, size), (pos, data) in \
1892
 
                izip(iter(needed_records), readv_response):
 
2058
        for (version_id, index_memo), data in \
 
2059
                izip(iter(needed_records), raw_data):
1893
2060
            content, digest = self._parse_record(version_id, data)
1894
2061
            if self._do_cache:
1895
2062
                self._cache[version_id] = data
1984
2151
                    assert (self.target.has_version(parent) or
1985
2152
                            parent in copy_set or
1986
2153
                            not self.source.has_version(parent))
1987
 
                data_pos, data_size = self.source._index.get_position(version_id)
1988
 
                copy_queue_records.append((version_id, data_pos, data_size))
 
2154
                index_memo = self.source._index.get_position(version_id)
 
2155
                copy_queue_records.append((version_id, index_memo))
1989
2156
                copy_queue.append((version_id, options, parents))
1990
2157
                copy_set.add(version_id)
1991
2158