--- bzrlib/groupcompress.py
+++ bzrlib/groupcompress.py
@@ -465 +466 @@
             # Grab and cache the raw bytes for this entry
             # and break the ref-cycle with _manager since we don't need it
             #   anymore
-            self._manager._prepare_for_extract()
+            try:
+                self._manager._prepare_for_extract()
+            except zlib.error as value:
+                raise errors.DecompressCorruption("zlib: " + str(value))
             block = self._manager._block
             self._bytes = block.extract(self.key, self._start, self._end)
             # There are code paths that first extract as fulltext, and then
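
With this hunk a corrupt group block surfaces as errors.DecompressCorruption
instead of a bare zlib.error escaping from get_bytes_as(). A rough sketch of
the caller-side effect (the record name and the handling inside the except are
illustrative, not part of this change):

    from bzrlib import errors

    # 'record' is any ContentFactory backed by a group-compress block.
    try:
        text = record.get_bytes_as('fulltext')
    except errors.DecompressCorruption:
        # The block failed to decompress: treat it as repository corruption
        # (report it, refetch, ...) rather than crashing on a zlib traceback.
        raise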
@@ -490 +494 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings
 
     def add_factory(self, key, parents, start, end):
         if not self._factories:
@@ -533 +551 @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
 
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
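
_LazyGroupContentManager now takes a zero-argument callback rather than a
settings dict, so the (possibly config-backed) lookup is deferred until a
block actually has to be recompressed in _rebuild_block(). A minimal sketch of
the wiring, with a hypothetical callback standing in for
GroupCompressVersionedFiles._get_compressor_settings:

    # Hypothetical wiring: the callback runs only if the manager decides it
    # must repack its block; the result is cached on the manager.
    def _settings_callback():
        return {'max_bytes_to_index': 1024 * 1024}

    manager = _LazyGroupContentManager(block,  # an existing GroupCompressBlock
        get_compressor_settings=_settings_callback)
    # A later _rebuild_block() then builds
    # GroupCompressor(_settings_callback()) via _make_group_compressor().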
@@ -553 +574 @@
         #       block? It seems hard to come up with a method that it would
         #       expand, since we do full compression again. Perhaps based on a
         #       request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
@@ -790 +816 @@
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        if settings is None:
+            self._settings = {}
+        else:
+            self._settings = settings
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -911 +941 @@
 class PythonGroupCompressor(_CommonGroupCompressor):
 
-    def __init__(self):
+    def __init__(self, settings=None):
         """Create a GroupCompressor.
 
         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
@@ -959 +989 @@
     It contains code very similar to SequenceMatcher because of having a similar
     task. However some key differences apply:
-    - there is no junk, we want a minimal edit not a human readable diff.
-    - we don't filter very common lines (because we don't know where a good
-      range will start, and after the first text we want to be emitting minmal
-    - we chain the left side, not the right side
-    - we incrementally update the adjacency matrix as new lines are provided.
-    - we look for matches in all of the left side, so the routine which does
-      the analagous task of find_longest_match does not need to filter on the
+
+    * there is no junk, we want a minimal edit not a human readable diff.
+    * we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting minmal
+    * we chain the left side, not the right side
+    * we incrementally update the adjacency matrix as new lines are provided.
+    * we look for matches in all of the left side, so the routine which does
+      the analagous task of find_longest_match does not need to filter on the
@@ -972 +1003 @@
-    def __init__(self):
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1069 +1101 @@
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
 
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """
 
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
@@ -1127 +1160 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.
 
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
@@ -1161 +1194 @@
                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
@@ -1177 +1211 @@
 class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
 
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
+
     def __init__(self, index, access, delta=True, _unadded_refs=None,
                  _group_cache=None):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
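
The default of 1024 * 1024 means a text of up to 1MiB gets a hash pointer for
every indexable position ("100% sampling"); larger texts are only partially
indexed, trading delta quality for bounded memory. This class-level dict is
also the fallback that _LazyGroupContentManager._get_compressor_settings uses
when no callback was supplied. A tiny sanity check, assuming the module is
bzrlib/groupcompress.py:

    # Illustrative only: the shipped defaults added by this hunk.
    from bzrlib.groupcompress import GroupCompressVersionedFiles as GCVF
    assert GCVF._DEFAULT_MAX_BYTES_TO_INDEX == 1024 * 1024
    assert GCVF._DEFAULT_COMPRESSOR_SETTINGS == {
        'max_bytes_to_index': 1024 * 1024}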
@@ -1197 +1243 @@
             _group_cache = LRUSizeCache(max_size=50*1024*1024)
         self._group_cache = _group_cache
         self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
@@ -1212 +1259 @@
         :param key: The key tuple of the text to add.
         :param parents: The parents key tuples of the text to add.
         :param lines: A list of lines. Each line must be a bytestring. And all
-            of them except the last must be terminated with \n and contain no
-            other \n's. The last line may either contain no \n's or a single
-            terminating \n. If the lines list does meet this constraint the add
-            routine may error or may succeed - but you will be unable to read
-            the data back accurately. (Checking the lines have been split
+            of them except the last must be terminated with \\n and contain no
+            other \\n's. The last line may either contain no \\n's or a single
+            terminating \\n. If the lines list does meet this constraint the
+            add routine may error or may succeed - but you will be unable to
+            read the data back accurately. (Checking the lines have been split
             correctly is expensive and extremely unlikely to catch bugs so it
             is not done at runtime unless check_content is True.)
         :param parent_texts: An optional dictionary containing the opaque
@@ -1317 +1364 @@
         self._check_lines_not_unicode(lines)
         self._check_lines_are_lines(lines)
 
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._transitive_fallbacks():
-            if not missing_keys:
-                break
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-                missing_keys)
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
-        return kg
-
     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
@@ -1477 +1508 @@
         The returned objects should be in the order defined by 'ordering',
         which can weave between different sources.
+
         :param ordering: Must be one of 'topological' or 'groupcompress'
         :return: List of [(source, [keys])] tuples, such that all keys are in
             the defined order, regardless of source.
@@ -1576 +1608 @@
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
         #  - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1628 +1661 @@
         for _ in self._insert_record_stream(stream, random_id=False):
             pass
 
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ -1657 +1714 @@
         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)
@@ -2078 +2135 @@
         :param keys: An iterable of keys.
         :return: A dict of key:
             (index_memo, compression_parent, parents, record_details).
-            index_memo
-                opaque structure to pass to read_records to extract the raw
-                data
-            compression_parent
-                Content that this record is built upon, may be None
-            parents
-                Logical parents of this node
-            record_details
-                extra information about the content which needs to be passed to
-                Factory.parse_record
+
+            * index_memo: opaque structure to pass to read_records to extract
+              the raw data
+            * compression_parent: Content that this record is built upon, may
+              be None
+            * parents: Logical parents of this node
+            * record_details: extra information about the content which needs
+              to be passed to Factory.parse_record
         """
         self._check_read()