~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Vincent Ladeuil
  • Date: 2017-01-30 14:30:10 UTC
  • mfrom: (6615.3.7 merges)
  • mto: This revision was merged to the branch mainline in revision 6621.
  • Revision ID: v.ladeuil+lp@free.fr-20170130143010-p31t1ranfeqbaeki
Merge  2.7 into trunk including fix for bug #1657238

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from __future__ import absolute_import
 
20
 
19
21
import time
20
22
import zlib
21
 
try:
22
 
    import pylzma
23
 
except ImportError:
24
 
    pylzma = None
25
23
 
 
24
from bzrlib.lazy_import import lazy_import
 
25
lazy_import(globals(), """
26
26
from bzrlib import (
27
27
    annotate,
 
28
    config,
28
29
    debug,
29
30
    errors,
30
31
    graph as _mod_graph,
31
 
    knit,
32
32
    osutils,
33
33
    pack,
 
34
    static_tuple,
34
35
    trace,
 
36
    tsort,
35
37
    )
36
 
from bzrlib.graph import Graph
 
38
 
 
39
from bzrlib.repofmt import pack_repo
 
40
from bzrlib.i18n import gettext
 
41
""")
 
42
 
37
43
from bzrlib.btree_index import BTreeBuilder
38
44
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
45
from bzrlib.versionedfile import (
 
46
    _KeyRefs,
41
47
    adapter_registry,
42
48
    AbsentContentFactory,
43
49
    ChunkedContentFactory,
44
50
    FulltextContentFactory,
45
 
    VersionedFiles,
 
51
    VersionedFilesWithFallbacks,
46
52
    )
47
53
 
48
 
_USE_LZMA = False and (pylzma is not None)
 
54
# Minimum number of uncompressed bytes to try fetch at once when retrieving
 
55
# groupcompress blocks.
 
56
BATCH_SIZE = 2**16
49
57
 
50
58
# osutils.sha_string('')
51
59
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
52
60
 
53
 
 
54
61
def sort_gc_optimal(parent_map):
55
62
    """Sort and group the keys in parent_map into groupcompress order.
56
63
 
62
69
    # groupcompress ordering is approximately reverse topological,
63
70
    # properly grouped by file-id.
64
71
    per_prefix_map = {}
65
 
    for item in parent_map.iteritems():
66
 
        key = item[0]
 
72
    for key, value in parent_map.iteritems():
67
73
        if isinstance(key, str) or len(key) == 1:
68
74
            prefix = ''
69
75
        else:
70
76
            prefix = key[0]
71
77
        try:
72
 
            per_prefix_map[prefix].append(item)
 
78
            per_prefix_map[prefix][key] = value
73
79
        except KeyError:
74
 
            per_prefix_map[prefix] = [item]
 
80
            per_prefix_map[prefix] = {key: value}
75
81
 
76
82
    present_keys = []
77
83
    for prefix in sorted(per_prefix_map):
78
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
84
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
79
85
    return present_keys
80
86
 
81
87
 
99
105
    def __init__(self):
100
106
        # map by key? or just order in file?
101
107
        self._compressor_name = None
102
 
        self._z_content = None
 
108
        self._z_content_chunks = None
103
109
        self._z_content_decompressor = None
104
110
        self._z_content_length = None
105
111
        self._content_length = None
118
124
        :param num_bytes: Ensure that we have extracted at least num_bytes of
119
125
            content. If None, consume everything
120
126
        """
121
 
        # TODO: If we re-use the same content block at different times during
122
 
        #       get_record_stream(), it is possible that the first pass will
123
 
        #       get inserted, triggering an extract/_ensure_content() which
124
 
        #       will get rid of _z_content. And then the next use of the block
125
 
        #       will try to access _z_content (to send it over the wire), and
126
 
        #       fail because it is already extracted. Consider never releasing
127
 
        #       _z_content because of this.
 
127
        if self._content_length is None:
 
128
            raise AssertionError('self._content_length should never be None')
128
129
        if num_bytes is None:
129
130
            num_bytes = self._content_length
