~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-17 02:01:07 UTC
  • mto: This revision was merged to the branch mainline in revision 4546.
  • Revision ID: andrew.bennetts@canonical.com-20090717020107-zdic2sfv4wtfgi7s
Document hpssvfs tag.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
    knit,
32
32
    osutils,
33
33
    pack,
34
 
    static_tuple,
35
34
    trace,
36
35
    )
 
36
from bzrlib.graph import Graph
37
37
from bzrlib.btree_index import BTreeBuilder
38
38
from bzrlib.lru_cache import LRUSizeCache
39
39
from bzrlib.tsort import topo_sort
45
45
    VersionedFiles,
46
46
    )
47
47
 
48
 
# Minimum number of uncompressed bytes to try fetch at once when retrieving
49
 
# groupcompress blocks.
50
 
BATCH_SIZE = 2**16
51
 
 
52
48
_USE_LZMA = False and (pylzma is not None)
53
49
 
54
50
# osutils.sha_string('')
55
51
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
56
52
 
 
53
 
57
54
def sort_gc_optimal(parent_map):
58
55
    """Sort and group the keys in parent_map into groupcompress order.
59
56
 
65
62
    # groupcompress ordering is approximately reverse topological,
66
63
    # properly grouped by file-id.
67
64
    per_prefix_map = {}
68
 
    for key, value in parent_map.iteritems():
 
65
    for item in parent_map.iteritems():
 
66
        key = item[0]
69
67
        if isinstance(key, str) or len(key) == 1:
70
68
            prefix = ''
71
69
        else:
72
70
            prefix = key[0]
73
71
        try:
74
 
            per_prefix_map[prefix][key] = value
 
72
            per_prefix_map[prefix].append(item)
75
73
        except KeyError:
76
 
            per_prefix_map[prefix] = {key: value}
 
74
            per_prefix_map[prefix] = [item]
77
75
 
78
76
    present_keys = []
79
77
    for prefix in sorted(per_prefix_map):
120
118
        :param num_bytes: Ensure that we have extracted at least num_bytes of
121
119
            content. If None, consume everything
122
120
        """
123
 
        if self._content_length is None:
124
 
            raise AssertionError('self._content_length should never be None')
 
121
        # TODO: If we re-use the same content block at different times during
 
122
        #       get_record_stream(), it is possible that the first pass will
 
123
        #       get inserted, triggering an extract/_ensure_content() which
 
124
        #       will get rid of _z_content. And then the next use of the block
 
125
        #       will try to access _z_content (to send it over the wire), and
 
126
        #       fail because it is already extracted. Consider never releasing
 
127
        #       _z_content because of this.
125
128
        if num_bytes is None:
126
129
            num_bytes = self._content_length
