~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Vincent Ladeuil
  • Date: 2009-07-02 13:07:14 UTC
  • mto: (4524.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 4525.
  • Revision ID: v.ladeuil+lp@free.fr-20090702130714-hsyqfusi8vn3a11m
Use tree.has_changes() where appropriate (the test suite caught a
bug in has_changes() (not filtering out the root) in an impressive
number of tests)

* bzrlib/send.py:
(send): Use tree.has_changes() instead of tree.changes_from().

* bzrlib/reconfigure.py:
(Reconfigure._check): Use tree.has_changes() instead of
tree.changes_from().

* bzrlib/merge.py:
(Merger.ensure_revision_trees, Merger.compare_basis): Use
tree.has_changes() instead of tree.changes_from().

* bzrlib/builtins.py:
(cmd_remove_tree.run, cmd_push.run, cmd_merge.run): Use
tree.has_changes() instead of tree.changes_from().

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Knit versionedfile implementation.
18
18
 
20
20
updates.
21
21
 
22
22
Knit file layout:
23
 
lifeless: the data file is made up of "delta records".  each delta record has a delta header 
24
 
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of 
25
 
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a 
 
23
lifeless: the data file is made up of "delta records".  each delta record has a delta header
 
24
that contains; (1) a version id, (2) the size of the delta (in lines), and (3)  the digest of
 
25
the -expanded data- (ie, the delta applied to the parent).  the delta also ends with a
26
26
end-marker; simply "end VERSION"
27
27
 
28
28
delta can be line or full contents.a
35
35
130,130,2
36
36
8         if elt.get('executable') == 'yes':
37
37
8             ie.executable = True
38
 
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad 
 
38
end robertc@robertcollins.net-20051003014215-ee2990904cc4c7ad
39
39
 
40
40
 
41
41
whats in an index:
51
51
 
52
52
"""
53
53
 
54
 
# TODOS:
55
 
# 10:16 < lifeless> make partial index writes safe
56
 
# 10:16 < lifeless> implement 'knit.check()' like weave.check()
57
 
# 10:17 < lifeless> record known ghosts so we can detect when they are filled in rather than the current 'reweave 
58
 
#                    always' approach.
59
 
# move sha1 out of the content so that join is faster at verifying parents
60
 
# record content length ?
61
 
                  
62
54
 
63
55
from cStringIO import StringIO
64
 
from itertools import izip, chain
 
56
from itertools import izip
65
57
import operator
66
58
import os
67
59
import sys
111
103
    ConstantMapper,
112
104
    ContentFactory,
113
105
    ChunkedContentFactory,
 
106
    sort_groupcompress,
114
107
    VersionedFile,
115
108
    VersionedFiles,
116
109
    )
131
124
 
132
125
DATA_SUFFIX = '.knit'
133
126
INDEX_SUFFIX = '.kndx'
 
127
_STREAM_MIN_BUFFER_SIZE = 5*1024*1024
134
128
 
135
129
 
136
130
class KnitAdapter(object):
138
132
 
139
133
    def __init__(self, basis_vf):
140
134
        """Create an adapter which accesses full texts from basis_vf.
141
 
        
 
135
 
142
136
        :param basis_vf: A versioned file to access basis texts of deltas from.
143
137
            May be None for adapters that do not need to access basis texts.
144
138
        """
151
145
class FTAnnotatedToUnannotated(KnitAdapter):
152
146
    """An adapter from FT annotated knits to unannotated ones."""
153
147
 
154
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
148
    def get_bytes(self, factory):
 
149
        annotated_compressed_bytes = factory._raw_record
155
150
        rec, contents = \
156
151
            self._data._parse_record_unchecked(annotated_compressed_bytes)
157
152
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
162
157
class DeltaAnnotatedToUnannotated(KnitAdapter):
163
158
    """An adapter for deltas from annotated to unannotated."""
164
159
 
165
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
160
    def get_bytes(self, factory):
 
161
        annotated_compressed_bytes = factory._raw_record
166
162
        rec, contents = \
167
163
            self._data._parse_record_unchecked(annotated_compressed_bytes)
168
164
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
175
171
class FTAnnotatedToFullText(KnitAdapter):
176
172
    """An adapter from FT annotated knits to unannotated ones."""
177
173
 
178
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
174
    def get_bytes(self, factory):
 
175
        annotated_compressed_bytes = factory._raw_record
179
176
        rec, contents = \
180
177
            self._data._parse_record_unchecked(annotated_compressed_bytes)
181
178
        content, delta = self._annotate_factory.parse_record(factory.key[-1],
186
183
class DeltaAnnotatedToFullText(KnitAdapter):
187
184
    """An adapter for deltas from annotated to unannotated."""
188
185
 
189
 
    def get_bytes(self, factory, annotated_compressed_bytes):
 
186
    def get_bytes(self, factory):
 
187
        annotated_compressed_bytes = factory._raw_record
190
188
        rec, contents = \
191
189
            self._data._parse_record_unchecked(annotated_compressed_bytes)
192
190
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
209
207
class FTPlainToFullText(KnitAdapter):
210
208
    """An adapter from FT plain knits to unannotated ones."""
211
209
 
212
 
    def get_bytes(self, factory, compressed_bytes):
 
210
    def get_bytes(self, factory):
 
211
        compressed_bytes = factory._raw_record
213
212
        rec, contents = \
214
213
            self._data._parse_record_unchecked(compressed_bytes)
215
214
        content, delta = self._plain_factory.parse_record(factory.key[-1],
220
219
class DeltaPlainToFullText(KnitAdapter):
221
220
    """An adapter for deltas from annotated to unannotated."""
222
221
 
223
 
    def get_bytes(self, factory, compressed_bytes):
 
222
    def get_bytes(self, factory):
 
223
        compressed_bytes = factory._raw_record
224
224
        rec, contents = \
225
225
            self._data._parse_record_unchecked(compressed_bytes)
226
226
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
242
242
 
243
243
class KnitContentFactory(ContentFactory):
244
244
    """Content factory for streaming from knits.
245
 
    
 
245
 
246
246
    :seealso ContentFactory:
247
247
    """
248
248
 
249
249
    def __init__(self, key, parents, build_details, sha1, raw_record,
250
 
        annotated, knit=None):
 
250
        annotated, knit=None, network_bytes=None):
251
251
        """Create a KnitContentFactory for key.
252
 
        
 
252
 
253
253
        :param key: The key.
254
254
        :param parents: The parents.
255
255
        :param build_details: The build details as returned from
257
257
        :param sha1: The sha1 expected from the full text of this object.
258
258
        :param raw_record: The bytes of the knit data from disk.
259
259
        :param annotated: True if the raw data is annotated.
 
260
        :param network_bytes: None to calculate the network bytes on demand,
 
261
            not-none if they are already known.
260
262
        """
261
263
        ContentFactory.__init__(self)
262
264
        self.sha1 = sha1
272
274
            annotated_kind = ''
273
275
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
274
276
        self._raw_record = raw_record
 
277
        self._network_bytes = network_bytes
275
278
        self._build_details = build_details
276
279
        self._knit = knit
277
280
 
 
281
    def _create_network_bytes(self):
 
282
        """Create a fully serialised network version for transmission."""
 
283
        # storage_kind, key, parents, Noeol, raw_record
 
284
        key_bytes = '\x00'.join(self.key)
 
285
        if self.parents is None:
 
286
            parent_bytes = 'None:'
 
287
        else:
 
288
            parent_bytes = '\t'.join('\x00'.join(key) for key in self.parents)
 
289
        if self._build_details[1]:
 
290
            noeol = 'N'
 
291
        else:
 
292
            noeol = ' '
 
293
        network_bytes = "%s\n%s\n%s\n%s%s" % (self.storage_kind, key_bytes,
 
294
            parent_bytes, noeol, self._raw_record)
 
295
        self._network_bytes = network_bytes
 
296
 
278
297
    def get_bytes_as(self, storage_kind):
279
298
        if storage_kind == self.storage_kind:
280
 
            return self._raw_record
 
299
            if self._network_bytes is None:
 
300
                self._create_network_bytes()
 
301
            return self._network_bytes
 
302
        if ('-ft-' in self.storage_kind and
 
303
            storage_kind in ('chunked', 'fulltext')):
 
304
            adapter_key = (self.storage_kind, 'fulltext')
 
305
            adapter_factory = adapter_registry.get(adapter_key)
 
306
            adapter = adapter_factory(None)
 
307
            bytes = adapter.get_bytes(self)
 
308
            if storage_kind == 'chunked':
 
309
                return [bytes]
 
310
            else:
 
311
                return bytes
281
312
        if self._knit is not None:
 
313
            # Not redundant with direct conversion above - that only handles
 
314
            # fulltext cases.
282
315
            if storage_kind == 'chunked':
283
316
                return self._knit.get_lines(self.key[0])
284
317
            elif storage_kind == 'fulltext':
287
320
            self.storage_kind)
288
321
 
289
322
 
 
323
class LazyKnitContentFactory(ContentFactory):
 
324
    """A ContentFactory which can either generate full text or a wire form.
 
325
 
 
326
    :seealso ContentFactory:
 
327
    """
 
328
 
 
329
    def __init__(self, key, parents, generator, first):
 
330
        """Create a LazyKnitContentFactory.
 
331
 
 
332
        :param key: The key of the record.
 
333
        :param parents: The parents of the record.
 
334
        :param generator: A _ContentMapGenerator containing the record for this
 
335
            key.
 
336
        :param first: Is this the first content object returned from generator?
 
337
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
338
            knit-delta-closure-ref
 
339
        """
 
340
        self.key = key
 
341
        self.parents = parents
 
342
        self.sha1 = None
 
343
        self._generator = generator
 
344
        self.storage_kind = "knit-delta-closure"
 
345
        if not first:
 
346
            self.storage_kind = self.storage_kind + "-ref"
 
347
        self._first = first
 
348
 
 
349
    def get_bytes_as(self, storage_kind):
 
350
        if storage_kind == self.storage_kind:
 
351
            if self._first:
 
352
                return self._generator._wire_bytes()
 
353
            else:
 
354
                # all the keys etc are contained in the bytes returned in the
 
355
                # first record.
 
356
                return ''
 
357
        if storage_kind in ('chunked', 'fulltext'):
 
358
            chunks = self._generator._get_one_work(self.key).text()
 
359
            if storage_kind == 'chunked':
 
360
                return chunks
 
361
            else:
 
362
                return ''.join(chunks)
 
363
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
364
            self.storage_kind)
 
365
 
 
366
 
 
367
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
368
    """Convert a network record to a iterator over stream records.
 
369
 
 
370
    :param storage_kind: The storage kind of the record.
 
371
        Must be 'knit-delta-closure'.
 
372
    :param bytes: The bytes of the record on the network.
 
373
    """
 
374
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
375
    return generator.get_record_stream()
 
376
 
 
377
 
 
378
def knit_network_to_record(storage_kind, bytes, line_end):
 
379
    """Convert a network record to a record object.
 
380
 
 
381
    :param storage_kind: The storage kind of the record.
 
382
    :param bytes: The bytes of the record on the network.
 
383
    """
 
384
    start = line_end
 
385
    line_end = bytes.find('\n', start)
 
386
    key = tuple(bytes[start:line_end].split('\x00'))
 
387
    start = line_end + 1
 
388
    line_end = bytes.find('\n', start)
 
389
    parent_line = bytes[start:line_end]
 
390
    if parent_line == 'None:':
 
391
        parents = None
 
392
    else:
 
393
        parents = tuple(
 
394
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
 
395
             if segment])
 
396
    start = line_end + 1
 
397
    noeol = bytes[start] == 'N'
 
398
    if 'ft' in storage_kind:
 
399
        method = 'fulltext'
 
400
    else:
 
401
        method = 'line-delta'
 
402
    build_details = (method, noeol)
 
403
    start = start + 1
 
404
    raw_record = bytes[start:]
 
405
    annotated = 'annotated' in storage_kind
 
406
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
407
        annotated, network_bytes=bytes)]
 
408
 
 
409
 
290
410
class KnitContent(object):
291
411
    """Content of a knit version to which deltas can be applied.
292
 
    
 
412
 
293
413
    This is always stored in memory as a list of lines with \n at the end,
294
 
    plus a flag saying if the final ending is really there or not, because that 
 
414
    plus a flag saying if the final ending is really there or not, because that
295
415
    corresponds to the on-disk knit representation.
296
416
    """
297
417
 
386
506
 
387
507
class PlainKnitContent(KnitContent):
388
508
    """Unannotated content.
389
 
    
 
509
 
390
510
    When annotate[_iter] is called on this content, the same version is reported
391
511
    for all lines. Generally, annotate[_iter] is not useful on PlainKnitContent
392
512
    objects.
566
686
        content = knit._get_content(key)
567
687
        # adjust for the fact that serialised annotations are only key suffixes
568
688
        # for this factory.
569
 
        if type(key) == tuple:
 
689
        if type(key) is tuple:
570
690
            prefix = key[:-1]
571
691
            origins = content.annotate()
572
692
            result = []
647
767
 
648
768
    This is only functional enough to run interface tests, it doesn't try to
649
769
    provide a full pack environment.
650
 
    
 
770
 
651
771
    :param annotated: knit annotations are wanted.
652
772
    :param mapper: The mapper from keys to paths.
653
773
    """
663
783
 
664
784
    This is only functional enough to run interface tests, it doesn't try to
665
785
    provide a full pack environment.
666
 
    
 
786
 
667
787
    :param graph: Store a graph.
668
788
    :param delta: Delta compress contents.
669
789
    :param keylength: How long should keys be.
700
820
    versioned_files.writer.end()
701
821
 
702
822
 
 
823
def _get_total_build_size(self, keys, positions):
 
824
    """Determine the total bytes to build these keys.
 
825
 
 
826
    (helper function because _KnitGraphIndex and _KndxIndex work the same, but
 
827
    don't inherit from a common base.)
 
828
 
 
829
    :param keys: Keys that we want to build
 
830
    :param positions: dict of {key, (info, index_memo, comp_parent)} (such
 
831
        as returned by _get_components_positions)
 
832
    :return: Number of bytes to build those keys
 
833
    """
 
834
    all_build_index_memos = {}
 
835
    build_keys = keys
 
836
    while build_keys:
 
837
        next_keys = set()
 
838
        for key in build_keys:
 
839
            # This is mostly for the 'stacked' case
 
840
            # Where we will be getting the data from a fallback
 
841
            if key not in positions:
 
842
                continue
 
843
            _, index_memo, compression_parent = positions[key]
 
844
            all_build_index_memos[key] = index_memo
 
845
            if compression_parent not in all_build_index_memos:
 
846
                next_keys.add(compression_parent)
 
847
        build_keys = next_keys
 
848
    return sum([index_memo[2] for index_memo
 
849
                in all_build_index_memos.itervalues()])
 
850
 
 
851
 
703
852
class KnitVersionedFiles(VersionedFiles):
704
853
    """Storage for many versioned files using knit compression.
705
854
 
706
855
    Backend storage is managed by indices and data objects.
707
856
 
708
 
    :ivar _index: A _KnitGraphIndex or similar that can describe the 
709
 
        parents, graph, compression and data location of entries in this 
710
 
        KnitVersionedFiles.  Note that this is only the index for 
 
857
    :ivar _index: A _KnitGraphIndex or similar that can describe the
 
858
        parents, graph, compression and data location of entries in this
 
859
        KnitVersionedFiles.  Note that this is only the index for
711
860
        *this* vfs; if there are fallbacks they must be queried separately.
712
861
    """
713
862
 
760
909
            # indexes can't directly store that, so we give them
761
910
            # an empty tuple instead.
762
911
            parents = ()
 
912
        line_bytes = ''.join(lines)
763
913
        return self._add(key, lines, parents,
764
 
            parent_texts, left_matching_blocks, nostore_sha, random_id)
 
914
            parent_texts, left_matching_blocks, nostore_sha, random_id,
 
915
            line_bytes=line_bytes)
 
916
 
 
917
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
918
        """See VersionedFiles._add_text()."""
 
919
        self._index._check_write_ok()
 
920
        self._check_add(key, None, random_id, check_content=False)
 
921
        if text.__class__ is not str:
 
922
            raise errors.BzrBadParameterUnicode("text")
 
923
        if parents is None:
 
924
            # The caller might pass None if there is no graph data, but kndx
 
925
            # indexes can't directly store that, so we give them
 
926
            # an empty tuple instead.
 
927
            parents = ()
 
928
        return self._add(key, None, parents,
 
929
            None, None, nostore_sha, random_id,
 
930
            line_bytes=text)
765
931
 
766
932
    def _add(self, key, lines, parents, parent_texts,
767
 
        left_matching_blocks, nostore_sha, random_id):
 
933
        left_matching_blocks, nostore_sha, random_id,
 
934
        line_bytes):
768
935
        """Add a set of lines on top of version specified by parents.
769
936
 
770
937
        Any versions not present will be converted into ghosts.
 
938
 
 
939
        :param lines: A list of strings where each one is a single line (has a
 
940
            single newline at the end of the string) This is now optional
 
941
            (callers can pass None). It is left in its location for backwards
 
942
            compatibility. It should ''.join(lines) must == line_bytes
 
943
        :param line_bytes: A single string containing the content
 
944
 
 
945
        We pass both lines and line_bytes because different routes bring the
 
946
        values to this function. And for memory efficiency, we don't want to
 
947
        have to split/join on-demand.
771
948
        """
772
949
        # first thing, if the content is something we don't need to store, find
773
950
        # that out.
774
 
        line_bytes = ''.join(lines)
775
951
        digest = sha_string(line_bytes)
776
952
        if nostore_sha == digest:
777
953
            raise errors.ExistingContent
798
974
 
799
975
        text_length = len(line_bytes)
800
976
        options = []
801
 
        if lines:
802
 
            if lines[-1][-1] != '\n':
803
 
                # copy the contents of lines.
 
977
        no_eol = False
 
978
        # Note: line_bytes is not modified to add a newline, that is tracked
 
979
        #       via the no_eol flag. 'lines' *is* modified, because that is the
 
980
        #       general values needed by the Content code.
 
981
        if line_bytes and line_bytes[-1] != '\n':
 
982
            options.append('no-eol')
 
983
            no_eol = True
 
984
            # Copy the existing list, or create a new one
 
985
            if lines is None:
 
986
                lines = osutils.split_lines(line_bytes)
 
987
            else:
804
988
                lines = lines[:]
805
 
                options.append('no-eol')
806
 
                lines[-1] = lines[-1] + '\n'
807
 
                line_bytes += '\n'
 
989
            # Replace the last line with one that ends in a final newline
 
990
            lines[-1] = lines[-1] + '\n'
 
991
        if lines is None:
 
992
            lines = osutils.split_lines(line_bytes)
808
993
 
809
 
        for element in key:
810
 
            if type(element) != str:
 
994
        for element in key[:-1]:
 
995
            if type(element) is not str:
 
996
                raise TypeError("key contains non-strings: %r" % (key,))
 
997
        if key[-1] is None:
 
998
            key = key[:-1] + ('sha1:' + digest,)
 
999
        elif type(key[-1]) is not str:
811
1000
                raise TypeError("key contains non-strings: %r" % (key,))
812
1001
        # Knit hunks are still last-element only
813
1002
        version_id = key[-1]
814
1003
        content = self._factory.make(lines, version_id)
815
 
        if 'no-eol' in options:
 
1004
        if no_eol:
816
1005
            # Hint to the content object that its text() call should strip the
817
1006
            # EOL.
818
1007
            content._should_strip_eol = True
830
1019
        else:
831
1020
            options.append('fulltext')
832
1021
            # isinstance is slower and we have no hierarchy.
833
 
            if self._factory.__class__ == KnitPlainFactory:
 
1022
            if self._factory.__class__ is KnitPlainFactory:
834
1023
                # Use the already joined bytes saving iteration time in
835
1024
                # _record_to_data.
 
1025
                dense_lines = [line_bytes]
 
1026
                if no_eol:
 
1027
                    dense_lines.append('\n')
836
1028
                size, bytes = self._record_to_data(key, digest,
837
 
                    lines, [line_bytes])
 
1029
                    lines, dense_lines)
838
1030
            else:
839
1031
                # get mixed annotation + content and feed it into the
840
1032
                # serialiser.
873
1065
    def _check_add(self, key, lines, random_id, check_content):
874
1066
        """check that version_id and lines are safe to add."""
875
1067
        version_id = key[-1]
876
 
        if contains_whitespace(version_id):
877
 
            raise InvalidRevisionId(version_id, self)
878
 
        self.check_not_reserved_id(version_id)
 
1068
        if version_id is not None:
 
1069
            if contains_whitespace(version_id):
 
1070
                raise InvalidRevisionId(version_id, self)
 
1071
            self.check_not_reserved_id(version_id)
879
1072
        # TODO: If random_id==False and the key is already present, we should
880
1073
        # probably check that the existing content is identical to what is
881
1074
        # being inserted, and otherwise raise an exception.  This would make
891
1084
 
892
1085
    def _check_header_version(self, rec, version_id):
893
1086
        """Checks the header version on original format knit records.
894
 
        
 
1087
 
895
1088
        These have the last component of the key embedded in the record.
896
1089
        """
897
1090
        if rec[1] != version_id:
976
1169
            if missing and not allow_missing:
977
1170
                raise errors.RevisionNotPresent(missing.pop(), self)
978
1171
        return component_data
979
 
       
 
1172
 
980
1173
    def _get_content(self, key, parent_texts={}):
981
1174
        """Returns a content object that makes up the specified
982
1175
        version."""
986
1179
            if not self.get_parent_map([key]):
987
1180
                raise RevisionNotPresent(key, self)
988
1181
            return cached_version
989
 
        text_map, contents_map = self._get_content_maps([key])
990
 
        return contents_map[key]
991
 
 
992
 
    def _get_content_maps(self, keys, nonlocal_keys=None):
993
 
        """Produce maps of text and KnitContents
994
 
        
995
 
        :param keys: The keys to produce content maps for.
996
 
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
997
 
            which are known to not be in this knit, but rather in one of the
998
 
            fallback knits.
999
 
        :return: (text_map, content_map) where text_map contains the texts for
1000
 
            the requested versions and content_map contains the KnitContents.
1001
 
        """
1002
 
        # FUTURE: This function could be improved for the 'extract many' case
1003
 
        # by tracking each component and only doing the copy when the number of
1004
 
        # children than need to apply delta's to it is > 1 or it is part of the
1005
 
        # final output.
1006
 
        keys = list(keys)
1007
 
        multiple_versions = len(keys) != 1
1008
 
        record_map = self._get_record_map(keys, allow_missing=True)
1009
 
 
1010
 
        text_map = {}
1011
 
        content_map = {}
1012
 
        final_content = {}
1013
 
        if nonlocal_keys is None:
1014
 
            nonlocal_keys = set()
1015
 
        else:
1016
 
            nonlocal_keys = frozenset(nonlocal_keys)
1017
 
        missing_keys = set(nonlocal_keys)
1018
 
        for source in self._fallback_vfs:
1019
 
            if not missing_keys:
1020
 
                break
1021
 
            for record in source.get_record_stream(missing_keys,
1022
 
                'unordered', True):
1023
 
                if record.storage_kind == 'absent':
1024
 
                    continue
1025
 
                missing_keys.remove(record.key)
1026
 
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
1027
 
                text_map[record.key] = lines
1028
 
                content_map[record.key] = PlainKnitContent(lines, record.key)
1029
 
                if record.key in keys:
1030
 
                    final_content[record.key] = content_map[record.key]
1031
 
        for key in keys:
1032
 
            if key in nonlocal_keys:
1033
 
                # already handled
1034
 
                continue
1035
 
            components = []
1036
 
            cursor = key
1037
 
            while cursor is not None:
1038
 
                try:
1039
 
                    record, record_details, digest, next = record_map[cursor]
1040
 
                except KeyError:
1041
 
                    raise RevisionNotPresent(cursor, self)
1042
 
                components.append((cursor, record, record_details, digest))
1043
 
                cursor = next
1044
 
                if cursor in content_map:
1045
 
                    # no need to plan further back
1046
 
                    components.append((cursor, None, None, None))
1047
 
                    break
1048
 
 
1049
 
            content = None
1050
 
            for (component_id, record, record_details,
1051
 
                 digest) in reversed(components):
1052
 
                if component_id in content_map:
1053
 
                    content = content_map[component_id]
1054
 
                else:
1055
 
                    content, delta = self._factory.parse_record(key[-1],
1056
 
                        record, record_details, content,
1057
 
                        copy_base_content=multiple_versions)
1058
 
                    if multiple_versions:
1059
 
                        content_map[component_id] = content
1060
 
 
1061
 
            final_content[key] = content
1062
 
 
1063
 
            # digest here is the digest from the last applied component.
1064
 
            text = content.text()
1065
 
            actual_sha = sha_strings(text)
1066
 
            if actual_sha != digest:
1067
 
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1068
 
            text_map[key] = text
1069
 
        return text_map, final_content
 
1182
        generator = _VFContentMapGenerator(self, [key])
 
1183
        return generator._get_content(key)
1070
1184
 
1071
1185
    def get_parent_map(self, keys):
1072
1186
        """Get a map of the graph parents of keys.
1102
1216
 
1103
1217
    def _get_record_map(self, keys, allow_missing=False):
1104
1218
        """Produce a dictionary of knit records.
1105
 
        
 
1219
 
1106
1220
        :return: {key:(record, record_details, digest, next)}
1107
1221
            record
1108
 
                data returned from read_records
 
1222
                data returned from read_records (a KnitContentobject)
1109
1223
            record_details
1110
1224
                opaque information to pass to parse_record
1111
1225
            digest
1114
1228
                build-parent of the version, i.e. the leftmost ancestor.
1115
1229
                Will be None if the record is not a delta.
1116
1230
        :param keys: The keys to build a map for
1117
 
        :param allow_missing: If some records are missing, rather than 
 
1231
        :param allow_missing: If some records are missing, rather than
1118
1232
            error, just return the data that could be generated.
1119
1233
        """
 
1234
        raw_map = self._get_record_map_unparsed(keys,
 
1235
            allow_missing=allow_missing)
 
1236
        return self._raw_map_to_record_map(raw_map)
 
1237
 
 
1238
    def _raw_map_to_record_map(self, raw_map):
 
1239
        """Parse the contents of _get_record_map_unparsed.
 
1240
 
 
1241
        :return: see _get_record_map.
 
1242
        """
 
1243
        result = {}
 
1244
        for key in raw_map:
 
1245
            data, record_details, next = raw_map[key]
 
1246
            content, digest = self._parse_record(key[-1], data)
 
1247
            result[key] = content, record_details, digest, next
 
1248
        return result
 
1249
 
 
1250
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1251
        """Get the raw data for reconstructing keys without parsing it.
 
1252
 
 
1253
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1254
            key-> raw_bytes, (method, noeol), compression_parent
 
1255
        """
1120
1256
        # This retries the whole request if anything fails. Potentially we
1121
1257
        # could be a bit more selective. We could track the keys whose records
1122
1258
        # we have successfully found, and then only request the new records
1132
1268
                # n = next
1133
1269
                records = [(key, i_m) for key, (r, i_m, n)
1134
1270
                                       in position_map.iteritems()]
1135
 
                record_map = {}
1136
 
                for key, record, digest in self._read_records_iter(records):
 
1271
                # Sort by the index memo, so that we request records from the
 
1272
                # same pack file together, and in forward-sorted order
 
1273
                records.sort(key=operator.itemgetter(1))
 
1274
                raw_record_map = {}
 
1275
                for key, data in self._read_records_iter_unchecked(records):
1137
1276
                    (record_details, index_memo, next) = position_map[key]
1138
 
                    record_map[key] = record, record_details, digest, next
1139
 
                return record_map
 
1277
                    raw_record_map[key] = data, record_details, next
 
1278
                return raw_record_map
1140
1279
            except errors.RetryWithNewPacks, e:
1141
1280
                self._access.reload_or_raise(e)
1142
1281
 
1143
 
    def _split_by_prefix(self, keys):
 
1282
    @classmethod
 
1283
    def _split_by_prefix(cls, keys):
1144
1284
        """For the given keys, split them up based on their prefix.
1145
1285
 
1146
1286
        To keep memory pressure somewhat under control, split the
1149
1289
        This should be revisited if _get_content_maps() can ever cross
1150
1290
        file-id boundaries.
1151
1291
 
 
1292
        The keys for a given file_id are kept in the same relative order.
 
1293
        Ordering between file_ids is not, though prefix_order will return the
 
1294
        order that the key was first seen.
 
1295
 
1152
1296
        :param keys: An iterable of key tuples
1153
 
        :return: A dict of {prefix: [key_list]}
 
1297
        :return: (split_map, prefix_order)
 
1298
            split_map       A dictionary mapping prefix => keys
 
1299
            prefix_order    The order that we saw the various prefixes
1154
1300
        """
1155
1301
        split_by_prefix = {}
 
1302
        prefix_order = []
1156
1303
        for key in keys:
1157
1304
            if len(key) == 1:
1158
 
                split_by_prefix.setdefault('', []).append(key)
1159
 
            else:
1160
 
                split_by_prefix.setdefault(key[0], []).append(key)
1161
 
        return split_by_prefix
 
1305
                prefix = ''
 
1306
            else:
 
1307
                prefix = key[0]
 
1308
 
 
1309
            if prefix in split_by_prefix:
 
1310
                split_by_prefix[prefix].append(key)
 
1311
            else:
 
1312
                split_by_prefix[prefix] = [key]
 
1313
                prefix_order.append(prefix)
 
1314
        return split_by_prefix, prefix_order
 
1315
 
 
1316
    def _group_keys_for_io(self, keys, non_local_keys, positions,
 
1317
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
 
1318
        """For the given keys, group them into 'best-sized' requests.
 
1319
 
 
1320
        The idea is to avoid making 1 request per file, but to never try to
 
1321
        unpack an entire 1.5GB source tree in a single pass. Also when
 
1322
        possible, we should try to group requests to the same pack file
 
1323
        together.
 
1324
 
 
1325
        :return: list of (keys, non_local) tuples that indicate what keys
 
1326
            should be fetched next.
 
1327
        """
 
1328
        # TODO: Ideally we would group on 2 factors. We want to extract texts
 
1329
        #       from the same pack file together, and we want to extract all
 
1330
        #       the texts for a given build-chain together. Ultimately it
 
1331
        #       probably needs a better global view.
 
1332
        total_keys = len(keys)
 
1333
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
 
1334
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
 
1335
        cur_keys = []
 
1336
        cur_non_local = set()
 
1337
        cur_size = 0
 
1338
        result = []
 
1339
        sizes = []
 
1340
        for prefix in prefix_order:
 
1341
            keys = prefix_split_keys[prefix]
 
1342
            non_local = prefix_split_non_local_keys.get(prefix, [])
 
1343
 
 
1344
            this_size = self._index._get_total_build_size(keys, positions)
 
1345
            cur_size += this_size
 
1346
            cur_keys.extend(keys)
 
1347
            cur_non_local.update(non_local)
 
1348
            if cur_size > _min_buffer_size:
 
1349
                result.append((cur_keys, cur_non_local))
 
1350
                sizes.append(cur_size)
 
1351
                cur_keys = []
 
1352
                cur_non_local = set()
 
1353
                cur_size = 0
 
1354
        if cur_keys:
 
1355
            result.append((cur_keys, cur_non_local))
 
1356
            sizes.append(cur_size)
 
1357
        return result
1162
1358
 
1163
1359
    def get_record_stream(self, keys, ordering, include_delta_closure):
1164
1360
        """Get a stream of records for keys.
1177
1373
        if not keys:
1178
1374
            return
1179
1375
        if not self._index.has_graph:
1180
 
            # Cannot topological order when no graph has been stored.
 
1376
            # Cannot sort when no graph has been stored.
1181
1377
            ordering = 'unordered'
1182
1378
 
1183
1379
        remaining_keys = keys
1206
1402
        absent_keys = keys.difference(set(positions))
1207
1403
        # There may be more absent keys : if we're missing the basis component
1208
1404
        # and are trying to include the delta closure.
 
1405
        # XXX: We should not ever need to examine remote sources because we do
 
1406
        # not permit deltas across versioned files boundaries.
1209
1407
        if include_delta_closure:
1210
1408
            needed_from_fallback = set()
1211
1409
            # Build up reconstructable_keys dict.  key:True in this dict means
1237
1435
                    needed_from_fallback.add(key)
1238
1436
        # Double index lookups here : need a unified api ?
1239
1437
        global_map, parent_maps = self._get_parent_map_with_sources(keys)
1240
 
        if ordering == 'topological':
1241
 
            # Global topological sort
1242
 
            present_keys = tsort.topo_sort(global_map)
 
1438
        if ordering in ('topological', 'groupcompress'):
 
1439
            if ordering == 'topological':
 
1440
                # Global topological sort
 
1441
                present_keys = tsort.topo_sort(global_map)
 
1442
            else:
 
1443
                present_keys = sort_groupcompress(global_map)
1243
1444
            # Now group by source:
1244
1445
            source_keys = []
1245
1446
            current_source = None
1255
1456
        else:
1256
1457
            if ordering != 'unordered':
1257
1458
                raise AssertionError('valid values for ordering are:'
1258
 
                    ' "unordered" or "topological" not: %r'
 
1459
                    ' "unordered", "groupcompress" or "topological" not: %r'
1259
1460
                    % (ordering,))
1260
1461
            # Just group by source; remote sources first.
1261
1462
            present_keys = []
1283
1484
            # XXX: get_content_maps performs its own index queries; allow state
1284
1485
            # to be passed in.
1285
1486
            non_local_keys = needed_from_fallback - absent_keys
1286
 
            prefix_split_keys = self._split_by_prefix(present_keys)
1287
 
            prefix_split_non_local_keys = self._split_by_prefix(non_local_keys)
1288
 
            for prefix, keys in prefix_split_keys.iteritems():
1289
 
                non_local = prefix_split_non_local_keys.get(prefix, [])
1290
 
                non_local = set(non_local)
1291
 
                text_map, _ = self._get_content_maps(keys, non_local)
1292
 
                for key in keys:
1293
 
                    lines = text_map.pop(key)
1294
 
                    yield ChunkedContentFactory(key, global_map[key], None,
1295
 
                                                lines)
 
1487
            for keys, non_local_keys in self._group_keys_for_io(present_keys,
 
1488
                                                                non_local_keys,
 
1489
                                                                positions):
 
1490
                generator = _VFContentMapGenerator(self, keys, non_local_keys,
 
1491
                                                   global_map)
 
1492
                for record in generator.get_record_stream():
 
1493
                    yield record
1296
1494
        else:
1297
1495
            for source, keys in source_keys:
1298
1496
                if source is parent_maps[0]:
1330
1528
    def insert_record_stream(self, stream):
1331
1529
        """Insert a record stream into this container.
1332
1530
 
1333
 
        :param stream: A stream of records to insert. 
 
1531
        :param stream: A stream of records to insert.
1334
1532
        :return: None
1335
1533
        :seealso VersionedFiles.get_record_stream:
1336
1534
        """
1376
1574
        # key = basis_parent, value = index entry to add
1377
1575
        buffered_index_entries = {}
1378
1576
        for record in stream:
 
1577
            buffered = False
1379
1578
            parents = record.parents
1380
1579
            if record.storage_kind in delta_types:
1381
1580
                # TODO: eventually the record itself should track
1408
1607
                    except KeyError:
1409
1608
                        adapter_key = (record.storage_kind, "knit-ft-gz")
1410
1609
                        adapter = get_adapter(adapter_key)
1411
 
                    bytes = adapter.get_bytes(
1412
 
                        record, record.get_bytes_as(record.storage_kind))
 
1610
                    bytes = adapter.get_bytes(record)
1413
1611
                else:
1414
 
                    bytes = record.get_bytes_as(record.storage_kind)
 
1612
                    # It's a knit record, it has a _raw_record field (even if
 
1613
                    # it was reconstituted from a network stream).
 
1614
                    bytes = record._raw_record
1415
1615
                options = [record._build_details[0]]
1416
1616
                if record._build_details[1]:
1417
1617
                    options.append('no-eol')
1426
1626
                access_memo = self._access.add_raw_records(
1427
1627
                    [(record.key, len(bytes))], bytes)[0]
1428
1628
                index_entry = (record.key, options, access_memo, parents)
1429
 
                buffered = False
1430
1629
                if 'fulltext' not in options:
1431
1630
                    # Not a fulltext, so we need to make sure the compression
1432
1631
                    # parent will also be present.
1448
1647
            elif record.storage_kind == 'chunked':
1449
1648
                self.add_lines(record.key, parents,
1450
1649
                    osutils.chunks_to_lines(record.get_bytes_as('chunked')))
1451
 
            elif record.storage_kind == 'fulltext':
1452
 
                self.add_lines(record.key, parents,
1453
 
                    split_lines(record.get_bytes_as('fulltext')))
1454
1650
            else:
1455
 
                # Not a fulltext, and not suitable for direct insertion as a
 
1651
                # Not suitable for direct insertion as a
1456
1652
                # delta, either because it's not the right format, or this
1457
1653
                # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
1458
1654
                # 0) or because it depends on a base only present in the
1459
1655
                # fallback kvfs.
1460
 
                adapter_key = record.storage_kind, 'fulltext'
1461
 
                adapter = get_adapter(adapter_key)
1462
 
                lines = split_lines(adapter.get_bytes(
1463
 
                    record, record.get_bytes_as(record.storage_kind)))
 
1656
                self._access.flush()
 
1657
                try:
 
1658
                    # Try getting a fulltext directly from the record.
 
1659
                    bytes = record.get_bytes_as('fulltext')
 
1660
                except errors.UnavailableRepresentation:
 
1661
                    adapter_key = record.storage_kind, 'fulltext'
 
1662
                    adapter = get_adapter(adapter_key)
 
1663
                    bytes = adapter.get_bytes(record)
 
1664
                lines = split_lines(bytes)
1464
1665
                try:
1465
1666
                    self.add_lines(record.key, parents, lines)
1466
1667
                except errors.RevisionAlreadyPresent:
1467
1668
                    pass
1468
1669
            # Add any records whose basis parent is now available.
1469
 
            added_keys = [record.key]
1470
 
            while added_keys:
1471
 
                key = added_keys.pop(0)
1472
 
                if key in buffered_index_entries:
1473
 
                    index_entries = buffered_index_entries[key]
1474
 
                    self._index.add_records(index_entries)
1475
 
                    added_keys.extend(
1476
 
                        [index_entry[0] for index_entry in index_entries])
1477
 
                    del buffered_index_entries[key]
1478
 
        # If there were any deltas which had a missing basis parent, error.
 
1670
            if not buffered:
 
1671
                added_keys = [record.key]
 
1672
                while added_keys:
 
1673
                    key = added_keys.pop(0)
 
1674
                    if key in buffered_index_entries:
 
1675
                        index_entries = buffered_index_entries[key]
 
1676
                        self._index.add_records(index_entries)
 
1677
                        added_keys.extend(
 
1678
                            [index_entry[0] for index_entry in index_entries])
 
1679
                        del buffered_index_entries[key]
1479
1680
        if buffered_index_entries:
1480
 
            from pprint import pformat
1481
 
            raise errors.BzrCheckError(
1482
 
                "record_stream refers to compression parents not in %r:\n%s"
1483
 
                % (self, pformat(sorted(buffered_index_entries.keys()))))
 
1681
            # There were index entries buffered at the end of the stream,
 
1682
            # So these need to be added (if the index supports holding such
 
1683
            # entries for later insertion)
 
1684
            for key in buffered_index_entries:
 
1685
                index_entries = buffered_index_entries[key]
 
1686
                self._index.add_records(index_entries,
 
1687
                    missing_compression_parents=True)
 
1688
 
 
1689
    def get_missing_compression_parent_keys(self):
 
1690
        """Return an iterable of keys of missing compression parents.
 
1691
 
 
1692
        Check this after calling insert_record_stream to find out if there are
 
1693
        any missing compression parents.  If there are, the records that
 
1694
        depend on them are not able to be inserted safely. For atomic
 
1695
        KnitVersionedFiles built on packs, the transaction should be aborted or
 
1696
        suspended - commit will fail at this point. Nonatomic knits will error
 
1697
        earlier because they have no staging area to put pending entries into.
 
1698
        """
 
1699
        return self._index.get_missing_compression_parents()
1484
1700
 
1485
1701
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1486
1702
        """Iterate over the lines in the versioned files from keys.
1503
1719
         * If a requested key did not change any lines (or didn't have any
1504
1720
           lines), it may not be mentioned at all in the result.
1505
1721
 
 
1722
        :param pb: Progress bar supplied by caller.
1506
1723
        :return: An iterator over (line, key).
1507
1724
        """
1508
1725
        if pb is None:
1522
1739
                        key_records.append((key, details[0]))
1523
1740
                records_iter = enumerate(self._read_records_iter(key_records))
1524
1741
                for (key_idx, (key, data, sha_value)) in records_iter:
1525
 
                    pb.update('Walking content.', key_idx, total)
 
1742
                    pb.update('Walking content', key_idx, total)
1526
1743
                    compression_parent = build_details[key][1]
1527
1744
                    if compression_parent is None:
1528
1745
                        # fulltext
1529
1746
                        line_iterator = self._factory.get_fulltext_content(data)
1530
1747
                    else:
1531
 
                        # Delta 
 
1748
                        # Delta
1532
1749
                        line_iterator = self._factory.get_linedelta_content(data)
1533
1750
                    # Now that we are yielding the data for this key, remove it
1534
1751
                    # from the list
1545
1762
        # If there are still keys we've not yet found, we look in the fallback
1546
1763
        # vfs, and hope to find them there.  Note that if the keys are found
1547
1764
        # but had no changes or no content, the fallback may not return
1548
 
        # anything.  
 
1765
        # anything.
1549
1766
        if keys and not self._fallback_vfs:
1550
1767
            # XXX: strictly the second parameter is meant to be the file id
1551
1768
            # but it's not easily accessible here.
1558
1775
                source_keys.add(key)
1559
1776
                yield line, key
1560
1777
            keys.difference_update(source_keys)
1561
 
        pb.update('Walking content.', total, total)
 
1778
        pb.update('Walking content', total, total)
1562
1779
 
1563
1780
    def _make_line_delta(self, delta_seq, new_content):
1564
1781
        """Generate a line delta from delta_seq and new_content."""
1573
1790
                           delta=None, annotated=None,
1574
1791
                           left_matching_blocks=None):
1575
1792
        """Merge annotations for content and generate deltas.
1576
 
        
 
1793
 
1577
1794
        This is done by comparing the annotations based on changes to the text
1578
1795
        and generating a delta on the resulting full texts. If annotations are
1579
1796
        not being created then a simple delta is created.
1661
1878
                                 rec[1], record_contents))
1662
1879
        if last_line != 'end %s\n' % rec[1]:
1663
1880
            raise KnitCorrupt(self,
1664
 
                              'unexpected version end line %r, wanted %r' 
 
1881
                              'unexpected version end line %r, wanted %r'
1665
1882
                              % (last_line, rec[1]))
1666
1883
        df.close()
1667
1884
        return rec, record_contents
1684
1901
        if not needed_records:
1685
1902
            return
1686
1903
 
1687
 
        # The transport optimizes the fetching as well 
 
1904
        # The transport optimizes the fetching as well
1688
1905
        # (ie, reads continuous ranges.)
1689
1906
        raw_data = self._access.get_raw_records(
1690
1907
            [index_memo for key, index_memo in needed_records])
1700
1917
        This unpacks enough of the text record to validate the id is
1701
1918
        as expected but thats all.
1702
1919
 
1703
 
        Each item the iterator yields is (key, bytes, sha1_of_full_text).
 
1920
        Each item the iterator yields is (key, bytes,
 
1921
            expected_sha1_of_full_text).
 
1922
        """
 
1923
        for key, data in self._read_records_iter_unchecked(records):
 
1924
            # validate the header (note that we can only use the suffix in
 
1925
            # current knit records).
 
1926
            df, rec = self._parse_record_header(key, data)
 
1927
            df.close()
 
1928
            yield key, data, rec[3]
 
1929
 
 
1930
    def _read_records_iter_unchecked(self, records):
 
1931
        """Read text records from data file and yield raw data.
 
1932
 
 
1933
        No validation is done.
 
1934
 
 
1935
        Yields tuples of (key, data).
1704
1936
        """
1705
1937
        # setup an iterator of the external records:
1706
1938
        # uses readv so nice and fast we hope.
1712
1944
 
1713
1945
        for key, index_memo in records:
1714
1946
            data = raw_records.next()
1715
 
            # validate the header (note that we can only use the suffix in
1716
 
            # current knit records).
1717
 
            df, rec = self._parse_record_header(key, data)
1718
 
            df.close()
1719
 
            yield key, data, rec[3]
 
1947
            yield key, data
1720
1948
 
1721
1949
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1722
1950
        """Convert key, digest, lines into a raw data block.
1723
 
        
 
1951
 
1724
1952
        :param key: The key of the record. Currently keys are always serialised
1725
1953
            using just the trailing component.
1726
1954
        :param dense_lines: The bytes of lines but in a denser form. For
1731
1959
            function spends less time resizing the final string.
1732
1960
        :return: (len, a StringIO instance with the raw data ready to read.)
1733
1961
        """
1734
 
        # Note: using a string copy here increases memory pressure with e.g.
1735
 
        # ISO's, but it is about 3 seconds faster on a 1.2Ghz intel machine
1736
 
        # when doing the initial commit of a mozilla tree. RBC 20070921
1737
 
        bytes = ''.join(chain(
1738
 
            ["version %s %d %s\n" % (key[-1],
1739
 
                                     len(lines),
1740
 
                                     digest)],
1741
 
            dense_lines or lines,
1742
 
            ["end %s\n" % key[-1]]))
1743
 
        if type(bytes) != str:
1744
 
            raise AssertionError(
1745
 
                'data must be plain bytes was %s' % type(bytes))
 
1962
        chunks = ["version %s %d %s\n" % (key[-1], len(lines), digest)]
 
1963
        chunks.extend(dense_lines or lines)
 
1964
        chunks.append("end %s\n" % key[-1])
 
1965
        for chunk in chunks:
 
1966
            if type(chunk) is not str:
 
1967
                raise AssertionError(
 
1968
                    'data must be plain bytes was %s' % type(chunk))
1746
1969
        if lines and lines[-1][-1] != '\n':
1747
1970
            raise ValueError('corrupt lines value %r' % lines)
1748
 
        compressed_bytes = tuned_gzip.bytes_to_gzip(bytes)
 
1971
        compressed_bytes = tuned_gzip.chunks_to_gzip(chunks)
1749
1972
        return len(compressed_bytes), compressed_bytes
1750
1973
 
1751
1974
    def _split_header(self, line):
1766
1989
        return result
1767
1990
 
1768
1991
 
 
1992
class _ContentMapGenerator(object):
 
1993
    """Generate texts or expose raw deltas for a set of texts."""
 
1994
 
 
1995
    def _get_content(self, key):
 
1996
        """Get the content object for key."""
 
1997
        # Note that _get_content is only called when the _ContentMapGenerator
 
1998
        # has been constructed with just one key requested for reconstruction.
 
1999
        if key in self.nonlocal_keys:
 
2000
            record = self.get_record_stream().next()
 
2001
            # Create a content object on the fly
 
2002
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
2003
            return PlainKnitContent(lines, record.key)
 
2004
        else:
 
2005
            # local keys we can ask for directly
 
2006
            return self._get_one_work(key)
 
2007
 
 
2008
    def get_record_stream(self):
 
2009
        """Get a record stream for the keys requested during __init__."""
 
2010
        for record in self._work():
 
2011
            yield record
 
2012
 
 
2013
    def _work(self):
 
2014
        """Produce maps of text and KnitContents as dicts.
 
2015
 
 
2016
        :return: (text_map, content_map) where text_map contains the texts for
 
2017
            the requested versions and content_map contains the KnitContents.
 
2018
        """
 
2019
        # NB: By definition we never need to read remote sources unless texts
 
2020
        # are requested from them: we don't delta across stores - and we
 
2021
        # explicitly do not want to to prevent data loss situations.
 
2022
        if self.global_map is None:
 
2023
            self.global_map = self.vf.get_parent_map(self.keys)
 
2024
        nonlocal_keys = self.nonlocal_keys
 
2025
 
 
2026
        missing_keys = set(nonlocal_keys)
 
2027
        # Read from remote versioned file instances and provide to our caller.
 
2028
        for source in self.vf._fallback_vfs:
 
2029
            if not missing_keys:
 
2030
                break
 
2031
            # Loop over fallback repositories asking them for texts - ignore
 
2032
            # any missing from a particular fallback.
 
2033
            for record in source.get_record_stream(missing_keys,
 
2034
                'unordered', True):
 
2035
                if record.storage_kind == 'absent':
 
2036
                    # Not in thie particular stream, may be in one of the
 
2037
                    # other fallback vfs objects.
 
2038
                    continue
 
2039
                missing_keys.remove(record.key)
 
2040
                yield record
 
2041
 
 
2042
        if self._raw_record_map is None:
 
2043
            raise AssertionError('_raw_record_map should have been filled')
 
2044
        first = True
 
2045
        for key in self.keys:
 
2046
            if key in self.nonlocal_keys:
 
2047
                continue
 
2048
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2049
            first = False
 
2050
 
 
2051
    def _get_one_work(self, requested_key):
 
2052
        # Now, if we have calculated everything already, just return the
 
2053
        # desired text.
 
2054
        if requested_key in self._contents_map:
 
2055
            return self._contents_map[requested_key]
 
2056
        # To simplify things, parse everything at once - code that wants one text
 
2057
        # probably wants them all.
 
2058
        # FUTURE: This function could be improved for the 'extract many' case
 
2059
        # by tracking each component and only doing the copy when the number of
 
2060
        # children than need to apply delta's to it is > 1 or it is part of the
 
2061
        # final output.
 
2062
        multiple_versions = len(self.keys) != 1
 
2063
        if self._record_map is None:
 
2064
            self._record_map = self.vf._raw_map_to_record_map(
 
2065
                self._raw_record_map)
 
2066
        record_map = self._record_map
 
2067
        # raw_record_map is key:
 
2068
        # Have read and parsed records at this point.
 
2069
        for key in self.keys:
 
2070
            if key in self.nonlocal_keys:
 
2071
                # already handled
 
2072
                continue
 
2073
            components = []
 
2074
            cursor = key
 
2075
            while cursor is not None:
 
2076
                try:
 
2077
                    record, record_details, digest, next = record_map[cursor]
 
2078
                except KeyError:
 
2079
                    raise RevisionNotPresent(cursor, self)
 
2080
                components.append((cursor, record, record_details, digest))
 
2081
                cursor = next
 
2082
                if cursor in self._contents_map:
 
2083
                    # no need to plan further back
 
2084
                    components.append((cursor, None, None, None))
 
2085
                    break
 
2086
 
 
2087
            content = None
 
2088
            for (component_id, record, record_details,
 
2089
                 digest) in reversed(components):
 
2090
                if component_id in self._contents_map:
 
2091
                    content = self._contents_map[component_id]
 
2092
                else:
 
2093
                    content, delta = self._factory.parse_record(key[-1],
 
2094
                        record, record_details, content,
 
2095
                        copy_base_content=multiple_versions)
 
2096
                    if multiple_versions:
 
2097
                        self._contents_map[component_id] = content
 
2098
 
 
2099
            # digest here is the digest from the last applied component.
 
2100
            text = content.text()
 
2101
            actual_sha = sha_strings(text)
 
2102
            if actual_sha != digest:
 
2103
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
2104
        if multiple_versions:
 
2105
            return self._contents_map[requested_key]
 
2106
        else:
 
2107
            return content
 
2108
 
 
2109
    def _wire_bytes(self):
 
2110
        """Get the bytes to put on the wire for 'key'.
 
2111
 
 
2112
        The first collection of bytes asked for returns the serialised
 
2113
        raw_record_map and the additional details (key, parent) for key.
 
2114
        Subsequent calls return just the additional details (key, parent).
 
2115
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
2116
        For subsequent keys it is 'knit-delta-closure-ref'.
 
2117
 
 
2118
        :param key: A key from the content generator.
 
2119
        :return: Bytes to put on the wire.
 
2120
        """
 
2121
        lines = []
 
2122
        # kind marker for dispatch on the far side,
 
2123
        lines.append('knit-delta-closure')
 
2124
        # Annotated or not
 
2125
        if self.vf._factory.annotated:
 
2126
            lines.append('annotated')
 
2127
        else:
 
2128
            lines.append('')
 
2129
        # then the list of keys
 
2130
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
2131
            if key not in self.nonlocal_keys]))
 
2132
        # then the _raw_record_map in serialised form:
 
2133
        map_byte_list = []
 
2134
        # for each item in the map:
 
2135
        # 1 line with key
 
2136
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
2137
        # one line with method
 
2138
        # one line with noeol
 
2139
        # one line with next ('' for None)
 
2140
        # one line with byte count of the record bytes
 
2141
        # the record bytes
 
2142
        for key, (record_bytes, (method, noeol), next) in \
 
2143
            self._raw_record_map.iteritems():
 
2144
            key_bytes = '\x00'.join(key)
 
2145
            parents = self.global_map.get(key, None)
 
2146
            if parents is None:
 
2147
                parent_bytes = 'None:'
 
2148
            else:
 
2149
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
2150
            method_bytes = method
 
2151
            if noeol:
 
2152
                noeol_bytes = "T"
 
2153
            else:
 
2154
                noeol_bytes = "F"
 
2155
            if next:
 
2156
                next_bytes = '\x00'.join(next)
 
2157
            else:
 
2158
                next_bytes = ''
 
2159
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2160
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2161
                len(record_bytes), record_bytes))
 
2162
        map_bytes = ''.join(map_byte_list)
 
2163
        lines.append(map_bytes)
 
2164
        bytes = '\n'.join(lines)
 
2165
        return bytes
 
2166
 
 
2167
 
 
2168
class _VFContentMapGenerator(_ContentMapGenerator):
 
2169
    """Content map generator reading from a VersionedFiles object."""
 
2170
 
 
2171
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2172
        global_map=None, raw_record_map=None):
 
2173
        """Create a _ContentMapGenerator.
 
2174
 
 
2175
        :param versioned_files: The versioned files that the texts are being
 
2176
            extracted from.
 
2177
        :param keys: The keys to produce content maps for.
 
2178
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2179
            which are known to not be in this knit, but rather in one of the
 
2180
            fallback knits.
 
2181
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2182
            This is required if get_record_stream() is to be used.
 
2183
        :param raw_record_map: A unparsed raw record map to use for answering
 
2184
            contents.
 
2185
        """
 
2186
        # The vf to source data from
 
2187
        self.vf = versioned_files
 
2188
        # The keys desired
 
2189
        self.keys = list(keys)
 
2190
        # Keys known to be in fallback vfs objects
 
2191
        if nonlocal_keys is None:
 
2192
            self.nonlocal_keys = set()
 
2193
        else:
 
2194
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2195
        # Parents data for keys to be returned in get_record_stream
 
2196
        self.global_map = global_map
 
2197
        # The chunked lists for self.keys in text form
 
2198
        self._text_map = {}
 
2199
        # A cache of KnitContent objects used in extracting texts.
 
2200
        self._contents_map = {}
 
2201
        # All the knit records needed to assemble the requested keys as full
 
2202
        # texts.
 
2203
        self._record_map = None
 
2204
        if raw_record_map is None:
 
2205
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2206
                allow_missing=True)
 
2207
        else:
 
2208
            self._raw_record_map = raw_record_map
 
2209
        # the factory for parsing records
 
2210
        self._factory = self.vf._factory
 
2211
 
 
2212
 
 
2213
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2214
    """Content map generator sourced from a network stream."""
 
2215
 
 
2216
    def __init__(self, bytes, line_end):
 
2217
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2218
        self._bytes = bytes
 
2219
        self.global_map = {}
 
2220
        self._raw_record_map = {}
 
2221
        self._contents_map = {}
 
2222
        self._record_map = None
 
2223
        self.nonlocal_keys = []
 
2224
        # Get access to record parsing facilities
 
2225
        self.vf = KnitVersionedFiles(None, None)
 
2226
        start = line_end
 
2227
        # Annotated or not
 
2228
        line_end = bytes.find('\n', start)
 
2229
        line = bytes[start:line_end]
 
2230
        start = line_end + 1
 
2231
        if line == 'annotated':
 
2232
            self._factory = KnitAnnotateFactory()
 
2233
        else:
 
2234
            self._factory = KnitPlainFactory()
 
2235
        # list of keys to emit in get_record_stream
 
2236
        line_end = bytes.find('\n', start)
 
2237
        line = bytes[start:line_end]
 
2238
        start = line_end + 1
 
2239
        self.keys = [
 
2240
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2241
            if segment]
 
2242
        # now a loop until the end. XXX: It would be nice if this was just a
 
2243
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2244
        # there is a decent sized gap stopping that at the moment.
 
2245
        end = len(bytes)
 
2246
        while start < end:
 
2247
            # 1 line with key
 
2248
            line_end = bytes.find('\n', start)
 
2249
            key = tuple(bytes[start:line_end].split('\x00'))
 
2250
            start = line_end + 1
 
2251
            # 1 line with parents (None: for None, '' for ())
 
2252
            line_end = bytes.find('\n', start)
 
2253
            line = bytes[start:line_end]
 
2254
            if line == 'None:':
 
2255
                parents = None
 
2256
            else:
 
2257
                parents = tuple(
 
2258
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2259
                     if segment])
 
2260
            self.global_map[key] = parents
 
2261
            start = line_end + 1
 
2262
            # one line with method
 
2263
            line_end = bytes.find('\n', start)
 
2264
            line = bytes[start:line_end]
 
2265
            method = line
 
2266
            start = line_end + 1
 
2267
            # one line with noeol
 
2268
            line_end = bytes.find('\n', start)
 
2269
            line = bytes[start:line_end]
 
2270
            noeol = line == "T"
 
2271
            start = line_end + 1
 
2272
            # one line with next ('' for None)
 
2273
            line_end = bytes.find('\n', start)
 
2274
            line = bytes[start:line_end]
 
2275
            if not line:
 
2276
                next = None
 
2277
            else:
 
2278
                next = tuple(bytes[start:line_end].split('\x00'))
 
2279
            start = line_end + 1
 
2280
            # one line with byte count of the record bytes
 
2281
            line_end = bytes.find('\n', start)
 
2282
            line = bytes[start:line_end]
 
2283
            count = int(line)
 
2284
            start = line_end + 1
 
2285
            # the record bytes
 
2286
            record_bytes = bytes[start:start+count]
 
2287
            start = start + count
 
2288
            # put it in the map
 
2289
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2290
 
 
2291
    def get_record_stream(self):
 
2292
        """Get a record stream for for keys requested by the bytestream."""
 
2293
        first = True
 
2294
        for key in self.keys:
 
2295
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2296
            first = False
 
2297
 
 
2298
    def _wire_bytes(self):
 
2299
        return self._bytes
 
2300
 
 
2301
 
1769
2302
class _KndxIndex(object):
1770
2303
    """Manages knit index files
1771
2304
 
1785
2318
 
1786
2319
    Duplicate entries may be written to the index for a single version id
1787
2320
    if this is done then the latter one completely replaces the former:
1788
 
    this allows updates to correct version and parent information. 
 
2321
    this allows updates to correct version and parent information.
1789
2322
    Note that the two entries may share the delta, and that successive
1790
2323
    annotations and references MUST point to the first entry.
1791
2324
 
1792
2325
    The index file on disc contains a header, followed by one line per knit
1793
2326
    record. The same revision can be present in an index file more than once.
1794
 
    The first occurrence gets assigned a sequence number starting from 0. 
1795
 
    
 
2327
    The first occurrence gets assigned a sequence number starting from 0.
 
2328
 
1796
2329
    The format of a single line is
1797
2330
    REVISION_ID FLAGS BYTE_OFFSET LENGTH( PARENT_ID|PARENT_SEQUENCE_ID)* :\n
1798
2331
    REVISION_ID is a utf8-encoded revision id
1799
 
    FLAGS is a comma separated list of flags about the record. Values include 
 
2332
    FLAGS is a comma separated list of flags about the record. Values include
1800
2333
        no-eol, line-delta, fulltext.
1801
2334
    BYTE_OFFSET is the ascii representation of the byte offset in the data file
1802
2335
        that the the compressed data starts at.
1806
2339
    PARENT_SEQUENCE_ID the ascii representation of the sequence number of a
1807
2340
        revision id already in the knit that is a parent of REVISION_ID.
1808
2341
    The ' :' marker is the end of record marker.
1809
 
    
 
2342
 
1810
2343
    partial writes:
1811
2344
    when a write is interrupted to the index file, it will result in a line
1812
2345
    that does not end in ' :'. If the ' :' is not present at the end of a line,
1837
2370
        self._reset_cache()
1838
2371
        self.has_graph = True
1839
2372
 
1840
 
    def add_records(self, records, random_id=False):
 
2373
    def add_records(self, records, random_id=False, missing_compression_parents=False):
1841
2374
        """Add multiple records to the index.
1842
 
        
 
2375
 
1843
2376
        :param records: a list of tuples:
1844
2377
                         (key, options, access_memo, parents).
1845
2378
        :param random_id: If True the ids being added were randomly generated
1846
2379
            and no check for existence will be performed.
 
2380
        :param missing_compression_parents: If True the records being added are
 
2381
            only compressed against texts already in the index (or inside
 
2382
            records). If False the records all refer to unavailable texts (or
 
2383
            texts inside records) as compression parents.
1847
2384
        """
 
2385
        if missing_compression_parents:
 
2386
            # It might be nice to get the edge of the records. But keys isn't
 
2387
            # _wrong_.
 
2388
            keys = sorted(record[0] for record in records)
 
2389
            raise errors.RevisionNotPresent(keys, self)
1848
2390
        paths = {}
1849
2391
        for record in records:
1850
2392
            key = record[0]
1867
2409
                    line = "\n%s %s %s %s %s :" % (
1868
2410
                        key[-1], ','.join(options), pos, size,
1869
2411
                        self._dictionary_compress(parents))
1870
 
                    if type(line) != str:
 
2412
                    if type(line) is not str:
1871
2413
                        raise AssertionError(
1872
2414
                            'data must be utf8 was %s' % type(line))
1873
2415
                    lines.append(line)
1881
2423
                self._kndx_cache[prefix] = (orig_cache, orig_history)
1882
2424
                raise
1883
2425
 
 
2426
    def scan_unvalidated_index(self, graph_index):
 
2427
        """See _KnitGraphIndex.scan_unvalidated_index."""
 
2428
        # Because kndx files do not support atomic insertion via separate index
 
2429
        # files, they do not support this method.
 
2430
        raise NotImplementedError(self.scan_unvalidated_index)
 
2431
 
 
2432
    def get_missing_compression_parents(self):
 
2433
        """See _KnitGraphIndex.get_missing_compression_parents."""
 
2434
        # Because kndx files do not support atomic insertion via separate index
 
2435
        # files, they do not support this method.
 
2436
        raise NotImplementedError(self.get_missing_compression_parents)
 
2437
 
1884
2438
    def _cache_key(self, key, options, pos, size, parent_keys):
1885
2439
        """Cache a version record in the history array and index cache.
1886
2440
 
2019
2573
 
2020
2574
    def get_position(self, key):
2021
2575
        """Return details needed to access the version.
2022
 
        
 
2576
 
2023
2577
        :return: a tuple (key, data position, size) to hand to the access
2024
2578
            logic to get the record.
2025
2579
        """
2029
2583
        return key, entry[2], entry[3]
2030
2584
 
2031
2585
    has_key = _mod_index._has_key_from_parent_map
2032
 
    
 
2586
 
2033
2587
    def _init_index(self, path, extra_lines=[]):
2034
2588
        """Initialize an index."""
2035
2589
        sio = StringIO()
2044
2598
 
2045
2599
    def keys(self):
2046
2600
        """Get all the keys in the collection.
2047
 
        
 
2601
 
2048
2602
        The keys are not ordered.
2049
2603
        """
2050
2604
        result = set()
2051
2605
        # Identify all key prefixes.
2052
2606
        # XXX: A bit hacky, needs polish.
2053
 
        if type(self._mapper) == ConstantMapper:
 
2607
        if type(self._mapper) is ConstantMapper:
2054
2608
            prefixes = [()]
2055
2609
        else:
2056
2610
            relpaths = set()
2063
2617
            for suffix in self._kndx_cache[prefix][1]:
2064
2618
                result.add(prefix + (suffix,))
2065
2619
        return result
2066
 
    
 
2620
 
2067
2621
    def _load_prefixes(self, prefixes):
2068
2622
        """Load the indices for prefixes."""
2069
2623
        self._check_read()
2088
2642
                    del self._history
2089
2643
                except NoSuchFile:
2090
2644
                    self._kndx_cache[prefix] = ({}, [])
2091
 
                    if type(self._mapper) == ConstantMapper:
 
2645
                    if type(self._mapper) is ConstantMapper:
2092
2646
                        # preserve behaviour for revisions.kndx etc.
2093
2647
                        self._init_index(path)
2094
2648
                    del self._cache
2107
2661
 
2108
2662
    def _dictionary_compress(self, keys):
2109
2663
        """Dictionary compress keys.
2110
 
        
 
2664
 
2111
2665
        :param keys: The keys to generate references to.
2112
2666
        :return: A string representation of keys. keys which are present are
2113
2667
            dictionary compressed, and others are emitted as fulltext with a
2161
2715
            return index_memo[0][:-1], index_memo[1]
2162
2716
        return keys.sort(key=get_sort_key)
2163
2717
 
 
2718
    _get_total_build_size = _get_total_build_size
 
2719
 
2164
2720
    def _split_key(self, key):
2165
2721
        """Split key into a prefix and suffix."""
2166
2722
        return key[:-1], key[-1]
2167
2723
 
2168
2724
 
 
2725
class _KeyRefs(object):
 
2726
 
 
2727
    def __init__(self):
 
2728
        # dict mapping 'key' to 'set of keys referring to that key'
 
2729
        self.refs = {}
 
2730
 
 
2731
    def add_references(self, key, refs):
 
2732
        # Record the new references
 
2733
        for referenced in refs:
 
2734
            try:
 
2735
                needed_by = self.refs[referenced]
 
2736
            except KeyError:
 
2737
                needed_by = self.refs[referenced] = set()
 
2738
            needed_by.add(key)
 
2739
        # Discard references satisfied by the new key
 
2740
        self.add_key(key)
 
2741
 
 
2742
    def get_unsatisfied_refs(self):
 
2743
        return self.refs.iterkeys()
 
2744
 
 
2745
    def add_key(self, key):
 
2746
        try:
 
2747
            del self.refs[key]
 
2748
        except KeyError:
 
2749
            # No keys depended on this key.  That's ok.
 
2750
            pass
 
2751
 
 
2752
    def add_keys(self, keys):
 
2753
        for key in keys:
 
2754
            self.add_key(key)
 
2755
 
 
2756
    def get_referrers(self):
 
2757
        result = set()
 
2758
        for referrers in self.refs.itervalues():
 
2759
            result.update(referrers)
 
2760
        return result
 
2761
 
 
2762
 
2169
2763
class _KnitGraphIndex(object):
2170
2764
    """A KnitVersionedFiles index layered on GraphIndex."""
2171
2765
 
2172
2766
    def __init__(self, graph_index, is_locked, deltas=False, parents=True,
2173
 
        add_callback=None):
 
2767
        add_callback=None, track_external_parent_refs=False):
