~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-05-16 17:33:27 UTC
  • mfrom: (5755.2.10 2.4-max-entries-gc-602614)
  • Revision ID: pqm@pqm.ubuntu.com-20110516173327-5ehst0ttceohsf5w
(jameinel) Add bzr.groupcompress.max_bytes_to_index to limit peak memory
 when delta-compressing large files (bug #602614) (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
lazy_import(globals(), """
28
28
from bzrlib import (
29
29
    annotate,
 
30
    config,
30
31
    debug,
31
32
    errors,
32
33
    graph as _mod_graph,
490
491
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
491
492
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
492
493
 
493
 
    def __init__(self, block):
 
494
    def __init__(self, block, get_compressor_settings=None):
494
495
        self._block = block
495
496
        # We need to preserve the ordering
496
497
        self._factories = []
497
498
        self._last_byte = 0
 
499
        self._get_settings = get_compressor_settings
 
500
        self._compressor_settings = None
 
501
 
 
502
    def _get_compressor_settings(self):
 
503
        if self._compressor_settings is not None:
 
504
            return self._compressor_settings
 
505
        settings = None
 
506
        if self._get_settings is not None:
 
507
            settings = self._get_settings()
 
508
        if settings is None:
 
509
            vf = GroupCompressVersionedFiles
 
510
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
511
        self._compressor_settings = settings
 
512
        return self._compressor_settings
498
513
 
499
514
    def add_factory(self, key, parents, start, end):
500
515
        if not self._factories:
533
548
        new_block.set_content(self._block._content[:last_byte])
534
549
        self._block = new_block
535
550
 
 
551
    def _make_group_compressor(self):
 
552
        return GroupCompressor(self._get_compressor_settings())
 
553
 
536
554
    def _rebuild_block(self):
537
555
        """Create a new GroupCompressBlock with only the referenced texts."""
538
 
        compressor = GroupCompressor()
 
556
        compressor = self._make_group_compressor()
539
557
        tstart = time.time()
540
558
        old_length = self._block._content_length
541
559
        end_point = 0
553
571
        #       block? It seems hard to come up with a method that it would
554
572
        #       expand, since we do full compression again. Perhaps based on a
555
573
        #       request that ends up poorly ordered?
 
574
        # TODO: If the content would have expanded, then we would want to
 
575
        #       handle a case where we need to split the block.
 
576
        #       Now that we have a user-tweakable option
 
577
        #       (max_bytes_to_index), it is possible that one person set it
 
578
        #       to a very low value, causing poor compression.
556
579
        delta = time.time() - tstart
557
580
        self._block = new_block
558
581
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
781
804
 
782
805
class _CommonGroupCompressor(object):
783
806
 
784
 
    def __init__(self):
 
807
    def __init__(self, settings=None):
785
808
        """Create a GroupCompressor."""
786
809
        self.chunks = []
787
810
        self._last = None
790
813
        self.labels_deltas = {}
791
814
        self._delta_index = None # Set by the children
792
815
        self._block = GroupCompressBlock()
 
816
        if settings is None:
 
817
            self._settings = {}
 
818
        else:
 
819
            self._settings = settings
793
820
 
794
821
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
795
822
        """Compress lines with label key.
910
937
 
911
938
class PythonGroupCompressor(_CommonGroupCompressor):
912
939
 
913
 
    def __init__(self):
 
940
    def __init__(self, settings=None):
914
941
        """Create a GroupCompressor.
915
942
 
916
943
        Used only if the pyrex version is not available.
917
944
        """
918
 
        super(PythonGroupCompressor, self).__init__()
 
945
        super(PythonGroupCompressor, self).__init__(settings)
919
946
        self._delta_index = LinesDeltaIndex([])
920
947
        # The actual content is managed by LinesDeltaIndex
921
948
        self.chunks = self._delta_index.lines
969
996
       left side.
970
997
    """
971
998
 
972
 
    def __init__(self):
973
 
        super(PyrexGroupCompressor, self).__init__()
974
 
        self._delta_index = DeltaIndex()
 
999
    def __init__(self, settings=None):
 
1000
        super(PyrexGroupCompressor, self).__init__(settings)
 
1001
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
1002
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
975
1003
 
976
1004
    def _compress(self, key, bytes, max_delta_size, soft=False):
977
1005
        """see _CommonGroupCompressor._compress"""
1068
1096
 
1069
1097
class _BatchingBlockFetcher(object):
1070
1098
    """Fetch group compress blocks in batches.
1071
 
    
 
1099
 
1072
1100
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1073
1101
        currently pending batch.
1074
1102
    """
1075
1103
 
1076
 
    def __init__(self, gcvf, locations):
 
1104
    def __init__(self, gcvf, locations, get_compressor_settings=None):
1077
1105
        self.gcvf = gcvf
1078
1106
        self.locations = locations
1079
1107
        self.keys = []
1082
1110
        self.total_bytes = 0
1083
1111
        self.last_read_memo = None
1084
1112
        self.manager = None
 
1113
        self._get_compressor_settings = get_compressor_settings
1085
1114
 
1086
1115
    def add_key(self, key):
1087
1116
        """Add another to key to fetch.
1088
 
        
 
1117
 
1089
1118
        :return: The estimated number of bytes needed to fetch the batch so
1090
1119
            far.
1091
1120
        """
1116
1145
            # and then.
1117
1146
            self.batch_memos[read_memo] = cached_block
1118
1147
        return self.total_bytes
1119
 
        
 
1148
 
1120
1149
    def _flush_manager(self):
1121
1150
        if self.manager is not None:
1122
1151
            for factory in self.manager.get_record_stream():
1127
1156
    def yield_factories(self, full_flush=False):
1128
1157
        """Yield factories for keys added since the last yield.  They will be
1129
1158
        returned in the order they were added via add_key.
1130
 
        
 
1159
 
1131
1160
        :param full_flush: by default, some results may not be returned in case
1132
1161
            they can be part of the next batch.  If full_flush is True, then
1133
1162
            all results are returned.
1161
1190
                    memos_to_get_stack.pop()
1162
1191
                else:
1163
1192
                    block = self.batch_memos[read_memo]
1164
 
                self.manager = _LazyGroupContentManager(block)
 
1193
                self.manager = _LazyGroupContentManager(block,
 
1194
                    get_compressor_settings=self._get_compressor_settings)
1165
1195
                self.last_read_memo = read_memo
1166
1196
            start, end = index_memo[3:5]
1167
1197
            self.manager.add_factory(key, parents, start, end)
1177
1207
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1178
1208
    """A group-compress based VersionedFiles implementation."""
1179
1209
 
 
1210
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1211
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1212
    # However each of these references costs some memory in trade against a
 
1213
    # more accurate match result. For very large files, they either are
 
1214
    # pre-compressed and change in bulk whenever they change, or change in just
 
1215
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1216
    # versus running out of memory trying to track everything. The default max
 
1217
    # gives 100% sampling of a 1MB file.
 
1218
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1219
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1220
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
 
1221
 
1180
1222
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1181
 
            _group_cache=None):
 
1223
                 _group_cache=None):
