~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2010-09-21 19:30:33 UTC
  • mto: This revision was merged to the branch mainline in revision 5451.
  • Revision ID: john@arbash-meinel.com-20100921193033-9ftw56og72mhlwo4
Change GroupCompressBlock to work in self._z_compress_chunks

This pushes down one of the peak memory locations. We still have a requirement
during commit of 1 fulltext + 2 compressed texts, but at least this code
path is now better about only using 1 fulltext and 1 compressed text.
We need to push this into more apis to get a bigger benefit.

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    def __init__(self):
102
102
        # map by key? or just order in file?
103
103
        self._compressor_name = None
104
 
        self._z_content = None
 
104
        self._z_content_chunks = None
105
105
        self._z_content_decompressor = None
106
106
        self._z_content_length = None
107
107
        self._content_length = None
135
135
                self._content = ''.join(self._content_chunks)
136
136
                self._content_chunks = None
137
137
        if self._content is None:
138
 
            if self._z_content is None:
 
138
            # We join self._z_content_chunks here, because if we are
 
139
            # decompressing, then it is *very* likely that we have a single
 
140
            # chunk
 
141
            if self._z_content_chunks is None:
139
142
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
143
            z_content = ''.join(self._z_content_chunks)
 
144
            if z_content == '':
141
145
                self._content = ''
142
146
            elif self._compressor_name == 'lzma':
143
147
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
148
                self._content = pylzma.decompress(z_content)
145
149
            elif self._compressor_name == 'zlib':
146
150
                # Start a zlib decompressor
147
151
                if num_bytes * 4 > self._content_length * 3:
148
152
                    # If we are requesting more that 3/4ths of the content,
149
153
                    # just extract the whole thing in a single pass
150
154
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
155
                    self._content = zlib.decompress(z_content)
152
156
                else:
153
157
                    self._z_content_decompressor = zlib.decompressobj()
154
158
                    # Seed the decompressor with the uncompressed bytes, so
155
159
                    # that the rest of the code is simplified
156
160
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
161
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
162
                    if not self._z_content_decompressor.unconsumed_tail:
159
163
                        self._z_content_decompressor = None
160
164
            else:
207
211
            # XXX: Define some GCCorrupt error ?
208
212
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
213
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
214
        self._z_content_chunks = (bytes[pos:],)
 
215
 
 
216
    @property
 
217
    def _z_content(self):
 
218
        if self._z_content_chunks is not None:
 
219
            return ''.join(self._z_content_chunks)
 
220
        return None
211
221
 
212
222
    @classmethod
213
223
    def from_bytes(cls, bytes):
269
279
        self._content_length = length
270
280
        self._content_chunks = content_chunks
271
281
        self._content = None
272
 
        self._z_content = None
 
282
        self._z_content_chunks = None
273
283
 
274
284
    def set_content(self, content):
275
285
        """Set the content of this block."""
276
286
        self._content_length = len(content)
277
287
        self._content = content
278
 
        self._z_content = None
 
288
        self._z_content_chunks = None
279
289
 
280
290
    def _create_z_content_using_lzma(self):
281
291
        if self._content_chunks is not None:
283
293
            self._content_chunks = None
284
294
        if self._content is None:
285
295
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
296
        z_content = pylzma.compress(self._content)
 
297
        self._z_content_chunks = (z_content,)
 
298
        self._z_content_length = len(z_content)
288
299
 
289
 
    def _create_z_content_from_chunks(self):
 
300
    def _create_z_content_from_chunks(self, chunks):
290
301
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
302
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
303
        # (measured peak is maybe 30MB over the above...)
 
304
        compressed_chunks = map(compressor.compress, chunks)
292
305
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
306
        # Ignore empty chunks
 
307
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
308
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
309
 
296
310
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
311
        if self._z_content_chunks is not None:
298
312
            return
299
313
        if _USE_LZMA:
300
314
            self._create_z_content_using_lzma()
301
315
            return
302
316
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
317
            chunks = self._content_chunks
 
318
        else:
 
319
            chunks = (self._content,)
 
320
        self._create_z_content_from_chunks(chunks)
307
321
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
322
    def to_chunks(self):
 
323
        """Create the byte stream as a series of 'chunks'"""
310
324
        self._create_z_content()
311
325
        if _USE_LZMA:
312
326
            header = self.GCB_LZ_HEADER
313
327
        else:
314
328
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
329
        chunks = ['%s%d\n%d\n'
 
330
                  % (header, self._z_content_length, self._content_length),
318
331
                 ]
 
332
        chunks.extend(self._z_content_chunks)
 
333
        total_len = sum(map(len, chunks))
 
334
        return total_len, chunks
 