2174
2768
        """Construct a KnitGraphIndex on a graph_index.
2175
2769
 
2176
2770
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
2177
2771
        :param is_locked: A callback to check whether the object should answer
2178
2772
            queries.
2179
2773
        :param deltas: Allow delta-compressed records.
2180
 
        :param parents: If True, record knits parents, if not do not record 
 
2774
        :param parents: If True, record knits parents, if not do not record
2181
2775
            parents.
2182
2776
        :param add_callback: If not None, allow additions to the index and call
2183
2777
            this callback with a list of added GraphIndex nodes:
2184
2778
            [(node, value, node_refs), ...]
2185
2779
        :param is_locked: A callback, returns True if the index is locked and
2186
2780
            thus usable.
 
2781
        :param track_external_parent_refs: If True, record all external parent
 
2782
            references parents from added records.  These can be retrieved
 
2783
            later by calling get_missing_parents().
2187
2784
        """
2188
2785
        self._add_callback = add_callback
2189
2786
        self._graph_index = graph_index
2196
2793
                "parent tracking.")
2197
2794
        self.has_graph = parents
2198
2795
        self._is_locked = is_locked
 
2796
        self._missing_compression_parents = set()
 
2797
        if track_external_parent_refs:
 
2798
            self._key_dependencies = _KeyRefs()
 
