~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Vincent Ladeuil
  • Date: 2011-05-17 20:58:43 UTC
  • mfrom: (5609.39.5 2.3)
  • mto: This revision was merged to the branch mainline in revision 5885.
  • Revision ID: v.ladeuil+lp@free.fr-20110517205843-uo1tntrz6kqfn1hb
Merge 2.3 into trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
 
26
from bzrlib.lazy_import import lazy_import
 
27
lazy_import(globals(), """
26
28
from bzrlib import (
27
29
    annotate,
 
30
    config,
28
31
    debug,
29
32
    errors,
30
33
    graph as _mod_graph,
31
 
    knit,
32
34
    osutils,
33
35
    pack,
34
36
    static_tuple,
35
37
    trace,
 
38
    tsort,
36
39
    )
 
40
 
 
41
from bzrlib.repofmt import pack_repo
 
42
""")
 
43
 
37
44
from bzrlib.btree_index import BTreeBuilder
38
45
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
46
from bzrlib.versionedfile import (
 
47
    _KeyRefs,
41
48
    adapter_registry,
42
49
    AbsentContentFactory,
43
50
    ChunkedContentFactory,
44
51
    FulltextContentFactory,
45
 
    VersionedFiles,
 
52
    VersionedFilesWithFallbacks,
46
53
    )
47
54
 
48
55
# Minimum number of uncompressed bytes to try fetch at once when retrieving
77
84
 
78
85
    present_keys = []
79
86
    for prefix in sorted(per_prefix_map):
80
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
87
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
81
88
    return present_keys
82
89
 
83
90
 
484
491
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
485
492
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
486
493
 
487
 
    def __init__(self, block):
 
494
    def __init__(self, block, get_compressor_settings=None):
488
495
        self._block = block
489
496
        # We need to preserve the ordering
490
497
        self._factories = []
491
498
        self._last_byte = 0
 
499
        self._get_settings = get_compressor_settings
 
500
        self._compressor_settings = None
 
501
 
 
502
    def _get_compressor_settings(self):
 
503
        if self._compressor_settings is not None:
 
504
            return self._compressor_settings
 
505
        settings = None
 
506
        if self._get_settings is not None:
 
507
            settings = self._get_settings()
 
508
        if settings is None:
 
509
            vf = GroupCompressVersionedFiles
 
510
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
511
        self._compressor_settings = settings
 
512
        return self._compressor_settings
492
513
 
493
514
    def add_factory(self, key, parents, start, end):
494
515
        if not self._factories:
527
548
        new_block.set_content(self._block._content[:last_byte])
528
549
        self._block = new_block
529
550
 
 
551
    def _make_group_compressor(self):
 
552
        return GroupCompressor(self._get_compressor_settings())
 
553
 
530
554
    def _rebuild_block(self):
531
555
        """Create a new GroupCompressBlock with only the referenced texts."""
532
 
        compressor = GroupCompressor()
 
556
        compressor = self._make_group_compressor()
533
557
        tstart = time.time()
534
558
        old_length = self._block._content_length
535
559
        end_point = 0
547
571
        #       block? It seems hard to come up with a method that it would
548
572
        #       expand, since we do full compression again. Perhaps based on a
549
573
        #       request that ends up poorly ordered?
 
574
        # TODO: If the content would have expanded, then we would want to
 
575
        #       handle a case where we need to split the block.
 
576
        #       Now that we have a user-tweakable option
 
577
        #       (max_bytes_to_index), it is possible that one person set it
 
578
        #       to a very low value, causing poor compression.
550
579
        delta = time.time() - tstart
551
580
        self._block = new_block
552
581
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
775
804
 
776
805
class _CommonGroupCompressor(object):
777
806
 
778
 
    def __init__(self):
 
807
    def __init__(self, settings=None):
779
808
        """Create a GroupCompressor."""
780
809
        self.chunks = []
781
810
        self._last = None
784
813
        self.labels_deltas = {}
