~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-06-22 17:11:20 UTC
  • mfrom: (4398.8.10 1.16-commit-fulltext)
  • Revision ID: pqm@pqm.ubuntu.com-20090622171120-fuxez9ylfqpxynqn
(jam) Add VF._add_text and reduce memory overhead during commit (see
        bug #109114)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from itertools import izip
 
20
from cStringIO import StringIO
19
21
import time
20
22
import zlib
21
23
try:
23
25
except ImportError:
24
26
    pylzma = None
25
27
 
26
 
from bzrlib.lazy_import import lazy_import
27
 
lazy_import(globals(), """
28
28
from bzrlib import (
29
29
    annotate,
30
 
    config,
31
30
    debug,
 
31
    diff,
32
32
    errors,
33
33
    graph as _mod_graph,
 
34
    knit,
34
35
    osutils,
35
36
    pack,
36
 
    static_tuple,
 
37
    patiencediff,
37
38
    trace,
38
 
    tsort,
39
39
    )
40
 
 
41
 
from bzrlib.repofmt import pack_repo
42
 
""")
43
 
 
 
40
from bzrlib.graph import Graph
44
41
from bzrlib.btree_index import BTreeBuilder
45
42
from bzrlib.lru_cache import LRUSizeCache
 
43
from bzrlib.tsort import topo_sort
46
44
from bzrlib.versionedfile import (
47
 
    _KeyRefs,
48
45
    adapter_registry,
49
46
    AbsentContentFactory,
50
47
    ChunkedContentFactory,
51
48
    FulltextContentFactory,
52
 
    VersionedFilesWithFallbacks,
 
49
    VersionedFiles,
53
50
    )
54
51
 
55
 
# Minimum number of uncompressed bytes to try fetch at once when retrieving
56
 
# groupcompress blocks.
57
 
BATCH_SIZE = 2**16
58
 
 
59
52
_USE_LZMA = False and (pylzma is not None)
60
53
 
61
54
# osutils.sha_string('')
62
55
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
63
56
 
 
57
 
64
58
def sort_gc_optimal(parent_map):
65
59
    """Sort and group the keys in parent_map into groupcompress order.
66
60
 
72
66
    # groupcompress ordering is approximately reverse topological,
73
67
    # properly grouped by file-id.
74
68
    per_prefix_map = {}
75
 
    for key, value in parent_map.iteritems():
 
69
    for item in parent_map.iteritems():
 
70
        key = item[0]
76
71
        if isinstance(key, str) or len(key) == 1:
77
72
            prefix = ''
78
73
        else:
79
74
            prefix = key[0]
80
75
        try:
81
 
            per_prefix_map[prefix][key] = value
 
76
            per_prefix_map[prefix].append(item)
82
77
        except KeyError:
83
 
            per_prefix_map[prefix] = {key: value}
 
78
            per_prefix_map[prefix] = [item]
84
79
 
85
80
    present_keys = []
86
81
    for prefix in sorted(per_prefix_map):
87
 
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
 
82
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
88
83
    return present_keys
89
84
 
90
85
 
108
103
    def __init__(self):
109
104
        # map by key? or just order in file?
110
105
        self._compressor_name = None
111
 
        self._z_content_chunks = None
 
106
        self._z_content = None
112
107
        self._z_content_decompressor = None
113
108
        self._z_content_length = None
114
109
        self._content_length = None
115
110
        self._content = None
116
 
        self._content_chunks = None
117
111
 
118
112
    def __len__(self):
119
113
        # This is the maximum number of bytes this object will reference if
127
121
        :param num_bytes: Ensure that we have extracted at least num_bytes of
128
122
            content. If None, consume everything
129
123
        """
130
 
        if self._content_length is None:
131
 
            raise AssertionError('self._content_length should never be None')
 
124
        # TODO: If we re-use the same content block at different times during
 
125
        #       get_record_stream(), it is possible that the first pass will
 
126
        #       get inserted, triggering an extract/_ensure_content() which
 
127
        #       will get rid of _z_content. And then the next use of the block
 
128
        #       will try to access _z_content (to send it over the wire), and
 
129
        #       fail because it is already extracted. Consider never releasing
 
130
        #       _z_content because of this.
132
131
        if num_bytes is None:
133
132
            num_bytes = self._content_length
134
133
        elif (self._content_length is not None
138
137
                % (num_bytes, self._content_length))
139
138
        # Expand the content if required
140
139
        if self._content is None:
141
 
            if self._content_chunks is not None:
142
 
                self._content = ''.join(self._content_chunks)
143
 
                self._content_chunks = None
144
 
        if self._content is None:
145
 
            # We join self._z_content_chunks here, because if we are
146
 
            # decompressing, then it is *very* likely that we have a single
147
 
            # chunk
148
 
            if self._z_content_chunks is None:
 
140
            if self._z_content is None:
149
141
                raise AssertionError('No content to decompress')
150
 
            z_content = ''.join(self._z_content_chunks)
151
 
            if z_content == '':
 
142
            if self._z_content == '':
152
143
                self._content = ''
153
144
            elif self._compressor_name == 'lzma':
154
145
                # We don't do partial lzma decomp yet
155
 
                self._content = pylzma.decompress(z_content)
 
146
                self._content = pylzma.decompress(self._z_content)
156
147
            elif self._compressor_name == 'zlib':
157
148
                # Start a zlib decompressor
158
 
                if num_bytes * 4 > self._content_length * 3:
159
 
                    # If we are requesting more that 3/4ths of the content,
160
 
                    # just extract the whole thing in a single pass
161
 
                    num_bytes = self._content_length
162
 
                    self._content = zlib.decompress(z_content)
 
149
                if num_bytes is None:
 
150
                    self._content = zlib.decompress(self._z_content)
163
151
                else:
164
152
                    self._z_content_decompressor = zlib.decompressobj()
165
153
                    # Seed the decompressor with the uncompressed bytes, so
166
154
                    # that the rest of the code is simplified
167
155
                    self._content = self._z_content_decompressor.decompress(
168
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
169
 
                    if not self._z_content_decompressor.unconsumed_tail:
170
 
                        self._z_content_decompressor = None
 
156
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
171
157
            else:
172
158
                raise AssertionError('Unknown compressor: %r'
173
159
                                     % self._compressor_name)
175
161
        # 'unconsumed_tail'
176
162
 
177
163
        # Do we have enough bytes already?
178
 
        if len(self._content) >= num_bytes:
 
164
        if num_bytes is not None and len(self._content) >= num_bytes:
 
165
            return
 
166
        if num_bytes is None and self._z_content_decompressor is None:
 
167
            # We must have already decompressed everything
179
168
            return
180
169
        # If we got this far, and don't have a decompressor, something is wrong
181
170
        if self._z_content_decompressor is None:
182
171
            raise AssertionError(
183
172
                'No decompressor to decompress %d bytes' % num_bytes)
184
173
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
185
 
        if not remaining_decomp:
186
 
            raise AssertionError('Nothing left to decompress')
187
 
        needed_bytes = num_bytes - len(self._content)
188
 
        # We always set max_size to 32kB over the minimum needed, so that
189
 
        # zlib will give us as much as we really want.
190
 
        # TODO: If this isn't good enough, we could make a loop here,
191
 
        #       that keeps expanding the request until we get enough
192
 
        self._content += self._z_content_decompressor.decompress(
193
 
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
194
 
        if len(self._content) < num_bytes:
195
 
            raise AssertionError('%d bytes wanted, only %d available'
196
 
                                 % (num_bytes, len(self._content)))
197
 
        if not self._z_content_decompressor.unconsumed_tail:
198
 
            # The stream is finished
199
 
            self._z_content_decompressor = None
 
174
        if num_bytes is None:
 
175
            if remaining_decomp:
 
176
                # We don't know how much is left, but we'll decompress it all
 
177
                self._content += self._z_content_decompressor.decompress(
 
178
                    remaining_decomp)
 
179
                # Note: There's what I consider a bug in zlib.decompressobj
 
180
                #       If you pass back in the entire unconsumed_tail, only
 
181
                #       this time you don't pass a max-size, it doesn't
 
182
                #       change the unconsumed_tail back to None/''.
 
183
                #       However, we know we are done with the whole stream
 
184
                self._z_content_decompressor = None
 
185
            # XXX: Why is this the only place in this routine we set this?
 
186
            self._content_length = len(self._content)
 
187
        else:
 
188
            if not remaining_decomp:
 
189
                raise AssertionError('Nothing left to decompress')
 
190
            needed_bytes = num_bytes - len(self._content)
 
191
            # We always set max_size to 32kB over the minimum needed, so that
 
192
            # zlib will give us as much as we really want.
 
193
            # TODO: If this isn't good enough, we could make a loop here,
 
194
            #       that keeps expanding the request until we get enough
 
195
            self._content += self._z_content_decompressor.decompress(
 
196
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
197
            if len(self._content) < num_bytes:
 
198
                raise AssertionError('%d bytes wanted, only %d available'
 
199
                                     % (num_bytes, len(self._content)))
 
200
            if not self._z_content_decompressor.unconsumed_tail:
 
201
                # The stream is finished
 
202
                self._z_content_decompressor = None
200
203
 
201
204
    def _parse_bytes(self, bytes, pos):
202
205
        """Read the various lengths from the header.
218
221
            # XXX: Define some GCCorrupt error ?
219
222
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
220
223
                                 (len(bytes), pos, self._z_content_length))
221
 
        self._z_content_chunks = (bytes[pos:],)
222
 
 
223
 
    @property
224
 
    def _z_content(self):
225
 
        """Return z_content_chunks as a simple string.
226
 
 
227
 
        Meant only to be used by the test suite.
228
 
        """
229
 
        if self._z_content_chunks is not None:
230
 
            return ''.join(self._z_content_chunks)
231
 
        return None
 
224
        self._z_content = bytes[pos:]
232
225
 
233
226
    @classmethod
234
227
    def from_bytes(cls, bytes):
280
273
            bytes = apply_delta_to_source(self._content, content_start, end)
281
274
        return bytes
282
275
 
283
 
    def set_chunked_content(self, content_chunks, length):
284
 
        """Set the content of this block to the given chunks."""
285
 
        # If we have lots of short lines, it is may be more efficient to join
286
 
        # the content ahead of time. If the content is <10MiB, we don't really
287
 
        # care about the extra memory consumption, so we can just pack it and
288
 
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
289
 
        # mysql, which is below the noise margin
290
 
        self._content_length = length
291
 
        self._content_chunks = content_chunks
292
 
        self._content = None
293
 
        self._z_content_chunks = None
294
 
 
295
276
    def set_content(self, content):
296
277
        """Set the content of this block."""
297
278
        self._content_length = len(content)
298
279
        self._content = content
299
 
        self._z_content_chunks = None
300
 
 
301
 
    def _create_z_content_using_lzma(self):
302
 
        if self._content_chunks is not None:
303
 
            self._content = ''.join(self._content_chunks)
304
 
            self._content_chunks = None
305
 
        if self._content is None:
306
 
            raise AssertionError('Nothing to compress')
307
 
        z_content = pylzma.compress(self._content)
308
 
        self._z_content_chunks = (z_content,)
309
 
        self._z_content_length = len(z_content)
310
 
 
311
 
    def _create_z_content_from_chunks(self, chunks):
312
 
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
313
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
314
 
        # (measured peak is maybe 30MB over the above...)
315
 
        compressed_chunks = map(compressor.compress, chunks)
316
 
        compressed_chunks.append(compressor.flush())
317
 
        # Ignore empty chunks
318
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
319
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
320
 
 
321
 
    def _create_z_content(self):
322
 
        if self._z_content_chunks is not None:
323
 
            return
 
280
        self._z_content = None
 
281
 
 
282
    def to_bytes(self):
 
283
        """Encode the information into a byte stream."""
 
284
        compress = zlib.compress
324
285
        if _USE_LZMA:
325
 
            self._create_z_content_using_lzma()
326
 
            return
327
 
        if self._content_chunks is not None:
328
 
            chunks = self._content_chunks
329
 
        else:
330
 
            chunks = (self._content,)
331
 
        self._create_z_content_from_chunks(chunks)
332
 
 
333
 
    def to_chunks(self):
334
 
        """Create the byte stream as a series of 'chunks'"""
335
 
        self._create_z_content()
 
286
            compress = pylzma.compress
 
287
        if self._z_content is None:
 
288
            if self._content is None:
 
289
                raise AssertionError('Nothing to compress')
 
290
            self._z_content = compress(self._content)
 
291
            self._z_content_length = len(self._z_content)
336
292
        if _USE_LZMA:
337
293
            header = self.GCB_LZ_HEADER
338
294
        else:
339
295
            header = self.GCB_HEADER
340
 
        chunks = ['%s%d\n%d\n'
341
 
                  % (header, self._z_content_length, self._content_length),
 
296
        chunks = [header,
 
297
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
298
                  self._z_content,
342
299
                 ]
343
 
        chunks.extend(self._z_content_chunks)
344
 
        total_len = sum(map(len, chunks))
345
 
        return total_len, chunks
346
 
 
347
 
    def to_bytes(self):
348
 
        """Encode the information into a byte stream."""
349
 
        total_len, chunks = self.to_chunks()
350
300
        return ''.join(chunks)
351
301
 
352
302
    def _dump(self, include_text=False):
466
416
                # Grab and cache the raw bytes for this entry
467
417
                # and break the ref-cycle with _manager since we don't need it
468
418
                # anymore
469
 
                try:
470
 
                    self._manager._prepare_for_extract()
471
 
                except zlib.error as value:
472
 
                    raise errors.DecompressCorruption("zlib: " + str(value))
 
419
                self._manager._prepare_for_extract()
473
420
                block = self._manager._block
474
421
                self._bytes = block.extract(self.key, self._start, self._end)
475
422
                # There are code paths that first extract as fulltext, and then
476
423
                # extract as storage_kind (smart fetch). So we don't break the
477
424
                # refcycle here, but instead in manager.get_record_stream()
 
425
                # self._manager = None
478
426
            if storage_kind == 'fulltext':
479
427
                return self._bytes
480
428
            else:
486
434
class _LazyGroupContentManager(object):
487
435
    """This manages a group of _LazyGroupCompressFactory objects."""
488
436
 
489
 
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
490
 
                             # current size, and still be considered
491
 
                             # resuable
492
 
    _full_block_size = 4*1024*1024
493
 
    _full_mixed_block_size = 2*1024*1024
494
 
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
495
 
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
496
 
 
497
 
    def __init__(self, block, get_compressor_settings=None):
 
437
    def __init__(self, block):
498
438
        self._block = block
499
439
        # We need to preserve the ordering
500
440
        self._factories = []
501
441
        self._last_byte = 0
502
 
        self._get_settings = get_compressor_settings
503
 
        self._compressor_settings = None
504
 
 
505
 
    def _get_compressor_settings(self):
506
 
        if self._compressor_settings is not None:
507
 
            return self._compressor_settings
508
 
        settings = None
509
 
        if self._get_settings is not None:
510
 
            settings = self._get_settings()
511
 
        if settings is None:
512
 
            vf = GroupCompressVersionedFiles
513
 
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
514
 
        self._compressor_settings = settings
515
 
        return self._compressor_settings
516
442
 
517
443
    def add_factory(self, key, parents, start, end):
518
444
        if not self._factories:
551
477
        new_block.set_content(self._block._content[:last_byte])
552
478
        self._block = new_block
553
479
 
554
 
    def _make_group_compressor(self):
555
 
        return GroupCompressor(self._get_compressor_settings())
556
 
 
557
480
    def _rebuild_block(self):
558
481
        """Create a new GroupCompressBlock with only the referenced texts."""
559
 
        compressor = self._make_group_compressor()
 
482
        compressor = GroupCompressor()
560
483
        tstart = time.time()
561
484
        old_length = self._block._content_length
562
485
        end_point = 0
574
497
        #       block? It seems hard to come up with a method that it would
575
498
        #       expand, since we do full compression again. Perhaps based on a
576
499
        #       request that ends up poorly ordered?
577
 
        # TODO: If the content would have expanded, then we would want to
578
 
        #       handle a case where we need to split the block.
579
 
        #       Now that we have a user-tweakable option
580
 
        #       (max_bytes_to_index), it is possible that one person set it
581
 
        #       to a very low value, causing poor compression.
582
500
        delta = time.time() - tstart
583
501
        self._block = new_block
584
502
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
593
511
        # time (self._block._content) is a little expensive.
594
512
        self._block._ensure_content(self._last_byte)
595
513
 
596
 
    def _check_rebuild_action(self):
 
514
    def _check_rebuild_block(self):
597
515
        """Check to see if our block should be repacked."""
598
516
        total_bytes_used = 0
599
517
        last_byte_used = 0
600
518
        for factory in self._factories:
601
519
            total_bytes_used += factory._end - factory._start
602
 
            if last_byte_used < factory._end:
603
 
                last_byte_used = factory._end
604
 
        # If we are using more than half of the bytes from the block, we have
605
 
        # nothing else to check
 
520
            last_byte_used = max(last_byte_used, factory._end)
 
521
        # If we are using most of the bytes from the block, we have nothing
 
522
        # else to check (currently more that 1/2)
606
523
        if total_bytes_used * 2 >= self._block._content_length:
607
 
            return None, last_byte_used, total_bytes_used
608
 
        # We are using less than 50% of the content. Is the content we are
609
 
        # using at the beginning of the block? If so, we can just trim the
610
 
        # tail, rather than rebuilding from scratch.
 
524
            return
 
525
        # Can we just strip off the trailing bytes? If we are going to be
 
526
        # transmitting more than 50% of the front of the content, go ahead
611
527
        if total_bytes_used * 2 > last_byte_used:
612
 
            return 'trim', last_byte_used, total_bytes_used
 
528
            self._trim_block(last_byte_used)
 
529
            return
613
530
 
614
531
        # We are using a small amount of the data, and it isn't just packed
615
532
        # nicely at the front, so rebuild the content.
622
539
        #       expanding many deltas into fulltexts, as well.
623
540
        #       If we build a cheap enough 'strip', then we could try a strip,
624
541
        #       if that expands the content, we then rebuild.
625
 
        return 'rebuild', last_byte_used, total_bytes_used
626
 
 
627
 
    def check_is_well_utilized(self):
628
 
        """Is the current block considered 'well utilized'?
629
 
 
630
 
        This heuristic asks if the current block considers itself to be a fully
631
 
        developed group, rather than just a loose collection of data.
632
 
        """
633
 
        if len(self._factories) == 1:
634
 
            # A block of length 1 could be improved by combining with other
635
 
            # groups - don't look deeper. Even larger than max size groups
636
 
            # could compress well with adjacent versions of the same thing.
637
 
            return False
638
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
639
 
        block_size = self._block._content_length
640
 
        if total_bytes_used < block_size * self._max_cut_fraction:
641
 
            # This block wants to trim itself small enough that we want to
642
 
            # consider it under-utilized.
643
 
            return False
644
 
        # TODO: This code is meant to be the twin of _insert_record_stream's
645
 
        #       'start_new_block' logic. It would probably be better to factor
646
 
        #       out that logic into a shared location, so that it stays
647
 
        #       together better
648
 
        # We currently assume a block is properly utilized whenever it is >75%
649
 
        # of the size of a 'full' block. In normal operation, a block is
650
 
        # considered full when it hits 4MB of same-file content. So any block
651
 
        # >3MB is 'full enough'.
652
 
        # The only time this isn't true is when a given block has large-object
653
 
        # content. (a single file >4MB, etc.)
654
 
        # Under these circumstances, we allow a block to grow to
655
 
        # 2 x largest_content.  Which means that if a given block had a large
656
 
        # object, it may actually be under-utilized. However, given that this
657
 
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
658
 
        # content blobs on-the-fly. Note that because we return False for all
659
 
        # 1-item blobs, we will repack them; we may wish to reevaluate our
660
 
        # treatment of large object blobs in the future.
661
 
        if block_size >= self._full_enough_block_size:
662
 
            return True
663
 
        # If a block is <3MB, it still may be considered 'full' if it contains
664
 
        # mixed content. The current rule is 2MB of mixed content is considered
665
 
        # full. So check to see if this block contains mixed content, and
666
 
        # set the threshold appropriately.
667
 
        common_prefix = None
668
 
        for factory in self._factories:
669
 
            prefix = factory.key[:-1]
670
 
            if common_prefix is None:
671
 
                common_prefix = prefix
672
 
            elif prefix != common_prefix:
673
 
                # Mixed content, check the size appropriately
674
 
                if block_size >= self._full_enough_mixed_block_size:
675
 
                    return True
676
 
                break
677
 
        # The content failed both the mixed check and the single-content check
678
 
        # so obviously it is not fully utilized
679
 
        # TODO: there is one other constraint that isn't being checked
680
 
        #       namely, that the entries in the block are in the appropriate
681
 
        #       order. For example, you could insert the entries in exactly
682
 
        #       reverse groupcompress order, and we would think that is ok.
683
 
        #       (all the right objects are in one group, and it is fully
684
 
        #       utilized, etc.) For now, we assume that case is rare,
685
 
        #       especially since we should always fetch in 'groupcompress'
686
 
        #       order.
687
 
        return False
688
 
 
689
 
    def _check_rebuild_block(self):
690
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
691
 
        if action is None:
692
 
            return
693
 
        if action == 'trim':
694
 
            self._trim_block(last_byte_used)
695
 
        elif action == 'rebuild':
696
 
            self._rebuild_block()
697
 
        else:
698
 
            raise ValueError('unknown rebuild action: %r' % (action,))
 
542
        self._rebuild_block()
699
543
 
700
544
    def _wire_bytes(self):
701
545
        """Return a byte stream suitable for transmitting over the wire."""
735
579
        z_header_bytes = zlib.compress(header_bytes)
736
580
        del header_bytes
737
581
        z_header_bytes_len = len(z_header_bytes)
738
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
582
        block_bytes = self._block.to_bytes()
739
583
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
740
 
                                       block_bytes_len))
 
584
                                       len(block_bytes)))
741
585
        lines.append(z_header_bytes)
742
 
        lines.extend(block_chunks)
743
 
        del z_header_bytes, block_chunks
744
 
        # TODO: This is a point where we will double the memory consumption. To
745
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
586
        lines.append(block_bytes)
 
587
        del z_header_bytes, block_bytes
746
588
        return ''.join(lines)
747
589
 
748
590
    @classmethod
749
591
    def from_bytes(cls, bytes):
750
592
        # TODO: This does extra string copying, probably better to do it a
751
 
        #       different way. At a minimum this creates 2 copies of the
752
 
        #       compressed content
 
593
        #       different way
753
594
        (storage_kind, z_header_len, header_len,
754
595
         block_len, rest) = bytes.split('\n', 4)
755
596
        del bytes
807
648
 
808
649
class _CommonGroupCompressor(object):
809
650
 
810
 
    def __init__(self, settings=None):
 
651
    def __init__(self):
811
652
        """Create a GroupCompressor."""
812
653
        self.chunks = []
813
654
        self._last = None
816
657
        self.labels_deltas = {}
817
658
        self._delta_index = None # Set by the children
818
659
        self._block = GroupCompressBlock()
819
 
        if settings is None:
820
 
            self._settings = {}
821
 
        else:
822
 
            self._settings = settings
823
660
 
824
661
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
825
662
        """Compress lines with label key.
917
754
 
918
755
        After calling this, the compressor should no longer be used
919
756
        """
920
 
        self._block.set_chunked_content(self.chunks, self.endpoint)
 
757
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
758
        #       group. This has an impact for 'commit' of large objects.
 
759
        #       One possibility is to use self._content_chunks, and be lazy and
 
760
        #       only fill out self._content as a full string when we actually
 
761
        #       need it. That would at least drop the peak memory consumption
 
762
        #       for 'commit' down to ~1x the size of the largest file, at a
 
763
        #       cost of increased complexity within this code. 2x is still <<
 
764
        #       3x the size of the largest file, so we are doing ok.
 
765
        content = ''.join(self.chunks)
921
766
        self.chunks = None
922
767
        self._delta_index = None
 
768
        self._block.set_content(content)
923
769
        return self._block
924
770
 
925
771
    def pop_last(self):
940
786
 
941
787
class PythonGroupCompressor(_CommonGroupCompressor):
942
788
 
943
 
    def __init__(self, settings=None):
 
789
    def __init__(self):
944
790
        """Create a GroupCompressor.
945
791
 
946
792
        Used only if the pyrex version is not available.
947
793
        """
948
 
        super(PythonGroupCompressor, self).__init__(settings)
 
794
        super(PythonGroupCompressor, self).__init__()
949
795
        self._delta_index = LinesDeltaIndex([])
950
796
        # The actual content is managed by LinesDeltaIndex
951
797
        self.chunks = self._delta_index.lines
988
834
 
989
835
    It contains code very similar to SequenceMatcher because of having a similar
990
836
    task. However some key differences apply:
991
 
 
992
 
    * there is no junk, we want a minimal edit not a human readable diff.
993
 
    * we don't filter very common lines (because we don't know where a good
994
 
      range will start, and after the first text we want to be emitting minmal
995
 
      edits only.
996
 
    * we chain the left side, not the right side
997
 
    * we incrementally update the adjacency matrix as new lines are provided.
998
 
    * we look for matches in all of the left side, so the routine which does
999
 
      the analagous task of find_longest_match does not need to filter on the
1000
 
      left side.
 
837
     - there is no junk, we want a minimal edit not a human readable diff.
 
838
     - we don't filter very common lines (because we don't know where a good
 
839
       range will start, and after the first text we want to be emitting minmal
 
840
       edits only.
 
841
     - we chain the left side, not the right side
 
842
     - we incrementally update the adjacency matrix as new lines are provided.
 
843
     - we look for matches in all of the left side, so the routine which does
 
844
       the analagous task of find_longest_match does not need to filter on the
 
845
       left side.
1001
846
    """
1002
847
 
1003
 
    def __init__(self, settings=None):
1004
 
        super(PyrexGroupCompressor, self).__init__(settings)
1005
 
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1006
 
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
848
    def __init__(self):
 
849
        super(PyrexGroupCompressor, self).__init__()
 
850
        self._delta_index = DeltaIndex()
1007
851
 
1008
852
    def _compress(self, key, bytes, max_delta_size, soft=False):
1009
853
        """see _CommonGroupCompressor._compress"""
1061
905
        self.endpoint = endpoint
1062
906
 
1063
907
 
1064
 
def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
 
908
def make_pack_factory(graph, delta, keylength):
1065
909
    """Create a factory for creating a pack based groupcompress.
1066
910
 
1067
911
    This is only functional enough to run interface tests, it doesn't try to
1082
926
        writer = pack.ContainerWriter(stream.write)
1083
927
        writer.begin()
1084
928
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1085
 
            add_callback=graph_index.add_nodes,
1086
 
            inconsistency_fatal=inconsistency_fatal)
1087
 
        access = pack_repo._DirectPackAccess({})
 
929
            add_callback=graph_index.add_nodes)
 
930
        access = knit._DirectPackAccess({})
1088
931
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1089
932
        result = GroupCompressVersionedFiles(index, access, delta)
1090
933
        result.stream = stream
1098
941
    versioned_files.stream.close()
1099
942
 
1100
943
 
1101
 
class _BatchingBlockFetcher(object):
1102
 
    """Fetch group compress blocks in batches.
1103
 
 
1104
 
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1105
 
        currently pending batch.
1106
 
    """
1107
 
 
1108
 
    def __init__(self, gcvf, locations, get_compressor_settings=None):
1109
 
        self.gcvf = gcvf
1110
 
        self.locations = locations
1111
 
        self.keys = []
1112
 
        self.batch_memos = {}
1113
 
        self.memos_to_get = []
1114
 
        self.total_bytes = 0
1115
 
        self.last_read_memo = None
1116
 
        self.manager = None
1117
 
        self._get_compressor_settings = get_compressor_settings
1118
 
 
1119
 
    def add_key(self, key):
1120
 
        """Add another to key to fetch.
1121
 
 
1122
 
        :return: The estimated number of bytes needed to fetch the batch so
1123
 
            far.
1124
 
        """
1125
 
        self.keys.append(key)
1126
 
        index_memo, _, _, _ = self.locations[key]
1127
 
        read_memo = index_memo[0:3]
1128
 
        # Three possibilities for this read_memo:
1129
 
        #  - it's already part of this batch; or
1130
 
        #  - it's not yet part of this batch, but is already cached; or
1131
 
        #  - it's not yet part of this batch and will need to be fetched.
1132
 
        if read_memo in self.batch_memos:
1133
 
            # This read memo is already in this batch.
1134
 
            return self.total_bytes
1135
 
        try:
1136
 
            cached_block = self.gcvf._group_cache[read_memo]
1137
 
        except KeyError:
1138
 
            # This read memo is new to this batch, and the data isn't cached
1139
 
            # either.
1140
 
            self.batch_memos[read_memo] = None
1141
 
            self.memos_to_get.append(read_memo)
1142
 
            byte_length = read_memo[2]
1143
 
            self.total_bytes += byte_length
1144
 
        else:
1145
 
            # This read memo is new to this batch, but cached.
1146
 
            # Keep a reference to the cached block in batch_memos because it's
1147
 
            # certain that we'll use it when this batch is processed, but
1148
 
            # there's a risk that it would fall out of _group_cache between now
1149
 
            # and then.
1150
 
            self.batch_memos[read_memo] = cached_block
1151
 
        return self.total_bytes
1152
 
 
1153
 
    def _flush_manager(self):
1154
 
        if self.manager is not None:
1155
 
            for factory in self.manager.get_record_stream():
1156
 
                yield factory
1157
 
            self.manager = None
1158
 
            self.last_read_memo = None
1159
 
 
1160
 
    def yield_factories(self, full_flush=False):
1161
 
        """Yield factories for keys added since the last yield.  They will be
1162
 
        returned in the order they were added via add_key.
1163
 
 
1164
 
        :param full_flush: by default, some results may not be returned in case
1165
 
            they can be part of the next batch.  If full_flush is True, then
1166
 
            all results are returned.
1167
 
        """
1168
 
        if self.manager is None and not self.keys:
1169
 
            return
1170
 
        # Fetch all memos in this batch.
1171
 
        blocks = self.gcvf._get_blocks(self.memos_to_get)
1172
 
        # Turn blocks into factories and yield them.
1173
 
        memos_to_get_stack = list(self.memos_to_get)
1174
 
        memos_to_get_stack.reverse()
1175
 
        for key in self.keys:
1176
 
            index_memo, _, parents, _ = self.locations[key]
1177
 
            read_memo = index_memo[:3]
1178
 
            if self.last_read_memo != read_memo:
1179
 
                # We are starting a new block. If we have a
1180
 
                # manager, we have found everything that fits for
1181
 
                # now, so yield records
1182
 
                for factory in self._flush_manager():
1183
 
                    yield factory
1184
 
                # Now start a new manager.
1185
 
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1186
 
                    # The next block from _get_blocks will be the block we
1187
 
                    # need.
1188
 
                    block_read_memo, block = blocks.next()
1189
 
                    if block_read_memo != read_memo:
1190
 
                        raise AssertionError(
1191
 
                            "block_read_memo out of sync with read_memo"
1192
 
                            "(%r != %r)" % (block_read_memo, read_memo))
1193
 
                    self.batch_memos[read_memo] = block
1194
 
                    memos_to_get_stack.pop()
1195
 
                else:
1196
 
                    block = self.batch_memos[read_memo]
1197
 
                self.manager = _LazyGroupContentManager(block,
1198
 
                    get_compressor_settings=self._get_compressor_settings)
1199
 
                self.last_read_memo = read_memo
1200
 
            start, end = index_memo[3:5]
1201
 
            self.manager.add_factory(key, parents, start, end)
1202
 
        if full_flush:
1203
 
            for factory in self._flush_manager():
1204
 
                yield factory
1205
 
        del self.keys[:]
1206
 
        self.batch_memos.clear()
1207
 
        del self.memos_to_get[:]
1208
 
        self.total_bytes = 0
1209
 
 
1210
 
 
1211
 
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
 
944
class GroupCompressVersionedFiles(VersionedFiles):
1212
945
    """A group-compress based VersionedFiles implementation."""
1213
946
 
1214
 
    # This controls how the GroupCompress DeltaIndex works. Basically, we
1215
 
    # compute hash pointers into the source blocks (so hash(text) => text).
1216
 
    # However each of these references costs some memory in trade against a
1217
 
    # more accurate match result. For very large files, they either are
1218
 
    # pre-compressed and change in bulk whenever they change, or change in just
1219
 
    # local blocks. Either way, 'improved resolution' is not very helpful,
1220
 
    # versus running out of memory trying to track everything. The default max
1221
 
    # gives 100% sampling of a 1MB file.
1222
 
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1223
 
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1224
 
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
1225
 
 
1226
 
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1227
 
                 _group_cache=None):
 
947
    def __init__(self, index, access, delta=True):
1228
948
        """Create a GroupCompressVersionedFiles object.
1229
949
 
1230
950
        :param index: The index object storing access and graph data.
1231
951
        :param access: The access object storing raw data.
1232
952
        :param delta: Whether to delta compress or just entropy compress.
1233
 
        :param _unadded_refs: private parameter, don't use.
1234
 
        :param _group_cache: private parameter, don't use.
1235
953
        """
1236
954
        self._index = index
1237
955
        self._access = access
1238
956
        self._delta = delta
1239
 
        if _unadded_refs is None:
1240
 
            _unadded_refs = {}
1241
 
        self._unadded_refs = _unadded_refs
1242
 
        if _group_cache is None:
1243
 
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1244
 
        self._group_cache = _group_cache
1245
 
        self._immediate_fallback_vfs = []
1246
 
        self._max_bytes_to_index = None
1247
 
 
1248
 
    def without_fallbacks(self):
1249
 
        """Return a clone of this object without any fallbacks configured."""
1250
 
        return GroupCompressVersionedFiles(self._index, self._access,
1251
 
            self._delta, _unadded_refs=dict(self._unadded_refs),
1252
 
            _group_cache=self._group_cache)
 
957
        self._unadded_refs = {}
 
958
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
 
959
        self._fallback_vfs = []
1253
960
 
1254
961
    def add_lines(self, key, parents, lines, parent_texts=None,
1255
962
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1259
966
        :param key: The key tuple of the text to add.
1260
967
        :param parents: The parents key tuples of the text to add.
1261
968
        :param lines: A list of lines. Each line must be a bytestring. And all
1262
 
            of them except the last must be terminated with \\n and contain no
1263
 
            other \\n's. The last line may either contain no \\n's or a single
1264
 
            terminating \\n. If the lines list does meet this constraint the
1265
 
            add routine may error or may succeed - but you will be unable to
1266
 
            read the data back accurately. (Checking the lines have been split
 
969
            of them except the last must be terminated with \n and contain no
 
970
            other \n's. The last line may either contain no \n's or a single
 
971
            terminating \n. If the lines list does meet this constraint the add
 
972
            routine may error or may succeed - but you will be unable to read
 
973
            the data back accurately. (Checking the lines have been split
1267
974
            correctly is expensive and extremely unlikely to catch bugs so it
1268
975
            is not done at runtime unless check_content is True.)
1269
976
        :param parent_texts: An optional dictionary containing the opaque
1302
1009
        return sha1, length, None
1303
1010
 
1304
1011
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
1305
 
        """See VersionedFiles._add_text()."""
 
1012
        """See VersionedFiles.add_text()."""
1306
1013
        self._index._check_write_ok()
1307
1014
        self._check_add(key, None, random_id, check_content=False)
1308
1015
        if text.__class__ is not str:
1324
1031
 
1325
1032
        :param a_versioned_files: A VersionedFiles object.
1326
1033
        """
1327
 
        self._immediate_fallback_vfs.append(a_versioned_files)
 
1034
        self._fallback_vfs.append(a_versioned_files)
1328
1035
 
1329
1036
    def annotate(self, key):
1330
1037
        """See VersionedFiles.annotate."""
1331
 
        ann = annotate.Annotator(self)
1332
 
        return ann.annotate_flat(key)
1333
 
 
1334
 
    def get_annotator(self):
1335
 
        return annotate.Annotator(self)
1336
 
 
1337
 
    def check(self, progress_bar=None, keys=None):
 
1038
        graph = Graph(self)
 
1039
        parent_map = self.get_parent_map([key])
 
1040
        if not parent_map:
 
1041
            raise errors.RevisionNotPresent(key, self)
 
1042
        if parent_map[key] is not None:
 
1043
            parent_map = dict((k, v) for k, v in graph.iter_ancestry([key])
 
1044
                              if v is not None)
 
1045
            keys = parent_map.keys()
 
1046
        else:
 
1047
            keys = [key]
 
1048
            parent_map = {key:()}
 
1049
        # We used Graph(self) to load the parent_map, but now that we have it,
 
1050
        # we can just query the parent map directly, so create a KnownGraph
 
1051
        heads_provider = _mod_graph.KnownGraph(parent_map)
 
1052
        parent_cache = {}
 
1053
        reannotate = annotate.reannotate
 
1054
        for record in self.get_record_stream(keys, 'topological', True):
 
1055
            key = record.key
 
1056
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
1057
            parent_lines = [parent_cache[parent] for parent in parent_map[key]]
 
1058
            parent_cache[key] = list(
 
1059
                reannotate(parent_lines, lines, key, None, heads_provider))
 
1060
        return parent_cache[key]
 
1061
 
 
1062
    def check(self, progress_bar=None):
1338
1063
        """See VersionedFiles.check()."""
1339
 
        if keys is None:
1340
 
            keys = self.keys()
1341
 
            for record in self.get_record_stream(keys, 'unordered', True):
1342
 
                record.get_bytes_as('fulltext')
1343
 
        else:
1344
 
            return self.get_record_stream(keys, 'unordered', True)
1345
 
 
1346
 
    def clear_cache(self):
1347
 
        """See VersionedFiles.clear_cache()"""
1348
 
        self._group_cache.clear()
1349
 
        self._index._graph_index.clear_cache()
1350
 
        self._index._int_cache.clear()
 
1064
        keys = self.keys()
 
1065
        for record in self.get_record_stream(keys, 'unordered', True):
 
1066
            record.get_bytes_as('fulltext')
1351
1067
 
1352
1068
    def _check_add(self, key, lines, random_id, check_content):
1353
1069
        """check that version_id and lines are safe to add."""
1384
1100
            and so on.
1385
1101
        """
1386
1102
        result = {}
1387
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1103
        sources = [self._index] + self._fallback_vfs
1388
1104
        source_results = []
1389
1105
        missing = set(keys)
1390
1106
        for source in sources:
1396
1112
            missing.difference_update(set(new_result))
1397
1113
        return result, source_results
1398
1114
 
1399
 
    def _get_blocks(self, read_memos):
1400
 
        """Get GroupCompressBlocks for the given read_memos.
1401
 
 
1402
 
        :returns: a series of (read_memo, block) pairs, in the order they were
1403
 
            originally passed.
1404
 
        """
1405
 
        cached = {}
1406
 
        for read_memo in read_memos:
1407
 
            try:
1408
 
                block = self._group_cache[read_memo]
1409
 
            except KeyError:
1410
 
                pass
1411
 
            else:
1412
 
                cached[read_memo] = block
1413
 
        not_cached = []
1414
 
        not_cached_seen = set()
1415
 
        for read_memo in read_memos:
1416
 
            if read_memo in cached:
1417
 
                # Don't fetch what we already have
1418
 
                continue
1419
 
            if read_memo in not_cached_seen:
1420
 
                # Don't try to fetch the same data twice
1421
 
                continue
1422
 
            not_cached.append(read_memo)
1423
 
            not_cached_seen.add(read_memo)
1424
 
        raw_records = self._access.get_raw_records(not_cached)
1425
 
        for read_memo in read_memos:
1426
 
            try:
1427
 
                yield read_memo, cached[read_memo]
1428
 
            except KeyError:
1429
 
                # Read the block, and cache it.
1430
 
                zdata = raw_records.next()
1431
 
                block = GroupCompressBlock.from_bytes(zdata)
1432
 
                self._group_cache[read_memo] = block
1433
 
                cached[read_memo] = block
1434
 
                yield read_memo, block
 
1115
    def _get_block(self, index_memo):
 
1116
        read_memo = index_memo[0:3]
 
1117
        # get the group:
 
1118
        try:
 
1119
            block = self._group_cache[read_memo]
 
1120
        except KeyError:
 
1121
            # read the group
 
1122
            zdata = self._access.get_raw_records([read_memo]).next()
 
1123
            # decompress - whole thing - this is not a bug, as it
 
1124
            # permits caching. We might want to store the partially
 
1125
            # decompresed group and decompress object, so that recent
 
1126
            # texts are not penalised by big groups.
 
1127
            block = GroupCompressBlock.from_bytes(zdata)
 
1128
            self._group_cache[read_memo] = block
 
1129
        # cheapo debugging:
 
1130
        # print len(zdata), len(plain)
 
1131
        # parse - requires split_lines, better to have byte offsets
 
1132
        # here (but not by much - we only split the region for the
 
1133
        # recipe, and we often want to end up with lines anyway.
 
1134
        return block
1435
1135
 
1436
1136
    def get_missing_compression_parent_keys(self):
1437
1137
        """Return the keys of missing compression parents.
1491
1191
        parent_map = {}
1492
1192
        key_to_source_map = {}
1493
1193
        source_results = []
1494
 
        for source in self._immediate_fallback_vfs:
 
1194
        for source in self._fallback_vfs:
1495
1195
            if not missing:
1496
1196
                break
1497
1197
            source_parents = source.get_parent_map(missing)
1507
1207
 
1508
1208
        The returned objects should be in the order defined by 'ordering',
1509
1209
        which can weave between different sources.
1510
 
 
1511
1210
        :param ordering: Must be one of 'topological' or 'groupcompress'
1512
1211
        :return: List of [(source, [keys])] tuples, such that all keys are in
1513
1212
            the defined order, regardless of source.
1514
1213
        """
1515
1214
        if ordering == 'topological':
1516
 
            present_keys = tsort.topo_sort(parent_map)
 
1215
            present_keys = topo_sort(parent_map)
1517
1216
        else:
1518
1217
            # ordering == 'groupcompress'
1519
1218
            # XXX: This only optimizes for the target ordering. We may need
1604
1303
                unadded_keys, source_result)
1605
1304
        for key in missing:
1606
1305
            yield AbsentContentFactory(key)
1607
 
        # Batch up as many keys as we can until either:
1608
 
        #  - we encounter an unadded ref, or
1609
 
        #  - we run out of keys, or
1610
 
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1611
 
        batcher = _BatchingBlockFetcher(self, locations,
1612
 
            get_compressor_settings=self._get_compressor_settings)
 
1306
        manager = None
 
1307
        last_read_memo = None
 
1308
        # TODO: This works fairly well at batching up existing groups into a
 
1309
        #       streamable format, and possibly allowing for taking one big
 
1310
        #       group and splitting it when it isn't fully utilized.
 
1311
        #       However, it doesn't allow us to find under-utilized groups and
 
1312
        #       combine them into a bigger group on the fly.
 
1313
        #       (Consider the issue with how chk_map inserts texts
 
1314
        #       one-at-a-time.) This could be done at insert_record_stream()
 
1315
        #       time, but it probably would decrease the number of
 
1316
        #       bytes-on-the-wire for fetch.
1613
1317
        for source, keys in source_keys:
1614
1318
            if source is self:
1615
1319
                for key in keys:
1616
1320
                    if key in self._unadded_refs:
1617
 
                        # Flush batch, then yield unadded ref from
1618
 
                        # self._compressor.
1619
 
                        for factory in batcher.yield_factories(full_flush=True):
1620
 
                            yield factory
 
1321
                        if manager is not None:
 
1322
                            for factory in manager.get_record_stream():
 
1323
                                yield factory
 
1324
                            last_read_memo = manager = None
1621
1325
                        bytes, sha1 = self._compressor.extract(key)
1622
1326
                        parents = self._unadded_refs[key]
1623
1327
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1624
 
                        continue
1625
 
                    if batcher.add_key(key) > BATCH_SIZE:
1626
 
                        # Ok, this batch is big enough.  Yield some results.
1627
 
                        for factory in batcher.yield_factories():
1628
 
                            yield factory
 
1328
                    else:
 
1329
                        index_memo, _, parents, (method, _) = locations[key]
 
1330
                        read_memo = index_memo[0:3]
 
1331
                        if last_read_memo != read_memo:
 
1332
                            # We are starting a new block. If we have a
 
1333
                            # manager, we have found everything that fits for
 
1334
                            # now, so yield records
 
1335
                            if manager is not None:
 
1336
                                for factory in manager.get_record_stream():
 
1337
                                    yield factory
 
1338
                            # Now start a new manager
 
1339
                            block = self._get_block(index_memo)
 
1340
                            manager = _LazyGroupContentManager(block)
 
1341
                            last_read_memo = read_memo
 
1342
                        start, end = index_memo[3:5]
 
1343
                        manager.add_factory(key, parents, start, end)
1629
1344
            else:
1630
 
                for factory in batcher.yield_factories(full_flush=True):
1631
 
                    yield factory
 
1345
                if manager is not None:
 
1346
                    for factory in manager.get_record_stream():
 
1347
                        yield factory
 
1348
                    last_read_memo = manager = None
1632
1349
                for record in source.get_record_stream(keys, ordering,
1633
1350
                                                       include_delta_closure):
1634
1351
                    yield record
1635
 
        for factory in batcher.yield_factories(full_flush=True):
1636
 
            yield factory
 
1352
        if manager is not None:
 
1353
            for factory in manager.get_record_stream():
 
1354
                yield factory
1637
1355
 
1638
1356
    def get_sha1s(self, keys):
1639
1357
        """See VersionedFiles.get_sha1s()."""
1661
1379
        for _ in self._insert_record_stream(stream, random_id=False):
1662
1380
            pass
1663
1381
 
1664
 
    def _get_compressor_settings(self):
1665
 
        if self._max_bytes_to_index is None:
1666
 
            # TODO: VersionedFiles don't know about their containing
1667
 
            #       repository, so they don't have much of an idea about their
1668
 
            #       location. So for now, this is only a global option.
1669
 
            c = config.GlobalConfig()
1670
 
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1671
 
            if val is not None:
1672
 
                try:
1673
 
                    val = int(val)
1674
 
                except ValueError, e:
1675
 
                    trace.warning('Value for '
1676
 
                                  '"bzr.groupcompress.max_bytes_to_index"'
1677
 
                                  ' %r is not an integer'
1678
 
                                  % (val,))
1679
 
                    val = None
1680
 
            if val is None:
1681
 
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
1682
 
            self._max_bytes_to_index = val
1683
 
        return {'max_bytes_to_index': self._max_bytes_to_index}
1684
 
 
1685
 
    def _make_group_compressor(self):
1686
 
        return GroupCompressor(self._get_compressor_settings())
1687
 
 
1688
1382
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1689
1383
                              reuse_blocks=True):
1690
1384
        """Internal core to insert a record stream into this container.
1713
1407
                return adapter
1714
1408
        # This will go up to fulltexts for gc to gc fetching, which isn't
1715
1409
        # ideal.
1716
 
        self._compressor = self._make_group_compressor()
 
1410
        self._compressor = GroupCompressor()
1717
1411
        self._unadded_refs = {}
1718
1412
        keys_to_add = []
1719
1413
        def flush():
1720
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
1721
 
            self._compressor = self._make_group_compressor()
1722
 
            # Note: At this point we still have 1 copy of the fulltext (in
1723
 
            #       record and the var 'bytes'), and this generates 2 copies of
1724
 
            #       the compressed text (one for bytes, one in chunks)
1725
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1726
 
            #       have to double compressed memory here
1727
 
            # TODO: Figure out how to indicate that we would be happy to free
1728
 
            #       the fulltext content at this point. Note that sometimes we
1729
 
            #       will want it later (streaming CHK pages), but most of the
1730
 
            #       time we won't (everything else)
1731
 
            bytes = ''.join(chunks)
1732
 
            del chunks
 
1414
            bytes = self._compressor.flush().to_bytes()
1733
1415
            index, start, length = self._access.add_raw_records(
1734
1416
                [(None, len(bytes))], bytes)[0]
1735
1417
            nodes = []
1738
1420
            self._index.add_records(nodes, random_id=random_id)
1739
1421
            self._unadded_refs = {}
1740
1422
            del keys_to_add[:]
 
1423
            self._compressor = GroupCompressor()
1741
1424
 
1742
1425
        last_prefix = None
1743
1426
        max_fulltext_len = 0
1747
1430
        block_length = None
1748
1431
        # XXX: TODO: remove this, it is just for safety checking for now
1749
1432
        inserted_keys = set()
1750
 
        reuse_this_block = reuse_blocks
1751
1433
        for record in stream:
1752
1434
            # Raise an error when a record is missing.
1753
1435
            if record.storage_kind == 'absent':
1761
1443
            if reuse_blocks:
1762
1444
                # If the reuse_blocks flag is set, check to see if we can just
1763
1445
                # copy a groupcompress block as-is.
1764
 
                # We only check on the first record (groupcompress-block) not
1765
 
                # on all of the (groupcompress-block-ref) entries.
1766
 
                # The reuse_this_block flag is then kept for as long as
1767
 
                if record.storage_kind == 'groupcompress-block':
1768
 
                    # Check to see if we really want to re-use this block
1769
 
                    insert_manager = record._manager
1770
 
                    reuse_this_block = insert_manager.check_is_well_utilized()
1771
 
            else:
1772
 
                reuse_this_block = False
1773
 
            if reuse_this_block:
1774
 
                # We still want to reuse this block
1775
1446
                if record.storage_kind == 'groupcompress-block':
1776
1447
                    # Insert the raw block into the target repo
1777
1448
                    insert_manager = record._manager
 
1449
                    insert_manager._check_rebuild_block()
1778
1450
                    bytes = record._manager._block.to_bytes()
1779
1451
                    _, start, length = self._access.add_raw_records(
1780
1452
                        [(None, len(bytes))], bytes)[0]
1785
1457
                                           'groupcompress-block-ref'):
1786
1458
                    if insert_manager is None:
1787
1459
                        raise AssertionError('No insert_manager set')
1788
 
                    if insert_manager is not record._manager:
1789
 
                        raise AssertionError('insert_manager does not match'
1790
 
                            ' the current record, we cannot be positive'
1791
 
                            ' that the appropriate content was inserted.'
1792
 
                            )
1793
1460
                    value = "%d %d %d %d" % (block_start, block_length,
1794
1461
                                             record._start, record._end)
1795
1462
                    nodes = [(record.key, value, (record.parents,))]
1845
1512
                key = record.key
1846
1513
            self._unadded_refs[key] = record.parents
1847
1514
            yield found_sha1
1848
 
            as_st = static_tuple.StaticTuple.from_sequence
1849
 
            if record.parents is not None:
1850
 
                parents = as_st([as_st(p) for p in record.parents])
1851
 
            else:
1852
 
                parents = None
1853
 
            refs = static_tuple.StaticTuple(parents)
1854
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1515
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
 
1516
                (record.parents,)))
1855
1517
        if len(keys_to_add):
1856
1518
            flush()
1857
1519
        self._compressor = None
1900
1562
        """See VersionedFiles.keys."""
1901
1563
        if 'evil' in debug.debug_flags:
1902
1564
            trace.mutter_callsite(2, "keys scales with size of history")
1903
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1565
        sources = [self._index] + self._fallback_vfs
1904
1566
        result = set()
1905
1567
        for source in sources:
1906
1568
            result.update(source.keys())
1907
1569
        return result
1908
1570
 
1909
1571
 
1910
 
class _GCBuildDetails(object):
1911
 
    """A blob of data about the build details.
1912
 
 
1913
 
    This stores the minimal data, which then allows compatibility with the old
1914
 
    api, without taking as much memory.
1915
 
    """
1916
 
 
1917
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1918
 
                 '_delta_end', '_parents')
