~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Gary van der Merwe
  • Date: 2010-08-02 19:56:52 UTC
  • mfrom: (5050.3.18 2.2)
  • mto: (5050.3.19 2.2)
  • mto: This revision was merged to the branch mainline in revision 5371.
  • Revision ID: garyvdm@gmail.com-20100802195652-o1ppjemhwrr98i61
MergeĀ lp:bzr/2.2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    def __init__(self):
102
102
        # map by key? or just order in file?
103
103
        self._compressor_name = None
104
 
        self._z_content_chunks = None
 
104
        self._z_content = None
105
105
        self._z_content_decompressor = None
106
106
        self._z_content_length = None
107
107
        self._content_length = None
135
135
                self._content = ''.join(self._content_chunks)
136
136
                self._content_chunks = None
137
137
        if self._content is None:
138
 
            # We join self._z_content_chunks here, because if we are
139
 
            # decompressing, then it is *very* likely that we have a single
140
 
            # chunk
141
 
            if self._z_content_chunks is None:
 
138
            if self._z_content is None:
142
139
                raise AssertionError('No content to decompress')
143
 
            z_content = ''.join(self._z_content_chunks)
144
 
            if z_content == '':
 
140
            if self._z_content == '':
145
141
                self._content = ''
146
142
            elif self._compressor_name == 'lzma':
147
143
                # We don't do partial lzma decomp yet
148
 
                self._content = pylzma.decompress(z_content)
 
144
                self._content = pylzma.decompress(self._z_content)
149
145
            elif self._compressor_name == 'zlib':
150
146
                # Start a zlib decompressor
151
147
                if num_bytes * 4 > self._content_length * 3:
152
148
                    # If we are requesting more that 3/4ths of the content,
153
149
                    # just extract the whole thing in a single pass
154
150
                    num_bytes = self._content_length
155
 
                    self._content = zlib.decompress(z_content)
 
151
                    self._content = zlib.decompress(self._z_content)
156
152
                else:
157
153
                    self._z_content_decompressor = zlib.decompressobj()
158
154
                    # Seed the decompressor with the uncompressed bytes, so
159
155
                    # that the rest of the code is simplified
160
156
                    self._content = self._z_content_decompressor.decompress(
161
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
162
158
                    if not self._z_content_decompressor.unconsumed_tail:
163
159
                        self._z_content_decompressor = None
164
160
            else:
211
207
            # XXX: Define some GCCorrupt error ?
212
208
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
213
209
                                 (len(bytes), pos, self._z_content_length))
214
 
        self._z_content_chunks = (bytes[pos:],)
215
 
 
216
 
    @property
217
 
    def _z_content(self):
218
 
        """Return z_content_chunks as a simple string.
219
 
 
220
 
        Meant only to be used by the test suite.
221
 
        """
222
 
        if self._z_content_chunks is not None:
223
 
            return ''.join(self._z_content_chunks)
224
 
        return None
 
210
        self._z_content = bytes[pos:]
225
211
 
226
212
    @classmethod
227
213
    def from_bytes(cls, bytes):
283
269
        self._content_length = length
284
270
        self._content_chunks = content_chunks
285
271
        self._content = None
286
 
        self._z_content_chunks = None
 
272
        self._z_content = None
287
273
 
288
274
    def set_content(self, content):
289
275
        """Set the content of this block."""
290
276
        self._content_length = len(content)
291
277
        self._content = content
292
 
        self._z_content_chunks = None
 
278
        self._z_content = None
293
279
 
294
280
    def _create_z_content_using_lzma(self):
295
281
        if self._content_chunks is not None:
297
283
            self._content_chunks = None
298
284
        if self._content is None:
299
285
            raise AssertionError('Nothing to compress')
300
 
        z_content = pylzma.compress(self._content)
301
 
        self._z_content_chunks = (z_content,)
302
 
        self._z_content_length = len(z_content)
 
286
        self._z_content = pylzma.compress(self._content)
 
