~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Danny van Heumen
  • Date: 2010-03-09 21:42:11 UTC
  • mto: (4634.139.5 2.0)
  • mto: This revision was merged to the branch mainline in revision 5160.
  • Revision ID: danny@dannyvanheumen.nl-20100309214211-iqh42x6qcikgd9p3
Reverted now-useless TODO list.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
    knit,
32
32
    osutils,
33
33
    pack,
34
 
    static_tuple,
35
34
    trace,
36
35
    )
37
36
from bzrlib.btree_index import BTreeBuilder
120
119
        :param num_bytes: Ensure that we have extracted at least num_bytes of
121
120
            content. If None, consume everything
122
121
        """
123
 
        if self._content_length is None:
124
 
            raise AssertionError('self._content_length should never be None')
 
122
        # TODO: If we re-use the same content block at different times during
 
123
        #       get_record_stream(), it is possible that the first pass will
 
124
        #       get inserted, triggering an extract/_ensure_content() which
 
125
        #       will get rid of _z_content. And then the next use of the block
 
126
        #       will try to access _z_content (to send it over the wire), and
 
127
        #       fail because it is already extracted. Consider never releasing
 
128
        #       _z_content because of this.
125
129
        if num_bytes is None:
126
130
            num_bytes = self._content_length
127
131
        elif (self._content_length is not None
144
148
                self._content = pylzma.decompress(self._z_content)
145
149
            elif self._compressor_name == 'zlib':
146
150
                # Start a zlib decompressor
147
 
                if num_bytes * 4 > self._content_length * 3:
148
 
                    # If we are requesting more that 3/4ths of the content,
149
 
                    # just extract the whole thing in a single pass
150
 
                    num_bytes = self._content_length
 
151
                if num_bytes is None:
151
152
                    self._content = zlib.decompress(self._z_content)
152
153
                else:
153
154
                    self._z_content_decompressor = zlib.decompressobj()
155
156
                    # that the rest of the code is simplified
156
157
                    self._content = self._z_content_decompressor.decompress(
157
158
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
 
                    if not self._z_content_decompressor.unconsumed_tail:
159
 
                        self._z_content_decompressor = None
160
159
            else:
161
160
                raise AssertionError('Unknown compressor: %r'
162
161
                                     % self._compressor_name)
164
163
        # 'unconsumed_tail'
165
164
 
166
165
        # Do we have enough bytes already?
167
 
        if len(self._content) >= num_bytes:
 
166
        if num_bytes is not None and len(self._content) >= num_bytes:
 
167
            return
 
168
        if num_bytes is None and self._z_content_decompressor is None:
 
169
            # We must have already decompressed everything
168
170
            return
169
171
        # If we got this far, and don't have a decompressor, something is wrong
170
172
        if self._z_content_decompressor is None:
171
173
            raise AssertionError(
172
174
                'No decompressor to decompress %d bytes' % num_bytes)
173
175
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
174
 
        if not remaining_decomp:
175
 
            raise AssertionError('Nothing left to decompress')
176
 
        needed_bytes = num_bytes - len(self._content)
177
 
        # We always set max_size to 32kB over the minimum needed, so that
178
 
        # zlib will give us as much as we really want.
179
 
        # TODO: If this isn't good enough, we could make a loop here,
180
 
        #       that keeps expanding the request until we get enough
181
 
        self._content += self._z_content_decompressor.decompress(
182
 
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
183
 
        if len(self._content) < num_bytes:
184
 
            raise AssertionError('%d bytes wanted, only %d available'
185
 
                                 % (num_bytes, len(self._content)))
186
 
        if not self._z_content_decompressor.unconsumed_tail:
187
 
            # The stream is finished
188
 
            self._z_content_decompressor = None
 
176
        if num_bytes is None:
 
177
            if remaining_decomp:
 
178
                # We don't know how much is left, but we'll decompress it all
 
179
                self._content += self._z_content_decompressor.decompress(
 
180
                    remaining_decomp)
 
181
                # Note: There's what I consider a bug in zlib.decompressobj
 
182
                #       If you pass back in the entire unconsumed_tail, only
 
183
                #       this time you don't pass a max-size, it doesn't
 
184
                #       change the unconsumed_tail back to None/''.
 
185
                #       However, we know we are done with the whole stream
 
186
                self._z_content_decompressor = None
 
187
            # XXX: Why is this the only place in this routine we set this?
 
188
            self._content_length = len(self._content)
 
189
        else:
 
190
            if not remaining_decomp:
 
191
                raise AssertionError('Nothing left to decompress')
 
192
            needed_bytes = num_bytes - len(self._content)
 
193
            # We always set max_size to 32kB over the minimum needed, so that
 
194
            # zlib will give us as much as we really want.
 
195
            # TODO: If this isn't good enough, we could make a loop here,
 
196
            #       that keeps expanding the request until we get enough
 
197
            self._content += self._z_content_decompressor.decompress(
 
198
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
199
            if len(self._content) < num_bytes:
 
200
                raise AssertionError('%d bytes wanted, only %d available'
 
201
                                     % (num_bytes, len(self._content)))
 
202
            if not self._z_content_decompressor.unconsumed_tail:
 
203
                # The stream is finished
 
204
                self._z_content_decompressor = None
189
205
 
190
206
    def _parse_bytes(self, bytes, pos):
191
207
        """Read the various lengths from the header.