1919
 
 
1920
 
    method = 'group'
1921
 
    compression_parent = None
1922
 
 
1923
 
    def __init__(self, parents, position_info):
1924
 
        self._parents = parents
1925
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1926
 
         self._delta_end) = position_info
1927
 
 
1928
 
    def __repr__(self):
1929
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1930
 
            self.index_memo, self._parents)
1931
 
 
1932
 
    @property
1933
 
    def index_memo(self):
1934
 
        return (self._index, self._group_start, self._group_end,
1935
 
                self._basis_end, self._delta_end)
1936
 
 
1937
 
    @property
1938
 
    def record_details(self):
1939
 
        return static_tuple.StaticTuple(self.method, None)
1940
 
 
1941
 
    def __getitem__(self, offset):
1942
 
        """Compatibility thunk to act like a tuple."""
1943
 
        if offset == 0:
1944
 
            return self.index_memo
1945
 
        elif offset == 1:
1946
 
            return self.compression_parent # Always None
1947
 
        elif offset == 2:
1948
 
            return self._parents
1949
 
        elif offset == 3:
1950
 
            return self.record_details
1951
 
        else:
1952
 
            raise IndexError('offset out of range')
1953
 
            
1954
 
    def __len__(self):
1955
 
        return 4
1956
 
 
1957
 
 
1958
1572
class _GCGraphIndex(object):
1959
1573
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1960
1574
 