287
        self._z_content_length = len(self._z_content)
303
288
 
304
 
    def _create_z_content_from_chunks(self, chunks):
 
289
    def _create_z_content_from_chunks(self):
305
290
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
306
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
307
 
        # (measured peak is maybe 30MB over the above...)
308
 
        compressed_chunks = map(compressor.compress, chunks)
 
291
        compressed_chunks = map(compressor.compress, self._content_chunks)
309
292
        compressed_chunks.append(compressor.flush())
310
 
        # Ignore empty chunks
311
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
312
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
293
        self._z_content = ''.join(compressed_chunks)
 
294
        self._z_content_length = len(self._z_content)
313
295
 
314
296
    def _create_z_content(self):
315
 
        if self._z_content_chunks is not None:
 
297
        if self._z_content is not None:
316
298
            return
317
299
        if _USE_LZMA:
318
300
            self._create_z_content_using_lzma()
319
301
            return
320
302
        if self._content_chunks is not None:
321
 
            chunks = self._content_chunks
322
 
        else:
323
 
            chunks = (self._content,)
324
 
        self._create_z_content_from_chunks(chunks)
 
303
            self._create_z_content_from_chunks()
 
304
            return
 
305
        self._z_content = zlib.compress(self._content)
 
306
        self._z_content_length = len(self._z_content)
325
307
 
326
 
    def to_chunks(self):
327
 
        """Create the byte stream as a series of 'chunks'"""
 
308
    def to_bytes(self):
 
309
        """Encode the information into a byte stream."""
328
310
        self._create_z_content()
329
311
        if _USE_LZMA:
330
312
            header = self.GCB_LZ_HEADER
331
313
        else:
332
314
            header = self.GCB_HEADER
333
 
        chunks = ['%s%d\n%d\n'
334
 
                  % (header, self._z_content_length, self._content_length),
 
315
        chunks = [header,
 
316
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
317
                  self._z_content,
335
318
                 ]
336
 
        chunks.extend(self._z_content_chunks)
337
 
        total_len = sum(map(len, chunks))
338
 
        return total_len, chunks
339
 
 
340
 
    def to_bytes(self):
341
 
        """Encode the information into a byte stream."""
342
 
        total_len, chunks = self.to_chunks()
343
319
        return ''.join(chunks)
344
320
 
345
321
    def _dump(self, include_text=False):
703
679
        z_header_bytes = zlib.compress(header_bytes)
704
680
        del header_bytes
705
681
        z_header_bytes_len = len(z_header_bytes)
706
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
682
        block_bytes = self._block.to_bytes()
707
683
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
708
 
                                       block_bytes_len))
 
684
                                       len(block_bytes)))
709
685
        lines.append(z_header_bytes)
710
 
        lines.extend(block_chunks)
711
 
        del z_header_bytes, block_chunks
712
 
        # TODO: This is a point where we will double the memory consumption. To
713
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
686
        lines.append(block_bytes)
 
687
        del z_header_bytes, block_bytes
714
688
        return ''.join(lines)
715
689
 
716
690
    @classmethod
717
691
    def from_bytes(cls, bytes):
718
692
        # TODO: This does extra string copying, probably better to do it a
719
 
        #       different way. At a minimum this creates 2 copies of the
720
 
        #       compressed content
 
693
        #       different way
721
694
        (storage_kind, z_header_len, header_len,
722
695
         block_len, rest) = bytes.split('\n', 4)
723
696
        del bytes
881
854
 
882
855
        After calling this, the compressor should no longer be used
