119
119
:param num_bytes: Ensure that we have extracted at least num_bytes of
120
120
content. If None, consume everything
122
# TODO: If we re-use the same content block at different times during
123
# get_record_stream(), it is possible that the first pass will
124
# get inserted, triggering an extract/_ensure_content() which
125
# will get rid of _z_content. And then the next use of the block
126
# will try to access _z_content (to send it over the wire), and
127
# fail because it is already extracted. Consider never releasing
128
# _z_content because of this.
122
if self._content_length is None:
123
raise AssertionError('self._content_length should never be None')
129
124
if num_bytes is None:
130
125
num_bytes = self._content_length
131
126
elif (self._content_length is not None
148
143
self._content = pylzma.decompress(self._z_content)
149
144
elif self._compressor_name == 'zlib':
150
145
# Start a zlib decompressor
151
if num_bytes is None:
146
if num_bytes * 4 > self._content_length * 3:
147
# If we are requesting more than 3/4ths of the content,
148
# just extract the whole thing in a single pass
149
num_bytes = self._content_length
152
150
self._content = zlib.decompress(self._z_content)
154
152
self._z_content_decompressor = zlib.decompressobj()
163
163
# 'unconsumed_tail'
165
165
# Do we have enough bytes already?
166
if num_bytes is not None and len(self._content) >= num_bytes:
168
if num_bytes is None and self._z_content_decompressor is None:
169
# We must have already decompressed everything
166
if len(self._content) >= num_bytes:
171
168
# If we got this far, and don't have a decompressor, something is wrong
172
169
if self._z_content_decompressor is None:
173
170
raise AssertionError(
174
171
'No decompressor to decompress %d bytes' % num_bytes)
175
172
remaining_decomp = self._z_content_decompressor.unconsumed_tail
176
if num_bytes is None:
178
# We don't know how much is left, but we'll decompress it all
179
self._content += self._z_content_decompressor.decompress(
181
# Note: There's what I consider a bug in zlib.decompressobj
182
# If you pass back in the entire unconsumed_tail, only
183
# this time you don't pass a max-size, it doesn't
184
# change the unconsumed_tail back to None/''.
185
# However, we know we are done with the whole stream
186
self._z_content_decompressor = None
187
# XXX: Why is this the only place in this routine we set this?
188
self._content_length = len(self._content)
190
if not remaining_decomp:
191
raise AssertionError('Nothing left to decompress')
192
needed_bytes = num_bytes - len(self._content)
193
# We always set max_size to 32kB over the minimum needed, so that
194
# zlib will give us as much as we really want.
195
# TODO: If this isn't good enough, we could make a loop here,
196
# that keeps expanding the request until we get enough
197
self._content += self._z_content_decompressor.decompress(
198
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
199
if len(self._content) < num_bytes:
200
raise AssertionError('%d bytes wanted, only %d available'
201
% (num_bytes, len(self._content)))
202
if not self._z_content_decompressor.unconsumed_tail:
203
# The stream is finished
204
self._z_content_decompressor = None
173
if not remaining_decomp:
174
raise AssertionError('Nothing left to decompress')
175
needed_bytes = num_bytes - len(self._content)
176
# We always set max_size to 32kB over the minimum needed, so that
177
# zlib will give us as much as we really want.
178
# TODO: If this isn't good enough, we could make a loop here,
179
# that keeps expanding the request until we get enough
180
self._content += self._z_content_decompressor.decompress(
181
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
182
if len(self._content) < num_bytes:
183
raise AssertionError('%d bytes wanted, only %d available'
184
% (num_bytes, len(self._content)))
185
if not self._z_content_decompressor.unconsumed_tail:
186
# The stream is finished
187
self._z_content_decompressor = None
206
189
def _parse_bytes(self, bytes, pos):
207
190
"""Read the various lengths from the header.
1283
1266
return self.get_record_stream(keys, 'unordered', True)
1268
def clear_cache(self):
1269
"""See VersionedFiles.clear_cache()"""
1270
self._group_cache.clear()
1271
self._index._graph_index.clear_cache()
1285
1273
def _check_add(self, key, lines, random_id, check_content):
1286
1274
"""check that version_id and lines are safe to add."""
1287
1275
version_id = key[-1]