~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2009-06-19 17:40:59 UTC
  • mto: This revision was merged to the branch mainline in revision 4466.
  • Revision ID: john@arbash-meinel.com-20090619174059-jzowjv0d86vzjg4m
Update the python code to do the same checking around known_parent_gdfos being present.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from itertools import izip
 
20
from cStringIO import StringIO
19
21
import time
20
22
import zlib
21
23
try:
26
28
from bzrlib import (
27
29
    annotate,
28
30
    debug,
 
31
    diff,
29
32
    errors,
30
33
    graph as _mod_graph,
31
34
    knit,
32
35
    osutils,
33
36
    pack,
34
 
    static_tuple,
 
37
    patiencediff,
35
38
    trace,
36
39
    )
 
40
from bzrlib.graph import Graph
37
41
from bzrlib.btree_index import BTreeBuilder
38
42
from bzrlib.lru_cache import LRUSizeCache
39
43
from bzrlib.tsort import topo_sort
45
49
    VersionedFiles,
46
50
    )
47
51
 
48
 
# Minimum number of uncompressed bytes to try fetch at once when retrieving
49
 
# groupcompress blocks.
50
 
BATCH_SIZE = 2**16
51
 
 
52
52
_USE_LZMA = False and (pylzma is not None)
53
53
 
54
54
# osutils.sha_string('')
55
55
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
56
56
 
 
57
 
57
58
def sort_gc_optimal(parent_map):
58
59
    """Sort and group the keys in parent_map into groupcompress order.
59
60
 
65
66
    # groupcompress ordering is approximately reverse topological,
66
67
    # properly grouped by file-id.
67
68
    per_prefix_map = {}
68
 
    for key, value in parent_map.iteritems():
 
69
    for item in parent_map.iteritems():
 
70
        key = item[0]
69
71
        if isinstance(key, str) or len(key) == 1:
70
72
            prefix = ''
71
73
        else:
72
74
            prefix = key[0]
73
75
        try:
74
 
            per_prefix_map[prefix][key] = value
 
76
            per_prefix_map[prefix].append(item)
75
77
        except KeyError:
76
 
            per_prefix_map[prefix] = {key: value}
 
78
            per_prefix_map[prefix] = [item]
77
79
 
78
80
    present_keys = []
79
81
    for prefix in sorted(per_prefix_map):
101
103
    def __init__(self):
102
104
        # map by key? or just order in file?
103
105
        self._compressor_name = None
104
 
        self._z_content_chunks = None
 
106
        self._z_content = None
105
107
        self._z_content_decompressor = None
106
108
        self._z_content_length = None
107
109
        self._content_length = None
108
110
        self._content = None
109
 
        self._content_chunks = None
110
111
 
111
112
    def __len__(self):
112
113
        # This is the maximum number of bytes this object will reference if
120
121
        :param num_bytes: Ensure that we have extracted at least num_bytes of
121
122
            content. If None, consume everything
122
123
        """
123
 
        if self._content_length is None:
124
 
            raise AssertionError('self._content_length should never be None')
 
124
        # TODO: If we re-use the same content block at different times during
 
125
        #       get_record_stream(), it is possible that the first pass will
 
126
        #       get inserted, triggering an extract/_ensure_content() which
 
127
        #       will get rid of _z_content. And then the next use of the block
 
128
        #       will try to access _z_content (to send it over the wire), and
 
129
        #       fail because it is already extracted. Consider never releasing
 
130
        #       _z_content because of this.
125
131
        if num_bytes is None:
126
132
            num_bytes = self._content_length
127
133
        elif (self._content_length is not None
131
137
                % (num_bytes, self._content_length))
132
138
        # Expand the content if required
133
139
        if self._content is None:
134
 
            if self._content_chunks is not None:
135
 
                self._content = ''.join(self._content_chunks)
136
 
                self._content_chunks = None
137
 
        if self._content is None:
138
 
            # We join self._z_content_chunks here, because if we are
139
 
            # decompressing, then it is *very* likely that we have a single
140
 
            # chunk
141
 
            if self._z_content_chunks is None:
 
140
            if self._z_content is None:
142
141
                raise AssertionError('No content to decompress')
143
 
            z_content = ''.join(self._z_content_chunks)
144
 
            if z_content == '':
 
142
            if self._z_content == '':
145
143
                self._content = ''
146
144
            elif self._compressor_name == 'lzma':
147
145
                # We don't do partial lzma decomp yet
148
 
                self._content = pylzma.decompress(z_content)
 
146
                self._content = pylzma.decompress(self._z_content)
149
147
            elif self._compressor_name == 'zlib':
150
148
                # Start a zlib decompressor
151
 
                if num_bytes * 4 > self._content_length * 3:
152
 
                    # If we are requesting more that 3/4ths of the content,
153
 
                    # just extract the whole thing in a single pass
154
 
                    num_bytes = self._content_length
155
 
                    self._content = zlib.decompress(z_content)
 
149
                if num_bytes is None:
 
150
                    self._content = zlib.decompress(self._z_content)
156
151
                else:
157
152
                    self._z_content_decompressor = zlib.decompressobj()
158
153
                    # Seed the decompressor with the uncompressed bytes, so
159
154
                    # that the rest of the code is simplified