2799
        else:
 
2800
            self._key_dependencies = None
2199
2801
 
2200
2802
    def __repr__(self):
2201
2803
        return "%s(%r)" % (self.__class__.__name__, self._graph_index)
2202
2804
 
2203
 
    def add_records(self, records, random_id=False):
 
2805
    def add_records(self, records, random_id=False,
 
2806
        missing_compression_parents=False):
2204
2807
        """Add multiple records to the index.
2205
 
        
 
2808
 
2206
2809
        This function does not insert data into the Immutable GraphIndex
2207
2810
        backing the KnitGraphIndex, instead it prepares data for insertion by
2208
2811
        the caller and checks that it is safe to insert then calls
2212
2815
                         (key, options, access_memo, parents).
2213
2816
        :param random_id: If True the ids being added were randomly generated
2214
2817
            and no check for existence will be performed.
 
2818
        :param missing_compression_parents: If True the records being added are
 
2819
            only compressed against texts already in the index (or inside
 
2820
            records). If False the records all refer to unavailable texts (or
 
2821
            texts inside records) as compression parents.
2215
2822
        """
2216
2823
        if not self._add_callback:
2217
2824
            raise errors.ReadOnlyError(self)
2219
2826
        # anymore.
2220
2827
 
2221
2828
        keys = {}
 
