~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
lazy_import(globals(), """
28
28
from bzrlib import (
29
29
    annotate,
30
 
    config,
31
30
    debug,
32
31
    errors,
33
32
    graph as _mod_graph,
466
465
                # Grab and cache the raw bytes for this entry
467
466
                # and break the ref-cycle with _manager since we don't need it
468
467
                # anymore
469
 
                try:
470
 
                    self._manager._prepare_for_extract()
471
 
                except zlib.error as value:
472
 
                    raise errors.DecompressCorruption("zlib: " + str(value))
 
468
                self._manager._prepare_for_extract()
473
469
                block = self._manager._block
474
470
                self._bytes = block.extract(self.key, self._start, self._end)
475
471
                # There are code paths that first extract as fulltext, and then
494
490
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
495
491
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
496
492
 
497
 
    def __init__(self, block, get_compressor_settings=None):
 
493
    def __init__(self, block):
498
494
        self._block = block
499
495
        # We need to preserve the ordering
500
496
        self._factories = []
501
497
        self._last_byte = 0
502
 
        self._get_settings = get_compressor_settings
503
 
        self._compressor_settings = None
504
 
 
505
 
    def _get_compressor_settings(self):
506
 
        if self._compressor_settings is not None:
507
 
            return self._compressor_settings
508
 
        settings = None
509
 
        if self._get_settings is not None:
510
 
            settings = self._get_settings()
511
 
        if settings is None:
512
 
            vf = GroupCompressVersionedFiles
513
 
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
514
 
        self._compressor_settings = settings
515
 
        return self._compressor_settings
516
498
 
517
499
    def add_factory(self, key, parents, start, end):
518
500
        if not self._factories:
551
533
        new_block.set_content(self._block._content[:last_byte])
552
534
        self._block = new_block
553
535
 
554
 
    def _make_group_compressor(self):
555
 
        return GroupCompressor(self._get_compressor_settings())
556
 
 
557
536
    def _rebuild_block(self):
558
537
        """Create a new GroupCompressBlock with only the referenced texts."""
559
 
        compressor = self._make_group_compressor()
 
538
        compressor = GroupCompressor()
560
539
        tstart = time.time()
561
540
        old_length = self._block._content_length
562
541
        end_point = 0
574
553
        #       block? It seems hard to come up with a method that it would
575
554
        #       expand, since we do full compression again. Perhaps based on a
576
555
        #       request that ends up poorly ordered?
577
 
        # TODO: If the content would have expanded, then we would want to
578
 
        #       handle a case where we need to split the block.
579
 
        #       Now that we have a user-tweakable option
580
 
        #       (max_bytes_to_index), it is possible that one person set it
581
 
        #       to a very low value, causing poor compression.
582
556
        delta = time.time() - tstart
583
557
        self._block = new_block
584
558
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
807
781
 
808
782
class _CommonGroupCompressor(object):
809
783
 
810
 
    def __init__(self, settings=None):
 
784
    def __init__(self):
811
785
        """Create a GroupCompressor."""
812
786
        self.chunks = []
813
787
        self._last = None
816
790
        self.labels_deltas = {}
817
791
        self._delta_index = None # Set by the children
818
792
        self._block = GroupCompressBlock()
819
 
        if settings is None:
820
 
            self._settings = {}
821
 
        else:
822
 
            self._settings = settings
823
793
 
824
794
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
825
795
        """Compress lines with label key.
940
910
 
941
911
class PythonGroupCompressor(_CommonGroupCompressor):
942
912
 
943
 
    def __init__(self, settings=None):
 
913
    def __init__(self):
944
914
        """Create a GroupCompressor.
945
915
 
946
916
        Used only if the pyrex version is not available.
947
917
        """
948
 
        super(PythonGroupCompressor, self).__init__(settings)
 
918
        super(PythonGroupCompressor, self).__init__()
949
919
        self._delta_index = LinesDeltaIndex([])
950
920
        # The actual content is managed by LinesDeltaIndex
951
921
        self.chunks = self._delta_index.lines
988
958
 
989
959
    It contains code very similar to SequenceMatcher because of having a similar
990
960
    task. However some key differences apply:
991
 
 
992
 
    * there is no junk, we want a minimal edit not a human readable diff.
993
 
    * we don't filter very common lines (because we don't know where a good
994
 
      range will start, and after the first text we want to be emitting minmal
995
 
      edits only.
996
 
    * we chain the left side, not the right side
997
 
    * we incrementally update the adjacency matrix as new lines are provided.
998
 
    * we look for matches in all of the left side, so the routine which does
999
 
      the analagous task of find_longest_match does not need to filter on the
1000
 
      left side.
 
961
     - there is no junk, we want a minimal edit not a human readable diff.
 
962
     - we don't filter very common lines (because we don't know where a good
 
963
       range will start, and after the first text we want to be emitting minmal
 
964
       edits only.
 
965
     - we chain the left side, not the right side
 
966
     - we incrementally update the adjacency matrix as new lines are provided.
 
967
     - we look for matches in all of the left side, so the routine which does
 
968
       the analagous task of find_longest_match does not need to filter on the
 
969
       left side.
1001
970
    """
1002
971
 
1003
 
    def __init__(self, settings=None):
1004
 
        super(PyrexGroupCompressor, self).__init__(settings)
1005
 
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1006
 
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
972
    def __init__(self):
 
973
        super(PyrexGroupCompressor, self).__init__()
 
974
        self._delta_index = DeltaIndex()
1007
975
 
1008
976
    def _compress(self, key, bytes, max_delta_size, soft=False):
1009
977
        """see _CommonGroupCompressor._compress"""
1100
1068
 
1101
1069
class _BatchingBlockFetcher(object):
1102
1070
    """Fetch group compress blocks in batches.
1103
 
 
 
1071
    
1104
1072
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1105
1073
        currently pending batch.
1106
1074
    """
1107
1075
 
1108
 
    def __init__(self, gcvf, locations, get_compressor_settings=None):
 
1076
    def __init__(self, gcvf, locations):
1109
1077
        self.gcvf = gcvf
1110
1078
        self.locations = locations
1111
1079
        self.keys = []
1114
1082
        self.total_bytes = 0
1115
1083
        self.last_read_memo = None
1116
1084
        self.manager = None
1117
 
        self._get_compressor_settings = get_compressor_settings
1118
1085
 
1119
1086
    def add_key(self, key):
1120
1087
        """Add another to key to fetch.
1121
 
 
 
1088
        
1122
1089
        :return: The estimated number of bytes needed to fetch the batch so
1123
1090
            far.
1124
1091
        """
1149
1116
            # and then.
1150
1117
            self.batch_memos[read_memo] = cached_block
1151
1118
        return self.total_bytes
1152
 
 
 
1119
        
1153
1120
    def _flush_manager(self):
1154
1121
        if self.manager is not None:
1155
1122
            for factory in self.manager.get_record_stream():
1160
1127
    def yield_factories(self, full_flush=False):
1161
1128
        """Yield factories for keys added since the last yield.  They will be
1162
1129
        returned in the order they were added via add_key.
1163
 
 
 
1130
        
1164
1131
        :param full_flush: by default, some results may not be returned in case
1165
1132
            they can be part of the next batch.  If full_flush is True, then
1166
1133
            all results are returned.
1194
1161
                    memos_to_get_stack.pop()
1195
1162
                else:
1196
1163
                    block = self.batch_memos[read_memo]
1197
 
                self.manager = _LazyGroupContentManager(block,
1198
 
                    get_compressor_settings=self._get_compressor_settings)
 
1164
                self.manager = _LazyGroupContentManager(block)
1199
1165
                self.last_read_memo = read_memo
1200
1166
            start, end = index_memo[3:5]
1201
1167
            self.manager.add_factory(key, parents, start, end)
1211
1177
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1212
1178
    """A group-compress based VersionedFiles implementation."""
1213
1179
 
1214
 
    # This controls how the GroupCompress DeltaIndex works. Basically, we
1215
 
    # compute hash pointers into the source blocks (so hash(text) => text).
1216
 
    # However each of these references costs some memory in trade against a
1217
 
    # more accurate match result. For very large files, they either are
1218
 
    # pre-compressed and change in bulk whenever they change, or change in just
1219
 
    # local blocks. Either way, 'improved resolution' is not very helpful,
1220
 
    # versus running out of memory trying to track everything. The default max
1221
 
    # gives 100% sampling of a 1MB file.
1222
 
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1223
 
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1224
 
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
1225
 
 
1226
1180
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1227
 
                 _group_cache=None):
 
