~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Vincent Ladeuil
  • Date: 2011-05-26 20:30:53 UTC
  • mfrom: (5920 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5924.
  • Revision ID: v.ladeuil+lp@free.fr-20110526203053-hbjn6yuzwg03wnuv
MergeĀ fromĀ trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
lazy_import(globals(), """
28
28
from bzrlib import (
29
29
    annotate,
 
30
    config,
30
31
    debug,
31
32
    errors,
32
33
    graph as _mod_graph,
490
491
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
491
492
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
492
493
 
493
 
    def __init__(self, block):
 
494
    def __init__(self, block, get_compressor_settings=None):
494
495
        self._block = block
495
496
        # We need to preserve the ordering
496
497
        self._factories = []
497
498
        self._last_byte = 0
 
499
        self._get_settings = get_compressor_settings
 
500
        self._compressor_settings = None
 
501
 
 
502
    def _get_compressor_settings(self):
 
503
        if self._compressor_settings is not None:
 
504
            return self._compressor_settings
 
505
        settings = None
 
506
        if self._get_settings is not None:
 
507
            settings = self._get_settings()
 
508
        if settings is None:
 
509
            vf = GroupCompressVersionedFiles
 
510
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
511
        self._compressor_settings = settings
 
512
        return self._compressor_settings
498
513
 
499
514
    def add_factory(self, key, parents, start, end):
500
515
        if not self._factories:
533
548
        new_block.set_content(self._block._content[:last_byte])
534
549
        self._block = new_block
535
550
 
 
551
    def _make_group_compressor(self):
 
552
        return GroupCompressor(self._get_compressor_settings())
 
553
 
536
554
    def _rebuild_block(self):
537
555
        """Create a new GroupCompressBlock with only the referenced texts."""
538
 
        compressor = GroupCompressor()
 
556
        compressor = self._make_group_compressor()
539
557
        tstart = time.time()
540
558
        old_length = self._block._content_length
541
559
        end_point = 0
553
571
        #       block? It seems hard to come up with a method that it would
554
572
        #       expand, since we do full compression again. Perhaps based on a
555
573
        #       request that ends up poorly ordered?
 
574
        # TODO: If the content would have expanded, then we would want to
 
575
        #       handle a case where we need to split the block.
 
576
        #       Now that we have a user-tweakable option
 
577
        #       (max_bytes_to_index), it is possible that one person set it
 
578
        #       to a very low value, causing poor compression.
556
579
        delta = time.time() - tstart
557
580
        self._block = new_block
558
581
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
781
804
 
782
805
class _CommonGroupCompressor(object):
783
806
 
784
 
    def __init__(self):
 
807
    def __init__(self, settings=None):
785
808
        """Create a GroupCompressor."""
786
809
        self.chunks = []
787
810
        self._last = None
790
813
        self.labels_deltas = {}
791
814
        self._delta_index = None # Set by the children
792
815
        self._block = GroupCompressBlock()
 
816
        if settings is None:
 
817
            self._settings = {}
 
818
        else:
 
819
            self._settings = settings
793
820
 
794
821
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
795
822
        """Compress lines with label key.
910
937
 
911
938
class PythonGroupCompressor(_CommonGroupCompressor):
912
939
 
913
 
    def __init__(self):
 
940
    def __init__(self, settings=None):
914
941
        """Create a GroupCompressor.
915
942
 
916
943
        Used only if the pyrex version is not available.
917
944
        """
918
 
        super(PythonGroupCompressor, self).__init__()
 
945
        super(PythonGroupCompressor, self).__init__(settings)
919
946
        self._delta_index = LinesDeltaIndex([])
920
947
        # The actual content is managed by LinesDeltaIndex
921
948
        self.chunks = self._delta_index.lines
958
985
 
959
986
    It contains code very similar to SequenceMatcher because of having a similar
960
987
    task. However some key differences apply:
961
 
     - there is no junk, we want a minimal edit not a human readable diff.
962
 
     - we don't filter very common lines (because we don't know where a good
963
 
       range will start, and after the first text we want to be emitting minmal
964
 
       edits only.
965
 
     - we chain the left side, not the right side
966
 
     - we incrementally update the adjacency matrix as new lines are provided.
967
 
     - we look for matches in all of the left side, so the routine which does
968
 
       the analagous task of find_longest_match does not need to filter on the
969
 
       left side.
 
988
 
 
989
    * there is no junk, we want a minimal edit not a human readable diff.
 
990
    * we don't filter very common lines (because we don't know where a good
 
991
      range will start, and after the first text we want to be emitting minmal
 
992
      edits only.
 
993
    * we chain the left side, not the right side
 
994
    * we incrementally update the adjacency matrix as new lines are provided.
 
995
    * we look for matches in all of the left side, so the routine which does
 
996
      the analagous task of find_longest_match does not need to filter on the
 
997
      left side.
970
998
    """
971
999
 
972
 
    def __init__(self):
973
 
        super(PyrexGroupCompressor, self).__init__()
974
 
        self._delta_index = DeltaIndex()
 
1000
    def __init__(self, settings=None):
 
1001
        super(PyrexGroupCompressor, self).__init__(settings)
 
1002
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
1003
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
975
1004
 
976
1005
    def _compress(self, key, bytes, max_delta_size, soft=False):
977
1006
        """see _CommonGroupCompressor._compress"""
1068
1097
 
1069
1098
class _BatchingBlockFetcher(object):
1070
1099
    """Fetch group compress blocks in batches.
1071
 
    
 
1100
 
1072
1101
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1073
1102
        currently pending batch.
1074
1103
    """
1075
1104
 
1076
 
    def __init__(self, gcvf, locations):
 
1105
    def __init__(self, gcvf, locations, get_compressor_settings=None):
1077
1106
        self.gcvf = gcvf
1078
1107
        self.locations = locations
1079
1108
        self.keys = []
1082
1111
        self.total_bytes = 0
1083
1112
        self.last_read_memo = None
1084
1113
        self.manager = None
 
1114
        self._get_compressor_settings = get_compressor_settings
1085
1115
 
1086
1116
    def add_key(self, key):
1087
1117
        """Add another to key to fetch.
1088
 
        
 
1118
 
1089
1119
        :return: The estimated number of bytes needed to fetch the batch so
1090
1120
            far.
1091
1121
        """
1116
1146
            # and then.
1117
1147
            self.batch_memos[read_memo] = cached_block
1118
1148
        return self.total_bytes
1119
 
        
 
1149
 
1120
1150
    def _flush_manager(self):
1121
1151
        if self.manager is not None:
1122
1152
            for factory in self.manager.get_record_stream():
1127
1157
    def yield_factories(self, full_flush=False):
1128
1158
        """Yield factories for keys added since the last yield.  They will be
1129
1159
        returned in the order they were added via add_key.
1130
 
        
 
1160
 
1131
1161
        :param full_flush: by default, some results may not be returned in case
1132
1162
            they can be part of the next batch.  If full_flush is True, then
1133
1163
            all results are returned.
1161
1191
                    memos_to_get_stack.pop()
1162
1192
                else:
1163
1193
                    block = self.batch_memos[read_memo]
1164
 
                self.manager = _LazyGroupContentManager(block)
 
1194
                self.manager = _LazyGroupContentManager(block,
 
1195
                    get_compressor_settings=self._get_compressor_settings)
1165
1196
                self.last_read_memo = read_memo
1166
1197
            start, end = index_memo[3:5]
1167
1198
            self.manager.add_factory(key, parents, start, end)
1177
1208
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1178
1209
    """A group-compress based VersionedFiles implementation."""
1179
1210
 
 
1211
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1212
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1213
    # However each of these references costs some memory in trade against a
 
1214
    # more accurate match result. For very large files, they either are
 
1215
    # pre-compressed and change in bulk whenever they change, or change in just
 
1216
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1217
    # versus running out of memory trying to track everything. The default max
 
1218
    # gives 100% sampling of a 1MB file.
 
1219
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1220
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1221
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
 
1222
 
1180
1223
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1181
 
            _group_cache=None):
 
1224
                 _group_cache=None):