1182
1224
        """Create a GroupCompressVersionedFiles object.
1183
1225
 
1184
1226
        :param index: The index object storing access and graph data.
1197
1239
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1198
1240
        self._group_cache = _group_cache
1199
1241
        self._immediate_fallback_vfs = []
 
1242
        self._max_bytes_to_index = None
1200
1243
 
1201
1244
    def without_fallbacks(self):
1202
1245
        """Return a clone of this object without any fallbacks configured."""
1560
1603
        #  - we encounter an unadded ref, or
1561
1604
        #  - we run out of keys, or
1562
1605
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1563
 
        batcher = _BatchingBlockFetcher(self, locations)
 
1606
        batcher = _BatchingBlockFetcher(self, locations,
 
1607
            get_compressor_settings=self._get_compressor_settings)
1564
1608
        for source, keys in source_keys:
1565
1609
            if source is self:
1566
1610
                for key in keys:
1612
1656
        for _ in self._insert_record_stream(stream, random_id=False):
1613
1657
            pass
1614
1658
 
 
1659
    def _get_compressor_settings(self):
 
1660
        if self._max_bytes_to_index is None:
 
1661
            # TODO: VersionedFiles don't know about their containing
 
1662
            #       repository, so they don't have much of an idea about their
 
1663
            #       location. So for now, this is only a global option.
 
1664
            c = config.GlobalConfig()
 
1665
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1666
            if val is not None:
 
1667
                try:
 
1668
                    val = int(val)
 
1669
                except ValueError, e:
 
1670
                    trace.warning('Value for '
 
1671
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1672
                                  ' %r is not an integer'
 
1673
                                  % (val,))
 
1674
                    val = None
 
1675
            if val is None:
 
1676
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1677
            self._max_bytes_to_index = val
 
1678
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1679
 
 
1680
    def _make_group_compressor(self):
 
1681
        return GroupCompressor(self._get_compressor_settings())
 
1682
 
1615
1683
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1616
1684
                              reuse_blocks=True):
1617
1685
        """Internal core to insert a record stream into this container.
1640
1708
                return adapter
1641
1709
        # This will go up to fulltexts for gc to gc fetching, which isn't
1642
1710
        # ideal.
1643
 
        self._compressor = GroupCompressor()
 
1711
        self._compressor = self._make_group_compressor()
1644
1712
        self._unadded_refs = {}
1645
1713
        keys_to_add = []
1646
1714
        def flush():
1647
1715
            bytes_len, chunks = self._compressor.flush().to_chunks()
1648
 
            self._compressor = GroupCompressor()
 
1716
            self._compressor = self._make_group_compressor()
1649
1717
            # Note: At this point we still have 1 copy of the fulltext (in
1650
1718
            #       record and the var 'bytes'), and this generates 2 copies of
1651
1719
            #       the compressed text (one for bytes, one in chunks)