~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-11-30 22:04:45 UTC
  • mfrom: (4789.28.4 2.1.0b4-builder-no-keys)
  • Revision ID: pqm@pqm.ubuntu.com-20091130220445-vbfmmgocbgcs195q
(jam) Update BTreeBuilder to remove ._keys and use StaticTuple

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
    knit,
32
32
    osutils,
33
33
    pack,
 
34
    static_tuple,
34
35
    trace,
35
36
    )
36
37
from bzrlib.btree_index import BTreeBuilder
119
120
        :param num_bytes: Ensure that we have extracted at least num_bytes of
120
121
            content. If None, consume everything
121
122
        """
122
 
        # TODO: If we re-use the same content block at different times during
123
 
        #       get_record_stream(), it is possible that the first pass will
124
 
        #       get inserted, triggering an extract/_ensure_content() which
125
 
        #       will get rid of _z_content. And then the next use of the block
126
 
        #       will try to access _z_content (to send it over the wire), and
127
 
        #       fail because it is already extracted. Consider never releasing
128
 
        #       _z_content because of this.
 
123
        if self._content_length is None:
 
124
            raise AssertionError('self._content_length should never be None')
129
125
        if num_bytes is None:
130
126
            num_bytes = self._content_length
131
127
        elif (self._content_length is not None
148
144
                self._content = pylzma.decompress(self._z_content)
149
145
            elif self._compressor_name == 'zlib':
150
146
                # Start a zlib decompressor
151
 
                if num_bytes is None:
 
147
                if num_bytes * 4 > self._content_length * 3:
 
148
                    # If we are requesting more that 3/4ths of the content,
 
149
                    # just extract the whole thing in a single pass
 
150
                    num_bytes = self._content_length
152
151
                    self._content = zlib.decompress(self._z_content)
153
152
                else:
154
153
                    self._z_content_decompressor = zlib.decompressobj()
156
155
                    # that the rest of the code is simplified
157
156
                    self._content = self._z_content_decompressor.decompress(
158
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
158
                    if not self._z_content_decompressor.unconsumed_tail:
 
159
                        self._z_content_decompressor = None
159
160
            else:
160
161
                raise AssertionError('Unknown compressor: %r'
161
162
                                     % self._compressor_name)
163
164
        # 'unconsumed_tail'
164
165
 
165
166
        # Do we have enough bytes already?
166
 
        if num_bytes is not None and len(self._content) >= num_bytes:
167
 
            return
168
 
        if num_bytes is None and self._z_content_decompressor is None:
169
 
            # We must have already decompressed everything
 
167
        if len(self._content) >= num_bytes:
170
168
            return
171
169
        # If we got this far, and don't have a decompressor, something is wrong
172
170
        if self._z_content_decompressor is None:
173
171
            raise AssertionError(
174
172
                'No decompressor to decompress %d bytes' % num_bytes)
175
173
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
176
 
        if num_bytes is None:
177
 
            if remaining_decomp:
178
 
                # We don't know how much is left, but we'll decompress it all
179
 
                self._content += self._z_content_decompressor.decompress(
180
 
                    remaining_decomp)
181
 
                # Note: There's what I consider a bug in zlib.decompressobj
182
 
                #       If you pass back in the entire unconsumed_tail, only
183
 
                #       this time you don't pass a max-size, it doesn't
184
 
                #       change the unconsumed_tail back to None/''.
185
 
                #       However, we know we are done with the whole stream
186
 
                self._z_content_decompressor = None
187
 
            # XXX: Why is this the only place in this routine we set this?
188
 
            self._content_length = len(self._content)
189
 
        else:
190
 
            if not remaining_decomp:
191
 
                raise AssertionError('Nothing left to decompress')
192
 
            needed_bytes = num_bytes - len(self._content)
193
 
            # We always set max_size to 32kB over the minimum needed, so that
194
 
            # zlib will give us as much as we really want.
195
 
            # TODO: If this isn't good enough, we could make a loop here,
196
 
            #       that keeps expanding the request until we get enough
197
 
            self._content += self._z_content_decompressor.decompress(
198
 
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
199
 
            if len(self._content) < num_bytes:
200
 
                raise AssertionError('%d bytes wanted, only %d available'
201
 
                                     % (num_bytes, len(self._content)))
202
 
            if not self._z_content_decompressor.unconsumed_tail:
203
 
                # The stream is finished
204
 
                self._z_content_decompressor = None
 
174
        if not remaining_decomp:
 
175
            raise AssertionError('Nothing left to decompress')
 
176
        needed_bytes = num_bytes - len(self._content)
 
177
        # We always set max_size to 32kB over the minimum needed, so that
 
178
        # zlib will give us as much as we really want.
 
179
        # TODO: If this isn't good enough, we could make a loop here,
 
180
        #       that keeps expanding the request until we get enough
 
181
        self._content += self._z_content_decompressor.decompress(
 
182
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
183
        if len(self._content) < num_bytes:
 
184
            raise AssertionError('%d bytes wanted, only %d available'
 
185
                                 % (num_bytes, len(self._content)))
 
186
        if not self._z_content_decompressor.unconsumed_tail:
 
187
            # The stream is finished
 
188
            self._z_content_decompressor = None
205
189
 
206
190
    def _parse_bytes(self, bytes, pos):
207
191
        """Read the various lengths from the header.
