~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Vincent Ladeuil
  • Date: 2011-05-15 16:07:11 UTC
  • mto: (5743.13.4 config-editor-option)
  • mto: This revision was merged to the branch mainline in revision 5944.
  • Revision ID: v.ladeuil+lp@free.fr-20110515160711-l0og3ady9wuahz1k
Stack.get() provides the registered option default value.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
 
26
from bzrlib.lazy_import import lazy_import
 
27
lazy_import(globals(), """
26
28
from bzrlib import (
27
29
    annotate,
28
30
    debug,
29
31
    errors,
30
32
    graph as _mod_graph,
31
 
    knit,
32
33
    osutils,
33
34
    pack,
34
35
    static_tuple,
35
36
    trace,
 
37
    tsort,
36
38
    )
 
39
 
 
40
from bzrlib.repofmt import pack_repo
 
41
""")
 
42
 
37
43
from bzrlib.btree_index import BTreeBuilder
38
44
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
45
from bzrlib.versionedfile import (
 
46
    _KeyRefs,
41
47
    adapter_registry,
42
48
    AbsentContentFactory,
43
49
    ChunkedContentFactory,
77
83
 
78
84
    present_keys = []
79
85
    for prefix in sorted(per_prefix_map):
80
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
86
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
81
87
    return present_keys
82
88
 
83
89
 
101
107
    def __init__(self):
102
108
        # map by key? or just order in file?
103
109
        self._compressor_name = None
104
 
        self._z_content = None
 
110
        self._z_content_chunks = None
105
111
        self._z_content_decompressor = None
106
112
        self._z_content_length = None
107
113
        self._content_length = None
135
141
                self._content = ''.join(self._content_chunks)
136
142
                self._content_chunks = None
137
143
        if self._content is None:
138
 
            if self._z_content is None:
 
144
            # We join self._z_content_chunks here, because if we are
 
145
            # decompressing, then it is *very* likely that we have a single
 
146
            # chunk
 
147
            if self._z_content_chunks is None:
139
148
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
149
            z_content = ''.join(self._z_content_chunks)
 
150
            if z_content == '':
141
151
                self._content = ''
142
152
            elif self._compressor_name == 'lzma':
143
153
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
154
                self._content = pylzma.decompress(z_content)
145
155
            elif self._compressor_name == 'zlib':
146
156
                # Start a zlib decompressor
147
157
                if num_bytes * 4 > self._content_length * 3:
148
158
                    # If we are requesting more that 3/4ths of the content,
149
159
                    # just extract the whole thing in a single pass
150
160
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
161
                    self._content = zlib.decompress(z_content)
152
162
                else:
153
163
                    self._z_content_decompressor = zlib.decompressobj()
154
164
                    # Seed the decompressor with the uncompressed bytes, so
155
165
                    # that the rest of the code is simplified
156
166
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
167
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
168
                    if not self._z_content_decompressor.unconsumed_tail:
159
169
                        self._z_content_decompressor = None
160
170
            else:
207
217
            # XXX: Define some GCCorrupt error ?
208
218
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
219
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
220
        self._z_content_chunks = (bytes[pos:],)
 
221
 
 
222
    @property
 
223
    def _z_content(self):
 
224
        """Return z_content_chunks as a simple string.
 
225
 
 
226
        Meant only to be used by the test suite.
 
227
        """
 
228
        if self._z_content_chunks is not None:
 
229
            return ''.join(self._z_content_chunks)
 
230
        return None
211
231
 
212
232
    @classmethod
213
233
    def from_bytes(cls, bytes):
269
289
        self._content_length = length
270
290
        self._content_chunks = content_chunks
271
291
        self._content = None
272
 
        self._z_content = None
 
292
        self._z_content_chunks = None
273
293
 
274
294
    def set_content(self, content):
275
295
        """Set the content of this block."""
276
296
        self._content_length = len(content)
277
297
        self._content = content
278
 
        self._z_content = None
 
298
        self._z_content_chunks = None
279
299
 
280
300
    def _create_z_content_using_lzma(self):
281
301
        if self._content_chunks is not None:
283
303
            self._content_chunks = None
284
304
        if self._content is None:
285
305
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
306
        z_content = pylzma.compress(self._content)
 
307
        self._z_content_chunks = (z_content,)
 
308
        self._z_content_length = len(z_content)
288
309
 
289
 
    def _create_z_content_from_chunks(self):
 
310
    def _create_z_content_from_chunks(self, chunks):
290
311
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
312
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
313
        # (measured peak is maybe 30MB over the above...)
 
314
        compressed_chunks = map(compressor.compress, chunks)
292
315
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
316
        # Ignore empty chunks
 
317
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
318
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
319
 
296
320
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
321
        if self._z_content_chunks is not None:
298
322
            return
299
323
        if _USE_LZMA:
300
324
            self._create_z_content_using_lzma()
301
325
            return
302
326
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
327
            chunks = self._content_chunks
 
328
        else:
 
329
            chunks = (self._content,)
 
330
        self._create_z_content_from_chunks(chunks)
307
331
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
332
    def to_chunks(self):
 
333
        """Create the byte stream as a series of 'chunks'"""
310
334
        self._create_z_content()
311
335
        if _USE_LZMA:
312
336
            header = self.GCB_LZ_HEADER
313
337
        else:
314
338
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
339
        chunks = ['%s%d\n%d\n'
 
340
                  % (header, self._z_content_length, self._content_length),
318
341
                 ]
 
342
        chunks.extend(self._z_content_chunks)
 
343
        total_len = sum(map(len, chunks))
 
344
        return total_len, chunks
 
345
 
 
346
    def to_bytes(self):
 
347
        """Encode the information into a byte stream."""
 
348
        total_len, chunks = self.to_chunks()
319
349
        return ''.join(chunks)
320
350
 
321
351
    def _dump(self, include_text=False):
679
709
        z_header_bytes = zlib.compress(header_bytes)
680
710
        del header_bytes
681
711
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
712
        block_bytes_len, block_chunks = self._block.to_chunks()
683
713
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
714
                                       block_bytes_len))
