@@ -23 +23 @@
 except ImportError:
...
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
...
     graph as _mod_graph,
...
+from bzrlib.repofmt import pack_repo
+""")
 
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
...
     AbsentContentFactory,
     ChunkedContentFactory,
@@ -135 +141 @@
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            # chunk
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
+            if z_content == '':
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                 else:
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
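
The zlib branch above avoids inflating the whole group when only a prefix is needed: if more than 3/4 of the content is requested it decompresses everything in one pass, otherwise it keeps a zlib.decompressobj around and stops once enough bytes (plus a slack window) are available, holding on to the decompressor only while there is an unconsumed tail. A standalone sketch of that partial-decompression idea, assuming plain zlib data; the window constant and function name here are illustrative, not bzrlib's:

    import zlib

    _DECOMP_WINDOW = 64 * 1024  # illustrative slack, not bzrlib's constant

    def extract_prefix(z_bytes, num_bytes, total_length):
        # Return at least num_bytes of decompressed text, plus the live
        # decompressor if there is still compressed data left to consume.
        if num_bytes * 4 > total_length * 3:
            # Asking for more than 3/4 of the content: one full pass is cheaper.
            return zlib.decompress(z_bytes), None
        decomp = zlib.decompressobj()
        content = decomp.decompress(z_bytes, num_bytes + _DECOMP_WINDOW)
        if not decomp.unconsumed_tail:
            decomp = None  # everything was consumed; no state worth keeping
        return content, decomp
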
@@ -207 +217 @@
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                  (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+
+    @property
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+
+        Meant only to be used by the test suite.
+        """
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
+        return None
+
     @classmethod
     def from_bytes(cls, bytes):
@@ -269 +289 @@
         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None
 
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None
 
     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content = ''.join(self._content_chunks)
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)
@@ -289 +310 @@
-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
 
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             return
         if _USE_LZMA:
             self._create_z_content_using_lzma()
             return
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
+            chunks = self._content_chunks
         else:
-            self._z_content = zlib.compress(self._content)
-            self._z_content_length = len(self._z_content)
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)
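
The new _create_z_content_from_chunks() feeds the fulltext to zlib chunk by chunk and keeps the compressed output as a list of chunks too, so neither the plain nor the compressed text has to be joined into one extra string just to compress it. A minimal sketch of the same streaming-compression idea (the function name is illustrative):

    import zlib

    def compress_chunks(chunks):
        # One compressobj sees every chunk in order; empty outputs are dropped
        # so the result list stays small. Returns (z_chunks, compressed_length).
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        compressed = [compressor.compress(c) for c in chunks]
        compressed.append(compressor.flush())
        z_chunks = [c for c in compressed if c]
        return z_chunks, sum(len(c) for c in z_chunks)
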
@@ -308 +332 @@
-    def to_bytes(self):
-        """Encode the information into a byte stream."""
+    def to_chunks(self):
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
         else:
             header = self.GCB_HEADER
-        chunks = [header,
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-                  self._z_content,
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
                  ]
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks
+
+    def to_bytes(self):
+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)
 
     def _dump(self, include_text=False):
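
With to_chunks() in place, to_bytes() becomes a thin wrapper, and callers that can write an iterable of strings never need the joined copy at all. A hypothetical usage sketch, where block stands in for a GroupCompressBlock and out_file for any file-like object:

    total_len, chunks = block.to_chunks()  # header chunk + compressed chunks
    out_file.writelines(chunks)            # stream them out; no ''.join() copy
    # total_len equals sum(map(len, chunks)); only to_bytes() pays for the join.
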
@@ -679 +709 @@
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
-                                       len(block_bytes)))
+                                       block_bytes_len))
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)
@@ -691 +723 @@
     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
         #       different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
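
For reference, the stream that from_bytes() starts to pick apart here is the one _wire_bytes() assembles above: a storage-kind line, three decimal length lines, the zlib-compressed header, then the block itself. A sketch of how the rest of that parse plausibly continues, assuming wire_bytes holds such a stream (the variable names are illustrative):

    import zlib

    (storage_kind, z_header_len, header_len,
     block_len, rest) = wire_bytes.split('\n', 4)
    z_header_len = int(z_header_len)
    header_len = int(header_len)
    block_len = int(block_len)
    z_header = rest[:z_header_len]                       # compressed header
    block_bytes = rest[z_header_len:z_header_len + block_len]
    header = zlib.decompress(z_header)                   # header_len bytes long
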
@@ -855 +888 @@
         After calling this, the compressor should no longer be used
         """
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
@@ -1027 +1052 @@
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
@@ -1293 +1318 @@
         # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
         parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
+        for fallback in self._transitive_fallbacks():
             if not missing_keys:
                 break
             (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
@@ -1451 +1476 @@
             the defined order, regardless of source.
         """
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
         else:
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
@@ -1630 +1655 @@
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
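
The TODO lines in this hunk note that flush() still has to join the compressed chunks only because _access.add_raw_records() takes a single string. A purely hypothetical sketch of the direction that TODO points at; add_raw_record_chunks does not exist in bzrlib and only stands in for a chunk-aware variant of the same call:

    def flush_without_join(compressor, access):
        # Keep the compressed text as chunks end to end, so no second,
        # joined copy of it is ever held in memory.
        bytes_len, chunks = compressor.flush().to_chunks()
        # Hypothetical chunk-aware access API (not a real bzrlib method):
        return access.add_raw_record_chunks([(None, bytes_len)], chunks)[0]
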
@@ -1802 +1838 @@
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         result = set()
         for source in sources:
             result.update(source.keys())
@@ -1891 +1927 @@
         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
         else:
             self._key_dependencies = None