~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2010-09-01 19:38:55 UTC
  • mto: This revision was merged to the branch mainline in revision 5405.
  • Revision ID: john@arbash-meinel.com-20100901193855-bazh9hh25cpm1986
Use the simpler form for the test server

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    def __init__(self):
102
102
        # map by key? or just order in file?
103
103
        self._compressor_name = None
104
 
        self._z_content_chunks = None
 
104
        self._z_content = None
105
105
        self._z_content_decompressor = None
106
106
        self._z_content_length = None
107
107
        self._content_length = None
135
135
                self._content = ''.join(self._content_chunks)
136
136
                self._content_chunks = None
137
137
        if self._content is None:
138
 
            # We join self._z_content_chunks here, because if we are
139
 
            # decompressing, then it is *very* likely that we have a single
140
 
            # chunk
141
 
            if self._z_content_chunks is None:
 
138
            if self._z_content is None:
142
139
                raise AssertionError('No content to decompress')
143
 
            z_content = ''.join(self._z_content_chunks)
144
 
            if z_content == '':
 
140
            if self._z_content == '':
145
141
                self._content = ''
146
142
            elif self._compressor_name == 'lzma':
147
143
                # We don't do partial lzma decomp yet
148
 
                self._content = pylzma.decompress(z_content)
 
144
                self._content = pylzma.decompress(self._z_content)
149
145
            elif self._compressor_name == 'zlib':
150
146
                # Start a zlib decompressor
151
147
                if num_bytes * 4 > self._content_length * 3:
152
148
                    # If we are requesting more that 3/4ths of the content,
153
149
                    # just extract the whole thing in a single pass
154
150
                    num_bytes = self._content_length
155
 
                    self._content = zlib.decompress(z_content)
 
151
                    self._content = zlib.decompress(self._z_content)
156
152
                else:
157
153
                    self._z_content_decompressor = zlib.decompressobj()
158
154
                    # Seed the decompressor with the uncompressed bytes, so
159
155
                    # that the rest of the code is simplified
160
156
                    self._content = self._z_content_decompressor.decompress(
161
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
162
158
                    if not self._z_content_decompressor.unconsumed_tail:
163
159
                        self._z_content_decompressor = None
164
160
            else:
211
207
            # XXX: Define some GCCorrupt error ?
212
208
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
213
209
                                 (len(bytes), pos, self._z_content_length))
214
 
        self._z_content_chunks = (bytes[pos:],)
215
 
 
216
 
    @property
217
 
    def _z_content(self):
218
 
        """Return z_content_chunks as a simple string.
219
 
 
220
 
        Meant only to be used by the test suite.
221
 
        """
222
 
        if self._z_content_chunks is not None:
223
 
            return ''.join(self._z_content_chunks)
224
 
        return None
 
210
        self._z_content = bytes[pos:]
225
211
 
226
212
    @classmethod
227
213
    def from_bytes(cls, bytes):
283
269
        self._content_length = length
284
270
        self._content_chunks = content_chunks
285
271
        self._content = None
286
 
        self._z_content_chunks = None
 
272
        self._z_content = None
287
273
 
288
274
    def set_content(self, content):
289
275
        """Set the content of this block."""
290
276
        self._content_length = len(content)
291
277
        self._content = content
292
 
        self._z_content_chunks = None
 
278
        self._z_content = None
293
279
 
294
280
    def _create_z_content_using_lzma(self):
295
281
        if self._content_chunks is not None:
297
283
            self._content_chunks = None
298
284
        if self._content is None:
299
285
            raise AssertionError('Nothing to compress')
300
 
        z_content = pylzma.compress(self._content)
301
 
        self._z_content_chunks = (z_content,)
302
 
        self._z_content_length = len(z_content)
 
286
        self._z_content = pylzma.compress(self._content)
 
287
        self._z_content_length = len(self._z_content)
303
288
 
304
 
    def _create_z_content_from_chunks(self, chunks):
 
289
    def _create_z_content_from_chunks(self):
305
290
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
306
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
307
 
        # (measured peak is maybe 30MB over the above...)
308
 
        compressed_chunks = map(compressor.compress, chunks)
 
291
        compressed_chunks = map(compressor.compress, self._content_chunks)
309
292
        compressed_chunks.append(compressor.flush())
310
 
        # Ignore empty chunks
311
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
312
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
293
        self._z_content = ''.join(compressed_chunks)
 
294
        self._z_content_length = len(self._z_content)
313
295
 
