~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Andrew Bennetts
  • Date: 2010-10-13 00:26:41 UTC
  • mto: This revision was merged to the branch mainline in revision 5498.
  • Revision ID: andrew.bennetts@canonical.com-20101013002641-9tlh9k89mlj1666m
Keep docs-plain working.

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    def __init__(self):
102
102
        # map by key? or just order in file?
103
103
        self._compressor_name = None
104
 
        self._z_content = None
 
104
        self._z_content_chunks = None
105
105
        self._z_content_decompressor = None
106
106
        self._z_content_length = None
107
107
        self._content_length = None
135
135
                self._content = ''.join(self._content_chunks)
136
136
                self._content_chunks = None
137
137
        if self._content is None:
138
 
            if self._z_content is None:
 
138
            # We join self._z_content_chunks here, because if we are
 
139
            # decompressing, then it is *very* likely that we have a single
 
140
            # chunk
 
141
            if self._z_content_chunks is None:
139
142
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
143
            z_content = ''.join(self._z_content_chunks)
 
144
            if z_content == '':
141
145
                self._content = ''
142
146
            elif self._compressor_name == 'lzma':
143
147
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
148
                self._content = pylzma.decompress(z_content)
145
149
            elif self._compressor_name == 'zlib':
146
150
                # Start a zlib decompressor
147
151
                if num_bytes * 4 > self._content_length * 3:
148
152
                    # If we are requesting more that 3/4ths of the content,
149
153
                    # just extract the whole thing in a single pass
150
154
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
155
                    self._content = zlib.decompress(z_content)
152
156
                else:
153
157
                    self._z_content_decompressor = zlib.decompressobj()
154
158
                    # Seed the decompressor with the uncompressed bytes, so
155
159
                    # that the rest of the code is simplified
156
160
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
161
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
162
                    if not self._z_content_decompressor.unconsumed_tail:
159
163
                        self._z_content_decompressor = None
160
164
            else:
207
211
            # XXX: Define some GCCorrupt error ?
208
212
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
213
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
214
        self._z_content_chunks = (bytes[pos:],)
 
215
 
 
216
    @property
 
217
    def _z_content(self):
 
218
        """Return z_content_chunks as a simple string.
 
219
 
 
220
        Meant only to be used by the test suite.
 
221
        """
 
222
        if self._z_content_chunks is not None:
 
223
            return ''.join(self._z_content_chunks)
 
224
        return None
211
225
 
212
226
    @classmethod
213
227
    def from_bytes(cls, bytes):
269
283
        self._content_length = length
270
284
        self._content_chunks = content_chunks
271
285
        self._content = None
272
 
        self._z_content = None
 
286
        self._z_content_chunks = None
273
287
 
274
288
    def set_content(self, content):
275
289
        """Set the content of this block."""
276
290
        self._content_length = len(content)
277
291
        self._content = content
278
 
        self._z_content = None
 
292
        self._z_content_chunks = None
279
293
 
280
294
    def _create_z_content_using_lzma(self):
281
295
        if self._content_chunks is not None:
283
297
            self._content_chunks = None
284
298
        if self._content is None:
285
299
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
300
        z_content = pylzma.compress(self._content)
 
301
        self._z_content_chunks = (z_content,)
 
302
        self._z_content_length = len(z_content)
288
303
 
289
 
    def _create_z_content_from_chunks(self):
 
304
    def _create_z_content_from_chunks(self, chunks):
290
305
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
306
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
307
        # (measured peak is maybe 30MB over the above...)
 
308
        compressed_chunks = map(compressor.compress, chunks)
292
309
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
310
        # Ignore empty chunks
 
311
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
312
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
313
 
296
314
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
315
        if self._z_content_chunks is not None:
298
316
            return
299
317
        if _USE_LZMA:
300
318
            self._create_z_content_using_lzma()
301
319
            return
302
320
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
321
            chunks = self._content_chunks
 
322
        else:
 
323
            chunks = (self._content,)
 
324
        self._create_z_content_from_chunks(chunks)
307
325
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
326
    def to_chunks(self):
 
327
        """Create the byte stream as a series of 'chunks'"""
310
328
        self._create_z_content()
311
329
        if _USE_LZMA:
312
330
            header = self.GCB_LZ_HEADER
313
331
        else:
314
332
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
333
        chunks = ['%s%d\n%d\n'
 
334
                  % (header, self._z_content_length, self._content_length),
318
335
                 ]
 
336
        chunks.extend(self._z_content_chunks)
 
337
        total_len = sum(map(len, chunks))
 
338
        return total_len, chunks
 
339
 
 
340
    def to_bytes(self):
 
