~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Matt Nordhoff
  • Date: 2009-04-04 02:50:01 UTC
  • mfrom: (4253 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4256.
  • Revision ID: mnordhoff@mattnordhoff.com-20090404025001-z1403k0tatmc8l91
Merge bzr.dev, fixing conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Knit versionedfile implementation.
18
18
 
20
20
updates.
21
21
 
22
22
Knit file layout:
23
 
lifeless: the data file is made up of "delta records".  each delta record has a delta header 
24
 
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of 
25
 
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a 
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
26
26
end-marker; simply "end VERSION"
27
27
 
28
28
delta can be line or full contents.a
35
35
130,130,2
36
36
8         if elt.get('executable') == 'yes':
37
37
8             ie.executable = True
38
 
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
39
39
 
40
40
 
41
41
whats in an index:
51
51
 
52
52
"""
53
53
 
54
 
# TODOS:
55
 
# 10:16 < lifeless> make partial index writes safe
56
 
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
57
 
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave 
58
 
#                    always' approach.
59
 
# move sha1 out of the content so that join is faster at verifying parents
60
 
# record content length ?
61
 
                  
62
54
 
63
55
from cStringIO import StringIO
64
56
from itertools import izip, chain
111
103
    ConstantMapper,
112
104
    ContentFactory,
113
105
    ChunkedContentFactory,
 
106
    sort_groupcompress,
114
107
    VersionedFile,
115
108
    VersionedFiles,
116
109
    )
131
124
 
132
125
DATA_SUFFIX = '.knit'
133
126
INDEX_SUFFIX = '.kndx'
 
127
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
134
128
 
135
129
 
136
130
class KnitAdapter(object):
138
132
 
139
133
    def __init__(self, basis_vf):
140
134
        """Create an adapter which accesses full texts from basis_vf.
141
 
        
 
135
 
142
136
        :param basis_vf: A versioned file to access basis texts of deltas from.
143
137
            May be None for adapters that do not need to access basis texts.
144
138
        """
248
242
 
249
243
class KnitContentFactory(ContentFactory):
250
244
    """Content factory for streaming from knits.
251
 
    
 
245
 
252
246
    :seealso ContentFactory:
253
247
    """
254
248
 
255
249
    def __init__(self, key, parents, build_details, sha1, raw_record,
256
250
        annotated, knit=None, network_bytes=None):
257
251
        """Create a KnitContentFactory for key.
258
 
        
 
252
 
259
253
        :param key: The key.
260
254
        :param parents: The parents.
261
255
        :param build_details: The build details as returned from
305
299
            if self._network_bytes is None:
306
300
                self._create_network_bytes()
307
301
            return self._network_bytes
 
302
        if ('-ft-' in self.storage_kind and
 
303
            storage_kind in ('chunked', 'fulltext')):
 
304
            adapter_key = (self.storage_kind, 'fulltext')
 
305
            adapter_factory = adapter_registry.get(adapter_key)
 
306
            adapter = adapter_factory(None)
 
307
            bytes = adapter.get_bytes(self)
 
308
            if storage_kind == 'chunked':
 
309
                return [bytes]
 
310
            else:
 
311
                return bytes
308
312
        if self._knit is not None:
 
313
            # Not redundant with direct conversion above - that only handles
 
314
            # fulltext cases.
309
315
            if storage_kind == 'chunked':
310
316
                return self._knit.get_lines(self.key[0])
311
317
            elif storage_kind == 'fulltext':
322
328
 
323
329
    def __init__(self, key, parents, generator, first):
324
330
        """Create a LazyKnitContentFactory.
325
 
        
 
331
 
326
332
        :param key: The key of the record.
327
333
        :param parents: The parents of the record.
328
334
        :param generator: A _ContentMapGenerator containing the record for this
403
409
 
404
410
class KnitContent(object):
405
411
    """Content of a knit version to which deltas can be applied.
406
 
    
 
412
 
407
413
    This is always stored in memory as a list of lines with \n at the end,
408
 
    plus a flag saying if the final ending is really there or not, because that 
 
414
    plus a flag saying if the final ending is really there or not, because that
409
415
    corresponds to the on-disk knit representation.
410
416
    """
411
417
 
500
506
 
501
507
class PlainKnitContent(KnitContent):
502
508
    """Unannotated content.
503
 
    
 
509
 
504
510
    When annotate[_iter] is called on this content, the same version is reported
505
511
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
506
512
    objects.
761
767
 
762
768
    This is only functional enough to run interface tests, it doesn't try to
763
769
    provide a full pack environment.
764
 
    
 
770
 
765
771
    :param annotated: knit annotations are wanted.
766
772
    :param mapper: The mapper from keys to paths.
767
773
    """
777
783
 
778
784
    This is only functional enough to run interface tests, it doesn't try to
779
785
    provide a full pack environment.
780
 
    
 
786
 
781
787
    :param graph: Store a graph.
782
788
    :param delta: Delta compress contents.
783
789
    :param keylength: How long should keys be.
814
820
    versioned_files.writer.end()
815
821
 
816
822
 
 
823
def _get_total_build_size(self, keys, positions):
 
824
    """Determine the total bytes to build these keys.
 
825
 
 
826
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
827
    don't inherit from a common base.)
 
828
 
 
829
    :param keys: Keys that we want to build
 
830
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
831
        as returned by _get_components_positions)
 
