23
23
except ImportError:
26
from bzrlib.lazy_import import lazy_import
27
lazy_import(globals(), """
28
26
from bzrlib import (
33
30
graph as _mod_graph,
41
from bzrlib.repofmt import pack_repo
44
36
from bzrlib.btree_index import BTreeBuilder
45
37
from bzrlib.lru_cache import LRUSizeCache
38
from bzrlib.tsort import topo_sort
46
39
from bzrlib.versionedfile import (
49
41
AbsentContentFactory,
50
42
ChunkedContentFactory,
51
43
FulltextContentFactory,
52
VersionedFilesWithFallbacks,
55
47
# Minimum number of uncompressed bytes to try fetch at once when retrieving
127
119
:param num_bytes: Ensure that we have extracted at least num_bytes of
128
120
content. If None, consume everything
130
if self._content_length is None:
131
raise AssertionError('self._content_length should never be None')
122
# TODO: If we re-use the same content block at different times during
123
# get_record_stream(), it is possible that the first pass will
124
# get inserted, triggering an extract/_ensure_content() which
125
# will get rid of _z_content. And then the next use of the block
126
# will try to access _z_content (to send it over the wire), and
127
# fail because it is already extracted. Consider never releasing
128
# _z_content because of this.
132
129
if num_bytes is None:
133
130
num_bytes = self._content_length
134
131
elif (self._content_length is not None
142
139
self._content = ''.join(self._content_chunks)
143
140
self._content_chunks = None
144
141
if self._content is None:
145
# We join self._z_content_chunks here, because if we are
146
# decompressing, then it is *very* likely that we have a single
148
if self._z_content_chunks is None:
142
if self._z_content is None:
149
143
raise AssertionError('No content to decompress')
150
z_content = ''.join(self._z_content_chunks)
144
if self._z_content == '':
152
145
self._content = ''
153
146
elif self._compressor_name == 'lzma':
154
147
# We don't do partial lzma decomp yet
155
self._content = pylzma.decompress(z_content)
148
self._content = pylzma.decompress(self._z_content)
156
149
elif self._compressor_name == 'zlib':
157
150
# Start a zlib decompressor
158
if num_bytes * 4 > self._content_length * 3:
159
# If we are requesting more than 3/4ths of the content,
160
# just extract the whole thing in a single pass
161
num_bytes = self._content_length
162
self._content = zlib.decompress(z_content)
151
if num_bytes is None:
152
self._content = zlib.decompress(self._z_content)
164
154
self._z_content_decompressor = zlib.decompressobj()
165
155
# Seed the decompressor with the uncompressed bytes, so
166
156
# that the rest of the code is simplified
167
157
self._content = self._z_content_decompressor.decompress(
168
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
169
if not self._z_content_decompressor.unconsumed_tail:
170
self._z_content_decompressor = None
158
self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
172
160
raise AssertionError('Unknown compressor: %r'
173
161
% self._compressor_name)
175
163
# 'unconsumed_tail'
177
165
# Do we have enough bytes already?
178
if len(self._content) >= num_bytes:
166
if num_bytes is not None and len(self._content) >= num_bytes:
168
if num_bytes is None and self._z_content_decompressor is None:
169
# We must have already decompressed everything
180
171
# If we got this far, and don't have a decompressor, something is wrong
181
172
if self._z_content_decompressor is None:
182
173
raise AssertionError(
183
174
'No decompressor to decompress %d bytes' % num_bytes)
184
175
remaining_decomp = self._z_content_decompressor.unconsumed_tail
185
if not remaining_decomp:
186
raise AssertionError('Nothing left to decompress')
187
needed_bytes = num_bytes - len(self._content)
188
# We always set max_size to 32kB over the minimum needed, so that
189
# zlib will give us as much as we really want.
190
# TODO: If this isn't good enough, we could make a loop here,
191
# that keeps expanding the request until we get enough
192
self._content += self._z_content_decompressor.decompress(
193
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
194
if len(self._content) < num_bytes:
195
raise AssertionError('%d bytes wanted, only %d available'
196
% (num_bytes, len(self._content)))
197
if not self._z_content_decompressor.unconsumed_tail:
198
# The stream is finished
199
self._z_content_decompressor = None
176
if num_bytes is None:
178
# We don't know how much is left, but we'll decompress it all
179
self._content += self._z_content_decompressor.decompress(
181
# Note: There's what I consider a bug in zlib.decompressobj
182
# If you pass back in the entire unconsumed_tail, only
183
# this time you don't pass a max-size, it doesn't
184
# change the unconsumed_tail back to None/''.
185
# However, we know we are done with the whole stream
186
self._z_content_decompressor = None
187
# XXX: Why is this the only place in this routine we set this?
188
self._content_length = len(self._content)
190
if not remaining_decomp:
191
raise AssertionError('Nothing left to decompress')
192
needed_bytes = num_bytes - len(self._content)
193
# We always set max_size to 32kB over the minimum needed, so that
194
# zlib will give us as much as we really want.
195
# TODO: If this isn't good enough, we could make a loop here,
196
# that keeps expanding the request until we get enough
197
self._content += self._z_content_decompressor.decompress(
198
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
199
if len(self._content) < num_bytes:
200
raise AssertionError('%d bytes wanted, only %d available'
201
% (num_bytes, len(self._content)))
202
if not self._z_content_decompressor.unconsumed_tail:
203
# The stream is finished
204
self._z_content_decompressor = None
201
206
def _parse_bytes(self, bytes, pos):
202
207
"""Read the various lengths from the header.
218
223
# XXX: Define some GCCorrupt error ?
219
224
raise AssertionError('Invalid bytes: (%d) != %d + %d' %
220
225
(len(bytes), pos, self._z_content_length))
221
self._z_content_chunks = (bytes[pos:],)
224
def _z_content(self):
225
"""Return z_content_chunks as a simple string.
227
Meant only to be used by the test suite.
229
if self._z_content_chunks is not None:
230
return ''.join(self._z_content_chunks)
226
self._z_content = bytes[pos:]
234
229
def from_bytes(cls, bytes):
290
285
self._content_length = length
291
286
self._content_chunks = content_chunks
292
287
self._content = None
293
self._z_content_chunks = None
288
self._z_content = None
295
290
def set_content(self, content):
296
291
"""Set the content of this block."""
297
292
self._content_length = len(content)
298
293
self._content = content
299
self._z_content_chunks = None
294
self._z_content = None
301
296
def _create_z_content_using_lzma(self):
302
297
if self._content_chunks is not None:
304
299
self._content_chunks = None
305
300
if self._content is None:
306
301
raise AssertionError('Nothing to compress')
307
z_content = pylzma.compress(self._content)
308
self._z_content_chunks = (z_content,)
309
self._z_content_length = len(z_content)
302
self._z_content = pylzma.compress(self._content)
303
self._z_content_length = len(self._z_content)
311
def _create_z_content_from_chunks(self, chunks):
305
def _create_z_content_from_chunks(self):
312
306
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
313
# Peak memory at this point is 1 fulltext, 1 compressed text, + zlib overhead
314
# (measured peak is maybe 30MB over the above...)
315
compressed_chunks = map(compressor.compress, chunks)
307
compressed_chunks = map(compressor.compress, self._content_chunks)
316
308
compressed_chunks.append(compressor.flush())
317
# Ignore empty chunks
318
self._z_content_chunks = [c for c in compressed_chunks if c]
319
self._z_content_length = sum(map(len, self._z_content_chunks))
309
self._z_content = ''.join(compressed_chunks)
310
self._z_content_length = len(self._z_content)
321
312
def _create_z_content(self):
322
if self._z_content_chunks is not None:
313
if self._z_content is not None:
325
316
self._create_z_content_using_lzma()
327
318
if self._content_chunks is not None:
328
chunks = self._content_chunks
330
chunks = (self._content,)
331
self._create_z_content_from_chunks(chunks)
319
self._create_z_content_from_chunks()
321
self._z_content = zlib.compress(self._content)
322
self._z_content_length = len(self._z_content)
334
"""Create the byte stream as a series of 'chunks'"""
325
"""Encode the information into a byte stream."""
335
326
self._create_z_content()
337
328
header = self.GCB_LZ_HEADER
339
330
header = self.GCB_HEADER
340
chunks = ['%s%d\n%d\n'
341
% (header, self._z_content_length, self._content_length),
332
'%d\n%d\n' % (self._z_content_length, self._content_length),
343
chunks.extend(self._z_content_chunks)
344
total_len = sum(map(len, chunks))
345
return total_len, chunks
348
"""Encode the information into a byte stream."""
349
total_len, chunks = self.to_chunks()
350
335
return ''.join(chunks)
352
337
def _dump(self, include_text=False):
466
451
# Grab and cache the raw bytes for this entry
467
452
# and break the ref-cycle with _manager since we don't need it
470
self._manager._prepare_for_extract()
471
except zlib.error as value:
472
raise errors.DecompressCorruption("zlib: " + str(value))
454
self._manager._prepare_for_extract()
473
455
block = self._manager._block
474
456
self._bytes = block.extract(self.key, self._start, self._end)
475
457
# There are code paths that first extract as fulltext, and then
494
476
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
495
477
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
497
def __init__(self, block, get_compressor_settings=None):
479
def __init__(self, block):
498
480
self._block = block
499
481
# We need to preserve the ordering
500
482
self._factories = []
501
483
self._last_byte = 0
502
self._get_settings = get_compressor_settings
503
self._compressor_settings = None
505
def _get_compressor_settings(self):
506
if self._compressor_settings is not None:
507
return self._compressor_settings
509
if self._get_settings is not None:
510
settings = self._get_settings()
512
vf = GroupCompressVersionedFiles
513
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
514
self._compressor_settings = settings
515
return self._compressor_settings
517
485
def add_factory(self, key, parents, start, end):
518
486
if not self._factories:
551
519
new_block.set_content(self._block._content[:last_byte])
552
520
self._block = new_block
554
def _make_group_compressor(self):
555
return GroupCompressor(self._get_compressor_settings())
557
522
def _rebuild_block(self):
558
523
"""Create a new GroupCompressBlock with only the referenced texts."""
559
compressor = self._make_group_compressor()
524
compressor = GroupCompressor()
560
525
tstart = time.time()
561
526
old_length = self._block._content_length
574
539
# block? It seems hard to come up with a method that it would
575
540
# expand, since we do full compression again. Perhaps based on a
576
541
# request that ends up poorly ordered?
577
# TODO: If the content would have expanded, then we would want to
578
# handle a case where we need to split the block.
579
# Now that we have a user-tweakable option
580
# (max_bytes_to_index), it is possible that one person set it
581
# to a very low value, causing poor compression.
582
542
delta = time.time() - tstart
583
543
self._block = new_block
584
544
trace.mutter('creating new compressed block on-the-fly in %.3fs'
735
695
z_header_bytes = zlib.compress(header_bytes)
737
697
z_header_bytes_len = len(z_header_bytes)
738
block_bytes_len, block_chunks = self._block.to_chunks()
698
block_bytes = self._block.to_bytes()
739
699
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
741
701
lines.append(z_header_bytes)
742
lines.extend(block_chunks)
743
del z_header_bytes, block_chunks
744
# TODO: This is a point where we will double the memory consumption. To
745
# avoid this, we probably have to switch to a 'chunked' api
702
lines.append(block_bytes)
703
del z_header_bytes, block_bytes
746
704
return ''.join(lines)
749
707
def from_bytes(cls, bytes):
750
708
# TODO: This does extra string copying, probably better to do it a
751
# different way. At a minimum this creates 2 copies of the
753
710
(storage_kind, z_header_len, header_len,
754
711
block_len, rest) = bytes.split('\n', 4)
918
871
After calling this, the compressor should no longer be used
873
# TODO: this causes us to 'bloat' to 2x the size of content in the
874
# group. This has an impact for 'commit' of large objects.
875
# One possibility is to use self._content_chunks, and be lazy and
876
# only fill out self._content as a full string when we actually
877
# need it. That would at least drop the peak memory consumption
878
# for 'commit' down to ~1x the size of the largest file, at a
879
# cost of increased complexity within this code. 2x is still <<
880
# 3x the size of the largest file, so we are doing ok.
920
881
self._block.set_chunked_content(self.chunks, self.endpoint)
921
882
self.chunks = None
922
883
self._delta_index = None
941
902
class PythonGroupCompressor(_CommonGroupCompressor):
943
def __init__(self, settings=None):
944
905
"""Create a GroupCompressor.
946
907
Used only if the pyrex version is not available.
948
super(PythonGroupCompressor, self).__init__(settings)
909
super(PythonGroupCompressor, self).__init__()
949
910
self._delta_index = LinesDeltaIndex([])
950
911
# The actual content is managed by LinesDeltaIndex
951
912
self.chunks = self._delta_index.lines
989
950
It contains code very similar to SequenceMatcher because of having a similar
990
951
task. However some key differences apply:
992
* there is no junk, we want a minimal edit not a human readable diff.
993
* we don't filter very common lines (because we don't know where a good
994
range will start, and after the first text we want to be emitting minimal
996
* we chain the left side, not the right side
997
* we incrementally update the adjacency matrix as new lines are provided.
998
* we look for matches in all of the left side, so the routine which does
999
the analogous task of find_longest_match does not need to filter on the
952
- there is no junk, we want a minimal edit not a human readable diff.
953
- we don't filter very common lines (because we don't know where a good
954
range will start, and after the first text we want to be emitting minimal
956
- we chain the left side, not the right side
957
- we incrementally update the adjacency matrix as new lines are provided.
958
- we look for matches in all of the left side, so the routine which does
959
the analogous task of find_longest_match does not need to filter on the
1003
def __init__(self, settings=None):
1004
super(PyrexGroupCompressor, self).__init__(settings)
1005
max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1006
self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
964
super(PyrexGroupCompressor, self).__init__()
965
self._delta_index = DeltaIndex()
1008
967
def _compress(self, key, bytes, max_delta_size, soft=False):
1009
968
"""see _CommonGroupCompressor._compress"""
1084
1043
index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1085
1044
add_callback=graph_index.add_nodes,
1086
1045
inconsistency_fatal=inconsistency_fatal)
1087
access = pack_repo._DirectPackAccess({})
1046
access = knit._DirectPackAccess({})
1088
1047
access.set_writer(writer, graph_index, (transport, 'newpack'))
1089
1048
result = GroupCompressVersionedFiles(index, access, delta)
1090
1049
result.stream = stream
1101
1060
class _BatchingBlockFetcher(object):
1102
1061
"""Fetch group compress blocks in batches.
1104
1063
:ivar total_bytes: int of expected number of bytes needed to fetch the
1105
1064
currently pending batch.
1108
def __init__(self, gcvf, locations, get_compressor_settings=None):
1067
def __init__(self, gcvf, locations):
1109
1068
self.gcvf = gcvf
1110
1069
self.locations = locations
1194
1152
memos_to_get_stack.pop()
1196
1154
block = self.batch_memos[read_memo]
1197
self.manager = _LazyGroupContentManager(block,
1198
get_compressor_settings=self._get_compressor_settings)
1155
self.manager = _LazyGroupContentManager(block)
1199
1156
self.last_read_memo = read_memo
1200
1157
start, end = index_memo[3:5]
1201
1158
self.manager.add_factory(key, parents, start, end)
1208
1165
self.total_bytes = 0
1211
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1168
class GroupCompressVersionedFiles(VersionedFiles):
1212
1169
"""A group-compress based VersionedFiles implementation."""
1214
# This controls how the GroupCompress DeltaIndex works. Basically, we
1215
# compute hash pointers into the source blocks (so hash(text) => text).
1216
# However each of these references costs some memory in trade against a
1217
# more accurate match result. For very large files, they either are
1218
# pre-compressed and change in bulk whenever they change, or change in just
1219
# local blocks. Either way, 'improved resolution' is not very helpful,
1220
# versus running out of memory trying to track everything. The default max
1221
# gives 100% sampling of a 1MB file.
1222
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1223
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1224
_DEFAULT_MAX_BYTES_TO_INDEX}
1226
def __init__(self, index, access, delta=True, _unadded_refs=None,
1171
def __init__(self, index, access, delta=True, _unadded_refs=None):
1228
1172
"""Create a GroupCompressVersionedFiles object.
1230
1174
:param index: The index object storing access and graph data.
1231
1175
:param access: The access object storing raw data.
1232
1176
:param delta: Whether to delta compress or just entropy compress.
1233
1177
:param _unadded_refs: private parameter, don't use.
1234
:param _group_cache: private parameter, don't use.
1236
1179
self._index = index
1237
1180
self._access = access
1239
1182
if _unadded_refs is None:
1240
1183
_unadded_refs = {}
1241
1184
self._unadded_refs = _unadded_refs
1242
if _group_cache is None:
1243
_group_cache = LRUSizeCache(max_size=50*1024*1024)
1244
self._group_cache = _group_cache
1245
self._immediate_fallback_vfs = []
1246
self._max_bytes_to_index = None
1185
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1186
self._fallback_vfs = []
1248
1188
def without_fallbacks(self):
1249
1189
"""Return a clone of this object without any fallbacks configured."""
1250
1190
return GroupCompressVersionedFiles(self._index, self._access,
1251
self._delta, _unadded_refs=dict(self._unadded_refs),
1252
_group_cache=self._group_cache)
1191
self._delta, _unadded_refs=dict(self._unadded_refs))
1254
1193
def add_lines(self, key, parents, lines, parent_texts=None,
1255
1194
left_matching_blocks=None, nostore_sha=None, random_id=False,
1259
1198
:param key: The key tuple of the text to add.
1260
1199
:param parents: The parents key tuples of the text to add.
1261
1200
:param lines: A list of lines. Each line must be a bytestring. And all
1262
of them except the last must be terminated with \\n and contain no
1263
other \\n's. The last line may either contain no \\n's or a single
1264
terminating \\n. If the lines list does not meet this constraint the
1265
add routine may error or may succeed - but you will be unable to
1266
read the data back accurately. (Checking the lines have been split
1201
of them except the last must be terminated with \n and contain no
1202
other \n's. The last line may either contain no \n's or a single
1203
terminating \n. If the lines list does not meet this constraint the add
1204
routine may error or may succeed - but you will be unable to read
1205
the data back accurately. (Checking the lines have been split
1267
1206
correctly is expensive and extremely unlikely to catch bugs so it
1268
1207
is not done at runtime unless check_content is True.)
1269
1208
:param parent_texts: An optional dictionary containing the opaque
1344
1283
return self.get_record_stream(keys, 'unordered', True)
1346
def clear_cache(self):
1347
"""See VersionedFiles.clear_cache()"""
1348
self._group_cache.clear()
1349
self._index._graph_index.clear_cache()
1350
self._index._int_cache.clear()
1352
1285
def _check_add(self, key, lines, random_id, check_content):
1353
1286
"""check that version_id and lines are safe to add."""
1354
1287
version_id = key[-1]
1364
1297
self._check_lines_not_unicode(lines)
1365
1298
self._check_lines_are_lines(lines)
1300
def get_known_graph_ancestry(self, keys):
1301
"""Get a KnownGraph instance with the ancestry of keys."""
1302
# Note that this is identical to
1303
# KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1305
parent_map, missing_keys = self._index.find_ancestry(keys)
1306
for fallback in self._fallback_vfs:
1307
if not missing_keys:
1309
(f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1311
parent_map.update(f_parent_map)
1312
missing_keys = f_missing_keys
1313
kg = _mod_graph.KnownGraph(parent_map)
1367
1316
def get_parent_map(self, keys):
1368
1317
"""Get a map of the graph parents of keys.
1508
1457
The returned objects should be in the order defined by 'ordering',
1509
1458
which can weave between different sources.
1511
1459
:param ordering: Must be one of 'topological' or 'groupcompress'
1512
1460
:return: List of [(source, [keys])] tuples, such that all keys are in
1513
1461
the defined order, regardless of source.
1515
1463
if ordering == 'topological':
1516
present_keys = tsort.topo_sort(parent_map)
1464
present_keys = topo_sort(parent_map)
1518
1466
# ordering == 'groupcompress'
1519
1467
# XXX: This only optimizes for the target ordering. We may need
1608
1556
# - we encounter an unadded ref, or
1609
1557
# - we run out of keys, or
1610
1558
# - the total bytes to retrieve for this batch > BATCH_SIZE
1611
batcher = _BatchingBlockFetcher(self, locations,
1612
get_compressor_settings=self._get_compressor_settings)
1559
batcher = _BatchingBlockFetcher(self, locations)
1613
1560
for source, keys in source_keys:
1614
1561
if source is self:
1615
1562
for key in keys:
1661
1608
for _ in self._insert_record_stream(stream, random_id=False):
1664
def _get_compressor_settings(self):
1665
if self._max_bytes_to_index is None:
1666
# TODO: VersionedFiles don't know about their containing
1667
# repository, so they don't have much of an idea about their
1668
# location. So for now, this is only a global option.
1669
c = config.GlobalConfig()
1670
val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1674
except ValueError, e:
1675
trace.warning('Value for '
1676
'"bzr.groupcompress.max_bytes_to_index"'
1677
' %r is not an integer'
1681
val = self._DEFAULT_MAX_BYTES_TO_INDEX
1682
self._max_bytes_to_index = val
1683
return {'max_bytes_to_index': self._max_bytes_to_index}
1685
def _make_group_compressor(self):
1686
return GroupCompressor(self._get_compressor_settings())
1688
1611
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1689
1612
reuse_blocks=True):
1690
1613
"""Internal core to insert a record stream into this container.
1714
1637
# This will go up to fulltexts for gc to gc fetching, which isn't
1716
self._compressor = self._make_group_compressor()
1639
self._compressor = GroupCompressor()
1717
1640
self._unadded_refs = {}
1718
1641
keys_to_add = []
1720
bytes_len, chunks = self._compressor.flush().to_chunks()
1721
self._compressor = self._make_group_compressor()
1722
# Note: At this point we still have 1 copy of the fulltext (in
1723
# record and the var 'bytes'), and this generates 2 copies of
1724
# the compressed text (one for bytes, one in chunks)
1725
# TODO: Push 'chunks' down into the _access api, so that we don't
1726
# have to double compressed memory here
1727
# TODO: Figure out how to indicate that we would be happy to free
1728
# the fulltext content at this point. Note that sometimes we
1729
# will want it later (streaming CHK pages), but most of the
1730
# time we won't (everything else)
1731
bytes = ''.join(chunks)
1643
bytes = self._compressor.flush().to_bytes()
1733
1644
index, start, length = self._access.add_raw_records(
1734
1645
[(None, len(bytes))], bytes)[0]
1845
1757
key = record.key
1846
1758
self._unadded_refs[key] = record.parents
1847
1759
yield found_sha1
1848
as_st = static_tuple.StaticTuple.from_sequence
1849
if record.parents is not None:
1850
parents = as_st([as_st(p) for p in record.parents])
1853
refs = static_tuple.StaticTuple(parents)
1854
keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
1760
keys_to_add.append((key, '%d %d' % (start_point, end_point),
1855
1762
if len(keys_to_add):
1857
1764
self._compressor = None
1900
1807
"""See VersionedFiles.keys."""
1901
1808
if 'evil' in debug.debug_flags:
1902
1809
trace.mutter_callsite(2, "keys scales with size of history")
1903
sources = [self._index] + self._immediate_fallback_vfs
1810
sources = [self._index] + self._fallback_vfs
1905
1812
for source in sources:
1906
1813
result.update(source.keys())
1910
class _GCBuildDetails(object):
1911
"""A blob of data about the build details.
1913
This stores the minimal data, which then allows compatibility with the old
1914
api, without taking as much memory.
1917
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1918
'_delta_end', '_parents')
1921
compression_parent = None
1923
def __init__(self, parents, position_info):
1924
self._parents = parents
1925
(self._index, self._group_start, self._group_end, self._basis_end,
1926
self._delta_end) = position_info
1929
return '%s(%s, %s)' % (self.__class__.__name__,
1930
self.index_memo, self._parents)
1933
def index_memo(self):
1934
return (self._index, self._group_start, self._group_end,
1935
self._basis_end, self._delta_end)
1938
def record_details(self):
1939
return static_tuple.StaticTuple(self.method, None)
1941
def __getitem__(self, offset):
1942
"""Compatibility thunk to act like a tuple."""
1944
return self.index_memo
1946
return self.compression_parent # Always None
1948
return self._parents
1950
return self.record_details
1952
raise IndexError('offset out of range')
1958
1817
class _GCGraphIndex(object):
1959
1818
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1985
1844
self.has_graph = parents
1986
1845
self._is_locked = is_locked
1987
1846
self._inconsistency_fatal = inconsistency_fatal
1988
# GroupCompress records tend to have the same 'group' start + offset
1989
# repeated over and over, this creates a surplus of ints
1990
self._int_cache = {}
1991
1847
if track_external_parent_refs:
1992
self._key_dependencies = _KeyRefs(
1848
self._key_dependencies = knit._KeyRefs(
1993
1849
track_new_keys=track_new_keys)
1995
1851
self._key_dependencies = None
2029
1885
if not random_id:
2030
1886
present_nodes = self._get_entries(keys)
2031
1887
for (index, key, value, node_refs) in present_nodes:
2032
# Sometimes these are passed as a list rather than a tuple
2033
node_refs = static_tuple.as_tuples(node_refs)
2034
passed = static_tuple.as_tuples(keys[key])
2035
if node_refs != passed[1]:
2036
details = '%s %s %s' % (key, (value, node_refs), passed)
1888
if node_refs != keys[key][1]:
1889
details = '%s %s %s' % (key, (value, node_refs), keys[key])
2037
1890
if self._inconsistency_fatal:
2038
1891
raise errors.KnitCorrupt(self, "inconsistent details"
2039
1892
" in add_records: %s" %
2135
1988
:param keys: An iterable of keys.
2136
1989
:return: A dict of key:
2137
1990
(index_memo, compression_parent, parents, record_details).
2139
* index_memo: opaque structure to pass to read_records to extract
2141
* compression_parent: Content that this record is built upon, may
2143
* parents: Logical parents of this node
2144
* record_details: extra information about the content which needs
2145
to be passed to Factory.parse_record
1992
opaque structure to pass to read_records to extract the raw
1995
Content that this record is built upon, may be None
1997
Logical parents of this node
1999
extra information about the content which needs to be passed to
2000
Factory.parse_record
2147
2002
self._check_read()
2169
2025
"""Convert an index value to position details."""
2170
2026
bits = node[2].split(' ')
2171
2027
# It would be nice not to read the entire gzip.
2172
# start and stop are put into _int_cache because they are very common.
2173
# They define the 'group' that an entry is in, and many groups can have
2174
# thousands of objects.
2175
# Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2176
# each, or about 7MB. Note that it might be even more when you consider
2177
# how PyInt is allocated in separate slabs. And you can't return a slab
2178
# to the OS if even 1 int on it is in use. Note though that Python uses
2179
# a LIFO when re-using PyInt slots, which might cause more
2181
2028
start = int(bits[0])
2182
start = self._int_cache.setdefault(start, start)
2183
2029
stop = int(bits[1])
2184
stop = self._int_cache.setdefault(stop, stop)
2185
2030
basis_end = int(bits[2])
2186
2031
delta_end = int(bits[3])
2187
# We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2189
return (node[0], start, stop, basis_end, delta_end)
2032
return node[0], start, stop, basis_end, delta_end
2191
2034
def scan_unvalidated_index(self, graph_index):
2192
2035
"""Inform this _GCGraphIndex that there is an unvalidated index.