1266
1282
        else:
1267
1283
            return self.get_record_stream(keys, 'unordered', True)
1268
1284
 
1269
 
    def clear_cache(self):
1270
 
        """See VersionedFiles.clear_cache()"""
1271
 
        self._group_cache.clear()
1272
 
        self._index._graph_index.clear_cache()
1273
 
        self._index._int_cache.clear()
1274
 
 
1275
1285
    def _check_add(self, key, lines, random_id, check_content):
1276
1286
        """check that version_id and lines are safe to add."""
1277
1287
        version_id = key[-1]
1631
1641
        keys_to_add = []
1632
1642
        def flush():
1633
1643
            bytes = self._compressor.flush().to_bytes()
1634
 
            self._compressor = GroupCompressor()
1635
1644
            index, start, length = self._access.add_raw_records(
1636
1645
                [(None, len(bytes))], bytes)[0]
1637
1646
            nodes = []
1640
1649
            self._index.add_records(nodes, random_id=random_id)
1641
1650
            self._unadded_refs = {}
1642
1651
            del keys_to_add[:]
 
1652
            self._compressor = GroupCompressor()
1643
1653
 
1644
1654
        last_prefix = None
1645
1655
        max_fulltext_len = 0
1747
1757
                key = record.key
1748
1758
            self._unadded_refs[key] = record.parents
1749
1759
            yield found_sha1
1750
 
            as_st = static_tuple.StaticTuple.from_sequence
1751
 
            if record.parents is not None:
1752
 
                parents = as_st([as_st(p) for p in record.parents])
1753
 
            else:
1754
 
                parents = None
1755
 
            refs = static_tuple.StaticTuple(parents)
1756
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1760
            keys_to_add.append((key, '%d %d' % (start_point, end_point),
 
1761
                (record.parents,)))
1757
1762
        if len(keys_to_add):
1758
1763
            flush()
1759
1764
        self._compressor = None
1809
1814
        return result
1810
1815
 
1811
1816
 
1812
 
class _GCBuildDetails(object):
1813
 
    """A blob of data about the build details.
1814
 
 
1815
 
    This stores the minimal data, which then allows compatibility with the old
1816
 
    api, without taking as much memory.
1817
 
    """
1818
 
 
1819
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1820
 
                 '_delta_end', '_parents')
1821
 
 
1822
 
    method = 'group'
1823
 
    compression_parent = None
1824
 
 
1825
 
    def __init__(self, parents, position_info):
1826
 
        self._parents = parents
1827
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1828
 
         self._delta_end) = position_info
1829
 
 
1830
 
    def __repr__(self):
1831
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1832
 
            self.index_memo, self._parents)
1833
 
 
1834
 
    @property
1835
 
    def index_memo(self):
1836
 
        return (self._index, self._group_start, self._group_end,
1837
 
                self._basis_end, self._delta_end)
1838
 
 
1839
 
    @property
