-# Copyright (C) 2008, 2009, 2010 Canonical Ltd
+# Copyright (C) 2008-2011 Canonical Ltd
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 except ImportError:
+from bzrlib.lazy_import import lazy_import
+lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo

 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,

 # Minimum number of uncompressed bytes to try fetch at once when retrieving
                 # Grab and cache the raw bytes for this entry
                 # and break the ref-cycle with _manager since we don't need it
-                self._manager._prepare_for_extract()
+                try:
+                    self._manager._prepare_for_extract()
+                except zlib.error as value:
+                    raise errors.DecompressCorruption("zlib: " + str(value))
                 block = self._manager._block
                 self._bytes = block.extract(self.key, self._start, self._end)
                 # There are code paths that first extract as fulltext, and then
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None
+
+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        settings = None
+        if self._get_settings is not None:
+            settings = self._get_settings()
+        if settings is None:
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings

     def add_factory(self, key, parents, start, end):
         if not self._factories:
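The `_get_compressor_settings` helper added above resolves settings in order: the cached value, then the optional `get_compressor_settings` callback, then the class default on `GroupCompressVersionedFiles`. A minimal sketch of that wiring, illustrative only (`my_settings` and `block` are hypothetical, not part of the diff):

# Illustrative sketch, not part of the diff. `block` stands for an existing
# GroupCompressBlock; `my_settings` is a hypothetical callback a caller could
# pass so that config lookup is deferred until a block is actually repacked.
def my_settings():
    return {'max_bytes_to_index': 64 * 1024}

manager = _LazyGroupContentManager(block, get_compressor_settings=my_settings)
settings = manager._get_compressor_settings()   # invokes the callback once
settings = manager._get_compressor_settings()   # returns the cached dict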
         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
+
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length

         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+        if settings is None:
+            self._settings = {}
+        else:
+            self._settings = settings

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.

 class PythonGroupCompressor(_CommonGroupCompressor):

+    def __init__(self, settings=None):
         """Create a GroupCompressor.

         Used only if the pyrex version is not available.
         """
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
     It contains code very similar to SequenceMatcher because of having a similar
     task. However some key differences apply:

-     - there is no junk, we want a minimal edit not a human readable diff.
-     - we don't filter very common lines (because we don't know where a good
-       range will start, and after the first text we want to be emitting minimal
-     - we chain the left side, not the right side
-     - we incrementally update the adjacency matrix as new lines are provided.
-     - we look for matches in all of the left side, so the routine which does
-       the analogous task of find_longest_match does not need to filter on the
+    * there is no junk, we want a minimal edit not a human readable diff.
+    * we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting minimal
+    * we chain the left side, not the right side
+    * we incrementally update the adjacency matrix as new lines are provided.
+    * we look for matches in all of the left side, so the routine which does
+      the analogous task of find_longest_match does not need to filter on the
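For readers unfamiliar with the stdlib matcher the docstring above contrasts against, here is a small difflib example (illustrative only, not part of the diff). SequenceMatcher supports junk filtering and is tuned for human-readable diffs, which is exactly what the group compressor avoids:

# Illustrative sketch, not part of the diff: the stdlib matcher being
# contrasted in the docstring above.
from difflib import SequenceMatcher

left = ['common line\n', 'only in left\n', 'another common line\n']
right = ['common line\n', 'only in right\n', 'another common line\n']
m = SequenceMatcher(None, left, right)
print m.get_matching_blocks()   # [(0, 0, 1), (2, 2, 1), (3, 3, 0)]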
+    def __init__(self, settings=None):
-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""

         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.

     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.

-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations

     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.

         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.

                     memos_to_get_stack.pop()
                 else:
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)

         self.total_bytes = 0
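The `full_flush` flag documented above exists so callers can keep batching until the very end. A sketch of the intended driving pattern (illustrative only; `gcvf`, `locations` and `keys_to_fetch` are hypothetical stand-ins for what `_get_remaining_record_stream` supplies):

# Illustrative sketch, not part of the diff: a hypothetical driver mirroring
# how _get_remaining_record_stream uses the batcher.
def fetch_records(gcvf, locations, keys_to_fetch):
    batcher = _BatchingBlockFetcher(gcvf, locations)
    for key in keys_to_fetch:
        batcher.add_key(key)
        # Intermediate calls may hold some factories back for a bigger batch.
        for factory in batcher.yield_factories():
            yield factory
    # Force everything still pending to be returned.
    for factory in batcher.yield_factories(full_flush=True):
        yield factory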
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""

+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
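The comment above trades index memory against match quality. The rough sketch below (illustrative only, and a simplification of what DeltaIndex actually does with `max_bytes_to_index`) shows how the 1MB default caps how much of each source text gets sampled:

# Illustrative sketch, not part of the diff: a simplified view of how the
# default cap bounds the bytes hashed per source text.
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
settings = {'max_bytes_to_index': _DEFAULT_MAX_BYTES_TO_INDEX}

for text_size in (256 * 1024, 1024 * 1024, 10 * 1024 * 1024):
    indexed = min(text_size, settings['max_bytes_to_index'])
    print '%9d bytes of source -> %9d bytes indexed (%.0f%%)' % (
        text_size, indexed, 100.0 * indexed / text_size)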
-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
+                 _group_cache=None):
         """Create a GroupCompressVersionedFiles object.

         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.

         self._index = index
         self._access = access

         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None

     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
     def add_lines(self, key, parents, lines, parent_texts=None,
         left_matching_blocks=None, nostore_sha=None, random_id=False,

         :param key: The key tuple of the text to add.
         :param parents: The parents key tuples of the text to add.
         :param lines: A list of lines. Each line must be a bytestring. And all
-            of them except the last must be terminated with \n and contain no
-            other \n's. The last line may either contain no \n's or a single
-            terminating \n. If the lines list does not meet this constraint the add
-            routine may error or may succeed - but you will be unable to read
-            the data back accurately. (Checking the lines have been split
+            of them except the last must be terminated with \\n and contain no
+            other \\n's. The last line may either contain no \\n's or a single
+            terminating \\n. If the lines list does not meet this constraint the
+            add routine may error or may succeed - but you will be unable to
+            read the data back accurately. (Checking the lines have been split
             correctly is expensive and extremely unlikely to catch bugs so it
             is not done at runtime unless check_content is True.)
         :param parent_texts: An optional dictionary containing the opaque
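As an illustration of the `lines` contract described above (not part of the diff), `osutils.split_lines` is one way to produce a conforming list from a plain bytestring:

# Illustrative sketch, not part of the diff.
from bzrlib import osutils

text = 'first line\nsecond line\nno trailing newline'
lines = osutils.split_lines(text)
# -> ['first line\n', 'second line\n', 'no trailing newline']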
             self._check_lines_not_unicode(lines)
             self._check_lines_are_lines(lines)

-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
-            if not missing_keys:
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)

     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
         The returned objects should be in the order defined by 'ordering',
         which can weave between different sources.

         :param ordering: Must be one of 'topological' or 'groupcompress'
         :return: List of [(source, [keys])] tuples, such that all keys are in
             the defined order, regardless of source.

         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
         else:
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need

         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:

         for _ in self._insert_record_stream(stream, random_id=False):
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+            if val is not None:
+                try:
+                    val = int(val)
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                                  % (val,))
+                    val = None
+            if val is None:
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}
+
+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
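Since `_get_compressor_settings` only reads a global option, the knob is set outside this API. A sketch of one way to set it from Python (illustrative only; it assumes `GlobalConfig.set_user_option`, the counterpart of the `get_user_option` call above):

# Illustrative sketch, not part of the diff.
from bzrlib import config

c = config.GlobalConfig()
# Index at most 256kB of each source text when building delta indexes.
c.set_user_option('bzr.groupcompress.max_bytes_to_index', '262144')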
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.

         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []
         def flush():
             bytes_len, chunks = self._compressor.flush().to_chunks()
-            self._compressor = GroupCompressor()
+            self._compressor = self._make_group_compressor()
             # Note: At this point we still have 1 copy of the fulltext (in
             #       record and the var 'bytes'), and this generates 2 copies of
             #       the compressed text (one for bytes, one in chunks)
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         result = set()
         for source in sources:
             result.update(source.keys())
         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
         else:
             self._key_dependencies = None
         :param keys: An iterable of keys.
         :return: A dict of key:
             (index_memo, compression_parent, parents, record_details).
-                opaque structure to pass to read_records to extract the raw
-                Content that this record is built upon, may be None
-                Logical parents of this node
-                extra information about the content which needs to be passed to
-                Factory.parse_record
+            * index_memo: opaque structure to pass to read_records to extract
+            * compression_parent: Content that this record is built upon, may
+            * parents: Logical parents of this node
+            * record_details: extra information about the content which needs
+              to be passed to Factory.parse_record

         self._check_read()
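A small sketch of how a caller might consume the structure documented above (illustrative only; `gc_index` and `keys` are hypothetical):

# Illustrative sketch, not part of the diff.
build_details = gc_index.get_build_details(keys)
for key, details in build_details.iteritems():
    index_memo, compression_parent, parents, record_details = details
    # index_memo is opaque and only meant to be handed back to the block
    # reading machinery; parents are the logical parents of `key`.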