1182
1225
        """Create a GroupCompressVersionedFiles object.
1183
1226
 
1184
1227
        :param index: The index object storing access and graph data.
1197
1240
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1198
1241
        self._group_cache = _group_cache
1199
1242
        self._immediate_fallback_vfs = []
 
1243
        self._max_bytes_to_index = None
1200
1244
 
1201
1245
    def without_fallbacks(self):
1202
1246
        """Return a clone of this object without any fallbacks configured."""
1212
1256
        :param key: The key tuple of the text to add.
1213
1257
        :param parents: The parents key tuples of the text to add.
1214
1258
        :param lines: A list of lines. Each line must be a bytestring. And all
1215
 
            of them except the last must be terminated with \n and contain no
1216
 
            other \n's. The last line may either contain no \n's or a single
1217
 
            terminating \n. If the lines list does meet this constraint the add
1218
 
            routine may error or may succeed - but you will be unable to read
1219
 
            the data back accurately. (Checking the lines have been split
 
1259
            of them except the last must be terminated with \\n and contain no
 
1260
            other \\n's. The last line may either contain no \\n's or a single
 
1261
            terminating \\n. If the lines list does meet this constraint the
 
1262
            add routine may error or may succeed - but you will be unable to
 
1263
            read the data back accurately. (Checking the lines have been split
1220
1264
            correctly is expensive and extremely unlikely to catch bugs so it
1221
1265
            is not done at runtime unless check_content is True.)
1222
1266
        :param parent_texts: An optional dictionary containing the opaque
1317
1361
            self._check_lines_not_unicode(lines)
1318
1362
            self._check_lines_are_lines(lines)
1319
1363
 
1320
 
    def get_known_graph_ancestry(self, keys):
1321
 
        """Get a KnownGraph instance with the ancestry of keys."""
1322
 
        # Note that this is identical to
1323
 
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1324
 
        # ancestry.
1325
 
        parent_map, missing_keys = self._index.find_ancestry(keys)
1326
 
        for fallback in self._transitive_fallbacks():
1327
 
            if not missing_keys:
1328
 
                break
1329
 
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1330
 
                                                missing_keys)
1331
 
            parent_map.update(f_parent_map)
1332
 
            missing_keys = f_missing_keys
1333
 
        kg = _mod_graph.KnownGraph(parent_map)
1334
 
        return kg
1335
 
 
1336
1364
    def get_parent_map(self, keys):
1337
1365
        """Get a map of the graph parents of keys.