1961
1575
    def __init__(self, graph_index, is_locked, parents=True,
1962
 
        add_callback=None, track_external_parent_refs=False,
1963
 
        inconsistency_fatal=True, track_new_keys=False):
 
1576
        add_callback=None, track_external_parent_refs=False):
1964
1577
        """Construct a _GCGraphIndex on a graph_index.
1965
1578
 
1966
1579
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1974
1587
        :param track_external_parent_refs: As keys are added, keep track of the
1975
1588
            keys they reference, so that we can query get_missing_parents(),
1976
1589
            etc.
1977
 
        :param inconsistency_fatal: When asked to add records that are already
1978
 
            present, and the details are inconsistent with the existing
1979
 
            record, raise an exception instead of warning (and skipping the
1980
 
            record).
1981
1590
        """
1982
1591
        self._add_callback = add_callback
1983
1592
        self._graph_index = graph_index
1984
1593
        self._parents = parents
1985
1594
        self.has_graph = parents
1986
1595
        self._is_locked = is_locked
1987
 
        self._inconsistency_fatal = inconsistency_fatal
1988
 
        # GroupCompress records tend to have the same 'group' start + offset
1989
 
        # repeated over and over, this creates a surplus of ints
1990
 
        self._int_cache = {}
1991
1596
        if track_external_parent_refs:
