@@ -490 +491 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings

     def add_factory(self, key, parents, start, end):
         if not self._factories:
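A quick illustration of the new hook (not part of the diff; "block" stands for any existing GroupCompressBlock): the settings callback is deferred until compressor settings are actually needed, and its result is cached on the manager, so repeated lookups never re-read configuration.

    settings = {'max_bytes_to_index': 1024 * 1024}
    manager = _LazyGroupContentManager(block,
        get_compressor_settings=lambda: settings)
    manager._get_compressor_settings()   # invokes the callback once and caches the dict
    manager._get_compressor_settings()   # served from the cache, callback not called again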
@@ -533 +548 @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
@@ -553 +571 @@
         #       block? It seems hard to come up with a method that it would
         #       expand, since we do full compression again. Perhaps based on a
         #       request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
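Because _rebuild_block now goes through the small _make_group_compressor factory, on-the-fly repacking honours whatever settings the caller supplied, and changing that behaviour only takes overriding one method. A purely hypothetical subclass (not part of bzrlib) shows the shape of the hook:

    class _FixedSettingsManager(_LazyGroupContentManager):
        # Hypothetical: always repack with a small hash index, ignoring any
        # settings callback the caller may have provided.
        def _make_group_compressor(self):
            return GroupCompressor({'max_bytes_to_index': 256 * 1024})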
@@ -790 +813 @@
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        if settings is None:
+            self._settings = {}
+        else:
+            self._settings = settings

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -911 +938 @@
 class PythonGroupCompressor(_CommonGroupCompressor):

-    def __init__(self):
+    def __init__(self, settings=None):
         """Create a GroupCompressor.

         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
@@ -973 +999 @@
-    def __init__(self):
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1069 +1097 @@
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.

     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """

-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
@@ -1127 +1156 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.

         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
@@ -1161 +1190 @@
                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
@@ -1174 +1204 @@
         self.total_bytes = 0


-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""

-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                     _DEFAULT_MAX_BYTES_TO_INDEX}
+
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.

         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         """
         self._index = index
         self._access = access
         self._delta = delta
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
         self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None

     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
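One consequence of threading _group_cache through the constructor (illustration only; gcvf stands for an existing GroupCompressVersionedFiles): the clone returned by without_fallbacks() now shares the same LRUSizeCache, so blocks already fetched through either object stay warm for both.

    plain = gcvf.without_fallbacks()
    assert plain._group_cache is gcvf._group_cache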
@@ -1202 +1250 @@
     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,
@@ -1312 +1360 @@
         self._check_lines_not_unicode(lines)
         self._check_lines_are_lines(lines)

-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._transitive_fallbacks():
-            if not missing_keys:
-                break
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-                missing_keys)
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
-        return kg

     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
@@ -1571 +1603 @@
         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1623 +1656 @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass

+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ -1652 +1709 @@
         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)