160
155
                    self._content = self._z_content_decompressor.decompress(
161
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
162
 
                    if not self._z_content_decompressor.unconsumed_tail:
163
 
                        self._z_content_decompressor = None
 
156
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
164
157
            else:
165
158
                raise AssertionError('Unknown compressor: %r'
166
159
                                     % self._compressor_name)
168
161
        # 'unconsumed_tail'
169
162
 
170
163
        # Do we have enough bytes already?
171
 
        if len(self._content) >= num_bytes:
 
164
        if num_bytes is not None and len(self._content) >= num_bytes:
 
165
            return
 
166
        if num_bytes is None and self._z_content_decompressor is None:
 
167
            # We must have already decompressed everything
172
168
            return
173
169
        # If we got this far, and don't have a decompressor, something is wrong
174
170
        if self._z_content_decompressor is None:
175
171
            raise AssertionError(
176
172
                'No decompressor to decompress %d bytes' % num_bytes)
177
173
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
178
 
        if not remaining_decomp:
179
 
            raise AssertionError('Nothing left to decompress')
180
 
        needed_bytes = num_bytes - len(self._content)
181
 
        # We always set max_size to 32kB over the minimum needed, so that
182
 
        # zlib will give us as much as we really want.
183
 
        # TODO: If this isn't good enough, we could make a loop here,
184
 
        #       that keeps expanding the request until we get enough
185
 
        self._content += self._z_content_decompressor.decompress(
186
 
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
187
 
        if len(self._content) < num_bytes:
188
 
            raise AssertionError('%d bytes wanted, only %d available'
189
 
                                 % (num_bytes, len(self._content)))
190
 
        if not self._z_content_decompressor.unconsumed_tail:
191
 
            # The stream is finished
192
 
            self._z_content_decompressor = None
 
174
        if num_bytes is None:
 
175
            if remaining_decomp:
 
176
                # We don't know how much is left, but we'll decompress it all
 
177
                self._content += self._z_content_decompressor.decompress(
 
178
                    remaining_decomp)
 
179
                # Note: There's what I consider a bug in zlib.decompressobj
 
180
                #       If you pass back in the entire unconsumed_tail, only
 
181
                #       this time you don't pass a max-size, it doesn't
 
182
                #       change the unconsumed_tail back to None/''.
 
183
                #       However, we know we are done with the whole stream
 
184
                self._z_content_decompressor = None
 
185
            # XXX: Why is this the only place in this routine we set this?
 
186
            self._content_length = len(self._content)
 
187
        else:
 
188
            if not remaining_decomp:
 
189
                raise AssertionError('Nothing left to decompress')
 
190
            needed_bytes = num_bytes - len(self._content)
 
191
            # We always set max_size to 32kB over the minimum needed, so that
 
192
            # zlib will give us as much as we really want.
 
193
            # TODO: If this isn't good enough, we could make a loop here,
 
194
            #       that keeps expanding the request until we get enough
 
195
            self._content += self._z_content_decompressor.decompress(
 
196
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
197
            if len(self._content) < num_bytes:
 
198
                raise AssertionError('%d bytes wanted, only %d available'
 
199
                                     % (num_bytes, len(self._content)))
 
200
            if not self._z_content_decompressor.unconsumed_tail:
 
201
                # The stream is finished
 
202
                self._z_content_decompressor = None
193
203
 
194
204
    def _parse_bytes(self, bytes, pos):
195
205
        """Read the various lengths from the header.
211
221
            # XXX: Define some GCCorrupt error ?
212
222
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
213
223
                                 (len(bytes), pos, self._z_content_length))
214
 
        self._z_content_chunks = (bytes[pos:],)
215
 
 
216
 
    @property
217
 
    def _z_content(self):
218
 
        """Return z_content_chunks as a simple string.
219
 
 
220
 
        Meant only to be used by the test suite.
221
 
        """
222
 
        if self._z_content_chunks is not None:
223
 
            return ''.join(self._z_content_chunks)
224
 
        return None
 
224
        self._z_content = bytes[pos:]
225
225
 
226
226
    @classmethod
227
227
    def from_bytes(cls, bytes):
273
273
            bytes = apply_delta_to_source(self._content, content_start, end)
274
274
        return bytes
275
275
 
276
 
    def set_chunked_content(self, content_chunks, length):
277
 
        """Set the content of this block to the given chunks."""
278
 
        # If we have lots of short lines, it is may be more efficient to join
279
 
        # the content ahead of time. If the content is <10MiB, we don't really
280
 
        # care about the extra memory consumption, so we can just pack it and
281
 
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
282
 
        # mysql, which is below the noise margin
283
 
        self._content_length = length
284
 
        self._content_chunks = content_chunks
285
 
        self._content = None
286
 
        self._z_content_chunks = None
287
 
 
288
276
    def set_content(self, content):
289
277
        """Set the content of this block."""
290
278
        self._content_length = len(content)
291
279
        self._content = content
292
 
        self._z_content_chunks = None
293
 
 
294
 
    def _create_z_content_using_lzma(self):
295
 
        if self._content_chunks is not None:
296
 
            self._content = ''.join(self._content_chunks)
297
 
            self._content_chunks = None
298
 
        if self._content is None:
299
 
            raise AssertionError('Nothing to compress')
300
 
        z_content = pylzma.compress(self._content)
301
 
        self._z_content_chunks = (z_content,)
302
 
        self._z_content_length = len(z_content)
303
 
 
304
 
    def _create_z_content_from_chunks(self, chunks):
305
 
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
306
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
307
 
        # (measured peak is maybe 30MB over the above...)
308
 
        compressed_chunks = map(compressor.compress, chunks)
309
 
        compressed_chunks.append(compressor.flush())
310
 
        # Ignore empty chunks
311
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
312
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
313
 
 
314
 
    def _create_z_content(self):
315
 
        if self._z_content_chunks is not None:
316
 
            return
 
280
        self._z_content = None
 
281
 
 
282
    def to_bytes(self):
 
283
        """Encode the information into a byte stream."""
 
284
        compress = zlib.compress
317
285
        if _USE_LZMA:
318
 
            self._create_z_content_using_lzma()
319
 
            return
320
 
        if self._content_chunks is not None:
321
 
            chunks = self._content_chunks
322
 
        else:
323
 
            chunks = (self._content,)
324
 
        self._create_z_content_from_chunks(chunks)
325
 
 
326
 
    def to_chunks(self):
327
 
        """Create the byte stream as a series of 'chunks'"""
328
 
        self._create_z_content()
 
286
            compress = pylzma.compress
 
287
        if self._z_content is None:
 
288
            if self._content is None:
 
289
                raise AssertionError('Nothing to compress')
 
290
            self._z_content = compress(self._content)
 
291
            self._z_content_length = len(self._z_content)
329
292
        if _USE_LZMA:
330
293
            header = self.GCB_LZ_HEADER
331
294
        else:
332
295
            header = self.GCB_HEADER
333
 
        chunks = ['%s%d\n%d\n'
334
 
                  % (header, self._z_content_length, self._content_length),
 
296
        chunks = [header,
 
297
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
298
                  self._z_content,
335
299
                 ]
336
 
        chunks.extend(self._z_content_chunks)
337
 
        total_len = sum(map(len, chunks))
338
 
        return total_len, chunks
339
 
 
340
 
    def to_bytes(self):
341
 
        """Encode the information into a byte stream."""
342
 
        total_len, chunks = self.to_chunks()
343
300
        return ''.join(chunks)
344
301
 
345
302
    def _dump(self, include_text=False):
465
422
                # There are code paths that first extract as fulltext, and then
466
423
                # extract as storage_kind (smart fetch). So we don't break the
467
424
                # refcycle here, but instead in manager.get_record_stream()
 
425
                # self._manager = None
468
426
            if storage_kind == 'fulltext':
469
427
                return self._bytes
470
428
            else:
476
434
class _LazyGroupContentManager(object):
477
435
    """This manages a group of _LazyGroupCompressFactory objects."""
478
436
 
479
 
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
480
 
                             # current size, and still be considered
481
 
                             # resuable
482
 
    _full_block_size = 4*1024*1024
483
 
    _full_mixed_block_size = 2*1024*1024
484
 
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
485
 
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
486
 
 
487
437
    def __init__(self, block):
488
438
        self._block = block
489
439
        # We need to preserve the ordering
561
511
        # time (self._block._content) is a little expensive.
562
512
        self._block._ensure_content(self._last_byte)
563
513
 
564
 
    def _check_rebuild_action(self):
 
514
    def _check_rebuild_block(self):
565
515
        """Check to see if our block should be repacked."""
566
516
        total_bytes_used = 0
567
517
        last_byte_used = 0
568
518
        for factory in self._factories:
569
519
            total_bytes_used += factory._end - factory._start
570
 
            if last_byte_used < factory._end:
571
 
                last_byte_used = factory._end
572
 
        # If we are using more than half of the bytes from the block, we have
573
 
        # nothing else to check
 
520
            last_byte_used = max(last_byte_used, factory._end)
 
521
        # If we are using most of the bytes from the block, we have nothing
 
522
        # else to check (currently more that 1/2)
574
523
        if total_bytes_used * 2 >= self._block._content_length:
575
 
            return None, last_byte_used, total_bytes_used
576
 
        # We are using less than 50% of the content. Is the content we are
577
 
        # using at the beginning of the block? If so, we can just trim the
578
 
        # tail, rather than rebuilding from scratch.
 
524
            return
 
525
        # Can we just strip off the trailing bytes? If we are going to be
 
526
        # transmitting more than 50% of the front of the content, go ahead
579
527
        if total_bytes_used * 2 > last_byte_used:
580
 
            return 'trim', last_byte_used, total_bytes_used
 
528
            self._trim_block(last_byte_used)
 
529
            return
581
530
 
582
531
        # We are using a small amount of the data, and it isn't just packed
583
532
        # nicely at the front, so rebuild the content.
590
539
        #       expanding many deltas into fulltexts, as well.
591
540
        #       If we build a cheap enough 'strip', then we could try a strip,
592
541
        #       if that expands the content, we then rebuild.
593
 
        return 'rebuild', last_byte_used, total_bytes_used
594
 
 
595
 
    def check_is_well_utilized(self):
596
 
        """Is the current block considered 'well utilized'?
597
 
 
598
 
        This heuristic asks if the current block considers itself to be a fully
599
 
        developed group, rather than just a loose collection of data.
600
 
        """
601
 
        if len(self._factories) == 1:
602
 
            # A block of length 1 could be improved by combining with other
603
 
            # groups - don't look deeper. Even larger than max size groups
604
 
            # could compress well with adjacent versions of the same thing.
605
 
            return False
606
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
607
 
        block_size = self._block._content_length
608
 
        if total_bytes_used < block_size * self._max_cut_fraction:
609
 
            # This block wants to trim itself small enough that we want to
610
 
            # consider it under-utilized.
611
 
            return False
612
 
        # TODO: This code is meant to be the twin of _insert_record_stream's
613
 
        #       'start_new_block' logic. It would probably be better to factor
614
 
        #       out that logic into a shared location, so that it stays
615
 
        #       together better
616
 
        # We currently assume a block is properly utilized whenever it is >75%
617
 
        # of the size of a 'full' block. In normal operation, a block is
618
 
        # considered full when it hits 4MB of same-file content. So any block
619
 
        # >3MB is 'full enough'.
620
 
        # The only time this isn't true is when a given block has large-object
621
 
        # content. (a single file >4MB, etc.)
622
 
        # Under these circumstances, we allow a block to grow to
623
 
        # 2 x largest_content.  Which means that if a given block had a large
624
 
        # object, it may actually be under-utilized. However, given that this
625
 
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
626
 
        # content blobs on-the-fly. Note that because we return False for all
627
 
        # 1-item blobs, we will repack them; we may wish to reevaluate our
628
 
        # treatment of large object blobs in the future.
629
 
        if block_size >= self._full_enough_block_size:
630
 
            return True
631
 
        # If a block is <3MB, it still may be considered 'full' if it contains
632
 
        # mixed content. The current rule is 2MB of mixed content is considered
633
 
        # full. So check to see if this block contains mixed content, and
634
 
        # set the threshold appropriately.
635
 
        common_prefix = None
636
 
        for factory in self._factories:
637
 
            prefix = factory.key[:-1]
638
 
            if common_prefix is None:
639
 
                common_prefix = prefix
640
 
            elif prefix != common_prefix:
641
 
                # Mixed content, check the size appropriately
642
 
                if block_size >= self._full_enough_mixed_block_size:
643
 
                    return True
644
 
                break
645
 
        # The content failed both the mixed check and the single-content check
646
 
        # so obviously it is not fully utilized
647
 
        # TODO: there is one other constraint that isn't being checked
648
 
        #       namely, that the entries in the block are in the appropriate
649
 
        #       order. For example, you could insert the entries in exactly
650
 
        #       reverse groupcompress order, and we would think that is ok.
651
 
        #       (all the right objects are in one group, and it is fully
652
 
        #       utilized, etc.) For now, we assume that case is rare,
653
 
        #       especially since we should always fetch in 'groupcompress'
654
 
        #       order.
655
 
        return False
656
 
 
657
 
    def _check_rebuild_block(self):
658
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
659
 
        if action is None:
660
 
            return
661
 
        if action == 'trim':
662
 
            self._trim_block(last_byte_used)
663
 
        elif action == 'rebuild':
664
 
            self._rebuild_block()
665
 
        else:
666
 
            raise ValueError('unknown rebuild action: %r' % (action,))
 
542
        self._rebuild_block()
667
543
 
668
544
    def _wire_bytes(self):
669
545
        """Return a byte stream suitable for transmitting over the wire."""
703
579
        z_header_bytes = zlib.compress(header_bytes)
704
580
        del header_bytes
705
581
        z_header_bytes_len = len(z_header_bytes)
706
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
582
        block_bytes = self._block.to_bytes()
707
583
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
708
 
                                       block_bytes_len))
 
584
                                       len(block_bytes)))
709
585
        lines.append(z_header_bytes)
710
 
        lines.extend(block_chunks)
711
 
        del z_header_bytes, block_chunks
712
 
        # TODO: This is a point where we will double the memory consumption. To
713
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
586
        lines.append(block_bytes)
 
587
        del z_header_bytes, block_bytes
714
588
        return ''.join(lines)
715
589
 
716
590
    @classmethod
717
591
    def from_bytes(cls, bytes):
718
592
        # TODO: This does extra string copying, probably better to do it a
719
 
        #       different way. At a minimum this creates 2 copies of the
720
 
        #       compressed content
 
593
        #       different way
721
594
        (storage_kind, z_header_len, header_len,
722
595
         block_len, rest) = bytes.split('\n', 4)
723
596
        del bytes
881
754
 
882
755
        After calling this, the compressor should no longer be used
883
756
        """
884
 
        self._block.set_chunked_content(self.chunks, self.endpoint)
 
757
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
758
        #       group. This has an impact for 'commit' of large objects.
 
759
        #       One possibility is to use self._content_chunks, and be lazy and
 
760
        #       only fill out self._content as a full string when we actually
 
761
        #       need it. That would at least drop the peak memory consumption
 
762
        #       for 'commit' down to ~1x the size of the largest file, at a
 
763
        #       cost of increased complexity within this code. 2x is still <<
 
764
        #       3x the size of the largest file, so we are doing ok.
 
765
        content = ''.join(self.chunks)
885
766
        self.chunks = None
886
767
        self._delta_index = None
 
768
        self._block.set_content(content)
887
769
        return self._block
888
770
 
889
771
    def pop_last(self):
1023
905
        self.endpoint = endpoint
1024
906
 
1025
907
 
1026
 
def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
 
908
def make_pack_factory(graph, delta, keylength):
1027
909
    """Create a factory for creating a pack based groupcompress.
1028
910
 
1029
911
    This is only functional enough to run interface tests, it doesn't try to
1044
926
        writer = pack.ContainerWriter(stream.write)
1045
927
        writer.begin()
1046
928
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1047
 
            add_callback=graph_index.add_nodes,
1048
 
            inconsistency_fatal=inconsistency_fatal)
 
929
            add_callback=graph_index.add_nodes)
1049
930
        access = knit._DirectPackAccess({})
1050
931
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1051
932
        result = GroupCompressVersionedFiles(index, access, delta)
1060
941
    versioned_files.stream.close()
1061
942
 
1062
943
 
1063
 
class _BatchingBlockFetcher(object):
1064
 
    """Fetch group compress blocks in batches.
1065
 
    
1066
 
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1067
 
        currently pending batch.
1068
 
    """
1069
 
 
1070
 
    def __init__(self, gcvf, locations):
1071
 
        self.gcvf = gcvf
1072
 
        self.locations = locations
1073
 
        self.keys = []
1074
 
        self.batch_memos = {}
1075
 
        self.memos_to_get = []
1076
 
        self.total_bytes = 0
1077
 
        self.last_read_memo = None
1078
 
        self.manager = None
1079
 
 
1080
 
    def add_key(self, key):
1081
 
        """Add another to key to fetch.
1082
 
        
1083
 
        :return: The estimated number of bytes needed to fetch the batch so
1084
 
            far.
1085
 
        """
1086
 
        self.keys.append(key)
1087
 
        index_memo, _, _, _ = self.locations[key]
1088
 
        read_memo = index_memo[0:3]
1089
 
        # Three possibilities for this read_memo:
1090
 
        #  - it's already part of this batch; or
1091
 
        #  - it's not yet part of this batch, but is already cached; or
1092
 
        #  - it's not yet part of this batch and will need to be fetched.
1093
 
        if read_memo in self.batch_memos:
1094
 
            # This read memo is already in this batch.
1095
 
            return self.total_bytes
1096
 
        try:
1097
 
            cached_block = self.gcvf._group_cache[read_memo]
1098
 
        except KeyError:
1099
 
            # This read memo is new to this batch, and the data isn't cached
1100
 
            # either.
1101
 
            self.batch_memos[read_memo] = None
1102
 
            self.memos_to_get.append(read_memo)
1103
 
            byte_length = read_memo[2]
1104
 
            self.total_bytes += byte_length
1105
 
        else:
1106
 
            # This read memo is new to this batch, but cached.
1107
 
            # Keep a reference to the cached block in batch_memos because it's
1108
 
            # certain that we'll use it when this batch is processed, but
1109
 
            # there's a risk that it would fall out of _group_cache between now
1110
 
            # and then.
1111
 
            self.batch_memos[read_memo] = cached_block
1112
 
        return self.total_bytes
1113
 
        
1114
 
    def _flush_manager(self):
1115
 
        if self.manager is not None:
1116
 
            for factory in self.manager.get_record_stream():
1117
 
                yield factory
1118
 
            self.manager = None
1119
 
            self.last_read_memo = None
1120
 
 
1121
 
    def yield_factories(self, full_flush=False):
1122
 
        """Yield factories for keys added since the last yield.  They will be
1123
 
        returned in the order they were added via add_key.
1124
 
        
1125
 
        :param full_flush: by default, some results may not be returned in case
1126
 
            they can be part of the next batch.  If full_flush is True, then
1127
 
            all results are returned.
1128
 
        """
1129
 
        if self.manager is None and not self.keys:
1130
 
            return
1131
 
        # Fetch all memos in this batch.
1132
 
        blocks = self.gcvf._get_blocks(self.memos_to_get)
1133
 
        # Turn blocks into factories and yield them.
1134
 
        memos_to_get_stack = list(self.memos_to_get)
1135
 
        memos_to_get_stack.reverse()
1136
 
        for key in self.keys:
1137
 
            index_memo, _, parents, _ = self.locations[key]
1138
 
            read_memo = index_memo[:3]
1139
 
            if self.last_read_memo != read_memo:
1140
 
                # We are starting a new block. If we have a
1141
 
                # manager, we have found everything that fits for
1142
 
                # now, so yield records
1143
 
                for factory in self._flush_manager():
1144
 
                    yield factory
1145
 
                # Now start a new manager.
1146
 
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1147
 
                    # The next block from _get_blocks will be the block we
1148
 
                    # need.
1149
 
                    block_read_memo, block = blocks.next()
1150
 
                    if block_read_memo != read_memo:
1151
 
                        raise AssertionError(
1152
 
                            "block_read_memo out of sync with read_memo"
1153
 
                            "(%r != %r)" % (block_read_memo, read_memo))
1154
 
                    self.batch_memos[read_memo] = block
1155
 
                    memos_to_get_stack.pop()
1156
 
                else:
1157
 
                    block = self.batch_memos[read_memo]
1158
 
                self.manager = _LazyGroupContentManager(block)
1159
 
                self.last_read_memo = read_memo
1160
 
            start, end = index_memo[3:5]
1161
 
            self.manager.add_factory(key, parents, start, end)
1162
 
        if full_flush:
1163
 
            for factory in self._flush_manager():
1164
 
                yield factory
1165
 
        del self.keys[:]
1166
 
        self.batch_memos.clear()
1167
 
        del self.memos_to_get[:]
1168
 
        self.total_bytes = 0
1169
 
 
1170
 
 
1171
944
class GroupCompressVersionedFiles(VersionedFiles):
1172
945
    """A group-compress based VersionedFiles implementation."""
1173
946
 
1174
 
    def __init__(self, index, access, delta=True, _unadded_refs=None):
 
947
    def __init__(self, index, access, delta=True):
1175
948
        """Create a GroupCompressVersionedFiles object.
1176
949
 
1177
950
        :param index: The index object storing access and graph data.
1178
951
        :param access: The access object storing raw data.
1179
952
        :param delta: Whether to delta compress or just entropy compress.
1180
 
        :param _unadded_refs: private parameter, don't use.
1181
953
        """
1182
954
        self._index = index
1183
955
        self._access = access
1184
956
        self._delta = delta
1185
 
        if _unadded_refs is None:
1186
 
            _unadded_refs = {}
1187
 
        self._unadded_refs = _unadded_refs
 
957
        self._unadded_refs = {}
1188
958
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1189
959
        self._fallback_vfs = []
1190
960
 
1191
 
    def without_fallbacks(self):
1192
 
        """Return a clone of this object without any fallbacks configured."""
1193
 
        return GroupCompressVersionedFiles(self._index, self._access,
1194
 
            self._delta, _unadded_refs=dict(self._unadded_refs))
1195
 
 
1196
961
    def add_lines(self, key, parents, lines, parent_texts=None,
1197
962
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1198
963
        check_content=True):
1243
1008
                                               nostore_sha=nostore_sha))[0]
1244
1009
        return sha1, length, None
1245
1010
 
1246
 
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
1247
 
        """See VersionedFiles._add_text()."""
1248
 
        self._index._check_write_ok()
1249
 
        self._check_add(key, None, random_id, check_content=False)
1250
 
        if text.__class__ is not str:
1251
 
            raise errors.BzrBadParameterUnicode("text")
1252
 
        if parents is None:
1253
 
            # The caller might pass None if there is no graph data, but kndx
1254
 
            # indexes can't directly store that, so we give them
1255
 
            # an empty tuple instead.
1256
 
            parents = ()
1257
 
        # double handling for now. Make it work until then.
1258
 
        length = len(text)
1259
 
        record = FulltextContentFactory(key, parents, None, text)
1260
 
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
1261
 
                                               nostore_sha=nostore_sha))[0]
1262
 
        return sha1, length, None
1263
 
 
1264
1011
    def add_fallback_versioned_files(self, a_versioned_files):
1265
1012
        """Add a source of texts for texts not present in this knit.
1266
1013
 
1270
1017
 
1271
1018
    def annotate(self, key):
1272
1019
        """See VersionedFiles.annotate."""
1273
 
        ann = annotate.Annotator(self)
1274
 
        return ann.annotate_flat(key)
1275
 
 
1276
 
    def get_annotator(self):
1277
 
        return annotate.Annotator(self)
1278
 
 
1279
 
    def check(self, progress_bar=None, keys=None):
 
1020
        graph = Graph(self)
 
1021
        parent_map = self.get_parent_map([key])
 
1022
        if not parent_map:
 
1023
            raise errors.RevisionNotPresent(key, self)
 
1024
        if parent_map[key] is not None:
 
1025
            parent_map = dict((k, v) for k, v in graph.iter_ancestry([key])
 
1026
                              if v is not None)
 
1027
            keys = parent_map.keys()
 
1028
        else:
 
1029
            keys = [key]
 
1030
            parent_map = {key:()}
 
1031
        # We used Graph(self) to load the parent_map, but now that we have it,
 
1032
        # we can just query the parent map directly, so create a KnownGraph
 
1033
        heads_provider = _mod_graph.KnownGraph(parent_map)
 
1034
        parent_cache = {}
 
1035
        reannotate = annotate.reannotate
 
1036
        for record in self.get_record_stream(keys, 'topological', True):
 
1037
            key = record.key
 
1038
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
1039
            parent_lines = [parent_cache[parent] for parent in parent_map[key]]
 
1040
            parent_cache[key] = list(
 
1041
                reannotate(parent_lines, lines, key, None, heads_provider))
 
1042
        return parent_cache[key]
 
1043
 
 
1044
    def check(self, progress_bar=None):
1280
1045
        """See VersionedFiles.check()."""
1281
 
        if keys is None:
1282
 
            keys = self.keys()
1283
 
            for record in self.get_record_stream(keys, 'unordered', True):
1284
 
                record.get_bytes_as('fulltext')
1285
 
        else:
1286
 
            return self.get_record_stream(keys, 'unordered', True)
1287
 
 
1288
 
    def clear_cache(self):
1289
 
        """See VersionedFiles.clear_cache()"""
1290
 
        self._group_cache.clear()
1291
 
        self._index._graph_index.clear_cache()
1292
 
        self._index._int_cache.clear()
 
1046
        keys = self.keys()
 
1047
        for record in self.get_record_stream(keys, 'unordered', True):
 
1048
            record.get_bytes_as('fulltext')
1293
1049
 
1294
1050
    def _check_add(self, key, lines, random_id, check_content):
1295
1051
        """check that version_id and lines are safe to add."""
1306
1062
            self._check_lines_not_unicode(lines)
1307
1063
            self._check_lines_are_lines(lines)
1308
1064
 
1309
 
    def get_known_graph_ancestry(self, keys):
1310
 
        """Get a KnownGraph instance with the ancestry of keys."""
1311
 
        # Note that this is identical to
1312
 
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1313
 
        # ancestry.
1314
 
        parent_map, missing_keys = self._index.find_ancestry(keys)
1315
 
        for fallback in self._fallback_vfs:
1316
 
            if not missing_keys:
1317
 
                break
1318
 
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1319
 
                                                missing_keys)
1320
 
            parent_map.update(f_parent_map)
1321
 
            missing_keys = f_missing_keys
1322
 
        kg = _mod_graph.KnownGraph(parent_map)
1323
 
        return kg
1324
 
 
1325
1065
    def get_parent_map(self, keys):
1326
1066
        """Get a map of the graph parents of keys.
1327
1067
 
1354
1094
            missing.difference_update(set(new_result))
1355
1095
        return result, source_results
1356
1096
 
1357
 
    def _get_blocks(self, read_memos):
1358
 
        """Get GroupCompressBlocks for the given read_memos.
1359
 
 
1360
 
        :returns: a series of (read_memo, block) pairs, in the order they were
1361
 
            originally passed.
1362
 
        """
1363
 
        cached = {}
1364
 
        for read_memo in read_memos:
1365
 
            try:
1366
 
                block = self._group_cache[read_memo]
1367
 
            except KeyError:
1368
 
                pass
1369
 
            else:
1370
 
                cached[read_memo] = block
1371
 
        not_cached = []
1372
 
        not_cached_seen = set()
1373
 
        for read_memo in read_memos:
1374
 
            if read_memo in cached:
1375
 
                # Don't fetch what we already have
1376
 
                continue
1377
 
            if read_memo in not_cached_seen:
1378
 
                # Don't try to fetch the same data twice
1379
 
                continue
1380
 
            not_cached.append(read_memo)
1381
 
            not_cached_seen.add(read_memo)
1382
 
        raw_records = self._access.get_raw_records(not_cached)
1383
 
        for read_memo in read_memos:
1384
 
            try:
1385
 
                yield read_memo, cached[read_memo]
1386
 
            except KeyError:
1387
 
                # Read the block, and cache it.
1388
 
                zdata = raw_records.next()
1389
 
                block = GroupCompressBlock.from_bytes(zdata)
1390
 
                self._group_cache[read_memo] = block
1391
 
                cached[read_memo] = block
1392
 
                yield read_memo, block
 
1097
    def _get_block(self, index_memo):
 
1098
        read_memo = index_memo[0:3]
 
1099
        # get the group:
 
1100
        try:
 
1101
            block = self._group_cache[read_memo]
 
1102
        except KeyError:
 
1103
            # read the group
 
1104
            zdata = self._access.get_raw_records([read_memo]).next()
 
1105
            # decompress - whole thing - this is not a bug, as it
 
1106
            # permits caching. We might want to store the partially
 
1107
            # decompresed group and decompress object, so that recent
 
1108
            # texts are not penalised by big groups.
 
1109
            block = GroupCompressBlock.from_bytes(zdata)
 
1110
            self._group_cache[read_memo] = block
 
1111
        # cheapo debugging:
 
1112
        # print len(zdata), len(plain)
 
1113
        # parse - requires split_lines, better to have byte offsets
 
1114
        # here (but not by much - we only split the region for the
 
1115
        # recipe, and we often want to end up with lines anyway.
 
1116
        return block
1393
1117
 
1394
1118
    def get_missing_compression_parent_keys(self):
1395
1119
        """Return the keys of missing compression parents.
1561
1285
                unadded_keys, source_result)
1562
1286
        for key in missing:
1563
1287
            yield AbsentContentFactory(key)
1564
 
        # Batch up as many keys as we can until either:
1565
 
        #  - we encounter an unadded ref, or
1566
 
        #  - we run out of keys, or
1567
 
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1568
 
        batcher = _BatchingBlockFetcher(self, locations)
 
1288
        manager = None
 
1289
        last_read_memo = None
 
1290
        # TODO: This works fairly well at batching up existing groups into a
 
1291
        #       streamable format, and possibly allowing for taking one big
 
1292
        #       group and splitting it when it isn't fully utilized.
 
1293
        #       However, it doesn't allow us to find under-utilized groups and
 
1294
        #       combine them into a bigger group on the fly.
 
1295
        #       (Consider the issue with how chk_map inserts texts
 
1296
        #       one-at-a-time.) This could be done at insert_record_stream()
 
1297
        #       time, but it probably would decrease the number of
 
1298
        #       bytes-on-the-wire for fetch.
1569
1299
        for source, keys in source_keys:
1570
1300
            if source is self:
1571
1301
                for key in keys:
1572
1302
                    if key in self._unadded_refs:
1573
 
                        # Flush batch, then yield unadded ref from
1574
 
                        # self._compressor.
1575
 
                        for factory in batcher.yield_factories(full_flush=True):
1576
 
                            yield factory
 
1303
                        if manager is not None:
 
1304
                            for factory in manager.get_record_stream():
 
1305
                                yield factory
 
1306
                            last_read_memo = manager = None
1577
1307
                        bytes, sha1 = self._compressor.extract(key)
1578
1308
                        parents = self._unadded_refs[key]
1579
1309
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1580
 
                        continue
1581
 
                    if batcher.add_key(key) > BATCH_SIZE:
1582
 
                        # Ok, this batch is big enough.  Yield some results.
1583
 
                        for factory in batcher.yield_factories():
1584
 
                            yield factory
 
1310
                    else:
 
1311
                        index_memo, _, parents, (method, _) = locations[key]
 
1312
                        read_memo = index_memo[0:3]
 
1313
                        if last_read_memo != read_memo:
 
1314
                            # We are starting a new block. If we have a
 
1315
                            # manager, we have found everything that fits for
 
1316
                            # now, so yield records
 
1317
                            if manager is not None:
 
1318
                                for factory in manager.get_record_stream():
 
1319
                                    yield factory
 
1320
                            # Now start a new manager
 
1321
                            block = self._get_block(index_memo)
 
1322
                            manager = _LazyGroupContentManager(block)
 
1323
                            last_read_memo = read_memo
 
1324
                        start, end = index_memo[3:5]
 
1325
                        manager.add_factory(key, parents, start, end)
1585
1326
            else:
1586
 
                for factory in batcher.yield_factories(full_flush=True):
1587
 
                    yield factory
 
1327
                if manager is not None:
 
1328
                    for factory in manager.get_record_stream():
 
1329
                        yield factory
 
1330
                    last_read_memo = manager = None
1588
1331
                for record in source.get_record_stream(keys, ordering,
1589
1332
                                                       include_delta_closure):
1590
1333
                    yield record
1591
 
        for factory in batcher.yield_factories(full_flush=True):
1592
 
            yield factory
 
1334
        if manager is not None:
 
1335
            for factory in manager.get_record_stream():
 
1336
                yield factory
1593
1337
 
1594
1338
    def get_sha1s(self, keys):
1595
1339
        """See VersionedFiles.get_sha1s()."""
1649
1393
        self._unadded_refs = {}
1650
1394
        keys_to_add = []
1651
1395
        def flush():
1652
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
1653
 
            self._compressor = GroupCompressor()
1654
 
            # Note: At this point we still have 1 copy of the fulltext (in
1655
 
            #       record and the var 'bytes'), and this generates 2 copies of
1656
 
            #       the compressed text (one for bytes, one in chunks)
1657
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1658
 
            #       have to double compressed memory here
1659
 
            # TODO: Figure out how to indicate that we would be happy to free
1660
 
            #       the fulltext content at this point. Note that sometimes we
1661
 
            #       will want it later (streaming CHK pages), but most of the
1662
 
            #       time we won't (everything else)
1663
 
            bytes = ''.join(chunks)
1664
 
            del chunks
 
1396
            bytes = self._compressor.flush().to_bytes()
1665
1397
            index, start, length = self._access.add_raw_records(
1666
1398
                [(None, len(bytes))], bytes)[0]
1667
1399
            nodes = []
1670
1402
            self._index.add_records(nodes, random_id=random_id)
1671
1403
            self._unadded_refs = {}
1672
1404
            del keys_to_add[:]
 
1405
            self._compressor = GroupCompressor()
1673
1406
 
1674
1407
        last_prefix = None
1675
1408
        max_fulltext_len = 0
1679
1412
        block_length = None
1680
1413
        # XXX: TODO: remove this, it is just for safety checking for now
1681
1414
        inserted_keys = set()
1682
 
        reuse_this_block = reuse_blocks
1683
1415
        for record in stream:
1684
1416
            # Raise an error when a record is missing.
1685
1417
            if record.storage_kind == 'absent':
1693
1425
            if reuse_blocks:
1694
1426
                # If the reuse_blocks flag is set, check to see if we can just
1695
1427
                # copy a groupcompress block as-is.
1696
 
                # We only check on the first record (groupcompress-block) not
1697
 
                # on all of the (groupcompress-block-ref) entries.
1698
 
                # The reuse_this_block flag is then kept for as long as
1699
 
                if record.storage_kind == 'groupcompress-block':
1700
 
                    # Check to see if we really want to re-use this block
1701
 
                    insert_manager = record._manager
1702
 
                    reuse_this_block = insert_manager.check_is_well_utilized()
1703
 
            else:
1704
 
                reuse_this_block = False
1705
 
            if reuse_this_block:
1706
 
                # We still want to reuse this block
1707
1428
                if record.storage_kind == 'groupcompress-block':
1708
1429
                    # Insert the raw block into the target repo
1709
1430
                    insert_manager = record._manager
 
1431
                    insert_manager._check_rebuild_block()
1710
1432
                    bytes = record._manager._block.to_bytes()
1711
1433
                    _, start, length = self._access.add_raw_records(
1712
1434
                        [(None, len(bytes))], bytes)[0]
1717
1439
                                           'groupcompress-block-ref'):
