@@ -1 +1 @@
-# Copyright (C) 2008, 2009, 2010 Canonical Ltd
+# Copyright (C) 2008-2011 Canonical Ltd
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -23 +23 @@
 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
@@ -37 +41 @@
+from bzrlib.repofmt import pack_repo
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,
@@ -48 +55 @@
 # Minimum number of uncompressed bytes to try fetch at once when retrieving
@@ -484 +491 @@
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings
 
     def add_factory(self, key, parents, start, end):
         if not self._factories:
@@ -527 +548 @@
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
 
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
@@ -547 +571 @@
         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
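The new get_compressor_settings hook takes a callable rather than a settings dict, so the (potentially costly) lookup is deferred until a block actually needs recompressing, and _get_compressor_settings() caches the answer, falling back to GroupCompressVersionedFiles._DEFAULT_COMPRESSOR_SETTINGS when no callable was supplied. A minimal sketch of that wiring, with the callable and its return value invented for illustration:

    # Hypothetical wiring; in the real code the callable is the bound method
    # GroupCompressVersionedFiles._get_compressor_settings.
    def load_settings():
        return {'max_bytes_to_index': 1024 * 1024}

    manager = _LazyGroupContentManager(GroupCompressBlock(),
        get_compressor_settings=load_settings)
    settings = manager._get_compressor_settings()
    # Later calls reuse the cached dict instead of invoking the callable again.
    assert manager._get_compressor_settings() is settings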
@@ -784 +813 @@
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        self._settings = settings
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
@@ -905 +938 @@
 class PythonGroupCompressor(_CommonGroupCompressor):
 
+    def __init__(self, settings=None):
         """Create a GroupCompressor.
 
         Used only if the pyrex version is not available.
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
@@ -967 +999 @@
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
@@ -1046 +1080 @@
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
@@ -1063 +1097 @@
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
 
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
     """
 
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
@@ -1121 +1156 @@
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.
 
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
         """
@@ -1155 +1190 @@
                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
@@ -1168 +1204 @@
         self.total_bytes = 0
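The batcher accumulates keys via add_key (as the docstring above says) and only fetches the corresponding blocks when yield_factories runs; the new constructor argument just forwards the compressor-settings callable to each _LazyGroupContentManager it builds. A rough, hypothetical sketch of the calling pattern, loosely modelled on _get_remaining_record_stream (gcvf, locations and keys_to_fetch are placeholders):

    def stream_from_batches(gcvf, locations, keys_to_fetch):
        # Hypothetical driver; 'locations' maps key -> location info as built
        # by the caller, and BATCH_SIZE is the module-level batching threshold.
        batcher = _BatchingBlockFetcher(gcvf, locations,
            get_compressor_settings=gcvf._get_compressor_settings)
        for key in keys_to_fetch:
            batcher.add_key(key)
            if batcher.total_bytes > BATCH_SIZE:
                # Enough queued up: fetch this batch's blocks in one go.
                for factory in batcher.yield_factories():
                    yield factory
        for factory in batcher.yield_factories(full_flush=True):
            yield factory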
@@ -1171 +1207 @@
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
 
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
+
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.
 
         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         """
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None
 
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
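The 'default max gives 100% sampling of a 1MB file' comment is worth making concrete: with _DEFAULT_MAX_BYTES_TO_INDEX at 1024 * 1024, every byte of a text up to 1MB can get a hash pointer, while a 10MB pre-compressed blob would only have roughly a tenth of its bytes indexed, which is what caps the DeltaIndex memory cost. A back-of-the-envelope illustration (the linear-ratio assumption is mine; the exact sampling lives in the C extension):

    max_bytes = GroupCompressVersionedFiles._DEFAULT_MAX_BYTES_TO_INDEX
    text_size = 10 * 1024 * 1024
    sampled_fraction = min(1.0, float(max_bytes) / text_size)
    # sampled_fraction == 0.1, i.e. about 10% of a 10MB text is indexed.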
@@ -1196 +1250 @@
     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,
@@ -1306 +1360 @@
         self._check_lines_not_unicode(lines)
         self._check_lines_are_lines(lines)
@@ -1309 @@
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
-            if not missing_keys:
-                break
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-                missing_keys)
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
@@ -1325 +1363 @@
     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
@@ -1470 +1508 @@
             the defined order, regardless of source.
         """
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
         else:
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
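The only change here is the spelling: with tsort now pulled in through lazy_import, the call is tsort.topo_sort(parent_map) rather than the directly imported topo_sort. The function itself behaves as before; given a mapping of key to parent keys it returns the keys with every parent before its children, for example:

    from bzrlib import tsort
    parent_map = {
        ('rev-1',): (),
        ('rev-2',): (('rev-1',),),
        ('rev-3',): (('rev-2',),),
    }
    assert tsort.topo_sort(parent_map) == [('rev-1',), ('rev-2',), ('rev-3',)]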
@@ -1565 +1603 @@
         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
@@ -1617 +1656 @@
         for _ in self._insert_record_stream(stream, random_id=False):
@@ +1659 @@
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
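Because the new limit is read from GlobalConfig and parsed with int(), it can be tuned per user without code changes by setting a plain byte count. One way to do that, assuming the usual global configuration file (the 2MB value is only an example):

    # Equivalent to putting 'bzr.groupcompress.max_bytes_to_index = 2097152'
    # in the [DEFAULT] section of bazaar.conf.
    from bzrlib import config
    c = config.GlobalConfig()
    c.set_user_option('bzr.groupcompress.max_bytes_to_index', '2097152')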
@@ -1620 +1683 @@
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
@@ -1646 +1709 @@
         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)
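For context, flush() returns the finished GroupCompressBlock and to_chunks(), as used above, gives its serialized form as a length plus a list of byte strings, so the block can be streamed without first being joined into one big string. A sketch of that pattern in isolation (the key and text are invented, and the real flush() hands the chunks to the pack access layer rather than keeping them around):

    compressor = GroupCompressor()
    compressor.compress(('text-key',), 'first version of the text\n', None)
    bytes_len, chunks = compressor.flush().to_chunks()
    # 'chunks' is a list of byte strings making up the serialized block;
    # 'bytes_len' is their combined length.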
@@ -1832 +1895 @@
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         for source in sources:
             result.update(source.keys())
@@ -1921 +1984 @@
         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
         else:
             self._key_dependencies = None