1181
            _group_cache=None):
1228
1182
        """Create a GroupCompressVersionedFiles object.
1229
1183
 
1230
1184
        :param index: The index object storing access and graph data.
1243
1197
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1244
1198
        self._group_cache = _group_cache
1245
1199
        self._immediate_fallback_vfs = []
1246
 
        self._max_bytes_to_index = None
1247
1200
 
1248
1201
    def without_fallbacks(self):
1249
1202
        """Return a clone of this object without any fallbacks configured."""
1259
1212
        :param key: The key tuple of the text to add.
1260
1213
        :param parents: The parents key tuples of the text to add.
1261
1214
        :param lines: A list of lines. Each line must be a bytestring. And all
1262
 
            of them except the last must be terminated with \\n and contain no
1263
 
            other \\n's. The last line may either contain no \\n's or a single
1264
 
            terminating \\n. If the lines list does meet this constraint the
1265
 
            add routine may error or may succeed - but you will be unable to
1266
 
            read the data back accurately. (Checking the lines have been split
 
1215
            of them except the last must be terminated with \n and contain no
 
1216
            other \n's. The last line may either contain no \n's or a single
 
1217
            terminating \n. If the lines list does meet this constraint the add
 
1218
            routine may error or may succeed - but you will be unable to read
 
1219
            the data back accurately. (Checking the lines have been split
1267
1220
            correctly is expensive and extremely unlikely to catch bugs so it
1268
1221
            is not done at runtime unless check_content is True.)
1269
1222
        :param parent_texts: An optional dictionary containing the opaque
1364
1317
            self._check_lines_not_unicode(lines)
1365
1318
            self._check_lines_are_lines(lines)
1366
1319
 
 
1320
    def get_known_graph_ancestry(self, keys):
 
1321
        """Get a KnownGraph instance with the ancestry of keys."""
 
1322
        # Note that this is identical to
 
1323
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
 
1324
        # ancestry.
 
1325
        parent_map, missing_keys = self._index.find_ancestry(keys)
 
1326
        for fallback in self._transitive_fallbacks():
 
1327
            if not missing_keys:
 
1328
                break
 
1329
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
1330
                                                missing_keys)
 
1331
            parent_map.update(f_parent_map)
 
1332
            missing_keys = f_missing_keys
 
1333
        kg = _mod_graph.KnownGraph(parent_map)
 
1334
        return kg
 
1335
 
1367
1336
    def get_parent_map(self, keys):
1368
1337
        """Get a map of the graph parents of keys.