685
715
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
716
        lines.extend(block_chunks)
 
717
        del z_header_bytes, block_chunks
 
718
        # TODO: This is a point where we will double the memory consumption. To
 
719
        #       avoid this, we probably have to switch to a 'chunked' api
688
720
        return ''.join(lines)
689
721
 
690
722
    @classmethod
691
723
    def from_bytes(cls, bytes):
692
724
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
725
        #       different way. At a minimum this creates 2 copies of the
 
726
        #       compressed content
694
727
        (storage_kind, z_header_len, header_len,
695
728
         block_len, rest) = bytes.split('\n', 4)
696
729
        del bytes
854
887
 
855
888
        After calling this, the compressor should no longer be used
856
889
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
890
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
891
        self.chunks = None
867
892
        self._delta_index = None
1027
1052
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1028
1053
            add_callback=graph_index.add_nodes,
1029
1054
            inconsistency_fatal=inconsistency_fatal)
1030
 
        access = knit._DirectPackAccess({})
 
1055
        access = pack_repo._DirectPackAccess({})
1031
1056
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1032
1057
        result = GroupCompressVersionedFiles(index, access, delta)
1033
1058
        result.stream = stream
1152
1177
class GroupCompressVersionedFiles(VersionedFiles):
1153
1178
    """A group-compress based VersionedFiles implementation."""
1154
1179
 
1155
 
    def __init__(self, index, access, delta=True, _unadded_refs=None):
 
1180
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1181
            _group_cache=None):
1156
1182
        """Create a GroupCompressVersionedFiles object.
1157
1183
 
1158
1184
        :param index: The index object storing access and graph data.
1159
1185
        :param access: The access object storing raw data.
1160
1186
        :param delta: Whether to delta compress or just entropy compress.
1161
1187
        :param _unadded_refs: private parameter, don't use.
 
1188
        :param _group_cache: private parameter, don't use.
1162
1189
        """
1163
1190
        self._index = index
1164
1191
        self._access = access
1166
1193
        if _unadded_refs is None:
1167
1194
            _unadded_refs = {}
1168
1195
        self._unadded_refs = _unadded_refs
1169
 
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
 
        self._fallback_vfs = []
 
1196
        if _group_cache is None:
 