1718
1440
                    if insert_manager is None:
1719
1441
                        raise AssertionError('No insert_manager set')
1720
 
                    if insert_manager is not record._manager:
1721
 
                        raise AssertionError('insert_manager does not match'
1722
 
                            ' the current record, we cannot be positive'
1723
 
                            ' that the appropriate content was inserted.'
1724
 
                            )
1725
1442
                    value = "%d %d %d %d" % (block_start, block_length,
1726
1443
                                             record._start, record._end)
1727
1444
                    nodes = [(record.key, value, (record.parents,))]
1777
1494
                key = record.key
1778
1495
            self._unadded_refs[key] = record.parents
1779
1496
            yield found_sha1
1780
 
            as_st = static_tuple.StaticTuple.from_sequence
1781
 
            if record.parents is not None:
1782
 
                parents = as_st([as_st(p) for p in record.parents])
1783
 
            else:
1784
 
                parents = None
1785
 
            refs = static_tuple.StaticTuple(parents)
1786
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1497
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
 
1498
                (record.parents,)))
1787
1499
        if len(keys_to_add):
1788
1500
            flush()
1789
1501
        self._compressor = None
1809
1521
 
1810
1522
        :return: An iterator over (line, key).
1811
1523
        """
 
1524
        if pb is None:
 
1525
            pb = progress.DummyProgress()
1812
1526
        keys = set(keys)
1813
1527
        total = len(keys)
1814
1528
        # we don't care about inclusions, the caller cares.
1818
1532
            'unordered', True)):
1819
1533
            # XXX: todo - optimise to use less than full texts.
1820
1534
            key = record.key
1821
 
            if pb is not None:
1822
 
                pb.update('Walking content', key_idx, total)
 
1535
            pb.update('Walking content', key_idx, total)
1823
1536
            if record.storage_kind == 'absent':
1824
1537
                raise errors.RevisionNotPresent(key, self)
1825
1538
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
1826
1539
            for line in lines:
1827
1540
                yield line, key
1828
 
        if pb is not None:
1829
 
            pb.update('Walking content', total, total)
 
1541
        pb.update('Walking content', total, total)
1830
1542
 
1831
1543
    def keys(self):
1832
1544
        """See VersionedFiles.keys."""
1839
1551
        return result
1840
1552
 
1841
1553
 
1842
 
class _GCBuildDetails(object):
1843
 
    """A blob of data about the build details.
