~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-27 08:02:52 UTC
  • mto: This revision was merged to the branch mainline in revision 4573.
  • Revision ID: andrew.bennetts@canonical.com-20090727080252-1r4s9oqwlkzgywx7
Fix trivial bug in _vfs_set_tags_bytes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
26
 
from bzrlib.lazy_import import lazy_import
27
 
lazy_import(globals(), """
28
26
from bzrlib import (
29
27
    annotate,
30
 
    config,
31
28
    debug,
32
29
    errors,
33
30
    graph as _mod_graph,
 
31
    knit,
34
32
    osutils,
35
33
    pack,
36
 
    static_tuple,
37
34
    trace,
38
 
    tsort,
39
35
    )
40
 
 
41
 
from bzrlib.repofmt import pack_repo
42
 
""")
43
 
 
 
36
from bzrlib.graph import Graph
44
37
from bzrlib.btree_index import BTreeBuilder
45
38
from bzrlib.lru_cache import LRUSizeCache
 
39
from bzrlib.tsort import topo_sort
46
40
from bzrlib.versionedfile import (
47
 
    _KeyRefs,
48
41
    adapter_registry,
49
42
    AbsentContentFactory,
50
43
    ChunkedContentFactory,
51
44
    FulltextContentFactory,
52
 
    VersionedFilesWithFallbacks,
 
45
    VersionedFiles,
53
46
    )
54
47
 
55
 
# Minimum number of uncompressed bytes to try fetch at once when retrieving
56
 
# groupcompress blocks.
57
 
BATCH_SIZE = 2**16
58
 
 
59
48
_USE_LZMA = False and (pylzma is not None)
60
49
 
61
50
# osutils.sha_string('')
62
51
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
63
52
 
 
53
 
64
54
def sort_gc_optimal(parent_map):
65
55
    """Sort and group the keys in parent_map into groupcompress order.
66
56
 
72
62
    # groupcompress ordering is approximately reverse topological,
73
63
    # properly grouped by file-id.
74
64
    per_prefix_map = {}
75
 
    for key, value in parent_map.iteritems():
 
65
    for item in parent_map.iteritems():
 
66
        key = item[0]
76
67
        if isinstance(key, str) or len(key) == 1:
77
68
            prefix = ''
78
69
        else:
79
70
            prefix = key[0]
80
71
        try:
81
 
            per_prefix_map[prefix][key] = value
 
72
            per_prefix_map[prefix].append(item)
82
73
        except KeyError:
83
 
            per_prefix_map[prefix] = {key: value}
 
74
            per_prefix_map[prefix] = [item]
84
75
 
85
76
    present_keys = []
86
77
    for prefix in sorted(per_prefix_map):
87
 
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
 
78
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
88
79
    return present_keys
89
80
 
90
81
 
108
99
    def __init__(self):
109
100
        # map by key? or just order in file?
110
101
        self._compressor_name = None
111
 
        self._z_content_chunks = None
 
102
        self._z_content = None
112
103
        self._z_content_decompressor = None
113
104
        self._z_content_length = None
114
105
        self._content_length = None
127
118
        :param num_bytes: Ensure that we have extracted at least num_bytes of
128
119
            content. If None, consume everything
129
120
        """
130
 
        if self._content_length is None:
131
 
            raise AssertionError('self._content_length should never be None')
 
121
        # TODO: If we re-use the same content block at different times during
 
122
        #       get_record_stream(), it is possible that the first pass will
 
123
        #       get inserted, triggering an extract/_ensure_content() which
 
124
        #       will get rid of _z_content. And then the next use of the block
 
125
        #       will try to access _z_content (to send it over the wire), and
 
126
        #       fail because it is already extracted. Consider never releasing
 
127
        #       _z_content because of this.
132
128
        if num_bytes is None:
133
129
            num_bytes = self._content_length