130
131
        elif (self._content_length is not None
138
139
                self._content = ''.join(self._content_chunks)
139
140
                self._content_chunks = None
140
141
        if self._content is None:
141
 
            if self._z_content is None:
 
142
            # We join self._z_content_chunks here, because if we are
 
143
            # decompressing, then it is *very* likely that we have a single
 
144
            # chunk
 
145
            if self._z_content_chunks is None:
142
146
                raise AssertionError('No content to decompress')
143
 
            if self._z_content == '':
 
147
            z_content = ''.join(self._z_content_chunks)
 
148
            if z_content == '':
144
149
                self._content = ''
145
150
            elif self._compressor_name == 'lzma':
146
151
                # We don't do partial lzma decomp yet
147
 
                self._content = pylzma.decompress(self._z_content)
 
152
                import pylzma
 
153
                self._content = pylzma.decompress(z_content)
148
154
            elif self._compressor_name == 'zlib':
149
155
                # Start a zlib decompressor
150
 
                if num_bytes is None:
151
 
                    self._content = zlib.decompress(self._z_content)
 
156
                if num_bytes * 4 > self._content_length * 3:
 
157
                    # If we are requesting more that 3/4ths of the content,
 
158
                    # just extract the whole thing in a single pass
 
159
                    num_bytes = self._content_length
 
160
                    self._content = zlib.decompress(z_content)
152
161
                else:
153
162
                    self._z_content_decompressor = zlib.decompressobj()
154
163
                    # Seed the decompressor with the uncompressed bytes, so
155
164
                    # that the rest of the code is simplified
156
165
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
166
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
167
                    if not self._z_content_decompressor.unconsumed_tail:
 
168
                        self._z_content_decompressor = None
158
169
            else:
159
170
                raise AssertionError('Unknown compressor: %r'
160
171
                                     % self._compressor_name)
162
173
        # 'unconsumed_tail'
163
174
 
164
175
        # Do we have enough bytes already?
165
 
        if num_bytes is not None and len(self._content) >= num_bytes:
166
 
            return
167
 
        if num_bytes is None and self._z_content_decompressor is None:
168
 
            # We must have already decompressed everything
 
176
        if len(self._content) >= num_bytes:
169
177
            return
170
178
        # If we got this far, and don't have a decompressor, something is wrong
171
179
        if self._z_content_decompressor is None:
172
180
            raise AssertionError(
173
181
                'No decompressor to decompress %d bytes' % num_bytes)
174
182
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
175
 
        if num_bytes is None:
176
 
            if remaining_decomp:
177
 
                # We don't know how much is left, but we'll decompress it all
178
 
                self._content += self._z_content_decompressor.decompress(
179
 
                    remaining_decomp)
180
 
                # Note: There's what I consider a bug in zlib.decompressobj
181
 
                #       If you pass back in the entire unconsumed_tail, only
182
 
                #       this time you don't pass a max-size, it doesn't
183
 
                #       change the unconsumed_tail back to None/''.
184
 
                #       However, we know we are done with the whole stream
185
 
                self._z_content_decompressor = None
186
 
            # XXX: Why is this the only place in this routine we set this?
187
 
            self._content_length = len(self._content)
188
 
        else:
189
 
            if not remaining_decomp:
190
 
                raise AssertionError('Nothing left to decompress')
191
 
            needed_bytes = num_bytes - len(self._content)
192
 
            # We always set max_size to 32kB over the minimum needed, so that
193
 
            # zlib will give us as much as we really want.
194
 
            # TODO: If this isn't good enough, we could make a loop here,
195
 
            #       that keeps expanding the request until we get enough
196
 
            self._content += self._z_content_decompressor.decompress(
197
 
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
198
 
            if len(self._content) < num_bytes:
199
 
                raise AssertionError('%d bytes wanted, only %d available'
200
 
                                     % (num_bytes, len(self._content)))
201
 
            if not self._z_content_decompressor.unconsumed_tail:
202
 
                # The stream is finished
203
 
                self._z_content_decompressor = None
 
183
        if not remaining_decomp:
 
184
            raise AssertionError('Nothing left to decompress')
 
185
        needed_bytes = num_bytes - len(self._content)
 
186
        # We always set max_size to 32kB over the minimum needed, so that
 
187
        # zlib will give us as much as we really want.
 
188
        # TODO: If this isn't good enough, we could make a loop here,
 
189
        #       that keeps expanding the request until we get enough
 
190
        self._content += self._z_content_decompressor.decompress(
 
191
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
192
        if len(self._content) < num_bytes:
 
193
            raise AssertionError('%d bytes wanted, only %d available'
 
194
                                 % (num_bytes, len(self._content)))
 
195
        if not self._z_content_decompressor.unconsumed_tail:
 
196
            # The stream is finished
 
197
            self._z_content_decompressor = None
204
198
 
205
199
    def _parse_bytes(self, bytes, pos):
206
200
        """Read the various lengths from the header.
222
216
            # XXX: Define some GCCorrupt error ?
223
217
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
224
218
                                 (len(bytes), pos, self._z_content_length))
225
 
        self._z_content = bytes[pos:]
 
219
        self._z_content_chunks = (bytes[pos:],)
 
220
 
 
221
    @property
 
222
    def _z_content(self):
 
223
        """Return z_content_chunks as a simple string.
 
224
 
 
225
        Meant only to be used by the test suite.
 
226
        """
 
227
        if self._z_content_chunks is not None:
 
228
            return ''.join(self._z_content_chunks)
 
229
        return None
226
230
 
227
231
    @classmethod
228
232
    def from_bytes(cls, bytes):
284
288
        self._content_length = length
285
289
        self._content_chunks = content_chunks
286
290
        self._content = None
287
 
        self._z_content = None
 
291
        self._z_content_chunks = None
288
292
 
289
293
    def set_content(self, content):
290
294
        """Set the content of this block."""
291
295
        self._content_length = len(content)
292
296
        self._content = content
293
 
        self._z_content = None
294
 
 
295
 
    def _create_z_content_using_lzma(self):
296
 
        if self._content_chunks is not None:
297
 
            self._content = ''.join(self._content_chunks)
298
 
            self._content_chunks = None
299
 
        if self._content is None:
300
 
            raise AssertionError('Nothing to compress')
301
 
        self._z_content = pylzma.compress(self._content)
302
 
        self._z_content_length = len(self._z_content)
303
 
 
304
 
    def _create_z_content_from_chunks(self):
 
297
        self._z_content_chunks = None
 
298
 
 
299
    def _create_z_content_from_chunks(self, chunks):
305
300
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
306
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
301
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
302
        # (measured peak is maybe 30MB over the above...)
 
303
        compressed_chunks = map(compressor.compress, chunks)
307
304
        compressed_chunks.append(compressor.flush())
308
 
        self._z_content = ''.join(compressed_chunks)
309
 
        self._z_content_length = len(self._z_content)
 
305
        # Ignore empty chunks
 
306
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
307
        self._z_content_length = sum(map(len, self._z_content_chunks))
310
308
 
311
309
    def _create_z_content(self):
312
 
        if self._z_content is not None:
313
 
            return
314
 
        if _USE_LZMA:
315
 
            self._create_z_content_using_lzma()
 
310
        if self._z_content_chunks is not None:
316
311
            return
317
312
        if self._content_chunks is not None:
318
 
            self._create_z_content_from_chunks()
319
 
            return
320
 
        self._z_content = zlib.compress(self._content)
321
 
        self._z_content_length = len(self._z_content)
 
313
            chunks = self._content_chunks
 
314
        else:
 
315
            chunks = (self._content,)
 
316
        self._create_z_content_from_chunks(chunks)
 
317
 
 
318
    def to_chunks(self):
 
319
        """Create the byte stream as a series of 'chunks'"""
 
320
        self._create_z_content()
 
321
        header = self.GCB_HEADER
 
322
        chunks = ['%s%d\n%d\n'
 
323
                  % (header, self._z_content_length, self._content_length),
 
324
                 ]
 
325
        chunks.extend(self._z_content_chunks)
 
326
        total_len = sum(map(len, chunks))
 
327
        return total_len, chunks
322
328
 
323
329
    def to_bytes(self):
324
330
        """Encode the information into a byte stream."""
325
 
        self._create_z_content()
326
 
        if _USE_LZMA:
327
 
            header = self.GCB_LZ_HEADER
328
 
        else:
329
 
            header = self.GCB_HEADER
330
 
        chunks = [header,
331
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
332
 
                  self._z_content,
333
 
                 ]
 
331
        total_len, chunks = self.to_chunks()
334
332
        return ''.join(chunks)
335
333
 
336
334
    def _dump(self, include_text=False):
450
448
                # Grab and cache the raw bytes for this entry
451
449
                # and break the ref-cycle with _manager since we don't need it
452
450
                # anymore
453
 
                self._manager._prepare_for_extract()
 
451
                try:
 
452
                    self._manager._prepare_for_extract()
 
453
                except zlib.error as value:
 
454
                    raise errors.DecompressCorruption("zlib: " + str(value))
454
455
                block = self._manager._block
455
456
                self._bytes = block.extract(self.key, self._start, self._end)
456
457
                # There are code paths that first extract as fulltext, and then
457
458
                # extract as storage_kind (smart fetch). So we don't break the
458
459
                # refcycle here, but instead in manager.get_record_stream()
459
 
                # self._manager = None
460
460
            if storage_kind == 'fulltext':
461
461
                return self._bytes
462
462
            else:
468
468
class _LazyGroupContentManager(object):
469
469
    """This manages a group of _LazyGroupCompressFactory objects."""
470
470
 
471
 
    def __init__(self, block):
 
471
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
 
472
                             # current size, and still be considered
 
473
                             # resuable
 
474
    _full_block_size = 4*1024*1024
 
475
    _full_mixed_block_size = 2*1024*1024
 
476
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
 
477
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
478
 
 
479
    def __init__(self, block, get_compressor_settings=None):
472
480
        self._block = block
473
481
        # We need to preserve the ordering
474
482
        self._factories = []
475
483
        self._last_byte = 0
 
484
        self._get_settings = get_compressor_settings
 
485
        self._compressor_settings = None
 
486
 
 
487
    def _get_compressor_settings(self):
 
488
        if self._compressor_settings is not None:
 
489
            return self._compressor_settings
 
490
        settings = None
 
491
        if self._get_settings is not None:
 
492
            settings = self._get_settings()
 
493
        if settings is None:
 
494
            vf = GroupCompressVersionedFiles
 
495
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
496
        self._compressor_settings = settings
 
497
        return self._compressor_settings
476
498
 
477
499
    def add_factory(self, key, parents, start, end):
478
500
        if not self._factories:
511
533
        new_block.set_content(self._block._content[:last_byte])
512
534
        self._block = new_block
513
535
 
 
536
    def _make_group_compressor(self):
 
537
        return GroupCompressor(self._get_compressor_settings())
 
538
 
514
539
    def _rebuild_block(self):
515
540
        """Create a new GroupCompressBlock with only the referenced texts."""
516
 
        compressor = GroupCompressor()
 
541
        compressor = self._make_group_compressor()
517
542
        tstart = time.time()
518
543
        old_length = self._block._content_length
519
544
        end_point = 0
531
556
        #       block? It seems hard to come up with a method that it would
532
557
        #       expand, since we do full compression again. Perhaps based on a
533
558
        #       request that ends up poorly ordered?
 
559
        # TODO: If the content would have expanded, then we would want to
 
560
        #       handle a case where we need to split the block.
 
561
        #       Now that we have a user-tweakable option
 
562
        #       (max_bytes_to_index), it is possible that one person set it
 
563
        #       to a very low value, causing poor compression.
534
564
        delta = time.time() - tstart
535
565
        self._block = new_block
536
566
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
545
575
        # time (self._block._content) is a little expensive.
546
576
        self._block._ensure_content(self._last_byte)
547
577
 
548
 
    def _check_rebuild_block(self):
 
578
    def _check_rebuild_action(self):
549
579
        """Check to see if our block should be repacked."""
550
580
        total_bytes_used = 0
551
581
        last_byte_used = 0
552
582
        for factory in self._factories:
553
583
            total_bytes_used += factory._end - factory._start
554
 
            last_byte_used = max(last_byte_used, factory._end)
555
 
        # If we are using most of the bytes from the block, we have nothing
556
 
        # else to check (currently more that 1/2)
 
584
            if last_byte_used < factory._end:
 
585
                last_byte_used = factory._end
 
586
        # If we are using more than half of the bytes from the block, we have
 
587
        # nothing else to check
557
588
        if total_bytes_used * 2 >= self._block._content_length:
558
 
            return
559
 
        # Can we just strip off the trailing bytes? If we are going to be
560
 
        # transmitting more than 50% of the front of the content, go ahead
 
589
            return None, last_byte_used, total_bytes_used
 
590
        # We are using less than 50% of the content. Is the content we are
 
591
        # using at the beginning of the block? If so, we can just trim the
 
592
        # tail, rather than rebuilding from scratch.
561
593
        if total_bytes_used * 2 > last_byte_used:
562
 
            self._trim_block(last_byte_used)
563
 
            return
 
594
            return 'trim', last_byte_used, total_bytes_used
564
595
 
565
596
        # We are using a small amount of the data, and it isn't just packed
566
597
        # nicely at the front, so rebuild the content.
573
604
        #       expanding many deltas into fulltexts, as well.
574
605
        #       If we build a cheap enough 'strip', then we could try a strip,
575
606
        #       if that expands the content, we then rebuild.
576
 
        self._rebuild_block()
 
607
        return 'rebuild', last_byte_used, total_bytes_used
 
608
 
 
609
    def check_is_well_utilized(self):
 
610
        """Is the current block considered 'well utilized'?
 
611
 
 
612
        This heuristic asks if the current block considers itself to be a fully
 
613
        developed group, rather than just a loose collection of data.
 
614
        """
 
615
        if len(self._factories) == 1:
 
616
            # A block of length 1 could be improved by combining with other
 
617
            # groups - don't look deeper. Even larger than max size groups
 
618
            # could compress well with adjacent versions of the same thing.
 
619
            return False
 
620
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
621
        block_size = self._block._content_length
 
622
        if total_bytes_used < block_size * self._max_cut_fraction:
 
623
            # This block wants to trim itself small enough that we want to
 
624
            # consider it under-utilized.
 
625
            return False
 
626
        # TODO: This code is meant to be the twin of _insert_record_stream's
 
627
        #       'start_new_block' logic. It would probably be better to factor
 
628
        #       out that logic into a shared location, so that it stays
 
629
        #       together better
 
630
        # We currently assume a block is properly utilized whenever it is >75%
 
631
        # of the size of a 'full' block. In normal operation, a block is
 
632
        # considered full when it hits 4MB of same-file content. So any block
 
633
        # >3MB is 'full enough'.
 
634
        # The only time this isn't true is when a given block has large-object
 
635
        # content. (a single file >4MB, etc.)
 
636
        # Under these circumstances, we allow a block to grow to
 
637
        # 2 x largest_content.  Which means that if a given block had a large
 
638
        # object, it may actually be under-utilized. However, given that this
 
639
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
 
640
        # content blobs on-the-fly. Note that because we return False for all
 
641
        # 1-item blobs, we will repack them; we may wish to reevaluate our
 
642
        # treatment of large object blobs in the future.
 
643
        if block_size >= self._full_enough_block_size:
 
644
            return True
 
645
        # If a block is <3MB, it still may be considered 'full' if it contains
 
646
        # mixed content. The current rule is 2MB of mixed content is considered
 
647
        # full. So check to see if this block contains mixed content, and
 
648
        # set the threshold appropriately.
 
649
        common_prefix = None
 
650
        for factory in self._factories:
 
651
            prefix = factory.key[:-1]
 
652
            if common_prefix is None:
 
653
                common_prefix = prefix
 
654
            elif prefix != common_prefix:
 
655
                # Mixed content, check the size appropriately
 
656
                if block_size >= self._full_enough_mixed_block_size:
 
657
                    return True
 
658
                break
 
659
        # The content failed both the mixed check and the single-content check
 
660
        # so obviously it is not fully utilized
 
661
        # TODO: there is one other constraint that isn't being checked
 
662
        #       namely, that the entries in the block are in the appropriate
 
663
        #       order. For example, you could insert the entries in exactly
 
664
        #       reverse groupcompress order, and we would think that is ok.
 
665
        #       (all the right objects are in one group, and it is fully
 
666
        #       utilized, etc.) For now, we assume that case is rare,
 
667
        #       especially since we should always fetch in 'groupcompress'
 
668
        #       order.
 
669
        return False
 
670
 
 
671
    def _check_rebuild_block(self):
 
672
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
673
        if action is None:
 
674
            return
 
675
        if action == 'trim':
 
676
            self._trim_block(last_byte_used)
 
677
        elif action == 'rebuild':
 
678
            self._rebuild_block()
 
679
        else:
 
680
            raise ValueError('unknown rebuild action: %r' % (action,))
577
681
 
578
682
    def _wire_bytes(self):
579
683
        """Return a byte stream suitable for transmitting over the wire."""
613
717
        z_header_bytes = zlib.compress(header_bytes)
614
718
        del header_bytes
615
719
        z_header_bytes_len = len(z_header_bytes)
616
 
        block_bytes = self._block.to_bytes()
 
720
        block_bytes_len, block_chunks = self._block.to_chunks()
617
721
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
618
 
                                       len(block_bytes)))
 
722
                                       block_bytes_len))
619
723
        lines.append(z_header_bytes)
620
 
        lines.append(block_bytes)
621
 
        del z_header_bytes, block_bytes
 
724
        lines.extend(block_chunks)
 
725
        del z_header_bytes, block_chunks
 
726
        # TODO: This is a point where we will double the memory consumption. To
 
727
        #       avoid this, we probably have to switch to a 'chunked' api
622
728
        return ''.join(lines)
623
729
 
624
730
    @classmethod
625
731
    def from_bytes(cls, bytes):
626
732
        # TODO: This does extra string copying, probably better to do it a
627
 
        #       different way
 
733
        #       different way. At a minimum this creates 2 copies of the
 
734
        #       compressed content
628
735
        (storage_kind, z_header_len, header_len,
629
736
         block_len, rest) = bytes.split('\n', 4)
630
737
        del bytes
682
789
 
683
790
class _CommonGroupCompressor(object):
684
791
 
685
 
    def __init__(self):
 
792
    def __init__(self, settings=None):
686
793
        """Create a GroupCompressor."""
687
794
        self.chunks = []
688
795
        self._last = None
691
798
        self.labels_deltas = {}
692
799
        self._delta_index = None # Set by the children
693
800
        self._block = GroupCompressBlock()
 
801
        if settings is None:
 
802
            self._settings = {}
 
803
        else:
 
804
            self._settings = settings
694
805
 
695
806
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
696
807
        """Compress lines with label key.
788
899
 
789
900
        After calling this, the compressor should no longer be used
790
901
        """
791
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
792
 
        #       group. This has an impact for 'commit' of large objects.
793
 
        #       One possibility is to use self._content_chunks, and be lazy and
794
 
        #       only fill out self._content as a full string when we actually
795
 
        #       need it. That would at least drop the peak memory consumption
796
 
        #       for 'commit' down to ~1x the size of the largest file, at a
797
 
        #       cost of increased complexity within this code. 2x is still <<
798
 
        #       3x the size of the largest file, so we are doing ok.
799
902
        self._block.set_chunked_content(self.chunks, self.endpoint)
800
903
        self.chunks = None
801
904
        self._delta_index = None
819
922
 
820
923
class PythonGroupCompressor(_CommonGroupCompressor):
821
924
 
822
 
    def __init__(self):
 
925
    def __init__(self, settings=None):
823
926
        """Create a GroupCompressor.
824
927
 
825
928
        Used only if the pyrex version is not available.
826
929
        """
827
 
        super(PythonGroupCompressor, self).__init__()
 
930
        super(PythonGroupCompressor, self).__init__(settings)
828
931
        self._delta_index = LinesDeltaIndex([])
829
932
        # The actual content is managed by LinesDeltaIndex
830
933
        self.chunks = self._delta_index.lines
867
970
 
868
971
    It contains code very similar to SequenceMatcher because of having a similar
869
972
    task. However some key differences apply:
870
 
     - there is no junk, we want a minimal edit not a human readable diff.
871
 
     - we don't filter very common lines (because we don't know where a good
872
 
       range will start, and after the first text we want to be emitting minmal
873
 
       edits only.
874
 
     - we chain the left side, not the right side
875
 
     - we incrementally update the adjacency matrix as new lines are provided.
876
 
     - we look for matches in all of the left side, so the routine which does
877
 
       the analagous task of find_longest_match does not need to filter on the
878
 
       left side.
 
973
 
 
974
    * there is no junk, we want a minimal edit not a human readable diff.
 
975
    * we don't filter very common lines (because we don't know where a good
 
976
      range will start, and after the first text we want to be emitting minmal
 
977
      edits only.
 
978
    * we chain the left side, not the right side
 
979
    * we incrementally update the adjacency matrix as new lines are provided.
 
980
    * we look for matches in all of the left side, so the routine which does
 
981
      the analagous task of find_longest_match does not need to filter on the
 
982
      left side.
879
983
    """
880
984
 
881
 
    def __init__(self):
882
 
        super(PyrexGroupCompressor, self).__init__()
883
 
        self._delta_index = DeltaIndex()
 
985
    def __init__(self, settings=None):
 
986
        super(PyrexGroupCompressor, self).__init__(settings)
 
987
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
988
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
884
989
 
885
990
    def _compress(self, key, bytes, max_delta_size, soft=False):
886
991
        """see _CommonGroupCompressor._compress"""
961
1066
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
962
1067
            add_callback=graph_index.add_nodes,
963
1068
            inconsistency_fatal=inconsistency_fatal)
964
 
        access = knit._DirectPackAccess({})
 
1069
        access = pack_repo._DirectPackAccess({})
965
1070
        access.set_writer(writer, graph_index, (transport, 'newpack'))
966
1071
        result = GroupCompressVersionedFiles(index, access, delta)
967
1072
        result.stream = stream
975
1080
    versioned_files.stream.close()
976
1081
 
977
1082
 
978
 
class GroupCompressVersionedFiles(VersionedFiles):
 
1083
class _BatchingBlockFetcher(object):
 
1084
    """Fetch group compress blocks in batches.
 
1085
 
 
1086
    :ivar total_bytes: int of expected number of bytes needed to fetch the
 
1087
        currently pending batch.
 
1088
    """
 
1089
 
 
1090
    def __init__(self, gcvf, locations, get_compressor_settings=None):
 
1091
        self.gcvf = gcvf
 
1092
        self.locations = locations
 
1093
        self.keys = []
 
1094
        self.batch_memos = {}
 
1095
        self.memos_to_get = []
 
1096
        self.total_bytes = 0
 
1097
        self.last_read_memo = None
 
1098
        self.manager = None
 
1099
        self._get_compressor_settings = get_compressor_settings
 
1100
 
 
1101
    def add_key(self, key):
 
1102
        """Add another to key to fetch.
 
1103
 
 
1104
        :return: The estimated number of bytes needed to fetch the batch so
 
1105
            far.
 
1106
        """
 
1107
        self.keys.append(key)
 
1108
        index_memo, _, _, _ = self.locations[key]
 
1109
        read_memo = index_memo[0:3]
 
1110
        # Three possibilities for this read_memo:
 
1111
        #  - it's already part of this batch; or
 
1112
        #  - it's not yet part of this batch, but is already cached; or
 
1113
        #  - it's not yet part of this batch and will need to be fetched.
 
1114
        if read_memo in self.batch_memos:
 
1115
            # This read memo is already in this batch.
 
1116
            return self.total_bytes
 
1117
        try:
 
1118
            cached_block = self.gcvf._group_cache[read_memo]
 
1119
        except KeyError:
 
1120
            # This read memo is new to this batch, and the data isn't cached
 
1121
            # either.
 
1122
            self.batch_memos[read_memo] = None
 
1123
            self.memos_to_get.append(read_memo)
 
1124
            byte_length = read_memo[2]
 
1125
            self.total_bytes += byte_length
 
1126
        else:
 
1127
            # This read memo is new to this batch, but cached.
 
1128
            # Keep a reference to the cached block in batch_memos because it's
 
1129
            # certain that we'll use it when this batch is processed, but
 
1130
            # there's a risk that it would fall out of _group_cache between now
 
1131
            # and then.
 
1132
            self.batch_memos[read_memo] = cached_block
 
1133
        return self.total_bytes
 
1134
 
 
1135
    def _flush_manager(self):
 
1136
        if self.manager is not None:
 
1137
            for factory in self.manager.get_record_stream():
 
1138
                yield factory
 
1139
            self.manager = None
 
1140
            self.last_read_memo = None
 
1141
 
 
1142
    def yield_factories(self, full_flush=False):
 
1143
        """Yield factories for keys added since the last yield.  They will be
 
1144
        returned in the order they were added via add_key.
 
1145
 
 
1146
        :param full_flush: by default, some results may not be returned in case
 
1147
            they can be part of the next batch.  If full_flush is True, then
 
1148
            all results are returned.
 
1149
        """
 
1150
        if self.manager is None and not self.keys:
 
1151
            return
 
1152
        # Fetch all memos in this batch.
 
1153
        blocks = self.gcvf._get_blocks(self.memos_to_get)
 
1154
        # Turn blocks into factories and yield them.
 
1155
        memos_to_get_stack = list(self.memos_to_get)
 
1156
        memos_to_get_stack.reverse()
 
1157
        for key in self.keys:
 
1158
            index_memo, _, parents, _ = self.locations[key]
 
1159
            read_memo = index_memo[:3]
 
1160
            if self.last_read_memo != read_memo:
 
1161
                # We are starting a new block. If we have a
 
1162
                # manager, we have found everything that fits for
 
1163
                # now, so yield records
 
1164
                for factory in self._flush_manager():
 
1165
                    yield factory
 
1166
                # Now start a new manager.
 
1167
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
 
1168
                    # The next block from _get_blocks will be the block we
 
1169
                    # need.
 
1170
                    block_read_memo, block = blocks.next()
 
1171
                    if block_read_memo != read_memo:
 
1172
                        raise AssertionError(
 
1173
                            "block_read_memo out of sync with read_memo"
 
1174
                            "(%r != %r)" % (block_read_memo, read_memo))
 
1175
                    self.batch_memos[read_memo] = block
 
1176
                    memos_to_get_stack.pop()
 
1177
                else:
 
1178
                    block = self.batch_memos[read_memo]
 
1179
                self.manager = _LazyGroupContentManager(block,
 
1180
                    get_compressor_settings=self._get_compressor_settings)
 
1181
                self.last_read_memo = read_memo
 
1182
            start, end = index_memo[3:5]
 
1183
            self.manager.add_factory(key, parents, start, end)
 
1184
        if full_flush:
 
1185
            for factory in self._flush_manager():
 
1186
                yield factory
 
1187
        del self.keys[:]
 
1188
        self.batch_memos.clear()
 
1189
        del self.memos_to_get[:]
 
1190
        self.total_bytes = 0
 
1191
 
 
1192
 
 
1193
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
979
1194
    """A group-compress based VersionedFiles implementation."""
980
1195
 
981
 
    def __init__(self, index, access, delta=True):
 
1196
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1197
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1198
    # However each of these references costs some memory in trade against a
 
1199
    # more accurate match result. For very large files, they either are
 
1200
    # pre-compressed and change in bulk whenever they change, or change in just
 
1201
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1202
    # versus running out of memory trying to track everything. The default max
 
1203
    # gives 100% sampling of a 1MB file.
 
1204
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1205
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1206
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
 
1207
 
 
1208
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1209
                 _group_cache=None):
982
1210
        """Create a GroupCompressVersionedFiles object.
983
1211
 
984
1212
        :param index: The index object storing access and graph data.
985
1213
        :param access: The access object storing raw data.
986
1214
        :param delta: Whether to delta compress or just entropy compress.
 
1215
        :param _unadded_refs: private parameter, don't use.
 
1216
        :param _group_cache: private parameter, don't use.
987
1217
        """
988
1218
        self._index = index
989
1219
        self._access = access
990
1220
        self._delta = delta
991
 
        self._unadded_refs = {}
992
 
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
993
 
        self._fallback_vfs = []
 
1221
        if _unadded_refs is None:
 
1222
            _unadded_refs = {}
 
1223
        self._unadded_refs = _unadded_refs
 
1224
        if _group_cache is None:
 
1225
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
 
1226
        self._group_cache = _group_cache
 
1227
        self._immediate_fallback_vfs = []
 
1228
        self._max_bytes_to_index = None
 
1229
 
 
1230
    def without_fallbacks(self):
 
1231
        """Return a clone of this object without any fallbacks configured."""
 
1232
        return GroupCompressVersionedFiles(self._index, self._access,
 
1233
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1234
            _group_cache=self._group_cache)
994
1235
 
995
1236
    def add_lines(self, key, parents, lines, parent_texts=None,
996
1237
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1000
1241
        :param key: The key tuple of the text to add.
1001
1242
        :param parents: The parents key tuples of the text to add.
1002
1243
        :param lines: A list of lines. Each line must be a bytestring. And all
1003
 
            of them except the last must be terminated with \n and contain no
1004
 
            other \n's. The last line may either contain no \n's or a single
1005
 
            terminating \n. If the lines list does meet this constraint the add
1006
 
            routine may error or may succeed - but you will be unable to read
1007
 
            the data back accurately. (Checking the lines have been split
 
1244
            of them except the last must be terminated with \\n and contain no
 
1245
            other \\n's. The last line may either contain no \\n's or a single
 
1246
            terminating \\n. If the lines list does meet this constraint the
 
1247
            add routine may error or may succeed - but you will be unable to
 
1248
            read the data back accurately. (Checking the lines have been split
1008
1249
            correctly is expensive and extremely unlikely to catch bugs so it
1009
1250
            is not done at runtime unless check_content is True.)
1010
1251
        :param parent_texts: An optional dictionary containing the opaque
1065
1306
 
1066
1307
        :param a_versioned_files: A VersionedFiles object.
1067
1308
        """
1068
 
        self._fallback_vfs.append(a_versioned_files)
 
1309
        self._immediate_fallback_vfs.append(a_versioned_files)
1069
1310
 
1070
1311
    def annotate(self, key):
1071
1312
        """See VersionedFiles.annotate."""
1075
1316
    def get_annotator(self):
1076
1317
        return annotate.Annotator(self)
1077
1318
 
1078
 
    def check(self, progress_bar=None):
 
1319
    def check(self, progress_bar=None, keys=None):
1079
1320
        """See VersionedFiles.check()."""
1080
 
        keys = self.keys()
1081
 
        for record in self.get_record_stream(keys, 'unordered', True):
1082
 
            record.get_bytes_as('fulltext')
 
1321
        if keys is None:
 
1322
            keys = self.keys()
 
1323
            for record in self.get_record_stream(keys, 'unordered', True):
 
1324
                record.get_bytes_as('fulltext')
 
1325
        else:
 
1326
            return self.get_record_stream(keys, 'unordered', True)
 
1327
 
 
1328
    def clear_cache(self):
 
1329
        """See VersionedFiles.clear_cache()"""
 
1330
        self._group_cache.clear()
 
1331
        self._index._graph_index.clear_cache()
 
1332
        self._index._int_cache.clear()
1083
1333
 
1084
1334
    def _check_add(self, key, lines, random_id, check_content):
1085
1335
        """check that version_id and lines are safe to add."""
1116
1366
            and so on.
1117
1367
        """
1118
1368
        result = {}
1119
 
        sources = [self._index] + self._fallback_vfs
 
1369
        sources = [self._index] + self._immediate_fallback_vfs
1120
1370
        source_results = []
1121
1371
        missing = set(keys)
1122
1372
        for source in sources:
1128
1378
            missing.difference_update(set(new_result))
1129
1379
        return result, source_results
1130
1380
 
1131
 
    def _get_block(self, index_memo):
1132
 
        read_memo = index_memo[0:3]
1133
 
        # get the group:
1134
 
        try:
1135
 
            block = self._group_cache[read_memo]
1136
 
        except KeyError:
1137
 
            # read the group
1138
 
            zdata = self._access.get_raw_records([read_memo]).next()
1139
 
            # decompress - whole thing - this is not a bug, as it
1140
 
            # permits caching. We might want to store the partially
1141
 
            # decompresed group and decompress object, so that recent
1142
 
            # texts are not penalised by big groups.
1143
 
            block = GroupCompressBlock.from_bytes(zdata)
1144
 
            self._group_cache[read_memo] = block
1145
 
        # cheapo debugging:
1146
 
        # print len(zdata), len(plain)
1147
 
        # parse - requires split_lines, better to have byte offsets
1148
 
        # here (but not by much - we only split the region for the
1149
 
        # recipe, and we often want to end up with lines anyway.
1150
 
        return block
 
1381
    def _get_blocks(self, read_memos):
 
1382
        """Get GroupCompressBlocks for the given read_memos.
 
1383
 
 
1384
        :returns: a series of (read_memo, block) pairs, in the order they were
 
1385
            originally passed.
 
1386
        """
 
1387
        cached = {}
 
1388
        for read_memo in read_memos:
 
1389
            try:
 
1390
                block = self._group_cache[read_memo]
 
1391
            except KeyError:
 
1392
                pass
 
1393
            else:
 
1394
                cached[read_memo] = block
 
1395
        not_cached = []
 
1396
        not_cached_seen = set()
 
1397
        for read_memo in read_memos:
 
1398
            if read_memo in cached:
 
1399
                # Don't fetch what we already have
 
1400
                continue
 
1401
            if read_memo in not_cached_seen:
 
1402
                # Don't try to fetch the same data twice
 
1403
                continue
 
1404
            not_cached.append(read_memo)
 
1405
            not_cached_seen.add(read_memo)
 
1406
        raw_records = self._access.get_raw_records(not_cached)
 
1407
        for read_memo in read_memos:
 
1408
            try:
 
1409
                yield read_memo, cached[read_memo]
 
1410
            except KeyError:
 
1411
                # Read the block, and cache it.
 
1412
                zdata = raw_records.next()
 
1413
                block = GroupCompressBlock.from_bytes(zdata)
 
1414
                self._group_cache[read_memo] = block
 
1415
                cached[read_memo] = block
 
1416
                yield read_memo, block
1151
1417
 
1152
1418
    def get_missing_compression_parent_keys(self):
1153
1419
        """Return the keys of missing compression parents.
1207
1473
        parent_map = {}
1208
1474
        key_to_source_map = {}
1209
1475
        source_results = []
1210
 
        for source in self._fallback_vfs:
 
1476
        for source in self._immediate_fallback_vfs:
1211
1477
            if not missing:
1212
1478
                break
1213
1479
            source_parents = source.get_parent_map(missing)
1223
1489
 
1224
1490
        The returned objects should be in the order defined by 'ordering',
1225
1491
        which can weave between different sources.
 
1492
 
1226
1493
        :param ordering: Must be one of 'topological' or 'groupcompress'
1227
1494
        :return: List of [(source, [keys])] tuples, such that all keys are in
1228
1495
            the defined order, regardless of source.
1229
1496
        """
1230
1497
        if ordering == 'topological':
1231
 
            present_keys = topo_sort(parent_map)
 
1498
            present_keys = tsort.topo_sort(parent_map)
1232
1499
        else:
1233
1500
            # ordering == 'groupcompress'
1234
1501
            # XXX: This only optimizes for the target ordering. We may need
1319
1586
                unadded_keys, source_result)
1320
1587
        for key in missing:
1321
1588
            yield AbsentContentFactory(key)
1322
 
        manager = None
1323
 
        last_read_memo = None
1324
 
        # TODO: This works fairly well at batching up existing groups into a
1325
 
        #       streamable format, and possibly allowing for taking one big
1326
 
        #       group and splitting it when it isn't fully utilized.
1327
 
        #       However, it doesn't allow us to find under-utilized groups and
1328
 
        #       combine them into a bigger group on the fly.
1329
 
        #       (Consider the issue with how chk_map inserts texts
1330
 
        #       one-at-a-time.) This could be done at insert_record_stream()
1331
 
        #       time, but it probably would decrease the number of
1332
 
        #       bytes-on-the-wire for fetch.
 
1589
        # Batch up as many keys as we can until either:
 
1590
        #  - we encounter an unadded ref, or
 
1591
        #  - we run out of keys, or
 
1592
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
 
1593
        batcher = _BatchingBlockFetcher(self, locations,
 
1594
            get_compressor_settings=self._get_compressor_settings)
1333
1595
        for source, keys in source_keys:
1334
1596
            if source is self:
1335
1597
                for key in keys:
1336
1598
                    if key in self._unadded_refs:
1337
 
                        if manager is not None:
1338
 
                            for factory in manager.get_record_stream():
1339
 
                                yield factory
1340
 
                            last_read_memo = manager = None
 
1599
                        # Flush batch, then yield unadded ref from
 
1600
                        # self._compressor.
 
1601
                        for factory in batcher.yield_factories(full_flush=True):
 
1602
                            yield factory
1341
1603
                        bytes, sha1 = self._compressor.extract(key)
1342
1604
                        parents = self._unadded_refs[key]
1343
1605
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1344
 
                    else:
1345
 
                        index_memo, _, parents, (method, _) = locations[key]
1346
 
                        read_memo = index_memo[0:3]
1347
 
                        if last_read_memo != read_memo:
1348
 
                            # We are starting a new block. If we have a
1349
 
                            # manager, we have found everything that fits for
1350
 
                            # now, so yield records
1351
 
                            if manager is not None:
1352
 
                                for factory in manager.get_record_stream():
1353
 
                                    yield factory
1354
 
                            # Now start a new manager
1355
 
                            block = self._get_block(index_memo)
1356
 
                            manager = _LazyGroupContentManager(block)
1357
 
                            last_read_memo = read_memo
1358
 
                        start, end = index_memo[3:5]
1359
 
                        manager.add_factory(key, parents, start, end)
 
1606
                        continue
 
1607
                    if batcher.add_key(key) > BATCH_SIZE:
 
1608
                        # Ok, this batch is big enough.  Yield some results.
 
1609
                        for factory in batcher.yield_factories():
 
1610
                            yield factory
1360
1611
            else:
1361
 
                if manager is not None:
1362
 
                    for factory in manager.get_record_stream():
1363
 
                        yield factory
1364
 
                    last_read_memo = manager = None
 
1612
                for factory in batcher.yield_factories(full_flush=True):
 
1613
                    yield factory
1365
1614
                for record in source.get_record_stream(keys, ordering,
1366
1615
                                                       include_delta_closure):
1367
1616
                    yield record
1368
 
        if manager is not None:
1369
 
            for factory in manager.get_record_stream():
1370
 
                yield factory
 
1617
        for factory in batcher.yield_factories(full_flush=True):
 
1618
            yield factory
1371
1619
 
1372
1620
    def get_sha1s(self, keys):
1373
1621
        """See VersionedFiles.get_sha1s()."""
1395
1643
        for _ in self._insert_record_stream(stream, random_id=False):
1396
1644
            pass
1397
1645
 
 
1646
    def _get_compressor_settings(self):
 
1647
        if self._max_bytes_to_index is None:
 
1648
            # TODO: VersionedFiles don't know about their containing
 
1649
            #       repository, so they don't have much of an idea about their
 
1650
            #       location. So for now, this is only a global option.
 
1651
            c = config.GlobalConfig()
 
1652
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1653
            if val is not None:
 
1654
                try:
 
1655
                    val = int(val)
 
1656
                except ValueError, e:
 
1657
                    trace.warning('Value for '
 
1658
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1659
                                  ' %r is not an integer'
 
1660
                                  % (val,))
 
1661
                    val = None
 
1662
            if val is None:
 
1663
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1664
            self._max_bytes_to_index = val
 
1665
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1666
 
 
1667
    def _make_group_compressor(self):
 
1668
        return GroupCompressor(self._get_compressor_settings())
 
1669
 
1398
1670
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1399
1671
                              reuse_blocks=True):
1400
1672
        """Internal core to insert a record stream into this container.
1423
1695
                return adapter
1424
1696
        # This will go up to fulltexts for gc to gc fetching, which isn't
1425
1697
        # ideal.
1426
 
        self._compressor = GroupCompressor()
 
1698
        self._compressor = self._make_group_compressor()
1427
1699
        self._unadded_refs = {}
1428
1700
        keys_to_add = []
1429
1701
        def flush():
1430
 
            bytes = self._compressor.flush().to_bytes()
 
1702
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1703
            self._compressor = self._make_group_compressor()
 
1704
            # Note: At this point we still have 1 copy of the fulltext (in
 
1705
            #       record and the var 'bytes'), and this generates 2 copies of
 
1706
            #       the compressed text (one for bytes, one in chunks)
 
1707
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1708
            #       have to double compressed memory here
 
1709
            # TODO: Figure out how to indicate that we would be happy to free
 
1710
            #       the fulltext content at this point. Note that sometimes we
 
1711
            #       will want it later (streaming CHK pages), but most of the
 
1712
            #       time we won't (everything else)
 
1713
            bytes = ''.join(chunks)
 
1714
            del chunks
1431
1715
            index, start, length = self._access.add_raw_records(
1432
1716
                [(None, len(bytes))], bytes)[0]
1433
1717
            nodes = []
1436
1720
            self._index.add_records(nodes, random_id=random_id)
1437
1721
            self._unadded_refs = {}
1438
1722
            del keys_to_add[:]
1439
 
            self._compressor = GroupCompressor()
1440
1723
 
1441
1724
        last_prefix = None
1442
1725
        max_fulltext_len = 0
1446
1729
        block_length = None
1447
1730
        # XXX: TODO: remove this, it is just for safety checking for now
1448
1731
        inserted_keys = set()
 
1732
        reuse_this_block = reuse_blocks
1449
1733
        for record in stream:
1450
1734
            # Raise an error when a record is missing.
1451
1735
            if record.storage_kind == 'absent':
1452
1736
                raise errors.RevisionNotPresent(record.key, self)
1453
1737
            if random_id:
1454
1738
                if record.key in inserted_keys:
1455
 
                    trace.note('Insert claimed random_id=True,'
1456
 
                               ' but then inserted %r two times', record.key)
 
1739
                    trace.note(gettext('Insert claimed random_id=True,'
 
1740
                               ' but then inserted %r two times'), record.key)
1457
1741
                    continue
1458
1742
                inserted_keys.add(record.key)
1459
1743
            if reuse_blocks:
1460
1744
                # If the reuse_blocks flag is set, check to see if we can just
1461
1745
                # copy a groupcompress block as-is.
 
1746
                # We only check on the first record (groupcompress-block) not
 
1747
                # on all of the (groupcompress-block-ref) entries.
 
1748
                # The reuse_this_block flag is then kept for as long as
 
1749
                if record.storage_kind == 'groupcompress-block':
 
1750
                    # Check to see if we really want to re-use this block
 
1751
                    insert_manager = record._manager
 
1752
                    reuse_this_block = insert_manager.check_is_well_utilized()
 
1753
            else:
 
1754
                reuse_this_block = False
 
1755
            if reuse_this_block:
 
1756
                # We still want to reuse this block
1462
1757
                if record.storage_kind == 'groupcompress-block':
1463
1758
                    # Insert the raw block into the target repo
1464
1759
                    insert_manager = record._manager
1465
 
                    insert_manager._check_rebuild_block()
1466
1760
                    bytes = record._manager._block.to_bytes()
1467
1761
                    _, start, length = self._access.add_raw_records(
1468
1762
                        [(None, len(bytes))], bytes)[0]
1473
1767
                                           'groupcompress-block-ref'):
1474
1768
                    if insert_manager is None:
1475
1769
                        raise AssertionError('No insert_manager set')
 
1770
                    if insert_manager is not record._manager:
 
1771
                        raise AssertionError('insert_manager does not match'
 
1772
                            ' the current record, we cannot be positive'
 
1773
                            ' that the appropriate content was inserted.'
 
1774
                            )
1476
1775
                    value = "%d %d %d %d" % (block_start, block_length,
1477
1776
                                             record._start, record._end)
1478
1777
                    nodes = [(record.key, value, (record.parents,))]
1528
1827
                key = record.key
1529
1828
            self._unadded_refs[key] = record.parents
1530
1829
            yield found_sha1
1531
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
1532
 
                (record.parents,)))
 
1830
            as_st = static_tuple.StaticTuple.from_sequence
 
1831
            if record.parents is not None:
 
1832
                parents = as_st([as_st(p) for p in record.parents])
 
1833
            else:
 
1834
                parents = None
 
1835
            refs = static_tuple.StaticTuple(parents)
 
1836
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
1533
1837
        if len(keys_to_add):
1534
1838
            flush()
1535
1839
        self._compressor = None
1578
1882
        """See VersionedFiles.keys."""
1579
1883
        if 'evil' in debug.debug_flags:
1580
1884
            trace.mutter_callsite(2, "keys scales with size of history")
1581
 
        sources = [self._index] + self._fallback_vfs
 
1885
        sources = [self._index] + self._immediate_fallback_vfs
1582
1886
        result = set()
1583
1887
        for source in sources:
1584
1888
            result.update(source.keys())
1585
1889
        return result
1586
1890
 
1587
1891
 
 
1892
class _GCBuildDetails(object):
 
1893
    """A blob of data about the build details.
 
1894
 
 
1895
    This stores the minimal data, which then allows compatibility with the old
 
1896
    api, without taking as much memory.
 
1897
    """
 
1898
 
 
1899
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1900
                 '_delta_end', '_parents')
 
1901
 
 
1902
    method = 'group'
 
1903
    compression_parent = None
 
1904
 
 
1905
    def __init__(self, parents, position_info):
 
1906
        self._parents = parents
 
1907
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1908
         self._delta_end) = position_info
 
1909
 
 
1910
    def __repr__(self):
 
1911
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1912
            self.index_memo, self._parents)
 
1913
 
 
1914
    @property
 
1915
    def index_memo(self):
 
1916
        return (self._index, self._group_start, self._group_end,
 
1917
                self._basis_end, self._delta_end)
 
1918
 
 
1919
    @property
 
1920
    def record_details(self):
 
1921
        return static_tuple.StaticTuple(self.method, None)
 
1922
 
 
1923
    def __getitem__(self, offset):
 
1924
        """Compatibility thunk to act like a tuple."""
 
1925
        if offset == 0:
 
1926
            return self.index_memo
 
1927
        elif offset == 1:
 
1928
            return self.compression_parent # Always None
 
1929
        elif offset == 2:
 
1930
            return self._parents
 
1931
        elif offset == 3:
 
1932
            return self.record_details
 
1933
        else:
 
1934
            raise IndexError('offset out of range')
 
1935
            
 
1936
    def __len__(self):
 
1937
        return 4
 
1938
 
 
1939
 
1588
1940
class _GCGraphIndex(object):
1589
1941
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1590
1942
 
1591
1943
    def __init__(self, graph_index, is_locked, parents=True,
1592
1944
        add_callback=None, track_external_parent_refs=False,
1593
 
        inconsistency_fatal=True):
 
1945
        inconsistency_fatal=True, track_new_keys=False):
1594
1946
        """Construct a _GCGraphIndex on a graph_index.
1595
1947
 
1596
1948
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1615
1967
        self.has_graph = parents
1616
1968
        self._is_locked = is_locked
1617
1969
        self._inconsistency_fatal = inconsistency_fatal
 
1970
        # GroupCompress records tend to have the same 'group' start + offset
 
1971
        # repeated over and over, this creates a surplus of ints
 
1972
        self._int_cache = {}
1618
1973
        if track_external_parent_refs:
1619
 
            self._key_dependencies = knit._KeyRefs()
 
1974
            self._key_dependencies = _KeyRefs(
 
1975
                track_new_keys=track_new_keys)
1620
1976
        else:
1621
1977
            self._key_dependencies = None
1622
1978
 
1655
2011
        if not random_id:
1656
2012
            present_nodes = self._get_entries(keys)
1657
2013
            for (index, key, value, node_refs) in present_nodes:
1658
 
                if node_refs != keys[key][1]:
1659
 
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
 
2014
                # Sometimes these are passed as a list rather than a tuple
 
2015
                node_refs = static_tuple.as_tuples(node_refs)
 
2016
                passed = static_tuple.as_tuples(keys[key])
 
2017
                if node_refs != passed[1]:
 
2018
                    details = '%s %s %s' % (key, (value, node_refs), passed)
1660
2019
                    if self._inconsistency_fatal:
1661
2020
                        raise errors.KnitCorrupt(self, "inconsistent details"
1662
2021
                                                 " in add_records: %s" %
1676
2035
                    result.append((key, value))
1677
2036
            records = result
1678
2037
        key_dependencies = self._key_dependencies
1679
 
        if key_dependencies is not None and self._parents:
1680
 
            for key, value, refs in records:
1681
 
                parents = refs[0]
1682
 
                key_dependencies.add_references(key, parents)
 
2038
        if key_dependencies is not None:
 
2039
            if self._parents:
 
2040
                for key, value, refs in records:
 
2041
                    parents = refs[0]
 
2042
                    key_dependencies.add_references(key, parents)
 
2043
            else:
 
2044
                for key, value, refs in records:
 
2045
                    new_keys.add_key(key)
1683
2046
        self._add_callback(records)
1684
2047
 
1685
2048
    def _check_read(self):
1716
2079
            if missing_keys:
1717
2080
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
1718
2081
 
 
2082
    def find_ancestry(self, keys):
 
2083
        """See CombinedGraphIndex.find_ancestry"""
 
2084
        return self._graph_index.find_ancestry(keys, 0)
 
2085
 
1719
2086
    def get_parent_map(self, keys):
1720
2087
        """Get a map of the parents of keys.
1721
2088
 
1738
2105
        """Return the keys of missing parents."""
1739
2106
        # Copied from _KnitGraphIndex.get_missing_parents
1740
2107
        # We may have false positives, so filter those out.
1741
 
        self._key_dependencies.add_keys(
 
2108
        self._key_dependencies.satisfy_refs_for_keys(
1742
2109
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
1743
2110
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
1744
2111
 
1750
2117
        :param keys: An iterable of keys.
1751
2118
        :return: A dict of key:
1752
2119
            (index_memo, compression_parent, parents, record_details).
1753
 
            index_memo
1754
 
                opaque structure to pass to read_records to extract the raw
1755
 
                data
1756
 
            compression_parent
1757
 
                Content that this record is built upon, may be None
1758
 
            parents
1759
 
                Logical parents of this node
1760
 
            record_details
1761
 
                extra information about the content which needs to be passed to
1762
 
                Factory.parse_record
 
2120
 
 
2121
            * index_memo: opaque structure to pass to read_records to extract
 
2122
              the raw data
 
2123
            * compression_parent: Content that this record is built upon, may
 
2124
              be None
 
2125
            * parents: Logical parents of this node
 
2126
            * record_details: extra information about the content which needs
 
2127
              to be passed to Factory.parse_record
1763
2128
        """
1764
2129
        self._check_read()
1765
2130
        result = {}
1770
2135
                parents = None
1771
2136
            else:
1772
2137
                parents = entry[3][0]
1773
 
            method = 'group'
1774
 
            result[key] = (self._node_to_position(entry),
1775
 
                                  None, parents, (method, None))
 
2138
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2139
            result[key] = details
1776
2140
        return result
1777
2141
 
1778
2142
    def keys(self):
1787
2151
        """Convert an index value to position details."""
1788
2152
        bits = node[2].split(' ')
1789
2153
        # It would be nice not to read the entire gzip.
 
2154
        # start and stop are put into _int_cache because they are very common.
 
2155
        # They define the 'group' that an entry is in, and many groups can have
 
2156
        # thousands of objects.
 
2157
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
 
2158
        # each, or about 7MB. Note that it might be even more when you consider
 
2159
        # how PyInt is allocated in separate slabs. And you can't return a slab
 
2160
        # to the OS if even 1 int on it is in use. Note though that Python uses
 
2161
        # a LIFO when re-using PyInt slots, which might cause more
 
2162
        # fragmentation.
1790
2163
        start = int(bits[0])
 
2164
        start = self._int_cache.setdefault(start, start)
1791
2165
        stop = int(bits[1])
 
2166
        stop = self._int_cache.setdefault(stop, stop)
1792
2167
        basis_end = int(bits[2])
1793
2168
        delta_end = int(bits[3])
1794
 
        return node[0], start, stop, basis_end, delta_end
 
2169
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
 
2170
        # instance...
 
2171
        return (node[0], start, stop, basis_end, delta_end)
1795
2172
 
1796
2173
    def scan_unvalidated_index(self, graph_index):
1797
2174
        """Inform this _GCGraphIndex that there is an unvalidated index.
1798
2175
 
1799
2176
        This allows this _GCGraphIndex to keep track of any missing
1800
2177
        compression parents we may want to have filled in to make those
1801
 
        indices valid.
 
2178
        indices valid.  It also allows _GCGraphIndex to track any new keys.
1802
2179
 
1803
2180
        :param graph_index: A GraphIndex
1804
2181
        """
1805
 
        if self._key_dependencies is not None:
1806
 
            # Add parent refs from graph_index (and discard parent refs that
1807
 
            # the graph_index has).
1808
 
            add_refs = self._key_dependencies.add_references
1809
 
            for node in graph_index.iter_all_entries():
1810
 
                add_refs(node[1], node[3][0])
1811
 
 
 
2182
        key_dependencies = self._key_dependencies
 
2183
        if key_dependencies is None:
 
2184
            return
 
2185
        for node in graph_index.iter_all_entries():
 
2186
            # Add parent refs from graph_index (and discard parent refs
 
2187
            # that the graph_index has).
 
2188
            key_dependencies.add_references(node[1], node[3][0])
1812
2189
 
1813
2190
 
1814
2191
from bzrlib._groupcompress_py import (
1828
2205
        decode_base128_int,
1829
2206
        )
1830
2207
    GroupCompressor = PyrexGroupCompressor
1831
 
except ImportError:
 
2208
except ImportError, e:
 
2209
    osutils.failed_to_load_extension(e)
1832
2210
    GroupCompressor = PythonGroupCompressor
1833
2211