-# Copyright (C) 2008, 2009, 2010 Canonical Ltd
+# Copyright (C) 2008-2011 Canonical Ltd
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo
+from bzrlib.i18n import gettext
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,
 # Minimum number of uncompressed bytes to try fetch at once when retrieving
 # groupcompress blocks.
-_USE_LZMA = False and (pylzma is not None)
 # osutils.sha_string('')
 _null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
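
The _null_sha1 constant above is just the SHA-1 of the empty string, as the osutils.sha_string('') comment notes. A quick standalone check with plain hashlib (rather than bzrlib's osutils) confirms the value:

```python
import hashlib

# SHA-1 of the empty string; matches the module-level _null_sha1 constant,
# so empty content can be recognised without hashing anything.
assert (hashlib.sha1(b'').hexdigest()
        == 'da39a3ee5e6b4b0d3255bfef95601890afd80709')
```
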
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
                 self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
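
This hunk keeps the lzma path as a single all-at-once pylzma.decompress() call ("We don't do partial lzma decomp yet"), while the zlib path sets up a streaming decompressor so only the needed prefix of a block has to be inflated. A minimal sketch of that streaming pattern with the standard zlib module, independent of the block code above:

```python
import zlib

data = b'groupcompress block content ' * 5000
z_data = zlib.compress(data)

# Partial decompression: cap the output with max_length instead of inflating
# the whole block; unused compressed input is kept in decomp.unconsumed_tail
# for a later call.
decomp = zlib.decompressobj()
wanted = 1000
out = decomp.decompress(z_data, wanted)
assert len(out) <= wanted and out == data[:len(out)]

# More plaintext can be pulled later from the unconsumed tail.
out += decomp.decompress(decomp.unconsumed_tail, wanted)
assert out == data[:len(out)]
```
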
         self._content = content
         self._z_content_chunks = None
-    def _create_z_content_using_lzma(self):
-        if self._content_chunks is not None:
-            self._content = ''.join(self._content_chunks)
-            self._content_chunks = None
-        if self._content is None:
-            raise AssertionError('Nothing to compress')
-        z_content = pylzma.compress(self._content)
-        self._z_content_chunks = (z_content,)
-        self._z_content_length = len(z_content)
     def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
         # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
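
_create_z_content_from_chunks() compresses the content chunk by chunk with a zlib.compressobj, so peak memory stays at roughly one fulltext plus one compressed text, as the comment says. The same incremental pattern in isolation:

```python
import zlib

chunks = [b'first text\n', b'second text\n' * 100, b'third text\n']

# Feed each chunk to the compressor instead of joining them into one big
# string first; collect the compressed pieces as they come out.
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
z_chunks = [compressor.compress(chunk) for chunk in chunks]
z_chunks.append(compressor.flush())

# The pieces decompress back to the joined content.
assert zlib.decompress(b''.join(z_chunks)) == b''.join(chunks)
```
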
     def _create_z_content(self):
         if self._z_content_chunks is not None:
-            self._create_z_content_using_lzma()
         if self._content_chunks is not None:
             chunks = self._content_chunks
     def to_chunks(self):
         """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
-            header = self.GCB_LZ_HEADER
-            header = self.GCB_HEADER
+        header = self.GCB_HEADER
         chunks = ['%s%d\n%d\n'
                   % (header, self._z_content_length, self._content_length),
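
With the lzma branch gone, to_chunks() always writes the plain header followed by the compressed and uncompressed lengths as newline-terminated decimal integers, then the zlib payload. A rough round-trip sketch of that framing, assuming the magic is the six-byte 'gcb1z\n' used for zlib blocks (the exact value lives in GCB_HEADER):

```python
import zlib

GCB_HEADER = b'gcb1z\n'   # assumed zlib block magic, mirroring GCB_HEADER

def pack_block(content):
    # Frame: magic, then z_content_length and content_length as decimal ASCII
    # integers (each newline terminated), then the compressed payload.
    z_content = zlib.compress(content)
    return GCB_HEADER + b'%d\n%d\n' % (len(z_content), len(content)) + z_content

def unpack_block(data):
    rest = data[len(GCB_HEADER):]
    z_len, rest = rest.split(b'\n', 1)        # first length line
    length, z_content = rest.split(b'\n', 1)  # second length line
    assert len(z_content) == int(z_len)
    content = zlib.decompress(z_content)
    assert len(content) == int(length)
    return content

assert unpack_block(pack_block(b'some text\n' * 50)) == b'some text\n' * 50
```
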
                 # Grab and cache the raw bytes for this entry
                 # and break the ref-cycle with _manager since we don't need it
-                self._manager._prepare_for_extract()
+                    self._manager._prepare_for_extract()
+                except zlib.error as value:
+                    raise errors.DecompressCorruption("zlib: " + str(value))
                 block = self._manager._block
                 self._bytes = block.extract(self.key, self._start, self._end)
                 # There are code paths that first extract as fulltext, and then
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        if self._get_settings is not None:
+            settings = self._get_settings()
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings
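
The new _get_compressor_settings() memoizes the result of the optional callback and falls back to the class-level defaults on GroupCompressVersionedFiles when no callback is given (some lines of the method are elided in this excerpt). The same memoized-lookup shape in isolation, with illustrative names rather than bzrlib API:

```python
class LazySettings(object):
    """Memoize an optional settings callback, falling back to defaults."""

    _DEFAULTS = {'max_bytes_to_index': 1024 * 1024}

    def __init__(self, get_settings=None):
        self._get_settings = get_settings
        self._cached = None

    def settings(self):
        if self._cached is None:
            settings = None
            if self._get_settings is not None:
                settings = self._get_settings()
            if settings is None:
                settings = dict(self._DEFAULTS)
            self._cached = settings
        return self._cached

# The callback runs at most once, however often settings() is consulted.
calls = []
lazy = LazySettings(lambda: calls.append(1) or {'max_bytes_to_index': 4096})
assert lazy.settings() == {'max_bytes_to_index': 4096}
assert lazy.settings() is lazy.settings()
assert len(calls) == 1
```
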
     def add_factory(self, key, parents, start, end):
         if not self._factories:
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length
         #       block? It seems hard to come up with a method that it would
         #       expand, since we do full compression again. Perhaps based on a
         #       request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        self._settings = settings
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.
 class PythonGroupCompressor(_CommonGroupCompressor):
+    def __init__(self, settings=None):
         """Create a GroupCompressor.
         Used only if the pyrex version is not available.
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
     It contains code very similar to SequenceMatcher because of having a similar
     task. However some key differences apply:
-    - there is no junk, we want a minimal edit not a human readable diff.
-    - we don't filter very common lines (because we don't know where a good
-      range will start, and after the first text we want to be emitting minmal
-    - we chain the left side, not the right side
-    - we incrementally update the adjacency matrix as new lines are provided.
-    - we look for matches in all of the left side, so the routine which does
-      the analagous task of find_longest_match does not need to filter on the
+    * there is no junk, we want a minimal edit not a human readable diff.
+    * we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting minmal
+    * we chain the left side, not the right side
+    * we incrementally update the adjacency matrix as new lines are provided.
+    * we look for matches in all of the left side, so the routine which does
+      the analagous task of find_longest_match does not need to filter on the
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
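
With these constructor changes, a settings dict flows from the versioned-files object into the compressor, and the pyrex compressor forwards max_bytes_to_index to its DeltaIndex. A rough sketch of that plumbing with stand-in classes (not the real pyx extension):

```python
class FakeDeltaIndex(object):
    # Stand-in for the pyrex DeltaIndex: only records the limit it was given.
    def __init__(self, max_bytes_to_index=0):
        self.max_bytes_to_index = max_bytes_to_index

class FakeCompressor(object):
    # Stand-in for PyrexGroupCompressor: settings -> delta index.
    def __init__(self, settings=None):
        self._settings = settings or {}
        self._delta_index = FakeDeltaIndex(
            max_bytes_to_index=self._settings.get('max_bytes_to_index', 0))

settings = {'max_bytes_to_index': 64 * 1024}
compressor = FakeCompressor(settings)
assert compressor._delta_index.max_bytes_to_index == 64 * 1024
```
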
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.
     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.
-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations
     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.
         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.
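
As the docstring says, yield_factories() may hold results back so they can ride along with the next batch, unless full_flush forces everything out. A much-simplified standalone sketch of that batch-and-flush generator shape (names are illustrative, not the real fetcher):

```python
BATCH_SIZE = 4   # the real fetcher batches by byte count, not key count

class TinyBatcher(object):
    """Collect keys and yield them in batches, holding back a short tail."""

    def __init__(self):
        self._pending = []

    def add_key(self, key):
        self._pending.append(key)

    def yield_batches(self, full_flush=False):
        # Emit only complete batches; the tail waits for the next call
        # unless full_flush asks for everything now.
        while len(self._pending) >= BATCH_SIZE:
            batch = self._pending[:BATCH_SIZE]
            self._pending = self._pending[BATCH_SIZE:]
            yield batch
        if full_flush and self._pending:
            batch, self._pending = self._pending, []
            yield batch

b = TinyBatcher()
for k in range(10):
    b.add_key(k)
assert list(b.yield_batches()) == [[0, 1, 2, 3], [4, 5, 6, 7]]
assert list(b.yield_batches(full_flush=True)) == [[8, 9]]
```
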
                     memos_to_get_stack.pop()
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)
         self.total_bytes = 0
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
         """Create a GroupCompressVersionedFiles object.
         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None
     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
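
without_fallbacks() now threads the existing _group_cache into the clone, so blocks already fetched and decompressed through one object stay visible to the other. A small sketch of that sharing, with a plain dict standing in for the LRUSizeCache:

```python
class TinyVF(object):
    # Stand-in for GroupCompressVersionedFiles; only the cache wiring is shown.
    def __init__(self, group_cache=None):
        if group_cache is None:
            group_cache = {}   # real code: LRUSizeCache(max_size=50*1024*1024)
        self._group_cache = group_cache

    def without_fallbacks(self):
        # Pass the existing cache through so the clone reuses fetched blocks.
        return TinyVF(group_cache=self._group_cache)

vf = TinyVF()
clone = vf.without_fallbacks()
vf._group_cache['read-memo'] = 'decompressed block'
assert clone._group_cache['read-memo'] == 'decompressed block'
```
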
     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,
         :param key: The key tuple of the text to add.
         :param parents: The parents key tuples of the text to add.
         :param lines: A list of lines. Each line must be a bytestring. And all
-            of them except the last must be terminated with \n and contain no
-            other \n's. The last line may either contain no \n's or a single
-            terminating \n. If the lines list does meet this constraint the add
-            routine may error or may succeed - but you will be unable to read
-            the data back accurately. (Checking the lines have been split
+            of them except the last must be terminated with \\n and contain no
+            other \\n's. The last line may either contain no \\n's or a single
+            terminating \\n. If the lines list does meet this constraint the
+            add routine may error or may succeed - but you will be unable to
+            read the data back accurately. (Checking the lines have been split
             correctly is expensive and extremely unlikely to catch bugs so it
             is not done at runtime unless check_content is True.)
         :param parent_texts: An optional dictionary containing the opaque
             self._check_lines_not_unicode(lines)
             self._check_lines_are_lines(lines)
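
The reflowed docstring keeps the same contract that _check_lines_not_unicode() and _check_lines_are_lines() spot-check: every element is a bytestring, every line but the last ends in exactly one '\n', and no line embeds extra newlines. A quick sketch of producing and checking a conforming list with plain splitlines(True) (bzrlib itself uses osutils.split_lines):

```python
def check_lines(lines):
    # The add_lines() contract: bytestrings, every line except the last ends
    # with exactly one '\n', and no line embeds additional newlines.
    for line in lines[:-1]:
        assert line.endswith(b'\n') and line.count(b'\n') == 1
    if lines:
        assert lines[-1].count(b'\n') <= 1

text = b'first line\nsecond line\nno trailing newline'
lines = text.splitlines(True)   # keepends=True preserves the terminators
assert lines == [b'first line\n', b'second line\n', b'no trailing newline']
check_lines(lines)
```
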
-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
-            if not missing_keys:
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)
     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
         The returned objects should be in the order defined by 'ordering',
         which can weave between different sources.
         :param ordering: Must be one of 'topological' or 'groupcompress'
         :return: List of [(source, [keys])] tuples, such that all keys are in
             the defined order, regardless of source.
         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need
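
The ordering switch now reaches topo_sort through the lazily imported tsort module instead of a direct function import; the behaviour is unchanged: given a parent map, every key is ordered after its parents. A tiny example, assuming bzrlib is importable:

```python
from bzrlib import tsort

# parent_map maps each key to its parent keys; topo_sort returns the keys
# with every parent placed before its children.
parent_map = {
    ('rev-1',): (),
    ('rev-2',): (('rev-1',),),
    ('rev-3',): (('rev-1',),),
    ('rev-4',): (('rev-2',), ('rev-3',)),
}
order = tsort.topo_sort(parent_map)
assert order.index(('rev-1',)) < order.index(('rev-2',)) < order.index(('rev-4',))
assert order.index(('rev-3',)) < order.index(('rev-4',))
```
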
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
         #  - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
         for _ in self._insert_record_stream(stream, random_id=False):
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
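
The new _get_compressor_settings() consults the global configuration for 'bzr.groupcompress.max_bytes_to_index', warns and falls back to the 1MB class default when the value is not an integer, and caches the answer. A standalone sketch of that parse-with-fallback, with a plain dict standing in for config.GlobalConfig():

```python
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024

def max_bytes_to_index(global_options):
    # global_options stands in for config.GlobalConfig(); the real code calls
    # get_user_option('bzr.groupcompress.max_bytes_to_index') and warns via
    # trace.warning() when the value cannot be parsed.
    val = global_options.get('bzr.groupcompress.max_bytes_to_index')
    if val is not None:
        try:
            val = int(val)
        except ValueError:
            val = None
    if val is None:
        val = _DEFAULT_MAX_BYTES_TO_INDEX
    return val

assert max_bytes_to_index({}) == 1024 * 1024
assert max_bytes_to_index({'bzr.groupcompress.max_bytes_to_index': '4096'}) == 4096
assert max_bytes_to_index({'bzr.groupcompress.max_bytes_to_index': 'lots'}) == 1024 * 1024
```

Since only the global configuration is consulted (per the TODO above), the option applies to every repository the process touches.
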
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.
         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)
                     raise errors.RevisionNotPresent(record.key, self)
                 if record.key in inserted_keys:
-                    trace.note('Insert claimed random_id=True,'
-                        ' but then inserted %r two times', record.key)
+                    trace.note(gettext('Insert claimed random_id=True,'
+                        ' but then inserted %r two times'), record.key)
                 inserted_keys.add(record.key)
             if reuse_blocks:
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         for source in sources:
             result.update(source.keys())
         #       repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
             self._key_dependencies = None
         :param keys: An iterable of keys.
         :return: A dict of key:
             (index_memo, compression_parent, parents, record_details).
-                opaque structure to pass to read_records to extract the raw
-                Content that this record is built upon, may be None
-                Logical parents of this node
-                extra information about the content which needs to be passed to
-                Factory.parse_record
+            * index_memo: opaque structure to pass to read_records to extract
+            * compression_parent: Content that this record is built upon, may
+            * parents: Logical parents of this node
+            * record_details: extra information about the content which needs
+              to be passed to Factory.parse_record
         self._check_read()
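
The reworked docstring describes the same four-tuple per key; an illustrative sketch of the returned shape, with placeholder values rather than real index entries:

```python
# Illustrative only: the shape of the mapping described above, not real data.
details = {
    ('file-id', 'rev-2'): (
        ('index-name', 0, 1234),     # index_memo: opaque, handed to read_records
        None,                        # compression_parent
        (('file-id', 'rev-1'),),     # parents: logical parents of this node
        ('group', None),             # record_details, for Factory.parse_record
    ),
}
index_memo, compression_parent, parents, record_details = details[('file-id', 'rev-2')]
```
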