 except ImportError:
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
 from bzrlib import (
     graph as _mod_graph,
+from bzrlib.repofmt import pack_repo

 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
-from bzrlib.tsort import topo_sort
 from bzrlib.versionedfile import (
     AbsentContentFactory,
     ChunkedContentFactory,
     FulltextContentFactory,
+    VersionedFilesWithFallbacks,

 # Minimum number of uncompressed bytes to try to fetch at once when retrieving
                 self._content = ''.join(self._content_chunks)
                 self._content_chunks = None
         if self._content is None:
-            if self._z_content is None:
+            # We join self._z_content_chunks here, because if we are
+            # decompressing, then it is *very* likely that we have a single
+            if self._z_content_chunks is None:
                 raise AssertionError('No content to decompress')
-            if self._z_content == '':
+            z_content = ''.join(self._z_content_chunks)
                 self._content = ''
             elif self._compressor_name == 'lzma':
                 # We don't do partial lzma decomp yet
-                self._content = pylzma.decompress(self._z_content)
+                self._content = pylzma.decompress(z_content)
             elif self._compressor_name == 'zlib':
                 # Start a zlib decompressor
                 if num_bytes * 4 > self._content_length * 3:
                     # If we are requesting more than 3/4ths of the content,
                     # just extract the whole thing in a single pass
                     num_bytes = self._content_length
-                    self._content = zlib.decompress(self._z_content)
+                    self._content = zlib.decompress(z_content)
                     self._z_content_decompressor = zlib.decompressobj()
                     # Seed the decompressor with the uncompressed bytes, so
                     # that the rest of the code is simplified
                     self._content = self._z_content_decompressor.decompress(
-                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
+                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                     if not self._z_content_decompressor.unconsumed_tail:
                         self._z_content_decompressor = None
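# Illustrative sketch (not part of this change): the partial-decompression
# pattern used above, shown standalone.  A zlib decompressobj is asked for
# only num_bytes (plus a slack window) of output; compressed input it did not
# need is left in unconsumed_tail.  An empty unconsumed_tail means the whole
# stream was decoded, so the decompressor can be dropped.  The window value
# below is assumed to match the module-level _ZLIB_DECOMP_WINDOW constant.
import zlib

_ZLIB_DECOMP_WINDOW = 32 * 1024

full_text = 'some repeated content\n' * 5000
z_content = zlib.compress(full_text)

num_bytes = 100          # how much plaintext the caller needs right now
decompressor = zlib.decompressobj()
partial = decompressor.decompress(z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
assert full_text.startswith(partial)
if not decompressor.unconsumed_tail:
    # All compressed input was consumed, so 'partial' is already the full
    # text and there is no point keeping the decompressor around.
    decompressor = None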
             # XXX: Define some GCCorrupt error ?
             raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                  (len(bytes), pos, self._z_content_length))
-        self._z_content = bytes[pos:]
+        self._z_content_chunks = (bytes[pos:],)

+    def _z_content(self):
+        """Return z_content_chunks as a simple string.
+        Meant only to be used by the test suite.
+        if self._z_content_chunks is not None:
+            return ''.join(self._z_content_chunks)

     def from_bytes(cls, bytes):

         self._content_length = length
         self._content_chunks = content_chunks
         self._content = None
-        self._z_content = None
+        self._z_content_chunks = None

     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
-        self._z_content = None
+        self._z_content_chunks = None

     def _create_z_content_using_lzma(self):
         if self._content_chunks is not None:
             self._content_chunks = None
         if self._content is None:
             raise AssertionError('Nothing to compress')
-        self._z_content = pylzma.compress(self._content)
-        self._z_content_length = len(self._z_content)
+        z_content = pylzma.compress(self._content)
+        self._z_content_chunks = (z_content,)
+        self._z_content_length = len(z_content)

-    def _create_z_content_from_chunks(self):
+    def _create_z_content_from_chunks(self, chunks):
         compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
-        compressed_chunks = map(compressor.compress, self._content_chunks)
+        # Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
+        # (measured peak is maybe 30MB over the above...)
+        compressed_chunks = map(compressor.compress, chunks)
         compressed_chunks.append(compressor.flush())
-        self._z_content = ''.join(compressed_chunks)
-        self._z_content_length = len(self._z_content)
+        # Ignore empty chunks
+        self._z_content_chunks = [c for c in compressed_chunks if c]
+        self._z_content_length = sum(map(len, self._z_content_chunks))
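# Illustrative sketch (not part of this change): compressing a list of chunks
# with a single compressobj, as the new _create_z_content_from_chunks does.
# Feeding the chunks one by one avoids first joining them into one large
# fulltext string; empty outputs are dropped because compressobj.compress()
# frequently returns '' while its internal buffer fills.
import zlib

chunks = ['first chunk\n', 'second chunk\n' * 100, 'third chunk\n']
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
compressed_chunks = map(compressor.compress, chunks)
compressed_chunks.append(compressor.flush())
z_content_chunks = [c for c in compressed_chunks if c]
z_content_length = sum(map(len, z_content_chunks))
# Round trip: joining the kept chunks yields a complete zlib stream.
assert zlib.decompress(''.join(z_content_chunks)) == ''.join(chunks)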
     def _create_z_content(self):
-        if self._z_content is not None:
+        if self._z_content_chunks is not None:
             self._create_z_content_using_lzma()
         if self._content_chunks is not None:
-            self._create_z_content_from_chunks()
-        self._z_content = zlib.compress(self._content)
-        self._z_content_length = len(self._z_content)
+            chunks = self._content_chunks
+            chunks = (self._content,)
+        self._create_z_content_from_chunks(chunks)

-        """Encode the information into a byte stream."""
+        """Create the byte stream as a series of 'chunks'"""
         self._create_z_content()
             header = self.GCB_LZ_HEADER
             header = self.GCB_HEADER
-                  '%d\n%d\n' % (self._z_content_length, self._content_length),
+        chunks = ['%s%d\n%d\n'
+                  % (header, self._z_content_length, self._content_length),
+        chunks.extend(self._z_content_chunks)
+        total_len = sum(map(len, chunks))
+        return total_len, chunks

+        """Encode the information into a byte stream."""
+        total_len, chunks = self.to_chunks()
         return ''.join(chunks)

     def _dump(self, include_text=False):
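# Illustrative sketch (not part of this change, assumes the patched
# bzrlib.groupcompress module): the intended relationship between the new
# to_chunks() and the rewritten to_bytes(), using a block whose content is
# set directly.  to_bytes() is now just a join over to_chunks(), so callers
# that can work with a chunk list avoid one large intermediate string.
from bzrlib.groupcompress import GroupCompressBlock

block = GroupCompressBlock()
block.set_content('some text worth storing\n' * 50)
total_len, chunks = block.to_chunks()
assert total_len == sum(map(len, chunks))
assert ''.join(chunks) == block.to_bytes()
# chunks[0] is the '%s%d\n%d\n' header (format marker, compressed length,
# content length); the remaining chunks hold the zlib-compressed content.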
     _full_enough_block_size = 3*1024*1024 # size at which we won't repack
     _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

-    def __init__(self, block):
+    def __init__(self, block, get_compressor_settings=None):
         self._block = block
         # We need to preserve the ordering
         self._factories = []
         self._last_byte = 0
+        self._get_settings = get_compressor_settings
+        self._compressor_settings = None

+    def _get_compressor_settings(self):
+        if self._compressor_settings is not None:
+            return self._compressor_settings
+        if self._get_settings is not None:
+            settings = self._get_settings()
+            vf = GroupCompressVersionedFiles
+            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
+        self._compressor_settings = settings
+        return self._compressor_settings

     def add_factory(self, key, parents, start, end):
         if not self._factories:

         new_block.set_content(self._block._content[:last_byte])
         self._block = new_block

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
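# Illustrative sketch (not part of this change, assumes the patched
# bzrlib.groupcompress module): compressor settings reach
# _LazyGroupContentManager as a zero-argument callback and are memoized on
# first use, so any (possibly config-reading) lookup is deferred until a
# block actually has to be recompressed.
from bzrlib.groupcompress import GroupCompressBlock, _LazyGroupContentManager

block = GroupCompressBlock()
manager = _LazyGroupContentManager(
    block, get_compressor_settings=lambda: {'max_bytes_to_index': 64 * 1024})
settings = manager._get_compressor_settings()      # callback invoked here
assert manager._get_compressor_settings() is settings   # cached afterwards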
     def _rebuild_block(self):
         """Create a new GroupCompressBlock with only the referenced texts."""
-        compressor = GroupCompressor()
+        compressor = self._make_group_compressor()
         tstart = time.time()
         old_length = self._block._content_length

         # block? It seems hard to come up with a method that it would
         # expand, since we do full compression again. Perhaps based on a
         # request that ends up poorly ordered?
+        # TODO: If the content would have expanded, then we would want to
+        #       handle a case where we need to split the block.
+        #       Now that we have a user-tweakable option
+        #       (max_bytes_to_index), it is possible that one person set it
+        #       to a very low value, causing poor compression.
         delta = time.time() - tstart
         self._block = new_block
         trace.mutter('creating new compressed block on-the-fly in %.3fs'

         z_header_bytes = zlib.compress(header_bytes)
         z_header_bytes_len = len(z_header_bytes)
-        block_bytes = self._block.to_bytes()
+        block_bytes_len, block_chunks = self._block.to_chunks()
         lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
         lines.append(z_header_bytes)
-        lines.append(block_bytes)
-        del z_header_bytes, block_bytes
+        lines.extend(block_chunks)
+        del z_header_bytes, block_chunks
+        # TODO: This is a point where we will double the memory consumption. To
+        #       avoid this, we probably have to switch to a 'chunked' api
         return ''.join(lines)

     def from_bytes(cls, bytes):
         # TODO: This does extra string copying, probably better to do it a
+        #       different way. At a minimum this creates 2 copies of the
         (storage_kind, z_header_len, header_len,
          block_len, rest) = bytes.split('\n', 4)
         self.labels_deltas = {}
         self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
+            self._settings = settings

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
         """Compress lines with label key.

         After calling this, the compressor should no longer be used
-        # TODO: this causes us to 'bloat' to 2x the size of content in the
-        #       group. This has an impact for 'commit' of large objects.
-        #       One possibility is to use self._content_chunks, and be lazy and
-        #       only fill out self._content as a full string when we actually
-        #       need it. That would at least drop the peak memory consumption
-        #       for 'commit' down to ~1x the size of the largest file, at a
-        #       cost of increased complexity within this code. 2x is still <<
-        #       3x the size of the largest file, so we are doing ok.
         self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None

 class PythonGroupCompressor(_CommonGroupCompressor):

+    def __init__(self, settings=None):
         """Create a GroupCompressor.

         Used only if the pyrex version is not available.
-        super(PythonGroupCompressor, self).__init__()
+        super(PythonGroupCompressor, self).__init__(settings)
         self._delta_index = LinesDeltaIndex([])
         # The actual content is managed by LinesDeltaIndex
         self.chunks = self._delta_index.lines
     It contains code very similar to SequenceMatcher because of having a similar
     task. However, some key differences apply:
-    - there is no junk, we want a minimal edit not a human readable diff.
-    - we don't filter very common lines (because we don't know where a good
-      range will start, and after the first text we want to be emitting minimal
-    - we chain the left side, not the right side
-    - we incrementally update the adjacency matrix as new lines are provided.
-    - we look for matches in all of the left side, so the routine which does
-      the analogous task of find_longest_match does not need to filter on the
+    * there is no junk, we want a minimal edit not a human readable diff.
+    * we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting minimal
+    * we chain the left side, not the right side
+    * we incrementally update the adjacency matrix as new lines are provided.
+    * we look for matches in all of the left side, so the routine which does
+      the analogous task of find_longest_match does not need to filter on the

-        super(PyrexGroupCompressor, self).__init__()
-        self._delta_index = DeltaIndex()
+    def __init__(self, settings=None):
+        super(PyrexGroupCompressor, self).__init__(settings)
+        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
+        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
     def _compress(self, key, bytes, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""

         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
             add_callback=graph_index.add_nodes,
             inconsistency_fatal=inconsistency_fatal)
-        access = knit._DirectPackAccess({})
+        access = pack_repo._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
 class _BatchingBlockFetcher(object):
     """Fetch group compress blocks in batches.

     :ivar total_bytes: int of expected number of bytes needed to fetch the
         currently pending batch.

-    def __init__(self, gcvf, locations):
+    def __init__(self, gcvf, locations, get_compressor_settings=None):
         self.gcvf = gcvf
         self.locations = locations

     def yield_factories(self, full_flush=False):
         """Yield factories for keys added since the last yield. They will be
         returned in the order they were added via add_key.

         :param full_flush: by default, some results may not be returned in case
             they can be part of the next batch. If full_flush is True, then
             all results are returned.

                     memos_to_get_stack.pop()
                     block = self.batch_memos[read_memo]
-                self.manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block,
+                    get_compressor_settings=self._get_compressor_settings)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
             self.manager.add_factory(key, parents, start, end)

         self.total_bytes = 0
-class GroupCompressVersionedFiles(VersionedFiles):
+class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
     """A group-compress based VersionedFiles implementation."""

-    def __init__(self, index, access, delta=True, _unadded_refs=None):
+    # This controls how the GroupCompress DeltaIndex works. Basically, we
+    # compute hash pointers into the source blocks (so hash(text) => text).
+    # However, each of these references costs some memory in trade against a
+    # more accurate match result. For very large files, they either are
+    # pre-compressed and change in bulk whenever they change, or change in just
+    # local blocks. Either way, 'improved resolution' is not very helpful,
+    # versus running out of memory trying to track everything. The default max
+    # gives 100% sampling of a 1MB file.
+    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
+    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
+                                    _DEFAULT_MAX_BYTES_TO_INDEX}
+    def __init__(self, index, access, delta=True, _unadded_refs=None,
         """Create a GroupCompressVersionedFiles object.

         :param index: The index object storing access and graph data.
         :param access: The access object storing raw data.
         :param delta: Whether to delta compress or just entropy compress.
         :param _unadded_refs: private parameter, don't use.
+        :param _group_cache: private parameter, don't use.
         self._index = index
         self._access = access
         if _unadded_refs is None:
             _unadded_refs = {}
         self._unadded_refs = _unadded_refs
-        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
-        self._fallback_vfs = []
+        if _group_cache is None:
+            _group_cache = LRUSizeCache(max_size=50*1024*1024)
+        self._group_cache = _group_cache
+        self._immediate_fallback_vfs = []
+        self._max_bytes_to_index = None

     def without_fallbacks(self):
         """Return a clone of this object without any fallbacks configured."""
         return GroupCompressVersionedFiles(self._index, self._access,
-            self._delta, _unadded_refs=dict(self._unadded_refs))
+            self._delta, _unadded_refs=dict(self._unadded_refs),
+            _group_cache=self._group_cache)
     def add_lines(self, key, parents, lines, parent_texts=None,
                   left_matching_blocks=None, nostore_sha=None, random_id=False,

         :param key: The key tuple of the text to add.
         :param parents: The parents key tuples of the text to add.
         :param lines: A list of lines. Each line must be a bytestring. And all
-            of them except the last must be terminated with \n and contain no
-            other \n's. The last line may either contain no \n's or a single
-            terminating \n. If the lines list does not meet this constraint the add
-            routine may error or may succeed - but you will be unable to read
-            the data back accurately. (Checking the lines have been split
+            of them except the last must be terminated with \\n and contain no
+            other \\n's. The last line may either contain no \\n's or a single
+            terminating \\n. If the lines list does not meet this constraint the
+            add routine may error or may succeed - but you will be unable to
+            read the data back accurately. (Checking the lines have been split
             correctly is expensive and extremely unlikely to catch bugs so it
             is not done at runtime unless check_content is True.)
         :param parent_texts: An optional dictionary containing the opaque
         self._check_lines_not_unicode(lines)
         self._check_lines_are_lines(lines)

-    def get_known_graph_ancestry(self, keys):
-        """Get a KnownGraph instance with the ancestry of keys."""
-        # Note that this is identical to
-        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
-        parent_map, missing_keys = self._index.find_ancestry(keys)
-        for fallback in self._fallback_vfs:
-            if not missing_keys:
-            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
-            parent_map.update(f_parent_map)
-            missing_keys = f_missing_keys
-        kg = _mod_graph.KnownGraph(parent_map)

     def get_parent_map(self, keys):
         """Get a map of the graph parents of keys.
         The returned objects should be in the order defined by 'ordering',
         which can weave between different sources.

         :param ordering: Must be one of 'topological' or 'groupcompress'
         :return: List of [(source, [keys])] tuples, such that all keys are in
             the defined order, regardless of source.

         if ordering == 'topological':
-            present_keys = topo_sort(parent_map)
+            present_keys = tsort.topo_sort(parent_map)
             # ordering == 'groupcompress'
             # XXX: This only optimizes for the target ordering. We may need

         # - we encounter an unadded ref, or
         # - we run out of keys, or
         # - the total bytes to retrieve for this batch > BATCH_SIZE
-        batcher = _BatchingBlockFetcher(self, locations)
+        batcher = _BatchingBlockFetcher(self, locations,
+            get_compressor_settings=self._get_compressor_settings)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:

         for _ in self._insert_record_stream(stream, random_id=False):
+    def _get_compressor_settings(self):
+        if self._max_bytes_to_index is None:
+            # TODO: VersionedFiles don't know about their containing
+            #       repository, so they don't have much of an idea about their
+            #       location. So for now, this is only a global option.
+            c = config.GlobalConfig()
+            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
+                except ValueError, e:
+                    trace.warning('Value for '
+                                  '"bzr.groupcompress.max_bytes_to_index"'
+                                  ' %r is not an integer'
+                val = self._DEFAULT_MAX_BYTES_TO_INDEX
+            self._max_bytes_to_index = val
+        return {'max_bytes_to_index': self._max_bytes_to_index}

+    def _make_group_compressor(self):
+        return GroupCompressor(self._get_compressor_settings())
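# Illustrative sketch (not part of this change): how the new global option
# reaches the compressor.  The value is read once from the user's bazaar.conf
# via GlobalConfig, cached on the versionedfiles object, and handed down as a
# plain settings dict.  The 'vf' shown in the comments is a hypothetical
# GroupCompressVersionedFiles instance.
from bzrlib import config

c = config.GlobalConfig()
raw = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
# 'raw' is the configured string (or None).  With e.g.
#   bzr.groupcompress.max_bytes_to_index = 2097152
# in bazaar.conf, 'vf' would then answer:
#   vf._get_compressor_settings() == {'max_bytes_to_index': 2097152}
# and vf._make_group_compressor() builds a GroupCompressor from that dict.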
     def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                               reuse_blocks=True):
         """Internal core to insert a record stream into this container.

         # This will go up to fulltexts for gc to gc fetching, which isn't
-        self._compressor = GroupCompressor()
+        self._compressor = self._make_group_compressor()
         self._unadded_refs = {}
         keys_to_add = []

-            bytes = self._compressor.flush().to_bytes()
-            self._compressor = GroupCompressor()
+            bytes_len, chunks = self._compressor.flush().to_chunks()
+            self._compressor = self._make_group_compressor()
+            # Note: At this point we still have 1 copy of the fulltext (in
+            #       record and the var 'bytes'), and this generates 2 copies of
+            #       the compressed text (one for bytes, one in chunks)
+            # TODO: Push 'chunks' down into the _access api, so that we don't
+            #       have to double compressed memory here
+            # TODO: Figure out how to indicate that we would be happy to free
+            #       the fulltext content at this point. Note that sometimes we
+            #       will want it later (streaming CHK pages), but most of the
+            #       time we won't (everything else)
+            bytes = ''.join(chunks)
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
         """See VersionedFiles.keys."""
         if 'evil' in debug.debug_flags:
             trace.mutter_callsite(2, "keys scales with size of history")
-        sources = [self._index] + self._fallback_vfs
+        sources = [self._index] + self._immediate_fallback_vfs
         for source in sources:
             result.update(source.keys())
+class _GCBuildDetails(object):
+    """A blob of data about the build details.

+    This stores the minimal data, which then allows compatibility with the old
+    api, without taking as much memory.

+    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
+                 '_delta_end', '_parents')

+    compression_parent = None

+    def __init__(self, parents, position_info):
+        self._parents = parents
+        (self._index, self._group_start, self._group_end, self._basis_end,
+         self._delta_end) = position_info

+        return '%s(%s, %s)' % (self.__class__.__name__,
+            self.index_memo, self._parents)

+    def index_memo(self):
+        return (self._index, self._group_start, self._group_end,
+                self._basis_end, self._delta_end)

+    def record_details(self):
+        return static_tuple.StaticTuple(self.method, None)

+    def __getitem__(self, offset):
+        """Compatibility thunk to act like a tuple."""
+        return self.index_memo
+        return self.compression_parent # Always None
+        return self._parents
+        return self.record_details
+        raise IndexError('offset out of range')
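# Illustrative sketch (not part of this change, assumes the new
# _GCBuildDetails class above with its 'group' method attribute): the object
# stands in for the old 4-tuple of (index_memo, compression_parent, parents,
# record_details), so callers that still index into the tuple keep working.
# The position_info values below are arbitrary placeholders.
from bzrlib.groupcompress import _GCBuildDetails

details = _GCBuildDetails(parents=(('file-id', 'rev-1'),),
                          position_info=('index', 0, 100, 0, 100))
assert details[0] == details.index_memo == ('index', 0, 100, 0, 100)
assert details[1] is None                  # compression_parent is always None
assert details[2] == (('file-id', 'rev-1'),)
# details[3] is the record_details StaticTuple, e.g. ('group', None).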
 class _GCGraphIndex(object):
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

         # repeated over and over, this creates a surplus of ints
         self._int_cache = {}
         if track_external_parent_refs:
-            self._key_dependencies = knit._KeyRefs(
+            self._key_dependencies = _KeyRefs(
                 track_new_keys=track_new_keys)
             self._key_dependencies = None

         :param keys: An iterable of keys.
         :return: A dict of key:
             (index_memo, compression_parent, parents, record_details).
-                opaque structure to pass to read_records to extract the raw
-                Content that this record is built upon, may be None
-                Logical parents of this node
-                extra information about the content which needs to be passed to
-                Factory.parse_record
+            * index_memo: opaque structure to pass to read_records to extract
+            * compression_parent: Content that this record is built upon, may
+            * parents: Logical parents of this node
+            * record_details: extra information about the content which needs
+              to be passed to Factory.parse_record
         self._check_read()
         # each, or about 7MB. Note that it might be even more when you consider
         # how PyInt is allocated in separate slabs. And you can't return a slab
         # to the OS if even 1 int on it is in use. Note though that Python uses
-        # a LIFO when re-using PyInt slots, which probably causes more
+        # a LIFO when re-using PyInt slots, which might cause more
         # fragmentation.
         start = int(bits[0])
         start = self._int_cache.setdefault(start, start)
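# Illustrative sketch (not part of this change): the _int_cache above is a
# plain interning dict.  dict.setdefault returns the int object already
# stored for an equal value, so repeated offsets parsed out of the index
# share a single PyInt instead of allocating a fresh one per row.
_int_cache = {}

def _intern_int(value, _cache=_int_cache):
    # Store on first sight, return the stored object ever after.
    return _cache.setdefault(value, value)

a = _intern_int(int('12345'))
b = _intern_int(int('12345'))
assert a == b and a is b    # equal values collapse to one object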