2829
        compression_parents = set()
 
2830
        key_dependencies = self._key_dependencies
2222
2831
        for (key, options, access_memo, parents) in records:
2223
2832
            if self._parents:
2224
2833
                parents = tuple(parents)
 
2834
                if key_dependencies is not None:
 
2835
                    key_dependencies.add_references(key, parents)
2225
2836
            index, pos, size = access_memo
2226
2837
            if 'no-eol' in options:
2227
2838
                value = 'N'
2235
2846
                if self._deltas:
2236
2847
                    if 'line-delta' in options:
2237
2848
                        node_refs = (parents, (parents[0],))
 
2849
                        if missing_compression_parents:
 
2850
                            compression_parents.add(parents[0])
2238
2851
                    else:
2239
2852
                        node_refs = (parents, ())
2240
2853
                else:
2262
2875
            for key, (value, node_refs) in keys.iteritems():
2263
2876
                result.append((key, value))
2264
2877
        self._add_callback(result)
2265
 
        
 
2878
        if missing_compression_parents:
 
2879
            # This may appear to be incorrect (it does not check for
 
2880
            # compression parents that are in the existing graph index),
 
2881
            # but such records won't have been buffered, so this is
 
2882
            # actually correct: every entry when
 
2883
            # missing_compression_parents==True either has a missing parent, or
 
