@@ -135,25 +135,29 @@
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
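The hunk above is the heart of the change: the block now stores its compressed data as a tuple of chunks and only joins them when it actually decompresses. The partial-extraction path can be exercised outside bzrlib with a small sketch; extract_prefix and the window value below are illustrative names, only the zlib calls and the 3/4 heuristic mirror the code above.

import zlib

_DECOMP_WINDOW = 32 * 1024  # stand-in for bzrlib's _ZLIB_DECOMP_WINDOW

def extract_prefix(z_chunks, num_bytes, content_length):
    # Join the compressed chunks once; usually there is only one chunk.
    z_content = ''.join(z_chunks)
    if num_bytes * 4 > content_length * 3:
        # Asking for more than 3/4 of the text: one full decompress is cheaper.
        return zlib.decompress(z_content)
    decomp = zlib.decompressobj()
    # The second argument caps how much plain text decompress() returns;
    # decomp.unconsumed_tail keeps the compressed bytes not yet consumed.
    return decomp.decompress(z_content, num_bytes + _DECOMP_WINDOW)

if __name__ == '__main__':
    text = ''.join('line %d\n' % i for i in range(5000))
    z = zlib.compress(text)
    prefix = extract_prefix([z[:10], z[10:]], 100, len(text))
    assert prefix.startswith('line 0\n') and len(prefix) >= 100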
@@ -207,7 +211,13 @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                 (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
+        return None
 
     @classmethod
     def from_bytes(cls, bytes):
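The new _z_content property above is only a compatibility shim: internally the block now keeps a tuple of compressed chunks, and existing callers that still read ._z_content get a lazily joined string. A minimal illustration of the pattern, with a hypothetical class name:

class _ChunkedBlob(object):
    """Hypothetical stand-in for GroupCompressBlock's chunk storage."""

    def __init__(self):
        self._z_content_chunks = None

    @property
    def _z_content(self):
        # Join on demand; None when nothing has been stored yet.
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None

blob = _ChunkedBlob()
assert blob._z_content is None
blob._z_content_chunks = ('abc', 'def')
assert blob._z_content == 'abcdef'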
@@ -269,19 +279,20 @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
 
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
@@ -289,6 +300,9 @@
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
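The 'Ignore empty chunks' filter above matters because zlib's streaming compressor buffers input: most compress() calls return an empty string and the bulk of the data only appears at flush(). A standalone sketch of the same shape (compress_chunks is an illustrative name):

import zlib

def compress_chunks(chunks):
    compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
    compressed = [compressor.compress(c) for c in chunks]
    compressed.append(compressor.flush())
    # Drop the '' entries so later joins and length sums stay cheap.
    z_chunks = [c for c in compressed if c]
    return sum(map(len, z_chunks)), z_chunks

if __name__ == '__main__':
    total_len, z_chunks = compress_chunks(['a' * 100] * 50)
    assert zlib.decompress(''.join(z_chunks)) == 'a' * 5000
    assert total_len == sum(map(len, z_chunks))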
@@ -296,11 +310,11 @@
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-            return
-        self._z_content = zlib.compress(self._content)
-        self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+        else:
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
@@ -308,14 +322,20 @@
-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-        chunks = [header,
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-                  self._z_content,
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
                  ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
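to_bytes() is now a thin wrapper over to_chunks(): the chunked form prepends one small header string and then reuses the already-compressed chunks, so the only full copy happens in the final join. Roughly, with an illustrative header value:

def to_chunks(header, z_chunks, z_len, content_len):
    chunks = ['%s%d\n%d\n' % (header, z_len, content_len)]
    chunks.extend(z_chunks)
    return sum(map(len, chunks)), chunks

def to_bytes(header, z_chunks, z_len, content_len):
    total_len, chunks = to_chunks(header, z_chunks, z_len, content_len)
    return ''.join(chunks)

total_len, chunks = to_chunks('gcb1z\n', ['xx', 'yy'], 4, 10)
assert ''.join(chunks) == 'gcb1z\n4\n10\nxxyy'
assert total_len == len(''.join(chunks))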
@@ -679,10 +699,12 @@
         z_header_bytes = zlib.compress(header_bytes)
         del header_bytes
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
@@ -691,5 +713,6 @@
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
-        #       different way
+        #       different way. At a minimum this creates 2 copies of the
+        #       compressed data
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
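For reference, the framing that _wire_bytes() writes and from_bytes() splits apart can be round-tripped in isolation. The storage-kind label below is a placeholder rather than the real constant; only the layout shown above is assumed, namely a kind line, three decimal lengths, then the compressed header followed by the block chunks.

def frame(storage_kind, z_header_bytes, header_len, block_chunks):
    lines = ['%s\n' % storage_kind,
             '%d\n%d\n%d\n' % (len(z_header_bytes), header_len,
                               sum(map(len, block_chunks))),
             z_header_bytes]
    lines.extend(block_chunks)
    return ''.join(lines)

def unframe(data):
    (storage_kind, z_header_len, header_len,
     block_len, rest) = data.split('\n', 4)
    z_header_len = int(z_header_len)
    return (storage_kind, rest[:z_header_len], int(header_len),
            int(block_len), rest[z_header_len:])

wire = frame('demo-kind', 'ZHDR', 4, ['AA', 'BB'])
assert unframe(wire) == ('demo-kind', 'ZHDR', 4, 4, 'AABB')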
@@ -855,13 +878,5 @@
         After calling this, the compressor should no longer be used
         """
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ -1630,7 +1645,18 @@
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
+            del chunks
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
@@ -1812,2 +1838,50 @@
+class _GCBuildDetails(object):
+    """A blob of data about the build details.
+
+    This stores the minimal data, which then allows compatibility with the old
+    api, without taking as much memory.
+    """
+
+    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
+                 '_delta_end', '_parents')
+
+    method = 'group'
+    compression_parent = None
+
+    def __init__(self, parents, position_info):
+        self._parents = parents
+        (self._index, self._group_start, self._group_end, self._basis_end,
+         self._delta_end) = position_info
+
+    def __repr__(self):
+        return '%s(%s, %s)' % (self.__class__.__name__,
+            self.index_memo, self._parents)
+
+    @property
+    def index_memo(self):
+        return (self._index, self._group_start, self._group_end,
+                self._basis_end, self._delta_end)
+
+    @property
+    def record_details(self):
+        return static_tuple.StaticTuple(self.method, None)
+
+    def __getitem__(self, offset):
+        """Compatibility thunk to act like a tuple."""
+        if offset == 0:
+            return self.index_memo
+        elif offset == 1:
+            return self.compression_parent # Always None
+        elif offset == 2:
+            return self._parents
+        elif offset == 3:
+            return self.record_details
+        else:
+            raise IndexError('offset out of range')
+
+    def __len__(self):
+        return 4
+
+
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
@@ -2033,7 +2106,7 @@
         # each, or about 7MB. Note that it might be even more when you consider
         # how PyInt is allocated in separate slabs. And you can't return a slab
         # to the OS if even 1 int on it is in use. Note though that Python uses
-        # a LIFO when re-using PyInt slots, which probably causes more
+        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
         start = int(bits[0])
         start = self._int_cache.setdefault(start, start)
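The _int_cache lines above are a small interning trick: dict.setdefault() hands back the first object stored under a value, so the thousands of equal start/stop offsets parsed out of index entries all share one int object apiece, and the freshly parsed duplicates can be freed. In isolation:

_int_cache = {}

def intern_int(value):
    # Return the cached object for equal values; store it on first sight.
    return _int_cache.setdefault(value, value)

a = intern_int(int('123456'))
b = intern_int(int('123456'))
assert a == b
assert a is b    # one shared object instead of one per index entry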