23
23
except ImportError:
26
from bzrlib.lazy_import import lazy_import
27
lazy_import(globals(), """
26
28
from bzrlib import (
30
33
graph as _mod_graph,
41
from bzrlib.repofmt import pack_repo
37
44
from bzrlib.btree_index import BTreeBuilder
38
45
from bzrlib.lru_cache import LRUSizeCache
39
from bzrlib.tsort import topo_sort
40
46
from bzrlib.versionedfile import (
42
49
AbsentContentFactory,
43
50
ChunkedContentFactory,
44
51
FulltextContentFactory,
52
VersionedFilesWithFallbacks,
48
55
# Minimum number of uncompressed bytes to try fetch at once when retrieving
484
491
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
485
492
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
487
def __init__(self, block, get_compressor_settings=None):
    """Initialise the manager around an existing block.

    :param block: The block to manage; stored as self._block.
    :param get_compressor_settings: Optional zero-argument callable that
        returns compressor settings. It is only stored here (as
        self._get_settings), not called. None means none was supplied.
    """
    # Settings are not resolved at construction time; the cached value
    # starts out empty.
    self._compressor_settings = None
    self._get_settings = get_compressor_settings
    self._block = block
    self._last_byte = 0
    # We need to preserve the ordering
    self._factories = []
502
def _get_compressor_settings(self):
503
if self._compressor_settings is not None:
504
return self._compressor_settings
506
if self._get_settings is not None:
507
settings = self._get_settings()
509
vf = GroupCompressVersionedFiles
510
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
511
self._compressor_settings = settings
512
return self._compressor_settings
493
514
def add_factory(self, key, parents, start, end):
494
515
if not self._factories:
527
548
new_block.set_content(self._block._content[:last_byte])
528
549
self._block = new_block
551
def _make_group_compressor(self):
    """Return a new GroupCompressor built from this object's settings."""
    compressor_settings = self._get_compressor_settings()
    return GroupCompressor(compressor_settings)
530
554
def _rebuild_block(self):
531
555
"""Create a new GroupCompressBlock with only the referenced texts."""
532
compressor = GroupCompressor()
556
compressor = self._make_group_compressor()
533
557
tstart = time.time()
534
558
old_length = self._block._content_length
547
571
# block? It seems hard to come up with a method that it would
548
572
# expand, since we do full compression again. Perhaps based on a
549
573
# request that ends up poorly ordered?
574
# TODO: If the content would have expanded, then we would want to
575
# handle a case where we need to split the block.
576
# Now that we have a user-tweakable option
577
# (max_bytes_to_index), it is possible that one person set it
578
# to a very low value, causing poor compression.
550
579
delta = time.time() - tstart
551
580
self._block = new_block
552
581
trace.mutter('creating new compressed block on-the-fly in %.3fs'
784
813
self.labels_deltas = {}
785
814
self._delta_index = None # Set by the children
786
815
self._block = GroupCompressBlock()
819
self._settings = settings
788
821
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
789
822
"""Compress lines with label key.
905
938
class PythonGroupCompressor(_CommonGroupCompressor):
940
def __init__(self, settings=None):
908
941
"""Create a GroupCompressor.
910
943
Used only if the pyrex version is not available.
912
super(PythonGroupCompressor, self).__init__()
945
super(PythonGroupCompressor, self).__init__(settings)
913
946
self._delta_index = LinesDeltaIndex([])
914
947
# The actual content is managed by LinesDeltaIndex
915
948
self.chunks = self._delta_index.lines
953
986
It contains code very similar to SequenceMatcher because of having a similar
954
987
task. However some key differences apply:
955
- there is no junk, we want a minimal edit not a human readable diff.
956
- we don't filter very common lines (because we don't know where a good
957
range will start, and after the first text we want to be emitting minimal
959
- we chain the left side, not the right side
960
- we incrementally update the adjacency matrix as new lines are provided.
961
- we look for matches in all of the left side, so the routine which does
962
the analogous task of find_longest_match does not need to filter on the
989
* there is no junk, we want a minimal edit not a human readable diff.
990
* we don't filter very common lines (because we don't know where a good
991
range will start, and after the first text we want to be emitting minimal
993
* we chain the left side, not the right side
994
* we incrementally update the adjacency matrix as new lines are provided.
995
* we look for matches in all of the left side, so the routine which does
996
the analogous task of find_longest_match does not need to filter on the
967
super(PyrexGroupCompressor, self).__init__()
968
self._delta_index = DeltaIndex()
1000
def __init__(self, settings=None):
    """Set up the compressor and create its DeltaIndex.

    :param settings: Handed unchanged to the base class constructor.
        NOTE(review): the base class appears to expose it as
        self._settings, which must support .get() -- confirm how a None
        value is handled there.
    """
    super(PyrexGroupCompressor, self).__init__(settings)
    # A settings mapping without 'max_bytes_to_index' falls back to 0.
    index_limit = self._settings.get('max_bytes_to_index', 0)
    self._delta_index = DeltaIndex(max_bytes_to_index=index_limit)
970
1005
def _compress(self, key, bytes, max_delta_size, soft=False):
971
1006
"""see _CommonGroupCompressor._compress"""
1046
1081
index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1047
1082
add_callback=graph_index.add_nodes,
1048
1083
inconsistency_fatal=inconsistency_fatal)
1049
access = knit._DirectPackAccess({})
1084
access = pack_repo._DirectPackAccess({})
1050
1085
access.set_writer(writer, graph_index, (transport, 'newpack'))
1051
1086
result = GroupCompressVersionedFiles(index, access, delta)
1052
1087
result.stream = stream
1063
1098
class _BatchingBlockFetcher(object):
1064
1099
"""Fetch group compress blocks in batches.
1066
1101
:ivar total_bytes: int of expected number of bytes needed to fetch the
1067
1102
currently pending batch.
1070
def __init__(self, gcvf, locations):
1105
def __init__(self, gcvf, locations, get_compressor_settings=None):
1071
1106
self.gcvf = gcvf
1072
1107
self.locations = locations
1121
1157
def yield_factories(self, full_flush=False):
1122
1158
"""Yield factories for keys added since the last yield. They will be
1123
1159
returned in the order they were added via add_key.
1125
1161
:param full_flush: by default, some results may not be returned in case
1126
1162
they can be part of the next batch. If full_flush is True, then
1127
1163
all results are returned.
1155
1191
memos_to_get_stack.pop()
1157
1193
block = self.batch_memos[read_memo]
1158
self.manager = _LazyGroupContentManager(block)
1194
self.manager = _LazyGroupContentManager(block,
1195
get_compressor_settings=self._get_compressor_settings)
1159
1196
self.last_read_memo = read_memo
1160
1197
start, end = index_memo[3:5]
1161
1198
self.manager.add_factory(key, parents, start, end)
1168
1205
self.total_bytes = 0
1171
class GroupCompressVersionedFiles(VersionedFiles):
1208
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1172
1209
"""A group-compress based VersionedFiles implementation."""
1174
def __init__(self, index, access, delta=True, _unadded_refs=None):
1211
# This controls how the GroupCompress DeltaIndex works. Basically, we
1212
# compute hash pointers into the source blocks (so hash(text) => text).
1213
# However each of these references costs some memory in trade against a
1214
# more accurate match result. For very large files, they either are
1215
# pre-compressed and change in bulk whenever they change, or change in just
1216
# local blocks. Either way, 'improved resolution' is not very helpful,
1217
# versus running out of memory trying to track everything. The default max
1218
# gives 100% sampling of a 1MB file.
1219
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1220
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1221
_DEFAULT_MAX_BYTES_TO_INDEX}
1223
def __init__(self, index, access, delta=True, _unadded_refs=None,
1175
1225
"""Create a GroupCompressVersionedFiles object.
1177
1227
:param index: The index object storing access and graph data.
1178
1228
:param access: The access object storing raw data.
1179
1229
:param delta: Whether to delta compress or just entropy compress.
1180
1230
:param _unadded_refs: private parameter, don't use.
1231
:param _group_cache: private parameter, don't use.
1182
1233
self._index = index
1183
1234
self._access = access
1185
1236
if _unadded_refs is None:
1186
1237
_unadded_refs = {}
1187
1238
self._unadded_refs = _unadded_refs
1188
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1239
if _group_cache is None:
1240
_group_cache = LRUSizeCache(max_size=50*1024*1024)
1241
self._group_cache = _group_cache
1189
1242
self._immediate_fallback_vfs = []
1243
self._max_bytes_to_index = None
1191
1245
def without_fallbacks(self):
    """Return a clone of this object without any fallbacks configured.

    The clone is built from the same index, access object and delta flag.
    It receives its own copy of the unadded-refs dict and the same group
    cache object as this instance.
    """
    unadded_refs_copy = dict(self._unadded_refs)
    return GroupCompressVersionedFiles(
        self._index,
        self._access,
        self._delta,
        _unadded_refs=unadded_refs_copy,
        _group_cache=self._group_cache)
1196
1251
def add_lines(self, key, parents, lines, parent_texts=None,
1197
1252
left_matching_blocks=None, nostore_sha=None, random_id=False,
1201
1256
:param key: The key tuple of the text to add.
1202
1257
:param parents: The parents key tuples of the text to add.
1203
1258
:param lines: A list of lines. Each line must be a bytestring. And all
1204
of them except the last must be terminated with \n and contain no
1205
other \n's. The last line may either contain no \n's or a single
1206
terminating \n. If the lines list does not meet this constraint the add
1207
routine may error or may succeed - but you will be unable to read
1208
the data back accurately. (Checking the lines have been split
1259
of them except the last must be terminated with \\n and contain no
1260
other \\n's. The last line may either contain no \\n's or a single
1261
terminating \\n. If the lines list does not meet this constraint the
1262
add routine may error or may succeed - but you will be unable to
1263
read the data back accurately. (Checking the lines have been split
1209
1264
correctly is expensive and extremely unlikely to catch bugs so it
1210
1265
is not done at runtime unless check_content is True.)
1211
1266
:param parent_texts: An optional dictionary containing the opaque
1306
1361
self._check_lines_not_unicode(lines)
1307
1362
self._check_lines_are_lines(lines)
1309
def get_known_graph_ancestry(self, keys):
1310
"""Get a KnownGraph instance with the ancestry of keys."""
1311
# Note that this is identical to
1312
# KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1314
parent_map, missing_keys = self._index.find_ancestry(keys)
1315
for fallback in self._transitive_fallbacks():
1316
if not missing_keys:
1318
(f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1320
parent_map.update(f_parent_map)
1321
missing_keys = f_missing_keys
1322
kg = _mod_graph.KnownGraph(parent_map)
1325
1364
def get_parent_map(self, keys):
1326
1365
"""Get a map of the graph parents of keys.
1466
1505
The returned objects should be in the order defined by 'ordering',
1467
1506
which can weave between different sources.
1468
1508
:param ordering: Must be one of 'topological' or 'groupcompress'
1469
1509
:return: List of [(source, [keys])] tuples, such that all keys are in
1470
1510
the defined order, regardless of source.
1472
1512
if ordering == 'topological':
1473
present_keys = topo_sort(parent_map)
1513
present_keys = tsort.topo_sort(parent_map)
1475
1515
# ordering == 'groupcompress'
1476
1516
# XXX: This only optimizes for the target ordering. We may need
1565
1605
# - we encounter an unadded ref, or
1566
1606
# - we run out of keys, or
1567
1607
# - the total bytes to retrieve for this batch > BATCH_SIZE
1568
batcher = _BatchingBlockFetcher(self, locations)
1608
batcher = _BatchingBlockFetcher(self, locations,
1609
get_compressor_settings=self._get_compressor_settings)
1569
1610
for source, keys in source_keys:
1570
1611
if source is self:
1571
1612
for key in keys:
1617
1658
for _ in self._insert_record_stream(stream, random_id=False):
1661
def _get_compressor_settings(self):
1662
if self._max_bytes_to_index is None:
1663
# TODO: VersionedFiles don't know about their containing
1664
# repository, so they don't have much of an idea about their
1665
# location. So for now, this is only a global option.
1666
c = config.GlobalConfig()
1667
val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1671
except ValueError, e:
1672
trace.warning('Value for '
1673
'"bzr.groupcompress.max_bytes_to_index"'
1674
' %r is not an integer'
1678
val = self._DEFAULT_MAX_BYTES_TO_INDEX
1679
self._max_bytes_to_index = val
1680
return {'max_bytes_to_index': self._max_bytes_to_index}
1682
def _make_group_compressor(self):
    """Return a new GroupCompressor configured with the current settings."""
    compressor_settings = self._get_compressor_settings()
    return GroupCompressor(compressor_settings)
1620
1685
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1621
1686
reuse_blocks=True):
1622
1687
"""Internal core to insert a record stream into this container.
1646
1711
# This will go up to fulltexts for gc to gc fetching, which isn't
1648
self._compressor = GroupCompressor()
1713
self._compressor = self._make_group_compressor()
1649
1714
self._unadded_refs = {}
1650
1715
keys_to_add = []
1652
1717
bytes_len, chunks = self._compressor.flush().to_chunks()
1653
self._compressor = GroupCompressor()
1718
self._compressor = self._make_group_compressor()
1654
1719
# Note: At this point we still have 1 copy of the fulltext (in
1655
1720
# record and the var 'bytes'), and this generates 2 copies of
1656
1721
# the compressed text (one for bytes, one in chunks)
1921
1986
# repeated over and over, this creates a surplus of ints
1922
1987
self._int_cache = {}
1923
1988
if track_external_parent_refs:
1924
self._key_dependencies = knit._KeyRefs(
1989
self._key_dependencies = _KeyRefs(
1925
1990
track_new_keys=track_new_keys)
1927
1992
self._key_dependencies = None
2067
2132
:param keys: An iterable of keys.
2068
2133
:return: A dict of key:
2069
2134
(index_memo, compression_parent, parents, record_details).
2071
opaque structure to pass to read_records to extract the raw
2074
Content that this record is built upon, may be None
2076
Logical parents of this node
2078
extra information about the content which needs to be passed to
2079
Factory.parse_record
2136
* index_memo: opaque structure to pass to read_records to extract
2138
* compression_parent: Content that this record is built upon, may
2140
* parents: Logical parents of this node
2141
* record_details: extra information about the content which needs
2142
to be passed to Factory.parse_record
2081
2144
self._check_read()