832
    :return: Number of bytes to build those keys
 
833
    """
 
834
    all_build_index_memos = {}
 
835
    build_keys = keys
 
836
    while build_keys:
 
837
        next_keys = set()
 
838
        for key in build_keys:
 
839
            # This is mostly for the 'stacked' case
 
840
            # Where we will be getting the data from a fallback
 
841
            if key not in positions:
 
842
                continue
 
843
            _, index_memo, compression_parent = positions[key]
 
844
            all_build_index_memos[key] = index_memo
 
845
            if compression_parent not in all_build_index_memos:
 
846
                next_keys.add(compression_parent)
 
847
        build_keys = next_keys
 
848
    return sum([index_memo[2] for index_memo
 
849
                in all_build_index_memos.itervalues()])
 
850
 
 
851
 
817
852
class KnitVersionedFiles(VersionedFiles):
818
853
    """Storage for many versioned files using knit compression.
819
854
 
820
855
    Backend storage is managed by indices and data objects.
821
856
 
822
 
    :ivar _index: A _KnitGraphIndex or similar that can describe the 
823
 
        parents, graph, compression and data location of entries in this 
824
 
        KnitVersionedFiles.  Note that this is only the index for 
 
857
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
858
        parents, graph, compression and data location of entries in this
 
859
        KnitVersionedFiles.  Note that this is only the index for
825
860
        *this* vfs; if there are fallbacks they must be queried separately.
826
861
    """
827
862
 
920
955
                lines[-1] = lines[-1] + '\n'
921
956
                line_bytes += '\n'
922
957
 
923
 
        for element in key:
 
958
        for element in key[:-1]:
924
959
            if type(element) != str:
925
960
                raise TypeError("key contains non-strings: %r" % (key,))
 
961
        if key[-1] is None:
 
962
            key = key[:-1] + ('sha1:' + digest,)
 
963
        elif type(key[-1]) != str:
 
964
                raise TypeError("key contains non-strings: %r" % (key,))
926
965
        # Knit hunks are still last-element only
927
966
        version_id = key[-1]
928
967
        content = self._factory.make(lines, version_id)
944
983
        else:
945
984
            options.append('fulltext')
946
985
            # isinstance is slower and we have no hierarchy.
947
 
            if self._factory.__class__ == KnitPlainFactory:
 
986
            if self._factory.__class__ is KnitPlainFactory:
948
987
                # Use the already joined bytes saving iteration time in
949
988
                # _record_to_data.
950
989
                size, bytes = self._record_to_data(key, digest,
987
1026
    def _check_add(self, key, lines, random_id, check_content):
988
1027
        """check that version_id and lines are safe to add."""
989
1028
        version_id = key[-1]
990
 
        if contains_whitespace(version_id):
991
 
            raise InvalidRevisionId(version_id, self)
992
 
        self.check_not_reserved_id(version_id)
 
1029
        if version_id is not None:
 
1030
            if contains_whitespace(version_id):
 
1031
                raise InvalidRevisionId(version_id, self)
 
1032
            self.check_not_reserved_id(version_id)
993
1033
        # TODO: If random_id==False and the key is already present, we should
994
1034
        # probably check that the existing content is identical to what is
995
1035
        # being inserted, and otherwise raise an exception.  This would make
1005
1045
 
1006
1046
    def _check_header_version(self, rec, version_id):
1007
1047
        """Checks the header version on original format knit records.