2884
            # a parent that is one of the keys in records.
 
2885
            compression_parents.difference_update(keys)
 
2886
            self._missing_compression_parents.update(compression_parents)
 
2887
        # Adding records may have satisfied missing compression parents.
 
2888
        self._missing_compression_parents.difference_update(keys)
 
2889
 
 
2890
    def scan_unvalidated_index(self, graph_index):
 
2891
        """Inform this _KnitGraphIndex that there is an unvalidated index.
 
2892
 
 
2893
        This allows this _KnitGraphIndex to keep track of any missing
 
2894
        compression parents we may want to have filled in to make those
 
2895
        indices valid.
 
2896
 
 
2897
        :param graph_index: A GraphIndex
 
2898
        """
 
2899
        if self._deltas:
 
2900
            new_missing = graph_index.external_references(ref_list_num=1)
 
2901
            new_missing.difference_update(self.get_parent_map(new_missing))
 
2902
            self._missing_compression_parents.update(new_missing)
 
2903
        if self._key_dependencies is not None:
 
2904
            # Add parent refs from graph_index (and discard parent refs that
 
2905
            # the graph_index has).
 
2906
            for node in graph_index.iter_all_entries():
 
2907
                self._key_dependencies.add_references(node[1], node[3][0])
 
2908
 
 
2909
    def get_missing_compression_parents(self):
 
