~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Tarmac
  • Author(s): Vincent Ladeuil
  • Date: 2017-01-30 14:42:05 UTC
  • mfrom: (6620.1.1 trunk)
  • Revision ID: tarmac-20170130144205-r8fh2xpmiuxyozpv
Merge  2.7 into trunk including fix for bug #1657238 [r=vila]

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from __future__ import absolute_import
 
20
 
19
21
import time
20
22
import zlib
21
 
try:
22
 
    import pylzma
23
 
except ImportError:
24
 
    pylzma = None
25
23
 
 
24
from bzrlib.lazy_import import lazy_import
 
25
lazy_import(globals(), """
26
26
from bzrlib import (
27
27
    annotate,
 
28
    config,
28
29
    debug,
29
30
    errors,
30
31
    graph as _mod_graph,
31
 
    knit,
32
32
    osutils,
33
33
    pack,
 
34
    static_tuple,
34
35
    trace,
 
36
    tsort,
35
37
    )
36
 
from bzrlib.graph import Graph
 
38
 
 
39
from bzrlib.repofmt import pack_repo
 
40
from bzrlib.i18n import gettext
 
41
""")
 
42
 
37
43
from bzrlib.btree_index import BTreeBuilder
38
44
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
45
from bzrlib.versionedfile import (
 
46
    _KeyRefs,
41
47
    adapter_registry,
42
48
    AbsentContentFactory,
43
49
    ChunkedContentFactory,
44
50
    FulltextContentFactory,
45
 
    VersionedFiles,
 
51
    VersionedFilesWithFallbacks,
46
52
    )
47
53
 
48
 
_USE_LZMA = False and (pylzma is not None)
 
54
# Minimum number of uncompressed bytes to try fetch at once when retrieving
 
55
# groupcompress blocks.
 
56
BATCH_SIZE = 2**16
49
57
 
50
58
# osutils.sha_string('')
51
59
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
52
60
 
53
 
 
54
61
def sort_gc_optimal(parent_map):
55
62
    """Sort and group the keys in parent_map into groupcompress order.
56
63
 
74
81
 
75
82
    present_keys = []
76
83
    for prefix in sorted(per_prefix_map):
77
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
84
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
78
85
    return present_keys
79
86
 
80
87
 
98
105
    def __init__(self):
99
106
        # map by key? or just order in file?
100
107
        self._compressor_name = None
101
 
        self._z_content = None
 
108
        self._z_content_chunks = None
102
109
        self._z_content_decompressor = None
103
110
        self._z_content_length = None
104
111
        self._content_length = None
117
124
        :param num_bytes: Ensure that we have extracted at least num_bytes of
118
125
            content. If None, consume everything
119
126
        """
120
 
        # TODO: If we re-use the same content block at different times during
121
 
        #       get_record_stream(), it is possible that the first pass will
122
 
        #       get inserted, triggering an extract/_ensure_content() which
123
 
        #       will get rid of _z_content. And then the next use of the block
124
 
        #       will try to access _z_content (to send it over the wire), and
125
 
        #       fail because it is already extracted. Consider never releasing
126
 
        #       _z_content because of this.
 
127
        if self._content_length is None:
 
128
            raise AssertionError('self._content_length should never be None')
127
129
        if num_bytes is None:
128
130
            num_bytes = self._content_length
