~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-13 16:23:07 UTC
  • mto: (4634.119.7 2.0)
  • mto: This revision was merged to the branch mainline in revision 4959.
  • Revision ID: john@arbash-meinel.com-20100113162307-0bs82td16gzih827
Update the MANIFEST.in file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
    knit,
32
32
    osutils,
33
33
    pack,
34
 
    static_tuple,
35
34
    trace,
36
35
    )
37
36
from bzrlib.btree_index import BTreeBuilder
101
100
    def __init__(self):
102
101
        # map by key? or just order in file?
103
102
        self._compressor_name = None
104
 
        self._z_content_chunks = None
 
103
        self._z_content = None
105
104
        self._z_content_decompressor = None
106
105
        self._z_content_length = None
107
106
        self._content_length = None
120
119
        :param num_bytes: Ensure that we have extracted at least num_bytes of
121
120
            content. If None, consume everything
122
121
        """
123
 
        if self._content_length is None:
124
 
            raise AssertionError('self._content_length should never be None')
 
122
        # TODO: If we re-use the same content block at different times during
 
123
        #       get_record_stream(), it is possible that the first pass will
 
124
        #       get inserted, triggering an extract/_ensure_content() which
 
125
        #       will get rid of _z_content. And then the next use of the block
 
126
        #       will try to access _z_content (to send it over the wire), and
 
127
        #       fail because it is already extracted. Consider never releasing
 
128
        #       _z_content because of this.
125
129
        if num_bytes is None:
126
130
            num_bytes = self._content_length
127
131
        elif (self._content_length is not None
135
139
                self._content = ''.join(self._content_chunks)
136
140
                self._content_chunks = None
137
141
        if self._content is None:
138
 
            # We join self._z_content_chunks here, because if we are
139
 
            # decompressing, then it is *very* likely that we have a single
140
 
            # chunk
141
 
            if self._z_content_chunks is None:
 
142
            if self._z_content is None:
142
143
                raise AssertionError('No content to decompress')
143
 
            z_content = ''.join(self._z_content_chunks)
144
 
            if z_content == '':
 
144
            if self._z_content == '':
145
145
                self._content = ''
146
146
            elif self._compressor_name == 'lzma':
147
147
                # We don't do partial lzma decomp yet
148
 
                self._content = pylzma.decompress(z_content)
 
148
                self._content = pylzma.decompress(self._z_content)
149
149
            elif self._compressor_name == 'zlib':
150
150
                # Start a zlib decompressor
151
 
                if num_bytes * 4 > self._content_length * 3:
152
 
                    # If we are requesting more that 3/4ths of the content,
153
 
                    # just extract the whole thing in a single pass
154
 
                    num_bytes = self._content_length
155
 
                    self._content = zlib.decompress(z_content)
 
151
                if num_bytes is None:
 
152
                    self._content = zlib.decompress(self._z_content)
156
153
                else:
157
154
                    self._z_content_decompressor = zlib.decompressobj()
158
155
                    # Seed the decompressor with the uncompressed bytes, so
159
156
                    # that the rest of the code is simplified
160
157
                    self._content = self._z_content_decompressor.decompress(
161
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
162
 
                    if not self._z_content_decompressor.unconsumed_tail:
163
 
                        self._z_content_decompressor = None
 
158
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
164
159
            else:
165
160
                raise AssertionError('Unknown compressor: %r'
166
161
                                     % self._compressor_name)
168
163
        # 'unconsumed_tail'
169
164
 
170
165
        # Do we have enough bytes already?
171
 
        if len(self._content) >= num_bytes:
 
166
        if num_bytes is not None and len(self._content) >= num_bytes:
 
167
            return
 
168
        if num_bytes is None and self._z_content_decompressor is None:
 
169
            # We must have already decompressed everything
172
170
            return
173
171
        # If we got this far, and don't have a decompressor, something is wrong
174
172
        if self._z_content_decompressor is None:
175
173
            raise AssertionError(
176
174
                'No decompressor to decompress %d bytes' % num_bytes)
177
175
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
178
 
        if not remaining_decomp:
179
 
            raise AssertionError('Nothing left to decompress')
180
 
        needed_bytes = num_bytes - len(self._content)
181
 
        # We always set max_size to 32kB over the minimum needed, so that
182
 
        # zlib will give us as much as we really want.
183
 
        # TODO: If this isn't good enough, we could make a loop here,
184
 
        #       that keeps expanding the request until we get enough
185
 
        self._content += self._z_content_decompressor.decompress(
186
 
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
187
 
        if len(self._content) < num_bytes:
188
 
            raise AssertionError('%d bytes wanted, only %d available'
189
 
                                 % (num_bytes, len(self._content)))
190
 
        if not self._z_content_decompressor.unconsumed_tail:
191
 
            # The stream is finished
192
 
            self._z_content_decompressor = None
 
176
        if num_bytes is None:
 
177
            if remaining_decomp:
 
178
                # We don't know how much is left, but we'll decompress it all
 
179
                self._content += self._z_content_decompressor.decompress(
 
180
                    remaining_decomp)
 
181
                # Note: There's what I consider a bug in zlib.decompressobj
 
182
                #       If you pass back in the entire unconsumed_tail, only
 
183
                #       this time you don't pass a max-size, it doesn't
 
184
                #       change the unconsumed_tail back to None/''.
 
185
                #       However, we know we are done with the whole stream
 
186
                self._z_content_decompressor = None
 
187
            # XXX: Why is this the only place in this routine we set this?
 
188
            self._content_length = len(self._content)
 
189
        else:
 
190
            if not remaining_decomp:
 
191
                raise AssertionError('Nothing left to decompress')
 
192
            needed_bytes = num_bytes - len(self._content)
 
193
            # We always set max_size to 32kB over the minimum needed, so that
 
194
            # zlib will give us as much as we really want.
 
195
            # TODO: If this isn't good enough, we could make a loop here,
 
196
            #       that keeps expanding the request until we get enough
 
197
            self._content += self._z_content_decompressor.decompress(
 
198
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
199
            if len(self._content) < num_bytes:
 
200
                raise AssertionError('%d bytes wanted, only %d available'
 
201
                                     % (num_bytes, len(self._content)))
 
202
            if not self._z_content_decompressor.unconsumed_tail:
 
203
                # The stream is finished
 
204
                self._z_content_decompressor = None
193
205
 
194
206
    def _parse_bytes(self, bytes, pos):
195
207
        """Read the various lengths from the header.
