"""Core compression logic for compressing streams of related files."""

from __future__ import absolute_import

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    graph as _mod_graph,
from bzrlib.repofmt import pack_repo
from bzrlib.i18n import gettext

from bzrlib.btree_index import BTreeBuilder
from bzrlib.lru_cache import LRUSizeCache
from bzrlib.versionedfile import (
    AbsentContentFactory,
    ChunkedContentFactory,
    FulltextContentFactory,
    VersionedFilesWithFallbacks,

# Minimum number of uncompressed bytes to try to fetch at once when retrieving
# groupcompress blocks.

# osutils.sha_string('')
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
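
# Illustrative check of the constant above (a sketch; standard library only):
#   >>> import hashlib
#   >>> hashlib.sha1('').hexdigest()
#   'da39a3ee5e6b4b0d3255bfef95601890afd80709'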
            self._content = ''
        elif self._compressor_name == 'lzma':
            # We don't do partial lzma decomp yet
            self._content = pylzma.decompress(z_content)
        elif self._compressor_name == 'zlib':
            # Start a zlib decompressor
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak memory at this point is 1 fulltext, 1 compressed text,
        # + zlib overhead

    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
            # Grab and cache the raw bytes for this entry
            # and break the ref-cycle with _manager since we don't need it
            try:
                self._manager._prepare_for_extract()
            except zlib.error as value:
                raise errors.DecompressCorruption("zlib: " + str(value))
            block = self._manager._block
            self._bytes = block.extract(self.key, self._start, self._end)
            # There are code paths that first extract as fulltext, and then
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

    def __init__(self, block, get_compressor_settings=None):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0
        self._get_settings = get_compressor_settings
        self._compressor_settings = None

    def _get_compressor_settings(self):
        if self._compressor_settings is not None:
            return self._compressor_settings
        settings = None
        if self._get_settings is not None:
            settings = self._get_settings()
        if settings is None:
            vf = GroupCompressVersionedFiles
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
        self._compressor_settings = settings
        return self._compressor_settings

    def add_factory(self, key, parents, start, end):
        if not self._factories:
        new_block.set_content(self._block._content[:last_byte])
        self._block = new_block

    def _make_group_compressor(self):
        return GroupCompressor(self._get_compressor_settings())
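
    # Illustrative use of the settings hook above (hypothetical call site,
    # not from this module):
    #   manager = _LazyGroupContentManager(block,
    #       get_compressor_settings=lambda: {'max_bytes_to_index': 1024*1024})
    #   compressor = manager._make_group_compressor()
    # When no callback is supplied, _get_compressor_settings() falls back to
    # GroupCompressVersionedFiles._DEFAULT_COMPRESSOR_SETTINGS.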

    def _rebuild_block(self):
        """Create a new GroupCompressBlock with only the referenced texts."""
        compressor = self._make_group_compressor()
        tstart = time.time()
        old_length = self._block._content_length
        # block? It seems hard to come up with a way that it would
        # expand, since we do full compression again. Perhaps based on a
        # request that ends up poorly ordered?
        # TODO: If the content would have expanded, then we would want to
        #       handle a case where we need to split the block.
        #       Now that we have a user-tweakable option
        #       (max_bytes_to_index), it is possible that someone sets it
        #       to a very low value, causing poor compression.
        delta = time.time() - tstart
        self._block = new_block
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
        self.labels_deltas = {}
        self._delta_index = None # Set by the children
        self._block = GroupCompressBlock()
        self._settings = settings

    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
        """Compress lines with label key.
class PythonGroupCompressor(_CommonGroupCompressor):

    def __init__(self, settings=None):
        """Create a GroupCompressor.

        Used only if the pyrex version is not available.
        """
        super(PythonGroupCompressor, self).__init__(settings)
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines
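
    # Illustrative usage (hypothetical key and text, not from this module):
    #   compressor = PythonGroupCompressor()
    #   compressor.compress(('file-id', 'rev-1'), 'one line of text\n',
    #                       expected_sha=None)
    # The same call works with PyrexGroupCompressor; both inherit
    # compress() from _CommonGroupCompressor.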
    It contains code very similar to SequenceMatcher because it has a similar
    task. However, some key differences apply:

    * there is no junk, we want a minimal edit not a human readable diff.
    * we don't filter very common lines (because we don't know where a good
      range will start, and after the first text we want to be emitting minimal
    * we chain the left side, not the right side
    * we incrementally update the adjacency matrix as new lines are provided.
    * we look for matches in all of the left side, so the routine which does
      the analogous task of find_longest_match does not need to filter on the

    def __init__(self, settings=None):
        super(PyrexGroupCompressor, self).__init__(settings)
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
            add_callback=graph_index.add_nodes,
            inconsistency_fatal=inconsistency_fatal)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
        result.stream = stream
class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """

    def __init__(self, gcvf, locations, get_compressor_settings=None):
        self.gcvf = gcvf
        self.locations = locations

    def yield_factories(self, full_flush=False):
        """Yield factories for keys added since the last yield. They will be
        returned in the order they were added via add_key.

        :param full_flush: by default, some results may not be returned in case
            they can be part of the next batch. If full_flush is True, then
            all results are returned.
            memos_to_get_stack.pop()
            block = self.batch_memos[read_memo]
            self.manager = _LazyGroupContentManager(block,
                get_compressor_settings=self._get_compressor_settings)
            self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        self.total_bytes = 0

class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
    """A group-compress based VersionedFiles implementation."""

    # This controls how the GroupCompress DeltaIndex works. Basically, we
    # compute hash pointers into the source blocks (so hash(text) => text).
    # However, each of these references costs some memory in exchange for a
    # more accurate match result. For very large files, they are either
    # pre-compressed and change in bulk whenever they change, or change in
    # just local blocks. Either way, 'improved resolution' is not very
    # helpful, versus running out of memory trying to track everything. The
    # default max gives 100% sampling of a 1MB file.
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
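
    # Illustrative only: the default above can be overridden through the
    # global option read by _get_compressor_settings(), e.g. in bazaar.conf:
    #   bzr.groupcompress.max_bytes_to_index = 4194304
    # which would yield {'max_bytes_to_index': 4194304} for new compressors.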

    def __init__(self, index, access, delta=True, _unadded_refs=None,
                 _group_cache=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        :param _group_cache: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        if _group_cache is None:
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._group_cache = _group_cache
        self._immediate_fallback_vfs = []
        self._max_bytes_to_index = None

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs),
            _group_cache=self._group_cache)
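
    # Note (editorial): the clone returned above shares this object's
    # _group_cache rather than starting with an empty cache.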

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):
        """Add a text to the store.

        :param key: The key tuple of the text to add.
        :param parents: The parents key tuples of the text to add.
        :param lines: A list of lines. Each line must be a bytestring. And all
            of them except the last must be terminated with \\n and contain no
            other \\n's. The last line may either contain no \\n's or a single
            terminating \\n. If the lines list does not meet this constraint the
            add routine may error or may succeed - but you will be unable to
            read the data back accurately. (Checking the lines have been split
            correctly is expensive and extremely unlikely to catch bugs so it
            is not done at runtime unless check_content is True.)
        :param parent_texts: An optional dictionary containing the opaque
        self._check_lines_not_unicode(lines)
        self._check_lines_are_lines(lines)

    def get_parent_map(self, keys):
        """Get a map of the graph parents of keys.
        The returned objects should be in the order defined by 'ordering',
        which can weave between different sources.

        :param ordering: Must be one of 'topological' or 'groupcompress'
        :return: List of [(source, [keys])] tuples, such that all keys are in
            the defined order, regardless of source.
        """
        if ordering == 'topological':
            present_keys = tsort.topo_sort(parent_map)
        else:
            # ordering == 'groupcompress'
            # XXX: This only optimizes for the target ordering. We may need
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations,
            get_compressor_settings=self._get_compressor_settings)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
            for _ in self._insert_record_stream(stream, random_id=False):
                pass

    def _get_compressor_settings(self):
        if self._max_bytes_to_index is None:
            # TODO: VersionedFiles don't know about their containing
            #       repository, so they don't have much of an idea about their
            #       location. So for now, this is only a global option.
            c = config.GlobalConfig()
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
            if val is not None:
                try:
                    val = int(val)
                except ValueError as e:
                    trace.warning('Value for '
                                  '"bzr.groupcompress.max_bytes_to_index"'
                                  ' %r is not an integer'
                                  % (val,))
                    val = None
            if val is None:
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
            self._max_bytes_to_index = val
        return {'max_bytes_to_index': self._max_bytes_to_index}
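
    # Editorial note: with no 'bzr.groupcompress.max_bytes_to_index' option
    # set, the first call above caches the class default, so the method
    # returns {'max_bytes_to_index': 1024 * 1024}; later calls reuse
    # self._max_bytes_to_index.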

    def _make_group_compressor(self):
        return GroupCompressor(self._get_compressor_settings())

    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                              reuse_blocks=True):
        """Internal core to insert a record stream into this container.
        # This will go up to fulltexts for gc to gc fetching, which isn't
        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []
        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
                raise errors.RevisionNotPresent(record.key, self)
            if record.key in inserted_keys:
                trace.note(gettext('Insert claimed random_id=True,'
                           ' but then inserted %r two times'), record.key)
            inserted_keys.add(record.key)
            if reuse_blocks:
        # repeated over and over, this creates a surplus of ints
        self._int_cache = {}
        if track_external_parent_refs:
            self._key_dependencies = _KeyRefs(
                track_new_keys=track_new_keys)
        else:
            self._key_dependencies = None
        :param keys: An iterable of keys.
        :return: A dict of key:
            (index_memo, compression_parent, parents, record_details).

            * index_memo: opaque structure to pass to read_records to extract
              the raw data
            * compression_parent: Content that this record is built upon, may
              be None
            * parents: Logical parents of this node
            * record_details: extra information about the content which needs
              to be passed to Factory.parse_record
        """
        self._check_read()