129
131
        elif (self._content_length is not None
137
139
                self._content = ''.join(self._content_chunks)
138
140
                self._content_chunks = None
139
141
        if self._content is None:
140
 
            if self._z_content is None:
 
142
            # We join self._z_content_chunks here, because if we are
 
143
            # decompressing, then it is *very* likely that we have a single
 
144
            # chunk
 
145
            if self._z_content_chunks is None:
141
146
                raise AssertionError('No content to decompress')
142
 
            if self._z_content == '':
 
147
            z_content = ''.join(self._z_content_chunks)
 
148
            if z_content == '':
143
149
                self._content = ''
144
150
            elif self._compressor_name == 'lzma':
145
151
                # We don't do partial lzma decomp yet
146
 
                self._content = pylzma.decompress(self._z_content)
 
152
                import pylzma
 
153
                self._content = pylzma.decompress(z_content)
147
154
            elif self._compressor_name == 'zlib':
148
155
                # Start a zlib decompressor
149
 
                if num_bytes is None:
150
 
                    self._content = zlib.decompress(self._z_content)
 
156
                if num_bytes * 4 > self._content_length * 3:
 
157
                    # If we are requesting more that 3/4ths of the content,
 
158
                    # just extract the whole thing in a single pass
 
159
                    num_bytes = self._content_length
 
160
                    self._content = zlib.decompress(z_content)
151
161
                else:
152
162
                    self._z_content_decompressor = zlib.decompressobj()
153
163
                    # Seed the decompressor with the uncompressed bytes, so
154
164
                    # that the rest of the code is simplified
155
165
                    self._content = self._z_content_decompressor.decompress(
156
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
166
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
167
                    if not self._z_content_decompressor.unconsumed_tail:
 
168
                        self._z_content_decompressor = None
157
169
            else:
158
170
                raise AssertionError('Unknown compressor: %r'
159
171
                                     % self._compressor_name)
161
173
        # 'unconsumed_tail'
162
174
 
163
175
        # Do we have enough bytes already?
164
 
        if num_bytes is not None and len(self._content) >= num_bytes:
165
 
            return
166
 
        if num_bytes is None and self._z_content_decompressor is None:
167
 
            # We must have already decompressed everything
 
176
        if len(self._content) >= num_bytes:
168
177
            return
169
178
        # If we got this far, and don't have a decompressor, something is wrong
170
179
        if self._z_content_decompressor is None:
171
180
            raise AssertionError(
172
181
                'No decompressor to decompress %d bytes' % num_bytes)
173
182
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
174
 
        if num_bytes is None:
175
 
            if remaining_decomp:
176
 
                # We don't know how much is left, but we'll decompress it all
177
 
                self._content += self._z_content_decompressor.decompress(
178
 
                    remaining_decomp)
179
 
                # Note: There's what I consider a bug in zlib.decompressobj
180
 
                #       If you pass back in the entire unconsumed_tail, only
181
 
                #       this time you don't pass a max-size, it doesn't
182
 
                #       change the unconsumed_tail back to None/''.
183
 
                #       However, we know we are done with the whole stream
184
 
                self._z_content_decompressor = None
185
 
            # XXX: Why is this the only place in this routine we set this?
186
 
            self._content_length = len(self._content)
187
 
        else:
188
 
            if not remaining_decomp:
189
 
                raise AssertionError('Nothing left to decompress')
190
 
            needed_bytes = num_bytes - len(self._content)
191
 
            # We always set max_size to 32kB over the minimum needed, so that
192
 
            # zlib will give us as much as we really want.
193
 
            # TODO: If this isn't good enough, we could make a loop here,
194
 
            #       that keeps expanding the request until we get enough
195
 
            self._content += self._z_content_decompressor.decompress(
196
 
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
197
 
            if len(self._content) < num_bytes:
198
 
                raise AssertionError('%d bytes wanted, only %d available'
199
 
                                     % (num_bytes, len(self._content)))
200
 
            if not self._z_content_decompressor.unconsumed_tail:
201
 
                # The stream is finished
202
 
                self._z_content_decompressor = None
 
183
        if not remaining_decomp:
 
184
            raise AssertionError('Nothing left to decompress')
 
185
        needed_bytes = num_bytes - len(self._content)
 
186
        # We always set max_size to 32kB over the minimum needed, so that
 
187
        # zlib will give us as much as we really want.
 
188
        # TODO: If this isn't good enough, we could make a loop here,
 
189
        #       that keeps expanding the request until we get enough
 
190
        self._content += self._z_content_decompressor.decompress(
 
191
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
192
        if len(self._content) < num_bytes:
 
193
            raise AssertionError('%d bytes wanted, only %d available'
 
194
                                 % (num_bytes, len(self._content)))
 
195
        if not self._z_content_decompressor.unconsumed_tail:
 
196
            # The stream is finished
 
197
            self._z_content_decompressor = None
203
198
 
204
199
    def _parse_bytes(self, bytes, pos):
205
200
        """Read the various lengths from the header.
221
216
            # XXX: Define some GCCorrupt error ?
222
217
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
223
218
                                 (len(bytes), pos, self._z_content_length))
224
 
        self._z_content = bytes[pos:]
 
219
        self._z_content_chunks = (bytes[pos:],)
 
220
 
 
221
    @property
 
222
    def _z_content(self):
 
223
        """Return z_content_chunks as a simple string.
 
224
 
 
225
        Meant only to be used by the test suite.
 
226
        """
 
227
        if self._z_content_chunks is not None:
 
228
            return ''.join(self._z_content_chunks)
 
229
        return None
225
230
 
226
231
    @classmethod
227
232
    def from_bytes(cls, bytes):
283
288
        self._content_length = length
284
289
        self._content_chunks = content_chunks
285
290
        self._content = None
286
 
        self._z_content = None
 
291
        self._z_content_chunks = None
287
292
 
288
293
    def set_content(self, content):
289
294
        """Set the content of this block."""
290
295
        self._content_length = len(content)
291
296
        self._content = content
292
 
        self._z_content = None
293
 
 
294
 
    def _create_z_content_using_lzma(self):
295
 
        if self._content_chunks is not None:
296
 
            self._content = ''.join(self._content_chunks)
297
 
            self._content_chunks = None
298
 
        if self._content is None:
299
 
            raise AssertionError('Nothing to compress')
300
 
        self._z_content = pylzma.compress(self._content)
301
 
        self._z_content_length = len(self._z_content)
302
 
 
303
 
    def _create_z_content_from_chunks(self):
 
297
        self._z_content_chunks = None
 
298
 
 
299
    def _create_z_content_from_chunks(self, chunks):
304
300
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
305
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
301
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
302
        # (measured peak is maybe 30MB over the above...)
 
303
        compressed_chunks = map(compressor.compress, chunks)
306
304
        compressed_chunks.append(compressor.flush())
307
 
        self._z_content = ''.join(compressed_chunks)
308
 
        self._z_content_length = len(self._z_content)
 
305
        # Ignore empty chunks
 
306
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
307
        self._z_content_length = sum(map(len, self._z_content_chunks))
309
308
 
310
309
    def _create_z_content(self):
311
 
        if self._z_content is not None:
312
 
            return
313
 
        if _USE_LZMA:
314
 
            self._create_z_content_using_lzma()
 
310
        if self._z_content_chunks is not None:
315
311
            return
316
312
        if self._content_chunks is not None:
317
 
            self._create_z_content_from_chunks()
318
 
            return
319
 
        self._z_content = zlib.compress(self._content)
320
 
        self._z_content_length = len(self._z_content)
 
313
            chunks = self._content_chunks
 
314
        else:
 
315
            chunks = (self._content,)
 
316
        self._create_z_content_from_chunks(chunks)
 
317
 
 
318
    def to_chunks(self):
 
319
        """Create the byte stream as a series of 'chunks'"""
 
320
        self._create_z_content()
 
321
        header = self.GCB_HEADER
 
322
        chunks = ['%s%d\n%d\n'
 
323
                  % (header, self._z_content_length, self._content_length),
 
324
                 ]
 
325
        chunks.extend(self._z_content_chunks)
 
326
        total_len = sum(map(len, chunks))
 
327
        return total_len, chunks
321
328
 
322
329
    def to_bytes(self):
323
330
        """Encode the information into a byte stream."""
324
 
        self._create_z_content()
325
 
        if _USE_LZMA:
326
 
            header = self.GCB_LZ_HEADER
327
 
        else:
328
 
            header = self.GCB_HEADER
329
 
        chunks = [header,
330
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
331
 
                  self._z_content,
332
 
                 ]
 
331
        total_len, chunks = self.to_chunks()
333
332
        return ''.join(chunks)
334
333
 
335
334
    def _dump(self, include_text=False):
449
448
                # Grab and cache the raw bytes for this entry
450
449
                # and break the ref-cycle with _manager since we don't need it
451
450
                # anymore
452
 
                self._manager._prepare_for_extract()
 
451
                try:
 
452
                    self._manager._prepare_for_extract()
 
453
                except zlib.error as value:
 
454
                    raise errors.DecompressCorruption("zlib: " + str(value))
453
455
                block = self._manager._block
454
456
                self._bytes = block.extract(self.key, self._start, self._end)
455
457
                # There are code paths that first extract as fulltext, and then
456
458
                # extract as storage_kind (smart fetch). So we don't break the
457
459
                # refcycle here, but instead in manager.get_record_stream()
458
 
                # self._manager = None
459
460
            if storage_kind == 'fulltext':
460
461
                return self._bytes
461
462
            else:
467
468
class _LazyGroupContentManager(object):
468
469
    """This manages a group of _LazyGroupCompressFactory objects."""
469
470
 
470
 
    def __init__(self, block):
 
471
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
 
472
                             # current size, and still be considered
 
473
                             # resuable
 
474
    _full_block_size = 4*1024*1024
 
475
    _full_mixed_block_size = 2*1024*1024
 
476
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
 
477
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
478
 
 
479
    def __init__(self, block, get_compressor_settings=None):
471
480
        self._block = block
472
481
        # We need to preserve the ordering
473
482
        self._factories = []
474
483
        self._last_byte = 0
 
484
        self._get_settings = get_compressor_settings
 
485
        self._compressor_settings = None
 
486
 
 
487
    def _get_compressor_settings(self):
 
488
        if self._compressor_settings is not None:
 
489
            return self._compressor_settings
 
490
        settings = None
 
491
        if self._get_settings is not None:
 
492
            settings = self._get_settings()
 
493
        if settings is None:
 
494
            vf = GroupCompressVersionedFiles
 
495
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
496
        self._compressor_settings = settings
 
497
        return self._compressor_settings
475
498
 
476
499
    def add_factory(self, key, parents, start, end):
477
500
        if not self._factories:
510
533
        new_block.set_content(self._block._content[:last_byte])
511
534
        self._block = new_block
512
535
 
 
536
    def _make_group_compressor(self):
 
537
        return GroupCompressor(self._get_compressor_settings())
 
538
 
513
539
    def _rebuild_block(self):
514
540
        """Create a new GroupCompressBlock with only the referenced texts."""
515
 
        compressor = GroupCompressor()
 
541
        compressor = self._make_group_compressor()
516
542
        tstart = time.time()
517
543
        old_length = self._block._content_length
518
544
        end_point = 0
530
556
        #       block? It seems hard to come up with a method that it would
531
557
        #       expand, since we do full compression again. Perhaps based on a
532
558
        #       request that ends up poorly ordered?
 
559
        # TODO: If the content would have expanded, then we would want to
 
560
        #       handle a case where we need to split the block.
 
561
        #       Now that we have a user-tweakable option
 
562
        #       (max_bytes_to_index), it is possible that one person set it
 
563
        #       to a very low value, causing poor compression.
533
564
        delta = time.time() - tstart
534
565
        self._block = new_block
535
566
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
544
575
        # time (self._block._content) is a little expensive.
545
576
        self._block._ensure_content(self._last_byte)
546
577
 
547
 
    def _check_rebuild_block(self):
 
578
    def _check_rebuild_action(self):
548
579
        """Check to see if our block should be repacked."""
549
580
        total_bytes_used = 0
550
581
        last_byte_used = 0
551
582
        for factory in self._factories:
552
583
            total_bytes_used += factory._end - factory._start
553
 
            last_byte_used = max(last_byte_used, factory._end)
554
 
        # If we are using most of the bytes from the block, we have nothing
555
 
        # else to check (currently more that 1/2)
 
584
            if last_byte_used < factory._end:
 
585
                last_byte_used = factory._end
 
586
        # If we are using more than half of the bytes from the block, we have
 
587
        # nothing else to check
556
588
        if total_bytes_used * 2 >= self._block._content_length:
557
 
            return
558
 
        # Can we just strip off the trailing bytes? If we are going to be
559
 
        # transmitting more than 50% of the front of the content, go ahead
 
589
            return None, last_byte_used, total_bytes_used
 
590
        # We are using less than 50% of the content. Is the content we are
 
591
        # using at the beginning of the block? If so, we can just trim the
 
592
        # tail, rather than rebuilding from scratch.
560
593
        if total_bytes_used * 2 > last_byte_used:
561
 
            self._trim_block(last_byte_used)
562
 
            return
 
594
            return 'trim', last_byte_used, total_bytes_used
563
595
 
564
596
        # We are using a small amount of the data, and it isn't just packed
565
597
        # nicely at the front, so rebuild the content.
572
604
        #       expanding many deltas into fulltexts, as well.
573
605
        #       If we build a cheap enough 'strip', then we could try a strip,
574
606
        #       if that expands the content, we then rebuild.
575
 
        self._rebuild_block()
 
607
        return 'rebuild', last_byte_used, total_bytes_used
 
608
 
 
609
    def check_is_well_utilized(self):
 
610
        """Is the current block considered 'well utilized'?
 
611
 
 
612
        This heuristic asks if the current block considers itself to be a fully
 
613
        developed group, rather than just a loose collection of data.
 
614
        """
 
615
        if len(self._factories) == 1:
 
616
            # A block of length 1 could be improved by combining with other
 
617
            # groups - don't look deeper. Even larger than max size groups
 
618
            # could compress well with adjacent versions of the same thing.
 
619
            return False
 
620
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
621
        block_size = self._block._content_length
 
622
        if total_bytes_used < block_size * self._max_cut_fraction:
 
623
            # This block wants to trim itself small enough that we want to
 
624
            # consider it under-utilized.
 
625
            return False
 
626
        # TODO: This code is meant to be the twin of _insert_record_stream's
 
627
        #       'start_new_block' logic. It would probably be better to factor
 
628
        #       out that logic into a shared location, so that it stays
 
629
        #       together better
 
630
        # We currently assume a block is properly utilized whenever it is >75%
 
631
        # of the size of a 'full' block. In normal operation, a block is
 
632
        # considered full when it hits 4MB of same-file content. So any block
 
633
        # >3MB is 'full enough'.
 
634
        # The only time this isn't true is when a given block has large-object
 
635
        # content. (a single file >4MB, etc.)
 
636
        # Under these circumstances, we allow a block to grow to
 
637
        # 2 x largest_content.  Which means that if a given block had a large
 
638
        # object, it may actually be under-utilized. However, given that this
 
639
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
 
640
        # content blobs on-the-fly. Note that because we return False for all
 
641
        # 1-item blobs, we will repack them; we may wish to reevaluate our
 
642
        # treatment of large object blobs in the future.
 
643
        if block_size >= self._full_enough_block_size:
 
644
            return True
 
645
        # If a block is <3MB, it still may be considered 'full' if it contains
 
646
        # mixed content. The current rule is 2MB of mixed content is considered
 
647
        # full. So check to see if this block contains mixed content, and
 
648
        # set the threshold appropriately.
 
649
        common_prefix = None
 
650
        for factory in self._factories:
 
651
            prefix = factory.key[:-1]
 
652
            if common_prefix is None:
 
653
                common_prefix = prefix
 
654
            elif prefix != common_prefix:
 
655
                # Mixed content, check the size appropriately
 
656
                if block_size >= self._full_enough_mixed_block_size:
 
657
                    return True
 
658
                break
 
659
        # The content failed both the mixed check and the single-content check
 
660
        # so obviously it is not fully utilized
 
661
        # TODO: there is one other constraint that isn't being checked
 
662
        #       namely, that the entries in the block are in the appropriate
 
663
        #       order. For example, you could insert the entries in exactly
 
664
        #       reverse groupcompress order, and we would think that is ok.
 
665
        #       (all the right objects are in one group, and it is fully
 
666
        #       utilized, etc.) For now, we assume that case is rare,
 
667
        #       especially since we should always fetch in 'groupcompress'
 
668
        #       order.
 
669
        return False
 
670
 
 
671
    def _check_rebuild_block(self):
 
672
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
673
        if action is None:
 
674
            return
 
675
        if action == 'trim':
 
676
            self._trim_block(last_byte_used)
 
677
        elif action == 'rebuild':
 
678
            self._rebuild_block()
 
679
        else:
 
680
            raise ValueError('unknown rebuild action: %r' % (action,))
576
681
 
577
682
    def _wire_bytes(self):
578
683
        """Return a byte stream suitable for transmitting over the wire."""
612
717
        z_header_bytes = zlib.compress(header_bytes)
613
718
        del header_bytes
614
719
        z_header_bytes_len = len(z_header_bytes)
615
 
        block_bytes = self._block.to_bytes()
 
720
        block_bytes_len, block_chunks = self._block.to_chunks()
616
721
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
617
 
                                       len(block_bytes)))
 
722
                                       block_bytes_len))
618
723
        lines.append(z_header_bytes)
619
 
        lines.append(block_bytes)
620
 
        del z_header_bytes, block_bytes
 
724
        lines.extend(block_chunks)
 
725
        del z_header_bytes, block_chunks
 
726
        # TODO: This is a point where we will double the memory consumption. To
 
727
        #       avoid this, we probably have to switch to a 'chunked' api
621
728
        return ''.join(lines)
622
729
 
623
730
    @classmethod
624
731
    def from_bytes(cls, bytes):
625
732
        # TODO: This does extra string copying, probably better to do it a
626
 
        #       different way
 
733
        #       different way. At a minimum this creates 2 copies of the
 
734
        #       compressed content
627
735
        (storage_kind, z_header_len, header_len,
628
736
         block_len, rest) = bytes.split('\n', 4)
629
737
        del bytes
681
789
 
682
790
class _CommonGroupCompressor(object):
683
791
 
684
 
    def __init__(self):
 
792
    def __init__(self, settings=None):
685
793
        """Create a GroupCompressor."""
686
794
        self.chunks = []
687
795
        self._last = None
690
798
        self.labels_deltas = {}
691
799
        self._delta_index = None # Set by the children
692
800
        self._block = GroupCompressBlock()
 
801
        if settings is None:
 
802
            self._settings = {}
 
803
        else:
 
804
            self._settings = settings
693
805
 
694
806
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
695
807
        """Compress lines with label key.
787
899
 
788
900
        After calling this, the compressor should no longer be used
789
901
        """
790
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
791
 
        #       group. This has an impact for 'commit' of large objects.
792
 
        #       One possibility is to use self._content_chunks, and be lazy and
793
 
        #       only fill out self._content as a full string when we actually
794
 
        #       need it. That would at least drop the peak memory consumption
795
 
        #       for 'commit' down to ~1x the size of the largest file, at a
796
 
        #       cost of increased complexity within this code. 2x is still <<
797
 
        #       3x the size of the largest file, so we are doing ok.
798
902
        self._block.set_chunked_content(self.chunks, self.endpoint)
799
903
        self.chunks = None
800
904
        self._delta_index = None
818
922
 
819
923
class PythonGroupCompressor(_CommonGroupCompressor):
820
924
 
821
 
    def __init__(self):
 
925
    def __init__(self, settings=None):
822
926
        """Create a GroupCompressor.
823
927
 
824
928
        Used only if the pyrex version is not available.
825
929
        """
826
 
        super(PythonGroupCompressor, self).__init__()
 
930
        super(PythonGroupCompressor, self).__init__(settings)
827
931
        self._delta_index = LinesDeltaIndex([])
828
932
        # The actual content is managed by LinesDeltaIndex
829
933
        self.chunks = self._delta_index.lines
866
970
 
867
971
    It contains code very similar to SequenceMatcher because of having a similar
868
972
    task. However some key differences apply:
869
 
     - there is no junk, we want a minimal edit not a human readable diff.
870
 
     - we don't filter very common lines (because we don't know where a good
871
 
       range will start, and after the first text we want to be emitting minmal
872
 
       edits only.
873
 
     - we chain the left side, not the right side
874
 
     - we incrementally update the adjacency matrix as new lines are provided.
875
 
     - we look for matches in all of the left side, so the routine which does
876
 
       the analagous task of find_longest_match does not need to filter on the
877
 
       left side.
 
973
 
 
974
    * there is no junk, we want a minimal edit not a human readable diff.
 
975
    * we don't filter very common lines (because we don't know where a good
 
976
      range will start, and after the first text we want to be emitting minmal
 
977
      edits only.
 
978
    * we chain the left side, not the right side
 
979
    * we incrementally update the adjacency matrix as new lines are provided.
 
980
    * we look for matches in all of the left side, so the routine which does
 
981
      the analagous task of find_longest_match does not need to filter on the
 
982
      left side.
878
983
    """
879
984
 
880
 
    def __init__(self):
881
 
        super(PyrexGroupCompressor, self).__init__()
882
 
        self._delta_index = DeltaIndex()
 
985
    def __init__(self, settings=None):
 
986
        super(PyrexGroupCompressor, self).__init__(settings)
 
987
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
988
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
883
989
 
884
990
    def _compress(self, key, bytes, max_delta_size, soft=False):
885
991
        """see _CommonGroupCompressor._compress"""
960
1066
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
961
1067
            add_callback=graph_index.add_nodes,
962
1068
            inconsistency_fatal=inconsistency_fatal)
963
 
        access = knit._DirectPackAccess({})
 
1069
        access = pack_repo._DirectPackAccess({})
964
1070
        access.set_writer(writer, graph_index, (transport, 'newpack'))
965
1071
        result = GroupCompressVersionedFiles(index, access, delta)
966
1072
        result.stream = stream
974
1080
    versioned_files.stream.close()
975
1081
 
976
1082
 
977
 
class GroupCompressVersionedFiles(VersionedFiles):
 
1083
class _BatchingBlockFetcher(object):
 
1084
    """Fetch group compress blocks in batches.
 
1085
 
 
1086
    :ivar total_bytes: int of expected number of bytes needed to fetch the
 
1087
        currently pending batch.
 
1088
    """
 
1089
 
 
1090
    def __init__(self, gcvf, locations, get_compressor_settings=None):
 
1091
        self.gcvf = gcvf
 
1092
        self.locations = locations
 
1093
        self.keys = []
 
1094
        self.batch_memos = {}
 
1095
        self.memos_to_get = []
 
1096
        self.total_bytes = 0
 
1097
        self.last_read_memo = None
 
1098
        self.manager = None
 
1099
        self._get_compressor_settings = get_compressor_settings
 
1100
 
 
1101
    def add_key(self, key):
 
1102
        """Add another to key to fetch.
 
1103
 
 
1104
        :return: The estimated number of bytes needed to fetch the batch so
 
1105
            far.
 
1106
        """
 
1107
        self.keys.append(key)
 
1108
        index_memo, _, _, _ = self.locations[key]
 
1109
        read_memo = index_memo[0:3]
 
1110
        # Three possibilities for this read_memo:
 
1111
        #  - it's already part of this batch; or
 
1112
        #  - it's not yet part of this batch, but is already cached; or
 
1113
        #  - it's not yet part of this batch and will need to be fetched.
 
1114
        if read_memo in self.batch_memos:
 
1115
            # This read memo is already in this batch.
 
1116
            return self.total_bytes
 
1117
        try:
 
1118
            cached_block = self.gcvf._group_cache[read_memo]
 
1119
        except KeyError:
 
1120
            # This read memo is new to this batch, and the data isn't cached
 
1121
            # either.
 
1122
            self.batch_memos[read_memo] = None
 
1123
            self.memos_to_get.append(read_memo)
 
1124
            byte_length = read_memo[2]
 
1125
            self.total_bytes += byte_length
 
1126
        else:
 
1127
            # This read memo is new to this batch, but cached.
 
1128
            # Keep a reference to the cached block in batch_memos because it's
 
1129
            # certain that we'll use it when this batch is processed, but
 
1130
            # there's a risk that it would fall out of _group_cache between now
 
1131
            # and then.
 
1132
            self.batch_memos[read_memo] = cached_block
 
1133
        return self.total_bytes
 
1134
 
 
1135
    def _flush_manager(self):
 
1136
        if self.manager is not None:
 
1137
            for factory in self.manager.get_record_stream():
 
1138
                yield factory
 
1139
            self.manager = None
 
1140
            self.last_read_memo = None
 
1141
 
 
1142
    def yield_factories(self, full_flush=False):
 
1143
        """Yield factories for keys added since the last yield.  They will be
 
1144
        returned in the order they were added via add_key.
 
1145
 
 
1146
        :param full_flush: by default, some results may not be returned in case
 
1147
            they can be part of the next batch.  If full_flush is True, then
 
1148
            all results are returned.
 
1149
        """
 
1150
        if self.manager is None and not self.keys:
 
1151
            return
 
1152
        # Fetch all memos in this batch.
 
1153
        blocks = self.gcvf._get_blocks(self.memos_to_get)
 
1154
        # Turn blocks into factories and yield them.
 
1155
        memos_to_get_stack = list(self.memos_to_get)
 
1156
        memos_to_get_stack.reverse()
 
1157
        for key in self.keys:
 
1158
            index_memo, _, parents, _ = self.locations[key]
 
1159
            read_memo = index_memo[:3]
 
1160
            if self.last_read_memo != read_memo:
 
1161
                # We are starting a new block. If we have a
 
1162
                # manager, we have found everything that fits for
 
1163
                # now, so yield records
 
1164
                for factory in self._flush_manager():
 
1165
                    yield factory
 
1166
                # Now start a new manager.
 
1167
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
 
1168
                    # The next block from _get_blocks will be the block we
 
1169
                    # need.
 
1170
                    block_read_memo, block = blocks.next()
 
1171
                    if block_read_memo != read_memo:
 
1172
                        raise AssertionError(
 
1173
                            "block_read_memo out of sync with read_memo"
 
1174
                            "(%r != %r)" % (block_read_memo, read_memo))
 
1175
                    self.batch_memos[read_memo] = block
 
1176
                    memos_to_get_stack.pop()
 
1177
                else:
 
1178
                    block = self.batch_memos[read_memo]
 
1179
                self.manager = _LazyGroupContentManager(block,
 
1180
                    get_compressor_settings=self._get_compressor_settings)
 
1181
                self.last_read_memo = read_memo
 
1182
            start, end = index_memo[3:5]
 
1183
            self.manager.add_factory(key, parents, start, end)
 
1184
        if full_flush:
 
1185
            for factory in self._flush_manager():
 
1186
                yield factory
 
1187
        del self.keys[:]
 
1188
        self.batch_memos.clear()
 
1189
        del self.memos_to_get[:]
 
1190
        self.total_bytes = 0
 
1191
 
 
1192
 
 
1193
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
978
1194
    """A group-compress based VersionedFiles implementation."""
979
1195
 
980
 
    def __init__(self, index, access, delta=True):
 
1196
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1197
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1198
    # However each of these references costs some memory in trade against a
 
1199
    # more accurate match result. For very large files, they either are
 
1200
    # pre-compressed and change in bulk whenever they change, or change in just
 
1201
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1202
    # versus running out of memory trying to track everything. The default max
 
1203
    # gives 100% sampling of a 1MB file.
 
1204
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1205
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1206
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
 
1207
 
 
1208
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1209
                 _group_cache=None):
981
1210
        """Create a GroupCompressVersionedFiles object.
982
1211
 
983
1212
        :param index: The index object storing access and graph data.
984
1213
        :param access: The access object storing raw data.
985
1214
        :param delta: Whether to delta compress or just entropy compress.
 
1215
        :param _unadded_refs: private parameter, don't use.
 
1216
        :param _group_cache: private parameter, don't use.
986
1217
        """
987
1218
        self._index = index
988
1219
        self._access = access
989
1220
        self._delta = delta
990
 
        self._unadded_refs = {}
991
 
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
992
 
        self._fallback_vfs = []
 
1221
        if _unadded_refs is None:
 
1222
            _unadded_refs = {}
 
1223
        self._unadded_refs = _unadded_refs
 
1224
        if _group_cache is None:
 
1225
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
 
1226
        self._group_cache = _group_cache
 
1227
        self._immediate_fallback_vfs = []
 
1228
        self._max_bytes_to_index = None
 
1229
 
 
1230
    def without_fallbacks(self):
 
1231
        """Return a clone of this object without any fallbacks configured."""
 
1232
        return GroupCompressVersionedFiles(self._index, self._access,
 
1233
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1234
            _group_cache=self._group_cache)
993
1235
 
994
1236
    def add_lines(self, key, parents, lines, parent_texts=None,
995
1237
        left_matching_blocks=None, nostore_sha=None, random_id=False,
999
1241
        :param key: The key tuple of the text to add.
1000
1242
        :param parents: The parents key tuples of the text to add.
1001
1243
        :param lines: A list of lines. Each line must be a bytestring. And all
1002
 
            of them except the last must be terminated with \n and contain no
1003
 
            other \n's. The last line may either contain no \n's or a single
1004
 
            terminating \n. If the lines list does meet this constraint the add
1005
 
            routine may error or may succeed - but you will be unable to read
1006
 
            the data back accurately. (Checking the lines have been split
 
1244
            of them except the last must be terminated with \\n and contain no
 
1245
            other \\n's. The last line may either contain no \\n's or a single
 
1246
            terminating \\n. If the lines list does meet this constraint the
 
1247
            add routine may error or may succeed - but you will be unable to
 
1248
            read the data back accurately. (Checking the lines have been split
1007
1249
            correctly is expensive and extremely unlikely to catch bugs so it
1008
1250
            is not done at runtime unless check_content is True.)
1009
1251
        :param parent_texts: An optional dictionary containing the opaque
1064
1306
 
1065
1307
        :param a_versioned_files: A VersionedFiles object.
1066
1308
        """
1067
 
        self._fallback_vfs.append(a_versioned_files)
 
1309
        self._immediate_fallback_vfs.append(a_versioned_files)
1068
1310
 
1069
1311
    def annotate(self, key):
1070
1312
        """See VersionedFiles.annotate."""
1083
1325
        else:
1084
1326
            return self.get_record_stream(keys, 'unordered', True)
1085
1327
 
 
1328
    def clear_cache(self):
 
1329
        """See VersionedFiles.clear_cache()"""
 
1330
        self._group_cache.clear()
 
1331
        self._index._graph_index.clear_cache()
 
1332
        self._index._int_cache.clear()
 
1333
 
1086
1334
    def _check_add(self, key, lines, random_id, check_content):
1087
1335
        """check that version_id and lines are safe to add."""
1088
1336
        version_id = key[-1]
1098
1346
            self._check_lines_not_unicode(lines)
1099
1347
            self._check_lines_are_lines(lines)
1100
1348
 
1101
 
    def get_known_graph_ancestry(self, keys):
1102
 
        """Get a KnownGraph instance with the ancestry of keys."""
1103
 
        parent_map, missing_keys = self._index._graph_index.find_ancestry(keys,
1104
 
                                                                          0)
1105
 
        kg = _mod_graph.KnownGraph(parent_map)
1106
 
        return kg
1107
 
 
1108
1349
    def get_parent_map(self, keys):
1109
1350
        """Get a map of the graph parents of keys.
1110
1351
 
1125
1366
            and so on.
1126
1367
        """
1127
1368
        result = {}
1128
 
        sources = [self._index] + self._fallback_vfs
 
1369
        sources = [self._index] + self._immediate_fallback_vfs
1129
1370
        source_results = []
1130
1371
        missing = set(keys)
1131
1372
        for source in sources:
1137
1378
            missing.difference_update(set(new_result))
1138
1379
        return result, source_results
1139
1380
 
1140
 
    def _get_block(self, index_memo):
1141
 
        read_memo = index_memo[0:3]
1142
 
        # get the group:
1143
 
        try:
1144
 
            block = self._group_cache[read_memo]
1145
 
        except KeyError:
1146
 
            # read the group
1147
 
            zdata = self._access.get_raw_records([read_memo]).next()
1148
 
            # decompress - whole thing - this is not a bug, as it
1149
 
            # permits caching. We might want to store the partially
1150
 
            # decompresed group and decompress object, so that recent
1151
 
            # texts are not penalised by big groups.
1152
 
            block = GroupCompressBlock.from_bytes(zdata)
1153
 
            self._group_cache[read_memo] = block
1154
 
        # cheapo debugging:
1155
 
        # print len(zdata), len(plain)
1156
 
        # parse - requires split_lines, better to have byte offsets
1157
 
        # here (but not by much - we only split the region for the
1158
 
        # recipe, and we often want to end up with lines anyway.
1159
 
        return block
 
1381
    def _get_blocks(self, read_memos):
 
1382
        """Get GroupCompressBlocks for the given read_memos.
 
1383
 
 
1384
        :returns: a series of (read_memo, block) pairs, in the order they were
 
1385
            originally passed.
 
1386
        """
 
1387
        cached = {}
 
1388
        for read_memo in read_memos:
 
1389
            try:
 
1390
                block = self._group_cache[read_memo]
 
1391
            except KeyError:
 
1392
                pass
 
1393
            else:
 
1394
                cached[read_memo] = block
 
1395
        not_cached = []
 
1396
        not_cached_seen = set()
 
1397
        for read_memo in read_memos:
 
1398
            if read_memo in cached:
 
1399
                # Don't fetch what we already have
 
1400
                continue
 
1401
            if read_memo in not_cached_seen:
 
1402
                # Don't try to fetch the same data twice
 
1403
                continue
 
1404
            not_cached.append(read_memo)
 
1405
            not_cached_seen.add(read_memo)
 
1406
        raw_records = self._access.get_raw_records(not_cached)
 
1407
        for read_memo in read_memos:
 
1408
            try:
 
1409
                yield read_memo, cached[read_memo]
 
1410
            except KeyError:
 
1411
                # Read the block, and cache it.
 
1412
                zdata = raw_records.next()
 
1413
                block = GroupCompressBlock.from_bytes(zdata)
 
1414
                self._group_cache[read_memo] = block
 
1415
                cached[read_memo] = block
 
1416
                yield read_memo, block
1160
1417
 
1161
1418
    def get_missing_compression_parent_keys(self):
1162
1419
        """Return the keys of missing compression parents.
1216
1473
        parent_map = {}
1217
1474
        key_to_source_map = {}
1218
1475
        source_results = []
1219
 
        for source in self._fallback_vfs:
 
1476
        for source in self._immediate_fallback_vfs:
1220
1477
            if not missing:
1221
1478
                break
1222
1479
            source_parents = source.get_parent_map(missing)
1232
1489
 
1233
1490
        The returned objects should be in the order defined by 'ordering',
1234
1491
        which can weave between different sources.
 
1492
 
1235
1493
        :param ordering: Must be one of 'topological' or 'groupcompress'
1236
1494
        :return: List of [(source, [keys])] tuples, such that all keys are in
1237
1495
            the defined order, regardless of source.
1238
1496
        """
1239
1497
        if ordering == 'topological':
1240
 
            present_keys = topo_sort(parent_map)
 
1498
            present_keys = tsort.topo_sort(parent_map)
1241
1499
        else:
1242
1500
            # ordering == 'groupcompress'
1243
1501
            # XXX: This only optimizes for the target ordering. We may need
1328
1586
                unadded_keys, source_result)
1329
1587
        for key in missing:
1330
1588
            yield AbsentContentFactory(key)
1331
 
        manager = None
1332
 
        last_read_memo = None
1333
 
        # TODO: This works fairly well at batching up existing groups into a
1334
 
        #       streamable format, and possibly allowing for taking one big
1335
 
        #       group and splitting it when it isn't fully utilized.
1336
 
        #       However, it doesn't allow us to find under-utilized groups and
1337
 
        #       combine them into a bigger group on the fly.
1338
 
        #       (Consider the issue with how chk_map inserts texts
1339
 
        #       one-at-a-time.) This could be done at insert_record_stream()
1340
 
        #       time, but it probably would decrease the number of
1341
 
        #       bytes-on-the-wire for fetch.
 
1589
        # Batch up as many keys as we can until either:
 
1590
        #  - we encounter an unadded ref, or
 
1591
        #  - we run out of keys, or
 
1592
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
 
1593
        batcher = _BatchingBlockFetcher(self, locations,
 
1594
            get_compressor_settings=self._get_compressor_settings)
1342
1595
        for source, keys in source_keys:
1343
1596
            if source is self:
1344
1597
                for key in keys:
1345
1598
                    if key in self._unadded_refs:
1346
 
                        if manager is not None:
1347
 
                            for factory in manager.get_record_stream():
1348
 
                                yield factory
1349
 
                            last_read_memo = manager = None
 
1599
                        # Flush batch, then yield unadded ref from
 
1600
                        # self._compressor.
 
1601
                        for factory in batcher.yield_factories(full_flush=True):
 
1602
                            yield factory
1350
1603
                        bytes, sha1 = self._compressor.extract(key)
1351
1604
                        parents = self._unadded_refs[key]
1352
1605
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1353
 
                    else:
1354
 
                        index_memo, _, parents, (method, _) = locations[key]
1355
 
                        read_memo = index_memo[0:3]
1356
 
                        if last_read_memo != read_memo:
1357
 
                            # We are starting a new block. If we have a
1358
 
                            # manager, we have found everything that fits for
1359
 
                            # now, so yield records
1360
 
                            if manager is not None:
1361
 
                                for factory in manager.get_record_stream():
1362
 
                                    yield factory
1363
 
                            # Now start a new manager
1364
 
                            block = self._get_block(index_memo)
1365
 
                            manager = _LazyGroupContentManager(block)
1366
 
                            last_read_memo = read_memo
1367
 
                        start, end = index_memo[3:5]
1368
 
                        manager.add_factory(key, parents, start, end)
 
1606
                        continue
 
1607
                    if batcher.add_key(key) > BATCH_SIZE:
 
1608
                        # Ok, this batch is big enough.  Yield some results.
 
1609
                        for factory in batcher.yield_factories():
 
1610
                            yield factory
1369
1611
            else:
1370
 
                if manager is not None:
1371
 
                    for factory in manager.get_record_stream():
1372
 
                        yield factory
1373
 
                    last_read_memo = manager = None
 
1612
                for factory in batcher.yield_factories(full_flush=True):
 
1613
                    yield factory
1374
1614
                for record in source.get_record_stream(keys, ordering,
1375
1615
                                                       include_delta_closure):
1376
1616
                    yield record
1377
 
        if manager is not None:
1378
 
            for factory in manager.get_record_stream():
1379
 
                yield factory
 
1617
        for factory in batcher.yield_factories(full_flush=True):
 
1618
            yield factory
1380
1619
 
1381
1620
    def get_sha1s(self, keys):
1382
1621
        """See VersionedFiles.get_sha1s()."""
1404
1643
        for _ in self._insert_record_stream(stream, random_id=False):
1405
1644
            pass
1406
1645
 
 
1646
    def _get_compressor_settings(self):
 
1647
        if self._max_bytes_to_index is None:
 
1648
            # TODO: VersionedFiles don't know about their containing
 
1649
            #       repository, so they don't have much of an idea about their
 
1650
            #       location. So for now, this is only a global option.
 
1651
            c = config.GlobalConfig()
 
1652
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1653
            if val is not None:
 
1654
                try:
 
1655
                    val = int(val)
 
1656
                except ValueError, e:
 
1657
                    trace.warning('Value for '
 
1658
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1659
                                  ' %r is not an integer'
 
1660
                                  % (val,))
 
1661
                    val = None
 
1662
            if val is None:
 
1663
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1664
            self._max_bytes_to_index = val
 
1665
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1666
 
 
1667
    def _make_group_compressor(self):
 
1668
        return GroupCompressor(self._get_compressor_settings())
 
1669
 
1407
1670
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1408
1671
                              reuse_blocks=True):
1409
1672
        """Internal core to insert a record stream into this container.
1432
1695
                return adapter
1433
1696
        # This will go up to fulltexts for gc to gc fetching, which isn't
1434
1697
        # ideal.
1435
 
        self._compressor = GroupCompressor()
 
1698
        self._compressor = self._make_group_compressor()
1436
1699
        self._unadded_refs = {}
1437
1700
        keys_to_add = []
1438
1701
        def flush():
1439
 
            bytes = self._compressor.flush().to_bytes()
 
1702
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1703
            self._compressor = self._make_group_compressor()
 
1704
            # Note: At this point we still have 1 copy of the fulltext (in
 
1705
            #       record and the var 'bytes'), and this generates 2 copies of
 
1706
            #       the compressed text (one for bytes, one in chunks)
 
1707
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1708
            #       have to double compressed memory here
 
1709
            # TODO: Figure out how to indicate that we would be happy to free
 
1710
            #       the fulltext content at this point. Note that sometimes we
 
1711
            #       will want it later (streaming CHK pages), but most of the
 
1712
            #       time we won't (everything else)
 
1713
            bytes = ''.join(chunks)
 
1714
            del chunks
1440
1715
            index, start, length = self._access.add_raw_records(
1441
1716
                [(None, len(bytes))], bytes)[0]
1442
1717
            nodes = []
1445
1720
            self._index.add_records(nodes, random_id=random_id)
1446
1721
            self._unadded_refs = {}
1447
1722
            del keys_to_add[:]
1448
 
            self._compressor = GroupCompressor()
1449
1723
 
1450
1724
        last_prefix = None
1451
1725
        max_fulltext_len = 0
1455
1729
        block_length = None
1456
1730
        # XXX: TODO: remove this, it is just for safety checking for now
1457
1731
        inserted_keys = set()
 
1732
        reuse_this_block = reuse_blocks
1458
1733
        for record in stream:
1459
1734
            # Raise an error when a record is missing.
1460
1735
            if record.storage_kind == 'absent':
1461
1736
                raise errors.RevisionNotPresent(record.key, self)
1462
1737
            if random_id:
1463
1738
                if record.key in inserted_keys:
1464
 
                    trace.note('Insert claimed random_id=True,'
1465
 
                               ' but then inserted %r two times', record.key)
 
1739
                    trace.note(gettext('Insert claimed random_id=True,'
 
1740
                               ' but then inserted %r two times'), record.key)
1466
1741
                    continue
1467
1742
                inserted_keys.add(record.key)
1468
1743
            if reuse_blocks:
1469
1744
                # If the reuse_blocks flag is set, check to see if we can just
1470
1745
                # copy a groupcompress block as-is.
 
1746
                # We only check on the first record (groupcompress-block) not
 
1747
                # on all of the (groupcompress-block-ref) entries.
 
1748
                # The reuse_this_block flag is then kept for as long as
 
1749
                if record.storage_kind == 'groupcompress-block':
 
1750
                    # Check to see if we really want to re-use this block
 
1751
                    insert_manager = record._manager
 
1752
                    reuse_this_block = insert_manager.check_is_well_utilized()
 
1753
            else:
 
1754
                reuse_this_block = False
 
1755
            if reuse_this_block:
 
1756
                # We still want to reuse this block
1471
1757
                if record.storage_kind == 'groupcompress-block':
1472
1758
                    # Insert the raw block into the target repo
1473
1759
                    insert_manager = record._manager
1474
 
                    insert_manager._check_rebuild_block()
1475
1760
                    bytes = record._manager._block.to_bytes()
1476
1761
                    _, start, length = self._access.add_raw_records(
1477
1762
                        [(None, len(bytes))], bytes)[0]
1482
1767
                                           'groupcompress-block-ref'):
1483
1768
                    if insert_manager is None:
1484
1769
                        raise AssertionError('No insert_manager set')
 
1770
                    if insert_manager is not record._manager:
 
1771
                        raise AssertionError('insert_manager does not match'
 
1772
                            ' the current record, we cannot be positive'
 
1773
                            ' that the appropriate content was inserted.'
 
1774
                            )
1485
1775
                    value = "%d %d %d %d" % (block_start, block_length,
1486
1776
                                             record._start, record._end)
1487
1777
                    nodes = [(record.key, value, (record.parents,))]
1537
1827
                key = record.key
1538
1828
            self._unadded_refs[key] = record.parents
1539
1829
            yield found_sha1
1540
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
1541
 
                (record.parents,)))
 
1830
            as_st = static_tuple.StaticTuple.from_sequence
 
1831
            if record.parents is not None:
 
1832
                parents = as_st([as_st(p) for p in record.parents])
 
1833
            else:
 
1834
                parents = None
 
1835
            refs = static_tuple.StaticTuple(parents)
 
1836
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
1542
1837
        if len(keys_to_add):
1543
1838
            flush()
1544
1839
        self._compressor = None
1587
1882
        """See VersionedFiles.keys."""
1588
1883
        if 'evil' in debug.debug_flags:
1589
1884
            trace.mutter_callsite(2, "keys scales with size of history")
1590
 
        sources = [self._index] + self._fallback_vfs
 
1885
        sources = [self._index] + self._immediate_fallback_vfs
1591
1886
        result = set()
1592
1887
        for source in sources:
1593
1888
            result.update(source.keys())
1594
1889
        return result
1595
1890
 
1596
1891
 
 
1892
class _GCBuildDetails(object):
 
1893
    """A blob of data about the build details.
 
1894
 
 
1895
    This stores the minimal data, which then allows compatibility with the old
 
1896
    api, without taking as much memory.
 
1897
    """
 
1898
 
 
1899
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1900
                 '_delta_end', '_parents')
 