1844
 
 
1845
 
    This stores the minimal data, which then allows compatibility with the old
1846
 
    api, without taking as much memory.
1847
 
    """
1848
 
 
1849
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1850
 
                 '_delta_end', '_parents')
1851
 
 
1852
 
    method = 'group'
1853
 
    compression_parent = None
1854
 
 
1855
 
    def __init__(self, parents, position_info):
1856
 
        self._parents = parents
1857
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1858
 
         self._delta_end) = position_info
1859
 
 
1860
 
    def __repr__(self):
1861
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1862
 
            self.index_memo, self._parents)
1863
 
 
1864
 
    @property
1865
 
    def index_memo(self):
1866
 
        return (self._index, self._group_start, self._group_end,
1867
 
                self._basis_end, self._delta_end)
1868
 
 
1869
 
    @property
1870
 
    def record_details(self):
1871
 
        return static_tuple.StaticTuple(self.method, None)
1872
 
 
1873
 
    def __getitem__(self, offset):
1874
 
        """Compatibility thunk to act like a tuple."""
1875
 
        if offset == 0:
1876
 
            return self.index_memo
1877
 
        elif offset == 1:
1878
 
            return self.compression_parent # Always None
1879
 
        elif offset == 2:
1880
 
            return self._parents
1881
 
        elif offset == 3:
1882
 
            return self.record_details
1883
 
        else:
1884
 
            raise IndexError('offset out of range')
1885
 
            
1886
 
    def __len__(self):
1887
 
        return 4
1888
 
 
1889
 
 
1890
1554
class _GCGraphIndex(object):
1891
1555
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1892
1556
 
1893
1557
    def __init__(self, graph_index, is_locked, parents=True,
1894
 
        add_callback=None, track_external_parent_refs=False,
1895
 
        inconsistency_fatal=True, track_new_keys=False):
 
1558
        add_callback=None, track_external_parent_refs=False):
1896
1559
        """Construct a _GCGraphIndex on a graph_index.
