~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Karl Bielefeldt
  • Date: 2010-09-29 19:57:28 UTC
  • mto: (5483.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5484.
  • Revision ID: 7mq3cbbd9q@snkmail.com-20100929195728-nvuqlepsrwcxbziw
Use meliae to dump memory to a file upon MemoryError.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
26
 
from bzrlib.lazy_import import lazy_import
27
 
lazy_import(globals(), """
28
26
from bzrlib import (
29
27
    annotate,
30
28
    debug,
31
29
    errors,
32
30
    graph as _mod_graph,
 
31
    knit,
33
32
    osutils,
34
33
    pack,
35
34
    static_tuple,
36
35
    trace,
37
 
    tsort,
38
36
    )
39
 
 
40
 
from bzrlib.repofmt import pack_repo
41
 
""")
42
 
 
43
37
from bzrlib.btree_index import BTreeBuilder
44
38
from bzrlib.lru_cache import LRUSizeCache
 
39
from bzrlib.tsort import topo_sort
45
40
from bzrlib.versionedfile import (
46
 
    _KeyRefs,
47
41
    adapter_registry,
48
42
    AbsentContentFactory,
49
43
    ChunkedContentFactory,
83
77
 
84
78
    present_keys = []
85
79
    for prefix in sorted(per_prefix_map):
86
 
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
 
80
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
87
81
    return present_keys
88
82
 
89
83
 
107
101
    def __init__(self):
108
102
        # map by key? or just order in file?
109
103
        self._compressor_name = None
110
 
        self._z_content_chunks = None
 
104
        self._z_content = None
111
105
        self._z_content_decompressor = None
112
106
        self._z_content_length = None
113
107
        self._content_length = None
141
135
                self._content = ''.join(self._content_chunks)
142
136
                self._content_chunks = None
143
137
        if self._content is None:
144
 
            # We join self._z_content_chunks here, because if we are
145
 
            # decompressing, then it is *very* likely that we have a single
146
 
            # chunk
147
 
            if self._z_content_chunks is None:
 
138
            if self._z_content is None:
148
139
                raise AssertionError('No content to decompress')
149
 
            z_content = ''.join(self._z_content_chunks)
150
 
            if z_content == '':
 
140
            if self._z_content == '':
151
141
                self._content = ''
152
142
            elif self._compressor_name == 'lzma':
153
143
                # We don't do partial lzma decomp yet
154
 
                self._content = pylzma.decompress(z_content)
 
144
                self._content = pylzma.decompress(self._z_content)
155
145
            elif self._compressor_name == 'zlib':
156
146
                # Start a zlib decompressor
157
147
                if num_bytes * 4 > self._content_length * 3:
158
148
                    # If we are requesting more that 3/4ths of the content,
159
149
                    # just extract the whole thing in a single pass
160
150
                    num_bytes = self._content_length
161
 
                    self._content = zlib.decompress(z_content)
 
151
                    self._content = zlib.decompress(self._z_content)
162
152
                else:
163
153
                    self._z_content_decompressor = zlib.decompressobj()
164
154
                    # Seed the decompressor with the uncompressed bytes, so
165
155
                    # that the rest of the code is simplified
166
156
                    self._content = self._z_content_decompressor.decompress(
167
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
168
158
                    if not self._z_content_decompressor.unconsumed_tail:
169
159
                        self._z_content_decompressor = None
170
160
            else:
217
207
            # XXX: Define some GCCorrupt error ?
218
208
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
219
209
                                 (len(bytes), pos, self._z_content_length))
220
 
        self._z_content_chunks = (bytes[pos:],)
221
 
 
222
 
    @property
223
 
    def _z_content(self):
224
 
        """Return z_content_chunks as a simple string.
225
 
 
226
 
        Meant only to be used by the test suite.
227
 
        """
228
 
        if self._z_content_chunks is not None:
229
 
            return ''.join(self._z_content_chunks)
230
 
        return None
 
210
        self._z_content = bytes[pos:]
231
211
 
232
212
    @classmethod
233
213
    def from_bytes(cls, bytes):
289
269
        self._content_length = length
290
270
        self._content_chunks = content_chunks
291
271
        self._content = None
292
 
        self._z_content_chunks = None
 
272
        self._z_content = None
293
273
 
294
274
    def set_content(self, content):
295
275
        """Set the content of this block."""
296
276
        self._content_length = len(content)
297
277
        self._content = content
298
 
        self._z_content_chunks = None
 
278
        self._z_content = None
299
279
 
300
280
    def _create_z_content_using_lzma(self):
301
281
        if self._content_chunks is not None:
303
283
            self._content_chunks = None
304
284
        if self._content is None:
305
285
            raise AssertionError('Nothing to compress')
306
 
        z_content = pylzma.compress(self._content)
307
 
        self._z_content_chunks = (z_content,)
308
 
        self._z_content_length = len(z_content)
 
286
        self._z_content = pylzma.compress(self._content)
 
287
        self._z_content_length = len(self._z_content)
309
288
 
310
 
    def _create_z_content_from_chunks(self, chunks):
 
289
    def _create_z_content_from_chunks(self):
311
290
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
312
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
313
 
        # (measured peak is maybe 30MB over the above...)
314
 
        compressed_chunks = map(compressor.compress, chunks)
 
291
        compressed_chunks = map(compressor.compress, self._content_chunks)
315
292
        compressed_chunks.append(compressor.flush())
316
 
        # Ignore empty chunks
317
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
318
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
293
        self._z_content = ''.join(compressed_chunks)
 
294
        self._z_content_length = len(self._z_content)
319
295
 
320
296
    def _create_z_content(self):
321
 
        if self._z_content_chunks is not None:
 
297
        if self._z_content is not None:
322
298
            return
323
299
        if _USE_LZMA:
324
300
            self._create_z_content_using_lzma()
325
301
            return
326
302
        if self._content_chunks is not None:
327
 
            chunks = self._content_chunks
328
 
        else:
329
 
            chunks = (self._content,)
330
 
        self._create_z_content_from_chunks(chunks)
 
303
            self._create_z_content_from_chunks()
 
304
            return
 
305
        self._z_content = zlib.compress(self._content)
 
306
        self._z_content_length = len(self._z_content)
331
307
 
332
 
    def to_chunks(self):
333
 
        """Create the byte stream as a series of 'chunks'"""
 
308
    def to_bytes(self):
 
309
        """Encode the information into a byte stream."""
334
310
        self._create_z_content()
335
311
        if _USE_LZMA:
336
312
            header = self.GCB_LZ_HEADER
337
313
        else:
338
314
            header = self.GCB_HEADER
339
 
        chunks = ['%s%d\n%d\n'
340
 
                  % (header, self._z_content_length, self._content_length),
 
315
        chunks = [header,
 
316
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
317
                  self._z_content,
341
318
                 ]
342
 
        chunks.extend(self._z_content_chunks)
343
 
        total_len = sum(map(len, chunks))
344
 
        return total_len, chunks
345
 
 
346
 
    def to_bytes(self):
347
 
        """Encode the information into a byte stream."""
348
 
        total_len, chunks = self.to_chunks()
349
319
        return ''.join(chunks)
350
320
 
351
321
    def _dump(self, include_text=False):
709
679
        z_header_bytes = zlib.compress(header_bytes)
710
680
        del header_bytes
711
681
        z_header_bytes_len = len(z_header_bytes)
712
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
682
        block_bytes = self._block.to_bytes()
713
683
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
714
 
                                       block_bytes_len))
 
684
                                       len(block_bytes)))
