23
23
except ImportError:
26
from bzrlib.lazy_import import lazy_import
27
lazy_import(globals(), """
28
26
from bzrlib import (
33
30
graph as _mod_graph,
41
from bzrlib.repofmt import pack_repo
42
from bzrlib.i18n import gettext
45
37
from bzrlib.btree_index import BTreeBuilder
46
38
from bzrlib.lru_cache import LRUSizeCache
39
from bzrlib.repofmt import pack_repo
40
from bzrlib.tsort import topo_sort
47
41
from bzrlib.versionedfile import (
50
43
AbsentContentFactory,
51
44
ChunkedContentFactory,
52
45
FulltextContentFactory,
53
VersionedFilesWithFallbacks,
56
49
# Minimum number of uncompressed bytes to try fetch at once when retrieving
467
460
# Grab and cache the raw bytes for this entry
468
461
# and break the ref-cycle with _manager since we don't need it
471
self._manager._prepare_for_extract()
472
except zlib.error as value:
473
raise errors.DecompressCorruption("zlib: " + str(value))
463
self._manager._prepare_for_extract()
474
464
block = self._manager._block
475
465
self._bytes = block.extract(self.key, self._start, self._end)
476
466
# There are code paths that first extract as fulltext, and then
495
485
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
496
486
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
498
def __init__(self, block, get_compressor_settings=None):
488
def __init__(self, block):
499
489
self._block = block
500
490
# We need to preserve the ordering
501
491
self._factories = []
502
492
self._last_byte = 0
503
self._get_settings = get_compressor_settings
504
self._compressor_settings = None
506
def _get_compressor_settings(self):
507
if self._compressor_settings is not None:
508
return self._compressor_settings
510
if self._get_settings is not None:
511
settings = self._get_settings()
513
vf = GroupCompressVersionedFiles
514
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
515
self._compressor_settings = settings
516
return self._compressor_settings
518
494
def add_factory(self, key, parents, start, end):
519
495
if not self._factories:
552
528
new_block.set_content(self._block._content[:last_byte])
553
529
self._block = new_block
555
def _make_group_compressor(self):
556
return GroupCompressor(self._get_compressor_settings())
558
531
def _rebuild_block(self):
559
532
"""Create a new GroupCompressBlock with only the referenced texts."""
560
compressor = self._make_group_compressor()
533
compressor = GroupCompressor()
561
534
tstart = time.time()
562
535
old_length = self._block._content_length
575
548
# block? It seems hard to come up with a method that it would
576
549
# expand, since we do full compression again. Perhaps based on a
577
550
# request that ends up poorly ordered?
578
# TODO: If the content would have expanded, then we would want to
579
# handle a case where we need to split the block.
580
# Now that we have a user-tweakable option
581
# (max_bytes_to_index), it is possible that one person set it
582
# to a very low value, causing poor compression.
583
551
delta = time.time() - tstart
584
552
self._block = new_block
585
553
trace.mutter('creating new compressed block on-the-fly in %.3fs'
817
785
self.labels_deltas = {}
818
786
self._delta_index = None # Set by the children
819
787
self._block = GroupCompressBlock()
823
self._settings = settings
825
789
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
826
790
"""Compress lines with label key.
942
906
class PythonGroupCompressor(_CommonGroupCompressor):
944
def __init__(self, settings=None):
945
909
"""Create a GroupCompressor.
947
911
Used only if the pyrex version is not available.
949
super(PythonGroupCompressor, self).__init__(settings)
913
super(PythonGroupCompressor, self).__init__()
950
914
self._delta_index = LinesDeltaIndex([])
951
915
# The actual content is managed by LinesDeltaIndex
952
916
self.chunks = self._delta_index.lines
990
954
It contains code very similar to SequenceMatcher because of having a similar
991
955
task. However some key differences apply:
993
* there is no junk, we want a minimal edit not a human readable diff.
994
* we don't filter very common lines (because we don't know where a good
995
range will start, and after the first text we want to be emitting minmal
997
* we chain the left side, not the right side
998
* we incrementally update the adjacency matrix as new lines are provided.
999
* we look for matches in all of the left side, so the routine which does
1000
the analagous task of find_longest_match does not need to filter on the
956
- there is no junk, we want a minimal edit not a human readable diff.
957
- we don't filter very common lines (because we don't know where a good
958
range will start, and after the first text we want to be emitting minmal
960
- we chain the left side, not the right side
961
- we incrementally update the adjacency matrix as new lines are provided.
962
- we look for matches in all of the left side, so the routine which does
963
the analagous task of find_longest_match does not need to filter on the
1004
def __init__(self, settings=None):
1005
super(PyrexGroupCompressor, self).__init__(settings)
1006
max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1007
self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
968
super(PyrexGroupCompressor, self).__init__()
969
self._delta_index = DeltaIndex()
1009
971
def _compress(self, key, bytes, max_delta_size, soft=False):
1010
972
"""see _CommonGroupCompressor._compress"""
1102
1064
class _BatchingBlockFetcher(object):
1103
1065
"""Fetch group compress blocks in batches.
1105
1067
:ivar total_bytes: int of expected number of bytes needed to fetch the
1106
1068
currently pending batch.
1109
def __init__(self, gcvf, locations, get_compressor_settings=None):
1071
def __init__(self, gcvf, locations):
1110
1072
self.gcvf = gcvf
1111
1073
self.locations = locations
1161
1122
def yield_factories(self, full_flush=False):
1162
1123
"""Yield factories for keys added since the last yield. They will be
1163
1124
returned in the order they were added via add_key.
1165
1126
:param full_flush: by default, some results may not be returned in case
1166
1127
they can be part of the next batch. If full_flush is True, then
1167
1128
all results are returned.
1195
1156
memos_to_get_stack.pop()
1197
1158
block = self.batch_memos[read_memo]
1198
self.manager = _LazyGroupContentManager(block,
1199
get_compressor_settings=self._get_compressor_settings)
1159
self.manager = _LazyGroupContentManager(block)
1200
1160
self.last_read_memo = read_memo
1201
1161
start, end = index_memo[3:5]
1202
1162
self.manager.add_factory(key, parents, start, end)
1209
1169
self.total_bytes = 0
1212
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1172
class GroupCompressVersionedFiles(VersionedFiles):
1213
1173
"""A group-compress based VersionedFiles implementation."""
1215
# This controls how the GroupCompress DeltaIndex works. Basically, we
1216
# compute hash pointers into the source blocks (so hash(text) => text).
1217
# However each of these references costs some memory in trade against a
1218
# more accurate match result. For very large files, they either are
1219
# pre-compressed and change in bulk whenever they change, or change in just
1220
# local blocks. Either way, 'improved resolution' is not very helpful,
1221
# versus running out of memory trying to track everything. The default max
1222
# gives 100% sampling of a 1MB file.
1223
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1224
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1225
_DEFAULT_MAX_BYTES_TO_INDEX}
1227
def __init__(self, index, access, delta=True, _unadded_refs=None,
1175
def __init__(self, index, access, delta=True, _unadded_refs=None):
1229
1176
"""Create a GroupCompressVersionedFiles object.
1231
1178
:param index: The index object storing access and graph data.
1232
1179
:param access: The access object storing raw data.
1233
1180
:param delta: Whether to delta compress or just entropy compress.
1234
1181
:param _unadded_refs: private parameter, don't use.
1235
:param _group_cache: private parameter, don't use.
1237
1183
self._index = index
1238
1184
self._access = access
1240
1186
if _unadded_refs is None:
1241
1187
_unadded_refs = {}
1242
1188
self._unadded_refs = _unadded_refs
1243
if _group_cache is None:
1244
_group_cache = LRUSizeCache(max_size=50*1024*1024)
1245
self._group_cache = _group_cache
1189
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1246
1190
self._immediate_fallback_vfs = []
1247
self._max_bytes_to_index = None
1249
1192
def without_fallbacks(self):
1250
1193
"""Return a clone of this object without any fallbacks configured."""
1251
1194
return GroupCompressVersionedFiles(self._index, self._access,
1252
self._delta, _unadded_refs=dict(self._unadded_refs),
1253
_group_cache=self._group_cache)
1195
self._delta, _unadded_refs=dict(self._unadded_refs))
1255
1197
def add_lines(self, key, parents, lines, parent_texts=None,
1256
1198
left_matching_blocks=None, nostore_sha=None, random_id=False,
1260
1202
:param key: The key tuple of the text to add.
1261
1203
:param parents: The parents key tuples of the text to add.
1262
1204
:param lines: A list of lines. Each line must be a bytestring. And all
1263
of them except the last must be terminated with \\n and contain no
1264
other \\n's. The last line may either contain no \\n's or a single
1265
terminating \\n. If the lines list does meet this constraint the
1266
add routine may error or may succeed - but you will be unable to
1267
read the data back accurately. (Checking the lines have been split
1205
of them except the last must be terminated with \n and contain no
1206
other \n's. The last line may either contain no \n's or a single
1207
terminating \n. If the lines list does meet this constraint the add
1208
routine may error or may succeed - but you will be unable to read
1209
the data back accurately. (Checking the lines have been split
1268
1210
correctly is expensive and extremely unlikely to catch bugs so it
1269
1211
is not done at runtime unless check_content is True.)
1270
1212
:param parent_texts: An optional dictionary containing the opaque
1365
1307
self._check_lines_not_unicode(lines)
1366
1308
self._check_lines_are_lines(lines)
1310
def get_known_graph_ancestry(self, keys):
1311
"""Get a KnownGraph instance with the ancestry of keys."""
1312
# Note that this is identical to
1313
# KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1315
parent_map, missing_keys = self._index.find_ancestry(keys)
1316
for fallback in self._transitive_fallbacks():
1317
if not missing_keys:
1319
(f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1321
parent_map.update(f_parent_map)
1322
missing_keys = f_missing_keys
1323
kg = _mod_graph.KnownGraph(parent_map)
1368
1326
def get_parent_map(self, keys):
1369
1327
"""Get a map of the graph parents of keys.
1509
1467
The returned objects should be in the order defined by 'ordering',
1510
1468
which can weave between different sources.
1512
1469
:param ordering: Must be one of 'topological' or 'groupcompress'
1513
1470
:return: List of [(source, [keys])] tuples, such that all keys are in
1514
1471
the defined order, regardless of source.
1516
1473
if ordering == 'topological':
1517
present_keys = tsort.topo_sort(parent_map)
1474
present_keys = topo_sort(parent_map)
1519
1476
# ordering == 'groupcompress'
1520
1477
# XXX: This only optimizes for the target ordering. We may need
1609
1566
# - we encounter an unadded ref, or
1610
1567
# - we run out of keys, or
1611
1568
# - the total bytes to retrieve for this batch > BATCH_SIZE
1612
batcher = _BatchingBlockFetcher(self, locations,
1613
get_compressor_settings=self._get_compressor_settings)
1569
batcher = _BatchingBlockFetcher(self, locations)
1614
1570
for source, keys in source_keys:
1615
1571
if source is self:
1616
1572
for key in keys:
1662
1618
for _ in self._insert_record_stream(stream, random_id=False):
1665
def _get_compressor_settings(self):
1666
if self._max_bytes_to_index is None:
1667
# TODO: VersionedFiles don't know about their containing
1668
# repository, so they don't have much of an idea about their
1669
# location. So for now, this is only a global option.
1670
c = config.GlobalConfig()
1671
val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1675
except ValueError, e:
1676
trace.warning('Value for '
1677
'"bzr.groupcompress.max_bytes_to_index"'
1678
' %r is not an integer'
1682
val = self._DEFAULT_MAX_BYTES_TO_INDEX
1683
self._max_bytes_to_index = val
1684
return {'max_bytes_to_index': self._max_bytes_to_index}
1686
def _make_group_compressor(self):
1687
return GroupCompressor(self._get_compressor_settings())
1689
1621
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1690
1622
reuse_blocks=True):
1691
1623
"""Internal core to insert a record stream into this container.
1715
1647
# This will go up to fulltexts for gc to gc fetching, which isn't
1717
self._compressor = self._make_group_compressor()
1649
self._compressor = GroupCompressor()
1718
1650
self._unadded_refs = {}
1719
1651
keys_to_add = []
1721
1653
bytes_len, chunks = self._compressor.flush().to_chunks()
1722
self._compressor = self._make_group_compressor()
1654
self._compressor = GroupCompressor()
1723
1655
# Note: At this point we still have 1 copy of the fulltext (in
1724
1656
# record and the var 'bytes'), and this generates 2 copies of
1725
1657
# the compressed text (one for bytes, one in chunks)
1755
1687
raise errors.RevisionNotPresent(record.key, self)
1757
1689
if record.key in inserted_keys:
1758
trace.note(gettext('Insert claimed random_id=True,'
1759
' but then inserted %r two times'), record.key)
1690
trace.note('Insert claimed random_id=True,'
1691
' but then inserted %r two times', record.key)
1761
1693
inserted_keys.add(record.key)
1762
1694
if reuse_blocks:
1990
1922
# repeated over and over, this creates a surplus of ints
1991
1923
self._int_cache = {}
1992
1924
if track_external_parent_refs:
1993
self._key_dependencies = _KeyRefs(
1925
self._key_dependencies = knit._KeyRefs(
1994
1926
track_new_keys=track_new_keys)
1996
1928
self._key_dependencies = None
2136
2068
:param keys: An iterable of keys.
2137
2069
:return: A dict of key:
2138
2070
(index_memo, compression_parent, parents, record_details).
2140
* index_memo: opaque structure to pass to read_records to extract
2142
* compression_parent: Content that this record is built upon, may
2144
* parents: Logical parents of this node
2145
* record_details: extra information about the content which needs
2146
to be passed to Factory.parse_record
2072
opaque structure to pass to read_records to extract the raw
2075
Content that this record is built upon, may be None
2077
Logical parents of this node
2079
extra information about the content which needs to be passed to
2080
Factory.parse_record
2148
2082
self._check_read()