~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2010-09-21 19:30:33 UTC
  • mto: This revision was merged to the branch mainline in revision 5451.
  • Revision ID: john@arbash-meinel.com-20100921193033-9ftw56og72mhlwo4
Change GroupCompressBlock to work in self._z_compress_chunks

This pushes down one of the peak memory locations. We still have a requirement
during commit of 1 fulltext + 2 compressed texts, but at least this code
path is now better about only using 1 fulltext and 1 compressed text.
We need to push this into more apis to get a bigger benefit.

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    def __init__(self):
102
102
        # map by key? or just order in file?
103
103
        self._compressor_name = None
104
 
        self._z_content = None
 
104
        self._z_content_chunks = None
105
105
        self._z_content_decompressor = None
106
106
        self._z_content_length = None
107
107
        self._content_length = None
135
135
                self._content = ''.join(self._content_chunks)
136
136
                self._content_chunks = None
137
137
        if self._content is None:
138
 
            if self._z_content is None:
 
138
            # We join self._z_content_chunks here, because if we are
 
139
            # decompressing, then it is *very* likely that we have a single
 
140
            # chunk
 
141
            if self._z_content_chunks is None:
139
142
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
143
            z_content = ''.join(self._z_content_chunks)
 
144
            if z_content == '':
141
145
                self._content = ''
142
146
            elif self._compressor_name == 'lzma':
143
147
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
148
                self._content = pylzma.decompress(z_content)
145
149
            elif self._compressor_name == 'zlib':
146
150
                # Start a zlib decompressor
147
151
                if num_bytes * 4 > self._content_length * 3:
148
152
                    # If we are requesting more that 3/4ths of the content,
149
153
                    # just extract the whole thing in a single pass
150
154
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
155
                    self._content = zlib.decompress(z_content)
152
156
                else:
153
157
                    self._z_content_decompressor = zlib.decompressobj()
154
158
                    # Seed the decompressor with the uncompressed bytes, so
155
159
                    # that the rest of the code is simplified
156
160
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
161
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
162
                    if not self._z_content_decompressor.unconsumed_tail:
159
163
                        self._z_content_decompressor = None
160
164
            else:
207
211
            # XXX: Define some GCCorrupt error ?
208
212
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
213
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
214
        self._z_content_chunks = (bytes[pos:],)
 
215
 
 
216
    @property
 
217
    def _z_content(self):
 
218
        if self._z_content_chunks is not None:
 
219
            return ''.join(self._z_content_chunks)
 
220
        return None
211
221
 
212
222
    @classmethod
213
223
    def from_bytes(cls, bytes):
269
279
        self._content_length = length
270
280
        self._content_chunks = content_chunks
271
281
        self._content = None
272
 
        self._z_content = None
 
282
        self._z_content_chunks = None
273
283
 
274
284
    def set_content(self, content):
275
285
        """Set the content of this block."""
276
286
        self._content_length = len(content)
277
287
        self._content = content
278
 
        self._z_content = None
 
288
        self._z_content_chunks = None
279
289
 
280
290
    def _create_z_content_using_lzma(self):
281
291
        if self._content_chunks is not None:
283
293
            self._content_chunks = None
284
294
        if self._content is None:
285
295
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
296
        z_content = pylzma.compress(self._content)
 
297
        self._z_content_chunks = (z_content,)
 
298
        self._z_content_length = len(z_content)
288
299
 
289
 
    def _create_z_content_from_chunks(self):
 
300
    def _create_z_content_from_chunks(self, chunks):
290
301
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
302
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
303
        # (measured peak is maybe 30MB over the above...)
 
304
        compressed_chunks = map(compressor.compress, chunks)
292
305
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
306
        # Ignore empty chunks
 
307
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
308
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
309
 
296
310
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
311
        if self._z_content_chunks is not None:
298
312
            return
299
313
        if _USE_LZMA:
300
314
            self._create_z_content_using_lzma()
301
315
            return
302
316
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
317
            chunks = self._content_chunks
 
318
        else:
 
319
            chunks = (self._content,)
 
320
        self._create_z_content_from_chunks(chunks)
307
321
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
322
    def to_chunks(self):
 
323
        """Create the byte stream as a series of 'chunks'"""
310
324
        self._create_z_content()
311
325
        if _USE_LZMA:
312
326
            header = self.GCB_LZ_HEADER
313
327
        else:
314
328
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
329
        chunks = ['%s%d\n%d\n'
 
330
                  % (header, self._z_content_length, self._content_length),
318
331
                 ]
 
332
        chunks.extend(self._z_content_chunks)
 
333
        total_len = sum(map(len, chunks))
 
334
        return total_len, chunks
 
335
 
 
336
    def to_bytes(self):
 
337
        """Encode the information into a byte stream."""
 
338
        total_len, chunks = self.to_chunks()
319
339
        return ''.join(chunks)
320
340
 
321
341
    def _dump(self, include_text=False):
679
699
        z_header_bytes = zlib.compress(header_bytes)
680
700
        del header_bytes
681
701
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
702
        block_bytes_len, block_chunks = self._block.to_chunks()
683
703
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
704
                                       block_bytes_len))
685
705
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
706
        lines.extend(block_chunks)
 
707
        del z_header_bytes, block_chunks
 
708
        # TODO: This is a point where we will double the memory consumption. To
 
709
        #       avoid this, we probably have to switch to a 'chunked' api
688
710
        return ''.join(lines)
689
711
 
690
712
    @classmethod
691
713
    def from_bytes(cls, bytes):
692
714
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
715
        #       different way. At a minimum this creates 2 copies of the
 
716
        #       compressed content
694
717
        (storage_kind, z_header_len, header_len,
695
718
         block_len, rest) = bytes.split('\n', 4)
696
719
        del bytes
854
877
 
855
878
        After calling this, the compressor should no longer be used
856
879
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
880
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
881
        self.chunks = None
867
882
        self._delta_index = None
1630
1645
        self._unadded_refs = {}
1631
1646
        keys_to_add = []
1632
1647
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1648
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1649
            self._compressor = GroupCompressor()
 
1650
            # Note: At this point we still have 1 copy of the fulltext (in
 
1651
            #       record and the var 'bytes'), and this generates 2 copies of
 
1652
            #       the compressed text (one for bytes, one in chunks)
 
1653
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1654
            #       have to double compressed memory here
 
1655
            # TODO: Figure out how to indicate that we would be happy to free
 
1656
            #       the fulltext content at this point. Note that sometimes we
 
1657
            #       will want it later (streaming CHK pages), but most of the
 
1658
            #       time we won't (everything else)
 
1659
            bytes = ''.join(chunks)
 
1660
            del chunks
1635
1661
            index, start, length = self._access.add_raw_records(
1636
1662
                [(None, len(bytes))], bytes)[0]
1637
1663
            nodes = []