785
814
        self._delta_index = None # Set by the children
786
815
        self._block = GroupCompressBlock()
 
816
        if settings is None:
 
817
            self._settings = {}
 
818
        else:
 
819
            self._settings = settings
787
820
 
788
821
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
789
822
        """Compress lines with label key.
904
937
 
905
938
class PythonGroupCompressor(_CommonGroupCompressor):
906
939
 
907
 
    def __init__(self):
 
940
    def __init__(self, settings=None):
908
941
        """Create a GroupCompressor.
909
942
 
910
943
        Used only if the pyrex version is not available.
911
944
        """
912
 
        super(PythonGroupCompressor, self).__init__()
 
945
        super(PythonGroupCompressor, self).__init__(settings)
913
946
        self._delta_index = LinesDeltaIndex([])
914
947
        # The actual content is managed by LinesDeltaIndex
915
948
        self.chunks = self._delta_index.lines
963
996
       left side.
964
997
    """
965
998
 
966
 
    def __init__(self):
967
 
        super(PyrexGroupCompressor, self).__init__()
968
 
        self._delta_index = DeltaIndex()
 
999
    def __init__(self, settings=None):
 
1000
        super(PyrexGroupCompressor, self).__init__(settings)
 
1001
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
1002
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
969
1003
 
970
1004
    def _compress(self, key, bytes, max_delta_size, soft=False):
971
1005
        """see _CommonGroupCompressor._compress"""
1046
1080
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1047
1081
            add_callback=graph_index.add_nodes,
1048
1082
            inconsistency_fatal=inconsistency_fatal)
1049
 
        access = knit._DirectPackAccess({})
 
1083
        access = pack_repo._DirectPackAccess({})
1050
1084
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1051
1085
        result = GroupCompressVersionedFiles(index, access, delta)
1052
1086
        result.stream = stream
1062
1096
 
1063
1097
class _BatchingBlockFetcher(object):
1064
1098
    """Fetch group compress blocks in batches.
1065
 
    
 
1099
 
1066
1100
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1067
1101
        currently pending batch.
1068
1102
    """
1069
1103
 
1070
 
    def __init__(self, gcvf, locations):
 
1104
    def __init__(self, gcvf, locations, get_compressor_settings=None):
1071
1105
        self.gcvf = gcvf
1072
1106
        self.locations = locations
1073
1107
        self.keys = []
1076
1110
        self.total_bytes = 0
1077
1111
        self.last_read_memo = None
1078
1112
        self.manager = None
 
1113
        self._get_compressor_settings = get_compressor_settings
1079
1114
 
1080
1115
    def add_key(self, key):
1081
1116
        """Add another to key to fetch.
1082
 
        
 
1117
 
1083
1118
        :return: The estimated number of bytes needed to fetch the batch so
1084
1119
            far.
1085
1120
        """
1110
1145
            # and then.
1111
1146
            self.batch_memos[read_memo] = cached_block
1112
1147
        return self.total_bytes
1113
 
        
 
1148
 
1114
1149
    def _flush_manager(self):
1115
1150
        if self.manager is not None:
1116
1151
            for factory in self.manager.get_record_stream():
1121
1156
    def yield_factories(self, full_flush=False):
