~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Robert Collins
  • Date: 2010-05-06 07:48:22 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506074822-0bsgf2j4h8jx0xkk
Added ``bzrlib.tests.matchers`` as a place to put matchers, along with
our first in-tree matcher. See the module docstring for details.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Knit versionedfile implementation.
18
18
 
20
20
updates.
21
21
 
22
22
Knit file layout:
23
 
lifeless: the data file is made up of "delta records".  each delta record has a delta header 
24
 
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of 
25
 
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a 
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
26
26
end-marker; simply "end VERSION"
27
27
 
28
28
delta can be line or full contents.a
35
35
130,130,2
36
36
8         if elt.get('executable') == 'yes':
37
37
8             ie.executable = True
38
 
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
39
39
 
40
40
 
41
41
whats in an index:
51
51
 
52
52
"""
53
53
 
54
 
# TODOS:
55
 
# 10:16 < lifeless> make partial index writes safe
56
 
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
57
 
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave 
58
 
#                    always' approach.
59
 
# move sha1 out of the content so that join is faster at verifying parents
60
 
# record content length ?
61
 
                  
62
54
 
63
55
from cStringIO import StringIO
64
 
from itertools import izip, chain
 
56
from itertools import izip
65
57
import operator
66
58
import os
67
59
import sys
77
69
    lru_cache,
78
70
    pack,
79
71
    progress,
 
72
    static_tuple,
80
73
    trace,
81
74
    tsort,
82
75
    tuned_gzip,
 
76
    ui,
83
77
    )
84
78
""")
85
79
from bzrlib import (
111
105
    ConstantMapper,
112
106
    ContentFactory,
113
107
    ChunkedContentFactory,
 
108
    sort_groupcompress,
114
109
    VersionedFile,
115
110
    VersionedFiles,
116
111
    )
131
126
 
132
127
DATA_SUFFIX = '.knit'
133
128
INDEX_SUFFIX = '.kndx'
 
129
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
134
130
 
135
131
 
136
132
class KnitAdapter(object):
138
134
 
139
135
    def __init__(self, basis_vf):
140
136
        """Create an adapter which accesses full texts from basis_vf.
141
 
        
 
137
 
142
138
        :param basis_vf: A versioned file to access basis texts of deltas from.
143
139
            May be None for adapters that do not need to access basis texts.
144
140
        """
248
244
 
249
245
class KnitContentFactory(ContentFactory):
250
246
    """Content factory for streaming from knits.
251
 
    
 
247
 
252
248
    :seealso ContentFactory:
253
249
    """
254
250
 
255
251
    def __init__(self, key, parents, build_details, sha1, raw_record,
256
252
        annotated, knit=None, network_bytes=None):
257
253
        """Create a KnitContentFactory for key.
258
 
        
 
254
 
259
255
        :param key: The key.
260
256
        :param parents: The parents.
261
257
        :param build_details: The build details as returned from
305
301
            if self._network_bytes is None:
306
302
                self._create_network_bytes()
307
303
            return self._network_bytes
 
304
        if ('-ft-' in self.storage_kind and
 
305
            storage_kind in ('chunked', 'fulltext')):
 
306
            adapter_key = (self.storage_kind, 'fulltext')
 
307
            adapter_factory = adapter_registry.get(adapter_key)
 
308
            adapter = adapter_factory(None)
 
309
            bytes = adapter.get_bytes(self)
 
310
            if storage_kind == 'chunked':
 
311
                return [bytes]
 
312
            else:
 
313
                return bytes
308
314
        if self._knit is not None:
 
315
            # Not redundant with direct conversion above - that only handles
 
316
            # fulltext cases.
309
317
            if storage_kind == 'chunked':
310
318
                return self._knit.get_lines(self.key[0])
311
319
            elif storage_kind == 'fulltext':
322
330
 
323
331
    def __init__(self, key, parents, generator, first):
324
332
        """Create a LazyKnitContentFactory.
325
 
        
 
333
 
326
334
        :param key: The key of the record.
327
335
        :param parents: The parents of the record.
328
336
        :param generator: A _ContentMapGenerator containing the record for this
403
411
 
404
412
class KnitContent(object):
405
413
    """Content of a knit version to which deltas can be applied.
406
 
    
 
414
 
407
415
    This is always stored in memory as a list of lines with \n at the end,
408
 
    plus a flag saying if the final ending is really there or not, because that 
 
416
    plus a flag saying if the final ending is really there or not, because that
409
417
    corresponds to the on-disk knit representation.
410
418
    """
411
419
 
500
508
 
501
509
class PlainKnitContent(KnitContent):
502
510
    """Unannotated content.
503
 
    
 
511
 
504
512
    When annotate[_iter] is called on this content, the same version is reported
505
513
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
506
514
    objects.
658
666
 
659
667
        see parse_fulltext which this inverts.
660
668
        """
661
 
        # TODO: jam 20070209 We only do the caching thing to make sure that
662
 
        #       the origin is a valid utf-8 line, eventually we could remove it
663
669
        return ['%s %s' % (o, t) for o, t in content._lines]
664
670
 
665
671
    def lower_line_delta(self, delta):
680
686
        content = knit._get_content(key)
681
687
        # adjust for the fact that serialised annotations are only key suffixes
682
688
        # for this factory.
683
 
        if type(key) == tuple:
 
689
        if type(key) is tuple:
684
690
            prefix = key[:-1]
685
691
            origins = content.annotate()
686
692
            result = []
752
758
 
753
759
    def annotate(self, knit, key):
754
760
        annotator = _KnitAnnotator(knit)
755
 
        return annotator.annotate(key)
 
761
        return annotator.annotate_flat(key)
756
762
 
757
763
 
758
764
 
761
767
 
762
768
    This is only functional enough to run interface tests, it doesn't try to
763
769
    provide a full pack environment.
764
 
    
 
770
 
765
771
    :param annotated: knit annotations are wanted.
766
772
    :param mapper: The mapper from keys to paths.
767
773
    """
777
783
 
778
784
    This is only functional enough to run interface tests, it doesn't try to
779
785
    provide a full pack environment.
780
 
    
 
786
 
781
787
    :param graph: Store a graph.
782
788
    :param delta: Delta compress contents.
783
789
    :param keylength: How long should keys be.
814
820
    versioned_files.writer.end()
815
821
 
816
822
 
 
823
def _get_total_build_size(self, keys, positions):
 
824
    """Determine the total bytes to build these keys.
 
825
 
 
826
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
827
    don't inherit from a common base.)
 
828
 
 
829
    :param keys: Keys that we want to build
 
830
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
831
        as returned by _get_components_positions)
 
832
    :return: Number of bytes to build those keys
 
833
    """
 
834
    all_build_index_memos = {}
 
835
    build_keys = keys
 
836
    while build_keys:
 
837
        next_keys = set()
 
838
        for key in build_keys:
 
839
            # This is mostly for the 'stacked' case
 
840
            # Where we will be getting the data from a fallback
 
841
            if key not in positions:
 
842
                continue
 
843
            _, index_memo, compression_parent = positions[key]
 
844
            all_build_index_memos[key] = index_memo
 
845
            if compression_parent not in all_build_index_memos:
 
846
                next_keys.add(compression_parent)
 
847
        build_keys = next_keys
 
848
    return sum([index_memo[2] for index_memo
 
849
                in all_build_index_memos.itervalues()])
 
850
 
 
851
 
817
852
class KnitVersionedFiles(VersionedFiles):
818
853
    """Storage for many versioned files using knit compression.
819
854
 
820
855
    Backend storage is managed by indices and data objects.
821
856
 
822
 
    :ivar _index: A _KnitGraphIndex or similar that can describe the 
823
 
        parents, graph, compression and data location of entries in this 
824
 
        KnitVersionedFiles.  Note that this is only the index for 
 
857
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
858
        parents, graph, compression and data location of entries in this
 
859
        KnitVersionedFiles.  Note that this is only the index for
825
860
        *this* vfs; if there are fallbacks they must be queried separately.
826
861
    """
827
862
 
874
909
            # indexes can't directly store that, so we give them
875
910
            # an empty tuple instead.
876
911
            parents = ()
 
912
        line_bytes = ''.join(lines)
877
913
        return self._add(key, lines, parents,
878
 
            parent_texts, left_matching_blocks, nostore_sha, random_id)
 
914
            parent_texts, left_matching_blocks, nostore_sha, random_id,
 
915
            line_bytes=line_bytes)
 
916
 
 
917
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
918
        """See VersionedFiles._add_text()."""
 
919
        self._index._check_write_ok()
 
920
        self._check_add(key, None, random_id, check_content=False)
 
921
        if text.__class__ is not str:
 
922
            raise errors.BzrBadParameterUnicode("text")
 
923
        if parents is None:
 
924
            # The caller might pass None if there is no graph data, but kndx
 
925
            # indexes can't directly store that, so we give them
 
926
            # an empty tuple instead.
 
927
            parents = ()
 
928
        return self._add(key, None, parents,
 
929
            None, None, nostore_sha, random_id,
 
930
            line_bytes=text)
879
931
 
880
932
    def _add(self, key, lines, parents, parent_texts,
881
 
        left_matching_blocks, nostore_sha, random_id):
 
933
        left_matching_blocks, nostore_sha, random_id,
 
934
        line_bytes):
882
935
        """Add a set of lines on top of version specified by parents.
883
936
 
884
937
        Any versions not present will be converted into ghosts.
 
938
 
 
939
        :param lines: A list of strings where each one is a single line (has a
 
940
            single newline at the end of the string) This is now optional
 
941
            (callers can pass None). It is left in its location for backwards
 
942
            compatibility. It should ''.join(lines) must == line_bytes
 
943
        :param line_bytes: A single string containing the content
 
944
 
 
945
        We pass both lines and line_bytes because different routes bring the
 
946
        values to this function. And for memory efficiency, we don't want to
 
947
        have to split/join on-demand.
