 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,

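The hunk above moves most of the module's imports behind bzrlib's lazy_import machinery, and pulls pack_repo in lazily as well. As a rough standalone illustration of that pattern (assuming bzrlib is importable; the chosen module names are only examples), the modules named inside the string are not actually imported until one of them is first touched:

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    errors,
    osutils,
    )
""")

def _first_use():
    # 'errors' is a lazy proxy until an attribute access triggers the
    # real import.
    return errors.BzrError
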
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None

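For context, the zlib branch above extracts only a prefix when callers ask for part of the content: it seeds a decompressobj with the requested size plus a slack window, and keeps the decompressor around only while unconsumed_tail is non-empty. A minimal standalone sketch of that technique (the helper name and the 32KiB window value are illustrative, not bzrlib API):

import zlib

def partial_decompress(z_bytes, num_bytes, window=32 * 1024):
    # Decompress roughly the first num_bytes; keep the decompressor only
    # if compressed input is left over for a later call.
    decomp = zlib.decompressobj()
    content = decomp.decompress(z_bytes, num_bytes + window)
    if not decomp.unconsumed_tail:
        decomp = None
    return content, decomp

z_bytes = zlib.compress(b'x' * 100000)
prefix, decomp = partial_decompress(z_bytes, 1000)
assert len(prefix) >= 1000
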
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                  (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+
+        Meant only to be used by the test suite.
+        """
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
+        return None
 
     def from_bytes(cls, bytes):

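The new _z_content property keeps the old single-string attribute working (the docstring says it is meant for the test suite) while the real storage becomes a tuple of chunks. A toy standalone version of that pattern (the class name here is made up):

class _ChunkedBlob(object):
    # Hypothetical mini-class: store chunks, expose a joined view on demand.

    def __init__(self, chunks=None):
        self._z_content_chunks = chunks

    @property
    def _z_content(self):
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None

blob = _ChunkedBlob(('abc', 'def'))
assert blob._z_content == 'abcdef'
assert _ChunkedBlob()._z_content is None
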
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None

     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)

-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))

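The chunk-based compressor above avoids joining the fulltext before compressing: every chunk is fed through one compressobj, and empty outputs are dropped. A self-contained sketch of the same idea (the function name is illustrative):

import zlib

def compress_chunks(chunks):
    compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
    # Feed every chunk through the same compressor, then flush once.
    out = [compressor.compress(c) for c in chunks]
    out.append(compressor.flush())
    # Many compress() calls return '' because zlib buffers internally;
    # dropping the empty strings keeps the chunk list short.
    out = [c for c in out if c]
    return out, sum(map(len, out))

text_chunks = [b'line one\n', b'line two\n' * 1000]
z_chunks, z_len = compress_chunks(text_chunks)
assert zlib.decompress(b''.join(z_chunks)) == b''.join(text_chunks)
assert z_len == len(b''.join(z_chunks))
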
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-        else:
-            self._z_content = zlib.compress(self._content)
-            self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+        else:
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)

-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-        chunks = [header,
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-                  self._z_content,
-                 ]
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+                 ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):

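to_bytes() is now a thin wrapper over to_chunks(), which returns the total length plus a list of chunks so callers can stream the block without one big join. A simplified standalone sketch of that split (the 'hdr\n' value and these function signatures are illustrative, not the real GCB header or methods):

def to_chunks(header, z_chunks, z_len, content_len):
    chunks = ['%s%d\n%d\n' % (header, z_len, content_len)]
    chunks.extend(z_chunks)
    return sum(map(len, chunks)), chunks

def to_bytes(header, z_chunks, z_len, content_len):
    total_len, chunks = to_chunks(header, z_chunks, z_len, content_len)
    return ''.join(chunks)

total_len, chunks = to_chunks('hdr\n', ['xxxx'], 4, 10)
assert ''.join(chunks) == 'hdr\n4\n10\nxxxx'
assert total_len == len('hdr\n4\n10\nxxxx')
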
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)

     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
+        #       different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)

         After calling this, the compressor should no longer be used
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None

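The removed TODO block already sketches the fix the rest of this diff implements: keep the chunks around and only join them into one string when a fulltext is actually needed, so peak memory stays near 1x the largest text. A toy illustration of that "be lazy" idea (class and method names are made up):

class _LazyText(object):
    def __init__(self):
        self._chunks = None
        self._text = None

    def set_chunked_content(self, chunks):
        # Just remember the chunks; don't pay for the join yet.
        self._chunks = chunks
        self._text = None

    def text(self):
        if self._text is None and self._chunks is not None:
            # The join is deferred to the first caller needing a fulltext.
            self._text = ''.join(self._chunks)
            self._chunks = None
        return self._text

block = _LazyText()
block.set_chunked_content(['abc', 'def'])
assert block.text() == 'abcdef'
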
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream

 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
 
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         """
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
 
     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,

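The new _group_cache parameter exists so that without_fallbacks() can hand its cache of decompressed groups to the clone instead of each object building its own 50MB LRUSizeCache. A hypothetical, minimal stand-in (not the bzrlib class) showing that cache-sharing shape:

class _TinyVF(object):
    def __init__(self, index, access, _group_cache=None):
        if _group_cache is None:
            _group_cache = {}   # bzrlib uses LRUSizeCache(max_size=50*1024*1024)
        self._index = index
        self._access = access
        self._group_cache = _group_cache

    def without_fallbacks(self):
        # Pass the same cache object on, so both instances share hits.
        return _TinyVF(self._index, self._access,
                       _group_cache=self._group_cache)

vf = _TinyVF(index=object(), access=object())
assert vf.without_fallbacks()._group_cache is vf._group_cache
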
         # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
         parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
+        for fallback in self._transitive_fallbacks():
             if not missing_keys:
                 break
             (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(

             the defined order, regardless of source.
         """
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
         else:
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need

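topo_sort now comes from the lazily imported tsort module rather than a direct import; the call itself is unchanged. A small usage sketch (assuming bzrlib is importable; the revision ids are made up):

from bzrlib import tsort

parent_map = {
    ('rev-1',): (),
    ('rev-2',): (('rev-1',),),
    ('rev-3',): (('rev-2',),),
}
present_keys = tsort.topo_sort(parent_map)
# Parents always sort before their children.
assert present_keys.index(('rev-1',)) < present_keys.index(('rev-3',))
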
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]

         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         result = set()
         for source in sources:
             result.update(source.keys())

         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
         else:
             self._key_dependencies = None