211
223
            # XXX: Define some GCCorrupt error ?
212
224
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
213
225
                                 (len(bytes), pos, self._z_content_length))
214
 
        self._z_content_chunks = (bytes[pos:],)
215
 
 
216
 
    @property
217
 
    def _z_content(self):
218
 
        """Return z_content_chunks as a simple string.
219
 
 
220
 
        Meant only to be used by the test suite.
221
 
        """
222
 
        if self._z_content_chunks is not None:
223
 
            return ''.join(self._z_content_chunks)
224
 
        return None
 
226
        self._z_content = bytes[pos:]
225
227
 
226
228
    @classmethod
227
229
    def from_bytes(cls, bytes):
283
285
        self._content_length = length
284
286
        self._content_chunks = content_chunks
285
287
        self._content = None
286
 
        self._z_content_chunks = None
 
288
        self._z_content = None
287
289
 
288
290
    def set_content(self, content):
289
291
        """Set the content of this block."""
290
292
        self._content_length = len(content)
291
293
        self._content = content
292
 
        self._z_content_chunks = None
 
294
        self._z_content = None
293
295
 
294
296
    def _create_z_content_using_lzma(self):
295
297
        if self._content_chunks is not None:
297
299
            self._content_chunks = None
298
300
        if self._content is None:
299
301
            raise AssertionError('Nothing to compress')
300
 
        z_content = pylzma.compress(self._content)
301
 
        self._z_content_chunks = (z_content,)