885
948
        """
886
949
        # first thing, if the content is something we don't need to store, find
887
950
        # that out.
888
 
        line_bytes = ''.join(lines)
889
951
        digest = sha_string(line_bytes)
890
952
        if nostore_sha == digest:
891
953
            raise errors.ExistingContent
912
974
 
913
975
        text_length = len(line_bytes)
914
976
        options = []
915
 
        if lines:
916
 
            if lines[-1][-1] != '\n':
917
 
                # copy the contents of lines.
 
977
        no_eol = False
 
978
        # Note: line_bytes is not modified to add a newline, that is tracked
 
979
        #       via the no_eol flag. 'lines' *is* modified, because that is the
 
980
        #       general values needed by the Content code.
 
981
        if line_bytes and line_bytes[-1] != '\n':
 
982
            options.append('no-eol')
 
983
            no_eol = True
 
984
            # Copy the existing list, or create a new one
 
985
            if lines is None:
 
986
                lines = osutils.split_lines(line_bytes)
 
987
            else:
918
988
                lines = lines[:]
919
 
                options.append('no-eol')
920
 
                lines[-1] = lines[-1] + '\n'
921
 
                line_bytes += '\n'
 
989
            # Replace the last line with one that ends in a final newline
 
990
            lines[-1] = lines[-1] + '\n'
 
991
        if lines is None:
 
992
            lines = osutils.split_lines(line_bytes)
922
993
 
923
 
        for element in key:
924
 
            if type(element) != str:
 
994
        for element in key[:-1]:
 
995
            if type(element) is not str:
 
996
                raise TypeError("key contains non-strings: %r" % (key,))
 
997
        if key[-1] is None:
 
998
            key = key[:-1] + ('sha1:' + digest,)
 
999
        elif type(key[-1]) is not str:
925
1000
                raise TypeError("key contains non-strings: %r" % (key,))
926
1001
        # Knit hunks are still last-element only
927
1002
        version_id = key[-1]
928
1003
        content = self._factory.make(lines, version_id)
929
 
        if 'no-eol' in options:
 
1004
        if no_eol:
930
1005
            # Hint to the content object that its text() call should strip the
931
1006
            # EOL.
932
1007
            content._should_strip_eol = True
944
1019
        else:
945
1020
            options.append('fulltext')
946
1021
            # isinstance is slower and we have no hierarchy.
947
 
            if self._factory.__class__ == KnitPlainFactory:
 
1022
            if self._factory.__class__ is KnitPlainFactory:
948
1023
                # Use the already joined bytes saving iteration time in
949
1024
                # _record_to_data.
 
1025
                dense_lines = [line_bytes]
 
1026
                if no_eol:
 
1027
                    dense_lines.append('\n')
950
1028
                size, bytes = self._record_to_data(key, digest,
951
 
                    lines, [line_bytes])
 
1029
                    lines, dense_lines)
952
1030
            else:
953
1031
                # get mixed annotation + content and feed it into the
954
1032
                # serialiser.
966
1044
        """See VersionedFiles.annotate."""
967
1045
        return self._factory.annotate(self, key)
968
1046
 
969
 
    def check(self, progress_bar=None):
 
1047
    def get_annotator(self):
 
1048
        return _KnitAnnotator(self)
 
1049
 
 
1050
    def check(self, progress_bar=None, keys=None):
970
1051
        """See VersionedFiles.check()."""
 
1052
        if keys is None:
 
1053
            return self._logical_check()
 
1054
        else:
 
1055
            # At the moment, check does not extra work over get_record_stream
 
1056
            return self.get_record_stream(keys, 'unordered', True)
 
1057
 
 
1058
    def _logical_check(self):
971
1059
        # This doesn't actually test extraction of everything, but that will
972
1060
        # impact 'bzr check' substantially, and needs to be integrated with
973
1061
        # care. However, it does check for the obvious problem of a delta with
987
1075
    def _check_add(self, key, lines, random_id, check_content):
988
1076
        """check that version_id and lines are safe to add."""
989
1077
        version_id = key[-1]
990
 
        if contains_whitespace(version_id):
991
 
            raise InvalidRevisionId(version_id, self)
992
 
        self.check_not_reserved_id(version_id)
 
1078
        if version_id is not None:
 
1079
            if contains_whitespace(version_id):
 
1080
                raise InvalidRevisionId(version_id, self)
 
1081
            self.check_not_reserved_id(version_id)
993
1082
        # TODO: If random_id==False and the key is already present, we should
994
1083
        # probably check that the existing content is identical to what is
995
1084
        # being inserted, and otherwise raise an exception.  This would make
1005
1094
 
1006
1095
    def _check_header_version(self, rec, version_id):
1007
1096
        """Checks the header version on original format knit records.
1008
 
        
 
1097
 
1009
1098
        These have the last component of the key embedded in the record.
1010
1099
        """
1011
1100
        if rec[1] != version_id:
1090
1179
            if missing and not allow_missing:
1091
1180
                raise errors.RevisionNotPresent(missing.pop(), self)
1092
1181
        return component_data
1093
 
       
 
1182
 
1094
1183
    def _get_content(self, key, parent_texts={}):
1095
1184
        """Returns a content object that makes up the specified
1096
1185
        version."""
1103
1192
        generator = _VFContentMapGenerator(self, [key])
1104
1193
        return generator._get_content(key)
1105
1194
 
 
1195
    def get_known_graph_ancestry(self, keys):
 
1196
        """Get a KnownGraph instance with the ancestry of keys."""
 
1197
        parent_map, missing_keys = self._index.find_ancestry(keys)
 
1198
        for fallback in self._fallback_vfs:
 
1199
            if not missing_keys:
 
1200
                break
 
1201
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
1202
                                                missing_keys)
 
1203
            parent_map.update(f_parent_map)
 
1204
            missing_keys = f_missing_keys
 
1205
        kg = _mod_graph.KnownGraph(parent_map)
 
1206
        return kg
 
1207
 
1106
1208
    def get_parent_map(self, keys):
1107
1209
        """Get a map of the graph parents of keys.
1108
1210
 
1137
1239
 
1138
1240
    def _get_record_map(self, keys, allow_missing=False):
1139
1241
        """Produce a dictionary of knit records.
1140
 
        
 
1242
 
1141
1243
        :return: {key:(record, record_details, digest, next)}
1142
1244
            record
1143
1245
                data returned from read_records (a KnitContentobject)
1149
1251
                build-parent of the version, i.e. the leftmost ancestor.
1150
1252
                Will be None if the record is not a delta.
1151
1253
        :param keys: The keys to build a map for
1152
 
        :param allow_missing: If some records are missing, rather than 
 
1254
        :param allow_missing: If some records are missing, rather than
1153
1255
            error, just return the data that could be generated.
1154
1256
        """
