135
135
self._content = ''.join(self._content_chunks)
136
136
self._content_chunks = None
137
137
if self._content is None:
138
if self._z_content is None:
138
# We join self._z_content_chunks here, because if we are
139
# decompressing, then it is *very* likely that we have a single
141
if self._z_content_chunks is None:
139
142
raise AssertionError('No content to decompress')
140
if self._z_content == '':
143
z_content = ''.join(self._z_content_chunks)
141
145
self._content = ''
142
146
elif self._compressor_name == 'lzma':
143
147
# We don't do partial lzma decomp yet
144
self._content = pylzma.decompress(self._z_content)
148
self._content = pylzma.decompress(z_content)
145
149
elif self._compressor_name == 'zlib':
146
150
# Start a zlib decompressor
147
151
if num_bytes * 4 > self._content_length * 3:
148
152
# If we are requesting more that 3/4ths of the content,
149
153
# just extract the whole thing in a single pass
150
154
num_bytes = self._content_length
151
self._content = zlib.decompress(self._z_content)
155
self._content = zlib.decompress(z_content)
153
157
self._z_content_decompressor = zlib.decompressobj()
154
158
# Seed the decompressor with the uncompressed bytes, so
155
159
# that the rest of the code is simplified
156
160
self._content = self._z_content_decompressor.decompress(
157
self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
161
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
162
if not self._z_content_decompressor.unconsumed_tail:
159
163
self._z_content_decompressor = None
207
211
# XXX: Define some GCCorrupt error ?
208
212
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
213
(len(bytes), pos, self._z_content_length))
210
self._z_content = bytes[pos:]
214
self._z_content_chunks = (bytes[pos:],)
217
def _z_content(self):
218
"""Return z_content_chunks as a simple string.
220
Meant only to be used by the test suite.
222
if self._z_content_chunks is not None:
223
return ''.join(self._z_content_chunks)
213
227
def from_bytes(cls, bytes):
269
283
self._content_length = length
270
284
self._content_chunks = content_chunks
271
285
self._content = None
272
self._z_content = None
286
self._z_content_chunks = None
274
288
def set_content(self, content):
275
289
"""Set the content of this block."""
276
290
self._content_length = len(content)
277
291
self._content = content
278
self._z_content = None
292
self._z_content_chunks = None
280
294
def _create_z_content_using_lzma(self):
281
295
if self._content_chunks is not None:
283
297
self._content_chunks = None
284
298
if self._content is None:
285
299
raise AssertionError('Nothing to compress')
286
self._z_content = pylzma.compress(self._content)
287
self._z_content_length = len(self._z_content)
300
z_content = pylzma.compress(self._content)
301
self._z_content_chunks = (z_content,)
302
self._z_content_length = len(z_content)
289
def _create_z_content_from_chunks(self):
304
def _create_z_content_from_chunks(self, chunks):
290
305
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
compressed_chunks = map(compressor.compress, self._content_chunks)
306
# Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
307
# (measured peak is maybe 30MB over the above...)
308
compressed_chunks = map(compressor.compress, chunks)
292
309
compressed_chunks.append(compressor.flush())
293
self._z_content = ''.join(compressed_chunks)
294
self._z_content_length = len(self._z_content)
310
# Ignore empty chunks
311
self._z_content_chunks = [c for c in compressed_chunks if c]
312
self._z_content_length = sum(map(len, self._z_content_chunks))
296
314
def _create_z_content(self):
297
if self._z_content is not None:
315
if self._z_content_chunks is not None:
300
318
self._create_z_content_using_lzma()
302
320
if self._content_chunks is not None:
303
self._create_z_content_from_chunks()
305
self._z_content = zlib.compress(self._content)
306
self._z_content_length = len(self._z_content)
321
chunks = self._content_chunks
323
chunks = (self._content,)
324
self._create_z_content_from_chunks(chunks)
309
"""Encode the information into a byte stream."""
327
"""Create the byte stream as a series of 'chunks'"""
310
328
self._create_z_content()
312
330
header = self.GCB_LZ_HEADER
314
332
header = self.GCB_HEADER
316
'%d\n%d\n' % (self._z_content_length, self._content_length),
333
chunks = ['%s%d\n%d\n'
334
% (header, self._z_content_length, self._content_length),
336
chunks.extend(self._z_content_chunks)
337
total_len = sum(map(len, chunks))
338
return total_len, chunks
341
"""Encode the information into a byte stream."""
342
total_len, chunks = self.to_chunks()
319
343
return ''.join(chunks)
321
345
def _dump(self, include_text=False):
679
703
z_header_bytes = zlib.compress(header_bytes)
681
705
z_header_bytes_len = len(z_header_bytes)
682
block_bytes = self._block.to_bytes()
706
block_bytes_len, block_chunks = self._block.to_chunks()
683
707
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
685
709
lines.append(z_header_bytes)
686
lines.append(block_bytes)
687
del z_header_bytes, block_bytes
710
lines.extend(block_chunks)
711
del z_header_bytes, block_chunks
712
# TODO: This is a point where we will double the memory consumption. To
713
# avoid this, we probably have to switch to a 'chunked' api
688
714
return ''.join(lines)
691
717
def from_bytes(cls, bytes):
692
718
# TODO: This does extra string copying, probably better to do it a
719
# different way. At a minimum this creates 2 copies of the
694
721
(storage_kind, z_header_len, header_len,
695
722
block_len, rest) = bytes.split('\n', 4)
855
882
After calling this, the compressor should no longer be used
857
# TODO: this causes us to 'bloat' to 2x the size of content in the
858
# group. This has an impact for 'commit' of large objects.
859
# One possibility is to use self._content_chunks, and be lazy and
860
# only fill out self._content as a full string when we actually
861
# need it. That would at least drop the peak memory consumption
862
# for 'commit' down to ~1x the size of the largest file, at a
863
# cost of increased complexity within this code. 2x is still <<
864
# 3x the size of the largest file, so we are doing ok.
865
884
self._block.set_chunked_content(self.chunks, self.endpoint)
866
885
self.chunks = None
867
886
self._delta_index = None
1630
1649
self._unadded_refs = {}
1631
1650
keys_to_add = []
1633
bytes = self._compressor.flush().to_bytes()
1652
bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1653
self._compressor = GroupCompressor()
1654
# Note: At this point we still have 1 copy of the fulltext (in
1655
# record and the var 'bytes'), and this generates 2 copies of
1656
# the compressed text (one for bytes, one in chunks)
1657
# TODO: Push 'chunks' down into the _access api, so that we don't
1658
# have to double compressed memory here
1659
# TODO: Figure out how to indicate that we would be happy to free
1660
# the fulltext content at this point. Note that sometimes we
1661
# will want it later (streaming CHK pages), but most of the
1662
# time we won't (everything else)
1663
bytes = ''.join(chunks)
1635
1665
index, start, length = self._access.add_raw_records(
1636
1666
[(None, len(bytes))], bytes)[0]