2910
        """Return the keys of missing compression parents.
 
2911
 
 
2912
        Missing compression parents occur when a record stream was missing
 
2913
        basis texts, or a index was scanned that had missing basis texts.
 
2914
        """
 
2915
        return frozenset(self._missing_compression_parents)
 
2916
 
 
2917
    def get_missing_parents(self):
 
2918
        """Return the keys of missing parents."""
 
2919
        # If updating this, you should also update
 
2920
        # groupcompress._GCGraphIndex.get_missing_parents
 
2921
        # We may have false positives, so filter those out.
 
2922
        self._key_dependencies.add_keys(
 
2923
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
 
2924
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
 
2925
 
2266
2926
    def _check_read(self):
2267
2927
        """raise if reads are not permitted."""
2268
2928
        if not self._is_locked():
2328
2988
 
2329
2989
    def _get_entries(self, keys, check_present=False):
2330
2990
        """Get the entries for keys.
2331
 
        
 
2991
 
2332
2992
        :param keys: An iterable of index key tuples.
2333
2993
        """
2334
2994
        keys = set(keys)
2396
3056
 
2397
3057
    def get_position(self, key):
2398
3058
        """Return details needed to access the version.
2399
 
        
 
3059
 
2400
3060
        :return: a tuple (index, data position, size) to hand to the access
2401
3061
            logic to get the record.
2402
3062
        """
2407
3067
 
2408
3068
    def keys(self):
2409
3069
        """Get all the keys in the collection.
2410
 
        
 
3070
 
2411
3071
        The keys are not ordered.
2412
3072
        """
2413
3073
        self._check_read()
2414
3074
        return [node[1] for node in self._graph_index.iter_all_entries()]
2415
 
    
 
3075
 
2416
3076
    missing_keys = _mod_index._missing_keys_from_parent_map
2417
3077
 
2418
3078
    def _node_to_position(self, node):
2440
3100
            return positions[key][1]
2441
3101
        return keys.sort(key=get_index_memo)
2442
3102
 
 
3103
    _get_total_build_size = _get_total_build_size
 
3104
 
2443
3105
 
2444
3106
class _KnitKeyAccess(object):
2445
3107
    """Access to records in .knit files."""
2466
3128
            opaque index memo. For _KnitKeyAccess the memo is (key, pos,
2467
3129
            length), where the key is the record key.
2468
3130
        """
2469
 
        if type(raw_data) != str:
 
3131
        if type(raw_data) is not str:
2470
3132
            raise AssertionError(
2471
3133
                'data must be plain bytes was %s' % type(raw_data))
2472
3134
        result = []
2489
3151
            result.append((key, base, size))
2490
3152
        return result
2491
3153
 
 
3154
    def flush(self):
 
3155
        """Flush pending writes on this access object.
 
3156
        
 
3157
        For .knit files this is a no-op.
 
3158
        """
 
3159
        pass
 
3160
 
2492
3161
    def get_raw_records(self, memos_for_retrieval):
2493
3162
        """Get the raw bytes for a records.
2494
3163
 
2519
3188
class _DirectPackAccess(object):
2520
3189
    """Access to data in one or more packs with less translation."""
2521
3190
 
2522
 
    def __init__(self, index_to_packs, reload_func=None):
 
3191
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
2523
3192
        """Create a _DirectPackAccess object.
2524
3193
 
2525
3194
        :param index_to_packs: A dict mapping index objects to the transport
2532
3201
        self._write_index = None
2533
3202
        self._indices = index_to_packs
2534
3203
        self._reload_func = reload_func
 
3204
        self._flush_func = flush_func
2535
3205
 
2536
3206
    def add_raw_records(self, key_sizes, raw_data):
2537
3207
        """Add raw knit bytes to a storage area.
2547
3217
            length), where the index field is the write_index object supplied