341
        """Encode the information into a byte stream."""
 
342
        total_len, chunks = self.to_chunks()
319
343
        return ''.join(chunks)
320
344
 
321
345
    def _dump(self, include_text=False):
679
703
        z_header_bytes = zlib.compress(header_bytes)
680
704
        del header_bytes
681
705
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
706
        block_bytes_len, block_chunks = self._block.to_chunks()
683
707
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
708
                                       block_bytes_len))
685
709
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
710
        lines.extend(block_chunks)
 
711
        del z_header_bytes, block_chunks
 
712
        # TODO: This is a point where we will double the memory consumption. To
 
713
        #       avoid this, we probably have to switch to a 'chunked' api
688
714
        return ''.join(lines)
689
715
 
690
716
    @classmethod
691
717
    def from_bytes(cls, bytes):
692
718
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
719
        #       different way. At a minimum this creates 2 copies of the
 
720
        #       compressed content
694
721
        (storage_kind, z_header_len, header_len,
695
722
         block_len, rest) = bytes.split('\n', 4)
696
723
        del bytes
854
881
 
855
882
        After calling this, the compressor should no longer be used
856
883
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
884
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
885
        self.chunks = None
867
886
        self._delta_index = None
1630
1649
        self._unadded_refs = {}
1631
1650
        keys_to_add = []
1632
1651
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1652
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1653
            self._compressor = GroupCompressor()
 
1654
            # Note: At this point we still have 1 copy of the fulltext (in
 
1655
            #       record and the var 'bytes'), and this generates 2 copies of
 
1656
            #       the compressed text (one for bytes, one in chunks)
 
1657
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1658
            #       have to double compressed memory here
 
1659
            # TODO: Figure out how to indicate that we would be happy to free
 
1660
            #       the fulltext content at this point. Note that sometimes we
 
1661
            #       will want it later (streaming CHK pages), but most of the
 
1662
            #       time we won't (everything else)
 
1663
            bytes = ''.join(chunks)
 
1664
            del chunks
1635
1665
            index, start, length = self._access.add_raw_records(
1636
1666
                [(None, len(bytes))], bytes)[0]
1637
1667
            nodes = []
1809
1839
        return result
1810
1840
 
1811
1841
 
 
1842
class _GCBuildDetails(object):
 
1843
    """A blob of data about the build details.
 
1844
 
 
1845
    This stores the minimal data, which then allows compatibility with the old
 
1846
    api, without taking as much memory.
 
1847
    """
 
1848
 
 
1849
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1850
                 '_delta_end', '_parents')
 
1851
 
 
1852
    method = 'group'
 
1853
    compression_parent = None
 
1854
 
 
1855
    def __init__(self, parents, position_info):
 
1856
        self._parents = parents
 
1857
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1858
         self._delta_end) = position_info
 
1859
 
 
1860
    def __repr__(self):
 
1861
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1862
            self.index_memo, self._parents)
 
1863
 
 
1864
    @property
 
1865
    def index_memo(self):
 
1866
        return (self._index, self._group_start, self._group_end,
 
1867
                self._basis_end, self._delta_end)
 
1868
 
 
1869
    @property
 
1870
    def record_details(self):
 
1871
        return static_tuple.StaticTuple(self.method, None)
 
1872
 
 
1873
    def __getitem__(self, offset):
 
1874
        """Compatibility thunk to act like a tuple."""
 
1875
        if offset == 0:
 
1876
            return self.index_memo
 
1877
        elif offset == 1:
 
1878
            return self.compression_parent # Always None
 
1879
        elif offset == 2:
 
1880
            return self._parents
 
1881
        elif offset == 3:
 
1882
            return self.record_details
 
1883
        else:
 
1884
            raise IndexError('offset out of range')
 
1885
            
 
1886
    def __len__(self):
 
1887
        return 4
 
1888
 
 
1889
 
1812
1890
class _GCGraphIndex(object):
1813
1891
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1814
1892
 
2009
2087
                parents = None
2010
2088
            else:
2011
2089
                parents = entry[3][0]
2012
 
            method = 'group'
2013
 
            result[key] = (self._node_to_position(entry),
2014
 
                                  None, parents, (method, None))
 
2090
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2091
            result[key] = details
2015
2092
        return result
2016
2093
 
2017
2094
    def keys(self):
2033
2110
        # each, or about 7MB. Note that it might be even more when you consider
2034
2111
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
2112
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
 
2113
        # a LIFO when re-using PyInt slots, which might cause more
2037
2114
        # fragmentation.
2038
2115
        start = int(bits[0])
2039
2116
        start = self._int_cache.setdefault(start, start)