1155
1257
        raw_map = self._get_record_map_unparsed(keys,
1158
1260
 
1159
1261
    def _raw_map_to_record_map(self, raw_map):
1160
1262
        """Parse the contents of _get_record_map_unparsed.
1161
 
        
 
1263
 
1162
1264
        :return: see _get_record_map.
1163
1265
        """
1164
1266
        result = {}
1170
1272
 
1171
1273
    def _get_record_map_unparsed(self, keys, allow_missing=False):
1172
1274
        """Get the raw data for reconstructing keys without parsing it.
1173
 
        
 
1275
 
1174
1276
        :return: A dict suitable for parsing via _raw_map_to_record_map.
1175
1277
            key-> raw_bytes, (method, noeol), compression_parent
1176
1278
        """
1189
1291
                # n = next
1190
1292
                records = [(key, i_m) for key, (r, i_m, n)
1191
1293
                                       in position_map.iteritems()]
 
1294
                # Sort by the index memo, so that we request records from the
 
1295
                # same pack file together, and in forward-sorted order
 
1296
                records.sort(key=operator.itemgetter(1))
1192
1297
                raw_record_map = {}
1193
1298
                for key, data in self._read_records_iter_unchecked(records):
1194
1299
                    (record_details, index_memo, next) = position_map[key]
1197
1302
            except errors.RetryWithNewPacks, e:
1198
1303
                self._access.reload_or_raise(e)
1199
1304
 
1200
 
    def _split_by_prefix(self, keys):
 
1305
    @classmethod
 
1306
    def _split_by_prefix(cls, keys):
1201
1307
        """For the given keys, split them up based on their prefix.
1202
1308
 
1203
1309
        To keep memory pressure somewhat under control, split the
1206
1312
        This should be revisited if _get_content_maps() can ever cross
1207
1313
        file-id boundaries.
1208
1314
 
 
1315
        The keys for a given file_id are kept in the same relative order.
 
1316
        Ordering between file_ids is not, though prefix_order will return the
 
1317
        order that the key was first seen.
 
1318
 
1209
1319
        :param keys: An iterable of key tuples
1210
 
        :return: A dict of {prefix: [key_list]}
 
1320
        :return: (split_map, prefix_order)
 
1321
            split_map       A dictionary mapping prefix => keys
 
1322
            prefix_order    The order that we saw the various prefixes
1211
1323
        """
1212
1324
        split_by_prefix = {}
 
1325
        prefix_order = []
1213
1326
        for key in keys:
1214
1327
            if len(key) == 1:
1215
 
                split_by_prefix.setdefault('', []).append(key)
1216
 
            else:
1217
 
                split_by_prefix.setdefault(key[0], []).append(key)
1218
 
        return split_by_prefix
 
1328
                prefix = ''
 
1329
            else:
 
1330
                prefix = key[0]
 
1331
 
 
1332
            if prefix in split_by_prefix:
 
1333
                split_by_prefix[prefix].append(key)
 
1334
            else:
 
1335
                split_by_prefix[prefix] = [key]
 
1336
                prefix_order.append(prefix)
 
1337
        return split_by_prefix, prefix_order
 
1338
 
 
1339
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1340
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1341
        """For the given keys, group them into 'best-sized' requests.
 
1342
 
 
1343
        The idea is to avoid making 1 request per file, but to never try to
 
1344
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1345
        possible, we should try to group requests to the same pack file
 
1346
        together.
 
1347
 
 
1348
        :return: list of (keys, non_local) tuples that indicate what keys
 
1349
            should be fetched next.
 
1350
        """
 
1351
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1352
        #       from the same pack file together, and we want to extract all
 
1353
        #       the texts for a given build-chain together. Ultimately it
 
1354
        #       probably needs a better global view.
 
1355
        total_keys = len(keys)
 
1356
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1357
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1358
        cur_keys = []
 
1359
        cur_non_local = set()
 
1360
        cur_size = 0
 
1361
        result = []
 
1362
        sizes = []
 
1363
        for prefix in prefix_order:
 
1364
            keys = prefix_split_keys[prefix]
 
1365
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1366
 
 
1367
            this_size = self._index._get_total_build_size(keys, positions)
 
1368
            cur_size += this_size
 
1369
            cur_keys.extend(keys)
 
1370
            cur_non_local.update(non_local)
 
1371
            if cur_size > _min_buffer_size:
 
1372
                result.append((cur_keys, cur_non_local))
 
1373
                sizes.append(cur_size)
 
1374
                cur_keys = []
 
1375
                cur_non_local = set()
 
1376
                cur_size = 0
 
1377
        if cur_keys:
 
1378
            result.append((cur_keys, cur_non_local))
 
1379
            sizes.append(cur_size)
 
1380
        return result
1219
1381
 
1220
1382
    def get_record_stream(self, keys, ordering, include_delta_closure):
1221
1383
        """Get a stream of records for keys.
1234
1396
        if not keys:
1235
1397
            return
1236
1398
        if not self._index.has_graph:
1237
 
            # Cannot topological order when no graph has been stored.
 
1399
            # Cannot sort when no graph has been stored.
1238
1400
            ordering = 'unordered'
1239
1401
 
1240
1402
        remaining_keys = keys
1296
1458
                    needed_from_fallback.add(key)
1297
1459
        # Double index lookups here : need a unified api ?
1298
1460
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
1299
 
        if ordering == 'topological':
1300
 
            # Global topological sort
1301
 
            present_keys = tsort.topo_sort(global_map)
 
1461
        if ordering in ('topological', 'groupcompress'):
 
1462
            if ordering == 'topological':
 
1463
                # Global topological sort
 
1464
                present_keys = tsort.topo_sort(global_map)
 
1465
            else:
 
1466
                present_keys = sort_groupcompress(global_map)
1302
1467
            # Now group by source:
1303
1468
            source_keys = []
1304
1469
            current_source = None
1314
1479
        else:
1315
1480
            if ordering != 'unordered':
1316
1481
                raise AssertionError('valid values for ordering are:'
1317
 
                    ' "unordered" or "topological" not: %r'
 
1482
                    ' "unordered", "groupcompress" or "topological" not: %r'
1318
1483
                    % (ordering,))
1319
1484
            # Just group by source; remote sources first.
1320
1485
            present_keys = []
1342
1507
            # XXX: get_content_maps performs its own index queries; allow state
1343
1508
            # to be passed in.
1344
1509
            non_local_keys = needed_from_fallback - absent_keys
1345
 
            prefix_split_keys = self._split_by_prefix(present_keys)
1346
 
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
1347
 
            for prefix, keys in prefix_split_keys.iteritems():
1348
 
                non_local = prefix_split_non_local_keys.get(prefix, [])
1349
 
                non_local = set(non_local)
1350
 
                generator = _VFContentMapGenerator(self, keys, non_local,
1351
 
                    global_map)
 
1510
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1511
                                                                non_local_keys,
 
1512
                                                                positions):
 
1513
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1514
                                                   global_map,
 
1515
                                                   ordering=ordering)
1352
1516
                for record in generator.get_record_stream():
1353
1517
                    yield record
1354
1518
        else:
1356
1520
                if source is parent_maps[0]:
1357
1521
                    # this KnitVersionedFiles
1358
1522
                    records = [(key, positions[key][1]) for key in keys]
1359
 
                    for key, raw_data, sha1 in self._read_records_iter_raw(records):
 
1523
                    for key, raw_data in self._read_records_iter_unchecked(records):
1360
1524
                        (record_details, index_memo, _) = positions[key]
1361
1525
                        yield KnitContentFactory(key, global_map[key],
1362
 
                            record_details, sha1, raw_data, self._factory.annotated, None)
 
1526
                            record_details, None, raw_data, self._factory.annotated, None)
1363
1527
                else:
1364
1528
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
1365
1529
                    for record in vf.get_record_stream(keys, ordering,
1388
1552
    def insert_record_stream(self, stream):
1389
1553
        """Insert a record stream into this container.
1390
1554
 
1391
 
        :param stream: A stream of records to insert. 
 
1555
        :param stream: A stream of records to insert.
1392
1556
        :return: None
1393
1557
        :seealso VersionedFiles.get_record_stream:
1394
1558
        """
1434
1598
        # key = basis_parent, value = index entry to add
1435
1599
        buffered_index_entries = {}
1436
1600
        for record in stream:
 
1601
            kind = record.storage_kind
 
1602
            if kind.startswith('knit-') and kind.endswith('-gz'):
 
1603
                # Check that the ID in the header of the raw knit bytes matches
 
1604
                # the record metadata.
 
1605
                raw_data = record._raw_record
 
1606
                df, rec = self._parse_record_header(record.key, raw_data)
 
1607
                df.close()
 
1608
            buffered = False
1437
1609
            parents = record.parents
1438
1610
            if record.storage_kind in delta_types:
1439
1611
                # TODO: eventually the record itself should track
1485
1657
                access_memo = self._access.add_raw_records(
1486
1658
                    [(record.key, len(bytes))], bytes)[0]
1487
1659
                index_entry = (record.key, options, access_memo, parents)
1488
 
                buffered = False
1489
1660
                if 'fulltext' not in options:
1490
1661
                    # Not a fulltext, so we need to make sure the compression
1491
1662
                    # parent will also be present.
1513
1684
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1514
1685
                # 0) or because it depends on a base only present in the
1515
1686
                # fallback kvfs.
 
1687
                self._access.flush()
1516
1688
                try:
1517
1689
                    # Try getting a fulltext directly from the record.
1518
1690
                    bytes = record.get_bytes_as('fulltext')
1526
1698
                except errors.RevisionAlreadyPresent:
1527
1699
                    pass
1528
1700
            # Add any records whose basis parent is now available.
1529
 
            added_keys = [record.key]
1530
 
            while added_keys:
1531
 
                key = added_keys.pop(0)
1532
 
                if key in buffered_index_entries:
1533
 
                    index_entries = buffered_index_entries[key]
1534
 
                    self._index.add_records(index_entries)
1535
 
                    added_keys.extend(
1536
 
                        [index_entry[0] for index_entry in index_entries])
1537
 
                    del buffered_index_entries[key]
1538
 
        # If there were any deltas which had a missing basis parent, error.
 
1701
            if not buffered:
 
1702
                added_keys = [record.key]
 
1703
                while added_keys:
 
1704
                    key = added_keys.pop(0)
 
1705
                    if key in buffered_index_entries:
 
1706
                        index_entries = buffered_index_entries[key]
 
1707
                        self._index.add_records(index_entries)
 
1708
                        added_keys.extend(
 
1709
                            [index_entry[0] for index_entry in index_entries])
 
1710
                        del buffered_index_entries[key]
1539
1711
        if buffered_index_entries:
1540
 
            from pprint import pformat
1541
 
            raise errors.BzrCheckError(
1542
 
                "record_stream refers to compression parents not in %r:\n%s"
1543
 
                % (self, pformat(sorted(buffered_index_entries.keys()))))
 
1712
            # There were index entries buffered at the end of the stream,
 
1713
            # So these need to be added (if the index supports holding such
 
1714
            # entries for later insertion)
 
1715
            all_entries = []
 
1716
            for key in buffered_index_entries:
 
1717
                index_entries = buffered_index_entries[key]
 
1718
                all_entries.extend(index_entries)
 
1719
            self._index.add_records(
 
1720
                all_entries, missing_compression_parents=True)
 
1721
 
 
1722
    def get_missing_compression_parent_keys(self):
 
1723
        """Return an iterable of keys of missing compression parents.
 
1724
 
 
1725
        Check this after calling insert_record_stream to find out if there are
 
1726
        any missing compression parents.  If there are, the records that
 
1727
        depend on them are not able to be inserted safely. For atomic
 
1728
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1729
        suspended - commit will fail at this point. Nonatomic knits will error
 
1730
        earlier because they have no staging area to put pending entries into.
 
1731
        """
 
1732
        return self._index.get_missing_compression_parents()
1544
1733
 
1545
1734
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1546
1735
        """Iterate over the lines in the versioned files from keys.
1563
1752
         * If a requested key did not change any lines (or didn't have any
1564
1753
           lines), it may not be mentioned at all in the result.
1565
1754
 
 
1755
        :param pb: Progress bar supplied by caller.
1566
1756
        :return: An iterator over (line, key).
1567
1757
        """
1568
1758
        if pb is None:
1569
 
            pb = progress.DummyProgress()
 
1759
            pb = ui.ui_factory.nested_progress_bar()
1570
1760
        keys = set(keys)
1571
1761
        total = len(keys)
1572
1762
        done = False
1582
1772
                        key_records.append((key, details[0]))
1583
1773
                records_iter = enumerate(self._read_records_iter(key_records))
1584
1774
                for (key_idx, (key, data, sha_value)) in records_iter:
1585
 
                    pb.update('Walking content.', key_idx, total)
 
1775
                    pb.update('Walking content', key_idx, total)
1586
1776
                    compression_parent = build_details[key][1]
1587
1777
                    if compression_parent is None:
1588
1778
                        # fulltext
1589
1779
                        line_iterator = self._factory.get_fulltext_content(data)
1590
1780
                    else:
1591
 
                        # Delta 
 
1781
                        # Delta
1592
1782
                        line_iterator = self._factory.get_linedelta_content(data)
1593
1783
                    # Now that we are yielding the data for this key, remove it
1594
1784
                    # from the list
1605
1795
        # If there are still keys we've not yet found, we look in the fallback
1606
1796
        # vfs, and hope to find them there.  Note that if the keys are found
1607
1797
        # but had no changes or no content, the fallback may not return
1608
 
        # anything.  
 
1798
        # anything.
1609
1799
        if keys and not self._fallback_vfs:
1610
1800
            # XXX: strictly the second parameter is meant to be the file id
1611
1801
            # but it's not easily accessible here.
1618
1808
                source_keys.add(key)
1619
1809
                yield line, key
1620
1810
            keys.difference_update(source_keys)
1621
 
        pb.update('Walking content.', total, total)
 
1811
        pb.update('Walking content', total, total)
1622
1812
 
1623
1813
    def _make_line_delta(self, delta_seq, new_content):
1624
1814
        """Generate a line delta from delta_seq and new_content."""
1633
1823
                           delta=None, annotated=None,
1634
1824
                           left_matching_blocks=None):
1635
1825
        """Merge annotations for content and generate deltas.
1636
 
        
 
1826
 
1637
1827
        This is done by comparing the annotations based on changes to the text
1638
1828
        and generating a delta on the resulting full texts. If annotations are
1639
1829
        not being created then a simple delta is created.
1721
1911
                                 rec[1], record_contents))
1722
1912
        if last_line != 'end %s\n' % rec[1]:
1723
1913
            raise KnitCorrupt(self,
1724
 
                              'unexpected version end line %r, wanted %r' 
 
1914
                              'unexpected version end line %r, wanted %r'
1725
1915
                              % (last_line, rec[1]))
1726
1916
        df.close()
1727
1917
        return rec, record_contents
1744
1934
        if not needed_records:
1745
1935
            return
1746
1936
 
1747
 
        # The transport optimizes the fetching as well 
 
1937
        # The transport optimizes the fetching as well
1748
1938
        # (ie, reads continuous ranges.)