1897
1560
 
1898
1561
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1906
1569
        :param track_external_parent_refs: As keys are added, keep track of the
1907
1570
            keys they reference, so that we can query get_missing_parents(),
1908
1571
            etc.
1909
 
        :param inconsistency_fatal: When asked to add records that are already
1910
 
            present, and the details are inconsistent with the existing
1911
 
            record, raise an exception instead of warning (and skipping the
1912
 
            record).
1913
1572
        """
1914
1573
        self._add_callback = add_callback
1915
1574
        self._graph_index = graph_index
1916
1575
        self._parents = parents
1917
1576
        self.has_graph = parents
1918
1577
        self._is_locked = is_locked
1919
 
        self._inconsistency_fatal = inconsistency_fatal
1920
 
        # GroupCompress records tend to have the same 'group' start + offset
1921
 
        # repeated over and over, this creates a surplus of ints
1922
 
        self._int_cache = {}
1923
1578
        if track_external_parent_refs:
1924
 
            self._key_dependencies = knit._KeyRefs(
1925
 
                track_new_keys=track_new_keys)
 
1579
            self._key_dependencies = knit._KeyRefs()
1926
1580
        else:
1927
1581
            self._key_dependencies = None
1928
1582
 
1951
1605
                if refs:
1952
1606
                    for ref in refs:
1953
1607
                        if ref:
1954
 
                            raise errors.KnitCorrupt(self,
 
1608
                            raise KnitCorrupt(self,
1955
1609
                                "attempt to add node with parents "
1956
1610
                                "in parentless index.")
1957
1611
                    refs = ()
1961
1615
        if not random_id:
1962
1616
            present_nodes = self._get_entries(keys)
1963
1617
            for (index, key, value, node_refs) in present_nodes:
1964
 
                # Sometimes these are passed as a list rather than a tuple
1965
 
                node_refs = static_tuple.as_tuples(node_refs)
1966
 
                passed = static_tuple.as_tuples(keys[key])
1967
 
                if node_refs != passed[1]:
1968
 
                    details = '%s %s %s' % (key, (value, node_refs), passed)
1969
 
                    if self._inconsistency_fatal:
1970
 
                        raise errors.KnitCorrupt(self, "inconsistent details"
1971
 
                                                 " in add_records: %s" %
1972
 
                                                 details)
1973
 
                    else:
1974
 
                        trace.warning("inconsistent details in skipped"
1975
 
                                      " record: %s", details)
 
1618
                if node_refs != keys[key][1]:
 
1619
                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
 
1620
                        ": %s %s" % ((value, node_refs), keys[key]))
1976
1621
                del keys[key]
1977
1622
                changed = True
1978
1623
        if changed:
1985
1630
                    result.append((key, value))
1986
1631
            records = result
1987
1632
        key_dependencies = self._key_dependencies
1988
 
        if key_dependencies is not None:
1989
 
            if self._parents:
1990
 
                for key, value, refs in records:
1991
 
                    parents = refs[0]
1992
 
                    key_dependencies.add_references(key, parents)
1993
 
            else:
1994
 
                for key, value, refs in records:
1995
 
                    new_keys.add_key(key)
 
1633
        if key_dependencies is not None and self._parents:
 
1634
            for key, value, refs in records:
 
1635
                parents = refs[0]
 
1636
                key_dependencies.add_references(key, parents)
1996
1637
        self._add_callback(records)
1997
1638
 
1998
1639
    def _check_read(self):
2027
1668
        if check_present:
2028
1669
            missing_keys = keys.difference(found_keys)
2029
1670
            if missing_keys:
2030
 
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
2031
 
 
2032
 
    def find_ancestry(self, keys):
2033
 
        """See CombinedGraphIndex.find_ancestry"""
2034
 
        return self._graph_index.find_ancestry(keys, 0)
 
1671
                raise RevisionNotPresent(missing_keys.pop(), self)
2035
1672
 
2036
1673
    def get_parent_map(self, keys):
2037
1674
        """Get a map of the parents of keys.