457
441
                # There are code paths that first extract as fulltext, and then
458
442
                # extract as storage_kind (smart fetch). So we don't break the
459
443
                # refcycle here, but instead in manager.get_record_stream()
460
 
                # self._manager = None
461
444
            if storage_kind == 'fulltext':
462
445
                return self._bytes
463
446
            else:
469
452
class _LazyGroupContentManager(object):
470
453
    """This manages a group of _LazyGroupCompressFactory objects."""
471
454
 
 
455
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
 
456
                             # current size, and still be considered
 
457
                             # resuable
 
458
    _full_block_size = 4*1024*1024
 
459
    _full_mixed_block_size = 2*1024*1024
 
460
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
 
461
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
462
 
472
463
    def __init__(self, block):
473
464
        self._block = block
474
465
        # We need to preserve the ordering
546
537
        # time (self._block._content) is a little expensive.
547
538
        self._block._ensure_content(self._last_byte)
548
539
 
549
 
    def _check_rebuild_block(self):
 
540
    def _check_rebuild_action(self):
550
541
        """Check to see if our block should be repacked."""
551
542
        total_bytes_used = 0
552
543
        last_byte_used = 0
553
544
        for factory in self._factories:
554
545
            total_bytes_used += factory._end - factory._start
555
 
            last_byte_used = max(last_byte_used, factory._end)
556
 
        # If we are using most of the bytes from the block, we have nothing
557
 
        # else to check (currently more that 1/2)
 
546
            if last_byte_used < factory._end:
 
547
                last_byte_used = factory._end
 
548
        # If we are using more than half of the bytes from the block, we have
 
549
        # nothing else to check
558
550
        if total_bytes_used * 2 >= self._block._content_length:
559
 
            return
560
 
        # Can we just strip off the trailing bytes? If we are going to be
561
 
        # transmitting more than 50% of the front of the content, go ahead
 
551
            return None, last_byte_used, total_bytes_used
 
552
        # We are using less than 50% of the content. Is the content we are
 
553
        # using at the beginning of the block? If so, we can just trim the
 
554
        # tail, rather than rebuilding from scratch.
562
555
        if total_bytes_used * 2 > last_byte_used:
563
 
            self._trim_block(last_byte_used)
564
 
            return
 
556
            return 'trim', last_byte_used, total_bytes_used
565
557
 
566
558
        # We are using a small amount of the data, and it isn't just packed
567
559
        # nicely at the front, so rebuild the content.
574
566
        #       expanding many deltas into fulltexts, as well.
575
567
        #       If we build a cheap enough 'strip', then we could try a strip,
576
568
        #       if that expands the content, we then rebuild.
577
 
        self._rebuild_block()
 
569
        return 'rebuild', last_byte_used, total_bytes_used
 
570
 
 
571
    def check_is_well_utilized(self):
 
572
        """Is the current block considered 'well utilized'?
 
573
 
 
574
        This heuristic asks if the current block considers itself to be a fully
 
575
        developed group, rather than just a loose collection of data.
 
576
        """
 
577
        if len(self._factories) == 1:
 
578
            # A block of length 1 could be improved by combining with other
 
579
            # groups - don't look deeper. Even larger than max size groups
 
580
            # could compress well with adjacent versions of the same thing.
 
581
            return False
 
582
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
583
        block_size = self._block._content_length
 