1749
1939
        raw_data = self._access.get_raw_records(
1750
1940
            [index_memo for key, index_memo in needed_records])
1791
1981
 
1792
1982
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1793
1983
        """Convert key, digest, lines into a raw data block.
1794
 
        
 
1984
 
1795
1985
        :param key: The key of the record. Currently keys are always serialised
1796
1986
            using just the trailing component.
1797
1987
        :param dense_lines: The bytes of lines but in a denser form. For
1802
1992
            function spends less time resizing the final string.
1803
1993
        :return: (len, a StringIO instance with the raw data ready to read.)
1804
1994
        """
1805
 
        # Note: using a string copy here increases memory pressure with e.g.
1806
 
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
1807
 
        # when doing the initial commit of a mozilla tree. RBC 20070921
1808
 
        bytes = ''.join(chain(
1809
 
            ["version %s %d %s\n" % (key[-1],
1810
 
                                     len(lines),
1811
 
                                     digest)],
1812
 
            dense_lines or lines,
1813
 
            ["end %s\n" % key[-1]]))
1814
 
        if type(bytes) != str:
1815
 
            raise AssertionError(
1816
 
                'data must be plain bytes was %s' % type(bytes))
 
1995
        chunks = ["version %s %d %s\n" % (key[-1], len(lines), digest)]
 
1996
        chunks.extend(dense_lines or lines)
 
1997
        chunks.append("end %s\n" % key[-1])
 
1998
        for chunk in chunks:
 
1999
            if type(chunk) is not str:
 
2000
                raise AssertionError(
 
2001
                    'data must be plain bytes was %s' % type(chunk))
1817
2002
        if lines and lines[-1][-1] != '\n':
1818
2003
            raise ValueError('corrupt lines value %r' % lines)
1819
 
        compressed_bytes = tuned_gzip.bytes_to_gzip(bytes)
 
2004
        compressed_bytes = tuned_gzip.chunks_to_gzip(chunks)
1820
2005
        return len(compressed_bytes), compressed_bytes
1821
2006
 
1822
2007
    def _split_header(self, line):
1840
2025
class _ContentMapGenerator(object):
1841
2026
    """Generate texts or expose raw deltas for a set of texts."""
1842
2027
 
 
2028
    def __init__(self, ordering='unordered'):
 
2029
        self._ordering = ordering
 
2030
 
1843
2031
    def _get_content(self, key):
1844
2032
        """Get the content object for key."""
1845
2033
        # Note that _get_content is only called when the _ContentMapGenerator
1860
2048
 
1861
2049
    def _work(self):
1862
2050
        """Produce maps of text and KnitContents as dicts.
1863
 
        
 
2051
 
1864
2052
        :return: (text_map, content_map) where text_map contains the texts for
1865
2053
            the requested versions and content_map contains the KnitContents.
1866
2054
        """
1879
2067
            # Loop over fallback repositories asking them for texts - ignore
1880
2068
            # any missing from a particular fallback.
1881
2069
            for record in source.get_record_stream(missing_keys,
1882
 
                'unordered', True):
 
2070
                self._ordering, True):
1883
2071
                if record.storage_kind == 'absent':
1884
2072
                    # Not in thie particular stream, may be in one of the
1885
2073
                    # other fallback vfs objects.
1887
2075
                missing_keys.remove(record.key)
1888
2076
                yield record
1889
2077
 
1890
 
        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
1891
 
            allow_missing=True)
 
2078
        if self._raw_record_map is None:
 
2079
            raise AssertionError('_raw_record_map should have been filled')
1892
2080
        first = True
1893
2081
        for key in self.keys:
1894
2082
            if key in self.nonlocal_keys:
1913
2101
                self._raw_record_map)
1914
2102
        record_map = self._record_map
1915
2103
        # raw_record_map is key:
1916
 
        # Have read and parsed records at this point. 
 
2104
        # Have read and parsed records at this point.
1917
2105
        for key in self.keys:
1918
2106
            if key in self.nonlocal_keys:
1919
2107
                # already handled
2017
2205
    """Content map generator reading from a VersionedFiles object."""
2018
2206
 
2019
2207
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
2020
 
        global_map=None, raw_record_map=None):
 
2208
        global_map=None, raw_record_map=None, ordering='unordered'):
2021
2209
        """Create a _ContentMapGenerator.
2022
 
        
 
2210
 
2023
2211
        :param versioned_files: The versioned files that the texts are being
2024
2212
            extracted from.
2025
2213
        :param keys: The keys to produce content maps for.
2031
2219
        :param raw_record_map: A unparsed raw record map to use for answering
2032
2220
            contents.
2033
2221
        """
 
2222
        _ContentMapGenerator.__init__(self, ordering=ordering)
2034
2223
        # The vf to source data from
2035
2224
        self.vf = versioned_files
2036
2225
        # The keys desired
2166
2355
 
2167
2356
    Duplicate entries may be written to the index for a single version id
2168
2357
    if this is done then the latter one completely replaces the former:
2169
 
    this allows updates to correct version and parent information. 
 
2358
    this allows updates to correct version and parent information.
2170
2359
    Note that the two entries may share the delta, and that successive
2171
2360
    annotations and references MUST point to the first entry.
2172
2361
 
2173
2362
    The index file on disc contains a header, followed by one line per knit
2174
2363
    record. The same revision can be present in an index file more than once.
2175
 
    The first occurrence gets assigned a sequence number starting from 0. 
2176
 
    
 
2364
    The first occurrence gets assigned a sequence number starting from 0.
 
2365
 
2177
2366
    The format of a single line is
2178
2367
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
2179
2368
    REVISION_ID is a utf8-encoded revision id
2180
 
    FLAGS is a comma separated list of flags about the record. Values include 
 
2369
    FLAGS is a comma separated list of flags about the record. Values include
2181
2370
        no-eol, line-delta, fulltext.
2182
2371
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
2183
 
        that the the compressed data starts at.
 
2372
        that the compressed data starts at.
2184
2373
    LENGTH is the ascii representation of the length of the data file.
2185
2374
    PARENT_ID a utf-8 revision id prefixed by a '.' that is a parent of
2186
2375
        REVISION_ID.
2187
2376
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
2188
2377
        revision id already in the knit that is a parent of REVISION_ID.
2189
2378
    The ' :' marker is the end of record marker.
2190
 
    
 
2379
 
2191
2380
    partial writes:
2192
2381
    when a write is interrupted to the index file, it will result in a line
2193
2382
    that does not end in ' :'. If the ' :' is not present at the end of a line,
2218
2407
        self._reset_cache()
2219
2408
        self.has_graph = True
2220
2409
 
2221
 
    def add_records(self, records, random_id=False):
 
2410
    def add_records(self, records, random_id=False, missing_compression_parents=False):
2222
2411
        """Add multiple records to the index.
2223
 
        
 
2412
 
2224
2413
        :param records: a list of tuples:
2225
2414
                         (key, options, access_memo, parents).
2226
2415
        :param random_id: If True the ids being added were randomly generated
2227
2416
            and no check for existence will be performed.
 
2417
        :param missing_compression_parents: If True the records being added are
 
2418
            only compressed against texts already in the index (or inside
 
2419
            records). If False the records all refer to unavailable texts (or
 
2420
            texts inside records) as compression parents.
2228
2421
        """
 
2422
        if missing_compression_parents:
 
2423
            # It might be nice to get the edge of the records. But keys isn't
 
2424
            # _wrong_.
 
2425
            keys = sorted(record[0] for record in records)
 
2426
            raise errors.RevisionNotPresent(keys, self)
2229
2427
        paths = {}
2230
2428
        for record in records:
2231
2429
            key = record[0]
2248
2446
                    line = "\n%s %s %s %s %s :" % (
2249
2447
                        key[-1], ','.join(options), pos, size,
2250
2448
                        self._dictionary_compress(parents))
2251
 
                    if type(line) != str:
 
2449
                    if type(line) is not str:
2252
2450
                        raise AssertionError(
2253
2451
                            'data must be utf8 was %s' % type(line))
2254
2452
                    lines.append(line)
2273
2471
        # Because kndx files do not support atomic insertion via separate index
2274
2472
        # files, they do not support this method.
2275
2473
        raise NotImplementedError(self.get_missing_compression_parents)
2276
 
    
 
2474
 
2277
2475
    def _cache_key(self, key, options, pos, size, parent_keys):
2278
2476
        """Cache a version record in the history array and index cache.
2279
2477
 
2386
2584
        except KeyError:
2387
2585
            raise RevisionNotPresent(key, self)
2388
2586
 
 
2587
    def find_ancestry(self, keys):
 
2588
        """See CombinedGraphIndex.find_ancestry()"""
 
2589
        prefixes = set(key[:-1] for key in keys)
 
2590
        self._load_prefixes(prefixes)
 
2591
        result = {}
 
2592
        parent_map = {}
 
2593
        missing_keys = set()
 
2594
        pending_keys = list(keys)
 
2595
        # This assumes that keys will not reference parents in a different
 
2596
        # prefix, which is accurate so far.
 
2597
        while pending_keys:
 
2598
            key = pending_keys.pop()
 
2599
            if key in parent_map:
 
2600
                continue
 
2601
            prefix = key[:-1]
 
2602
            try:
 
2603
                suffix_parents = self._kndx_cache[prefix][0][key[-1]][4]
 
2604
            except KeyError:
 
2605
                missing_keys.add(key)
 
2606
            else:
 
2607
                parent_keys = tuple([prefix + (suffix,)
 
2608
                                     for suffix in suffix_parents])
 
2609
                parent_map[key] = parent_keys
 
2610
                pending_keys.extend([p for p in parent_keys
 
2611
                                        if p not in parent_map])
 
2612
        return parent_map, missing_keys
 
2613
 
2389
2614
    def get_parent_map(self, keys):
2390
2615
        """Get a map of the parents of keys.
2391
2616
 
2412
2637
 
2413
2638
    def get_position(self, key):
2414
2639
        """Return details needed to access the version.
2415
 
        
 
2640
 
2416
2641
        :return: a tuple (key, data position, size) to hand to the access
2417
2642
            logic to get the record.
2418
2643
        """
2422
2647
        return key, entry[2], entry[3]
2423
2648
 
2424
2649
    has_key = _mod_index._has_key_from_parent_map
2425
 
    
 
2650
 
2426
2651
    def _init_index(self, path, extra_lines=[]):
2427
2652
        """Initialize an index."""
2428
2653
        sio = StringIO()
2437
2662
 
2438
2663
    def keys(self):
2439
2664
        """Get all the keys in the collection.
2440
 
        
 
2665
 
2441
2666
        The keys are not ordered.
2442
2667
        """
2443
2668
        result = set()