134
130
        elif (self._content_length is not None
142
138
                self._content = ''.join(self._content_chunks)
143
139
                self._content_chunks = None
144
140
        if self._content is None:
145
 
            # We join self._z_content_chunks here, because if we are
146
 
            # decompressing, then it is *very* likely that we have a single
147
 
            # chunk
148
 
            if self._z_content_chunks is None:
 
141
            if self._z_content is None:
149
142
                raise AssertionError('No content to decompress')
150
 
            z_content = ''.join(self._z_content_chunks)
151
 
            if z_content == '':
 
143
            if self._z_content == '':
152
144
                self._content = ''
153
145
            elif self._compressor_name == 'lzma':
154
146
                # We don't do partial lzma decomp yet
155
 
                self._content = pylzma.decompress(z_content)
 
147
                self._content = pylzma.decompress(self._z_content)
156
148
            elif self._compressor_name == 'zlib':
157
149
                # Start a zlib decompressor
158
 
                if num_bytes * 4 > self._content_length * 3:
159
 
                    # If we are requesting more that 3/4ths of the content,
160
 
                    # just extract the whole thing in a single pass
161
 
                    num_bytes = self._content_length
162
 
                    self._content = zlib.decompress(z_content)
 
150
                if num_bytes is None:
 
151
                    self._content = zlib.decompress(self._z_content)
163
152
                else:
164
153
                    self._z_content_decompressor = zlib.decompressobj()
165
154
                    # Seed the decompressor with the uncompressed bytes, so
166
155
                    # that the rest of the code is simplified
167
156
                    self._content = self._z_content_decompressor.decompress(
168
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
169
 
                    if not self._z_content_decompressor.unconsumed_tail:
170
 
                        self._z_content_decompressor = None
 
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
171
158
            else:
172
159
                raise AssertionError('Unknown compressor: %r'
173
160
                                     % self._compressor_name)
175
162
        # 'unconsumed_tail'
176
163
 
177
164
        # Do we have enough bytes already?
178
 
        if len(self._content) >= num_bytes:
 
165
        if num_bytes is not None and len(self._content) >= num_bytes:
 
166
            return
 
167
        if num_bytes is None and self._z_content_decompressor is None:
 
168
            # We must have already decompressed everything
179
169
            return
180
170
        # If we got this far, and don't have a decompressor, something is wrong
181
171
        if self._z_content_decompressor is None:
182
172
            raise AssertionError(
183
173
                'No decompressor to decompress %d bytes' % num_bytes)
184
174
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
185
 
        if not remaining_decomp:
186
 
            raise AssertionError('Nothing left to decompress')
187
 
        needed_bytes = num_bytes - len(self._content)
188
 
        # We always set max_size to 32kB over the minimum needed, so that
189
 
        # zlib will give us as much as we really want.
190
 
        # TODO: If this isn't good enough, we could make a loop here,
191
 
        #       that keeps expanding the request until we get enough
192
 
        self._content += self._z_content_decompressor.decompress(
193
 
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
194
 
        if len(self._content) < num_bytes:
195
 
            raise AssertionError('%d bytes wanted, only %d available'
196
 
                                 % (num_bytes, len(self._content)))
197
 
        if not self._z_content_decompressor.unconsumed_tail:
198
 
            # The stream is finished
199
 
            self._z_content_decompressor = None
 
175
        if num_bytes is None:
 
176
            if remaining_decomp:
 
177
                # We don't know how much is left, but we'll decompress it all
 
178
                self._content += self._z_content_decompressor.decompress(
 
179
                    remaining_decomp)
 
180
                # Note: There's what I consider a bug in zlib.decompressobj
 
181
                #       If you pass back in the entire unconsumed_tail, only
 
182
                #       this time you don't pass a max-size, it doesn't
 
183
                #       change the unconsumed_tail back to None/''.
 
184
                #       However, we know we are done with the whole stream
 
185
                self._z_content_decompressor = None
 
186
            # XXX: Why is this the only place in this routine we set this?
 
187
            self._content_length = len(self._content)
 
188
        else:
 
189
            if not remaining_decomp:
 
190
                raise AssertionError('Nothing left to decompress')
 
191
            needed_bytes = num_bytes - len(self._content)
 
192
            # We always set max_size to 32kB over the minimum needed, so that
 
193
            # zlib will give us as much as we really want.
 
194
            # TODO: If this isn't good enough, we could make a loop here,
 
195
            #       that keeps expanding the request until we get enough
 
196
            self._content += self._z_content_decompressor.decompress(
 
197
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
198
            if len(self._content) < num_bytes:
 
199
                raise AssertionError('%d bytes wanted, only %d available'
 
200
                                     % (num_bytes, len(self._content)))
 
201
            if not self._z_content_decompressor.unconsumed_tail:
 
202
                # The stream is finished
 
203
                self._z_content_decompressor = None
200
204
 
201
205
    def _parse_bytes(self, bytes, pos):
202
206
        """Read the various lengths from the header.
218
222
            # XXX: Define some GCCorrupt error ?
219
223
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
220
224
                                 (len(bytes), pos, self._z_content_length))
221
 
        self._z_content_chunks = (bytes[pos:],)
222
 
 
223
 
    @property
224
 
    def _z_content(self):
225
 
        """Return z_content_chunks as a simple string.
226
 
 
227
 
        Meant only to be used by the test suite.
228
 
        """
229
 
        if self._z_content_chunks is not None:
230
 
            return ''.join(self._z_content_chunks)
231
 
        return None
 
225
        self._z_content = bytes[pos:]
232
226
 
233
227
    @classmethod
234
228
    def from_bytes(cls, bytes):
290
284
        self._content_length = length
291
285
        self._content_chunks = content_chunks
292
286
        self._content = None
293
 
        self._z_content_chunks = None
 
287
        self._z_content = None
294
288
 
295
289
    def set_content(self, content):
296
290
        """Set the content of this block."""
297
291
        self._content_length = len(content)
298
292
        self._content = content
299
 
        self._z_content_chunks = None
 
293
        self._z_content = None
300
294
 
301
295
    def _create_z_content_using_lzma(self):
302
296
        if self._content_chunks is not None:
304
298
            self._content_chunks = None
305
299
        if self._content is None:
306
300
            raise AssertionError('Nothing to compress')
307
 
        z_content = pylzma.compress(self._content)
308
 
        self._z_content_chunks = (z_content,)
309
 
        self._z_content_length = len(z_content)
 
301
        self._z_content = pylzma.compress(self._content)
 
302
        self._z_content_length = len(self._z_content)
310
303
 
311
 
    def _create_z_content_from_chunks(self, chunks):
 
304
    def _create_z_content_from_chunks(self):
312
305
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
313
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
314
 
        # (measured peak is maybe 30MB over the above...)
315
 
        compressed_chunks = map(compressor.compress, chunks)
 
306
        compressed_chunks = map(compressor.compress, self._content_chunks)
316
307
        compressed_chunks.append(compressor.flush())
317
 
        # Ignore empty chunks
318
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
319
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
308
        self._z_content = ''.join(compressed_chunks)
 
309
        self._z_content_length = len(self._z_content)
320
310
 
321
311
    def _create_z_content(self):
322
 
        if self._z_content_chunks is not None:
 
312
        if self._z_content is not None:
323
313
            return
324
314
        if _USE_LZMA:
325
315
            self._create_z_content_using_lzma()
326
316
            return
327
317
        if self._content_chunks is not None:
328
 
            chunks = self._content_chunks
329
 
        else:
330
 
            chunks = (self._content,)
331
 
        self._create_z_content_from_chunks(chunks)
 
318
            self._create_z_content_from_chunks()
 
319
            return
 
320
        self._z_content = zlib.compress(self._content)
 
321
        self._z_content_length = len(self._z_content)
332
322
 
333
 
    def to_chunks(self):
334
 
        """Create the byte stream as a series of 'chunks'"""
 
323
    def to_bytes(self):
 
324
        """Encode the information into a byte stream."""
335
325
        self._create_z_content()
336
326
        if _USE_LZMA:
337
327
            header = self.GCB_LZ_HEADER
338
328
        else:
339
329
            header = self.GCB_HEADER
340
 
        chunks = ['%s%d\n%d\n'
341
 
                  % (header, self._z_content_length, self._content_length),
 
330
        chunks = [header,
 
331
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
332
                  self._z_content,
342
333
                 ]
343
 
        chunks.extend(self._z_content_chunks)
344
 
        total_len = sum(map(len, chunks))
345
 
        return total_len, chunks
346
 
 
347
 
    def to_bytes(self):
348
 
        """Encode the information into a byte stream."""
349
 
        total_len, chunks = self.to_chunks()
350
334
        return ''.join(chunks)
351
335
 
352
336
    def _dump(self, include_text=False):
466
450
                # Grab and cache the raw bytes for this entry
467
451
                # and break the ref-cycle with _manager since we don't need it
468
452
                # anymore
469
 
                try:
470
 
                    self._manager._prepare_for_extract()
471
 
                except zlib.error as value:
472
 
                    raise errors.DecompressCorruption("zlib: " + str(value))
 
453
                self._manager._prepare_for_extract()
473
454
                block = self._manager._block
474
455
                self._bytes = block.extract(self.key, self._start, self._end)
475
456
                # There are code paths that first extract as fulltext, and then
476
457
                # extract as storage_kind (smart fetch). So we don't break the
477
458
                # refcycle here, but instead in manager.get_record_stream()
 
459
                # self._manager = None
478
460
            if storage_kind == 'fulltext':
479
461
                return self._bytes
480
462
            else:
486
468
class _LazyGroupContentManager(object):
487
469
    """This manages a group of _LazyGroupCompressFactory objects."""
488
470
 
489
 
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
490
 
                             # current size, and still be considered
491
 
                             # resuable
492
 
    _full_block_size = 4*1024*1024
493
 
    _full_mixed_block_size = 2*1024*1024
494
 
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
495
 
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
496
 
 
497
 
    def __init__(self, block, get_compressor_settings=None):
 
471
    def __init__(self, block):
498
472
        self._block = block
499
473
        # We need to preserve the ordering
500
474
        self._factories = []
501
475
        self._last_byte = 0
502
 
        self._get_settings = get_compressor_settings
503
 
        self._compressor_settings = None
504
 
 
505
 
    def _get_compressor_settings(self):
506
 
        if self._compressor_settings is not None:
507
 
            return self._compressor_settings
508
 
        settings = None
509
 
        if self._get_settings is not None:
510
 
            settings = self._get_settings()
511
 
        if settings is None:
512
 
            vf = GroupCompressVersionedFiles
513
 
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
514
 
        self._compressor_settings = settings
515
 
        return self._compressor_settings
516
476
 
517
477
    def add_factory(self, key, parents, start, end):
518
478
        if not self._factories:
551
511
        new_block.set_content(self._block._content[:last_byte])
552
512
        self._block = new_block
553
513
 
554
 
    def _make_group_compressor(self):
555
 
        return GroupCompressor(self._get_compressor_settings())
556
 
 
557
514
    def _rebuild_block(self):
558
515
        """Create a new GroupCompressBlock with only the referenced texts."""
559
 
        compressor = self._make_group_compressor()
 
516
        compressor = GroupCompressor()
560
517
        tstart = time.time()
561
518
        old_length = self._block._content_length
562
519
        end_point = 0
574
531
        #       block? It seems hard to come up with a method that it would
575
532
        #       expand, since we do full compression again. Perhaps based on a
576
533
        #       request that ends up poorly ordered?
577
 
        # TODO: If the content would have expanded, then we would want to
578
 
        #       handle a case where we need to split the block.
579
 
        #       Now that we have a user-tweakable option
580
 
        #       (max_bytes_to_index), it is possible that one person set it
581
 
        #       to a very low value, causing poor compression.
582
534
        delta = time.time() - tstart
583
535
        self._block = new_block
584
536
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
593
545
        # time (self._block._content) is a little expensive.
594
546
        self._block._ensure_content(self._last_byte)
595
547
 
596
 
    def _check_rebuild_action(self):
 
548
    def _check_rebuild_block(self):
597
549
        """Check to see if our block should be repacked."""
598
550
        total_bytes_used = 0
599
551
        last_byte_used = 0
600
552
        for factory in self._factories:
601
553
            total_bytes_used += factory._end - factory._start
602
 
            if last_byte_used < factory._end:
603
 
                last_byte_used = factory._end
604
 
        # If we are using more than half of the bytes from the block, we have
605
 
        # nothing else to check
 
554
            last_byte_used = max(last_byte_used, factory._end)
 
555
        # If we are using most of the bytes from the block, we have nothing
 
556
        # else to check (currently more that 1/2)
606
557
        if total_bytes_used * 2 >= self._block._content_length:
607
 
            return None, last_byte_used, total_bytes_used
608
 
        # We are using less than 50% of the content. Is the content we are
609
 
        # using at the beginning of the block? If so, we can just trim the
610
 
        # tail, rather than rebuilding from scratch.
 
558
            return
 
559
        # Can we just strip off the trailing bytes? If we are going to be
 
560
        # transmitting more than 50% of the front of the content, go ahead
611
561
        if total_bytes_used * 2 > last_byte_used:
612
 
            return 'trim', last_byte_used, total_bytes_used
 
562
            self._trim_block(last_byte_used)
 
563
            return
613
564
 
614
565
        # We are using a small amount of the data, and it isn't just packed
615
566
        # nicely at the front, so rebuild the content.
622
573
        #       expanding many deltas into fulltexts, as well.
623
574
        #       If we build a cheap enough 'strip', then we could try a strip,
624
575
        #       if that expands the content, we then rebuild.
625
 
        return 'rebuild', last_byte_used, total_bytes_used
626
 
 
627
 
    def check_is_well_utilized(self):
628
 
        """Is the current block considered 'well utilized'?
629
 
 
630
 
        This heuristic asks if the current block considers itself to be a fully
631
 
        developed group, rather than just a loose collection of data.
632
 
        """
633
 
        if len(self._factories) == 1:
634
 
            # A block of length 1 could be improved by combining with other
635
 
            # groups - don't look deeper. Even larger than max size groups
636
 
            # could compress well with adjacent versions of the same thing.
637
 
            return False
638
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
639
 
        block_size = self._block._content_length
640
 
        if total_bytes_used < block_size * self._max_cut_fraction:
641
 
            # This block wants to trim itself small enough that we want to
642
 
            # consider it under-utilized.
643
 
            return False
644
 
        # TODO: This code is meant to be the twin of _insert_record_stream's
645
 
        #       'start_new_block' logic. It would probably be better to factor
646
 
        #       out that logic into a shared location, so that it stays
647
 
        #       together better
648
 
        # We currently assume a block is properly utilized whenever it is >75%
649
 
        # of the size of a 'full' block. In normal operation, a block is
650
 
        # considered full when it hits 4MB of same-file content. So any block
651
 
        # >3MB is 'full enough'.
652
 
        # The only time this isn't true is when a given block has large-object
653
 
        # content. (a single file >4MB, etc.)
654
 
        # Under these circumstances, we allow a block to grow to
655
 
        # 2 x largest_content.  Which means that if a given block had a large
656
 
        # object, it may actually be under-utilized. However, given that this
657
 
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
658
 
        # content blobs on-the-fly. Note that because we return False for all
659
 
        # 1-item blobs, we will repack them; we may wish to reevaluate our
660
 
        # treatment of large object blobs in the future.
661
 
        if block_size >= self._full_enough_block_size:
662
 
            return True
663
 
        # If a block is <3MB, it still may be considered 'full' if it contains
664
 
        # mixed content. The current rule is 2MB of mixed content is considered
665
 
        # full. So check to see if this block contains mixed content, and
666
 
        # set the threshold appropriately.
667
 
        common_prefix = None
668
 
        for factory in self._factories:
669
 
            prefix = factory.key[:-1]
670
 
            if common_prefix is None:
671
 
                common_prefix = prefix
672
 
            elif prefix != common_prefix:
673
 
                # Mixed content, check the size appropriately
674
 
                if block_size >= self._full_enough_mixed_block_size:
675
 
                    return True
676
 
                break
677
 
        # The content failed both the mixed check and the single-content check
678
 
        # so obviously it is not fully utilized
679
 
        # TODO: there is one other constraint that isn't being checked
680
 
        #       namely, that the entries in the block are in the appropriate
681
 
        #       order. For example, you could insert the entries in exactly
682
 
        #       reverse groupcompress order, and we would think that is ok.
683
 
        #       (all the right objects are in one group, and it is fully
684
 
        #       utilized, etc.) For now, we assume that case is rare,
685
 
        #       especially since we should always fetch in 'groupcompress'
686
 
        #       order.
687
 
        return False
688
 
 
689
 
    def _check_rebuild_block(self):
690
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
691
 
        if action is None:
692
 
            return
693
 
        if action == 'trim':
694
 
            self._trim_block(last_byte_used)
695
 
        elif action == 'rebuild':
696
 
            self._rebuild_block()
697
 
        else:
698
 
            raise ValueError('unknown rebuild action: %r' % (action,))
 
576
        self._rebuild_block()
699
577
 
700
578
    def _wire_bytes(self):
701
579
        """Return a byte stream suitable for transmitting over the wire."""
735
613
        z_header_bytes = zlib.compress(header_bytes)
736
614
        del header_bytes
737
615
        z_header_bytes_len = len(z_header_bytes)
738
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
616
        block_bytes = self._block.to_bytes()
739
617
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
740
 
                                       block_bytes_len))
 
618
                                       len(block_bytes)))
741
619
        lines.append(z_header_bytes)
742
 
        lines.extend(block_chunks)
743
 
        del z_header_bytes, block_chunks
744
 
        # TODO: This is a point where we will double the memory consumption. To
745
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
620
        lines.append(block_bytes)
 
621
        del z_header_bytes, block_bytes
746
622
        return ''.join(lines)
747
623
 
748
624
    @classmethod
749
625
    def from_bytes(cls, bytes):
750
626
        # TODO: This does extra string copying, probably better to do it a
751
 
        #       different way. At a minimum this creates 2 copies of the
752
 
        #       compressed content
 
627
        #       different way
753
628
        (storage_kind, z_header_len, header_len,
754
629
         block_len, rest) = bytes.split('\n', 4)
755
630
        del bytes
807
682
 
808
683
class _CommonGroupCompressor(object):
809
684
 
810
 
    def __init__(self, settings=None):
 
685
    def __init__(self):
811
686
        """Create a GroupCompressor."""
812
687
        self.chunks = []
813
688
        self._last = None
816
691
        self.labels_deltas = {}
817
692
        self._delta_index = None # Set by the children
818
693
        self._block = GroupCompressBlock()
819
 
        if settings is None:
820
 
            self._settings = {}
821
 
        else:
822
 
            self._settings = settings
823
694
 
824
695
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
825
696
        """Compress lines with label key.
917
788
 
918
789
        After calling this, the compressor should no longer be used
919
790
        """
 
791
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
792
        #       group. This has an impact for 'commit' of large objects.
 
793
        #       One possibility is to use self._content_chunks, and be lazy and
 
794
        #       only fill out self._content as a full string when we actually
 
795
        #       need it. That would at least drop the peak memory consumption
 
796
        #       for 'commit' down to ~1x the size of the largest file, at a
 
797
        #       cost of increased complexity within this code. 2x is still <<
 
798
        #       3x the size of the largest file, so we are doing ok.
920
799
        self._block.set_chunked_content(self.chunks, self.endpoint)
921
800
        self.chunks = None
922
801
        self._delta_index = None
940
819
 
941
820
class PythonGroupCompressor(_CommonGroupCompressor):
942
821
 
943
 
    def __init__(self, settings=None):
 
822
    def __init__(self):
944
823
        """Create a GroupCompressor.
945
824
 
946
825
        Used only if the pyrex version is not available.
947
826
        """
948
 
        super(PythonGroupCompressor, self).__init__(settings)
 
827
        super(PythonGroupCompressor, self).__init__()
949
828
        self._delta_index = LinesDeltaIndex([])
950
829
        # The actual content is managed by LinesDeltaIndex
951
830
        self.chunks = self._delta_index.lines
988
867
 
989
868
    It contains code very similar to SequenceMatcher because of having a similar
990
869
    task. However some key differences apply:
991
 
 
992
 
    * there is no junk, we want a minimal edit not a human readable diff.
993
 
    * we don't filter very common lines (because we don't know where a good
994
 
      range will start, and after the first text we want to be emitting minmal
995
 
      edits only.
996
 
    * we chain the left side, not the right side
997
 
    * we incrementally update the adjacency matrix as new lines are provided.
998
 
    * we look for matches in all of the left side, so the routine which does
999
 
      the analagous task of find_longest_match does not need to filter on the
1000
 
      left side.
 
870
     - there is no junk, we want a minimal edit not a human readable diff.
 
871
     - we don't filter very common lines (because we don't know where a good
 
872
       range will start, and after the first text we want to be emitting minmal
 
873
       edits only.
 
874
     - we chain the left side, not the right side
 
875
     - we incrementally update the adjacency matrix as new lines are provided.
 
876
     - we look for matches in all of the left side, so the routine which does
 
877
       the analagous task of find_longest_match does not need to filter on the
 
878
       left side.
1001
879
    """
1002
880
 
1003
 
    def __init__(self, settings=None):
1004
 
        super(PyrexGroupCompressor, self).__init__(settings)
1005
 
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1006
 
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
881
    def __init__(self):
 
882
        super(PyrexGroupCompressor, self).__init__()
 
883
        self._delta_index = DeltaIndex()
1007
884
 
1008
885
    def _compress(self, key, bytes, max_delta_size, soft=False):
1009
886
        """see _CommonGroupCompressor._compress"""
1084
961
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1085
962
            add_callback=graph_index.add_nodes,
1086
963
            inconsistency_fatal=inconsistency_fatal)
1087
 
        access = pack_repo._DirectPackAccess({})
 
964
        access = knit._DirectPackAccess({})
1088
965
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1089
966
        result = GroupCompressVersionedFiles(index, access, delta)
1090
967
        result.stream = stream
1098
975
    versioned_files.stream.close()
1099
976
 
1100
977
 
1101
 
class _BatchingBlockFetcher(object):
1102
 
    """Fetch group compress blocks in batches.
1103
 
 
1104
 
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1105
 
        currently pending batch.
1106
 
    """
1107
 
 
1108
 
    def __init__(self, gcvf, locations, get_compressor_settings=None):
1109
 
        self.gcvf = gcvf
1110
 
        self.locations = locations
1111
 
        self.keys = []
1112
 
        self.batch_memos = {}
1113
 
        self.memos_to_get = []
1114
 
        self.total_bytes = 0
1115
 
        self.last_read_memo = None
1116
 
        self.manager = None
1117
 
        self._get_compressor_settings = get_compressor_settings
1118
 
 
1119
 
    def add_key(self, key):
1120
 
        """Add another to key to fetch.
1121
 
 
1122
 
        :return: The estimated number of bytes needed to fetch the batch so
1123
 
            far.
1124
 
        """
1125
 
        self.keys.append(key)
1126
 
        index_memo, _, _, _ = self.locations[key]
1127
 
        read_memo = index_memo[0:3]
1128
 
        # Three possibilities for this read_memo:
1129
 
        #  - it's already part of this batch; or
1130
 
        #  - it's not yet part of this batch, but is already cached; or
1131
 
        #  - it's not yet part of this batch and will need to be fetched.
1132
 
        if read_memo in self.batch_memos:
1133
 
            # This read memo is already in this batch.
1134
 
            return self.total_bytes
1135
 
        try:
1136
 
            cached_block = self.gcvf._group_cache[read_memo]
1137
 
        except KeyError:
1138
 
            # This read memo is new to this batch, and the data isn't cached
1139
 
            # either.
1140
 
            self.batch_memos[read_memo] = None
1141
 
            self.memos_to_get.append(read_memo)
1142
 
            byte_length = read_memo[2]
1143
 
            self.total_bytes += byte_length
1144
 
        else:
1145
 
            # This read memo is new to this batch, but cached.
1146
 
            # Keep a reference to the cached block in batch_memos because it's
1147
 
            # certain that we'll use it when this batch is processed, but
1148
 
            # there's a risk that it would fall out of _group_cache between now
1149
 
            # and then.
1150
 
            self.batch_memos[read_memo] = cached_block
1151
 
        return self.total_bytes
1152
 
 
1153
 
    def _flush_manager(self):
1154
 
        if self.manager is not None:
1155
 
            for factory in self.manager.get_record_stream():
1156
 
                yield factory
1157
 
            self.manager = None
1158
 
            self.last_read_memo = None
1159
 
 
1160
 
    def yield_factories(self, full_flush=False):
1161
 
        """Yield factories for keys added since the last yield.  They will be
1162
 
        returned in the order they were added via add_key.
1163
 
 
1164
 
        :param full_flush: by default, some results may not be returned in case
1165
 
            they can be part of the next batch.  If full_flush is True, then
1166
 
            all results are returned.
1167
 
        """
1168
 
        if self.manager is None and not self.keys:
1169
 
            return
1170
 
        # Fetch all memos in this batch.
1171
 
        blocks = self.gcvf._get_blocks(self.memos_to_get)
1172
 
        # Turn blocks into factories and yield them.
1173
 
        memos_to_get_stack = list(self.memos_to_get)
1174
 
        memos_to_get_stack.reverse()
1175
 
        for key in self.keys:
1176
 
            index_memo, _, parents, _ = self.locations[key]
1177
 
            read_memo = index_memo[:3]
1178
 
            if self.last_read_memo != read_memo:
1179
 
                # We are starting a new block. If we have a
1180
 
                # manager, we have found everything that fits for
1181
 
                # now, so yield records
1182
 
                for factory in self._flush_manager():
1183
 
                    yield factory
1184
 
                # Now start a new manager.
1185
 
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1186
 
                    # The next block from _get_blocks will be the block we
1187
 
                    # need.
1188
 
                    block_read_memo, block = blocks.next()
1189
 
                    if block_read_memo != read_memo:
1190
 
                        raise AssertionError(
1191
 
                            "block_read_memo out of sync with read_memo"
1192
 
                            "(%r != %r)" % (block_read_memo, read_memo))
1193
 
                    self.batch_memos[read_memo] = block
1194
 
                    memos_to_get_stack.pop()
1195
 
                else:
1196
 
                    block = self.batch_memos[read_memo]
1197
 
                self.manager = _LazyGroupContentManager(block,
1198
 
                    get_compressor_settings=self._get_compressor_settings)
1199
 
                self.last_read_memo = read_memo
1200
 
            start, end = index_memo[3:5]
1201
 
            self.manager.add_factory(key, parents, start, end)
1202
 
        if full_flush:
1203
 
            for factory in self._flush_manager():
1204
 
                yield factory
1205
 
        del self.keys[:]
1206
 
        self.batch_memos.clear()
1207
 
        del self.memos_to_get[:]
1208
 
        self.total_bytes = 0
1209
 
 
1210
 
 
1211
 
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
 
978
class GroupCompressVersionedFiles(VersionedFiles):
1212
979
    """A group-compress based VersionedFiles implementation."""
1213
980
 
1214
 
    # This controls how the GroupCompress DeltaIndex works. Basically, we
1215
 
    # compute hash pointers into the source blocks (so hash(text) => text).
1216
 
    # However each of these references costs some memory in trade against a
1217
 
    # more accurate match result. For very large files, they either are
1218
 
    # pre-compressed and change in bulk whenever they change, or change in just
1219
 
    # local blocks. Either way, 'improved resolution' is not very helpful,
1220
 
    # versus running out of memory trying to track everything. The default max
1221
 
    # gives 100% sampling of a 1MB file.
1222
 
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1223
 
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1224
 
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
1225
 
 
1226
 
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1227
 
                 _group_cache=None):
 
981
    def __init__(self, index, access, delta=True):
1228
982
        """Create a GroupCompressVersionedFiles object.
1229
983
 
1230
984
        :param index: The index object storing access and graph data.
1231
985
        :param access: The access object storing raw data.
1232
986
        :param delta: Whether to delta compress or just entropy compress.
1233
 
        :param _unadded_refs: private parameter, don't use.
1234
 
        :param _group_cache: private parameter, don't use.
1235
987
        """
1236
988
        self._index = index
1237
989
        self._access = access
1238
990
        self._delta = delta
1239
 
        if _unadded_refs is None:
1240
 
            _unadded_refs = {}
1241
 
        self._unadded_refs = _unadded_refs
1242
 
        if _group_cache is None:
1243
 
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1244
 
        self._group_cache = _group_cache
1245
 
        self._immediate_fallback_vfs = []
1246
 
        self._max_bytes_to_index = None
1247
 
 
1248
 
    def without_fallbacks(self):
1249
 
        """Return a clone of this object without any fallbacks configured."""
1250
 
        return GroupCompressVersionedFiles(self._index, self._access,
1251
 
            self._delta, _unadded_refs=dict(self._unadded_refs),
1252
 
            _group_cache=self._group_cache)
 
991
        self._unadded_refs = {}
 
992
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
 
993
        self._fallback_vfs = []
1253
994
 
1254
995
    def add_lines(self, key, parents, lines, parent_texts=None,
1255
996
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1259
1000
        :param key: The key tuple of the text to add.
1260
1001
        :param parents: The parents key tuples of the text to add.
1261
1002
        :param lines: A list of lines. Each line must be a bytestring. And all
1262
 
            of them except the last must be terminated with \\n and contain no
1263
 
            other \\n's. The last line may either contain no \\n's or a single
1264
 
            terminating \\n. If the lines list does meet this constraint the
1265
 
            add routine may error or may succeed - but you will be unable to
1266
 
            read the data back accurately. (Checking the lines have been split
 
1003
            of them except the last must be terminated with \n and contain no
 
1004
            other \n's. The last line may either contain no \n's or a single
 
1005
            terminating \n. If the lines list does meet this constraint the add
 
1006
            routine may error or may succeed - but you will be unable to read
 
1007
            the data back accurately. (Checking the lines have been split
1267
1008
            correctly is expensive and extremely unlikely to catch bugs so it
1268
1009
            is not done at runtime unless check_content is True.)
1269
1010
        :param parent_texts: An optional dictionary containing the opaque
1324
1065
 
1325
1066
        :param a_versioned_files: A VersionedFiles object.
1326
1067
        """
1327
 
        self._immediate_fallback_vfs.append(a_versioned_files)
 
1068
        self._fallback_vfs.append(a_versioned_files)
1328
1069
 
1329
1070
    def annotate(self, key):
1330
1071
        """See VersionedFiles.annotate."""
1334
1075
    def get_annotator(self):
1335
1076
        return annotate.Annotator(self)
1336
1077
 
1337
 
    def check(self, progress_bar=None, keys=None):
 
1078
    def check(self, progress_bar=None):
1338
1079
        """See VersionedFiles.check()."""
1339
 
        if keys is None:
1340
 
            keys = self.keys()
1341
 
            for record in self.get_record_stream(keys, 'unordered', True):
1342
 
                record.get_bytes_as('fulltext')
1343
 
        else:
1344
 
            return self.get_record_stream(keys, 'unordered', True)
1345
 
 
1346
 
    def clear_cache(self):
1347
 
        """See VersionedFiles.clear_cache()"""
1348
 
        self._group_cache.clear()
1349
 
        self._index._graph_index.clear_cache()
1350
 
        self._index._int_cache.clear()
 
1080
        keys = self.keys()
 
1081
        for record in self.get_record_stream(keys, 'unordered', True):
 
1082
            record.get_bytes_as('fulltext')
1351
1083
 
1352
1084
    def _check_add(self, key, lines, random_id, check_content):
1353
1085
        """check that version_id and lines are safe to add."""
1384
1116
            and so on.
1385
1117
        """
1386
1118
        result = {}
1387
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1119
        sources = [self._index] + self._fallback_vfs
1388
1120
        source_results = []
1389
1121
        missing = set(keys)
1390
1122
        for source in sources:
1396
1128
            missing.difference_update(set(new_result))
1397
1129
        return result, source_results
1398
1130
 
1399
 
    def _get_blocks(self, read_memos):
1400
 
        """Get GroupCompressBlocks for the given read_memos.
1401
 
 
1402
 
        :returns: a series of (read_memo, block) pairs, in the order they were
1403
 
            originally passed.
1404
 
        """
1405
 
        cached = {}
1406
 
        for read_memo in read_memos:
1407
 
            try:
1408
 
                block = self._group_cache[read_memo]
1409
 
            except KeyError:
1410
 
                pass
1411
 
            else:
1412
 
                cached[read_memo] = block
1413
 
        not_cached = []
1414
 
        not_cached_seen = set()
1415
 
        for read_memo in read_memos:
1416
 
            if read_memo in cached:
1417
 
                # Don't fetch what we already have
1418
 
                continue
1419
 
            if read_memo in not_cached_seen:
1420
 
                # Don't try to fetch the same data twice
1421
 
                continue
1422
 
            not_cached.append(read_memo)
1423
 
            not_cached_seen.add(read_memo)
1424
 
        raw_records = self._access.get_raw_records(not_cached)
1425
 
        for read_memo in read_memos:
1426
 
            try:
1427
 
                yield read_memo, cached[read_memo]
1428
 
            except KeyError:
1429
 
                # Read the block, and cache it.
1430
 
                zdata = raw_records.next()
1431
 
                block = GroupCompressBlock.from_bytes(zdata)
1432
 
                self._group_cache[read_memo] = block
1433
 
                cached[read_memo] = block
1434
 
                yield read_memo, block
 
1131
    def _get_block(self, index_memo):
 
1132
        read_memo = index_memo[0:3]
 
1133
        # get the group:
 
1134
        try:
 
1135
            block = self._group_cache[read_memo]
 
1136
        except KeyError:
 
1137
            # read the group
 
1138
            zdata = self._access.get_raw_records([read_memo]).next()
 
1139
            # decompress - whole thing - this is not a bug, as it
 
1140
            # permits caching. We might want to store the partially
 
1141
            # decompresed group and decompress object, so that recent
 
1142
            # texts are not penalised by big groups.
 
1143
            block = GroupCompressBlock.from_bytes(zdata)
 
1144
            self._group_cache[read_memo] = block
 
1145
        # cheapo debugging:
 
1146
        # print len(zdata), len(plain)
 
1147
        # parse - requires split_lines, better to have byte offsets
 
1148
        # here (but not by much - we only split the region for the
 
1149
        # recipe, and we often want to end up with lines anyway.
 
1150
        return block
1435
1151
 
1436
1152
    def get_missing_compression_parent_keys(self):
1437
1153
        """Return the keys of missing compression parents.
1491
1207
        parent_map = {}
1492
1208
        key_to_source_map = {}
1493
1209
        source_results = []
1494
 
        for source in self._immediate_fallback_vfs:
 
1210
        for source in self._fallback_vfs:
1495
1211
            if not missing:
1496
1212
                break
1497
1213
            source_parents = source.get_parent_map(missing)
1507
1223
 
1508
1224
        The returned objects should be in the order defined by 'ordering',
1509
1225
        which can weave between different sources.
1510
 
 
1511
1226
        :param ordering: Must be one of 'topological' or 'groupcompress'
1512
1227
        :return: List of [(source, [keys])] tuples, such that all keys are in
1513
1228
            the defined order, regardless of source.
1514
1229
        """
1515
1230
        if ordering == 'topological':
1516
 
            present_keys = tsort.topo_sort(parent_map)
 
1231
            present_keys = topo_sort(parent_map)
1517
1232
        else:
1518
1233
            # ordering == 'groupcompress'
1519
1234
            # XXX: This only optimizes for the target ordering. We may need
1604
1319
                unadded_keys, source_result)
1605
1320
        for key in missing:
1606
1321
            yield AbsentContentFactory(key)
1607
 
        # Batch up as many keys as we can until either:
1608
 
        #  - we encounter an unadded ref, or
1609
 
        #  - we run out of keys, or
1610
 
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1611
 
        batcher = _BatchingBlockFetcher(self, locations,
1612
 
            get_compressor_settings=self._get_compressor_settings)
 
1322
        manager = None
 
1323
        last_read_memo = None
 
1324
        # TODO: This works fairly well at batching up existing groups into a
 
1325
        #       streamable format, and possibly allowing for taking one big
 
1326
        #       group and splitting it when it isn't fully utilized.
 
1327
        #       However, it doesn't allow us to find under-utilized groups and
 
1328
        #       combine them into a bigger group on the fly.
 
1329
        #       (Consider the issue with how chk_map inserts texts
 
1330
        #       one-at-a-time.) This could be done at insert_record_stream()
 
1331
        #       time, but it probably would decrease the number of
 
1332
        #       bytes-on-the-wire for fetch.
1613
1333
        for source, keys in source_keys:
1614
1334
            if source is self:
1615
1335
                for key in keys:
1616
1336
                    if key in self._unadded_refs:
1617
 
                        # Flush batch, then yield unadded ref from
1618
 
                        # self._compressor.
1619
 
                        for factory in batcher.yield_factories(full_flush=True):
1620
 
                            yield factory
 
1337
                        if manager is not None:
 
1338
                            for factory in manager.get_record_stream():
 
1339
                                yield factory
 
1340
                            last_read_memo = manager = None
1621
1341
                        bytes, sha1 = self._compressor.extract(key)
1622
1342
                        parents = self._unadded_refs[key]
1623
1343
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1624
 
                        continue
1625
 
                    if batcher.add_key(key) > BATCH_SIZE:
1626
 
                        # Ok, this batch is big enough.  Yield some results.
1627
 
                        for factory in batcher.yield_factories():
1628
 
                            yield factory
 
1344
                    else:
 
1345
                        index_memo, _, parents, (method, _) = locations[key]
 
1346
                        read_memo = index_memo[0:3]
 
1347
                        if last_read_memo != read_memo:
 
1348
                            # We are starting a new block. If we have a
 
1349
                            # manager, we have found everything that fits for
 
1350
                            # now, so yield records
 
1351
                            if manager is not None:
 
1352
                                for factory in manager.get_record_stream():
 
1353
                                    yield factory
 
1354
                            # Now start a new manager
 
1355
                            block = self._get_block(index_memo)
 
1356
                            manager = _LazyGroupContentManager(block)
 
1357
                            last_read_memo = read_memo
 
1358
                        start, end = index_memo[3:5]
 
1359
                        manager.add_factory(key, parents, start, end)
1629
1360
            else:
1630
 
                for factory in batcher.yield_factories(full_flush=True):
1631
 
                    yield factory
 
1361
                if manager is not None:
 
1362
                    for factory in manager.get_record_stream():
 
1363
                        yield factory
 
1364
                    last_read_memo = manager = None
1632
1365
                for record in source.get_record_stream(keys, ordering,
1633
1366
                                                       include_delta_closure):
1634
1367
                    yield record
1635
 
        for factory in batcher.yield_factories(full_flush=True):
1636
 
            yield factory
 
1368
        if manager is not None:
 
1369
            for factory in manager.get_record_stream():
 
1370
                yield factory
1637
1371
 
1638
1372
    def get_sha1s(self, keys):
1639
1373
        """See VersionedFiles.get_sha1s()."""
1661
1395
        for _ in self._insert_record_stream(stream, random_id=False):
1662
1396
            pass
1663
1397
 
1664
 
    def _get_compressor_settings(self):
1665
 
        if self._max_bytes_to_index is None:
1666
 
            # TODO: VersionedFiles don't know about their containing
1667
 
            #       repository, so they don't have much of an idea about their
1668
 
            #       location. So for now, this is only a global option.
1669
 
            c = config.GlobalConfig()
1670
 
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1671
 
            if val is not None:
1672
 
                try:
1673
 
                    val = int(val)
1674
 
                except ValueError, e:
1675
 
                    trace.warning('Value for '
1676
 
                                  '"bzr.groupcompress.max_bytes_to_index"'
1677
 
                                  ' %r is not an integer'
1678
 
                                  % (val,))
1679
 
                    val = None
1680
 
            if val is None:
1681
 
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
1682
 
            self._max_bytes_to_index = val
1683
 
        return {'max_bytes_to_index': self._max_bytes_to_index}
1684
 
 
1685
 
    def _make_group_compressor(self):
1686
 
        return GroupCompressor(self._get_compressor_settings())
1687
 
 
1688
1398
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1689
1399
                              reuse_blocks=True):
1690
1400
        """Internal core to insert a record stream into this container.
1713
1423
                return adapter
1714
1424
        # This will go up to fulltexts for gc to gc fetching, which isn't
1715
1425
        # ideal.
1716
 
        self._compressor = self._make_group_compressor()
 
1426
        self._compressor = GroupCompressor()
1717
1427
        self._unadded_refs = {}
1718
1428
        keys_to_add = []
1719
1429
        def flush():
1720
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
1721
 
            self._compressor = self._make_group_compressor()
1722
 
            # Note: At this point we still have 1 copy of the fulltext (in
1723
 
            #       record and the var 'bytes'), and this generates 2 copies of
1724
 
            #       the compressed text (one for bytes, one in chunks)
1725
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1726
 
            #       have to double compressed memory here
1727
 
            # TODO: Figure out how to indicate that we would be happy to free
1728
 
            #       the fulltext content at this point. Note that sometimes we
1729
 
            #       will want it later (streaming CHK pages), but most of the
1730
 
            #       time we won't (everything else)
1731
 
            bytes = ''.join(chunks)
1732
 
            del chunks
 
1430
            bytes = self._compressor.flush().to_bytes()
1733
1431
            index, start, length = self._access.add_raw_records(
1734
1432
                [(None, len(bytes))], bytes)[0]
1735
1433
            nodes = []
1738
1436
            self._index.add_records(nodes, random_id=random_id)
1739
1437
            self._unadded_refs = {}
1740
1438
            del keys_to_add[:]
 
1439
            self._compressor = GroupCompressor()
1741
1440
 
1742
1441
        last_prefix = None
1743
1442
        max_fulltext_len = 0
1747
1446
        block_length = None
1748
1447
        # XXX: TODO: remove this, it is just for safety checking for now
1749
1448
        inserted_keys = set()
1750
 
        reuse_this_block = reuse_blocks
1751
1449
        for record in stream:
1752
1450
            # Raise an error when a record is missing.
1753
1451
            if record.storage_kind == 'absent':
1761
1459
            if reuse_blocks:
1762
1460
                # If the reuse_blocks flag is set, check to see if we can just
1763
1461
                # copy a groupcompress block as-is.
1764
 
                # We only check on the first record (groupcompress-block) not
1765
 
                # on all of the (groupcompress-block-ref) entries.
1766
 
                # The reuse_this_block flag is then kept for as long as
1767
 
                if record.storage_kind == 'groupcompress-block':
1768
 
                    # Check to see if we really want to re-use this block
1769
 
                    insert_manager = record._manager
1770
 
                    reuse_this_block = insert_manager.check_is_well_utilized()
1771
 
            else:
1772
 
                reuse_this_block = False
1773
 
            if reuse_this_block:
1774
 
                # We still want to reuse this block
1775
1462
                if record.storage_kind == 'groupcompress-block':
1776
1463
                    # Insert the raw block into the target repo
1777
1464
                    insert_manager = record._manager
 
1465
                    insert_manager._check_rebuild_block()
1778
1466
                    bytes = record._manager._block.to_bytes()
1779
1467
                    _, start, length = self._access.add_raw_records(
1780
1468
                        [(None, len(bytes))], bytes)[0]
1785
1473
                                           'groupcompress-block-ref'):
1786
1474
                    if insert_manager is None:
1787
1475
                        raise AssertionError('No insert_manager set')
1788
 
                    if insert_manager is not record._manager:
1789
 
                        raise AssertionError('insert_manager does not match'
1790
 
                            ' the current record, we cannot be positive'
1791
 
                            ' that the appropriate content was inserted.'
1792
 
                            )
1793
1476
                    value = "%d %d %d %d" % (block_start, block_length,
1794
1477
                                             record._start, record._end)
1795
1478
                    nodes = [(record.key, value, (record.parents,))]
1845
1528
                key = record.key
1846
1529
            self._unadded_refs[key] = record.parents
1847
1530
            yield found_sha1
1848
 
            as_st = static_tuple.StaticTuple.from_sequence
1849
 
            if record.parents is not None:
1850
 
                parents = as_st([as_st(p) for p in record.parents])
1851
 
            else:
1852
 
                parents = None
1853
 
            refs = static_tuple.StaticTuple(parents)
1854
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1531
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
 
1532
                (record.parents,)))
1855
1533
        if len(keys_to_add):
1856
1534
            flush()
1857
1535
        self._compressor = None
1900
1578
        """See VersionedFiles.keys."""
1901
1579
        if 'evil' in debug.debug_flags:
1902
1580
            trace.mutter_callsite(2, "keys scales with size of history")
1903
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1581
        sources = [self._index] + self._fallback_vfs
1904
1582
        result = set()
1905
1583
        for source in sources:
1906
1584
            result.update(source.keys())
1907
1585
        return result
1908
1586
 
1909
1587
 
1910
 
class _GCBuildDetails(object):
1911
 
    """A blob of data about the build details.
1912
 
 
1913
 
    This stores the minimal data, which then allows compatibility with the old
1914
 
    api, without taking as much memory.
1915
 
    """
1916
 
 
1917
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1918
 
                 '_delta_end', '_parents')
1919
 
 
1920
 
    method = 'group'
1921
 
    compression_parent = None
1922
 
 
1923
 
    def __init__(self, parents, position_info):
1924
 
        self._parents = parents
1925
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1926
 
         self._delta_end) = position_info
1927
 
 
1928
 
    def __repr__(self):
1929
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1930
 
            self.index_memo, self._parents)
1931
 
 
1932
 
    @property
1933
 
    def index_memo(self):
1934
 
        return (self._index, self._group_start, self._group_end,
1935
 
                self._basis_end, self._delta_end)
1936
 
 
1937
 
    @property
1938
 
    def record_details(self):
1939
 
        return static_tuple.StaticTuple(self.method, None)
1940
 
 
1941
 
    def __getitem__(self, offset):
1942
 
        """Compatibility thunk to act like a tuple."""
1943
 
        if offset == 0:
1944
 
            return self.index_memo
1945
 
        elif offset == 1:
1946
 
            return self.compression_parent # Always None
1947
 
        elif offset == 2:
1948
 
            return self._parents
1949
 
        elif offset == 3:
1950
 
            return self.record_details
1951
 
        else:
1952
 
            raise IndexError('offset out of range')
1953
 
            
1954
 
    def __len__(self):
1955
 
        return 4
1956
 
 
1957
 
 
1958
1588
class _GCGraphIndex(object):
1959
1589
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1960
1590
 
1961
1591
    def __init__(self, graph_index, is_locked, parents=True,
1962
1592
        add_callback=None, track_external_parent_refs=False,
1963
 
        inconsistency_fatal=True, track_new_keys=False):
 
1593
        inconsistency_fatal=True):
1964
1594
        """Construct a _GCGraphIndex on a graph_index.
1965
1595
 
1966
1596
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1985
1615
        self.has_graph = parents
1986
1616
        self._is_locked = is_locked
1987
1617
        self._inconsistency_fatal = inconsistency_fatal
1988
 
        # GroupCompress records tend to have the same 'group' start + offset
1989
 
        # repeated over and over, this creates a surplus of ints
1990
 
        self._int_cache = {}
1991
1618
        if track_external_parent_refs:
1992
 
            self._key_dependencies = _KeyRefs(
1993
 
                track_new_keys=track_new_keys)
 
1619
            self._key_dependencies = knit._KeyRefs()
1994
1620
        else:
1995
1621
            self._key_dependencies = None
1996
1622
 
2029
1655
        if not random_id:
2030
1656
            present_nodes = self._get_entries(keys)
2031
1657
            for (index, key, value, node_refs) in present_nodes:
2032
 
                # Sometimes these are passed as a list rather than a tuple
2033
 
                node_refs = static_tuple.as_tuples(node_refs)
2034
 
                passed = static_tuple.as_tuples(keys[key])
2035
 
                if node_refs != passed[1]:
2036
 
                    details = '%s %s %s' % (key, (value, node_refs), passed)
 
1658
                if node_refs != keys[key][1]:
 
1659
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
2037
1660
                    if self._inconsistency_fatal:
2038
1661
                        raise errors.KnitCorrupt(self, "inconsistent details"
2039
1662
                                                 " in add_records: %s" %
2053
1676
                    result.append((key, value))
2054
1677
            records = result
2055
1678
        key_dependencies = self._key_dependencies
2056
 
        if key_dependencies is not None:
2057
 
            if self._parents:
2058
 
                for key, value, refs in records:
2059
 
                    parents = refs[0]
2060
 
                    key_dependencies.add_references(key, parents)
2061
 
            else:
2062
 
                for key, value, refs in records:
2063
 
                    new_keys.add_key(key)
 
1679
        if key_dependencies is not None and self._parents:
 
1680
            for key, value, refs in records:
 
1681
                parents = refs[0]
 
1682
                key_dependencies.add_references(key, parents)
2064
1683
        self._add_callback(records)
2065
1684
 
2066
1685
    def _check_read(self):
2097
1716
            if missing_keys:
2098
1717
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
2099
1718
 
2100
 
    def find_ancestry(self, keys):
2101
 
        """See CombinedGraphIndex.find_ancestry"""
2102
 
        return self._graph_index.find_ancestry(keys, 0)
2103
 
 
2104
1719
    def get_parent_map(self, keys):
2105
1720
        """Get a map of the parents of keys.
2106
1721
 
2123
1738
        """Return the keys of missing parents."""
2124
1739
        # Copied from _KnitGraphIndex.get_missing_parents
2125
1740
        # We may have false positives, so filter those out.
2126
 
        self._key_dependencies.satisfy_refs_for_keys(
 
1741
        self._key_dependencies.add_keys(
2127
1742
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
2128
1743
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
2129
1744
 
2135
1750
        :param keys: An iterable of keys.
2136
1751
        :return: A dict of key:
2137
1752
            (index_memo, compression_parent, parents, record_details).
2138
 
 
2139
 
            * index_memo: opaque structure to pass to read_records to extract
2140
 
              the raw data
2141
 
            * compression_parent: Content that this record is built upon, may
2142
 
              be None
2143
 
            * parents: Logical parents of this node
2144
 
            * record_details: extra information about the content which needs
2145
 
              to be passed to Factory.parse_record
 
1753
            index_memo
 
1754
                opaque structure to pass to read_records to extract the raw
 
1755
                data
 
1756
            compression_parent
 
1757
                Content that this record is built upon, may be None
 
1758
            parents
 
1759
                Logical parents of this node
 
1760
            record_details
 
1761
                extra information about the content which needs to be passed to
 
1762
                Factory.parse_record
2146
1763
        """
2147
1764
        self._check_read()
2148
1765
        result = {}
2153
1770
                parents = None
2154
1771
            else:
2155
1772
                parents = entry[3][0]
2156
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2157
 
            result[key] = details
 
1773
            method = 'group'
 
1774
            result[key] = (self._node_to_position(entry),
 
1775
                                  None, parents, (method, None))
2158
1776
        return result
2159
1777
 
2160
1778
    def keys(self):
2169
1787
        """Convert an index value to position details."""
2170
1788
        bits = node[2].split(' ')
2171
1789
        # It would be nice not to read the entire gzip.
2172
 
        # start and stop are put into _int_cache because they are very common.
2173
 
        # They define the 'group' that an entry is in, and many groups can have
2174
 
        # thousands of objects.
2175
 
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2176
 
        # each, or about 7MB. Note that it might be even more when you consider
2177
 
        # how PyInt is allocated in separate slabs. And you can't return a slab
2178
 
        # to the OS if even 1 int on it is in use. Note though that Python uses
2179
 
        # a LIFO when re-using PyInt slots, which might cause more
2180
 
        # fragmentation.
2181
1790
        start = int(bits[0])
2182
 
        start = self._int_cache.setdefault(start, start)
2183
1791
        stop = int(bits[1])
2184
 
        stop = self._int_cache.setdefault(stop, stop)
2185
1792
        basis_end = int(bits[2])
2186
1793
        delta_end = int(bits[3])
2187
 
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2188
 
        # instance...
2189
 
        return (node[0], start, stop, basis_end, delta_end)
 
1794
        return node[0], start, stop, basis_end, delta_end
2190
1795
 
2191
1796
    def scan_unvalidated_index(self, graph_index):
2192
1797
        """Inform this _GCGraphIndex that there is an unvalidated index.
2193
1798
 
2194
1799
        This allows this _GCGraphIndex to keep track of any missing
2195
1800
        compression parents we may want to have filled in to make those
2196
 
        indices valid.  It also allows _GCGraphIndex to track any new keys.
 
1801
        indices valid.
2197
1802
 
2198
1803
        :param graph_index: A GraphIndex
2199
1804
        """
2200
 
        key_dependencies = self._key_dependencies
2201
 
        if key_dependencies is None:
2202
 
            return
2203
 
        for node in graph_index.iter_all_entries():
2204
 
            # Add parent refs from graph_index (and discard parent refs
2205
 
            # that the graph_index has).
2206
 
            key_dependencies.add_references(node[1], node[3][0])
 
1805
        if self._key_dependencies is not None:
 
1806
            # Add parent refs from graph_index (and discard parent refs that
 
1807
            # the graph_index has).
 
1808
            add_refs = self._key_dependencies.add_references
 
1809
            for node in graph_index.iter_all_entries():
 
1810
                add_refs(node[1], node[3][0])
 
1811
 
2207
1812
 
2208
1813
 
2209
1814
from bzrlib._groupcompress_py import (
2223
1828
        decode_base128_int,
2224
1829
        )
2225
1830
    GroupCompressor = PyrexGroupCompressor
2226
 
except ImportError, e:
2227
 
    osutils.failed_to_load_extension(e)
 
1831
except ImportError:
2228
1832
    GroupCompressor = PythonGroupCompressor
2229
1833