1369
1338
 
1507
1476
 
1508
1477
        The returned objects should be in the order defined by 'ordering',
1509
1478
        which can weave between different sources.
1510
 
 
1511
1479
        :param ordering: Must be one of 'topological' or 'groupcompress'
1512
1480
        :return: List of [(source, [keys])] tuples, such that all keys are in
1513
1481
            the defined order, regardless of source.
1608
1576
        #  - we encounter an unadded ref, or
1609
1577
        #  - we run out of keys, or
1610
1578
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1611
 
        batcher = _BatchingBlockFetcher(self, locations,
1612
 
            get_compressor_settings=self._get_compressor_settings)
 
1579
        batcher = _BatchingBlockFetcher(self, locations)
1613
1580
        for source, keys in source_keys:
1614
1581
            if source is self:
1615
1582
                for key in keys:
1661
1628
        for _ in self._insert_record_stream(stream, random_id=False):
1662
1629
            pass
1663
1630
 
1664
 
    def _get_compressor_settings(self):
1665
 
        if self._max_bytes_to_index is None:
1666
 
            # TODO: VersionedFiles don't know about their containing
1667
 
            #       repository, so they don't have much of an idea about their
1668
 
            #       location. So for now, this is only a global option.
1669
 
            c = config.GlobalConfig()
1670
 
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1671
 
            if val is not None:
1672
 
                try:
1673
 
                    val = int(val)
1674
 
                except ValueError, e:
1675
 
                    trace.warning('Value for '
1676
 
                                  '"bzr.groupcompress.max_bytes_to_index"'
1677
 
                                  ' %r is not an integer'
1678
 
                                  % (val,))
1679
 
                    val = None
1680
 
            if val is None:
1681
 
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
1682
 
            self._max_bytes_to_index = val
1683
 
        return {'max_bytes_to_index': self._max_bytes_to_index}
1684
 
 
1685
 
    def _make_group_compressor(self):
1686
 
        return GroupCompressor(self._get_compressor_settings())
1687
 
 
1688
1631
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1689
1632
                              reuse_blocks=True):
1690
1633
        """Internal core to insert a record stream into this container.
1713
1656
                return adapter
1714
1657
        # This will go up to fulltexts for gc to gc fetching, which isn't
1715
1658
        # ideal.
1716
 
        self._compressor = self._make_group_compressor()
 
1659
        self._compressor = GroupCompressor()
1717
1660
        self._unadded_refs = {}
1718
1661
        keys_to_add = []
1719
1662
        def flush():
1720
1663
            bytes_len, chunks = self._compressor.flush().to_chunks()
1721
 
            self._compressor = self._make_group_compressor()
 
1664
            self._compressor = GroupCompressor()
1722
1665
            # Note: At this point we still have 1 copy of the fulltext (in
1723
1666
            #       record and the var 'bytes'), and this generates 2 copies of
1724
1667
            #       the compressed text (one for bytes, one in chunks)
2135
2078
        :param keys: An iterable of keys.
2136
2079
        :return: A dict of key:
2137
2080
            (index_memo, compression_parent, parents, record_details).
2138
 
 
2139
 
            * index_memo: opaque structure to pass to read_records to extract
2140
 
              the raw data
2141
 
            * compression_parent: Content that this record is built upon, may
2142
 
              be None
2143
 
            * parents: Logical parents of this node
2144
 
            * record_details: extra information about the content which needs
2145
 
              to be passed to Factory.parse_record
 
2081
            index_memo
 
2082
                opaque structure to pass to read_records to extract the raw
 
2083
                data
 
2084
            compression_parent
 
2085
                Content that this record is built upon, may be None
 
2086
            parents
 
2087
                Logical parents of this node
 
2088
            record_details
 
2089
                extra information about the content which needs to be passed to
 
2090
                Factory.parse_record
2146
2091
        """
2147
2092
        self._check_read()
2148
2093
        result = {}