2444
2669
        # Identify all key prefixes.
2445
2670
        # XXX: A bit hacky, needs polish.
2446
 
        if type(self._mapper) == ConstantMapper:
 
2671
        if type(self._mapper) is ConstantMapper:
2447
2672
            prefixes = [()]
2448
2673
        else:
2449
2674
            relpaths = set()
2456
2681
            for suffix in self._kndx_cache[prefix][1]:
2457
2682
                result.add(prefix + (suffix,))
2458
2683
        return result
2459
 
    
 
2684
 
2460
2685
    def _load_prefixes(self, prefixes):
2461
2686
        """Load the indices for prefixes."""
2462
2687
        self._check_read()
2481
2706
                    del self._history
2482
2707
                except NoSuchFile:
2483
2708
                    self._kndx_cache[prefix] = ({}, [])
2484
 
                    if type(self._mapper) == ConstantMapper:
 
2709
                    if type(self._mapper) is ConstantMapper:
2485
2710
                        # preserve behaviour for revisions.kndx etc.
2486
2711
                        self._init_index(path)
2487
2712
                    del self._cache
2500
2725
 
2501
2726
    def _dictionary_compress(self, keys):
2502
2727
        """Dictionary compress keys.
2503
 
        
 
2728
 
2504
2729
        :param keys: The keys to generate references to.
2505
2730
        :return: A string representation of keys. keys which are present are
2506
2731
            dictionary compressed, and others are emitted as fulltext with a
2554
2779
            return index_memo[0][:-1], index_memo[1]
2555
2780
        return keys.sort(key=get_sort_key)
2556
2781
 
 
2782
    _get_total_build_size = _get_total_build_size
 
2783
 
2557
2784
    def _split_key(self, key):
2558
2785
        """Split key into a prefix and suffix."""
2559
2786
        return key[:-1], key[-1]
2560
2787
 
2561
2788
 
 
2789
class _KeyRefs(object):
 
2790
 
 
2791
    def __init__(self, track_new_keys=False):
 
2792
        # dict mapping 'key' to 'set of keys referring to that key'
 
2793
        self.refs = {}
 
2794
        if track_new_keys:
 
2795
            # set remembering all new keys
 
2796
            self.new_keys = set()
 
2797
        else:
 
2798
            self.new_keys = None
 
2799
 
 
2800
    def clear(self):
 
2801
        if self.refs:
 
2802
            self.refs.clear()
 
2803
        if self.new_keys:
 
2804
            self.new_keys.clear()
 
2805
 
 
2806
    def add_references(self, key, refs):
 
2807
        # Record the new references
 
2808
        for referenced in refs:
 
2809
            try:
 
2810
                needed_by = self.refs[referenced]
 
2811
            except KeyError:
 
2812
                needed_by = self.refs[referenced] = set()
 
2813
            needed_by.add(key)
 
2814
        # Discard references satisfied by the new key
 
2815
        self.add_key(key)
 
2816
 
 
2817
    def get_new_keys(self):
 
2818
        return self.new_keys
 
2819
    
 
2820
    def get_unsatisfied_refs(self):
 
2821
        return self.refs.iterkeys()
 
2822
 
 
2823
    def _satisfy_refs_for_key(self, key):
 
2824
        try:
 
2825
            del self.refs[key]
 
2826
        except KeyError:
 
2827
            # No keys depended on this key.  That's ok.
 
2828
            pass
 
2829
 
 
2830
    def add_key(self, key):
 
2831
        # satisfy refs for key, and remember that we've seen this key.
 
2832
        self._satisfy_refs_for_key(key)
 
2833
        if self.new_keys is not None:
 
2834
            self.new_keys.add(key)
 
2835
 
 
2836
    def satisfy_refs_for_keys(self, keys):
 
2837
        for key in keys:
 
2838
            self._satisfy_refs_for_key(key)
 
2839
 
 
2840
    def get_referrers(self):
 
2841
        result = set()
 
2842
        for referrers in self.refs.itervalues():
 
2843
            result.update(referrers)
 
2844
        return result
 
2845
 
 
2846
 
2562
2847
class _KnitGraphIndex(object):
2563
2848
    """A KnitVersionedFiles index layered on GraphIndex."""
2564
2849
 
2565
2850
    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2566
 
        add_callback=None):
 
2851
        add_callback=None, track_external_parent_refs=False):
2567
2852
        """Construct a KnitGraphIndex on a graph_index.
2568
2853
 
2569
2854
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
2570
2855
        :param is_locked: A callback to check whether the object should answer
2571
2856
            queries.
2572
2857
        :param deltas: Allow delta-compressed records.
2573
 
        :param parents: If True, record knits parents, if not do not record 
 
2858
        :param parents: If True, record knits parents, if not do not record
2574
2859
            parents.
2575
2860
        :param add_callback: If not None, allow additions to the index and call
2576
2861
            this callback with a list of added GraphIndex nodes:
2577
2862
            [(node, value, node_refs), ...]
2578
2863
        :param is_locked: A callback, returns True if the index is locked and
2579
2864
            thus usable.
 
2865
        :param track_external_parent_refs: If True, record all external parent
 
2866
            references parents from added records.  These can be retrieved
 
2867
            later by calling get_missing_parents().
2580
2868
        """
2581
2869
        self._add_callback = add_callback
2582
2870
        self._graph_index = graph_index
2590
2878
        self.has_graph = parents
2591
2879
        self._is_locked = is_locked
2592
2880
        self._missing_compression_parents = set()
 
2881
        if track_external_parent_refs:
 
2882
            self._key_dependencies = _KeyRefs()
 
2883
        else:
 
2884
            self._key_dependencies = None
2593
2885
 
2594
2886
    def __repr__(self):
2595
2887
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2596
2888
 
2597
 
    def add_records(self, records, random_id=False):
 
2889
    def add_records(self, records, random_id=False,
 
2890
        missing_compression_parents=False):
2598
2891
        """Add multiple records to the index.
2599
 
        
 
2892
 
2600
2893
        This function does not insert data into the Immutable GraphIndex
2601
2894
        backing the KnitGraphIndex, instead it prepares data for insertion by
2602
2895
        the caller and checks that it is safe to insert then calls
2606
2899
                         (key, options, access_memo, parents).
2607
2900
        :param random_id: If True the ids being added were randomly generated
2608
2901
            and no check for existence will be performed.
 
2902
        :param missing_compression_parents: If True the records being added are
 
2903
            only compressed against texts already in the index (or inside
 
2904
            records). If False the records all refer to unavailable texts (or
 
2905
            texts inside records) as compression parents.
2609
2906
        """
2610
2907
        if not self._add_callback:
2611
2908
            raise errors.ReadOnlyError(self)
2613
2910
        # anymore.
2614
2911
 
2615
2912
        keys = {}
 
2913
        compression_parents = set()
 
2914
        key_dependencies = self._key_dependencies
2616
2915
        for (key, options, access_memo, parents) in records:
2617
2916
            if self._parents:
2618
2917
                parents = tuple(parents)
 
2918
                if key_dependencies is not None:
 
2919
                    key_dependencies.add_references(key, parents)
2619
2920
            index, pos, size = access_memo
2620
2921
            if 'no-eol' in options:
2621
2922
                value = 'N'
2629
2930
                if self._deltas:
2630
2931
                    if 'line-delta' in options:
2631
2932
                        node_refs = (parents, (parents[0],))
 
2933
                        if missing_compression_parents:
 
2934
                            compression_parents.add(parents[0])
2632
2935
                    else:
2633
2936
                        node_refs = (parents, ())
2634
2937
                else:
2643
2946
        if not random_id:
2644
2947
            present_nodes = self._get_entries(keys)
2645
2948
            for (index, key, value, node_refs) in present_nodes:
 
2949
                parents = node_refs[:1]
 
2950
                # Sometimes these are passed as a list rather than a tuple
 
2951
                passed = static_tuple.as_tuples(keys[key])
 
2952
                passed_parents = passed[1][:1]
2646
2953
                if (value[0] != keys[key][0][0] or
2647
 
                    node_refs[:1] != keys[key][1][:1]):
 
2954
                    parents != passed_parents):
 
2955
                    node_refs = static_tuple.as_tuples(node_refs)
2648
2956
                    raise KnitCorrupt(self, "inconsistent details in add_records"
2649
 
                        ": %s %s" % ((value, node_refs), keys[key]))
 
2957
                        ": %s %s" % ((value, node_refs), passed))
2650
2958
                del keys[key]
2651
2959
        result = []
2652
2960
        if self._parents:
2656
2964
            for key, (value, node_refs) in keys.iteritems():
2657
2965
                result.append((key, value))
2658
2966
        self._add_callback(result)
2659
 
        
 
2967
        if missing_compression_parents:
 
2968
            # This may appear to be incorrect (it does not check for
 
2969
            # compression parents that are in the existing graph index),
 
2970
            # but such records won't have been buffered, so this is
 
2971
            # actually correct: every entry when
 
2972
            # missing_compression_parents==True either has a missing parent, or
 
2973
            # a parent that is one of the keys in records.
 
2974
            compression_parents.difference_update(keys)
 
2975
            self._missing_compression_parents.update(compression_parents)
 
2976
        # Adding records may have satisfied missing compression parents.
 
2977
        self._missing_compression_parents.difference_update(keys)
 
2978
 
2660
2979
    def scan_unvalidated_index(self, graph_index):
2661
2980
        """Inform this _KnitGraphIndex that there is an unvalidated index.
2662
2981
 
2670
2989
            new_missing = graph_index.external_references(ref_list_num=1)
2671
2990
            new_missing.difference_update(self.get_parent_map(new_missing))
2672
2991
            self._missing_compression_parents.update(new_missing)
 
2992
        if self._key_dependencies is not None:
 
2993
            # Add parent refs from graph_index (and discard parent refs that
 
2994
            # the graph_index has).
 
2995
            for node in graph_index.iter_all_entries():
 
2996
                self._key_dependencies.add_references(node[1], node[3][0])
2673
2997
 
2674
2998
    def get_missing_compression_parents(self):
2675
 
        """Return the keys of compression parents missing from unvalidated
2676
 
        indices.
 
2999
        """Return the keys of missing compression parents.
 
3000
 
 
3001
        Missing compression parents occur when a record stream was missing
 
3002
        basis texts, or a index was scanned that had missing basis texts.