1901
 
 
1902
    method = 'group'
 
1903
    compression_parent = None
 
1904
 
 
1905
    def __init__(self, parents, position_info):
 
1906
        self._parents = parents
 
1907
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1908
         self._delta_end) = position_info
 
1909
 
 
1910
    def __repr__(self):
 
1911
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1912
            self.index_memo, self._parents)
 
1913
 
 
1914
    @property
 
1915
    def index_memo(self):
 
1916
        return (self._index, self._group_start, self._group_end,
 
1917
                self._basis_end, self._delta_end)
 
1918
 
 
1919
    @property
 
1920
    def record_details(self):
 
1921
        return static_tuple.StaticTuple(self.method, None)
 
1922
 
 
1923
    def __getitem__(self, offset):
 
1924
        """Compatibility thunk to act like a tuple."""
 
1925
        if offset == 0:
 
1926
            return self.index_memo
 
1927
        elif offset == 1:
 
1928
            return self.compression_parent # Always None
 
1929
        elif offset == 2:
 
1930
            return self._parents
 
1931
        elif offset == 3:
 
1932
            return self.record_details
 
1933
        else:
 
1934
            raise IndexError('offset out of range')
 
1935
            
 
1936
    def __len__(self):
 
1937
        return 4
 
1938
 
 
1939
 