1840
 
    def record_details(self):
1841
 
        return static_tuple.StaticTuple(self.method, None)
1842
 
 
1843
 
    def __getitem__(self, offset):
1844
 
        """Compatibility thunk to act like a tuple."""
1845
 
        if offset == 0:
1846
 
            return self.index_memo
1847
 
        elif offset == 1:
1848
 
            return self.compression_parent # Always None
1849
 
        elif offset == 2:
1850
 
            return self._parents
1851
 
        elif offset == 3:
1852
 
            return self.record_details
1853
 
        else:
1854
 
            raise IndexError('offset out of range')
1855
 
            
1856
 
    def __len__(self):
1857
 
        return 4
1858
 
 
1859
 
 
1860
1817
class _GCGraphIndex(object):
1861
1818
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1862
1819
 
1887
1844
        self.has_graph = parents
1888
1845
        self._is_locked = is_locked
1889
1846
        self._inconsistency_fatal = inconsistency_fatal
1890
 
        # GroupCompress records tend to have the same 'group' start + offset
1891
 
        # repeated over and over, this creates a surplus of ints
1892
 
        self._int_cache = {}
1893
1847
        if track_external_parent_refs:
1894
1848
            self._key_dependencies = knit._KeyRefs(
1895
1849
                track_new_keys=track_new_keys)
1931
1885
        if not random_id:
1932
1886
            present_nodes = self._get_entries(keys)
1933
1887
            for (index, key, value, node_refs) in present_nodes:
1934
 
                # Sometimes these are passed as a list rather than a tuple
1935
 
                node_refs = static_tuple.as_tuples(node_refs)
1936
 
                passed = static_tuple.as_tuples(keys[key])
1937
 
                if node_refs != passed[1]:
1938
 
                    details = '%s %s %s' % (key, (value, node_refs), passed)
 
1888
                if node_refs != keys[key][1]:
 
1889
                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
1939
1890
                    if self._inconsistency_fatal:
1940
1891
                        raise errors.KnitCorrupt(self, "inconsistent details"
1941
1892
                                                 " in add_records: %s" %
2057
2008
                parents = None
2058
2009
            else:
2059
2010
                parents = entry[3][0]
2060
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2061
 
            result[key] = details
 
2011
            method = 'group'
 
2012
            result[key] = (self._node_to_position(entry),
 
2013
                                  None, parents, (method, None))
2062
2014
        return result
2063
2015
 
2064
2016
    def keys(self):
2073
2025
        """Convert an index value to position details."""
2074
2026
        bits = node[2].split(' ')
2075
2027
        # It would be nice not to read the entire gzip.
2076
 
        # start and stop are put into _int_cache because they are very common.
2077
 
        # They define the 'group' that an entry is in, and many groups can have
2078
 
        # thousands of objects.
2079
 
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2080
 
        # each, or about 7MB. Note that it might be even more when you consider
2081
 
        # how PyInt is allocated in separate slabs. And you can't return a slab
2082
 
        # to the OS if even 1 int on it is in use. Note though that Python uses
2083
 
        # a LIFO when re-using PyInt slots, which might cause more
2084
 
        # fragmentation.
2085
2028
        start = int(bits[0])
2086
 
        start = self._int_cache.setdefault(start, start)
2087
2029
        stop = int(bits[1])
2088
 
        stop = self._int_cache.setdefault(stop, stop)
2089
2030
        basis_end = int(bits[2])
2090
2031
        delta_end = int(bits[3])
2091
 
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2092
 
        # instance...
2093
 
        return (node[0], start, stop, basis_end, delta_end)
 
2032
        return node[0], start, stop, basis_end, delta_end
2094
2033
 
2095
2034
    def scan_unvalidated_index(self, graph_index):
2096
2035
        """Inform this _GCGraphIndex that there is an unvalidated index.
2127
2066
        decode_base128_int,
2128
2067
        )
2129
2068
    GroupCompressor = PyrexGroupCompressor
2130
 
except ImportError, e:
2131
 
    osutils.failed_to_load_extension(e)
 
2069
except ImportError:
2132
2070
    GroupCompressor = PythonGroupCompressor
2133
2071