1008
 
        
 
1048
 
1009
1049
        These have the last component of the key embedded in the record.
1010
1050
        """
1011
1051
        if rec[1] != version_id:
1090
1130
            if missing and not allow_missing:
1091
1131
                raise errors.RevisionNotPresent(missing.pop(), self)
1092
1132
        return component_data
1093
 
       
 
1133
 
1094
1134
    def _get_content(self, key, parent_texts={}):
1095
1135
        """Returns a content object that makes up the specified
1096
1136
        version."""
1137
1177
 
1138
1178
    def _get_record_map(self, keys, allow_missing=False):
1139
1179
        """Produce a dictionary of knit records.
1140
 
        
 
1180
 
1141
1181
        :return: {key:(record, record_details, digest, next)}
1142
1182
            record
1143
1183
                data returned from read_records (a KnitContentobject)
1149
1189
                build-parent of the version, i.e. the leftmost ancestor.
1150
1190
                Will be None if the record is not a delta.
1151
1191
        :param keys: The keys to build a map for
1152
 
        :param allow_missing: If some records are missing, rather than 
 
1192
        :param allow_missing: If some records are missing, rather than
1153
1193
            error, just return the data that could be generated.
1154
1194
        """
1155
1195
        raw_map = self._get_record_map_unparsed(keys,
1158
1198
 
1159
1199
    def _raw_map_to_record_map(self, raw_map):
1160
1200
        """Parse the contents of _get_record_map_unparsed.
1161
 
        
 
1201
 
1162
1202
        :return: see _get_record_map.
1163
1203
        """
1164
1204
        result = {}
1170
1210
 
1171
1211
    def _get_record_map_unparsed(self, keys, allow_missing=False):
1172
1212
        """Get the raw data for reconstructing keys without parsing it.
1173
 
        
 
1213
 
1174
1214
        :return: A dict suitable for parsing via _raw_map_to_record_map.
1175
1215
            key-> raw_bytes, (method, noeol), compression_parent
1176
1216
        """
1189
1229
                # n = next
1190
1230
                records = [(key, i_m) for key, (r, i_m, n)
1191
1231
                                       in position_map.iteritems()]
 
1232
                # Sort by the index memo, so that we request records from the
 
1233
                # same pack file together, and in forward-sorted order
 
1234
                records.sort(key=operator.itemgetter(1))
1192
1235
                raw_record_map = {}
1193
1236
                for key, data in self._read_records_iter_unchecked(records):
1194
1237
                    (record_details, index_memo, next) = position_map[key]
1197
1240
            except errors.RetryWithNewPacks, e:
1198
1241
                self._access.reload_or_raise(e)
1199
1242
 
1200
 
    def _split_by_prefix(self, keys):
 
1243
    @classmethod
 
1244
    def _split_by_prefix(cls, keys):
1201
1245
        """For the given keys, split them up based on their prefix.
1202
1246
 
1203
1247
        To keep memory pressure somewhat under control, split the
1206
1250
        This should be revisited if _get_content_maps() can ever cross
1207
1251
        file-id boundaries.
1208
1252
 
 
1253
        The keys for a given file_id are kept in the same relative order.
 
1254
        Ordering between file_ids is not, though prefix_order will return the
 
1255
        order that the key was first seen.
 
1256
 
1209
1257
        :param keys: An iterable of key tuples
1210
 
        :return: A dict of {prefix: [key_list]}
 
1258
        :return: (split_map, prefix_order)
 
1259
            split_map       A dictionary mapping prefix => keys
 
1260
            prefix_order    The order that we saw the various prefixes
1211
1261
        """
1212
1262
        split_by_prefix = {}
 
1263
        prefix_order = []
1213
1264
        for key in keys:
1214
1265
            if len(key) == 1:
1215
 
                split_by_prefix.setdefault('', []).append(key)
1216
 
            else:
1217
 
                split_by_prefix.setdefault(key[0], []).append(key)
1218
 
        return split_by_prefix
 
1266
                prefix = ''
 
1267
            else:
 
1268
                prefix = key[0]
 
1269
 
 
1270
            if prefix in split_by_prefix:
 
1271
                split_by_prefix[prefix].append(key)
 
1272
            else:
 
1273
                split_by_prefix[prefix] = [key]
 
1274
                prefix_order.append(prefix)
 
1275
        return split_by_prefix, prefix_order
 
1276
 
 
1277
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1278
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1279
        """For the given keys, group them into 'best-sized' requests.
 
1280
 
 
1281
        The idea is to avoid making 1 request per file, but to never try to
 
1282
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1283
        possible, we should try to group requests to the same pack file
 
1284
        together.
 
1285
 
 
1286
        :return: list of (keys, non_local) tuples that indicate what keys
 
1287
            should be fetched next.
 
1288
        """
 
1289
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1290
        #       from the same pack file together, and we want to extract all
 
1291
        #       the texts for a given build-chain together. Ultimately it
 
1292
        #       probably needs a better global view.
 
1293
        total_keys = len(keys)
 
1294
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1295
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1296
        cur_keys = []
 
1297
        cur_non_local = set()
 
1298
        cur_size = 0
 
1299
        result = []
 
1300
        sizes = []
 
1301
        for prefix in prefix_order:
 
1302
            keys = prefix_split_keys[prefix]
 
1303
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1304
 
 
1305
            this_size = self._index._get_total_build_size(keys, positions)
 
1306
            cur_size += this_size
 
1307
            cur_keys.extend(keys)
 
1308
            cur_non_local.update(non_local)
 
1309
            if cur_size > _min_buffer_size:
 
1310
                result.append((cur_keys, cur_non_local))
 
1311
                sizes.append(cur_size)
 
1312
                cur_keys = []
 
1313
                cur_non_local = set()
 
1314
                cur_size = 0
 
1315
        if cur_keys:
 
1316
            result.append((cur_keys, cur_non_local))
 
1317
            sizes.append(cur_size)
 
1318
        return result
1219
1319
 
1220
1320
    def get_record_stream(self, keys, ordering, include_delta_closure):
1221
1321
        """Get a stream of records for keys.
1234
1334
        if not keys:
1235
1335
            return
1236
1336
        if not self._index.has_graph:
1237
 
            # Cannot topological order when no graph has been stored.
 
1337
            # Cannot sort when no graph has been stored.
1238
1338
            ordering = 'unordered'
1239
1339
 
1240
1340
        remaining_keys = keys
1296
1396
                    needed_from_fallback.add(key)
1297
1397
        # Double index lookups here : need a unified api ?
1298
1398
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
1299
 
        if ordering == 'topological':
1300
 
            # Global topological sort
1301
 
            present_keys = tsort.topo_sort(global_map)
 
1399
        if ordering in ('topological', 'groupcompress'):
 
1400
            if ordering == 'topological':
 
1401
                # Global topological sort
 
1402
                present_keys = tsort.topo_sort(global_map)
 
1403
            else:
 
1404
                present_keys = sort_groupcompress(global_map)
1302
1405
            # Now group by source:
1303
1406
            source_keys = []
1304
1407
            current_source = None
1314
1417
        else:
1315
1418
            if ordering != 'unordered':
1316
1419
                raise AssertionError('valid values for ordering are:'
1317
 
                    ' "unordered" or "topological" not: %r'
 
1420
                    ' "unordered", "groupcompress" or "topological" not: %r'
1318
1421
                    % (ordering,))
1319
1422
            # Just group by source; remote sources first.
1320
1423
            present_keys = []
1342
1445
            # XXX: get_content_maps performs its own index queries; allow state
1343
1446
            # to be passed in.
1344
1447
            non_local_keys = needed_from_fallback - absent_keys
1345
 
            prefix_split_keys = self._split_by_prefix(present_keys)
1346
 
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
1347
 
            for prefix, keys in prefix_split_keys.iteritems():
1348
 
                non_local = prefix_split_non_local_keys.get(prefix, [])
1349
 
                non_local = set(non_local)
1350
 
                generator = _VFContentMapGenerator(self, keys, non_local,
1351
 
                    global_map)
 
1448
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1449
                                                                non_local_keys,
 
1450
                                                                positions):
 
1451
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1452
                                                   global_map)
1352
1453
                for record in generator.get_record_stream():
1353
1454
                    yield record
1354
1455
        else:
1388
1489
    def insert_record_stream(self, stream):
1389
1490
        """Insert a record stream into this container.
1390
1491
 
1391
 
        :param stream: A stream of records to insert. 
 
1492
        :param stream: A stream of records to insert.
1392
1493
        :return: None
1393
1494
        :seealso VersionedFiles.get_record_stream:
1394
1495
        """
1434
1535
        # key = basis_parent, value = index entry to add
1435
1536
        buffered_index_entries = {}
1436
1537
        for record in stream:
 
1538
            buffered = False
1437
1539
            parents = record.parents
1438
1540
            if record.storage_kind in delta_types:
1439
1541
                # TODO: eventually the record itself should track
1485
1587
                access_memo = self._access.add_raw_records(
1486
1588
                    [(record.key, len(bytes))], bytes)[0]
1487
1589
                index_entry = (record.key, options, access_memo, parents)
1488
 
                buffered = False
1489
1590
                if 'fulltext' not in options:
1490
1591
                    # Not a fulltext, so we need to make sure the compression
1491
1592
                    # parent will also be present.
1513
1614
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1514
1615
                # 0) or because it depends on a base only present in the
1515
1616
                # fallback kvfs.
 
1617
                self._access.flush()
1516
1618
                try:
1517
1619
                    # Try getting a fulltext directly from the record.
1518
1620
                    bytes = record.get_bytes_as('fulltext')
1526
1628
                except errors.RevisionAlreadyPresent:
1527
1629
                    pass
1528
1630
            # Add any records whose basis parent is now available.
1529
 
            added_keys = [record.key]
1530
 
            while added_keys:
1531
 
                key = added_keys.pop(0)
1532
 
                if key in buffered_index_entries:
1533
 
                    index_entries = buffered_index_entries[key]
1534
 
                    self._index.add_records(index_entries)
1535
 
                    added_keys.extend(
1536
 
                        [index_entry[0] for index_entry in index_entries])
1537
 
                    del buffered_index_entries[key]
 
1631
            if not buffered:
 
1632
                added_keys = [record.key]
 
1633
                while added_keys:
 
1634
                    key = added_keys.pop(0)
 
1635
                    if key in buffered_index_entries:
 
1636
                        index_entries = buffered_index_entries[key]
 
1637
                        self._index.add_records(index_entries)
 
1638
                        added_keys.extend(
 
1639
                            [index_entry[0] for index_entry in index_entries])
 
1640
                        del buffered_index_entries[key]
1538
1641
        if buffered_index_entries:
1539
1642
            # There were index entries buffered at the end of the stream,
1540
1643
            # So these need to be added (if the index supports holding such
1577
1680
         * If a requested key did not change any lines (or didn't have any
1578
1681
           lines), it may not be mentioned at all in the result.
1579
1682
 
 
1683
        :param pb: Progress bar supplied by caller.
1580
1684
        :return: An iterator over (line, key).
1581
1685
        """
1582
1686
        if pb is None:
1596
1700
                        key_records.append((key, details[0]))
1597
1701
                records_iter = enumerate(self._read_records_iter(key_records))
1598
1702
                for (key_idx, (key, data, sha_value)) in records_iter:
1599
 
                    pb.update('Walking content.', key_idx, total)
 
1703
                    pb.update('Walking content', key_idx, total)
1600
1704
                    compression_parent = build_details[key][1]
1601
1705
                    if compression_parent is None:
1602
1706
                        # fulltext
1603
1707
                        line_iterator = self._factory.get_fulltext_content(data)
1604
1708
                    else:
1605
 
                        # Delta 
 
1709
                        # Delta
1606
1710
                        line_iterator = self._factory.get_linedelta_content(data)
1607
1711
                    # Now that we are yielding the data for this key, remove it
1608
1712
                    # from the list
1619
1723
        # If there are still keys we've not yet found, we look in the fallback
1620
1724
        # vfs, and hope to find them there.  Note that if the keys are found
1621
1725
        # but had no changes or no content, the fallback may not return
1622
 
        # anything.  
 
1726
        # anything.
1623
1727
        if keys and not self._fallback_vfs:
1624
1728
            # XXX: strictly the second parameter is meant to be the file id
1625
1729
            # but it's not easily accessible here.
1632
1736
                source_keys.add(key)
1633
1737
                yield line, key
1634
1738
            keys.difference_update(source_keys)
1635
 
        pb.update('Walking content.', total, total)
 
1739
        pb.update('Walking content', total, total)
1636
1740
 
1637
1741
    def _make_line_delta(self, delta_seq, new_content):
1638
1742
        """Generate a line delta from delta_seq and new_content."""
1647
1751
                           delta=None, annotated=None,
1648
1752
                           left_matching_blocks=None):
1649
1753
        """Merge annotations for content and generate deltas.
1650
 
        
 
1754
 
1651
1755
        This is done by comparing the annotations based on changes to the text
1652
1756
        and generating a delta on the resulting full texts. If annotations are
1653
1757
        not being created then a simple delta is created.
1735
1839
                                 rec[1], record_contents))
1736
1840
        if last_line != 'end %s\n' % rec[1]:
1737
1841
            raise KnitCorrupt(self,
1738
 
                              'unexpected version end line %r, wanted %r' 
 
1842
                              'unexpected version end line %r, wanted %r'
1739
1843
                              % (last_line, rec[1]))
1740
1844
        df.close()
1741
1845
        return rec, record_contents
1758
1862
        if not needed_records:
1759
1863
            return
1760
1864
 
1761
 
        # The transport optimizes the fetching as well 
 
1865
        # The transport optimizes the fetching as well
1762
1866
        # (ie, reads continuous ranges.)
1763
1867
        raw_data = self._access.get_raw_records(
1764
1868
            [index_memo for key, index_memo in needed_records])
1805
1909
 
1806
1910
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1807
1911
        """Convert key, digest, lines into a raw data block.
1808
 
        
 
1912
 
1809
1913
        :param key: The key of the record. Currently keys are always serialised
1810
1914
            using just the trailing component.
1811
1915
        :param dense_lines: The bytes of lines but in a denser form. For
1874
1978
 
1875
1979
    def _work(self):
1876
1980
        """Produce maps of text and KnitContents as dicts.
1877
 
        
 
1981
 
1878
1982
        :return: (text_map, content_map) where text_map contains the texts for
1879
1983
            the requested versions and content_map contains the KnitContents.
1880
1984
        """
1927
2031
                self._raw_record_map)
1928
2032
        record_map = self._record_map
1929
2033
        # raw_record_map is key:
1930
 
        # Have read and parsed records at this point. 
 
2034
        # Have read and parsed records at this point.
1931
2035
        for key in self.keys:
1932
2036
            if key in self.nonlocal_keys:
1933
2037
                # already handled
2033
2137
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
2034
2138
        global_map=None, raw_record_map=None):
2035
2139
        """Create a _ContentMapGenerator.
2036
 
        
 
2140
 
2037
2141
        :param versioned_files: The versioned files that the texts are being
2038
2142
            extracted from.
2039
2143
        :param keys: The keys to produce content maps for.
2180
2284
 
2181
2285
    Duplicate entries may be written to the index for a single version id
2182
2286
    if this is done then the latter one completely replaces the former:
2183
 
    this allows updates to correct version and parent information. 
 
2287
    this allows updates to correct version and parent information.
2184
2288
    Note that the two entries may share the delta, and that successive
2185
2289
    annotations and references MUST point to the first entry.
2186
2290
 
2187
2291
    The index file on disc contains a header, followed by one line per knit
2188
2292
    record. The same revision can be present in an index file more than once.
2189
 
    The first occurrence gets assigned a sequence number starting from 0. 
2190
 
    
 
2293
    The first occurrence gets assigned a sequence number starting from 0.
 
2294
 
2191
2295
    The format of a single line is
2192
2296
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
2193
2297
    REVISION_ID is a utf8-encoded revision id
2194
 
    FLAGS is a comma separated list of flags about the record. Values include 
 
2298
    FLAGS is a comma separated list of flags about the record. Values include
2195
2299
        no-eol, line-delta, fulltext.
2196
2300
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
2197
2301
        that the the compressed data starts at.
2201
2305
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
2202
2306
        revision id already in the knit that is a parent of REVISION_ID.
2203
2307
    The ' :' marker is the end of record marker.
2204
 
    
 
2308
 
2205
2309
    partial writes:
2206
2310
    when a write is interrupted to the index file, it will result in a line
2207
2311
    that does not end in ' :'. If the ' :' is not present at the end of a line,
2234
2338
 
2235
2339
    def add_records(self, records, random_id=False, missing_compression_parents=False):
2236
2340
        """Add multiple records to the index.
2237
 
        
 
2341
 
2238
2342
        :param records: a list of tuples:
2239
2343
                         (key, options, access_memo, parents).
2240
2344
        :param random_id: If True the ids being added were randomly generated
2296
2400
        # Because kndx files do not support atomic insertion via separate index
2297
2401
        # files, they do not support this method.
2298
2402
        raise NotImplementedError(self.get_missing_compression_parents)
2299
 
    
 
2403
 
2300
2404
    def _cache_key(self, key, options, pos, size, parent_keys):
2301
2405
        """Cache a version record in the history array and index cache.
2302
2406
 
2435
2539
 
2436
2540
    def get_position(self, key):
2437
2541
        """Return details needed to access the version.
2438
 
        
 
2542
 
2439
2543
        :return: a tuple (key, data position, size) to hand to the access
2440
2544
            logic to get the record.
2441
2545
        """
2445
2549
        return key, entry[2], entry[3]
2446
2550
 
2447
2551
    has_key = _mod_index._has_key_from_parent_map
2448
 
    
 
2552
 
2449
2553
    def _init_index(self, path, extra_lines=[]):
2450
2554
        """Initialize an index."""
2451
2555
        sio = StringIO()
2460
2564
 
2461
2565
    def keys(self):
2462
2566
        """Get all the keys in the collection.
2463
 
        
 
2567
 
2464
2568
        The keys are not ordered.
2465
2569
        """
2466
2570
        result = set()
2479
2583
            for suffix in self._kndx_cache[prefix][1]:
2480
2584
                result.add(prefix + (suffix,))
2481
2585
        return result
2482
 
    
 
2586
 
2483
2587
    def _load_prefixes(self, prefixes):
2484
2588
        """Load the indices for prefixes."""
2485
2589
        self._check_read()
2523
2627
 
2524
2628
    def _dictionary_compress(self, keys):
2525
2629
        """Dictionary compress keys.
2526
 
        
 
2630
 
2527
2631
        :param keys: The keys to generate references to.
2528
2632
        :return: A string representation of keys. keys which are present are
2529
2633
            dictionary compressed, and others are emitted as fulltext with a
2577
2681
            return index_memo[0][:-1], index_memo[1]
2578
2682
        return keys.sort(key=get_sort_key)
2579
2683
 
 
2684
    _get_total_build_size = _get_total_build_size
 
2685
 
2580
2686
    def _split_key(self, key):
2581
2687
        """Split key into a prefix and suffix."""
2582
2688
        return key[:-1], key[-1]
2593
2699
        :param is_locked: A callback to check whether the object should answer
2594
2700
            queries.
2595
2701
        :param deltas: Allow delta-compressed records.
2596
 
        :param parents: If True, record knits parents, if not do not record 
 
2702
        :param parents: If True, record knits parents, if not do not record
2597
2703
            parents.
2598
2704
        :param add_callback: If not None, allow additions to the index and call
2599
2705
            this callback with a list of added GraphIndex nodes:
2620
2726
    def add_records(self, records, random_id=False,
2621
2727
        missing_compression_parents=False):
2622
2728
        """Add multiple records to the index.
2623
 
        
 
2729
 
2624
2730
        This function does not insert data into the Immutable GraphIndex
2625
2731
        backing the KnitGraphIndex, instead it prepares data for insertion by
2626
2732
        the caller and checks that it is safe to insert then calls
2698
2804
            self._missing_compression_parents.update(compression_parents)
2699
2805
        # Adding records may have satisfied missing compression parents.
2700
2806
        self._missing_compression_parents.difference_update(keys)
2701
 
        
 
2807
 
2702
2808
    def scan_unvalidated_index(self, graph_index):
2703
2809
        """Inform this _KnitGraphIndex that there is an unvalidated index.
2704
2810
 
2786
2892
 
2787
2893
    def _get_entries(self, keys, check_present=False):
2788
2894
        """Get the entries for keys.
2789
 
        
 
2895
 
2790
2896
        :param keys: An iterable of index key tuples.
2791
2897
        """
2792
2898
        keys = set(keys)
2854
2960
 
2855
2961
    def get_position(self, key):
2856
2962
        """Return details needed to access the version.
2857
 
        
 
2963
 
2858
2964
        :return: a tuple (index, data position, size) to hand to the access
2859
2965
            logic to get the record.
2860
2966
        """
2865
2971
 
2866
2972
    def keys(self):
2867
2973
        """Get all the keys in the collection.
2868
 
        
 
2974
 
2869
2975
        The keys are not ordered.
2870
2976
        """
2871
2977
        self._check_read()
2872
2978
        return [node[1] for node in self._graph_index.iter_all_entries()]
2873
 
    
 
2979
 
2874
2980
    missing_keys = _mod_index._missing_keys_from_parent_map
2875
2981
 
2876
2982
    def _node_to_position(self, node):
2898
3004
            return positions[key][1]
2899
3005
        return keys.sort(key=get_index_memo)
2900
3006
 
 
3007
    _get_total_build_size = _get_total_build_size
 
3008
 
2901
3009
 
2902
3010
class _KnitKeyAccess(object):
2903
3011
    """Access to records in .knit files."""
2947
3055
            result.append((key, base, size))
2948
3056
        return result
2949
3057
 
 
3058
    def flush(self):
 
3059
        """Flush pending writes on this access object.
 
3060
        
 
3061
        For .knit files this is a no-op.
 
3062
        """
 
3063
        pass
 
3064
 
2950
3065
    def get_raw_records(self, memos_for_retrieval):
2951
3066
        """Get the raw bytes for a records.
2952
3067
 
2977
3092
class _DirectPackAccess(object):
2978
3093
    """Access to data in one or more packs with less translation."""
2979
3094
 
2980
 
    def __init__(self, index_to_packs, reload_func=None):
 
3095
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
2981
3096
        """Create a _DirectPackAccess object.
2982
3097
 
2983
3098
        :param index_to_packs: A dict mapping index objects to the transport
2990
3105
        self._write_index = None
2991
3106
        self._indices = index_to_packs
2992
3107
        self._reload_func = reload_func
 
3108
        self._flush_func = flush_func
2993
3109
 
2994
3110
    def add_raw_records(self, key_sizes, raw_data):
2995
3111
        """Add raw knit bytes to a storage area.
3017
3133
            result.append((self._write_index, p_offset, p_length))
3018
3134
        return result
3019
3135
 
 
3136
    def flush(self):
 
3137
        """Flush pending writes on this access object.
 
3138
 
 
3139
        This will flush any buffered writes to a NewPack.
 
3140
        """
 
3141
        if self._flush_func is not None:
 
3142
            self._flush_func()
 
3143
            
3020
3144
    def get_raw_records(self, memos_for_retrieval):
3021
3145
        """Get the raw bytes for a records.
3022
3146
 
3023
 
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
3147
        :param memos_for_retrieval: An iterable containing the (index, pos,
3024
3148
            length) memo for retrieving the bytes. The Pack access method
3025
3149
            looks up the pack to use for a given record in its index_to_pack
3026
3150
            map.