1597
1940
class _GCGraphIndex(object):
1598
1941
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1599
1942
 
1600
1943
    def __init__(self, graph_index, is_locked, parents=True,
1601
1944
        add_callback=None, track_external_parent_refs=False,
1602
 
        inconsistency_fatal=True):
 
1945
        inconsistency_fatal=True, track_new_keys=False):
1603
1946
        """Construct a _GCGraphIndex on a graph_index.
1604
1947
 
1605
1948
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1624
1967
        self.has_graph = parents
1625
1968
        self._is_locked = is_locked
1626
1969
        self._inconsistency_fatal = inconsistency_fatal
 
1970
        # GroupCompress records tend to have the same 'group' start + offset
 
1971
        # repeated over and over, this creates a surplus of ints
 
1972
        self._int_cache = {}
1627
1973
        if track_external_parent_refs:
1628
 
            self._key_dependencies = knit._KeyRefs()
 
1974
            self._key_dependencies = _KeyRefs(
 
1975
                track_new_keys=track_new_keys)
1629
1976
        else:
1630
1977
            self._key_dependencies = None
1631
1978
 
1664
2011
        if not random_id:
1665
2012
            present_nodes = self._get_entries(keys)
1666
2013
            for (index, key, value, node_refs) in present_nodes:
1667
 
                if node_refs != keys[key][1]:
1668
 
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
 
2014
                # Sometimes these are passed as a list rather than a tuple
 
2015
                node_refs = static_tuple.as_tuples(node_refs)
 
2016
                passed = static_tuple.as_tuples(keys[key])
 
2017
                if node_refs != passed[1]:
 
2018
                    details = '%s %s %s' % (key, (value, node_refs), passed)
1669
2019
                    if self._inconsistency_fatal:
1670
2020
                        raise errors.KnitCorrupt(self, "inconsistent details"
1671
2021
                                                 " in add_records: %s" %
1685
2035
                    result.append((key, value))
1686
2036
            records = result
1687
2037
        key_dependencies = self._key_dependencies
1688
 
        if key_dependencies is not None and self._parents:
1689
 
            for key, value, refs in records:
1690
 
                parents = refs[0]
1691
 
                key_dependencies.add_references(key, parents)
 
2038
        if key_dependencies is not None:
 
2039
            if self._parents:
 
2040
                for key, value, refs in records:
 
2041
                    parents = refs[0]
 
2042
                    key_dependencies.add_references(key, parents)
 
2043
            else:
 
2044
                for key, value, refs in records:
 
2045
                    new_keys.add_key(key)
1692
2046
        self._add_callback(records)
1693
2047
 
1694
2048
    def _check_read(self):
1725
2079
            if missing_keys:
1726
2080
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
1727
2081
 
 
2082
    def find_ancestry(self, keys):
 
2083
        """See CombinedGraphIndex.find_ancestry"""
 
2084
        return self._graph_index.find_ancestry(keys, 0)
 
2085
 
1728
2086
    def get_parent_map(self, keys):
1729
2087
        """Get a map of the parents of keys.
