 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,

                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more that 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None

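The branch above keeps a zlib decompressor around when only part of the block is wanted, so the rest of the stream can be pulled out later. A minimal sketch of that pattern, outside bzrlib (the function name and the 4096-byte window are placeholders, not values from this diff):

import zlib

def partially_decompress(z_content, num_bytes, window=4096):
    # Decompress at most num_bytes + window bytes; zlib stops early and
    # remembers the remaining compressed data in unconsumed_tail.
    decompressor = zlib.decompressobj()
    content = decompressor.decompress(z_content, num_bytes + window)
    if decompressor.unconsumed_tail:
        # Keep the decompressor so a later call can extract more content.
        return content, decompressor
    return content, None
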
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                 (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)
+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+        Meant only to be used by the test suite.
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)
     def from_bytes(cls, bytes):

         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None

     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None

     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)

-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))

     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             self._create_z_content_using_lzma()
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-            self._z_content = zlib.compress(self._content)
-            self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)

-        """Encode the information into a byte stream."""
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
             header = self.GCB_LZ_HEADER
             header = self.GCB_HEADER
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks

+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)

     def _dump(self, include_text=False):

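The to_chunks()/to_bytes() split above follows a simple pattern: compress chunk by chunk through one compressobj, keep the compressed pieces as a list whose total length is known without joining, and join only for the callers that really need a single string. A minimal sketch of the idea (illustrative names, not the bzrlib implementation):

import zlib

def compress_chunks(chunks):
    # One shared compressobj so the pieces form a single valid zlib stream,
    # but the joined plaintext is never built.
    compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
    compressed = [compressor.compress(c) for c in chunks]
    compressed.append(compressor.flush())
    compressed = [c for c in compressed if c]    # ignore empty chunks
    total_len = sum(len(c) for c in compressed)  # length without joining
    return total_len, compressed

def compressed_bytes(chunks):
    # Joining is deferred to the few places that need one contiguous string.
    _, compressed = compress_chunks(chunks)
    return ''.join(compressed)
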
         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)

     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
+        #       different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)

         After calling this, the compressor should no longer be used
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None

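The removed TODO worried about flush() bloating to roughly 2x the group size because the chunks were joined into one string up front; the direction this diff takes is to keep the chunks and join lazily, only when a fulltext is actually needed. A small sketch of that lazy-join pattern (illustrative class, not the bzrlib block):

class LazyContent(object):
    """Hold content as chunks; build the joined string only on demand."""

    def __init__(self, chunks, length):
        self._content_chunks = chunks
        self._content_length = length
        self._content = None

    def as_string(self):
        if self._content is None:
            # Memory only doubles here, and only for callers that really
            # need the content as a single string.
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        return self._content
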
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream

         # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
         parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
+        for fallback in self._transitive_fallbacks():
             if not missing_keys:
             (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(

             the defined order, regardless of source.
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need

         self._unadded_refs = {}
         keys_to_add = []
-            bytes = self._compressor.flush().to_bytes()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
             self._compressor = GroupCompressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]

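The TODO about pushing 'chunks' down into the _access api amounts to avoiding the ''.join() right before add_raw_records(). A generic illustration of the difference (plain file-like writers, not the bzrlib API):

def write_joined(out, chunks):
    # Briefly holds both the chunks and the joined copy in memory.
    data = ''.join(chunks)
    out.write(data)

def write_chunked(out, chunks):
    # Writes the same bytes without ever materialising the joined copy.
    for chunk in chunks:
        out.write(chunk)
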
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         for source in sources:
             result.update(source.keys())

+class _GCBuildDetails(object):
+    """A blob of data about the build details.
+    This stores the minimal data, which then allows compatibility with the old
+    api, without taking as much memory.
+    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
+                 '_delta_end', '_parents')
+    compression_parent = None
+    def __init__(self, parents, position_info):
+        self._parents = parents
+        (self._index, self._group_start, self._group_end, self._basis_end,
+         self._delta_end) = position_info
+        return '%s(%s, %s)' % (self.__class__.__name__,
+            self.index_memo, self._parents)
+    def index_memo(self):
+        return (self._index, self._group_start, self._group_end,
+                self._basis_end, self._delta_end)
+    def record_details(self):
+        return static_tuple.StaticTuple(self.method, None)
+    def __getitem__(self, offset):
+        """Compatibility thunk to act like a tuple."""
+            return self.index_memo
+            return self.compression_parent # Always None
+            return self._parents
+            return self.record_details
+            raise IndexError('offset out of range')

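The new _GCBuildDetails is a compatibility thunk: callers that treated the result of get_build_details() as a tuple keep indexing it, while the object only stores the parents and the position info. A hedged usage sketch; the offsets assume the usual (index_memo, compression_parent, parents, record_details) ordering, and the position values are made up:

# position_info is (index, group_start, group_end, basis_end, delta_end),
# matching __init__ above; 'index', 4096 and 2048 are hypothetical values.
details = _GCBuildDetails(parents=(('file-id', 'rev-1'),),
                          position_info=('index', 0, 4096, 0, 2048))
details.index_memo   # ('index', 0, 4096, 0, 2048)
details[0]           # same as details.index_memo
details[1]           # compression_parent, always None for groupcompress
details[2]           # the parents passed to __init__
details[3]           # record_details, i.e. (self.method, None) as above
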
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
             self._key_dependencies = None

         # each, or about 7MB. Note that it might be even more when you consider
         # how PyInt is allocated in separate slabs. And you can't return a slab
         # to the OS if even 1 int on it is in use. Note though that Python uses
-        # a LIFO when re-using PyInt slots, which probably causes more
+        # a LIFO when re-using PyInt slots, which might cause more
         # fragmentation.
         start = int(bits[0])
         start = self._int_cache.setdefault(start, start)

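The _int_cache lines above intern the offsets parsed out of the index: setdefault() stores the first int object seen for a given value and hands that same object back for every later occurrence, so many entries pointing at the same group offset share one PyInt instead of each holding their own. A stripped-down sketch of the idiom (illustrative names only):

_int_cache = {}

def intern_offset(text):
    value = int(text)
    # Return the cached int object for this value, storing it on first use.
    return _int_cache.setdefault(value, value)

start = intern_offset('12345')
end = intern_offset('12345')
assert start is end   # same object, not merely equal values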