2548
3218
            to the PackAccess object.
2549
3219
        """
2550
 
        if type(raw_data) != str:
 
3220
        if type(raw_data) is not str:
2551
3221
            raise AssertionError(
2552
3222
                'data must be plain bytes was %s' % type(raw_data))
2553
3223
        result = []
2559
3229
            result.append((self._write_index, p_offset, p_length))
2560
3230
        return result
2561
3231
 
 
3232
    def flush(self):
 
3233
        """Flush pending writes on this access object.
 
3234
 
 
3235
        This will flush any buffered writes to a NewPack.
 
3236
        """
 
3237
        if self._flush_func is not None:
 
3238
            self._flush_func()
 
3239
            
2562
3240
    def get_raw_records(self, memos_for_retrieval):
2563
3241
        """Get the raw bytes for a records.
2564
3242
 
2565
 
        :param memos_for_retrieval: An iterable containing the (index, pos, 
 
3243
        :param memos_for_retrieval: An iterable containing the (index, pos,
2566
3244
            length) memo for retrieving the bytes. The Pack access method
2567
3245
            looks up the pack to use for a given record in its index_to_pack
2568
3246
            map.
2765
3443
        fulltext.)
2766
3444
 
2767
3445
        :return: A list of (key, index_memo) records, suitable for