584
        if total_bytes_used < block_size * self._max_cut_fraction:
 
585
            # This block wants to trim itself small enough that we want to
 
586
            # consider it under-utilized.
 
587
            return False
 
588
        # TODO: This code is meant to be the twin of _insert_record_stream's
 
589
        #       'start_new_block' logic. It would probably be better to factor
 
590
        #       out that logic into a shared location, so that it stays
 
591
        #       together better
 
592
        # We currently assume a block is properly utilized whenever it is >75%
 
593
        # of the size of a 'full' block. In normal operation, a block is
 
594
        # considered full when it hits 4MB of same-file content. So any block
 
595
        # >3MB is 'full enough'.
 
596
        # The only time this isn't true is when a given block has large-object
 
597
        # content. (a single file >4MB, etc.)
 
598
        # Under these circumstances, we allow a block to grow to
 
599
        # 2 x largest_content.  Which means that if a given block had a large
 
600
        # object, it may actually be under-utilized. However, given that this
 
601
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
 
602
        # content blobs on-the-fly. Note that because we return False for all
 
603
        # 1-item blobs, we will repack them; we may wish to reevaluate our
 
604
        # treatment of large object blobs in the future.
 
605
        if block_size >= self._full_enough_block_size:
 
606
            return True
 
607
        # If a block is <3MB, it still may be considered 'full' if it contains
 
608
        # mixed content. The current rule is 2MB of mixed content is considered
 
609
        # full. So check to see if this block contains mixed content, and
 
610
        # set the threshold appropriately.
 
611
        common_prefix = None
 
612
        for factory in self._factories:
 
613
            prefix = factory.key[:-1]
 
614
            if common_prefix is None:
 
615
                common_prefix = prefix
 
616
            elif prefix != common_prefix:
 
617
                # Mixed content, check the size appropriately
 
618
                if block_size >= self._full_enough_mixed_block_size:
 
619
                    return True
 
620
                break
 
621
        # The content failed both the mixed check and the single-content check
 
622
        # so obviously it is not fully utilized
 
623
        # TODO: there is one other constraint that isn't being checked
 
624
        #       namely, that the entries in the block are in the appropriate
 
625
        #       order. For example, you could insert the entries in exactly
 
626
        #       reverse groupcompress order, and we would think that is ok.
 
627
        #       (all the right objects are in one group, and it is fully
 
628
        #       utilized, etc.) For now, we assume that case is rare,
 
629
        #       especially since we should always fetch in 'groupcompress'
 
630
        #       order.
 
631
        return False
 
632
 
 
633
    def _check_rebuild_block(self):
 
634
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
635
        if action is None:
 
636
            return
 
637
        if action == 'trim':
 
638
            self._trim_block(last_byte_used)
 
639
        elif action == 'rebuild':
 
640
            self._rebuild_block()
 
641
        else:
 
642
            raise ValueError('unknown rebuild action: %r' % (action,))
578
643
 
579
644
    def _wire_bytes(self):
580
645
        """Return a byte stream suitable for transmitting over the wire."""
1087
1152
class GroupCompressVersionedFiles(VersionedFiles):
1088
1153
    """A group-compress based VersionedFiles implementation."""
1089
1154
 
1090
 
    def __init__(self, index, access, delta=True):
 
1155
    def __init__(self, index, access, delta=True, _unadded_refs=None):
1091
1156
        """Create a GroupCompressVersionedFiles object.
1092
1157
 
1093
1158
        :param index: The index object storing access and graph data.
1094
1159
        :param access: The access object storing raw data.
1095
1160
        :param delta: Whether to delta compress or just entropy compress.
 
1161
        :param _unadded_refs: private parameter, don't use.
1096
1162
        """
1097
1163
        self._index = index
1098
1164
        self._access = access
1099
1165
        self._delta = delta
1100
 
        self._unadded_refs = {}
 
1166
        if _unadded_refs is None:
 
1167
            _unadded_refs = {}
 
1168
        self._unadded_refs = _unadded_refs
1101
1169
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1102
1170
        self._fallback_vfs = []
1103
1171
 
 
1172
    def without_fallbacks(self):
 
1173
        """Return a clone of this object without any fallbacks configured."""
 
1174
        return GroupCompressVersionedFiles(self._index, self._access,
 
1175
            self._delta, _unadded_refs=dict(self._unadded_refs))
 
1176
 