335
 
 
336
    def to_bytes(self):
 
337
        """Encode the information into a byte stream."""
 
338
        total_len, chunks = self.to_chunks()
319
339
        return ''.join(chunks)
320
340
 
321
341
    def _dump(self, include_text=False):
679
699
        z_header_bytes = zlib.compress(header_bytes)
680
700
        del header_bytes
681
701
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
702
        block_bytes_len, block_chunks = self._block.to_chunks()
683
703
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
704
                                       block_bytes_len))
685
705
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
706
        lines.extend(block_chunks)
 
707
        del z_header_bytes, block_chunks
 
708
        # TODO: This is a point where we will double the memory consumption. To
 
709
        #       avoid this, we probably have to switch to a 'chunked' api
688
710
        return ''.join(lines)
689
711
 
690
712
    @classmethod
691
713
    def from_bytes(cls, bytes):
692
714
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
715
        #       different way. At a minimum this creates 2 copies of the
 
716
        #       compressed content
694
717
        (storage_kind, z_header_len, header_len,
695
718
         block_len, rest) = bytes.split('\n', 4)
696
719
        del bytes
854
877
 
855
878
        After calling this, the compressor should no longer be used
856
879
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
880
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
881
        self.chunks = None
867
882
        self._delta_index = None
1630
1645
        self._unadded_refs = {}
1631
1646
        keys_to_add = []
1632
1647
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1648
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1649
            self._compressor = GroupCompressor()
 
1650
            # Note: At this point we still have 1 copy of the fulltext (in
 
1651
            #       record and the var 'bytes'), and this generates 2 copies of
 
1652
            #       the compressed text (one for bytes, one in chunks)
 
1653
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1654
            #       have to double compressed memory here
 
1655
            # TODO: Figure out how to indicate that we would be happy to free
 
1656
            #       the fulltext content at this point. Note that sometimes we
 
1657
            #       will want it later (streaming CHK pages), but most of the
 
1658
            #       time we won't (everything else)
 
1659
            bytes = ''.join(chunks)
 
1660
            del chunks
1635
1661
            index, start, length = self._access.add_raw_records(
1636
1662
                [(None, len(bytes))], bytes)[0]
1637
1663
            nodes = []
1809
1835
        return result
1810
1836
 
1811
1837
 
 
1838
class _GCBuildDetails(object):
 
1839
    """A blob of data about the build details.
 
1840
 
 
1841
    This stores the minimal data, which then allows compatibility with the old
 
1842
    api, without taking as much memory.
 
1843
    """
 
1844
 
 
1845
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1846
                 '_delta_end', '_parents')
 
1847
 
 
1848
    method = 'group'
 
1849
    compression_parent = None
 
1850
 
 
1851
    def __init__(self, parents, position_info):
 
1852
        self._parents = parents
 
1853
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1854
         self._delta_end) = position_info
 
1855
 
 
1856
    def __repr__(self):
 
1857
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1858
            self.index_memo, self._parents)
 
1859
 
 
1860
    @property
 
1861
    def index_memo(self):
 
1862
        return (self._index, self._group_start, self._group_end,
 
1863
                self._basis_end, self._delta_end)
 
1864
 
 
1865
    @property
 
1866
    def record_details(self):
 
1867
        return static_tuple.StaticTuple(self.method, None)
 
1868
 
 
1869
    def __getitem__(self, offset):
 
1870
        """Compatibility thunk to act like a tuple."""
 
1871
        if offset == 0:
 
1872
            return self.index_memo
 
1873
        elif offset == 1:
 
1874
            return self.compression_parent # Always None
 
1875
        elif offset == 2:
 
1876
            return self._parents
 
1877
        elif offset == 3:
 
1878
            return self.record_details
 
1879
        else:
 
1880
            raise IndexError('offset out of range')
 
1881
            
 
1882
    def __len__(self):
 
1883
        return 4
 
1884
 
 
1885
 
1812
1886
class _GCGraphIndex(object):
1813
1887
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1814
1888
 
2009
2083
                parents = None
2010
2084
            else:
2011
2085
                parents = entry[3][0]
2012
 
            method = 'group'
2013
 
            result[key] = (self._node_to_position(entry),
2014
 
                                  None, parents, (method, None))
 
2086
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2087
            result[key] = details
2015
2088
        return result
2016
2089
 
2017
2090
    def keys(self):
2033
2106
        # each, or about 7MB. Note that it might be even more when you consider
2034
2107
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
2108
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
 
2109
        # a LIFO when re-using PyInt slots, which might cause more
2037
2110
        # fragmentation.
2038
2111
        start = int(bits[0])
2039
2112
        start = self._int_cache.setdefault(start, start)