302
 
        self._z_content_length = len(z_content)
 
302
        self._z_content = pylzma.compress(self._content)
 
303
        self._z_content_length = len(self._z_content)
303
304
 
304
 
    def _create_z_content_from_chunks(self, chunks):
 
305
    def _create_z_content_from_chunks(self):
305
306
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
306
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
307
 
        # (measured peak is maybe 30MB over the above...)
308
 
        compressed_chunks = map(compressor.compress, chunks)
 
307
        compressed_chunks = map(compressor.compress, self._content_chunks)
309
308
        compressed_chunks.append(compressor.flush())
310
 
        # Ignore empty chunks
311
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
312
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
309
        self._z_content = ''.join(compressed_chunks)
 
310
        self._z_content_length = len(self._z_content)
313
311
 
314
312
    def _create_z_content(self):
315
 
        if self._z_content_chunks is not None:
 
313
        if self._z_content is not None:
316
314
            return
317
315
        if _USE_LZMA:
318
316
            self._create_z_content_using_lzma()
319
317
            return
320
318
        if self._content_chunks is not None:
321
 
            chunks = self._content_chunks
322
 
        else:
323
 
            chunks = (self._content,)
324
 
        self._create_z_content_from_chunks(chunks)
 
319
            self._create_z_content_from_chunks()
 
320
            return
 
321
        self._z_content = zlib.compress(self._content)
 
322
        self._z_content_length = len(self._z_content)
325
323
 
326
 
    def to_chunks(self):
327
 
        """Create the byte stream as a series of 'chunks'"""
 
324
    def to_bytes(self):
 
325
        """Encode the information into a byte stream."""
328
326
        self._create_z_content()
329
327
        if _USE_LZMA:
330
328
            header = self.GCB_LZ_HEADER
331
329
        else:
332
330
            header = self.GCB_HEADER
333
 
        chunks = ['%s%d\n%d\n'
334
 
                  % (header, self._z_content_length, self._content_length),
 
331
        chunks = [header,
 
332
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
333
                  self._z_content,
335
334
                 ]
336
 
        chunks.extend(self._z_content_chunks)
337
 
        total_len = sum(map(len, chunks))
338
 
        return total_len, chunks
339
 
 
340
 
    def to_bytes(self):
341
 
        """Encode the information into a byte stream."""
342
 
        total_len, chunks = self.to_chunks()
343
335
        return ''.join(chunks)
344
336
 
345
337
    def _dump(self, include_text=False):
703
695
        z_header_bytes = zlib.compress(header_bytes)
704
696
        del header_bytes
705
697
        z_header_bytes_len = len(z_header_bytes)
706
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
698
        block_bytes = self._block.to_bytes()
707
699
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
708
 
                                       block_bytes_len))
 
700
                                       len(block_bytes)))
709
701
        lines.append(z_header_bytes)
710
 
        lines.extend(block_chunks)
711
 
        del z_header_bytes, block_chunks
712
 
        # TODO: This is a point where we will double the memory consumption. To
713
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
702
        lines.append(block_bytes)
 
703
        del z_header_bytes, block_bytes
714
704
        return ''.join(lines)
715
705
 
716
706
    @classmethod
717
707
    def from_bytes(cls, bytes):
718
708
        # TODO: This does extra string copying, probably better to do it a
719
 
        #       different way. At a minimum this creates 2 copies of the
720
 
        #       compressed content
 
709
        #       different way
721
710
        (storage_kind, z_header_len, header_len,
722
711
         block_len, rest) = bytes.split('\n', 4)
723
712
        del bytes
881
870
 
882
871
        After calling this, the compressor should no longer be used
