@@ ... @@
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
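
The hunk above stores the compressed bytes as a tuple of chunks and only joins them when decompression needs a single string. As the new comment says, a block that has just been read off the wire or disk almost always holds exactly one chunk, and in CPython joining a one-element sequence of strings hands back that element without copying, so the extra indirection is essentially free. A minimal illustration (variable names here are mine, not part of the change):

    z_chunk = 'pretend this is zlib-compressed data'
    z_content_chunks = (z_chunk,)          # how the block now stores it
    z_content = ''.join(z_content_chunks)  # what the decompress path does
    assert z_content is z_chunk            # same object in CPython, no copy
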
@@ ... @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                 (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+
+        Meant only to be used by the test suite.
+        """
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
 
     @classmethod
     def from_bytes(cls, bytes):
@@ ... @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
@@ ... @@
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
@@ ... @@
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
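
The comments added above spell out the intended peak: one fulltext, one compressed copy, plus zlib's own buffering, with the compressed text kept as a list of pieces rather than one joined string. A stand-alone sketch of the same pattern, assuming plain byte-string chunks (the helper name is hypothetical):

    import zlib

    def compress_chunks(chunks):
        # One compressobj fed chunk by chunk; compressed output kept as a
        # list of pieces instead of being joined into a single string.
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        compressed = [compressor.compress(chunk) for chunk in chunks]
        compressed.append(compressor.flush())
        # compress() may return empty strings while zlib buffers input
        z_chunks = [c for c in compressed if c]
        return sum(map(len, z_chunks)), z_chunks

    z_len, z_chunks = compress_chunks([b'first chunk ', b'second chunk'])
    assert zlib.decompress(b''.join(z_chunks)) == b'first chunk second chunk'
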
@@ ... @@
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
+            chunks = self._content_chunks
         else:
-            self._z_content = zlib.compress(self._content)
-            self._z_content_length = len(self._z_content)
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
@@ ... @@
 
-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-        chunks = [header,
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-                  self._z_content,
-                 ]
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+                 ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
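
With this hunk to_bytes() becomes a thin wrapper over to_chunks(), so callers that can handle a list of strings plus a total length no longer need one large joined copy of the compressed block. A small usage sketch, assuming the usual bzrlib module path:

    from bzrlib.groupcompress import GroupCompressBlock

    block = GroupCompressBlock()
    block.set_content('some uncompressed content\n')
    total_len, chunks = block.to_chunks()
    assert total_len == sum(map(len, chunks))
    assert ''.join(chunks) == block.to_bytes()   # same wire format either way
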
@@ ... @@
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
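
The TODO added above marks the copy that remains: the final ''.join(lines) still flattens everything. What this hunk removes is the extra flattened copy of the compressed block that used to sit alongside that join; only the chunk list is held now. A rough illustration of the accounting (all values are made up):

    lines = ['header line 1\n', 'header line 2\n']
    block_chunks = ['zchunk-one', 'zchunk-two']   # as returned by to_chunks()
    lines.extend(block_chunks)    # no flattened copy of the block yet
    wire_bytes = ''.join(lines)   # the single full copy the TODO refers to
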
@@ ... @@
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
         #       different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
@@ ... @@
         After calling this, the compressor should no longer be used
         """
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ ... @@
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
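
The Note above is about transient copies: 'chunks' holds the compressed text once, and the ''.join() needed by add_raw_records() makes the second compressed copy, which is exactly what the first TODO wants to push down into the _access layer. A back-of-the-envelope check with made-up values:

    chunks = ['zzz-one', 'zzz-two', 'zzz-three']   # compressed text, copy #1
    bytes = ''.join(chunks)                        # compressed text, copy #2
    assert sum(map(len, chunks)) == len(bytes)     # peak is ~2x compressed size
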