2768
 
            passing to read_records_iter to start reading in the raw data fro/
 
3446
            passing to read_records_iter to start reading in the raw data from
2769
3447
            the pack file.
2770
3448
        """
2771
3449
        if key in self._annotated_lines:
2908
3586
        """Create a heads provider for resolving ancestry issues."""
2909
3587
        if self._heads_provider is not None:
2910
3588
            return self._heads_provider
2911
 
        parent_provider = _mod_graph.DictParentsProvider(
2912
 
            self._revision_id_graph)
2913
 
        graph_obj = _mod_graph.Graph(parent_provider)
2914
 
        head_cache = _mod_graph.FrozenHeadsCache(graph_obj)
2915
 
        self._heads_provider = head_cache
2916
 
        return head_cache
 
3589
        self._heads_provider = _mod_graph.KnownGraph(self._revision_id_graph)
 
3590
        return self._heads_provider
2917
3591
 
2918
3592
    def annotate(self, key):
2919
3593
        """Return the annotated fulltext at the given key.
2942
3616
        being able to produce line deltas.
2943
3617
        """
2944
3618
        # TODO: this code generates a parent maps of present ancestors; it
2945
 
        # could be split out into a separate method, and probably should use
2946
 
        # iter_ancestry instead. -- mbp and robertc 20080704
 
3619
        #       could be split out into a separate method
 
3620
        #       -- mbp and robertc 20080704
2947
3621
        graph = _mod_graph.Graph(self._knit)
2948
 
        head_cache = _mod_graph.FrozenHeadsCache(graph)
2949
 
        search = graph._make_breadth_first_searcher([key])
2950
 
        keys = set()
2951
 
        while True:
2952
 
            try:
2953
 
                present, ghosts = search.next_with_ghosts()
2954
 
            except StopIteration:
2955
 
                break
2956
 
            keys.update(present)
2957
 
        parent_map = self._knit.get_parent_map(keys)
 
3622
        parent_map = dict((k, v) for k, v in graph.iter_ancestry([key])
 
3623
                          if v is not None)
 
3624
        if not parent_map:
 
3625
            raise errors.RevisionNotPresent(key, self)
 
3626
        keys = parent_map.keys()
 
3627
        heads_provider = _mod_graph.KnownGraph(parent_map)
2958
3628
        parent_cache = {}
2959
3629
        reannotate = annotate.reannotate
2960
3630
        for record in self._knit.get_record_stream(keys, 'topological', True):
2966
3636
            else:
2967
3637
                parent_lines = []
2968
3638
            parent_cache[key] = list(
2969
 
                reannotate(parent_lines, fulltext, key, None, head_cache))
 
3639
                reannotate(parent_lines, fulltext, key, None, heads_provider))
2970
3640
        try:
2971
3641
            return parent_cache[key]
2972
3642
        except KeyError, e: