     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings

     def add_factory(self, key, parents, start, end):
         if not self._factories:
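
The lazy _get_compressor_settings above makes the (possibly expensive)
settings lookup run at most once per content manager, and only when the
settings are actually needed. A minimal sketch of the same
memoize-a-callback pattern, with hypothetical names::

    class Manager(object):
        """Resolve settings lazily via an optional callback."""

        def __init__(self, get_settings=None):
            self._get_settings = get_settings  # callback, may be None
            self._settings = None              # memoized result

        def settings(self):
            if self._settings is None:
                if self._get_settings is not None:
                    self._settings = self._get_settings()
                else:
                    # Fall back to a class-level default.
                    self._settings = {'max_bytes_to_index': 1024 * 1024}
            return self._settings
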
@@ ... @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())

     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
@@ ... @@
         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
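
The _full_enough_block_size and _full_enough_mixed_block_size thresholds at
the top of this class feed the decision of whether rebuilding is worth the
work at all. A simplified, illustrative version of that decision (the helper
name and the exact rules are assumptions, not the real implementation)::

    FULL_ENOUGH_BLOCK_SIZE = 3*1024*1024        # matches the class attribute
    FULL_ENOUGH_MIXED_BLOCK_SIZE = 2*768*1024   # 1.5MB, mixed-content blocks

    def should_rebuild(block_size, bytes_used, is_mixed):
        """Hypothetical check: would repacking pay for itself?"""
        if is_mixed:
            limit = FULL_ENOUGH_MIXED_BLOCK_SIZE
        else:
            limit = FULL_ENOUGH_BLOCK_SIZE
        if block_size >= limit:
            # Block is already "full enough"; we won't repack it.
            return False
        # Otherwise rebuild when under half the block is referenced.
        return bytes_used * 2 < block_size
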
@@ ... @@
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        if settings is None:
+            self._settings = {}
+        else:
+            self._settings = settings

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ ... @@
 class PythonGroupCompressor(_CommonGroupCompressor):

-    def __init__(self):
+    def __init__(self, settings=None):
         """Create a GroupCompressor.

         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
@@ ... @@
     It contains code very similar to SequenceMatcher because it performs a
     similar task. However, some key differences apply:
-    - there is no junk, we want a minimal edit not a human readable diff.
-    - we don't filter very common lines (because we don't know where a good
-      range will start, and after the first text we want to be emitting
-      minimal edits, which does not filter very common lines well.)
-    - we chain the left side, not the right side
-    - we incrementally update the adjacency matrix as new lines are provided.
-    - we look for matches in all of the left side, so the routine which does
-      the analogous task of find_longest_match does not need to filter on the
-      left side.
+
+    * there is no junk, we want a minimal edit not a human readable diff.
+    * we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting
+      minimal edits, which does not filter very common lines well.)
+    * we chain the left side, not the right side
+    * we incrementally update the adjacency matrix as new lines are provided.
+    * we look for matches in all of the left side, so the routine which does
+      the analogous task of find_longest_match does not need to filter on the
+      left side.
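
The "chain the left side" and "incrementally update the adjacency matrix"
points describe the index structure: each line of the already-compressed
(left) texts maps to every position where it occurs, and each new source
text extends that map in place. A toy illustration of the idea (the real
DeltaIndex works on raw bytes in C, not on Python lists)::

    class ToyLineIndex(object):
        """Map each line to all of its positions on the left side."""

        def __init__(self):
            self.lines = []       # every left-side line, in order
            self.positions = {}   # line -> [indices into self.lines]

        def extend_left(self, new_lines):
            # Incremental update: only the new lines get hashed.
            for line in new_lines:
                self.positions.setdefault(line, []).append(len(self.lines))
                self.lines.append(line)

        def matches(self, line):
            # All left-side candidates for one right-side line.
            return self.positions.get(line, [])
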
@@ ... @@
-    def __init__(self):
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ ... @@
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.

     :ivar total_bytes: expected number of bytes (int) needed to fetch the
         currently pending batch.
     """

-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
+        self._get_compressor_settings = get_compressor_settings
@@ ... @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.

         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
@@ ... @@
                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
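
Putting the fetcher's interface together: a caller queues keys, watches the
running byte total, and drains the batch once it is big enough. A schematic
driver (BATCH_SIZE is arbitrary here, and add_key returning the running
total is an assumption based on the docstrings above)::

    def stream_records(gcvf, locations, keys):
        batcher = _BatchingBlockFetcher(gcvf, locations,
            get_compressor_settings=gcvf._get_compressor_settings)
        BATCH_SIZE = 2**16  # hypothetical threshold
        for key in keys:
            if batcher.add_key(key) > BATCH_SIZE:
                # Batch is big enough: read those blocks and emit records.
                for factory in batcher.yield_factories():
                    yield factory
        # Emit whatever is still pending, however small.
        for factory in batcher.yield_factories(full_flush=True):
            yield factory
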
@@ ... @@
 class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""

+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However, each of these references costs some memory, traded against a
+    # more accurate match result. Very large files either are pre-compressed
+    # and change in bulk whenever they change, or change in just local
+    # blocks. Either way, 'improved resolution' is not very helpful compared
+    # with running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
+
     def __init__(self, index, access, delta=True, _unadded_refs=None,
                  _group_cache=None):
         """Create a GroupCompressVersionedFiles object.

         :param index: The index object storing access and graph data.
@@ ... @@
             _group_cache = LRUSizeCache(max_size=50*1024*1024)
         self._group_cache = _group_cache
         self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None

     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
@@ ... @@
         :param key: The key tuple of the text to add.
         :param parents: The parents key tuples of the text to add.
         :param lines: A list of lines. Each line must be a bytestring. And all
-            of them except the last must be terminated with \n and contain no
-            other \n's. The last line may either contain no \n's or a single
-            terminating \n. If the lines list does not meet this constraint
-            the add routine may error or may succeed - but you will be unable
-            to read the data back accurately. (Checking the lines have been split
+            of them except the last must be terminated with \\n and contain no
+            other \\n's. The last line may either contain no \\n's or a single
+            terminating \\n. If the lines list does not meet this constraint
+            the add routine may error or may succeed - but you will be unable
+            to read the data back accurately. (Checking the lines have been split
             correctly is expensive and extremely unlikely to catch bugs so it
             is not done at runtime unless check_content is True.)
         :param parent_texts: An optional dictionary containing the opaque
@@ ... @@
             self._check_lines_not_unicode(lines)
             self._check_lines_are_lines(lines)
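
The newline constraint is easiest to see with a concrete example; passing
check_content=True routes through the two checks above. The call below is
illustrative (key and parents made up)::

    good = ['first line\n', 'second line\n', 'last line, newline optional']
    bad = ['two lines\nin one string\n']  # embedded \n: data comes back wrong
    vf.add_lines(('file-id', 'rev-1'), [], good, check_content=True)
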
@@ ... @@
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        # a common base class
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._transitive_fallbacks():
-            if not missing_keys:
-                break
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-                missing_keys)
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
-        return kg
@@ ... @@
     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
@@ ... @@
         The returned objects should be in the order defined by 'ordering',
         which can weave between different sources.
+
         :param ordering: Must be one of 'topological' or 'groupcompress'
         :return: List of [(source, [keys])] tuples, such that all keys are in
             the defined order, regardless of source.
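
The return shape groups consecutive keys by the source that can serve them,
so an ordering can interleave this object with its fallbacks. Illustratively
(all names hypothetical)::

    source_keys = [
        (gcvf, [('f1', 'rev-a'), ('f1', 'rev-b')]),  # served locally
        (fallback_vf, [('f1', 'rev-c')]),            # from a fallback
        (gcvf, [('f1', 'rev-d')]),                   # local again
    ]
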
@@ ... @@
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
         #  - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ ... @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass

+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
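
Because the limit is read from the global config, users can tune it without
code changes. For example (GlobalConfig.set_user_option writes bazaar.conf;
the 4MB value is arbitrary)::

    from bzrlib import config
    c = config.GlobalConfig()
    c.set_user_option('bzr.groupcompress.max_bytes_to_index', '4194304')
    # A non-integer value is not fatal: _get_compressor_settings() warns
    # and falls back to _DEFAULT_MAX_BYTES_TO_INDEX (1MB).
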
@@ ... @@
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ ... @@
         # This will go up to fulltexts for gc to gc fetching, which isn't
         # ideal.
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)
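
The flush() closure is the heart of the accumulate/flush cycle: keep
compressing records into the current group, and when the group is full,
serialise it and start a fresh compressor. Schematically (the size check
and the writer callback are assumptions)::

    def insert_all(records, make_compressor, write_block,
                   max_group=4*1024*1024):
        compressor = make_compressor()
        for key, text in records:
            compressor.compress(key, text, None)
            if compressor.endpoint > max_group:  # assumed size attribute
                write_block(compressor.flush())  # serialise this group
                compressor = make_compressor()   # start the next group
        write_block(compressor.flush())          # final partial group
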
@@ ... @@
         :param keys: An iterable of keys.
         :return: A dict of key:
             (index_memo, compression_parent, parents, record_details).
-            index_memo
-                opaque structure to pass to read_records to extract the raw
-                data
-            compression_parent
-                Content that this record is built upon, may be None
-            parents
-                Logical parents of this node
-            record_details
-                extra information about the content which needs to be passed to
-                Factory.parse_record
+
+            * index_memo: opaque structure to pass to read_records to extract
+              the raw data
+            * compression_parent: Content that this record is built upon, may
+              be None
+            * parents: Logical parents of this node
+            * record_details: extra information about the content which needs
+              to be passed to Factory.parse_record
         """
         self._check_read()
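
A single entry in that dict looks roughly like this (every value below is a
placeholder, including the shape of index_memo and record_details)::

    build_details = {
        ('file-id', 'rev-1'): (
            ('pack-name', 0, 1234),       # index_memo: opaque locator
            None,                         # compression_parent: a fulltext
            (('file-id', 'rev-0'),),      # parents: logical parents
            ('group-compress', False),    # record_details for parse_record
        ),
    }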