1122
1157
        """Yield factories for keys added since the last yield.  They will be
1123
1158
        returned in the order they were added via add_key.
1124
 
        
 
1159
 
1125
1160
        :param full_flush: by default, some results may not be returned in case
1126
1161
            they can be part of the next batch.  If full_flush is True, then
1127
1162
            all results are returned.
1155
1190
                    memos_to_get_stack.pop()
1156
1191
                else:
1157
1192
                    block = self.batch_memos[read_memo]
1158
 
                self.manager = _LazyGroupContentManager(block)
 
1193
                self.manager = _LazyGroupContentManager(block,
 
1194
                    get_compressor_settings=self._get_compressor_settings)
1159
1195
                self.last_read_memo = read_memo
1160
1196
            start, end = index_memo[3:5]
1161
1197
            self.manager.add_factory(key, parents, start, end)
1168
1204
        self.total_bytes = 0
1169
1205
 
1170
1206
 
1171
 
class GroupCompressVersionedFiles(VersionedFiles):
 
1207
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1172
1208
    """A group-compress based VersionedFiles implementation."""
1173
1209
 
1174
 
    def __init__(self, index, access, delta=True, _unadded_refs=None):
 
1210
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1211
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1212
    # However each of these references costs some memory in trade against a
 
1213
    # more accurate match result. For very large files, they either are
 
1214
    # pre-compressed and change in bulk whenever they change, or change in just
 
1215
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1216
    # versus running out of memory trying to track everything. The default max
 
1217
    # gives 100% sampling of a 1MB file.
 
1218
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1219
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1220
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
 
1221
 
 
1222
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1223
                 _group_cache=None):
1175
1224
        """Create a GroupCompressVersionedFiles object.
1176
1225
 
1177
1226
        :param index: The index object storing access and graph data.
1178
1227
        :param access: The access object storing raw data.
1179
1228
        :param delta: Whether to delta compress or just entropy compress.
1180
1229
        :param _unadded_refs: private parameter, don't use.
 
1230
        :param _group_cache: private parameter, don't use.
1181
1231
        """
1182
1232
        self._index = index
1183
1233
        self._access = access
1185
1235
        if _unadded_refs is None:
1186
1236
            _unadded_refs = {}
1187
1237
        self._unadded_refs = _unadded_refs
1188
 
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1189
 
        self._fallback_vfs = []
 
1238
        if _group_cache is None:
 
1239
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
 
1240
        self._group_cache = _group_cache
 
1241
        self._immediate_fallback_vfs = []
 
1242
        self._max_bytes_to_index = None
1190
1243
 
1191
1244
    def without_fallbacks(self):
1192
1245
        """Return a clone of this object without any fallbacks configured."""
1193
1246
        return GroupCompressVersionedFiles(self._index, self._access,
1194
 
            self._delta, _unadded_refs=dict(self._unadded_refs))
 
1247
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1248
            _group_cache=self._group_cache)
1195
1249
 
1196
1250
    def add_lines(self, key, parents, lines, parent_texts=None,
1197
1251
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1266
1320
 
1267
1321
        :param a_versioned_files: A VersionedFiles object.
1268
1322
        """
1269
 
        self._fallback_vfs.append(a_versioned_files)
 
1323
        self._immediate_fallback_vfs.append(a_versioned_files)
1270
1324
 
1271
1325
    def annotate(self, key):
1272
1326
        """See VersionedFiles.annotate."""
1306
1360
            self._check_lines_not_unicode(lines)
1307
1361
            self._check_lines_are_lines(lines)
1308
1362
 
1309
 
    def get_known_graph_ancestry(self, keys):
1310
 
        """Get a KnownGraph instance with the ancestry of keys."""
1311
 
        # Note that this is identical to
1312
 
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1313
 
        # ancestry.
1314
 
        parent_map, missing_keys = self._index.find_ancestry(keys)
1315
 
        for fallback in self._fallback_vfs:
1316
 
            if not missing_keys:
1317
 
                break
1318
 
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1319
 
                                                missing_keys)
1320
 
            parent_map.update(f_parent_map)
1321
 
            missing_keys = f_missing_keys
1322
 
        kg = _mod_graph.KnownGraph(parent_map)
1323
 
        return kg
1324
 
 
1325
1363
    def get_parent_map(self, keys):
1326
1364
        """Get a map of the graph parents of keys.
1327
1365
 
1342
1380
            and so on.
1343
1381
        """
1344
1382
        result = {}
1345
 
        sources = [self._index] + self._fallback_vfs
 
1383
        sources = [self._index] + self._immediate_fallback_vfs
1346
1384
        source_results = []
1347
1385
        missing = set(keys)
1348
1386
        for source in sources:
1449
1487
        parent_map = {}
