        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        if self._content_length is None:
            raise AssertionError('self._content_length should never be None')
        # TODO: If we re-use the same content block at different times during
        #       get_record_stream(), it is possible that the first pass will
        #       get inserted, triggering an extract/_ensure_content() which
        #       will get rid of _z_content. And then the next use of the block
        #       will try to access _z_content (to send it over the wire), and
        #       fail because it is already extracted. Consider never releasing
        #       _z_content because of this.
        if num_bytes is None:
            num_bytes = self._content_length
        elif (self._content_length is not None
              and num_bytes > self._content_length):
            raise AssertionError(
                'requested num_bytes (%d) > content length (%d)'
                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = ''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # chunk
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = ''.join(self._z_content_chunks)
            if z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
        # Any bytes remaining to be decompressed will be in the decompressors
        # 'unconsumed_tail'

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
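
# --- Illustrative sketch (not part of the original file): the partial
# decompression pattern _ensure_content relies on. zlib.decompressobj() can
# be asked for at most max_length bytes; whatever is not inflated yet stays
# in .unconsumed_tail and can be fed back in on a later call. The 32kB
# window below is a stand-in for _ZLIB_DECOMP_WINDOW, not the module's
# actual constant.
def _demo_partial_zlib_decompress(data, num_bytes, window=32 * 1024):
    import zlib
    compressed = zlib.compress(data)
    decomp = zlib.decompressobj()
    # First pass: only inflate roughly what the caller asked for.
    out = decomp.decompress(compressed, num_bytes + window)
    while len(out) < len(data) and decomp.unconsumed_tail:
        # Later passes continue from the unconsumed tail, exactly like the
        # second half of _ensure_content above.
        out += decomp.decompress(decomp.unconsumed_tail, num_bytes + window)
    return out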

    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.
        """
        if len(bytes) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(bytes), pos, self._z_content_length))
        self._z_content_chunks = (bytes[pos:],)

    @property
    def _z_content(self):
        """Return z_content_chunks as a simple string.

        Meant only to be used by the test suite.
        """
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None

    @classmethod
    def from_bytes(cls, bytes):

        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_using_lzma(self):
        if self._content_chunks is not None:
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        if self._content is None:
            raise AssertionError('Nothing to compress')
        z_content = pylzma.compress(self._content)
        self._z_content_chunks = (z_content,)
        self._z_content_length = len(z_content)

    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak memory at this point is 1 fulltext, 1 compressed text, + zlib
        # overhead (measured peak is maybe 30MB over the above...)
        compressed_chunks = map(compressor.compress, chunks)
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))
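
# --- Illustrative sketch (not part of the original file): the streaming
# compression pattern used by _create_z_content_from_chunks above. A single
# zlib.compressobj() is fed chunk by chunk; compress() frequently returns ''
# while zlib buffers input internally, which is why empty chunks are
# filtered out before the lengths are summed.
def _demo_compress_chunks(chunks):
    import zlib
    compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
    compressed_chunks = [compressor.compress(c) for c in chunks]
    compressed_chunks.append(compressor.flush())
    z_chunks = [c for c in compressed_chunks if c]
    return z_chunks, sum(map(len, z_chunks))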

    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if _USE_LZMA:
            self._create_z_content_using_lzma()
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                 ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return ''.join(chunks)
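
# --- Illustrative sketch (not part of the original file): reading back the
# framing that to_chunks() produces, i.e. the block header label followed by
# the compressed and uncompressed lengths (one per line) and then the
# z_content chunks. 'header' stands for whichever GCB_HEADER / GCB_LZ_HEADER
# label the block was written with; zlib-only decoding is assumed here.
def _demo_parse_block(data, header):
    import zlib
    if not data.startswith(header):
        raise ValueError('not a group compress block')
    pos = len(header)
    end = data.index('\n', pos)
    z_content_length = int(data[pos:end])
    pos = end + 1
    end = data.index('\n', pos)
    content_length = int(data[pos:end])
    pos = end + 1
    z_content = data[pos:pos + z_content_length]
    content = zlib.decompress(z_content)
    if len(content) != content_length:
        raise ValueError('content length mismatch')
    return content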

    def _dump(self, include_text=False):

        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return ''.join(lines)

    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
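
# --- Illustrative sketch (not part of the original file): how the wire
# format assembled in the chunked block-writing code above can be taken
# apart, mirroring the first steps of from_bytes(). The storage-kind label
# and the three length lines come first, then the zlib-compressed header,
# then the block bytes.
def _demo_split_wire_bytes(wire_bytes):
    import zlib
    (storage_kind, z_header_len, header_len,
     block_len, rest) = wire_bytes.split('\n', 4)
    z_header_len = int(z_header_len)
    header_len = int(header_len)
    block_len = int(block_len)
    z_header = rest[:z_header_len]
    block_bytes = rest[z_header_len:z_header_len + block_len]
    header = zlib.decompress(z_header)
    if len(header) != header_len:
        raise ValueError('header length mismatch')
    return storage_kind, header, block_bytes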

        After calling this, the compressor should no longer be used
        """
        # TODO: this causes us to 'bloat' to 2x the size of content in the
        #       group. This has an impact for 'commit' of large objects.
        #       One possibility is to use self._content_chunks, and be lazy and
        #       only fill out self._content as a full string when we actually
        #       need it. That would at least drop the peak memory consumption
        #       for 'commit' down to ~1x the size of the largest file, at a
        #       cost of increased complexity within this code. 2x is still <<
        #       3x the size of the largest file, so we are doing ok.
        self._block.set_chunked_content(self.chunks, self.endpoint)
        self.chunks = None
        self._delta_index = None

        return self.get_record_stream(keys, 'unordered', True)

    def clear_cache(self):
        """See VersionedFiles.clear_cache()"""
        self._group_cache.clear()
        self._index._graph_index.clear_cache()
        self._index._int_cache.clear()

    def _check_add(self, key, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""
        version_id = key[-1]

        self._unadded_refs = {}
        keys_to_add = []
        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = GroupCompressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Push 'chunks' down into the _access api, so that we don't
            #       have to double compressed memory here
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            bytes = ''.join(chunks)
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]

            key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1
            as_st = static_tuple.StaticTuple.from_sequence
            if record.parents is not None:
                parents = as_st([as_st(p) for p in record.parents])
            else:
                parents = None
            refs = static_tuple.StaticTuple(parents)
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            flush()
        self._compressor = None


class _GCBuildDetails(object):
    """A blob of data about the build details.

    This stores the minimal data, which then allows compatibility with the old
    api, without taking as much memory.
    """

    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
                 '_delta_end', '_parents')

    method = 'group'
    compression_parent = None

    def __init__(self, parents, position_info):
        self._parents = parents
        (self._index, self._group_start, self._group_end, self._basis_end,
         self._delta_end) = position_info

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__,
                               self.index_memo, self._parents)

    @property
    def index_memo(self):
        return (self._index, self._group_start, self._group_end,
                self._basis_end, self._delta_end)

    @property
    def record_details(self):
        return static_tuple.StaticTuple(self.method, None)

    def __getitem__(self, offset):
        """Compatibility thunk to act like a tuple."""
        if offset == 0:
            return self.index_memo
        elif offset == 1:
            return self.compression_parent # Always None
        elif offset == 2:
            return self._parents
        elif offset == 3:
            return self.record_details
        else:
            raise IndexError('offset out of range')
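
# --- Illustrative usage (not part of the original file): _GCBuildDetails
# stands in for the old 4-tuple of (index_memo, compression_parent, parents,
# record_details), so existing tuple-style indexing keeps working. The
# position_info values below are made-up sample data.
def _demo_build_details_as_tuple():
    parents = (('a-parent-key',),)
    position_info = ('an-index-object', 0, 4096, 10, 4096)
    details = _GCBuildDetails(parents, position_info)
    assert details[0] == details.index_memo
    assert details[1] is None     # compression_parent is always None
    assert details[2] == parents
    return details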


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                # Sometimes these are passed as a list rather than a tuple
                node_refs = static_tuple.as_tuples(node_refs)
                passed = static_tuple.as_tuples(keys[key])
                if node_refs != passed[1]:
                    details = '%s %s %s' % (key, (value, node_refs), passed)
                    if self._inconsistency_fatal:
                        raise errors.KnitCorrupt(self, "inconsistent details"
                                                 " in add_records: %s" %

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # thousands of objects.
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)
        stop = int(bits[1])
        stop = self._int_cache.setdefault(stop, stop)
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
        # instance
        return (node[0], start, stop, basis_end, delta_end)
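
# --- Illustrative sketch (not part of the original file): the _int_cache
# trick above is plain dict-based interning. Every group start/stop offset
# that repeats ends up referencing one shared int object instead of
# thousands of equal but distinct ones.
def _demo_intern_ints(values, cache=None):
    if cache is None:
        cache = {}
    interned = []
    for value in values:
        # setdefault returns the object stored first for this value, so
        # later duplicates reuse it rather than keeping their own copy.
        interned.append(cache.setdefault(value, value))
    return interned, cache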

    def scan_unvalidated_index(self, graph_index):
        """Inform this _GCGraphIndex that there is an unvalidated index.