1992
 
            self._key_dependencies = _KeyRefs(
1993
 
                track_new_keys=track_new_keys)
 
1597
            self._key_dependencies = knit._KeyRefs()
1994
1598
        else:
1995
1599
            self._key_dependencies = None
1996
1600
 
2029
1633
        if not random_id:
2030
1634
            present_nodes = self._get_entries(keys)
2031
1635
            for (index, key, value, node_refs) in present_nodes:
2032
 
                # Sometimes these are passed as a list rather than a tuple
2033
 
                node_refs = static_tuple.as_tuples(node_refs)
2034
 
                passed = static_tuple.as_tuples(keys[key])
2035
 
                if node_refs != passed[1]:
2036
 
                    details = '%s %s %s' % (key, (value, node_refs), passed)
2037
 
                    if self._inconsistency_fatal:
2038
 
                        raise errors.KnitCorrupt(self, "inconsistent details"
2039
 
                                                 " in add_records: %s" %
2040
 
                                                 details)
2041
 
                    else:
2042
 
                        trace.warning("inconsistent details in skipped"
2043
 
                                      " record: %s", details)
 
1636
                if node_refs != keys[key][1]:
 
1637
                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
 
1638
                        ": %s %s" % ((value, node_refs), keys[key]))
2044
1639
                del keys[key]