1104
1177
    def add_lines(self, key, parents, lines, parent_texts=None,
1105
1178
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1106
1179
        check_content=True):
1193
1266
        else:
1194
1267
            return self.get_record_stream(keys, 'unordered', True)
1195
1268
 
 
1269
    def clear_cache(self):
 
1270
        """See VersionedFiles.clear_cache()"""
 
1271
        self._group_cache.clear()
 
1272
        self._index._graph_index.clear_cache()
 
1273
        self._index._int_cache.clear()
 
1274
 
1196
1275
    def _check_add(self, key, lines, random_id, check_content):
1197
1276
        """check that version_id and lines are safe to add."""
1198
1277
        version_id = key[-1]
1570
1649
        block_length = None
1571
1650
        # XXX: TODO: remove this, it is just for safety checking for now
1572
1651
        inserted_keys = set()
 
1652
        reuse_this_block = reuse_blocks
1573
1653
        for record in stream:
1574
1654
            # Raise an error when a record is missing.
1575
1655
            if record.storage_kind == 'absent':
1583
1663
            if reuse_blocks:
1584
1664
                # If the reuse_blocks flag is set, check to see if we can just
1585
1665
                # copy a groupcompress block as-is.
 
1666
                # We only check on the first record (groupcompress-block) not
 
1667
                # on all of the (groupcompress-block-ref) entries.
 
1668
                # The reuse_this_block flag is then kept for as long as
 
1669
                if record.storage_kind == 'groupcompress-block':
 
1670
                    # Check to see if we really want to re-use this block
 
1671
                    insert_manager = record._manager
 
1672
                    reuse_this_block = insert_manager.check_is_well_utilized()
 
1673
            else:
 
1674
                reuse_this_block = False
 
1675
            if reuse_this_block:
 
1676
                # We still want to reuse this block
1586
1677
                if record.storage_kind == 'groupcompress-block':
1587
1678
                    # Insert the raw block into the target repo
1588
1679
                    insert_manager = record._manager
1589
 
                    insert_manager._check_rebuild_block()
1590
1680
                    bytes = record._manager._block.to_bytes()
1591
1681
                    _, start, length = self._access.add_raw_records(
1592
1682
                        [(None, len(bytes))], bytes)[0]
1597
1687
                                           'groupcompress-block-ref'):
1598
1688
                    if insert_manager is None:
1599
1689
                        raise AssertionError('No insert_manager set')
 
1690
                    if insert_manager is not record._manager:
 
1691
                        raise AssertionError('insert_manager does not match'
 
1692
                            ' the current record, we cannot be positive'
 
1693
                            ' that the appropriate content was inserted.'
 
1694
                            )
1600
1695
                    value = "%d %d %d %d" % (block_start, block_length,
1601
1696
                                             record._start, record._end)
1602
1697
                    nodes = [(record.key, value, (record.parents,))]
1714
1809
 
1715
1810
    def __init__(self, graph_index, is_locked, parents=True,
1716
1811
        add_callback=None, track_external_parent_refs=False,
1717
 
        inconsistency_fatal=True):
 
1812
        inconsistency_fatal=True, track_new_keys=False):