883
856
        """
 
857
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
858
        #       group. This has an impact for 'commit' of large objects.
 
859
        #       One possibility is to use self._content_chunks, and be lazy and
 
860
        #       only fill out self._content as a full string when we actually
 
861
        #       need it. That would at least drop the peak memory consumption
 
862
        #       for 'commit' down to ~1x the size of the largest file, at a
 
863
        #       cost of increased complexity within this code. 2x is still <<
 
864
        #       3x the size of the largest file, so we are doing ok.
884
865
        self._block.set_chunked_content(self.chunks, self.endpoint)
885
866
        self.chunks = None
886
867
        self._delta_index = None
1649
1630
        self._unadded_refs = {}
1650
1631
        keys_to_add = []
1651
1632
        def flush():
1652
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1633
            bytes = self._compressor.flush().to_bytes()
1653
1634
            self._compressor = GroupCompressor()
1654
 
            # Note: At this point we still have 1 copy of the fulltext (in
1655
 
            #       record and the var 'bytes'), and this generates 2 copies of
1656
 
            #       the compressed text (one for bytes, one in chunks)
1657
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1658
 
            #       have to double compressed memory here
1659
 
            # TODO: Figure out how to indicate that we would be happy to free
1660
 
            #       the fulltext content at this point. Note that sometimes we
1661
 
            #       will want it later (streaming CHK pages), but most of the
1662
 
            #       time we won't (everything else)
1663
 
            bytes = ''.join(chunks)
1664
 
            del chunks
1665
1635
            index, start, length = self._access.add_raw_records(
1666
1636
                [(None, len(bytes))], bytes)[0]
1667
1637
            nodes = []
1839
1809
        return result
1840
1810
 
1841
1811
 
1842
 
class _GCBuildDetails(object):
1843
 
    """A blob of data about the build details.
1844
 
 
1845
 
    This stores the minimal data, which then allows compatibility with the old
1846
 
    api, without taking as much memory.
1847
 
    """
1848
 
 
1849
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1850
 
                 '_delta_end', '_parents')
1851
 
 
1852
 
    method = 'group'
1853
 
    compression_parent = None
1854
 
 
1855
 
    def __init__(self, parents, position_info):
1856
 
        self._parents = parents
1857
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1858
 
         self._delta_end) = position_info
1859
 
 
1860
 
    def __repr__(self):
1861
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1862
 
            self.index_memo, self._parents)
1863
 
 
1864
 
    @property
1865
 
    def index_memo(self):
1866
 
        return (self._index, self._group_start, self._group_end,
1867
 
                self._basis_end, self._delta_end)
1868
 
 
1869
 
    @property
1870
 
    def record_details(self):
1871
 
        return static_tuple.StaticTuple(self.method, None)
1872
 
 
1873
 
    def __getitem__(self, offset):
1874
 
        """Compatibility thunk to act like a tuple."""
1875
 
        if offset == 0:
1876
 
            return self.index_memo
1877
 
        elif offset == 1:
1878
 
            return self.compression_parent # Always None
1879
 
        elif offset == 2:
1880
 
            return self._parents
1881
 
        elif offset == 3:
1882
 
            return self.record_details
1883
 
        else:
1884
 
            raise IndexError('offset out of range')
1885
 
            
1886
 
    def __len__(self):
1887
 
        return 4
1888
 
 
1889
 
 
1890
1812
class _GCGraphIndex(object):
1891
1813
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1892
1814
 
2087
2009
                parents = None
2088
2010
            else:
2089
2011
                parents = entry[3][0]
2090
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2091
 
            result[key] = details
 
2012
            method = 'group'
 
2013
            result[key] = (self._node_to_position(entry),
 
2014
                                  None, parents, (method, None))
2092
2015
        return result
2093
2016
 
2094
2017
    def keys(self):
2110
2033
        # each, or about 7MB. Note that it might be even more when you consider
2111
2034
        # how PyInt is allocated in separate slabs. And you can't return a slab
2112
2035
        # to the OS if even 1 int on it is in use. Note though that Python uses
2113
 
        # a LIFO when re-using PyInt slots, which might cause more
 
2036
        # a LIFO when re-using PyInt slots, which probably causes more
2114
2037
        # fragmentation.
2115
2038
        start = int(bits[0])
2116
2039
        start = self._int_cache.setdefault(start, start)