 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
+from bzrlib.i18n import gettext
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,

 # Minimum number of uncompressed bytes to try fetch at once when retrieving
             # Grab and cache the raw bytes for this entry
             # and break the ref-cycle with _manager since we don't need it
-            self._manager._prepare_for_extract()
+            try:
+                self._manager._prepare_for_extract()
+            except zlib.error as value:
+                raise errors.DecompressCorruption("zlib: " + str(value))
             block = self._manager._block
             self._bytes = block.extract(self.key, self._start, self._end)
             # There are code paths that first extract as fulltext, and then

     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings

     def add_factory(self, key, parents, start, end):
         if not self._factories:

         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length

         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        # handle a case where we need to split the block.
+        # Now that we have a user-tweakable option
+        # (max_bytes_to_index), it is possible that one person set it
+        # to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'

         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        self._settings = settings

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.

 class PythonGroupCompressor(_CommonGroupCompressor):

+    def __init__(self, settings=None):
         """Create a GroupCompressor.

         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines

     It contains code very similar to SequenceMatcher because of having a similar
     task. However some key differences apply:

-     - there is no junk, we want a minimal edit not a human readable diff.
-     - we don't filter very common lines (because we don't know where a good
-       range will start, and after the first text we want to be emitting minimal
-     - we chain the left side, not the right side
-     - we incrementally update the adjacency matrix as new lines are provided.
-     - we look for matches in all of the left side, so the routine which does
-       the analogous task of find_longest_match does not need to filter on the
+    * there is no junk, we want a minimal edit not a human readable diff.
+    * we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting minimal
+    * we chain the left side, not the right side
+    * we incrementally update the adjacency matrix as new lines are provided.
+    * we look for matches in all of the left side, so the routine which does
+      the analogous task of find_longest_match does not need to filter on the

-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""

         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream

 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.

     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """

-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations

     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.

         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
         """

                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)

         self.total_bytes = 0

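As context for the hunk above: callers queue keys on the fetcher and drain factories either once the pending batch has grown large enough or with a final full flush. A minimal sketch of that calling pattern follows; the fetch_all wrapper is invented for illustration, and the add_key() method and module-level BATCH_SIZE constant are only referenced, not defined, in this diff, so treat their exact signatures as assumptions.

    # Illustrative only -- not part of the change being reviewed.
    def fetch_all(gcvf, locations, keys_to_fetch):
        batcher = _BatchingBlockFetcher(gcvf, locations)
        for key in keys_to_fetch:
            batcher.add_key(key)                  # queue the key's block
            if batcher.total_bytes > BATCH_SIZE:  # pending batch grew large enough
                for factory in batcher.yield_factories():
                    yield factory                 # some results may be held back
        for factory in batcher.yield_factories(full_flush=True):
            yield factory                         # drain whatever remains
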
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""

+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                        _DEFAULT_MAX_BYTES_TO_INDEX}
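To make the new knob concrete: the dictionary above is the shape every compressor-settings provider returns, and it flows from the versioned-files object down to the delta index. The sketch below only restates calls that appear elsewhere in this diff; the literal value is the default, not a recommendation.

    # Illustrative flow of the settings dict:
    settings = {'max_bytes_to_index': 1024 * 1024}   # default: sample up to 1MB per text
    # _make_group_compressor() (added further down) hands it to the compressor:
    #     compressor = GroupCompressor(settings)
    # ...and PyrexGroupCompressor.__init__ forwards it to the delta index:
    #     DeltaIndex(max_bytes_to_index=settings.get('max_bytes_to_index', 0))
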
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.

         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         """
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
         self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None

     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)

     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,

         :param key: The key tuple of the text to add.
         :param parents: The parents key tuples of the text to add.
         :param lines: A list of lines. Each line must be a bytestring. And all
-            of them except the last must be terminated with \n and contain no
-            other \n's. The last line may either contain no \n's or a single
-            terminating \n. If the lines list does not meet this constraint the add
-            routine may error or may succeed - but you will be unable to read
-            the data back accurately. (Checking the lines have been split
+            of them except the last must be terminated with \\n and contain no
+            other \\n's. The last line may either contain no \\n's or a single
+            terminating \\n. If the lines list does not meet this constraint the
+            add routine may error or may succeed - but you will be unable to
+            read the data back accurately. (Checking the lines have been split
             correctly is expensive and extremely unlikely to catch bugs so it
             is not done at runtime unless check_content is True.)
         :param parent_texts: An optional dictionary containing the opaque

             self._check_lines_not_unicode(lines)
             self._check_lines_are_lines(lines)
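For reference, the constraint spelled out in the add_lines docstring above means a valid 'lines' value looks like the following; the literals are purely illustrative.

    # Every element but the last ends with exactly one '\n' and contains no
    # embedded '\n'; the last element may omit the terminator.
    lines = ['first line\n', 'second line\n', 'last line, newline optional']
    # Not allowed: an element that packs more than one line together.
    bad = ['two\nlines in one element\n']
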
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        # ancestry.
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._transitive_fallbacks():
-            if not missing_keys:
-                break
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-                                                missing_keys)
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
-        return kg

     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.

         The returned objects should be in the order defined by 'ordering',
         which can weave between different sources.

         :param ordering: Must be one of 'topological' or 'groupcompress'
         :return: List of [(source, [keys])] tuples, such that all keys are in
             the defined order, regardless of source.
         """
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
         else:
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
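To picture the ':return:' shape documented above, a grouped result might look like the sketch below; the keys and the fallback object are invented for illustration and are not taken from the diff.

    # [(source, [keys])] -- keys grouped per source, in the requested order:
    #   [(self, [('file-id', 'rev-2'), ('file-id', 'rev-1')]),
    #    (fallback_vf, [('file-id', 'rev-0')])]
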
         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:

         for _ in self._insert_record_stream(stream, random_id=False):

+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            # repository, so they don't have much of an idea about their
+            # location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
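Since the new option is read through config.GlobalConfig().get_user_option() above, it can also be set programmatically; a minimal sketch, assuming bzrlib's GlobalConfig.set_user_option() API (this exact usage is not part of the diff):

    # Illustrative only: raise the per-text indexing cap to 2MB.
    from bzrlib import config
    c = config.GlobalConfig()
    c.set_user_option('bzr.groupcompress.max_bytes_to_index', '2097152')
    # _get_compressor_settings() will pick the value up (after integer
    # conversion) the next time a compressor is built.
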
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())

     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.

         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             # record and the var 'bytes'), and this generates 2 copies of
             # the compressed text (one for bytes, one in chunks)

                 raise errors.RevisionNotPresent(record.key, self)
                 if record.key in inserted_keys:
-                    trace.note('Insert claimed random_id=True,'
-                               ' but then inserted %r two times', record.key)
+                    trace.note(gettext('Insert claimed random_id=True,'
+                               ' but then inserted %r two times'), record.key)
                 inserted_keys.add(record.key)
             if reuse_blocks:

         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
         else:
             self._key_dependencies = None

         :param keys: An iterable of keys.
         :return: A dict of key:
             (index_memo, compression_parent, parents, record_details).
-                opaque structure to pass to read_records to extract the raw
-                Content that this record is built upon, may be None
-                Logical parents of this node
-                extra information about the content which needs to be passed to
-                Factory.parse_record
+            * index_memo: opaque structure to pass to read_records to extract
+            * compression_parent: Content that this record is built upon, may
+            * parents: Logical parents of this node
+            * record_details: extra information about the content which needs
+              to be passed to Factory.parse_record
         """
         self._check_read()
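For readers unfamiliar with this API, the per-key tuple described in the docstring above has the following shape; the key, the variable names, and the index object are invented for illustration and are not taken from the diff.

    # details = some_gc_graph_index.get_build_details([('file-id', 'rev-1')])
    # details == {
    #     ('file-id', 'rev-1'): (index_memo,          # opaque; handed to read_records
    #                            compression_parent,  # may be None
    #                            parents,             # logical parents of the node
    #                            record_details),     # consumed by Factory.parse_record
    # }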