1718
1813
        """Construct a _GCGraphIndex on a graph_index.
1719
1814
 
1720
1815
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
1739
1834
        self.has_graph = parents
1740
1835
        self._is_locked = is_locked
1741
1836
        self._inconsistency_fatal = inconsistency_fatal
 
1837
        # GroupCompress records tend to have the same 'group' start + offset
 
1838
        # repeated over and over, this creates a surplus of ints
 
1839
        self._int_cache = {}
1742
1840
        if track_external_parent_refs:
1743
 
            self._key_dependencies = knit._KeyRefs()
 
1841
            self._key_dependencies = knit._KeyRefs(
 
1842
                track_new_keys=track_new_keys)
1744
1843
        else:
1745
1844
            self._key_dependencies = None
1746
1845
 
1779
1878
        if not random_id:
1780
1879
            present_nodes = self._get_entries(keys)
1781
1880
            for (index, key, value, node_refs) in present_nodes:
1782
 
                if node_refs != keys[key][1]:
1783
 
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
 
1881
                # Sometimes these are passed as a list rather than a tuple
 
1882
                node_refs = static_tuple.as_tuples(node_refs)
 
1883
                passed = static_tuple.as_tuples(keys[key])
 
1884
                if node_refs != passed[1]:
 
1885
                    details = '%s %s %s' % (key, (value, node_refs), passed)
1784
1886
                    if self._inconsistency_fatal:
1785
1887
                        raise errors.KnitCorrupt(self, "inconsistent details"
1786
1888
                                                 " in add_records: %s" %
1800
1902
                    result.append((key, value))
1801
1903
            records = result
1802
1904
        key_dependencies = self._key_dependencies
1803
 
        if key_dependencies is not None and self._parents:
1804
 
            for key, value, refs in records:
1805
 
                parents = refs[0]
1806
 
                key_dependencies.add_references(key, parents)
 
1905
        if key_dependencies is not None:
 
1906
            if self._parents:
 
1907
                for key, value, refs in records:
 
1908
                    parents = refs[0]
 
1909
                    key_dependencies.add_references(key, parents)
 
1910
            else:
 
1911
                for key, value, refs in records:
 
1912
                    new_keys.add_key(key)
1807
1913
        self._add_callback(records)
1808
1914
 
1809
1915
    def _check_read(self):
1866
1972
        """Return the keys of missing parents."""
1867
1973
        # Copied from _KnitGraphIndex.get_missing_parents
1868
1974
        # We may have false positives, so filter those out.
1869
 
        self._key_dependencies.add_keys(
 
1975
        self._key_dependencies.satisfy_refs_for_keys(
1870
1976
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
1871
1977
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
1872
1978
 
1915
2021
        """Convert an index value to position details."""
1916
2022
        bits = node[2].split(' ')
1917
2023
        # It would be nice not to read the entire gzip.
 
2024
        # start and stop are put into _int_cache because they are very common.
 
2025
        # They define the 'group' that an entry is in, and many groups can have
 
2026
        # thousands of objects.
 
2027
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
 
2028
        # each, or about 7MB. Note that it might be even more when you consider
 
2029
        # how PyInt is allocated in separate slabs. And you can't return a slab
 
2030
        # to the OS if even 1 int on it is in use. Note though that Python uses
 
2031
        # a LIFO when re-using PyInt slots, which probably causes more
 
2032
        # fragmentation.
1918
2033
        start = int(bits[0])
 
2034
        start = self._int_cache.setdefault(start, start)
1919
2035
        stop = int(bits[1])
 
2036
        stop = self._int_cache.setdefault(stop, stop)
1920
2037
        basis_end = int(bits[2])
1921
2038
        delta_end = int(bits[3])
1922
 
        return node[0], start, stop, basis_end, delta_end
 
2039
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
 
2040
        # instance...
 
2041
        return (node[0], start, stop, basis_end, delta_end)
1923
2042
 
1924
2043
    def scan_unvalidated_index(self, graph_index):
1925
2044
        """Inform this _GCGraphIndex that there is an unvalidated index.
1926
2045
 
1927
2046
        This allows this _GCGraphIndex to keep track of any missing
1928
2047
        compression parents we may want to have filled in to make those
1929
 
        indices valid.
 
2048
        indices valid.  It also allows _GCGraphIndex to track any new keys.
1930
2049
 
1931
2050
        :param graph_index: A GraphIndex
1932
2051
        """
1933
 
        if self._key_dependencies is not None:
1934
 
            # Add parent refs from graph_index (and discard parent refs that
1935
 
            # the graph_index has).
1936
 
            add_refs = self._key_dependencies.add_references
1937
 
            for node in graph_index.iter_all_entries():
1938
 
                add_refs(node[1], node[3][0])
1939
 
 
 
2052
        key_dependencies = self._key_dependencies
 
2053
        if key_dependencies is None:
 
2054
            return
 
2055
        for node in graph_index.iter_all_entries():
 
2056
            # Add parent refs from graph_index (and discard parent refs
 
2057
            # that the graph_index has).
 
2058
            key_dependencies.add_references(node[1], node[3][0])
1940
2059
 
1941
2060
 
1942
2061
from bzrlib._groupcompress_py import (
1956
2075
        decode_base128_int,
1957
2076
        )
1958
2077
    GroupCompressor = PyrexGroupCompressor
1959
 
except ImportError:
 
2078
except ImportError, e:
 
2079
    osutils.failed_to_load_extension(e)
1960
2080
    GroupCompressor = PythonGroupCompressor
1961
2081