~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2009-10-20 19:46:46 UTC
  • mfrom: (4759 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4771.
  • Revision ID: john@arbash-meinel.com-20091020194646-wnqpd15qs19y28z7
Merge bzr.dev 4759, bringing in static_tuple and streaming improvements.

Show diffs side-by-side

added added

removed removed

Lines of Context:
119
119
        :param num_bytes: Ensure that we have extracted at least num_bytes of
120
120
            content. If None, consume everything
121
121
        """
122
 
        # TODO: If we re-use the same content block at different times during
123
 
        #       get_record_stream(), it is possible that the first pass will
124
 
        #       get inserted, triggering an extract/_ensure_content() which
125
 
        #       will get rid of _z_content. And then the next use of the block
126
 
        #       will try to access _z_content (to send it over the wire), and
127
 
        #       fail because it is already extracted. Consider never releasing
128
 
        #       _z_content because of this.
 
122
        if self._content_length is None:
 
123
            raise AssertionError('self._content_length should never be None')
129
124
        if num_bytes is None:
130
125
            num_bytes = self._content_length
131
126
        elif (self._content_length is not None
148
143
                self._content = pylzma.decompress(self._z_content)
149
144
            elif self._compressor_name == 'zlib':
150
145
                # Start a zlib decompressor
151
 
                if num_bytes is None:
 
146
                if num_bytes * 4 > self._content_length * 3:
 
147
                    # If we are requesting more that 3/4ths of the content,
 
148
                    # just extract the whole thing in a single pass
 
149
                    num_bytes = self._content_length
152
150
                    self._content = zlib.decompress(self._z_content)
153
151
                else:
154
152
                    self._z_content_decompressor = zlib.decompressobj()
156
154
                    # that the rest of the code is simplified
157
155
                    self._content = self._z_content_decompressor.decompress(
158
156
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
157
                    if not self._z_content_decompressor.unconsumed_tail:
 
158
                        self._z_content_decompressor = None
159
159
            else:
160
160
                raise AssertionError('Unknown compressor: %r'
161
161
                                     % self._compressor_name)
163
163
        # 'unconsumed_tail'
164
164
 
165
165
        # Do we have enough bytes already?
166
 
        if num_bytes is not None and len(self._content) >= num_bytes:
167
 
            return
168
 
        if num_bytes is None and self._z_content_decompressor is None:
169
 
            # We must have already decompressed everything
 
166
        if len(self._content) >= num_bytes:
170
167
            return
171
168
        # If we got this far, and don't have a decompressor, something is wrong
172
169
        if self._z_content_decompressor is None:
173
170
            raise AssertionError(
174
171
                'No decompressor to decompress %d bytes' % num_bytes)
175
172
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
176
 
        if num_bytes is None:
177
 
            if remaining_decomp:
178
 
                # We don't know how much is left, but we'll decompress it all
179
 
                self._content += self._z_content_decompressor.decompress(
180
 
                    remaining_decomp)
181
 
                # Note: There's what I consider a bug in zlib.decompressobj
182
 
                #       If you pass back in the entire unconsumed_tail, only
183
 
                #       this time you don't pass a max-size, it doesn't
184
 
                #       change the unconsumed_tail back to None/''.
185
 
                #       However, we know we are done with the whole stream
186
 
                self._z_content_decompressor = None
187
 
            # XXX: Why is this the only place in this routine we set this?
188
 
            self._content_length = len(self._content)
189
 
        else:
190
 
            if not remaining_decomp:
191
 
                raise AssertionError('Nothing left to decompress')
192
 
            needed_bytes = num_bytes - len(self._content)
193
 
            # We always set max_size to 32kB over the minimum needed, so that
194
 
            # zlib will give us as much as we really want.
195
 
            # TODO: If this isn't good enough, we could make a loop here,
196
 
            #       that keeps expanding the request until we get enough
197
 
            self._content += self._z_content_decompressor.decompress(
198
 
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
199
 
            if len(self._content) < num_bytes:
200
 
                raise AssertionError('%d bytes wanted, only %d available'
201
 
                                     % (num_bytes, len(self._content)))
202
 
            if not self._z_content_decompressor.unconsumed_tail:
203
 
                # The stream is finished
204
 
                self._z_content_decompressor = None
 
173
        if not remaining_decomp:
 
174
            raise AssertionError('Nothing left to decompress')
 
175
        needed_bytes = num_bytes - len(self._content)
 
176
        # We always set max_size to 32kB over the minimum needed, so that
 
177
        # zlib will give us as much as we really want.
 
178
        # TODO: If this isn't good enough, we could make a loop here,
 
179
        #       that keeps expanding the request until we get enough
 
180
        self._content += self._z_content_decompressor.decompress(
 
181
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
182
        if len(self._content) < num_bytes:
 
183
            raise AssertionError('%d bytes wanted, only %d available'
 
184
                                 % (num_bytes, len(self._content)))
 
185
        if not self._z_content_decompressor.unconsumed_tail:
 
186
            # The stream is finished
 
187
            self._z_content_decompressor = None
205
188
 
206
189
    def _parse_bytes(self, bytes, pos):
207
190
        """Read the various lengths from the header.
1282
1265
        else:
1283
1266
            return self.get_record_stream(keys, 'unordered', True)
1284
1267
 
 
1268
    def clear_cache(self):
 
1269
        """See VersionedFiles.clear_cache()"""
 
1270
        self._group_cache.clear()
 
1271
        self._index._graph_index.clear_cache()
 
1272
 
1285
1273
    def _check_add(self, key, lines, random_id, check_content):
1286
1274
        """check that version_id and lines are safe to add."""
1287
1275
        version_id = key[-1]