127
130
        elif (self._content_length is not None
144
147
                self._content = pylzma.decompress(self._z_content)
145
148
            elif self._compressor_name == 'zlib':
146
149
                # Start a zlib decompressor
147
 
                if num_bytes * 4 > self._content_length * 3:
148
 
                    # If we are requesting more that 3/4ths of the content,
149
 
                    # just extract the whole thing in a single pass
150
 
                    num_bytes = self._content_length
 
150
                if num_bytes is None:
151
151
                    self._content = zlib.decompress(self._z_content)
152
152
                else:
153
153
                    self._z_content_decompressor = zlib.decompressobj()
155
155
                    # that the rest of the code is simplified
156
156
                    self._content = self._z_content_decompressor.decompress(
157
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
 
                    if not self._z_content_decompressor.unconsumed_tail:
159
 
                        self._z_content_decompressor = None
160
158
            else:
161
159
                raise AssertionError('Unknown compressor: %r'
162
160
                                     % self._compressor_name)
164
162
        # 'unconsumed_tail'
165
163
 
166
164
        # Do we have enough bytes already?
167
 
        if len(self._content) >= num_bytes:
 
165
        if num_bytes is not None and len(self._content) >= num_bytes:
 
166
            return
 
167
        if num_bytes is None and self._z_content_decompressor is None:
 
168
            # We must have already decompressed everything
168
169
            return
169
170
        # If we got this far, and don't have a decompressor, something is wrong
170
171
        if self._z_content_decompressor is None:
171
172
            raise AssertionError(
172
173
                'No decompressor to decompress %d bytes' % num_bytes)
173
174
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
174
 
        if not remaining_decomp:
175
 
            raise AssertionError('Nothing left to decompress')
176
 
        needed_bytes = num_bytes - len(self._content)
177
 
        # We always set max_size to 32kB over the minimum needed, so that
178
 
        # zlib will give us as much as we really want.
179
 
        # TODO: If this isn't good enough, we could make a loop here,
180
 
        #       that keeps expanding the request until we get enough
181
 
        self._content += self._z_content_decompressor.decompress(
182
 
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
183
 
        if len(self._content) < num_bytes:
184
 
            raise AssertionError('%d bytes wanted, only %d available'
185
 
                                 % (num_bytes, len(self._content)))
186
 
        if not self._z_content_decompressor.unconsumed_tail:
187
 
            # The stream is finished
188
 
            self._z_content_decompressor = None
 
175
        if num_bytes is None:
 
176
            if remaining_decomp:
 
177
                # We don't know how much is left, but we'll decompress it all
 
178
                self._content += self._z_content_decompressor.decompress(
 
179
                    remaining_decomp)
 
180
                # Note: There's what I consider a bug in zlib.decompressobj
 
181
                #       If you pass back in the entire unconsumed_tail, only
 
182
                #       this time you don't pass a max-size, it doesn't
 
183
                #       change the unconsumed_tail back to None/''.
 
184
                #       However, we know we are done with the whole stream
 
185
                self._z_content_decompressor = None
 
186
            # XXX: Why is this the only place in this routine we set this?
 
187
            self._content_length = len(self._content)
 
188
        else:
 
189
            if not remaining_decomp:
 
190
                raise AssertionError('Nothing left to decompress')
 
191
            needed_bytes = num_bytes - len(self._content)
 
192
            # We always set max_size to 32kB over the minimum needed, so that
 
193
            # zlib will give us as much as we really want.
 
194
            # TODO: If this isn't good enough, we could make a loop here,
 
195
            #       that keeps expanding the request until we get enough
 
196
            self._content += self._z_content_decompressor.decompress(
 
197
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
198
            if len(self._content) < num_bytes:
 
199
                raise AssertionError('%d bytes wanted, only %d available'
 
200
                                     % (num_bytes, len(self._content)))
 
201
            if not self._z_content_decompressor.unconsumed_tail:
 
202
                # The stream is finished
 
203
                self._z_content_decompressor = None
189
204
 
190
205
    def _parse_bytes(self, bytes, pos):
191
206
        """Read the various lengths from the header.
441
456
                # There are code paths that first extract as fulltext, and then
442
457
                # extract as storage_kind (smart fetch). So we don't break the
443
458
                # refcycle here, but instead in manager.get_record_stream()
 
459
                # self._manager = None
444
460
            if storage_kind == 'fulltext':
445
461
                return self._bytes
446
462
            else:
452
468
class _LazyGroupContentManager(object):
453
469
    """This manages a group of _LazyGroupCompressFactory objects."""
454
470
 
455
 
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
456
 
                             # current size, and still be considered
457
 
                             # resuable
458
 
    _full_block_size = 4*1024*1024
459
 
    _full_mixed_block_size = 2*1024*1024
460
 
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
461
 
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
462
 
 
463
471
    def __init__(self, block):
464
472
        self._block = block
465
473
        # We need to preserve the ordering
537
545
        # time (self._block._content) is a little expensive.
538
546
        self._block._ensure_content(self._last_byte)
539
547
 
540
 
    def _check_rebuild_action(self):
 
548
    def _check_rebuild_block(self):
541
549
        """Check to see if our block should be repacked."""
542
550
        total_bytes_used = 0
543
551
        last_byte_used = 0
544
552
        for factory in self._factories:
545
553
            total_bytes_used += factory._end - factory._start
546
 
            if last_byte_used < factory._end:
547
 
                last_byte_used = factory._end
548
 
        # If we are using more than half of the bytes from the block, we have
549
 
        # nothing else to check
 
554
            last_byte_used = max(last_byte_used, factory._end)
 
555
        # If we are using most of the bytes from the block, we have nothing
 
556
        # else to check (currently more that 1/2)
550
557
        if total_bytes_used * 2 >= self._block._content_length:
551
 
            return None, last_byte_used, total_bytes_used
552
 
        # We are using less than 50% of the content. Is the content we are
553
 
        # using at the beginning of the block? If so, we can just trim the
554
 
        # tail, rather than rebuilding from scratch.
 
558
            return
 
559
        # Can we just strip off the trailing bytes? If we are going to be
 
560
        # transmitting more than 50% of the front of the content, go ahead
555
561
        if total_bytes_used * 2 > last_byte_used:
556
 
            return 'trim', last_byte_used, total_bytes_used
 
562
            self._trim_block(last_byte_used)
 
563
            return
557
564
 
558
565
        # We are using a small amount of the data, and it isn't just packed
559
566
        # nicely at the front, so rebuild the content.
566
573
        #       expanding many deltas into fulltexts, as well.
567
574
        #       If we build a cheap enough 'strip', then we could try a strip,
568
575
        #       if that expands the content, we then rebuild.
569
 
        return 'rebuild', last_byte_used, total_bytes_used
570
 
 
571
 
    def check_is_well_utilized(self):
572
 
        """Is the current block considered 'well utilized'?
573
 
 
574
 
        This heuristic asks if the current block considers itself to be a fully
575
 
        developed group, rather than just a loose collection of data.
576
 
        """
577
 
        if len(self._factories) == 1:
578
 
            # A block of length 1 could be improved by combining with other
579
 
            # groups - don't look deeper. Even larger than max size groups
580
 
            # could compress well with adjacent versions of the same thing.
581
 
            return False
582
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
583
 
        block_size = self._block._content_length
584
 
        if total_bytes_used < block_size * self._max_cut_fraction:
585
 
            # This block wants to trim itself small enough that we want to
586
 
            # consider it under-utilized.
587
 
            return False
588
 
        # TODO: This code is meant to be the twin of _insert_record_stream's
589
 
        #       'start_new_block' logic. It would probably be better to factor
590
 
        #       out that logic into a shared location, so that it stays
591
 
        #       together better
592
 
        # We currently assume a block is properly utilized whenever it is >75%
593
 
        # of the size of a 'full' block. In normal operation, a block is
594
 
        # considered full when it hits 4MB of same-file content. So any block
595
 
        # >3MB is 'full enough'.
596
 
        # The only time this isn't true is when a given block has large-object
597
 
        # content. (a single file >4MB, etc.)
598
 
        # Under these circumstances, we allow a block to grow to
599
 
        # 2 x largest_content.  Which means that if a given block had a large
600
 
        # object, it may actually be under-utilized. However, given that this
601
 
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
602
 
        # content blobs on-the-fly. Note that because we return False for all
603
 
        # 1-item blobs, we will repack them; we may wish to reevaluate our
604
 
        # treatment of large object blobs in the future.
605
 
        if block_size >= self._full_enough_block_size:
606
 
            return True
607
 
        # If a block is <3MB, it still may be considered 'full' if it contains
608
 
        # mixed content. The current rule is 2MB of mixed content is considered
609
 
        # full. So check to see if this block contains mixed content, and
610
 
        # set the threshold appropriately.
611
 
        common_prefix = None
612
 
        for factory in self._factories:
613
 
            prefix = factory.key[:-1]
614
 
            if common_prefix is None:
615
 
                common_prefix = prefix
616
 
            elif prefix != common_prefix:
617
 
                # Mixed content, check the size appropriately
618
 
                if block_size >= self._full_enough_mixed_block_size:
619
 
                    return True
620
 
                break
621
 
        # The content failed both the mixed check and the single-content check
622
 
        # so obviously it is not fully utilized
623
 
        # TODO: there is one other constraint that isn't being checked
624
 
        #       namely, that the entries in the block are in the appropriate
625
 
        #       order. For example, you could insert the entries in exactly
626
 
        #       reverse groupcompress order, and we would think that is ok.
627
 
        #       (all the right objects are in one group, and it is fully
628
 
        #       utilized, etc.) For now, we assume that case is rare,
629
 
        #       especially since we should always fetch in 'groupcompress'
630
 
        #       order.
631
 
        return False
632
 
 
633
 
    def _check_rebuild_block(self):
634
 
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
635
 
        if action is None:
636
 
            return
637
 
        if action == 'trim':
638
 
            self._trim_block(last_byte_used)
639
 
        elif action == 'rebuild':
640
 
            self._rebuild_block()
641
 
        else:
642
 
            raise ValueError('unknown rebuild action: %r' % (action,))
 
576
        self._rebuild_block()
643
577
 
644
578
    def _wire_bytes(self):
645
579
        """Return a byte stream suitable for transmitting over the wire."""
1041
975
    versioned_files.stream.close()
1042
976
 
1043
977
 
1044
 
class _BatchingBlockFetcher(object):
1045
 
    """Fetch group compress blocks in batches.
1046
 
    
1047
 
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1048
 
        currently pending batch.
1049
 
    """
1050
 
 
1051
 
    def __init__(self, gcvf, locations):
1052
 
        self.gcvf = gcvf
1053
 
        self.locations = locations
1054
 
        self.keys = []
1055
 
        self.batch_memos = {}
1056
 
        self.memos_to_get = []
1057
 
        self.total_bytes = 0
1058
 
        self.last_read_memo = None
1059
 
        self.manager = None
1060
 
 
1061
 
    def add_key(self, key):
1062
 
        """Add another to key to fetch.
1063
 
        
1064
 
        :return: The estimated number of bytes needed to fetch the batch so
1065
 
            far.
1066
 
        """
1067
 
        self.keys.append(key)
1068
 
        index_memo, _, _, _ = self.locations[key]
1069
 
        read_memo = index_memo[0:3]
1070
 
        # Three possibilities for this read_memo:
1071
 
        #  - it's already part of this batch; or
1072
 
        #  - it's not yet part of this batch, but is already cached; or
1073
 
        #  - it's not yet part of this batch and will need to be fetched.
1074
 
        if read_memo in self.batch_memos:
1075
 
            # This read memo is already in this batch.
1076
 
            return self.total_bytes
1077
 
        try:
1078
 
            cached_block = self.gcvf._group_cache[read_memo]
1079
 
        except KeyError:
1080
 
            # This read memo is new to this batch, and the data isn't cached
1081
 
            # either.
1082
 
            self.batch_memos[read_memo] = None
1083
 
            self.memos_to_get.append(read_memo)
1084
 
            byte_length = read_memo[2]
1085
 
            self.total_bytes += byte_length
1086
 
        else:
1087
 
            # This read memo is new to this batch, but cached.
1088
 
            # Keep a reference to the cached block in batch_memos because it's
1089
 
            # certain that we'll use it when this batch is processed, but
1090
 
            # there's a risk that it would fall out of _group_cache between now
1091
 
            # and then.
1092
 
            self.batch_memos[read_memo] = cached_block
1093
 
        return self.total_bytes
1094
 
        
1095
 
    def _flush_manager(self):
1096
 
        if self.manager is not None:
1097
 
            for factory in self.manager.get_record_stream():
1098
 
                yield factory
1099
 
            self.manager = None
1100
 
            self.last_read_memo = None
1101
 
 
1102
 
    def yield_factories(self, full_flush=False):
1103
 
        """Yield factories for keys added since the last yield.  They will be
1104
 
        returned in the order they were added via add_key.
1105
 
        
1106
 
        :param full_flush: by default, some results may not be returned in case
1107
 
            they can be part of the next batch.  If full_flush is True, then
1108
 
            all results are returned.
1109
 
        """
1110
 
        if self.manager is None and not self.keys:
1111
 
            return
1112
 
        # Fetch all memos in this batch.
1113
 
        blocks = self.gcvf._get_blocks(self.memos_to_get)
1114
 
        # Turn blocks into factories and yield them.
1115
 
        memos_to_get_stack = list(self.memos_to_get)
1116
 
        memos_to_get_stack.reverse()
1117
 
        for key in self.keys:
1118
 
            index_memo, _, parents, _ = self.locations[key]
1119
 
            read_memo = index_memo[:3]
1120
 
            if self.last_read_memo != read_memo:
1121
 
                # We are starting a new block. If we have a
1122
 
                # manager, we have found everything that fits for
1123
 
                # now, so yield records
1124
 
                for factory in self._flush_manager():
1125
 
                    yield factory
1126
 
                # Now start a new manager.
1127
 
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1128
 
                    # The next block from _get_blocks will be the block we
1129
 
                    # need.
1130
 
                    block_read_memo, block = blocks.next()
1131
 
                    if block_read_memo != read_memo:
1132
 
                        raise AssertionError(
1133
 
                            "block_read_memo out of sync with read_memo"
1134
 
                            "(%r != %r)" % (block_read_memo, read_memo))
1135
 
                    self.batch_memos[read_memo] = block
1136
 
                    memos_to_get_stack.pop()
1137
 
                else:
1138
 
                    block = self.batch_memos[read_memo]
1139
 
                self.manager = _LazyGroupContentManager(block)
1140
 
                self.last_read_memo = read_memo
1141
 
            start, end = index_memo[3:5]
1142
 
            self.manager.add_factory(key, parents, start, end)
1143
 
        if full_flush:
1144
 
            for factory in self._flush_manager():
1145
 
                yield factory
1146
 
        del self.keys[:]
1147
 
        self.batch_memos.clear()
1148
 
        del self.memos_to_get[:]
1149
 
        self.total_bytes = 0
1150
 
 
1151
 
 
1152
978
class GroupCompressVersionedFiles(VersionedFiles):
1153
979
    """A group-compress based VersionedFiles implementation."""
1154
980
 
1155
 
    def __init__(self, index, access, delta=True, _unadded_refs=None):
 
981
    def __init__(self, index, access, delta=True):
1156
982
        """Create a GroupCompressVersionedFiles object.
1157
983
 
1158
984
        :param index: The index object storing access and graph data.
1159
985
        :param access: The access object storing raw data.
1160
986
        :param delta: Whether to delta compress or just entropy compress.
1161
 
        :param _unadded_refs: private parameter, don't use.
1162
987
        """
1163
988
        self._index = index
1164
989
        self._access = access
1165
990
        self._delta = delta
1166
 
        if _unadded_refs is None:
1167
 
            _unadded_refs = {}
1168
 
        self._unadded_refs = _unadded_refs
 
991
        self._unadded_refs = {}
1169
992
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
993
        self._fallback_vfs = []
1171
994
 
1172
 
    def without_fallbacks(self):
1173
 
        """Return a clone of this object without any fallbacks configured."""
1174
 
        return GroupCompressVersionedFiles(self._index, self._access,
1175
 
            self._delta, _unadded_refs=dict(self._unadded_refs))
1176
 
 
1177
995
    def add_lines(self, key, parents, lines, parent_texts=None,
1178
996
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1179
997
        check_content=True):
1257
1075
    def get_annotator(self):
1258
1076
        return annotate.Annotator(self)
1259
1077
 
1260
 
    def check(self, progress_bar=None, keys=None):
 
1078
    def check(self, progress_bar=None):
1261
1079
        """See VersionedFiles.check()."""
1262
 
        if keys is None:
1263
 
            keys = self.keys()
1264
 
            for record in self.get_record_stream(keys, 'unordered', True):
1265
 
                record.get_bytes_as('fulltext')
1266
 
        else:
1267
 
            return self.get_record_stream(keys, 'unordered', True)
1268
 
 
1269
 
    def clear_cache(self):
1270
 
        """See VersionedFiles.clear_cache()"""
1271
 
        self._group_cache.clear()
1272
 
        self._index._graph_index.clear_cache()
1273
 
        self._index._int_cache.clear()
 
1080
        keys = self.keys()
 
1081
        for record in self.get_record_stream(keys, 'unordered', True):
 
1082
            record.get_bytes_as('fulltext')
1274
1083
 
1275
1084
    def _check_add(self, key, lines, random_id, check_content):
1276
1085
        """check that version_id and lines are safe to add."""
1287
1096
            self._check_lines_not_unicode(lines)
1288
1097
            self._check_lines_are_lines(lines)
1289
1098
 
1290
 
    def get_known_graph_ancestry(self, keys):
1291
 
        """Get a KnownGraph instance with the ancestry of keys."""
1292
 
        # Note that this is identical to
1293
 
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1294
 
        # ancestry.
1295
 
        parent_map, missing_keys = self._index.find_ancestry(keys)
1296
 
        for fallback in self._fallback_vfs:
1297
 
            if not missing_keys:
1298
 
                break
1299
 
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1300
 
                                                missing_keys)
1301
 
            parent_map.update(f_parent_map)
1302
 
            missing_keys = f_missing_keys
1303
 
        kg = _mod_graph.KnownGraph(parent_map)
1304
 
        return kg
1305
 
 
1306
1099
    def get_parent_map(self, keys):
1307
1100
        """Get a map of the graph parents of keys.
1308
1101
 
1335
1128
            missing.difference_update(set(new_result))
1336
1129
        return result, source_results
1337
1130
 
1338
 
    def _get_blocks(self, read_memos):
1339
 
        """Get GroupCompressBlocks for the given read_memos.
1340
 
 
1341
 
        :returns: a series of (read_memo, block) pairs, in the order they were
1342
 
            originally passed.
1343
 
        """
1344
 
        cached = {}
1345
 
        for read_memo in read_memos:
1346
 
            try:
1347
 
                block = self._group_cache[read_memo]
1348
 
            except KeyError:
1349
 
                pass
1350
 
            else:
1351
 
                cached[read_memo] = block
1352
 
        not_cached = []
1353
 
        not_cached_seen = set()
1354
 
        for read_memo in read_memos:
1355
 
            if read_memo in cached:
1356
 
                # Don't fetch what we already have
1357
 
                continue
1358
 
            if read_memo in not_cached_seen:
1359
 
                # Don't try to fetch the same data twice
1360
 
                continue
1361
 
            not_cached.append(read_memo)
1362
 
            not_cached_seen.add(read_memo)
1363
 
        raw_records = self._access.get_raw_records(not_cached)
1364
 
        for read_memo in read_memos:
1365
 
            try:
1366
 
                yield read_memo, cached[read_memo]
1367
 
            except KeyError:
1368
 
                # Read the block, and cache it.
1369
 
                zdata = raw_records.next()
1370
 
                block = GroupCompressBlock.from_bytes(zdata)
1371
 
                self._group_cache[read_memo] = block
1372
 
                cached[read_memo] = block
1373
 
                yield read_memo, block
 
1131
    def _get_block(self, index_memo):
 
1132
        read_memo = index_memo[0:3]
 
1133
        # get the group:
 
1134
        try:
 
1135
            block = self._group_cache[read_memo]
 
1136
        except KeyError:
 
1137
            # read the group
 
1138
            zdata = self._access.get_raw_records([read_memo]).next()
 
1139
            # decompress - whole thing - this is not a bug, as it
 
1140
            # permits caching. We might want to store the partially
 
1141
            # decompresed group and decompress object, so that recent
 
1142
            # texts are not penalised by big groups.
 
1143
            block = GroupCompressBlock.from_bytes(zdata)
 
1144
            self._group_cache[read_memo] = block
 
1145
        # cheapo debugging:
 
1146
        # print len(zdata), len(plain)
 
1147
        # parse - requires split_lines, better to have byte offsets
 
1148
        # here (but not by much - we only split the region for the
 
1149
        # recipe, and we often want to end up with lines anyway.
 
1150
        return block
1374
1151
 
1375
1152
    def get_missing_compression_parent_keys(self):
1376
1153
        """Return the keys of missing compression parents.
1542
1319
                unadded_keys, source_result)
1543
1320
        for key in missing:
1544
1321
            yield AbsentContentFactory(key)
1545
 
        # Batch up as many keys as we can until either:
1546
 
        #  - we encounter an unadded ref, or
1547
 
        #  - we run out of keys, or
1548
 
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1549
 
        batcher = _BatchingBlockFetcher(self, locations)
 
1322
        manager = None
 
1323
        last_read_memo = None
 
1324
        # TODO: This works fairly well at batching up existing groups into a
 
1325
        #       streamable format, and possibly allowing for taking one big
 
1326
        #       group and splitting it when it isn't fully utilized.
 
1327
        #       However, it doesn't allow us to find under-utilized groups and
 
1328
        #       combine them into a bigger group on the fly.
 
1329
        #       (Consider the issue with how chk_map inserts texts
 
1330
        #       one-at-a-time.) This could be done at insert_record_stream()
 
1331
        #       time, but it probably would decrease the number of
 
1332
        #       bytes-on-the-wire for fetch.
1550
1333
        for source, keys in source_keys:
1551
1334
            if source is self:
1552
1335
                for key in keys:
1553
1336
                    if key in self._unadded_refs:
1554
 
                        # Flush batch, then yield unadded ref from
1555
 
                        # self._compressor.
1556
 
                        for factory in batcher.yield_factories(full_flush=True):
1557
 
                            yield factory
 
1337
                        if manager is not None:
 
1338
                            for factory in manager.get_record_stream():
 
1339
                                yield factory
 
1340
                            last_read_memo = manager = None
1558
1341
                        bytes, sha1 = self._compressor.extract(key)
1559
1342
                        parents = self._unadded_refs[key]
1560
1343
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1561
 
                        continue
1562
 
                    if batcher.add_key(key) > BATCH_SIZE:
1563
 
                        # Ok, this batch is big enough.  Yield some results.
1564
 
                        for factory in batcher.yield_factories():
1565
 
                            yield factory
 
1344
                    else:
 
1345
                        index_memo, _, parents, (method, _) = locations[key]
 
1346
                        read_memo = index_memo[0:3]
 
1347
                        if last_read_memo != read_memo:
 
1348
                            # We are starting a new block. If we have a
 
1349
                            # manager, we have found everything that fits for
 
1350
                            # now, so yield records
 
1351
                            if manager is not None:
 
1352
                                for factory in manager.get_record_stream():
 
1353
                                    yield factory
 
1354
                            # Now start a new manager
 
1355
                            block = self._get_block(index_memo)
 
1356
                            manager = _LazyGroupContentManager(block)
 
1357
                            last_read_memo = read_memo
 
1358
                        start, end = index_memo[3:5]
 
1359
                        manager.add_factory(key, parents, start, end)
1566
1360
            else:
1567
 
                for factory in batcher.yield_factories(full_flush=True):
1568
 
                    yield factory
 
1361
                if manager is not None:
 
1362
                    for factory in manager.get_record_stream():
 
1363
                        yield factory
 
1364
                    last_read_memo = manager = None
1569
1365
                for record in source.get_record_stream(keys, ordering,
1570
1366
                                                       include_delta_closure):
1571
1367
                    yield record
1572
 
        for factory in batcher.yield_factories(full_flush=True):
1573
 
            yield factory
 
1368
        if manager is not None:
 
1369
            for factory in manager.get_record_stream():
 
1370
                yield factory
1574
1371
 
1575
1372
    def get_sha1s(self, keys):
1576
1373
        """See VersionedFiles.get_sha1s()."""
1631
1428
        keys_to_add = []
1632
1429
        def flush():
1633
1430
            bytes = self._compressor.flush().to_bytes()
1634
 
            self._compressor = GroupCompressor()
1635
1431
            index, start, length = self._access.add_raw_records(
1636
1432
                [(None, len(bytes))], bytes)[0]
1637
1433
            nodes = []
1640
1436
            self._index.add_records(nodes, random_id=random_id)
1641
1437
            self._unadded_refs = {}
1642
1438
            del keys_to_add[:]
 
1439
            self._compressor = GroupCompressor()
1643
1440
 
1644
1441
        last_prefix = None
1645
1442
        max_fulltext_len = 0
1649
1446
        block_length = None
1650
1447
        # XXX: TODO: remove this, it is just for safety checking for now
1651
1448
        inserted_keys = set()
1652
 
        reuse_this_block = reuse_blocks
1653
1449
        for record in stream:
1654
1450
            # Raise an error when a record is missing.
1655
1451
            if record.storage_kind == 'absent':
1663
1459
            if reuse_blocks:
1664
1460
                # If the reuse_blocks flag is set, check to see if we can just
1665
1461
                # copy a groupcompress block as-is.
1666
 
                # We only check on the first record (groupcompress-block) not
1667
 
                # on all of the (groupcompress-block-ref) entries.
1668
 
                # The reuse_this_block flag is then kept for as long as
1669
 
                if record.storage_kind == 'groupcompress-block':
1670
 
                    # Check to see if we really want to re-use this block
1671
 
                    insert_manager = record._manager
1672
 
                    reuse_this_block = insert_manager.check_is_well_utilized()
1673
 
            else:
1674
 
                reuse_this_block = False
1675
 
            if reuse_this_block:
1676
 
                # We still want to reuse this block
1677
1462
                if record.storage_kind == 'groupcompress-block':
1678
1463
                    # Insert the raw block into the target repo
1679
1464
                    insert_manager = record._manager
 
1465
                    insert_manager._check_rebuild_block()
1680
1466
                    bytes = record._manager._block.to_bytes()
1681
1467
                    _, start, length = self._access.add_raw_records(
1682
1468
                        [(None, len(bytes))], bytes)[0]
1687
1473
                                           'groupcompress-block-ref'):
1688
1474
                    if insert_manager is None:
1689
1475
                        raise AssertionError('No insert_manager set')
1690
 
                    if insert_manager is not record._manager:
1691
 
                        raise AssertionError('insert_manager does not match'
1692
 
                            ' the current record, we cannot be positive'
1693
 
                            ' that the appropriate content was inserted.'
1694
 
                            )
1695
1476
                    value = "%d %d %d %d" % (block_start, block_length,
1696
1477
                                             record._start, record._end)
1697
1478
                    nodes = [(record.key, value, (record.parents,))]
1747
1528
                key = record.key
1748
1529
            self._unadded_refs[key] = record.parents
1749
1530
            yield found_sha1
1750
 
            as_st = static_tuple.StaticTuple.from_sequence
1751
 
            if record.parents is not None:
1752
 
                parents = as_st([as_st(p) for p in record.parents])
1753
 
            else:
1754
 
                parents = None
1755
 
            refs = static_tuple.StaticTuple(parents)
1756
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1531
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
 
1532
                (record.parents,)))
1757
1533
        if len(keys_to_add):
1758
1534
            flush()
1759
1535
        self._compressor = None
1814
1590
 
1815
1591
    def __init__(self, graph_index, is_locked, parents=True,
1816
1592
        add_callback=None, track_external_parent_refs=False,
1817
 
        inconsistency_fatal=True, track_new_keys=False):
 
1593
        inconsistency_fatal=True):
1818
1594
        """Construct a _GCGraphIndex on a graph_index.
1819
1595
 
1820
1596
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1839
1615
        self.has_graph = parents
1840
1616
        self._is_locked = is_locked
1841
1617
        self._inconsistency_fatal = inconsistency_fatal
1842
 
        # GroupCompress records tend to have the same 'group' start + offset
1843
 
        # repeated over and over, this creates a surplus of ints
1844
 
        self._int_cache = {}
1845
1618
        if track_external_parent_refs:
1846
 
            self._key_dependencies = knit._KeyRefs(
1847
 
                track_new_keys=track_new_keys)
 
1619
            self._key_dependencies = knit._KeyRefs()
1848
1620
        else:
1849
1621
            self._key_dependencies = None
1850
1622
 
1883
1655
        if not random_id:
1884
1656
            present_nodes = self._get_entries(keys)
1885
1657
            for (index, key, value, node_refs) in present_nodes:
1886
 
                # Sometimes these are passed as a list rather than a tuple
1887
 
                node_refs = static_tuple.as_tuples(node_refs)
1888
 
                passed = static_tuple.as_tuples(keys[key])
1889
 
                if node_refs != passed[1]:
1890
 
                    details = '%s %s %s' % (key, (value, node_refs), passed)
 
1658
                if node_refs != keys[key][1]:
 
1659
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
1891
1660
                    if self._inconsistency_fatal:
1892
1661
                        raise errors.KnitCorrupt(self, "inconsistent details"
1893
1662
                                                 " in add_records: %s" %
1907
1676
                    result.append((key, value))
1908
1677
            records = result
1909
1678
        key_dependencies = self._key_dependencies
1910
 
        if key_dependencies is not None:
1911
 
            if self._parents:
1912
 
                for key, value, refs in records:
1913
 
                    parents = refs[0]
1914
 
                    key_dependencies.add_references(key, parents)
1915
 
            else:
1916
 
                for key, value, refs in records:
1917
 
                    new_keys.add_key(key)
 
1679
        if key_dependencies is not None and self._parents:
 
1680
            for key, value, refs in records:
 
1681
                parents = refs[0]
 
1682
                key_dependencies.add_references(key, parents)
1918
1683
        self._add_callback(records)
1919
1684
 
1920
1685
    def _check_read(self):
1951
1716
            if missing_keys:
1952
1717
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
1953
1718
 
1954
 
    def find_ancestry(self, keys):
1955
 
        """See CombinedGraphIndex.find_ancestry"""
1956
 
        return self._graph_index.find_ancestry(keys, 0)
1957
 
 
1958
1719
    def get_parent_map(self, keys):
1959
1720
        """Get a map of the parents of keys.
1960
1721
 
1977
1738
        """Return the keys of missing parents."""
1978
1739
        # Copied from _KnitGraphIndex.get_missing_parents
1979
1740
        # We may have false positives, so filter those out.
1980
 
        self._key_dependencies.satisfy_refs_for_keys(
 
1741
        self._key_dependencies.add_keys(
1981
1742
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
1982
1743
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
1983
1744
 
2026
1787
        """Convert an index value to position details."""
2027
1788
        bits = node[2].split(' ')
2028
1789
        # It would be nice not to read the entire gzip.
2029
 
        # start and stop are put into _int_cache because they are very common.
2030
 
        # They define the 'group' that an entry is in, and many groups can have
2031
 
        # thousands of objects.
2032
 
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2033
 
        # each, or about 7MB. Note that it might be even more when you consider
2034
 
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
 
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
2037
 
        # fragmentation.
2038
1790
        start = int(bits[0])
2039
 
        start = self._int_cache.setdefault(start, start)
2040
1791
        stop = int(bits[1])
2041
 
        stop = self._int_cache.setdefault(stop, stop)
2042
1792
        basis_end = int(bits[2])
2043
1793
        delta_end = int(bits[3])
2044
 
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2045
 
        # instance...
2046
 
        return (node[0], start, stop, basis_end, delta_end)
 
1794
        return node[0], start, stop, basis_end, delta_end
2047
1795
 
2048
1796
    def scan_unvalidated_index(self, graph_index):
2049
1797
        """Inform this _GCGraphIndex that there is an unvalidated index.
2050
1798
 
2051
1799
        This allows this _GCGraphIndex to keep track of any missing
2052
1800
        compression parents we may want to have filled in to make those
2053
 
        indices valid.  It also allows _GCGraphIndex to track any new keys.
 
1801
        indices valid.
2054
1802
 
2055
1803
        :param graph_index: A GraphIndex
2056
1804
        """
2057
 
        key_dependencies = self._key_dependencies
2058
 
        if key_dependencies is None:
2059
 
            return
2060
 
        for node in graph_index.iter_all_entries():
2061
 
            # Add parent refs from graph_index (and discard parent refs
2062
 
            # that the graph_index has).
2063
 
            key_dependencies.add_references(node[1], node[3][0])
 
1805
        if self._key_dependencies is not None:
 
1806
            # Add parent refs from graph_index (and discard parent refs that
 
1807
            # the graph_index has).
 
1808
            add_refs = self._key_dependencies.add_references
 
1809
            for node in graph_index.iter_all_entries():
 
1810
                add_refs(node[1], node[3][0])
 
1811
 
2064
1812
 
2065
1813
 
2066
1814
from bzrlib._groupcompress_py import (
2080
1828
        decode_base128_int,
2081
1829
        )
2082
1830
    GroupCompressor = PyrexGroupCompressor
2083
 
except ImportError, e:
2084
 
    osutils.failed_to_load_extension(e)
 
1831
except ImportError:
2085
1832
    GroupCompressor = PythonGroupCompressor
2086
1833