2045
1640
                changed = True
2046
1641
        if changed:
2053
1648
                    result.append((key, value))
2054
1649
            records = result
2055
1650
        key_dependencies = self._key_dependencies
2056
 
        if key_dependencies is not None:
2057
 
            if self._parents:
2058
 
                for key, value, refs in records:
2059
 
                    parents = refs[0]
2060
 
                    key_dependencies.add_references(key, parents)
2061
 
            else:
2062
 
                for key, value, refs in records:
2063
 
                    new_keys.add_key(key)
 
1651
        if key_dependencies is not None and self._parents:
 
1652
            for key, value, refs in records:
 
1653
                parents = refs[0]
 
1654
                key_dependencies.add_references(key, parents)
2064
1655
        self._add_callback(records)
2065
1656
 
2066
1657
    def _check_read(self):
2097
1688
            if missing_keys:
2098
1689
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
2099
1690
 
2100
 
    def find_ancestry(self, keys):
2101
 
        """See CombinedGraphIndex.find_ancestry"""
2102
 
        return self._graph_index.find_ancestry(keys, 0)
2103
 
 
2104
1691
    def get_parent_map(self, keys):
2105
1692
        """Get a map of the parents of keys.
2106
1693
 
2123
1710
        """Return the keys of missing parents."""
2124
1711
        # Copied from _KnitGraphIndex.get_missing_parents
2125
1712
        # We may have false positives, so filter those out.
2126
 
        self._key_dependencies.satisfy_refs_for_keys(
 
1713
        self._key_dependencies.add_keys(
2127
1714
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
2128
1715
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
2129
1716
 
2135
1722
        :param keys: An iterable of keys.
2136
1723
        :return: A dict of key:
2137
1724
            (index_memo, compression_parent, parents, record_details).
2138
 
 
2139
 
            * index_memo: opaque structure to pass to read_records to extract
2140
 
              the raw data
2141
 
            * compression_parent: Content that this record is built upon, may
2142
 
              be None
2143
 
            * parents: Logical parents of this node
2144
 
            * record_details: extra information about the content which needs
2145
 
              to be passed to Factory.parse_record
 
1725
            index_memo
 
1726
                opaque structure to pass to read_records to extract the raw
 
1727
                data
 
1728
            compression_parent
 
1729
                Content that this record is built upon, may be None
 
1730
            parents
 
1731
                Logical parents of this node
 
1732
            record_details
 
1733
                extra information about the content which needs to be passed to
 
1734
                Factory.parse_record
2146
1735
        """