1338
1366
 
1476
1504
 
1477
1505
        The returned objects should be in the order defined by 'ordering',
1478
1506
        which can weave between different sources.
 
1507
 
1479
1508
        :param ordering: Must be one of 'topological' or 'groupcompress'
1480
1509
        :return: List of [(source, [keys])] tuples, such that all keys are in
1481
1510
            the defined order, regardless of source.
1576
1605
        #  - we encounter an unadded ref, or
1577
1606
        #  - we run out of keys, or
1578
1607
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1579
 
        batcher = _BatchingBlockFetcher(self, locations)
 
1608
        batcher = _BatchingBlockFetcher(self, locations,
 
1609
            get_compressor_settings=self._get_compressor_settings)
1580
1610
        for source, keys in source_keys:
1581
1611
            if source is self:
1582
1612
                for key in keys:
1628
1658
        for _ in self._insert_record_stream(stream, random_id=False):
1629
1659
            pass
1630
1660
 
 
1661
    def _get_compressor_settings(self):
 
1662
        if self._max_bytes_to_index is None:
 
1663
            # TODO: VersionedFiles don't know about their containing
 
1664
            #       repository, so they don't have much of an idea about their
 
1665
            #       location. So for now, this is only a global option.
 
1666
            c = config.GlobalConfig()
 
1667
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1668
            if val is not None:
 
1669
                try:
 
1670
                    val = int(val)
 
1671
                except ValueError, e:
 
1672
                    trace.warning('Value for '
 
1673
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1674
                                  ' %r is not an integer'
 
1675
                                  % (val,))
 
1676
                    val = None
 
1677
            if val is None:
 
1678
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1679
            self._max_bytes_to_index = val
 
1680
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1681
 
 
1682
    def _make_group_compressor(self):
 
1683
        return GroupCompressor(self._get_compressor_settings())
 
1684
 
1631
1685
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1632
1686
                              reuse_blocks=True):
1633
1687
        """Internal core to insert a record stream into this container.
1656
1710
                return adapter
1657
1711
        # This will go up to fulltexts for gc to gc fetching, which isn't
1658
1712
        # ideal.
1659
 
        self._compressor = GroupCompressor()
 
1713
        self._compressor = self._make_group_compressor()
1660
1714
        self._unadded_refs = {}
1661
1715
        keys_to_add = []
1662
1716
        def flush():
1663
1717
            bytes_len, chunks = self._compressor.flush().to_chunks()
1664
 
            self._compressor = GroupCompressor()
 
1718
            self._compressor = self._make_group_compressor()
1665
1719
            # Note: At this point we still have 1 copy of the fulltext (in
1666
1720
            #       record and the var 'bytes'), and this generates 2 copies of
1667
1721
            #       the compressed text (one for bytes, one in chunks)
2078
2132
        :param keys: An iterable of keys.
2079
2133
        :return: A dict of key:
2080
2134
            (index_memo, compression_parent, parents, record_details).
2081
 
            index_memo
2082
 
                opaque structure to pass to read_records to extract the raw
2083
 
                data
2084
 
            compression_parent
2085
 
                Content that this record is built upon, may be None
2086
 
            parents
2087
 
                Logical parents of this node
2088
 
            record_details
2089
 
                extra information about the content which needs to be passed to
2090
 
                Factory.parse_record
 
2135
 
 
2136
            * index_memo: opaque structure to pass to read_records to extract
 
2137
              the raw data
 
2138
            * compression_parent: Content that this record is built upon, may
 
2139
              be None
 
2140
            * parents: Logical parents of this node
 
2141
            * record_details: extra information about the content which needs
 
2142
              to be passed to Factory.parse_record
2091
2143
        """
2092
2144
        self._check_read()
2093
2145
        result = {}