1197
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
 
1198
        self._group_cache = _group_cache
 
1199
        self._immediate_fallback_vfs = []
1171
1200
 
1172
1201
    def without_fallbacks(self):
1173
1202
        """Return a clone of this object without any fallbacks configured."""
1174
1203
        return GroupCompressVersionedFiles(self._index, self._access,
1175
 
            self._delta, _unadded_refs=dict(self._unadded_refs))
 
1204
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1205
            _group_cache=self._group_cache)
1176
1206
 
1177
1207
    def add_lines(self, key, parents, lines, parent_texts=None,
1178
1208
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1247
1277
 
1248
1278
        :param a_versioned_files: A VersionedFiles object.
1249
1279
        """
1250
 
        self._fallback_vfs.append(a_versioned_files)
 
1280
        self._immediate_fallback_vfs.append(a_versioned_files)
1251
1281
 
1252
1282
    def annotate(self, key):
1253
1283
        """See VersionedFiles.annotate."""
1293
1323
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1294
1324
        # ancestry.
1295
1325
        parent_map, missing_keys = self._index.find_ancestry(keys)
1296
 
        for fallback in self._fallback_vfs:
 
1326
        for fallback in self._transitive_fallbacks():
1297
1327
            if not missing_keys:
1298
1328
                break
1299
1329
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1323
1353
            and so on.
1324
1354
        """
1325
1355
        result = {}
1326
 
        sources = [self._index] + self._fallback_vfs
 
1356
        sources = [self._index] + self._immediate_fallback_vfs
1327
1357
        source_results = []
1328
1358
        missing = set(keys)
1329
1359
        for source in sources:
1430
1460
        parent_map = {}
1431
1461
        key_to_source_map = {}
1432
1462
        source_results = []
1433
 
        for source in self._fallback_vfs:
 
1463
        for source in self._immediate_fallback_vfs:
1434
1464
            if not missing:
1435
1465
                break
1436
1466
            source_parents = source.get_parent_map(missing)
1451
1481
            the defined order, regardless of source.
1452
1482
        """
1453
1483
        if ordering == 'topological':
1454
 
            present_keys = topo_sort(parent_map)
 
1484
            present_keys = tsort.topo_sort(parent_map)
1455
1485
        else:
1456
1486
            # ordering == 'groupcompress'
1457
1487
            # XXX: This only optimizes for the target ordering. We may need
1630
1660
        self._unadded_refs = {}
1631
1661
        keys_to_add = []
1632
1662
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1663
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1664
            self._compressor = GroupCompressor()
 
1665
            # Note: At this point we still have 1 copy of the fulltext (in
 
1666
            #       record and the var 'bytes'), and this generates 2 copies of
 
1667
            #       the compressed text (one for bytes, one in chunks)
 
1668
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1669
            #       have to double compressed memory here
 
1670
            # TODO: Figure out how to indicate that we would be happy to free
 
1671
            #       the fulltext content at this point. Note that sometimes we
 
1672
            #       will want it later (streaming CHK pages), but most of the
 
1673
            #       time we won't (everything else)
 
1674
            bytes = ''.join(chunks)
 
1675
            del chunks
1635
1676
            index, start, length = self._access.add_raw_records(
1636
1677
                [(None, len(bytes))], bytes)[0]
1637
1678
            nodes = []
1802
1843
        """See VersionedFiles.keys."""
1803
1844
        if 'evil' in debug.debug_flags:
1804
1845
            trace.mutter_callsite(2, "keys scales with size of history")
1805
 
        sources = [self._index] + self._fallback_vfs
 
1846
        sources = [self._index] + self._immediate_fallback_vfs
1806
1847
        result = set()
1807
1848
        for source in sources:
1808
1849
            result.update(source.keys())
1891
1932
        # repeated over and over, this creates a surplus of ints
1892
1933
        self._int_cache = {}
1893
1934
        if track_external_parent_refs:
1894
 
            self._key_dependencies = knit._KeyRefs(
 
1935
            self._key_dependencies = _KeyRefs(
1895
1936
                track_new_keys=track_new_keys)
1896
1937
        else:
1897
1938
            self._key_dependencies = None