715
685
        lines.append(z_header_bytes)
716
 
        lines.extend(block_chunks)
717
 
        del z_header_bytes, block_chunks
718
 
        # TODO: This is a point where we will double the memory consumption. To
719
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
686
        lines.append(block_bytes)
 
687
        del z_header_bytes, block_bytes
720
688
        return ''.join(lines)
721
689
 
722
690
    @classmethod
723
691
    def from_bytes(cls, bytes):
724
692
        # TODO: This does extra string copying, probably better to do it a
725
 
        #       different way. At a minimum this creates 2 copies of the
726
 
        #       compressed content
 
693
        #       different way
727
694
        (storage_kind, z_header_len, header_len,
728
695
         block_len, rest) = bytes.split('\n', 4)
729
696
        del bytes
887
854
 
888
855
        After calling this, the compressor should no longer be used
889
856
        """
 
857
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
858
        #       group. This has an impact for 'commit' of large objects.
 
859
        #       One possibility is to use self._content_chunks, and be lazy and
 
860
        #       only fill out self._content as a full string when we actually
 
861
        #       need it. That would at least drop the peak memory consumption
 
862
        #       for 'commit' down to ~1x the size of the largest file, at a
 
863
        #       cost of increased complexity within this code. 2x is still <<
 
864
        #       3x the size of the largest file, so we are doing ok.
890
865
        self._block.set_chunked_content(self.chunks, self.endpoint)
891
866
        self.chunks = None
892
867
        self._delta_index = None
1052
1027
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1053
1028
            add_callback=graph_index.add_nodes,
1054
1029
            inconsistency_fatal=inconsistency_fatal)
1055
 
        access = pack_repo._DirectPackAccess({})
 
1030
        access = knit._DirectPackAccess({})
1056
1031
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1057
1032
        result = GroupCompressVersionedFiles(index, access, delta)
1058
1033
        result.stream = stream
1192
1167
            _unadded_refs = {}
1193
1168
        self._unadded_refs = _unadded_refs
1194
1169
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1195
 
        self._immediate_fallback_vfs = []
 
1170
        self._fallback_vfs = []
1196
1171
 
1197
1172
    def without_fallbacks(self):
1198
1173
        """Return a clone of this object without any fallbacks configured."""
1272
1247
 
1273
1248
        :param a_versioned_files: A VersionedFiles object.
1274
1249
        """
