~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-29 22:03:03 UTC
  • mfrom: (5416.2.6 jam-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20100929220303-cr95h8iwtggco721
(mbp) Add 'break-lock --force'

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    def __init__(self):
102
102
        # map by key? or just order in file?
103
103
        self._compressor_name = None
104
 
        self._z_content = None
 
104
        self._z_content_chunks = None
105
105
        self._z_content_decompressor = None
106
106
        self._z_content_length = None
107
107
        self._content_length = None
135
135
                self._content = ''.join(self._content_chunks)
136
136
                self._content_chunks = None
137
137
        if self._content is None:
138
 
            if self._z_content is None:
 
138
            # We join self._z_content_chunks here, because if we are
 
139
            # decompressing, then it is *very* likely that we have a single
 
140
            # chunk
 
141
            if self._z_content_chunks is None:
139
142
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
143
            z_content = ''.join(self._z_content_chunks)
 
144
            if z_content == '':
141
145
                self._content = ''
142
146
            elif self._compressor_name == 'lzma':
143
147
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
148
                self._content = pylzma.decompress(z_content)
145
149
            elif self._compressor_name == 'zlib':
146
150
                # Start a zlib decompressor
147
151
                if num_bytes * 4 > self._content_length * 3:
148
152
                    # If we are requesting more that 3/4ths of the content,
149
153
                    # just extract the whole thing in a single pass
150
154
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
155
                    self._content = zlib.decompress(z_content)
152
156
                else:
153
157
                    self._z_content_decompressor = zlib.decompressobj()
154
158
                    # Seed the decompressor with the uncompressed bytes, so
155
159
                    # that the rest of the code is simplified
156
160
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
161
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
162
                    if not self._z_content_decompressor.unconsumed_tail:
159
163
                        self._z_content_decompressor = None
160
164
            else:
207
211
            # XXX: Define some GCCorrupt error ?
208
212
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
213
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
214
        self._z_content_chunks = (bytes[pos:],)
 
215
 
 
216
    @property
 
217
    def _z_content(self):
 
218
        """Return z_content_chunks as a simple string.
 
219
 
 
220
        Meant only to be used by the test suite.
 
221
        """
 
222
        if self._z_content_chunks is not None:
 
223
            return ''.join(self._z_content_chunks)
 
224
        return None
211
225
 
212
226
    @classmethod
213
227
    def from_bytes(cls, bytes):
269
283
        self._content_length = length
270
284
        self._content_chunks = content_chunks
271
285
        self._content = None
272
 
        self._z_content = None
 
286
        self._z_content_chunks = None
273
287
 
274
288
    def set_content(self, content):
275
289
        """Set the content of this block."""
276
290
        self._content_length = len(content)
277
291
        self._content = content
278
 
        self._z_content = None
 
292
        self._z_content_chunks = None
279
293
 
280
294
    def _create_z_content_using_lzma(self):
281
295
        if self._content_chunks is not None:
283
297
            self._content_chunks = None
284
298
        if self._content is None:
285
299
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
300
        z_content = pylzma.compress(self._content)
 
301
        self._z_content_chunks = (z_content,)
 
302
        self._z_content_length = len(z_content)
288
303
 
289
 
    def _create_z_content_from_chunks(self):
 
304
    def _create_z_content_from_chunks(self, chunks):
290
305
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
306
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
307
        # (measured peak is maybe 30MB over the above...)
 
308
        compressed_chunks = map(compressor.compress, chunks)
292
309
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
310
        # Ignore empty chunks
 
311
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
312
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
313
 
296
314
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
315
        if self._z_content_chunks is not None:
298
316
            return
299
317
        if _USE_LZMA:
300
318
            self._create_z_content_using_lzma()
301
319
            return
302
320
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
321
            chunks = self._content_chunks
 
322
        else:
 
323
            chunks = (self._content,)
 
324
        self._create_z_content_from_chunks(chunks)
307
325
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
326
    def to_chunks(self):
 
327
        """Create the byte stream as a series of 'chunks'"""
310
328
        self._create_z_content()
311
329
        if _USE_LZMA:
312
330
            header = self.GCB_LZ_HEADER
313
331
        else:
314
332
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
333
        chunks = ['%s%d\n%d\n'
 
334
                  % (header, self._z_content_length, self._content_length),
318
335
                 ]
 
336
        chunks.extend(self._z_content_chunks)
 
337
        total_len = sum(map(len, chunks))
 
338
        return total_len, chunks
 
339
 
 
340
    def to_bytes(self):
 
341
        """Encode the information into a byte stream."""
 
342
        total_len, chunks = self.to_chunks()
319
343
        return ''.join(chunks)
320
344
 
321
345
    def _dump(self, include_text=False):
679
703
        z_header_bytes = zlib.compress(header_bytes)
680
704
        del header_bytes
681
705
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
706
        block_bytes_len, block_chunks = self._block.to_chunks()
683
707
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
708
                                       block_bytes_len))
685
709
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
710
        lines.extend(block_chunks)
 
711
        del z_header_bytes, block_chunks
 
712
        # TODO: This is a point where we will double the memory consumption. To
 
713
        #       avoid this, we probably have to switch to a 'chunked' api
688
714
        return ''.join(lines)
689
715
 
690
716
    @classmethod
691
717
    def from_bytes(cls, bytes):
692
718
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
719
        #       different way. At a minimum this creates 2 copies of the
 
720
        #       compressed content
694
721
        (storage_kind, z_header_len, header_len,
695
722
         block_len, rest) = bytes.split('\n', 4)
696
723
        del bytes
854
881
 
855
882
        After calling this, the compressor should no longer be used
856
883
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
884
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
885
        self.chunks = None
867
886
        self._delta_index = None
1630
1649
        self._unadded_refs = {}
1631
1650
        keys_to_add = []
1632
1651
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1652
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1653
            self._compressor = GroupCompressor()
 
1654
            # Note: At this point we still have 1 copy of the fulltext (in
 
1655
            #       record and the var 'bytes'), and this generates 2 copies of
 
1656
            #       the compressed text (one for bytes, one in chunks)
 
1657
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1658
            #       have to double compressed memory here
 
1659
            # TODO: Figure out how to indicate that we would be happy to free
 
1660
            #       the fulltext content at this point. Note that sometimes we
 
1661
            #       will want it later (streaming CHK pages), but most of the
 
1662
            #       time we won't (everything else)
 
1663
            bytes = ''.join(chunks)
 
1664
            del chunks
1635
1665
            index, start, length = self._access.add_raw_records(
1636
1666
                [(None, len(bytes))], bytes)[0]
1637
1667
            nodes = []