883
872
        """
 
873
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
874
        #       group. This has an impact for 'commit' of large objects.
 
875
        #       One possibility is to use self._content_chunks, and be lazy and
 
876
        #       only fill out self._content as a full string when we actually
 
877
        #       need it. That would at least drop the peak memory consumption
 
878
        #       for 'commit' down to ~1x the size of the largest file, at a
 
879
        #       cost of increased complexity within this code. 2x is still <<
 
880
        #       3x the size of the largest file, so we are doing ok.
884
881
        self._block.set_chunked_content(self.chunks, self.endpoint)
885
882
        self.chunks = None
886
883
        self._delta_index = None
1285
1282
        else:
1286
1283
            return self.get_record_stream(keys, 'unordered', True)
1287
1284
 
1288
 
    def clear_cache(self):
1289
 
        """See VersionedFiles.clear_cache()"""
1290
 
        self._group_cache.clear()
1291
 
        self._index._graph_index.clear_cache()
1292
 
        self._index._int_cache.clear()
1293
 
 
1294
1285
    def _check_add(self, key, lines, random_id, check_content):
1295
1286
        """check that version_id and lines are safe to add."""
1296
1287
        version_id = key[-1]
1649
1640
        self._unadded_refs = {}
1650
1641
        keys_to_add = []
1651
1642
        def flush():
1652
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
1653
 
            self._compressor = GroupCompressor()
1654
 
            # Note: At this point we still have 1 copy of the fulltext (in
1655
 
            #       record and the var 'bytes'), and this generates 2 copies of
1656
 
            #       the compressed text (one for bytes, one in chunks)
1657
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1658
 
            #       have to double compressed memory here
1659
 
            # TODO: Figure out how to indicate that we would be happy to free
1660
 
            #       the fulltext content at this point. Note that sometimes we
1661
 
            #       will want it later (streaming CHK pages), but most of the
1662
 
            #       time we won't (everything else)
1663
 
            bytes = ''.join(chunks)
1664
 
            del chunks
 
1643
            bytes = self._compressor.flush().to_bytes()
1665
1644
            index, start, length = self._access.add_raw_records(
1666
1645
                [(None, len(bytes))], bytes)[0]
1667
1646
            nodes = []
1670
1649
            self._index.add_records(nodes, random_id=random_id)
1671
1650
            self._unadded_refs = {}
1672
1651
            del keys_to_add[:]
 
1652
            self._compressor = GroupCompressor()
1673
1653
 
1674
1654
        last_prefix = None
1675
1655
        max_fulltext_len = 0
1777
1757
                key = record.key
1778
1758
            self._unadded_refs[key] = record.parents
1779
1759
            yield found_sha1
1780
 
            as_st = static_tuple.StaticTuple.from_sequence
1781
 
            if record.parents is not None:
1782
 
                parents = as_st([as_st(p) for p in record.parents])
1783
 
            else:
1784
 
                parents = None
1785
 
            refs = static_tuple.StaticTuple(parents)
1786
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1760
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
 
1761
                (record.parents,)))
1787
1762
        if len(keys_to_add):
1788
1763
            flush()
1789
1764
        self._compressor = None
1839
1814
        return result
1840
1815
 
1841
1816
 
1842
 
class _GCBuildDetails(object):
1843
 
    """A blob of data about the build details.
1844
 
 
1845
 
    This stores the minimal data, which then allows compatibility with the old
1846
 
    api, without taking as much memory.