1275
 
        self._immediate_fallback_vfs.append(a_versioned_files)
 
1250
        self._fallback_vfs.append(a_versioned_files)
1276
1251
 
1277
1252
    def annotate(self, key):
1278
1253
        """See VersionedFiles.annotate."""
1318
1293
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1319
1294
        # ancestry.
1320
1295
        parent_map, missing_keys = self._index.find_ancestry(keys)
1321
 
        for fallback in self._transitive_fallbacks():
 
1296
        for fallback in self._fallback_vfs:
1322
1297
            if not missing_keys:
1323
1298
                break
1324
1299
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1348
1323
            and so on.
1349
1324
        """
1350
1325
        result = {}
1351
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1326
        sources = [self._index] + self._fallback_vfs
1352
1327
        source_results = []
1353
1328
        missing = set(keys)
1354
1329
        for source in sources:
1455
1430
        parent_map = {}
1456
1431
        key_to_source_map = {}
1457
1432
        source_results = []
1458
 
        for source in self._immediate_fallback_vfs:
 
1433
        for source in self._fallback_vfs:
1459
1434
            if not missing:
1460
1435
                break
1461
1436
            source_parents = source.get_parent_map(missing)
1476
1451
            the defined order, regardless of source.
1477
1452
        """
1478
1453
        if ordering == 'topological':
1479
 
            present_keys = tsort.topo_sort(parent_map)
 
1454
            present_keys = topo_sort(parent_map)
1480
1455
        else:
1481
1456
            # ordering == 'groupcompress'
1482
1457
            # XXX: This only optimizes for the target ordering. We may need
1655
1630
        self._unadded_refs = {}
1656
1631
        keys_to_add = []
1657
1632
        def flush():
1658
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1633
            bytes = self._compressor.flush().to_bytes()
1659
1634
            self._compressor = GroupCompressor()
1660
 
            # Note: At this point we still have 1 copy of the fulltext (in
1661
 
            #       record and the var 'bytes'), and this generates 2 copies of
1662
 
            #       the compressed text (one for bytes, one in chunks)
1663
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1664
 
            #       have to double compressed memory here
1665
 
            # TODO: Figure out how to indicate that we would be happy to free
1666
 
            #       the fulltext content at this point. Note that sometimes we
1667
 
            #       will want it later (streaming CHK pages), but most of the
1668
 
            #       time we won't (everything else)
1669
 
            bytes = ''.join(chunks)
1670
 
            del chunks
1671
1635
            index, start, length = self._access.add_raw_records(
1672
1636
                [(None, len(bytes))], bytes)[0]
1673
1637
            nodes = []
1838
1802
        """See VersionedFiles.keys."""
1839
1803
        if 'evil' in debug.debug_flags:
1840
1804
            trace.mutter_callsite(2, "keys scales with size of history")
1841
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1805
        sources = [self._index] + self._fallback_vfs
1842
1806
        result = set()
1843
1807
        for source in sources:
1844
1808
            result.update(source.keys())
1927
1891
        # repeated over and over, this creates a surplus of ints
1928
1892
        self._int_cache = {}
1929
1893
        if track_external_parent_refs:
1930
 
            self._key_dependencies = _KeyRefs(
 
1894
            self._key_dependencies = knit._KeyRefs(
1931
1895
                track_new_keys=track_new_keys)
1932
1896
        else:
1933
1897
            self._key_dependencies = None