1730
2088
 
1747
2105
        """Return the keys of missing parents."""
1748
2106
        # Copied from _KnitGraphIndex.get_missing_parents
1749
2107
        # We may have false positives, so filter those out.
1750
 
        self._key_dependencies.add_keys(
 
2108
        self._key_dependencies.satisfy_refs_for_keys(
1751
2109
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
1752
2110
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
1753
2111
 
1759
2117
        :param keys: An iterable of keys.
1760
2118
        :return: A dict of key:
1761
2119
            (index_memo, compression_parent, parents, record_details).
1762
 
            index_memo
1763
 
                opaque structure to pass to read_records to extract the raw
1764
 
                data
1765
 
            compression_parent
1766
 
                Content that this record is built upon, may be None
1767
 
            parents
1768
 
                Logical parents of this node
1769
 
            record_details
1770
 
                extra information about the content which needs to be passed to
1771
 
                Factory.parse_record
 
2120
 
 
2121
            * index_memo: opaque structure to pass to read_records to extract
 
2122
              the raw data
 
2123
            * compression_parent: Content that this record is built upon, may
 
2124
              be None
 
2125
            * parents: Logical parents of this node
 
2126
            * record_details: extra information about the content which needs
 
2127
              to be passed to Factory.parse_record
1772
2128
        """
1773
2129
        self._check_read()
1774
2130
        result = {}
1779
2135
                parents = None
1780
2136
            else:
1781
2137
                parents = entry[3][0]
1782
 
            method = 'group'
1783
 
            result[key] = (self._node_to_position(entry),
1784
 
                                  None, parents, (method, None))
 
2138
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2139
            result[key] = details
1785
2140
        return result
1786
2141
 
1787
2142
    def keys(self):
1796
2151
        """Convert an index value to position details."""
1797
2152
        bits = node[2].split(' ')
1798
2153
        # It would be nice not to read the entire gzip.
 
2154
        # start and stop are put into _int_cache because they are very common.
 
2155
        # They define the 'group' that an entry is in, and many groups can have
 
2156
        # thousands of objects.
 
2157
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
 
2158
        # each, or about 7MB. Note that it might be even more when you consider
 
2159
        # how PyInt is allocated in separate slabs. And you can't return a slab
 
2160
        # to the OS if even 1 int on it is in use. Note though that Python uses
 
2161
        # a LIFO when re-using PyInt slots, which might cause more
 
2162
        # fragmentation.
1799
2163
        start = int(bits[0])
 
2164
        start = self._int_cache.setdefault(start, start)
1800
2165
        stop = int(bits[1])
 
2166
        stop = self._int_cache.setdefault(stop, stop)
1801
2167
        basis_end = int(bits[2])
1802
2168
        delta_end = int(bits[3])
1803
 
        return node[0], start, stop, basis_end, delta_end
 
2169
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
 
2170
        # instance...
 
2171
        return (node[0], start, stop, basis_end, delta_end)
1804
2172
 
1805
2173
    def scan_unvalidated_index(self, graph_index):
1806
2174
        """Inform this _GCGraphIndex that there is an unvalidated index.
1807
2175
 
1808
2176
        This allows this _GCGraphIndex to keep track of any missing
1809
2177
        compression parents we may want to have filled in to make those
1810
 
        indices valid.
 
2178
        indices valid.  It also allows _GCGraphIndex to track any new keys.
1811
2179
 
1812
2180
        :param graph_index: A GraphIndex
1813
2181
        """
1814
 
        if self._key_dependencies is not None:
1815
 
            # Add parent refs from graph_index (and discard parent refs that
1816
 
            # the graph_index has).
1817
 
            add_refs = self._key_dependencies.add_references
1818
 
            for node in graph_index.iter_all_entries():
1819
 
                add_refs(node[1], node[3][0])
1820
 
 
 
2182
        key_dependencies = self._key_dependencies
 
2183
        if key_dependencies is None:
 
2184
            return
 
2185
        for node in graph_index.iter_all_entries():
 
2186
            # Add parent refs from graph_index (and discard parent refs
 
2187
            # that the graph_index has).
 
2188
            key_dependencies.add_references(node[1], node[3][0])
1821
2189
 
1822
2190
 
1823
2191
from bzrlib._groupcompress_py import (
1837
2205
        decode_base128_int,
1838
2206
        )
1839
2207
    GroupCompressor = PyrexGroupCompressor
1840
 
except ImportError:
 
2208
except ImportError, e:
 
2209
    osutils.failed_to_load_extension(e)
1841
2210
    GroupCompressor = PythonGroupCompressor
1842
2211