314
296
    def _create_z_content(self):
315
 
        if self._z_content_chunks is not None:
 
297
        if self._z_content is not None:
316
298
            return
317
299
        if _USE_LZMA:
318
300
            self._create_z_content_using_lzma()
319
301
            return
320
302
        if self._content_chunks is not None:
321
 
            chunks = self._content_chunks
322
 
        else:
323
 
            chunks = (self._content,)
324
 
        self._create_z_content_from_chunks(chunks)
 
303
            self._create_z_content_from_chunks()
 
304
            return
 
305
        self._z_content = zlib.compress(self._content)
 
306
        self._z_content_length = len(self._z_content)
325
307
 
326
 
    def to_chunks(self):
327
 
        """Create the byte stream as a series of 'chunks'"""
 
308
    def to_bytes(self):
 
309
        """Encode the information into a byte stream."""
328
310
        self._create_z_content()
329
311
        if _USE_LZMA:
330
312
            header = self.GCB_LZ_HEADER
331
313
        else:
332
314
            header = self.GCB_HEADER
333
 
        chunks = ['%s%d\n%d\n'
334
 
                  % (header, self._z_content_length, self._content_length),
 
315
        chunks = [header,
 
316
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
317
                  self._z_content,
335
318
                 ]
336
 
        chunks.extend(self._z_content_chunks)
337
 
        total_len = sum(map(len, chunks))
338
 
        return total_len, chunks
339
 
 
340
 
    def to_bytes(self):
341
 
        """Encode the information into a byte stream."""
342
 
        total_len, chunks = self.to_chunks()
343
319
        return ''.join(chunks)
344
320
 
345
321
    def _dump(self, include_text=False):
703
679
        z_header_bytes = zlib.compress(header_bytes)
704
680
        del header_bytes
705
681
        z_header_bytes_len = len(z_header_bytes)
706
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
682
        block_bytes = self._block.to_bytes()
707
683
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
708
 
                                       block_bytes_len))
 
684
                                       len(block_bytes)))
709
685
        lines.append(z_header_bytes)
710
 
        lines.extend(block_chunks)
711
 
        del z_header_bytes, block_chunks
712
 
        # TODO: This is a point where we will double the memory consumption. To
713
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
686
        lines.append(block_bytes)
 
687
        del z_header_bytes, block_bytes
714
688
        return ''.join(lines)
715
689
 
716
690
    @classmethod
717
691
    def from_bytes(cls, bytes):
718
692
        # TODO: This does extra string copying, probably better to do it a
719
 
        #       different way. At a minimum this creates 2 copies of the
720
 
        #       compressed content
 
693
        #       different way
721
694
        (storage_kind, z_header_len, header_len,
722
695
         block_len, rest) = bytes.split('\n', 4)
723
696
        del bytes
881
854
 
882
855
        After calling this, the compressor should no longer be used
883
856
        """
 
857
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
858
        #       group. This has an impact for 'commit' of large objects.
 
859
        #       One possibility is to use self._content_chunks, and be lazy and
 
860
        #       only fill out self._content as a full string when we actually
 
861
        #       need it. That would at least drop the peak memory consumption
 
862
        #       for 'commit' down to ~1x the size of the largest file, at a
 
863
        #       cost of increased complexity within this code. 2x is still <<
 
864
        #       3x the size of the largest file, so we are doing ok.
884
865
        self._block.set_chunked_content(self.chunks, self.endpoint)
885
866
        self.chunks = None
886
867
        self._delta_index = None
1649
1630
        self._unadded_refs = {}
1650
1631
        keys_to_add = []
1651
1632
        def flush():
1652
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1633
            bytes = self._compressor.flush().to_bytes()
1653
1634
            self._compressor = GroupCompressor()
1654
 
            # Note: At this point we still have 1 copy of the fulltext (in
1655
 
            #       record and the var 'bytes'), and this generates 2 copies of
1656
 
            #       the compressed text (one for bytes, one in chunks)
1657
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1658
 
            #       have to double compressed memory here
1659
 
            # TODO: Figure out how to indicate that we would be happy to free
1660
 
            #       the fulltext content at this point. Note that sometimes we
1661
 
            #       will want it later (streaming CHK pages), but most of the
1662
 
            #       time we won't (everything else)
1663
 
            bytes = ''.join(chunks)
1664
 
            del chunks
1665
1635
            index, start, length = self._access.add_raw_records(
1666
1636
                [(None, len(bytes))], bytes)[0]
1667
1637
            nodes = []