2055
1692
        """Return the keys of missing parents."""
2056
1693
        # Copied from _KnitGraphIndex.get_missing_parents
2057
1694
        # We may have false positives, so filter those out.
2058
 
        self._key_dependencies.satisfy_refs_for_keys(
 
1695
        self._key_dependencies.add_keys(
2059
1696
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
2060
1697
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
2061
1698
 
2087
1724
                parents = None
2088
1725
            else:
2089
1726
                parents = entry[3][0]
2090
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2091
 
            result[key] = details
 
1727
            method = 'group'
 
1728
            result[key] = (self._node_to_position(entry),
 
1729
                                  None, parents, (method, None))
2092
1730
        return result
2093
1731
 
2094
1732
    def keys(self):
2103
1741
        """Convert an index value to position details."""
2104
1742
        bits = node[2].split(' ')
2105
1743
        # It would be nice not to read the entire gzip.
2106
 
        # start and stop are put into _int_cache because they are very common.
2107
 
        # They define the 'group' that an entry is in, and many groups can have
2108
 
        # thousands of objects.
2109
 
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2110
 
        # each, or about 7MB. Note that it might be even more when you consider
2111
 
        # how PyInt is allocated in separate slabs. And you can't return a slab
2112
 
        # to the OS if even 1 int on it is in use. Note though that Python uses
2113
 
        # a LIFO when re-using PyInt slots, which might cause more
2114
 
        # fragmentation.
2115
1744
        start = int(bits[0])
2116
 
        start = self._int_cache.setdefault(start, start)
2117
1745
        stop = int(bits[1])
2118
 
        stop = self._int_cache.setdefault(stop, stop)
2119
1746
        basis_end = int(bits[2])
2120
1747
        delta_end = int(bits[3])
2121
 
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2122
 
        # instance...
2123
 
        return (node[0], start, stop, basis_end, delta_end)
 
1748
        return node[0], start, stop, basis_end, delta_end
2124
1749
 
2125
1750
    def scan_unvalidated_index(self, graph_index):
2126
1751
        """Inform this _GCGraphIndex that there is an unvalidated index.
2127
1752
 
2128
1753
        This allows this _GCGraphIndex to keep track of any missing
2129
1754
        compression parents we may want to have filled in to make those
2130
 
        indices valid.  It also allows _GCGraphIndex to track any new keys.
 
1755
        indices valid.
2131
1756
 
2132
1757
        :param graph_index: A GraphIndex
2133
1758
        """
2134
 
        key_dependencies = self._key_dependencies
2135
 
        if key_dependencies is None:
2136
 
            return
2137
 
        for node in graph_index.iter_all_entries():
2138
 
            # Add parent refs from graph_index (and discard parent refs
2139
 
            # that the graph_index has).
2140
 
            key_dependencies.add_references(node[1], node[3][0])
 
1759
        if self._key_dependencies is not None:
 
1760
            # Add parent refs from graph_index (and discard parent refs that
 
1761
            # the graph_index has).
 
1762
            add_refs = self._key_dependencies.add_references
 
1763
            for node in graph_index.iter_all_entries():
 
1764
                add_refs(node[1], node[3][0])
 
1765
 
2141
1766
 
2142
1767
 
2143
1768
from bzrlib._groupcompress_py import (
2157
1782
        decode_base128_int,
2158
1783
        )
2159
1784
    GroupCompressor = PyrexGroupCompressor
2160
 
except ImportError, e:
2161
 
    osutils.failed_to_load_extension(e)
 
1785
except ImportError:
2162
1786
    GroupCompressor = PythonGroupCompressor
2163
1787