2677
3003
        """
2678
3004
        return frozenset(self._missing_compression_parents)
2679
3005
 
 
3006
    def get_missing_parents(self):
 
3007
        """Return the keys of missing parents."""
 
3008
        # If updating this, you should also update
 
3009
        # groupcompress._GCGraphIndex.get_missing_parents
 
3010
        # We may have false positives, so filter those out.
 
3011
        self._key_dependencies.satisfy_refs_for_keys(
 
3012
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
 
3013
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
 
3014
 
2680
3015
    def _check_read(self):
2681
3016
        """raise if reads are not permitted."""
2682
3017
        if not self._is_locked():
2742
3077
 
2743
3078
    def _get_entries(self, keys, check_present=False):
2744
3079
        """Get the entries for keys.
2745
 
        
 
3080
 
2746
3081
        :param keys: An iterable of index key tuples.
2747
3082
        """
2748
3083
        keys = set(keys)
2790
3125
            options.append('no-eol')
2791
3126
        return options
2792
3127
 
 
3128
    def find_ancestry(self, keys):
 
3129
        """See CombinedGraphIndex.find_ancestry()"""
 
3130
        return self._graph_index.find_ancestry(keys, 0)
 
3131
 
2793
3132
    def get_parent_map(self, keys):
2794
3133
        """Get a map of the parents of keys.
2795
3134
 
2810
3149
 
2811
3150
    def get_position(self, key):
2812
3151
        """Return details needed to access the version.
2813
 
        
 
3152
 
2814
3153
        :return: a tuple (index, data position, size) to hand to the access
2815
3154
            logic to get the record.
2816
3155
        """
2821
3160
 
2822
3161
    def keys(self):
2823
3162
        """Get all the keys in the collection.
2824
 
        
 
3163
 
2825
3164
        The keys are not ordered.
2826
3165
        """
2827
3166
        self._check_read()
2828
3167
        return [node[1] for node in self._graph_index.iter_all_entries()]
2829
 
    
 
3168
 
2830
3169
    missing_keys = _mod_index._missing_keys_from_parent_map
2831
3170
 
2832
3171
    def _node_to_position(self, node):
2854
3193
            return positions[key][1]
2855
3194
        return keys.sort(key=get_index_memo)
2856
3195
 
 
3196
    _get_total_build_size = _get_total_build_size
 
3197
 
2857
3198
 
2858
3199
class _KnitKeyAccess(object):
2859
3200
    """Access to records in .knit files."""
2880
3221
            opaque index memo. For _KnitKeyAccess the memo is (key, pos,
2881
3222
            length), where the key is the record key.
2882
3223
        """
2883
 
        if type(raw_data) != str:
 
3224
        if type(raw_data) is not str:
2884
3225
            raise AssertionError(
2885
3226
                'data must be plain bytes was %s' % type(raw_data))
2886
3227
        result = []
2903
3244
            result.append((key, base, size))
2904
3245
        return result
2905
3246
 
 
3247
    def flush(self):
 
3248
        """Flush pending writes on this access object.
 
3249
        
 
3250
        For .knit files this is a no-op.
 
3251
        """
 
3252
        pass
 
3253
 
2906
3254
    def get_raw_records(self, memos_for_retrieval):
2907
3255
        """Get the raw bytes for a records.
2908
3256
 
2933
3281
class _DirectPackAccess(object):
2934
3282
    """Access to data in one or more packs with less translation."""
2935
3283
 
2936
 
    def __init__(self, index_to_packs, reload_func=None):
 
3284
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
2937
3285
        """Create a _DirectPackAccess object.
2938
3286
 
2939
3287
        :param index_to_packs: A dict mapping index objects to the transport
2946
3294
        self._write_index = None
2947
3295
        self._indices = index_to_packs
2948
3296
        self._reload_func = reload_func
 
3297
        self._flush_func = flush_func
2949
3298
 
2950
3299
    def add_raw_records(self, key_sizes, raw_data):
2951
3300
        """Add raw knit bytes to a storage area.
2961
3310
            length), where the index field is the write_index object supplied
2962
3311
            to the PackAccess object.
2963
3312
        """
2964
 
        if type(raw_data) != str:
 
3313
        if type(raw_data) is not str:
2965
3314
            raise AssertionError(
2966
3315
                'data must be plain bytes was %s' % type(raw_data))
2967
3316
        result = []
2973
3322
            result.append((self._write_index, p_offset, p_length))
2974
3323
        return result
2975
3324
 
 
3325
    def flush(self):
 
3326
        """Flush pending writes on this access object.
 
3327
 
 
3328
        This will flush any buffered writes to a NewPack.
 
3329
        """
 
3330
        if self._flush_func is not None:
 
3331
            self._flush_func()
 
3332
            
2976
3333
    def get_raw_records(self, memos_for_retrieval):
2977
3334
        """Get the raw bytes for a records.
2978
3335
 
2979
 
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
3336
        :param memos_for_retrieval: An iterable containing the (index, pos,
2980
3337
            length) memo for retrieving the bytes. The Pack access method
2981
3338
            looks up the pack to use for a given record in its index_to_pack
2982
3339
            map.
3072
3429
    recommended.
3073
3430
    """
3074
3431
    annotator = _KnitAnnotator(knit)
3075
 
    return iter(annotator.annotate(revision_id))
3076
 
 
3077
 
 
3078
 
class _KnitAnnotator(object):
 
3432
    return iter(annotator.annotate_flat(revision_id))
 
3433
 
 
3434
 
 
3435
class _KnitAnnotator(annotate.Annotator):
3079
3436
    """Build up the annotations for a text."""
3080
3437
 
3081
 
    def __init__(self, knit):
3082
 
        self._knit = knit
3083
 
 
3084
 
        # Content objects, differs from fulltexts because of how final newlines
3085
 
        # are treated by knits. the content objects here will always have a
3086
 
        # final newline
3087
 
        self._fulltext_contents = {}
3088
 
 
3089
 
        # Annotated lines of specific revisions
3090
 
        self._annotated_lines = {}
3091
 
 
3092
 
        # Track the raw data for nodes that we could not process yet.
3093
 
        # This maps the revision_id of the base to a list of children that will
3094
 
        # annotated from it.
3095
 
        self._pending_children = {}
3096
 
 
3097
 
        # Nodes which cannot be extracted
3098
 
        self._ghosts = set()
3099
 
 
3100
 
        # Track how many children this node has, so we know if we need to keep
3101
 
        # it
3102
 
        self._annotate_children = {}
3103
 
        self._compression_children = {}
 
3438
    def __init__(self, vf):
 
3439
        annotate.Annotator.__init__(self, vf)
 
3440
 
 
3441
        # TODO: handle Nodes which cannot be extracted
 
3442
        # self._ghosts = set()
 
3443
 
 
3444
        # Map from (key, parent_key) => matching_blocks, should be 'use once'
 
3445
        self._matching_blocks = {}
 
3446
 
 
3447
        # KnitContent objects
 
3448
        self._content_objects = {}
 
3449
        # The number of children that depend on this fulltext content object
 
3450
        self._num_compression_children = {}
 
3451
        # Delta records that need their compression parent before they can be
 
3452
        # expanded
 
3453
        self._pending_deltas = {}
 
3454
        # Fulltext records that are waiting for their parents fulltexts before
 
3455
        # they can be yielded for annotation
 
3456
        self._pending_annotation = {}
3104
3457
 
3105
3458
        self._all_build_details = {}
3106
 
        # The children => parent revision_id graph
3107
 
        self._revision_id_graph = {}
3108
 
 
3109
 
        self._heads_provider = None
3110
 
 
3111
 
        self._nodes_to_keep_annotations = set()
3112
 
        self._generations_until_keep = 100
3113
 
 
3114
 
    def set_generations_until_keep(self, value):
3115
 
        """Set the number of generations before caching a node.
3116
 
 
3117
 
        Setting this to -1 will cache every merge node, setting this higher
3118
 
        will cache fewer nodes.
3119
 
        """
3120
 
        self._generations_until_keep = value
3121
 
 
3122
 
    def _add_fulltext_content(self, revision_id, content_obj):
3123
 
        self._fulltext_contents[revision_id] = content_obj
3124
 
        # TODO: jam 20080305 It might be good to check the sha1digest here
3125
 
        return content_obj.text()
3126
 
 
3127
 
    def _check_parents(self, child, nodes_to_annotate):
3128
 
        """Check if all parents have been processed.
3129
 
 
3130
 
        :param child: A tuple of (rev_id, parents, raw_content)
3131
 
        :param nodes_to_annotate: If child is ready, add it to
3132
 
            nodes_to_annotate, otherwise put it back in self._pending_children
3133
 
        """
3134
 
        for parent_id in child[1]:
3135
 
            if (parent_id not in self._annotated_lines):
3136
 
                # This parent is present, but another parent is missing
3137
 
                self._pending_children.setdefault(parent_id,
3138
 
                                                  []).append(child)
3139
 
                break
3140
 
        else:
3141
 
            # This one is ready to be processed
3142
 
            nodes_to_annotate.append(child)
3143
 
 
3144
 
    def _add_annotation(self, revision_id, fulltext, parent_ids,
3145
 
                        left_matching_blocks=None):
3146
 
        """Add an annotation entry.
3147
 
 
3148
 
        All parents should already have been annotated.
3149
 
        :return: A list of children that now have their parents satisfied.
3150
 
        """
3151
 
        a = self._annotated_lines
3152
 
        annotated_parent_lines = [a[p] for p in parent_ids]
3153
 
        annotated_lines = list(annotate.reannotate(annotated_parent_lines,
3154
 
            fulltext, revision_id, left_matching_blocks,
3155
 
            heads_provider=self._get_heads_provider()))
3156
 
        self._annotated_lines[revision_id] = annotated_lines
3157
 
        for p in parent_ids:
3158
 
            ann_children = self._annotate_children[p]
3159
 
            ann_children.remove(revision_id)
3160
 
            if (not ann_children
3161
 
                and p not in self._nodes_to_keep_annotations):
3162
 
                del self._annotated_lines[p]
3163
 
                del self._all_build_details[p]
3164
 
                if p in self._fulltext_contents:
3165
 
                    del self._fulltext_contents[p]
3166
 
        # Now that we've added this one, see if there are any pending
3167
 
        # deltas to be done, certainly this parent is finished
3168
 
        nodes_to_annotate = []
3169
 
        for child in self._pending_children.pop(revision_id, []):
3170
 
            self._check_parents(child, nodes_to_annotate)
3171
 
        return nodes_to_annotate
3172
3459
 
3173
3460
    def _get_build_graph(self, key):
3174
3461
        """Get the graphs for building texts and annotations.
3179
3466
        fulltext.)
3180
3467
 
3181
3468
        :return: A list of (key, index_memo) records, suitable for
3182
 
            passing to read_records_iter to start reading in the raw data fro/
 
3469
            passing to read_records_iter to start reading in the raw data from
3183
3470
            the pack file.
3184
3471
        """
3185
 
        if key in self._annotated_lines:
3186
 
            # Nothing to do
3187
 
            return []
3188
3472
        pending = set([key])
3189
3473
        records = []
3190
 
        generation = 0
3191
 
        kept_generation = 0
 
3474
        ann_keys = set()
 
3475
        self._num_needed_children[key] = 1
3192
3476
        while pending:
3193
3477
            # get all pending nodes
3194
 
            generation += 1
3195
3478
            this_iteration = pending
3196
 
            build_details = self._knit._index.get_build_details(this_iteration)
 
3479
            build_details = self._vf._index.get_build_details(this_iteration)
3197
3480
            self._all_build_details.update(build_details)
3198
 
            # new_nodes = self._knit._index._get_entries(this_iteration)
 
3481
            # new_nodes = self._vf._index._get_entries(this_iteration)
3199
3482
            pending = set()
3200
3483
            for key, details in build_details.iteritems():
3201
 
                (index_memo, compression_parent, parents,
 
3484
                (index_memo, compression_parent, parent_keys,
3202
3485
                 record_details) = details
3203
 
                self._revision_id_graph[key] = parents
 
3486
                self._parent_map[key] = parent_keys
 
3487
                self._heads_provider = None
3204
3488
                records.append((key, index_memo))
3205
3489
                # Do we actually need to check _annotated_lines?
3206
 
                pending.update(p for p in parents
3207
 
                                 if p not in self._all_build_details)
 
3490
                pending.update([p for p in parent_keys
 
3491
                                   if p not in self._all_build_details])
 
3492
                if parent_keys:
 
3493
                    for parent_key in parent_keys:
 
3494
                        if parent_key in self._num_needed_children:
 
3495
                            self._num_needed_children[parent_key] += 1
 
3496
                        else:
 
3497
                            self._num_needed_children[parent_key] = 1
3208
3498
                if compression_parent:
3209
 
                    self._compression_children.setdefault(compression_parent,
3210
 
                        []).append(key)
3211
 
                if parents:
3212
 
                    for parent in parents:
3213
 
                        self._annotate_children.setdefault(parent,
3214
 
                            []).append(key)
3215
 
                    num_gens = generation - kept_generation
3216
 
                    if ((num_gens >= self._generations_until_keep)
3217
 
                        and len(parents) > 1):
3218
 
                        kept_generation = generation
3219
 
                        self._nodes_to_keep_annotations.add(key)
 
3499
                    if compression_parent in self._num_compression_children:
 
3500
                        self._num_compression_children[compression_parent] += 1
 
3501
                    else:
 
3502
                        self._num_compression_children[compression_parent] = 1
3220
3503
 
3221
3504
            missing_versions = this_iteration.difference(build_details.keys())
3222
 
            self._ghosts.update(missing_versions)
3223
 
            for missing_version in missing_versions:
3224
 
                # add a key, no parents
3225
 
                self._revision_id_graph[missing_version] = ()
3226
 
                pending.discard(missing_version) # don't look for it
3227
 
        if self._ghosts.intersection(self._compression_children):
3228
 
            raise KnitCorrupt(
3229
 
                "We cannot have nodes which have a ghost compression parent:\n"
3230
 
                "ghosts: %r\n"
3231
 
                "compression children: %r"
3232
 
                % (self._ghosts, self._compression_children))
3233
 
        # Cleanout anything that depends on a ghost so that we don't wait for
3234
 
        # the ghost to show up
3235
 
        for node in self._ghosts:
3236
 
            if node in self._annotate_children:
3237
 
                # We won't be building this node
3238
 
                del self._annotate_children[node]
 
3505
            if missing_versions:
 
3506
                for key in missing_versions:
 
3507
                    if key in self._parent_map and key in self._text_cache:
 
3508
                        # We already have this text ready, we just need to
 
3509
                        # yield it later so we get it annotated
 
3510
                        ann_keys.add(key)
 
3511
                        parent_keys = self._parent_map[key]
 
3512
                        for parent_key in parent_keys:
 
3513
                            if parent_key in self._num_needed_children:
 
3514
                                self._num_needed_children[parent_key] += 1
 
3515
                            else:
 
3516
                                self._num_needed_children[parent_key] = 1
 
3517
                        pending.update([p for p in parent_keys
 
3518
                                           if p not in self._all_build_details])
 
3519
                    else:
 
3520
                        raise errors.RevisionNotPresent(key, self._vf)
3239
3521
        # Generally we will want to read the records in reverse order, because
3240
3522
        # we find the parent nodes after the children
3241
3523
        records.reverse()
3242
 
        return records
3243
 
 
3244
 
    def _annotate_records(self, records):
3245
 
        """Build the annotations for the listed records."""
 
3524
        return records, ann_keys
 
3525
 
 
3526
    def _get_needed_texts(self, key, pb=None):
 
3527
        # if True or len(self._vf._fallback_vfs) > 0:
 
3528
        if len(self._vf._fallback_vfs) > 0:
 
3529
            # If we have fallbacks, go to the generic path
 
3530
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
 
3531
                yield v
 
3532
            return
 
3533
        while True:
 
3534
            try:
 
3535
                records, ann_keys = self._get_build_graph(key)
 
3536
                for idx, (sub_key, text, num_lines) in enumerate(
 
3537
                                                self._extract_texts(records)):
 
3538
                    if pb is not None:
 
3539
                        pb.update('annotating', idx, len(records))
 
3540
                    yield sub_key, text, num_lines
 
3541
                for sub_key in ann_keys:
 
3542
                    text = self._text_cache[sub_key]
 
3543
                    num_lines = len(text) # bad assumption
 
3544
                    yield sub_key, text, num_lines
 
3545
                return
 
3546
            except errors.RetryWithNewPacks, e:
 
3547
                self._vf._access.reload_or_raise(e)
 
3548
                # The cached build_details are no longer valid
 
3549
                self._all_build_details.clear()
 
3550
 
 
3551
    def _cache_delta_blocks(self, key, compression_parent, delta, lines):
 
3552
        parent_lines = self._text_cache[compression_parent]
 
3553
        blocks = list(KnitContent.get_line_delta_blocks(delta, parent_lines, lines))
 
3554
        self._matching_blocks[(key, compression_parent)] = blocks
 
3555
 
 
3556
    def _expand_record(self, key, parent_keys, compression_parent, record,
 
3557
                       record_details):
 
3558
        delta = None
 
3559
        if compression_parent:
 
3560
            if compression_parent not in self._content_objects:
 
3561
                # Waiting for the parent
 
3562
                self._pending_deltas.setdefault(compression_parent, []).append(
 
3563
                    (key, parent_keys, record, record_details))
 
3564
                return None
 
3565
            # We have the basis parent, so expand the delta
 
3566
            num = self._num_compression_children[compression_parent]
 
3567
            num -= 1
 
3568
            if num == 0:
 
3569
                base_content = self._content_objects.pop(compression_parent)
 
3570
                self._num_compression_children.pop(compression_parent)
 
3571
            else:
 
3572
                self._num_compression_children[compression_parent] = num
 
3573
                base_content = self._content_objects[compression_parent]
 
3574
            # It is tempting to want to copy_base_content=False for the last
 
3575
            # child object. However, whenever noeol=False,
 
3576
            # self._text_cache[parent_key] is content._lines. So mutating it
 
3577
            # gives very bad results.
 
3578
            # The alternative is to copy the lines into text cache, but then we
 
3579
            # are copying anyway, so just do it here.
 
3580
            content, delta = self._vf._factory.parse_record(
 
3581
                key, record, record_details, base_content,
 
3582
                copy_base_content=True)
 
3583
        else:
 
3584
            # Fulltext record
 
3585
            content, _ = self._vf._factory.parse_record(
 
3586
                key, record, record_details, None)
 
3587
        if self._num_compression_children.get(key, 0) > 0:
 
3588
            self._content_objects[key] = content
 
3589
        lines = content.text()
 
3590
        self._text_cache[key] = lines
 
3591
        if delta is not None:
 
3592
            self._cache_delta_blocks(key, compression_parent, delta, lines)
 
3593
        return lines
 
3594
 
 
3595
    def _get_parent_annotations_and_matches(self, key, text, parent_key):
 
3596
        """Get the list of annotations for the parent, and the matching lines.
 
3597
 
 
3598
        :param text: The opaque value given by _get_needed_texts
 
3599
        :param parent_key: The key for the parent text
 
3600
        :return: (parent_annotations, matching_blocks)
 
3601
            parent_annotations is a list as long as the number of lines in
 
3602
                parent
 
3603
            matching_blocks is a list of (parent_idx, text_idx, len) tuples
 
3604
                indicating which lines match between the two texts
 
3605
        """
 
3606
        block_key = (key, parent_key)
 
3607
        if block_key in self._matching_blocks:
 
3608
            blocks = self._matching_blocks.pop(block_key)
 
3609
            parent_annotations = self._annotations_cache[parent_key]
 
3610
            return parent_annotations, blocks
 
3611
        return annotate.Annotator._get_parent_annotations_and_matches(self,
 
3612
            key, text, parent_key)
 
3613
 
 
3614
    def _process_pending(self, key):
 
3615
        """The content for 'key' was just processed.
 
3616
 
 
3617
        Determine if there is any more pending work to be processed.
 
3618
        """
 
3619
        to_return = []
 
3620
        if key in self._pending_deltas:
 
3621
            compression_parent = key
 
3622
            children = self._pending_deltas.pop(key)
 
3623
            for child_key, parent_keys, record, record_details in children:
 
3624
                lines = self._expand_record(child_key, parent_keys,
 
3625
                                            compression_parent,
 
3626
                                            record, record_details)
 
3627
                if self._check_ready_for_annotations(child_key, parent_keys):
 
3628
                    to_return.append(child_key)
 
3629
        # Also check any children that are waiting for this parent to be
 
3630
        # annotation ready
 
3631
        if key in self._pending_annotation:
 
3632
            children = self._pending_annotation.pop(key)
 
3633
            to_return.extend([c for c, p_keys in children
 
3634
                              if self._check_ready_for_annotations(c, p_keys)])
 
3635
        return to_return
 
3636
 
 
3637
    def _check_ready_for_annotations(self, key, parent_keys):
 
3638
        """return true if this text is ready to be yielded.
 
3639
 
 
3640
        Otherwise, this will return False, and queue the text into
 
3641
        self._pending_annotation
 
3642
        """
 
3643
        for parent_key in parent_keys:
 
3644
            if parent_key not in self._annotations_cache:
 
3645
                # still waiting on at least one parent text, so queue it up
 
3646
                # Note that if there are multiple parents, we need to wait
 
3647
                # for all of them.
 
3648
                self._pending_annotation.setdefault(parent_key,
 
3649
                    []).append((key, parent_keys))
 
3650
                return False
 
3651
        return True
 
3652
 
 
3653
    def _extract_texts(self, records):
 
3654
        """Extract the various texts needed based on records"""
3246
3655
        # We iterate in the order read, rather than a strict order requested
3247
3656
        # However, process what we can, and put off to the side things that
3248
3657
        # still need parents, cleaning them up when those parents are
3249
3658
        # processed.
3250
 
        for (rev_id, record,
3251
 
             digest) in self._knit._read_records_iter(records):
3252
 
            if rev_id in self._annotated_lines:
 
3659
        # Basic data flow:
 
3660
        #   1) As 'records' are read, see if we can expand these records into
 
3661
        #      Content objects (and thus lines)
 
3662
        #   2) If a given line-delta is waiting on its compression parent, it
 
3663
        #      gets queued up into self._pending_deltas, otherwise we expand
 
3664
        #      it, and put it into self._text_cache and self._content_objects
 
3665
        #   3) If we expanded the text, we will then check to see if all
 
3666
        #      parents have also been processed. If so, this text gets yielded,
 
3667
        #      else this record gets set aside into pending_annotation
 
3668
        #   4) Further, if we expanded the text in (2), we will then check to
 
3669
        #      see if there are any children in self._pending_deltas waiting to
 
3670
        #      also be processed. If so, we go back to (2) for those
 
3671
        #   5) Further again, if we yielded the text, we can then check if that
 
3672
        #      'unlocks' any of the texts in pending_annotations, which should
 
3673
        #      then get yielded as well
 
3674
        # Note that both steps 4 and 5 are 'recursive' in that unlocking one
 
3675
        # compression child could unlock yet another, and yielding a fulltext
 
3676
        # will also 'unlock' the children that are waiting on that annotation.
 
3677
        # (Though also, unlocking 1 parent's fulltext, does not unlock a child
 
3678
        # if other parents are also waiting.)
 
3679
        # We want to yield content before expanding child content objects, so
 
3680
        # that we know when we can re-use the content lines, and the annotation
 
3681
        # code can know when it can stop caching fulltexts, as well.
 
3682
 
 
3683
        # Children that are missing their compression parent
 
3684
        pending_deltas = {}
 
3685
        for (key, record, digest) in self._vf._read_records_iter(records):
 
3686
            # ghosts?
 
3687
            details = self._all_build_details[key]
 
3688
            (_, compression_parent, parent_keys, record_details) = details
 
3689
            lines = self._expand_record(key, parent_keys, compression_parent,
 
3690
                                        record, record_details)
 
3691
            if lines is None:
 
3692
                # Pending delta should be queued up
3253
3693
                continue
3254
 
            parent_ids = self._revision_id_graph[rev_id]
3255
 
            parent_ids = [p for p in parent_ids if p not in self._ghosts]
3256
 
            details = self._all_build_details[rev_id]
3257
 
            (index_memo, compression_parent, parents,
3258
 
             record_details) = details
3259
 
            nodes_to_annotate = []
3260
 
            # TODO: Remove the punning between compression parents, and
3261
 
            #       parent_ids, we should be able to do this without assuming
3262
 
            #       the build order
3263
 
            if len(parent_ids) == 0:
3264
 
                # There are no parents for this node, so just add it
3265
 
                # TODO: This probably needs to be decoupled
3266
 
                fulltext_content, delta = self._knit._factory.parse_record(
3267
 
                    rev_id, record, record_details, None)
3268
 
                fulltext = self._add_fulltext_content(rev_id, fulltext_content)
3269
 
                nodes_to_annotate.extend(self._add_annotation(rev_id, fulltext,
3270
 
                    parent_ids, left_matching_blocks=None))
3271
 
            else:
3272
 
                child = (rev_id, parent_ids, record)
3273
 
                # Check if all the parents are present
3274
 
                self._check_parents(child, nodes_to_annotate)
3275
 
            while nodes_to_annotate:
3276
 
                # Should we use a queue here instead of a stack?
3277
 
                (rev_id, parent_ids, record) = nodes_to_annotate.pop()
3278
 
                (index_memo, compression_parent, parents,
3279
 
                 record_details) = self._all_build_details[rev_id]
3280
 
                blocks = None
3281
 
                if compression_parent is not None:
3282
 
                    comp_children = self._compression_children[compression_parent]
3283
 
                    if rev_id not in comp_children:
3284
 
                        raise AssertionError("%r not in compression children %r"
3285
 
                            % (rev_id, comp_children))
3286
 
                    # If there is only 1 child, it is safe to reuse this
3287
 
                    # content
3288
 
                    reuse_content = (len(comp_children) == 1
3289
 
                        and compression_parent not in
3290
 
                            self._nodes_to_keep_annotations)
3291
 
                    if reuse_content:
3292
 
                        # Remove it from the cache since it will be changing
3293
 
                        parent_fulltext_content = self._fulltext_contents.pop(compression_parent)
3294
 
                        # Make sure to copy the fulltext since it might be
3295
 
                        # modified
3296
 
                        parent_fulltext = list(parent_fulltext_content.text())
3297
 
                    else:
3298
 
                        parent_fulltext_content = self._fulltext_contents[compression_parent]
3299
 
                        parent_fulltext = parent_fulltext_content.text()
3300
 
                    comp_children.remove(rev_id)
3301
 
                    fulltext_content, delta = self._knit._factory.parse_record(
3302
 
                        rev_id, record, record_details,
3303
 
                        parent_fulltext_content,
3304
 
                        copy_base_content=(not reuse_content))
3305
 
                    fulltext = self._add_fulltext_content(rev_id,
3306
 
                                                          fulltext_content)
3307
 
                    if compression_parent == parent_ids[0]:
3308
 
                        # the compression_parent is the left parent, so we can
3309
 
                        # re-use the delta
3310
 
                        blocks = KnitContent.get_line_delta_blocks(delta,
3311
 
                                parent_fulltext, fulltext)
3312
 
                else:
3313
 
                    fulltext_content = self._knit._factory.parse_fulltext(
3314
 
                        record, rev_id)
3315
 
                    fulltext = self._add_fulltext_content(rev_id,
3316
 
                        fulltext_content)
3317
 
                nodes_to_annotate.extend(
3318
 
                    self._add_annotation(rev_id, fulltext, parent_ids,
3319
 
                                     left_matching_blocks=blocks))
3320
 
 
3321
 
    def _get_heads_provider(self):
3322
 
        """Create a heads provider for resolving ancestry issues."""
3323
 
        if self._heads_provider is not None:
3324
 
            return self._heads_provider
3325
 
        parent_provider = _mod_graph.DictParentsProvider(
3326
 
            self._revision_id_graph)
3327
 
        graph_obj = _mod_graph.Graph(parent_provider)
3328
 
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
3329
 
        self._heads_provider = head_cache
3330
 
        return head_cache
3331
 
 
3332
 
    def annotate(self, key):
3333
 
        """Return the annotated fulltext at the given key.
3334
 
 
3335
 
        :param key: The key to annotate.
3336
 
        """
3337
 
        if len(self._knit._fallback_vfs) > 0:
3338
 
            # stacked knits can't use the fast path at present.
3339
 
            return self._simple_annotate(key)
3340
 
        while True:
3341
 
            try:
3342
 
                records = self._get_build_graph(key)
3343
 
                if key in self._ghosts:
3344
 
                    raise errors.RevisionNotPresent(key, self._knit)
3345
 
                self._annotate_records(records)
3346
 
                return self._annotated_lines[key]
3347
 
            except errors.RetryWithNewPacks, e:
3348
 
                self._knit._access.reload_or_raise(e)
3349
 
                # The cached build_details are no longer valid
3350
 
                self._all_build_details.clear()
3351
 
 
3352
 
    def _simple_annotate(self, key):
3353
 
        """Return annotated fulltext, rediffing from the full texts.
3354
 
 
3355
 
        This is slow but makes no assumptions about the repository
3356
 
        being able to produce line deltas.
3357
 
        """
3358
 
        # TODO: this code generates a parent maps of present ancestors; it
3359
 
        # could be split out into a separate method, and probably should use
3360
 
        # iter_ancestry instead. -- mbp and robertc 20080704
3361
 
        graph = _mod_graph.Graph(self._knit)
3362
 
        head_cache = _mod_graph.FrozenHeadsCache(graph)
3363
 
        search = graph._make_breadth_first_searcher([key])
3364
 
        keys = set()
3365
 
        while True:
3366
 
            try:
3367
 
                present, ghosts = search.next_with_ghosts()
3368
 
            except StopIteration:
3369
 
                break
3370
 
            keys.update(present)
3371
 
        parent_map = self._knit.get_parent_map(keys)
3372
 
        parent_cache = {}
3373
 
        reannotate = annotate.reannotate
3374
 
        for record in self._knit.get_record_stream(keys, 'topological', True):
3375
 
            key = record.key
3376
 
            fulltext = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
3377
 
            parents = parent_map[key]
3378
 
            if parents is not None:
3379
 
                parent_lines = [parent_cache[parent] for parent in parent_map[key]]
3380
 
            else:
3381
 
                parent_lines = []
3382
 
            parent_cache[key] = list(
3383
 
                reannotate(parent_lines, fulltext, key, None, head_cache))
3384
 
        try:
3385
 
            return parent_cache[key]
3386
 
        except KeyError, e:
3387
 
            raise errors.RevisionNotPresent(key, self._knit)
3388
 
 
 
3694
            # At this point, we may be able to yield this content, if all
 
3695
            # parents are also finished
 
3696
            yield_this_text = self._check_ready_for_annotations(key,
 
3697
                                                                parent_keys)
 
3698
            if yield_this_text:
 
3699
                # All parents present
 
3700
                yield key, lines, len(lines)
 
3701
            to_process = self._process_pending(key)
 
3702
            while to_process:
 
3703
                this_process = to_process
 
3704
                to_process = []
 
3705
                for key in this_process:
 
3706
                    lines = self._text_cache[key]
 
3707
                    yield key, lines, len(lines)
 
3708
                    to_process.extend(self._process_pending(key))
3389
3709
 
3390
3710
try:
3391
 
    from bzrlib._knit_load_data_c import _load_data_c as _load_data
3392
 
except ImportError:
 
3711
    from bzrlib._knit_load_data_pyx import _load_data_c as _load_data
 
3712
except ImportError, e:
 
3713
    osutils.failed_to_load_extension(e)
3393
3714
    from bzrlib._knit_load_data_py import _load_data_py as _load_data