2147
1736
        self._check_read()
2148
1737
        result = {}
2153
1742
                parents = None
2154
1743
            else:
2155
1744
                parents = entry[3][0]
2156
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2157
 
            result[key] = details
 
1745
            method = 'group'
 
1746
            result[key] = (self._node_to_position(entry),
 
1747
                                  None, parents, (method, None))
2158
1748
        return result
2159
1749
 
2160
1750
    def keys(self):
2169
1759
        """Convert an index value to position details."""
2170
1760
        bits = node[2].split(' ')
2171
1761
        # It would be nice not to read the entire gzip.
2172
 
        # start and stop are put into _int_cache because they are very common.
2173
 
        # They define the 'group' that an entry is in, and many groups can have
2174
 
        # thousands of objects.
2175
 
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2176
 
        # each, or about 7MB. Note that it might be even more when you consider
2177
 
        # how PyInt is allocated in separate slabs. And you can't return a slab
2178
 
        # to the OS if even 1 int on it is in use. Note though that Python uses
2179
 
        # a LIFO when re-using PyInt slots, which might cause more
2180
 
        # fragmentation.
2181
1762
        start = int(bits[0])
2182
 
        start = self._int_cache.setdefault(start, start)
2183
1763
        stop = int(bits[1])
2184
 
        stop = self._int_cache.setdefault(stop, stop)