1450
1488
        key_to_source_map = {}
1451
1489
        source_results = []
1452
 
        for source in self._fallback_vfs:
 
1490
        for source in self._immediate_fallback_vfs:
1453
1491
            if not missing:
1454
1492
                break
1455
1493
            source_parents = source.get_parent_map(missing)
1470
1508
            the defined order, regardless of source.
1471
1509
        """
1472
1510
        if ordering == 'topological':
1473
 
            present_keys = topo_sort(parent_map)
 
1511
            present_keys = tsort.topo_sort(parent_map)
1474
1512
        else:
1475
1513
            # ordering == 'groupcompress'
1476
1514
            # XXX: This only optimizes for the target ordering. We may need
1565
1603
        #  - we encounter an unadded ref, or
1566
1604
        #  - we run out of keys, or
1567
1605
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1568
 
        batcher = _BatchingBlockFetcher(self, locations)
 
1606
        batcher = _BatchingBlockFetcher(self, locations,
 
1607
            get_compressor_settings=self._get_compressor_settings)
1569
1608
        for source, keys in source_keys:
1570
1609
            if source is self:
1571
1610
                for key in keys:
1617
1656
        for _ in self._insert_record_stream(stream, random_id=False):
1618
1657
            pass
1619
1658
 
 
1659
    def _get_compressor_settings(self):
 
1660
        if self._max_bytes_to_index is None:
 
1661
            # TODO: VersionedFiles don't know about their containing
 
1662
            #       repository, so they don't have much of an idea about their
 
1663
            #       location. So for now, this is only a global option.
 
1664
            c = config.GlobalConfig()
 
1665
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1666
            if val is not None:
 
1667
                try:
 
1668
                    val = int(val)
 
1669
                except ValueError, e:
 
1670
                    trace.warning('Value for '
 
1671
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1672
                                  ' %r is not an integer'
 
1673
                                  % (val,))
 
1674
                    val = None
 
1675
            if val is None:
 
1676
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1677
            self._max_bytes_to_index = val
 
1678
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1679
 
 
1680
    def _make_group_compressor(self):
 
1681
        return GroupCompressor(self._get_compressor_settings())
 
1682
 
1620
1683
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1621
1684
                              reuse_blocks=True):
1622
1685
        """Internal core to insert a record stream into this container.
1645
1708
                return adapter
1646
1709
        # This will go up to fulltexts for gc to gc fetching, which isn't
1647
1710
        # ideal.
1648
 
        self._compressor = GroupCompressor()
 
1711
        self._compressor = self._make_group_compressor()
1649
1712
        self._unadded_refs = {}
1650
1713
        keys_to_add = []
1651
1714
        def flush():
1652
1715
            bytes_len, chunks = self._compressor.flush().to_chunks()
1653
 
            self._compressor = GroupCompressor()
 
1716
            self._compressor = self._make_group_compressor()
1654
1717
            # Note: At this point we still have 1 copy of the fulltext (in
1655
1718
            #       record and the var 'bytes'), and this generates 2 copies of
1656
1719
            #       the compressed text (one for bytes, one in chunks)
1832
1895
        """See VersionedFiles.keys."""
1833
1896
        if 'evil' in debug.debug_flags:
1834
1897
            trace.mutter_callsite(2, "keys scales with size of history")
1835
 
        sources = [self._index] + self._fallback_vfs
 
1898
        sources = [self._index] + self._immediate_fallback_vfs
1836
1899
        result = set()
1837
1900
        for source in sources:
1838
1901
            result.update(source.keys())
1921
1984
        # repeated over and over, this creates a surplus of ints
1922
1985
        self._int_cache = {}
1923
1986
        if track_external_parent_refs:
1924
 
            self._key_dependencies = knit._KeyRefs(
 
1987
            self._key_dependencies = _KeyRefs(
1925
1988
                track_new_keys=track_new_keys)
1926
1989
        else:
1927
1990
            self._key_dependencies = None