1847
 
    """
1848
 
 
1849
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1850
 
                 '_delta_end', '_parents')
1851
 
 
1852
 
    method = 'group'
1853
 
    compression_parent = None
1854
 
 
1855
 
    def __init__(self, parents, position_info):
1856
 
        self._parents = parents
1857
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1858
 
         self._delta_end) = position_info
1859
 
 
1860
 
    def __repr__(self):
1861
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1862
 
            self.index_memo, self._parents)
1863
 
 
1864
 
    @property
1865
 
    def index_memo(self):
1866
 
        return (self._index, self._group_start, self._group_end,
1867
 
                self._basis_end, self._delta_end)
1868
 
 
1869
 
    @property
1870
 
    def record_details(self):
1871
 
        return static_tuple.StaticTuple(self.method, None)
1872
 
 
1873
 
    def __getitem__(self, offset):
1874
 
        """Compatibility thunk to act like a tuple."""
1875
 
        if offset == 0:
1876
 
            return self.index_memo
1877
 
        elif offset == 1:
1878
 
            return self.compression_parent # Always None
1879
 
        elif offset == 2:
1880
 
            return self._parents
1881
 
        elif offset == 3:
1882
 
            return self.record_details
1883
 
        else:
1884
 
            raise IndexError('offset out of range')
1885
 
            
1886
 
    def __len__(self):
1887
 
        return 4
1888
 
 
1889
 
 
1890
1817
class _GCGraphIndex(object):
1891
1818
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1892
1819
 
1917
1844
        self.has_graph = parents
1918
1845
        self._is_locked = is_locked
1919
1846
        self._inconsistency_fatal = inconsistency_fatal
1920
 
        # GroupCompress records tend to have the same 'group' start + offset
1921
 
        # repeated over and over, this creates a surplus of ints
1922
 
        self._int_cache = {}
1923
1847
        if track_external_parent_refs:
1924
1848
            self._key_dependencies = knit._KeyRefs(
1925
1849
                track_new_keys=track_new_keys)
1961
1885
        if not random_id:
1962
1886
            present_nodes = self._get_entries(keys)
1963
1887
            for (index, key, value, node_refs) in present_nodes:
1964
 
                # Sometimes these are passed as a list rather than a tuple
1965
 
                node_refs = static_tuple.as_tuples(node_refs)
1966
 
                passed = static_tuple.as_tuples(keys[key])
1967
 
                if node_refs != passed[1]:
1968
 
                    details = '%s %s %s' % (key, (value, node_refs), passed)
 
1888
                if node_refs != keys[key][1]:
 
1889
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
1969
1890
                    if self._inconsistency_fatal:
1970
1891
                        raise errors.KnitCorrupt(self, "inconsistent details"
1971
1892
                                                 " in add_records: %s" %
2087
2008
                parents = None
2088
2009
            else:
2089
2010
                parents = entry[3][0]
2090
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2091
 
            result[key] = details
 
2011
            method = 'group'
 
2012
            result[key] = (self._node_to_position(entry),
 
2013
                                  None, parents, (method, None))
2092
2014
        return result
2093
2015
 
2094
2016
    def keys(self):
2103
2025
        """Convert an index value to position details."""
2104
2026
        bits = node[2].split(' ')
2105
2027
        # It would be nice not to read the entire gzip.
2106
 
        # start and stop are put into _int_cache because they are very common.
2107
 
        # They define the 'group' that an entry is in, and many groups can have
2108
 
        # thousands of objects.
2109
 
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2110
 
        # each, or about 7MB. Note that it might be even more when you consider
2111
 
        # how PyInt is allocated in separate slabs. And you can't return a slab
2112
 
        # to the OS if even 1 int on it is in use. Note though that Python uses
2113
 
        # a LIFO when re-using PyInt slots, which might cause more
2114
 
        # fragmentation.
2115
2028
        start = int(bits[0])
2116
 
        start = self._int_cache.setdefault(start, start)
2117
2029
        stop = int(bits[1])
2118
 
        stop = self._int_cache.setdefault(stop, stop)
2119
2030
        basis_end = int(bits[2])
2120
2031
        delta_end = int(bits[3])
2121
 
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2122
 
        # instance...
2123
 
        return (node[0], start, stop, basis_end, delta_end)
 
2032
        return node[0], start, stop, basis_end, delta_end
2124
2033
 
2125
2034
    def scan_unvalidated_index(self, graph_index):
2126
2035
        """Inform this _GCGraphIndex that there is an unvalidated index.
2157
2066
        decode_base128_int,
2158
2067
        )
2159
2068
    GroupCompressor = PyrexGroupCompressor
2160
 
except ImportError, e:
2161
 
    osutils.failed_to_load_extension(e)
 
2069
except ImportError:
2162
2070
    GroupCompressor = PythonGroupCompressor
2163
2071