2185
1764
        basis_end = int(bits[2])
2186
1765
        delta_end = int(bits[3])
2187
 
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2188
 
        # instance...
2189
 
        return (node[0], start, stop, basis_end, delta_end)
 
1766
        return node[0], start, stop, basis_end, delta_end
2190
1767
 
2191
1768
    def scan_unvalidated_index(self, graph_index):
2192
1769
        """Inform this _GCGraphIndex that there is an unvalidated index.
2193
1770
 
2194
1771
        This allows this _GCGraphIndex to keep track of any missing
2195
1772
        compression parents we may want to have filled in to make those
2196
 
        indices valid.  It also allows _GCGraphIndex to track any new keys.
 
1773
        indices valid.
2197
1774
 
2198
1775
        :param graph_index: A GraphIndex
2199
1776
        """
2200
 
        key_dependencies = self._key_dependencies
2201
 
        if key_dependencies is None:
2202
 
            return
2203
 
        for node in graph_index.iter_all_entries():
2204
 
            # Add parent refs from graph_index (and discard parent refs
2205
 
            # that the graph_index has).
2206
 
            key_dependencies.add_references(node[1], node[3][0])
 
1777
        if self._key_dependencies is not None:
 
1778
            # Add parent refs from graph_index (and discard parent refs that
 
1779
            # the graph_index has).
 
1780
            add_refs = self._key_dependencies.add_references
 
1781
            for node in graph_index.iter_all_entries():
 
1782
                add_refs(node[1], node[3][0])
 
1783
 
2207
1784
 
2208
1785
 
2209
1786
from bzrlib._groupcompress_py import (
2223
1800
        decode_base128_int,
2224
1801
        )
2225
1802
    GroupCompressor = PyrexGroupCompressor
2226
 
except ImportError, e:
2227
 
    osutils.failed_to_load_extension(e)
 
1803
except ImportError:
2228
1804
    GroupCompressor = PythonGroupCompressor
2229
1805