137
139
self._content = ''.join(self._content_chunks)
138
140
self._content_chunks = None
139
141
if self._content is None:
140
if self._z_content is None:
142
# We join self._z_content_chunks here, because if we are
143
# decompressing, then it is *very* likely that we have a single
145
if self._z_content_chunks is None:
141
146
raise AssertionError('No content to decompress')
142
if self._z_content == '':
147
z_content = ''.join(self._z_content_chunks)
143
149
self._content = ''
144
150
elif self._compressor_name == 'lzma':
145
151
# We don't do partial lzma decomp yet
146
self._content = pylzma.decompress(self._z_content)
153
self._content = pylzma.decompress(z_content)
147
154
elif self._compressor_name == 'zlib':
148
155
# Start a zlib decompressor
149
if num_bytes is None:
150
self._content = zlib.decompress(self._z_content)
156
if num_bytes * 4 > self._content_length * 3:
157
# If we are requesting more than 3/4ths of the content,
158
# just extract the whole thing in a single pass
159
num_bytes = self._content_length
160
self._content = zlib.decompress(z_content)
152
162
self._z_content_decompressor = zlib.decompressobj()
153
163
# Seed the decompressor with the uncompressed bytes, so
154
164
# that the rest of the code is simplified
155
165
self._content = self._z_content_decompressor.decompress(
156
self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
166
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
167
if not self._z_content_decompressor.unconsumed_tail:
168
self._z_content_decompressor = None
158
170
raise AssertionError('Unknown compressor: %r'
159
171
% self._compressor_name)
161
173
# 'unconsumed_tail'
163
175
# Do we have enough bytes already?
164
if num_bytes is not None and len(self._content) >= num_bytes:
166
if num_bytes is None and self._z_content_decompressor is None:
167
# We must have already decompressed everything
176
if len(self._content) >= num_bytes:
169
178
# If we got this far, and don't have a decompressor, something is wrong
170
179
if self._z_content_decompressor is None:
171
180
raise AssertionError(
172
181
'No decompressor to decompress %d bytes' % num_bytes)
173
182
remaining_decomp = self._z_content_decompressor.unconsumed_tail
174
if num_bytes is None:
176
# We don't know how much is left, but we'll decompress it all
177
self._content += self._z_content_decompressor.decompress(
179
# Note: There's what I consider a bug in zlib.decompressobj
180
# If you pass back in the entire unconsumed_tail, only
181
# this time you don't pass a max-size, it doesn't
182
# change the unconsumed_tail back to None/''.
183
# However, we know we are done with the whole stream
184
self._z_content_decompressor = None
185
# XXX: Why is this the only place in this routine we set this?
186
self._content_length = len(self._content)
188
if not remaining_decomp:
189
raise AssertionError('Nothing left to decompress')
190
needed_bytes = num_bytes - len(self._content)
191
# We always set max_size to 32kB over the minimum needed, so that
192
# zlib will give us as much as we really want.
193
# TODO: If this isn't good enough, we could make a loop here,
194
# that keeps expanding the request until we get enough
195
self._content += self._z_content_decompressor.decompress(
196
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
197
if len(self._content) < num_bytes:
198
raise AssertionError('%d bytes wanted, only %d available'
199
% (num_bytes, len(self._content)))
200
if not self._z_content_decompressor.unconsumed_tail:
201
# The stream is finished
202
self._z_content_decompressor = None
183
if not remaining_decomp:
184
raise AssertionError('Nothing left to decompress')
185
needed_bytes = num_bytes - len(self._content)
186
# We always set max_size to 32kB over the minimum needed, so that
187
# zlib will give us as much as we really want.
188
# TODO: If this isn't good enough, we could make a loop here,
189
# that keeps expanding the request until we get enough
190
self._content += self._z_content_decompressor.decompress(
191
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
192
if len(self._content) < num_bytes:
193
raise AssertionError('%d bytes wanted, only %d available'
194
% (num_bytes, len(self._content)))
195
if not self._z_content_decompressor.unconsumed_tail:
196
# The stream is finished
197
self._z_content_decompressor = None
204
199
def _parse_bytes(self, bytes, pos):
205
200
"""Read the various lengths from the header.
283
288
self._content_length = length
284
289
self._content_chunks = content_chunks
285
290
self._content = None
286
self._z_content = None
291
self._z_content_chunks = None
288
293
def set_content(self, content):
289
294
"""Set the content of this block."""
290
295
self._content_length = len(content)
291
296
self._content = content
292
self._z_content = None
294
def _create_z_content_using_lzma(self):
295
if self._content_chunks is not None:
296
self._content = ''.join(self._content_chunks)
297
self._content_chunks = None
298
if self._content is None:
299
raise AssertionError('Nothing to compress')
300
self._z_content = pylzma.compress(self._content)
301
self._z_content_length = len(self._z_content)
303
def _create_z_content_from_chunks(self):
297
self._z_content_chunks = None
299
def _create_z_content_from_chunks(self, chunks):
304
300
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
305
compressed_chunks = map(compressor.compress, self._content_chunks)
301
# Peak memory at this point is 1 fulltext, 1 compressed text, + zlib overhead
302
# (measured peak is maybe 30MB over the above...)
303
compressed_chunks = map(compressor.compress, chunks)
306
304
compressed_chunks.append(compressor.flush())
307
self._z_content = ''.join(compressed_chunks)
308
self._z_content_length = len(self._z_content)
305
# Ignore empty chunks
306
self._z_content_chunks = [c for c in compressed_chunks if c]
307
self._z_content_length = sum(map(len, self._z_content_chunks))
310
309
def _create_z_content(self):
311
if self._z_content is not None:
314
self._create_z_content_using_lzma()
310
if self._z_content_chunks is not None:
316
312
if self._content_chunks is not None:
317
self._create_z_content_from_chunks()
319
self._z_content = zlib.compress(self._content)
320
self._z_content_length = len(self._z_content)
313
chunks = self._content_chunks
315
chunks = (self._content,)
316
self._create_z_content_from_chunks(chunks)
319
"""Create the byte stream as a series of 'chunks'"""
320
self._create_z_content()
321
header = self.GCB_HEADER
322
chunks = ['%s%d\n%d\n'
323
% (header, self._z_content_length, self._content_length),
325
chunks.extend(self._z_content_chunks)
326
total_len = sum(map(len, chunks))
327
return total_len, chunks
322
329
def to_bytes(self):
323
330
"""Encode the information into a byte stream."""
324
self._create_z_content()
326
header = self.GCB_LZ_HEADER
328
header = self.GCB_HEADER
330
'%d\n%d\n' % (self._z_content_length, self._content_length),
331
total_len, chunks = self.to_chunks()
333
332
return ''.join(chunks)
335
334
def _dump(self, include_text=False):
467
468
class _LazyGroupContentManager(object):
468
469
"""This manages a group of _LazyGroupCompressFactory objects."""
470
def __init__(self, block):
471
_max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
472
# current size, and still be considered
474
_full_block_size = 4*1024*1024
475
_full_mixed_block_size = 2*1024*1024
476
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
477
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
479
def __init__(self, block, get_compressor_settings=None):
471
480
self._block = block
472
481
# We need to preserve the ordering
473
482
self._factories = []
474
483
self._last_byte = 0
484
self._get_settings = get_compressor_settings
485
self._compressor_settings = None
487
def _get_compressor_settings(self):
488
if self._compressor_settings is not None:
489
return self._compressor_settings
491
if self._get_settings is not None:
492
settings = self._get_settings()
494
vf = GroupCompressVersionedFiles
495
settings = vf._DEFAULT_COMPRESSOR_SETTINGS
496
self._compressor_settings = settings
497
return self._compressor_settings
476
499
def add_factory(self, key, parents, start, end):
477
500
if not self._factories:
544
575
# time (self._block._content) is a little expensive.
545
576
self._block._ensure_content(self._last_byte)
547
def _check_rebuild_block(self):
578
def _check_rebuild_action(self):
548
579
"""Check to see if our block should be repacked."""
549
580
total_bytes_used = 0
550
581
last_byte_used = 0
551
582
for factory in self._factories:
552
583
total_bytes_used += factory._end - factory._start
553
last_byte_used = max(last_byte_used, factory._end)
554
# If we are using most of the bytes from the block, we have nothing
555
# else to check (currently more than 1/2)
584
if last_byte_used < factory._end:
585
last_byte_used = factory._end
586
# If we are using more than half of the bytes from the block, we have
587
# nothing else to check
556
588
if total_bytes_used * 2 >= self._block._content_length:
558
# Can we just strip off the trailing bytes? If we are going to be
559
# transmitting more than 50% of the front of the content, go ahead
589
return None, last_byte_used, total_bytes_used
590
# We are using less than 50% of the content. Is the content we are
591
# using at the beginning of the block? If so, we can just trim the
592
# tail, rather than rebuilding from scratch.
560
593
if total_bytes_used * 2 > last_byte_used:
561
self._trim_block(last_byte_used)
594
return 'trim', last_byte_used, total_bytes_used
564
596
# We are using a small amount of the data, and it isn't just packed
565
597
# nicely at the front, so rebuild the content.
572
604
# expanding many deltas into fulltexts, as well.
573
605
# If we build a cheap enough 'strip', then we could try a strip,
574
606
# if that expands the content, we then rebuild.
575
self._rebuild_block()
607
return 'rebuild', last_byte_used, total_bytes_used
609
def check_is_well_utilized(self):
610
"""Is the current block considered 'well utilized'?
612
This heuristic asks if the current block considers itself to be a fully
613
developed group, rather than just a loose collection of data.
615
if len(self._factories) == 1:
616
# A block of length 1 could be improved by combining with other
617
# groups - don't look deeper. Even larger than max size groups
618
# could compress well with adjacent versions of the same thing.
620
action, last_byte_used, total_bytes_used = self._check_rebuild_action()
621
block_size = self._block._content_length
622
if total_bytes_used < block_size * self._max_cut_fraction:
623
# This block wants to trim itself small enough that we want to
624
# consider it under-utilized.
626
# TODO: This code is meant to be the twin of _insert_record_stream's
627
# 'start_new_block' logic. It would probably be better to factor
628
# out that logic into a shared location, so that it stays
630
# We currently assume a block is properly utilized whenever it is >75%
631
# of the size of a 'full' block. In normal operation, a block is
632
# considered full when it hits 4MB of same-file content. So any block
633
# >3MB is 'full enough'.
634
# The only time this isn't true is when a given block has large-object
635
# content. (a single file >4MB, etc.)
636
# Under these circumstances, we allow a block to grow to
637
# 2 x largest_content. Which means that if a given block had a large
638
# object, it may actually be under-utilized. However, given that this
639
# is 'pack-on-the-fly' it is probably reasonable to not repack large
640
# content blobs on-the-fly. Note that because we return False for all
641
# 1-item blobs, we will repack them; we may wish to reevaluate our
642
# treatment of large object blobs in the future.
643
if block_size >= self._full_enough_block_size:
645
# If a block is <3MB, it still may be considered 'full' if it contains
646
# mixed content. The current rule is 2MB of mixed content is considered
647
# full. So check to see if this block contains mixed content, and
648
# set the threshold appropriately.
650
for factory in self._factories:
651
prefix = factory.key[:-1]
652
if common_prefix is None:
653
common_prefix = prefix
654
elif prefix != common_prefix:
655
# Mixed content, check the size appropriately
656
if block_size >= self._full_enough_mixed_block_size:
659
# The content failed both the mixed check and the single-content check
660
# so obviously it is not fully utilized
661
# TODO: there is one other constraint that isn't being checked
662
# namely, that the entries in the block are in the appropriate
663
# order. For example, you could insert the entries in exactly
664
# reverse groupcompress order, and we would think that is ok.
665
# (all the right objects are in one group, and it is fully
666
# utilized, etc.) For now, we assume that case is rare,
667
# especially since we should always fetch in 'groupcompress'
671
def _check_rebuild_block(self):
672
action, last_byte_used, total_bytes_used = self._check_rebuild_action()
676
self._trim_block(last_byte_used)
677
elif action == 'rebuild':
678
self._rebuild_block()
680
raise ValueError('unknown rebuild action: %r' % (action,))
577
682
def _wire_bytes(self):
578
683
"""Return a byte stream suitable for transmitting over the wire."""
612
717
z_header_bytes = zlib.compress(header_bytes)
614
719
z_header_bytes_len = len(z_header_bytes)
615
block_bytes = self._block.to_bytes()
720
block_bytes_len, block_chunks = self._block.to_chunks()
616
721
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
618
723
lines.append(z_header_bytes)
619
lines.append(block_bytes)
620
del z_header_bytes, block_bytes
724
lines.extend(block_chunks)
725
del z_header_bytes, block_chunks
726
# TODO: This is a point where we will double the memory consumption. To
727
# avoid this, we probably have to switch to a 'chunked' api
621
728
return ''.join(lines)
624
731
def from_bytes(cls, bytes):
625
732
# TODO: This does extra string copying, probably better to do it a
733
# different way. At a minimum this creates 2 copies of the
627
735
(storage_kind, z_header_len, header_len,
628
736
block_len, rest) = bytes.split('\n', 4)
867
971
It contains code very similar to SequenceMatcher because of having a similar
868
972
task. However some key differences apply:
869
- there is no junk, we want a minimal edit not a human readable diff.
870
- we don't filter very common lines (because we don't know where a good
871
range will start, and after the first text we want to be emitting minimal
873
- we chain the left side, not the right side
874
- we incrementally update the adjacency matrix as new lines are provided.
875
- we look for matches in all of the left side, so the routine which does
876
the analogous task of find_longest_match does not need to filter on the
974
* there is no junk, we want a minimal edit not a human readable diff.
975
* we don't filter very common lines (because we don't know where a good
976
range will start, and after the first text we want to be emitting minimal
978
* we chain the left side, not the right side
979
* we incrementally update the adjacency matrix as new lines are provided.
980
* we look for matches in all of the left side, so the routine which does
981
the analogous task of find_longest_match does not need to filter on the
881
super(PyrexGroupCompressor, self).__init__()
882
self._delta_index = DeltaIndex()
985
def __init__(self, settings=None):
986
super(PyrexGroupCompressor, self).__init__(settings)
987
max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
988
self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
884
990
def _compress(self, key, bytes, max_delta_size, soft=False):
885
991
"""see _CommonGroupCompressor._compress"""
974
1080
versioned_files.stream.close()
977
class GroupCompressVersionedFiles(VersionedFiles):
1083
class _BatchingBlockFetcher(object):
1084
"""Fetch group compress blocks in batches.
1086
:ivar total_bytes: int of expected number of bytes needed to fetch the
1087
currently pending batch.
1090
def __init__(self, gcvf, locations, get_compressor_settings=None):
1092
self.locations = locations
1094
self.batch_memos = {}
1095
self.memos_to_get = []
1096
self.total_bytes = 0
1097
self.last_read_memo = None
1099
self._get_compressor_settings = get_compressor_settings
1101
def add_key(self, key):
1102
"""Add another key to fetch.
1104
:return: The estimated number of bytes needed to fetch the batch so
1107
self.keys.append(key)
1108
index_memo, _, _, _ = self.locations[key]
1109
read_memo = index_memo[0:3]
1110
# Three possibilities for this read_memo:
1111
# - it's already part of this batch; or
1112
# - it's not yet part of this batch, but is already cached; or
1113
# - it's not yet part of this batch and will need to be fetched.
1114
if read_memo in self.batch_memos:
1115
# This read memo is already in this batch.
1116
return self.total_bytes
1118
cached_block = self.gcvf._group_cache[read_memo]
1120
# This read memo is new to this batch, and the data isn't cached
1122
self.batch_memos[read_memo] = None
1123
self.memos_to_get.append(read_memo)
1124
byte_length = read_memo[2]
1125
self.total_bytes += byte_length
1127
# This read memo is new to this batch, but cached.
1128
# Keep a reference to the cached block in batch_memos because it's
1129
# certain that we'll use it when this batch is processed, but
1130
# there's a risk that it would fall out of _group_cache between now
1132
self.batch_memos[read_memo] = cached_block
1133
return self.total_bytes
1135
def _flush_manager(self):
1136
if self.manager is not None:
1137
for factory in self.manager.get_record_stream():
1140
self.last_read_memo = None
1142
def yield_factories(self, full_flush=False):
1143
"""Yield factories for keys added since the last yield. They will be
1144
returned in the order they were added via add_key.
1146
:param full_flush: by default, some results may not be returned in case
1147
they can be part of the next batch. If full_flush is True, then
1148
all results are returned.
1150
if self.manager is None and not self.keys:
1152
# Fetch all memos in this batch.
1153
blocks = self.gcvf._get_blocks(self.memos_to_get)
1154
# Turn blocks into factories and yield them.
1155
memos_to_get_stack = list(self.memos_to_get)
1156
memos_to_get_stack.reverse()
1157
for key in self.keys:
1158
index_memo, _, parents, _ = self.locations[key]
1159
read_memo = index_memo[:3]
1160
if self.last_read_memo != read_memo:
1161
# We are starting a new block. If we have a
1162
# manager, we have found everything that fits for
1163
# now, so yield records
1164
for factory in self._flush_manager():
1166
# Now start a new manager.
1167
if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1168
# The next block from _get_blocks will be the block we
1170
block_read_memo, block = blocks.next()
1171
if block_read_memo != read_memo:
1172
raise AssertionError(
1173
"block_read_memo out of sync with read_memo"
1174
"(%r != %r)" % (block_read_memo, read_memo))
1175
self.batch_memos[read_memo] = block
1176
memos_to_get_stack.pop()
1178
block = self.batch_memos[read_memo]
1179
self.manager = _LazyGroupContentManager(block,
1180
get_compressor_settings=self._get_compressor_settings)
1181
self.last_read_memo = read_memo
1182
start, end = index_memo[3:5]
1183
self.manager.add_factory(key, parents, start, end)
1185
for factory in self._flush_manager():
1188
self.batch_memos.clear()
1189
del self.memos_to_get[:]
1190
self.total_bytes = 0
1193
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
978
1194
"""A group-compress based VersionedFiles implementation."""
980
def __init__(self, index, access, delta=True):
1196
# This controls how the GroupCompress DeltaIndex works. Basically, we
1197
# compute hash pointers into the source blocks (so hash(text) => text).
1198
# However each of these references costs some memory in trade against a
1199
# more accurate match result. For very large files, they either are
1200
# pre-compressed and change in bulk whenever they change, or change in just
1201
# local blocks. Either way, 'improved resolution' is not very helpful,
1202
# versus running out of memory trying to track everything. The default max
1203
# gives 100% sampling of a 1MB file.
1204
_DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1205
_DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1206
_DEFAULT_MAX_BYTES_TO_INDEX}
1208
def __init__(self, index, access, delta=True, _unadded_refs=None,
981
1210
"""Create a GroupCompressVersionedFiles object.
983
1212
:param index: The index object storing access and graph data.
984
1213
:param access: The access object storing raw data.
985
1214
:param delta: Whether to delta compress or just entropy compress.
1215
:param _unadded_refs: private parameter, don't use.
1216
:param _group_cache: private parameter, don't use.
987
1218
self._index = index
988
1219
self._access = access
989
1220
self._delta = delta
990
self._unadded_refs = {}
991
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
992
self._fallback_vfs = []
1221
if _unadded_refs is None:
1223
self._unadded_refs = _unadded_refs
1224
if _group_cache is None:
1225
_group_cache = LRUSizeCache(max_size=50*1024*1024)
1226
self._group_cache = _group_cache
1227
self._immediate_fallback_vfs = []
1228
self._max_bytes_to_index = None
1230
def without_fallbacks(self):
1231
"""Return a clone of this object without any fallbacks configured."""
1232
return GroupCompressVersionedFiles(self._index, self._access,
1233
self._delta, _unadded_refs=dict(self._unadded_refs),
1234
_group_cache=self._group_cache)
994
1236
def add_lines(self, key, parents, lines, parent_texts=None,
995
1237
left_matching_blocks=None, nostore_sha=None, random_id=False,
1137
1378
missing.difference_update(set(new_result))
1138
1379
return result, source_results
1140
def _get_block(self, index_memo):
1141
read_memo = index_memo[0:3]
1144
block = self._group_cache[read_memo]
1147
zdata = self._access.get_raw_records([read_memo]).next()
1148
# decompress - whole thing - this is not a bug, as it
1149
# permits caching. We might want to store the partially
1150
# decompressed group and decompress object, so that recent
1151
# texts are not penalised by big groups.
1152
block = GroupCompressBlock.from_bytes(zdata)
1153
self._group_cache[read_memo] = block
1155
# print len(zdata), len(plain)
1156
# parse - requires split_lines, better to have byte offsets
1157
# here (but not by much - we only split the region for the
1158
# recipe, and we often want to end up with lines anyway.
1381
def _get_blocks(self, read_memos):
1382
"""Get GroupCompressBlocks for the given read_memos.
1384
:returns: a series of (read_memo, block) pairs, in the order they were
1388
for read_memo in read_memos:
1390
block = self._group_cache[read_memo]
1394
cached[read_memo] = block
1396
not_cached_seen = set()
1397
for read_memo in read_memos:
1398
if read_memo in cached:
1399
# Don't fetch what we already have
1401
if read_memo in not_cached_seen:
1402
# Don't try to fetch the same data twice
1404
not_cached.append(read_memo)
1405
not_cached_seen.add(read_memo)
1406
raw_records = self._access.get_raw_records(not_cached)
1407
for read_memo in read_memos:
1409
yield read_memo, cached[read_memo]
1411
# Read the block, and cache it.
1412
zdata = raw_records.next()
1413
block = GroupCompressBlock.from_bytes(zdata)
1414
self._group_cache[read_memo] = block
1415
cached[read_memo] = block
1416
yield read_memo, block
1161
1418
def get_missing_compression_parent_keys(self):
1162
1419
"""Return the keys of missing compression parents.
1328
1586
unadded_keys, source_result)
1329
1587
for key in missing:
1330
1588
yield AbsentContentFactory(key)
1332
last_read_memo = None
1333
# TODO: This works fairly well at batching up existing groups into a
1334
# streamable format, and possibly allowing for taking one big
1335
# group and splitting it when it isn't fully utilized.
1336
# However, it doesn't allow us to find under-utilized groups and
1337
# combine them into a bigger group on the fly.
1338
# (Consider the issue with how chk_map inserts texts
1339
# one-at-a-time.) This could be done at insert_record_stream()
1340
# time, but it probably would decrease the number of
1341
# bytes-on-the-wire for fetch.
1589
# Batch up as many keys as we can until either:
1590
# - we encounter an unadded ref, or
1591
# - we run out of keys, or
1592
# - the total bytes to retrieve for this batch > BATCH_SIZE
1593
batcher = _BatchingBlockFetcher(self, locations,
1594
get_compressor_settings=self._get_compressor_settings)
1342
1595
for source, keys in source_keys:
1343
1596
if source is self:
1344
1597
for key in keys:
1345
1598
if key in self._unadded_refs:
1346
if manager is not None:
1347
for factory in manager.get_record_stream():
1349
last_read_memo = manager = None
1599
# Flush batch, then yield unadded ref from
1601
for factory in batcher.yield_factories(full_flush=True):
1350
1603
bytes, sha1 = self._compressor.extract(key)
1351
1604
parents = self._unadded_refs[key]
1352
1605
yield FulltextContentFactory(key, parents, sha1, bytes)
1354
index_memo, _, parents, (method, _) = locations[key]
1355
read_memo = index_memo[0:3]
1356
if last_read_memo != read_memo:
1357
# We are starting a new block. If we have a
1358
# manager, we have found everything that fits for
1359
# now, so yield records
1360
if manager is not None:
1361
for factory in manager.get_record_stream():
1363
# Now start a new manager
1364
block = self._get_block(index_memo)
1365
manager = _LazyGroupContentManager(block)
1366
last_read_memo = read_memo
1367
start, end = index_memo[3:5]
1368
manager.add_factory(key, parents, start, end)
1607
if batcher.add_key(key) > BATCH_SIZE:
1608
# Ok, this batch is big enough. Yield some results.
1609
for factory in batcher.yield_factories():
1370
if manager is not None:
1371
for factory in manager.get_record_stream():
1373
last_read_memo = manager = None
1612
for factory in batcher.yield_factories(full_flush=True):
1374
1614
for record in source.get_record_stream(keys, ordering,
1375
1615
include_delta_closure):
1377
if manager is not None:
1378
for factory in manager.get_record_stream():
1617
for factory in batcher.yield_factories(full_flush=True):
1381
1620
def get_sha1s(self, keys):
1382
1621
"""See VersionedFiles.get_sha1s()."""
1433
1696
# This will go up to fulltexts for gc to gc fetching, which isn't
1435
self._compressor = GroupCompressor()
1698
self._compressor = self._make_group_compressor()
1436
1699
self._unadded_refs = {}
1437
1700
keys_to_add = []
1439
bytes = self._compressor.flush().to_bytes()
1702
bytes_len, chunks = self._compressor.flush().to_chunks()
1703
self._compressor = self._make_group_compressor()
1704
# Note: At this point we still have 1 copy of the fulltext (in
1705
# record and the var 'bytes'), and this generates 2 copies of
1706
# the compressed text (one for bytes, one in chunks)
1707
# TODO: Push 'chunks' down into the _access api, so that we don't
1708
# have to double compressed memory here
1709
# TODO: Figure out how to indicate that we would be happy to free
1710
# the fulltext content at this point. Note that sometimes we
1711
# will want it later (streaming CHK pages), but most of the
1712
# time we won't (everything else)
1713
bytes = ''.join(chunks)
1440
1715
index, start, length = self._access.add_raw_records(
1441
1716
[(None, len(bytes))], bytes)[0]
1455
1729
block_length = None
1456
1730
# XXX: TODO: remove this, it is just for safety checking for now
1457
1731
inserted_keys = set()
1732
reuse_this_block = reuse_blocks
1458
1733
for record in stream:
1459
1734
# Raise an error when a record is missing.
1460
1735
if record.storage_kind == 'absent':
1461
1736
raise errors.RevisionNotPresent(record.key, self)
1463
1738
if record.key in inserted_keys:
1464
trace.note('Insert claimed random_id=True,'
1465
' but then inserted %r two times', record.key)
1739
trace.note(gettext('Insert claimed random_id=True,'
1740
' but then inserted %r two times'), record.key)
1467
1742
inserted_keys.add(record.key)
1468
1743
if reuse_blocks:
1469
1744
# If the reuse_blocks flag is set, check to see if we can just
1470
1745
# copy a groupcompress block as-is.
1746
# We only check on the first record (groupcompress-block) not
1747
# on all of the (groupcompress-block-ref) entries.
1748
# The reuse_this_block flag is then kept for as long as
1749
if record.storage_kind == 'groupcompress-block':
1750
# Check to see if we really want to re-use this block
1751
insert_manager = record._manager
1752
reuse_this_block = insert_manager.check_is_well_utilized()
1754
reuse_this_block = False
1755
if reuse_this_block:
1756
# We still want to reuse this block
1471
1757
if record.storage_kind == 'groupcompress-block':
1472
1758
# Insert the raw block into the target repo
1473
1759
insert_manager = record._manager
1474
insert_manager._check_rebuild_block()
1475
1760
bytes = record._manager._block.to_bytes()
1476
1761
_, start, length = self._access.add_raw_records(
1477
1762
[(None, len(bytes))], bytes)[0]
1587
1882
"""See VersionedFiles.keys."""
1588
1883
if 'evil' in debug.debug_flags:
1589
1884
trace.mutter_callsite(2, "keys scales with size of history")
1590
sources = [self._index] + self._fallback_vfs
1885
sources = [self._index] + self._immediate_fallback_vfs
1592
1887
for source in sources:
1593
1888
result.update(source.keys())
1892
class _GCBuildDetails(object):
1893
"""A blob of data about the build details.
1895
This stores the minimal data, which then allows compatibility with the old
1896
api, without taking as much memory.
1899
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1900
'_delta_end', '_parents')
1903
compression_parent = None
1905
def __init__(self, parents, position_info):
1906
self._parents = parents
1907
(self._index, self._group_start, self._group_end, self._basis_end,
1908
self._delta_end) = position_info
1911
return '%s(%s, %s)' % (self.__class__.__name__,
1912
self.index_memo, self._parents)
1915
def index_memo(self):
1916
return (self._index, self._group_start, self._group_end,
1917
self._basis_end, self._delta_end)
1920
def record_details(self):
1921
return static_tuple.StaticTuple(self.method, None)
1923
def __getitem__(self, offset):
1924
"""Compatibility thunk to act like a tuple."""
1926
return self.index_memo
1928
return self.compression_parent # Always None
1930
return self._parents
1932
return self.record_details
1934
raise IndexError('offset out of range')
1597
1940
class _GCGraphIndex(object):
1598
1941
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1600
1943
def __init__(self, graph_index, is_locked, parents=True,
1601
1944
add_callback=None, track_external_parent_refs=False,
1602
inconsistency_fatal=True):
1945
inconsistency_fatal=True, track_new_keys=False):
1603
1946
"""Construct a _GCGraphIndex on a graph_index.
1605
1948
:param graph_index: An implementation of bzrlib.index.GraphIndex.
1796
2151
"""Convert an index value to position details."""
1797
2152
bits = node[2].split(' ')
1798
2153
# It would be nice not to read the entire gzip.
2154
# start and stop are put into _int_cache because they are very common.
2155
# They define the 'group' that an entry is in, and many groups can have
2156
# thousands of objects.
2157
# Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2158
# each, or about 7MB. Note that it might be even more when you consider
2159
# how PyInt is allocated in separate slabs. And you can't return a slab
2160
# to the OS if even 1 int on it is in use. Note though that Python uses
2161
# a LIFO when re-using PyInt slots, which might cause more
1799
2163
start = int(bits[0])
2164
start = self._int_cache.setdefault(start, start)
1800
2165
stop = int(bits[1])
2166
stop = self._int_cache.setdefault(stop, stop)
1801
2167
basis_end = int(bits[2])
1802
2168
delta_end = int(bits[3])
1803
return node[0], start, stop, basis_end, delta_end
2169
# We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2171
return (node[0], start, stop, basis_end, delta_end)
1805
2173
def scan_unvalidated_index(self, graph_index):
1806
2174
"""Inform this _GCGraphIndex that there is an unvalidated index.
1808
2176
This allows this _GCGraphIndex to keep track of any missing
1809
2177
compression parents we may want to have filled in to make those
2178
indices valid. It also allows _GCGraphIndex to track any new keys.
1812
2180
:param graph_index: A GraphIndex
1814
if self._key_dependencies is not None:
1815
# Add parent refs from graph_index (and discard parent refs that
1816
# the graph_index has).
1817
add_refs = self._key_dependencies.add_references
1818
for node in graph_index.iter_all_entries():
1819
add_refs(node[1], node[3][0])
2182
key_dependencies = self._key_dependencies
2183
if key_dependencies is None:
2185
for node in graph_index.iter_all_entries():
2186
# Add parent refs from graph_index (and discard parent refs
2187
# that the graph_index has).
2188
key_dependencies.add_references(node[1], node[3][0])
1823
2191
from bzrlib._groupcompress_py import (