                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = ''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # chunk
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = ''.join(self._z_content_chunks)
            if z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
        # Any bytes remaining to be decompressed will be in the decompressor's
        # 'unconsumed_tail'

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
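
    # The zlib branch above leans on zlib.decompressobj() keeping whatever it
    # has not yet decompressed in 'unconsumed_tail', so a later call can pick
    # up where the first one stopped.  A minimal sketch of that pattern
    # (_read_prefix_sketch and its arguments are illustrative names; zlib is
    # already imported at module level):
    def _read_prefix_sketch(z_bytes, want, window=32 * 1024):
        """Decompress a prefix of roughly `want` bytes, keep the rest packed."""
        decomp = zlib.decompressobj()
        # Ask for a little more than strictly needed, mirroring the
        # _ZLIB_DECOMP_WINDOW slack used by _ensure_content().
        prefix = decomp.decompress(z_bytes, want + window)
        # Whatever was not decompressed stays in decomp.unconsumed_tail and
        # can be fed back into decomp.decompress() later.
        return prefix, decomp

    # Example:
    #   raw = ''.join('line-%05d\n' % i for i in range(20000))
    #   prefix, decomp = _read_prefix_sketch(zlib.compress(raw), 1000)
    #   assert raw.startswith(prefix) and decomp.unconsumed_tail
    #   assert prefix + decomp.decompress(decomp.unconsumed_tail) == raw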

    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.

            bytes = apply_delta_to_source(self._content, content_start, end)

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        # If we have lots of short lines, it may be more efficient to join
        # the content ahead of time. If the content is <10MiB, we don't really
        # care about the extra memory consumption, so we can just pack it and
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
        # mysql, which is below the noise margin
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_using_lzma(self):
        if self._content_chunks is not None:
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        if self._content is None:
            raise AssertionError('Nothing to compress')
        z_content = pylzma.compress(self._content)
        self._z_content_chunks = (z_content,)
        self._z_content_length = len(z_content)

    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak at this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = map(compressor.compress, chunks)
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))

    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if _USE_LZMA:
            self._create_z_content_using_lzma()
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                 ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return ''.join(chunks)
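
    # As a usage sketch of the serialization above: to_chunks() writes the
    # header label and two decimal lengths, each '\n'-terminated, followed by
    # the compressed payload.  _split_block_sketch is a hypothetical helper
    # that relies only on that layout (zlib is imported by the module):
    def _split_block_sketch(data):
        """Take apart the bytes produced by to_bytes()/to_chunks()."""
        label, z_len, c_len, z_payload = data.split('\n', 3)
        return label, int(z_len), int(c_len), z_payload

    # Example (zlib path, i.e. _USE_LZMA disabled):
    #   block = GroupCompressBlock()
    #   block.set_content('some text worth storing')
    #   total_len, chunks = block.to_chunks()
    #   label, z_len, c_len, z_payload = _split_block_sketch(''.join(chunks))
    #   assert c_len == len('some text worth storing')
    #   assert zlib.decompress(z_payload) == 'some text worth storing'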

    def _dump(self, include_text=False):
        """Take this block, and spit out a human-readable structure.

        :param include_text: Inserts also include text bits, choose whether you
            want this displayed in the dump or not.
        :return: A dump of the given block. The layout is something like:
            [('f', length), ('d', delta_length, text_length, [delta_info])]
            delta_info := [('i', num_bytes, text), ('c', offset, num_bytes),
            ...]
        """
        self._ensure_content()
        result = []
        pos = 0
        while pos < self._content_length:
            kind = self._content[pos]
            pos += 1
            if kind not in ('f', 'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            pos += len_len
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == 'f': # Fulltext
                if include_text:
                    text = self._content[pos:pos+content_len]
                    result.append(('f', content_len, text))
                else:
                    result.append(('f', content_len))
            elif kind == 'd': # Delta
                delta_content = self._content[pos:pos+content_len]
                delta_info = []
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append(('d', content_len, decomp_len, delta_info))
                measured_len = 0
                while delta_pos < content_len:
                    c = ord(delta_content[delta_pos])
                    delta_pos += 1
                    if c & 0x80: # Copy
                        (offset, length,
                         delta_pos) = decode_copy_instruction(delta_content, c,
                                                              delta_pos)
                        if include_text:
                            text = self._content[offset:offset+length]
                            delta_info.append(('c', offset, length, text))
                        else:
                            delta_info.append(('c', offset, length))
                        measured_len += length
                    else: # Insert
                        if include_text:
                            txt = delta_content[delta_pos:delta_pos+c]
                        else:
                            txt = ''
                        delta_info.append(('i', c, txt))
                        measured_len += c
                        delta_pos += c
                if delta_pos != content_len:
                    raise ValueError('Delta consumed a bad number of bytes:'
                                     ' %d != %d' % (delta_pos, content_len))
                if measured_len != decomp_len:
                    raise ValueError('Delta claimed fulltext was %d bytes, but'
                                     ' extraction resulted in %d bytes'
                                     % (decomp_len, measured_len))
            pos += content_len
        return result
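
# The record lengths read by _dump (and by the delta opcodes) are 7-bit,
# least-significant-group-first varints, which is what decode_base128_int is
# assumed to implement.  A self-contained sketch of that decoding:
def _decode_varint_sketch(data):
    """Decode a base-128 (LSB first) integer; return (value, bytes_used)."""
    value = 0
    shift = 0
    offset = 0
    while True:
        byte = ord(data[offset])
        offset += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, offset
        shift += 7

# Example: 150 is encoded as '\x96\x01' (0x16 plus a continuation bit, then 1):
#   assert _decode_varint_sketch('\x96\x01') == (150, 2)
#   assert _decode_varint_sketch('\x05') == (5, 1)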

class _LazyGroupCompressFactory(object):
    """Yield content from a GroupCompressBlock on demand."""


class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
        # current size, and still be considered
    _full_block_size = 4*1024*1024
    _full_mixed_block_size = 2*1024*1024
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

    def __init__(self, block, get_compressor_settings=None):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0
        self._get_settings = get_compressor_settings
        self._compressor_settings = None

    def _get_compressor_settings(self):
        if self._compressor_settings is not None:
            return self._compressor_settings
        settings = None
        if self._get_settings is not None:
            settings = self._get_settings()
        if settings is None:
            vf = GroupCompressVersionedFiles
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
        self._compressor_settings = settings
        return self._compressor_settings

    def add_factory(self, key, parents, start, end):
        if not self._factories:
            first = True

        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used

        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
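
    # Restated as a standalone sketch (not part of this class's API), the
    # decision above reduces to three thresholds on how much of the block is
    # actually referenced:
    def _choose_repack_action_sketch(bytes_used, last_byte_used, block_length):
        """Return None (keep), 'trim', or 'rebuild' for a block."""
        if bytes_used * 2 >= block_length:
            return None       # more than half the block is live: leave it
        if bytes_used * 2 > last_byte_used:
            return 'trim'     # live data sits near the front: drop the tail
        return 'rebuild'      # sparse usage: repack from scratch

    # Example:
    #   assert _choose_repack_action_sketch(60, 100, 100) is None
    #   assert _choose_repack_action_sketch(40, 60, 100) == 'trim'
    #   assert _choose_repack_action_sketch(10, 90, 100) == 'rebuild'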

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       together.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False

    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return ''.join(lines)

    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed data
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
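
    # A hedged sketch of the matching parse step: the wire form starts with a
    # storage-kind line and three decimal lengths, then the compressed header
    # and the block bytes.  _split_wire_bytes_sketch is illustrative only; the
    # real work continues inside from_bytes (zlib is imported by the module):
    def _split_wire_bytes_sketch(data):
        """Take apart the stream written by _wire_bytes()."""
        (storage_kind, z_header_len, header_len,
         block_len, rest) = data.split('\n', 4)
        z_header_len = int(z_header_len)
        block_len = int(block_len)
        z_header = rest[:z_header_len]
        block_bytes = rest[z_header_len:z_header_len + block_len]
        header = zlib.decompress(z_header)
        assert len(header) == int(header_len)
        return storage_kind, header, block_bytes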

    It contains code very similar to SequenceMatcher because of having a similar
    task. However some key differences apply:

    * there is no junk, we want a minimal edit not a human readable diff.
    * we don't filter very common lines (because we don't know where a good
      range will start, and after the first text we want to be emitting minimal
    * we chain the left side, not the right side
    * we incrementally update the adjacency matrix as new lines are provided.
    * we look for matches in all of the left side, so the routine which does
      the analogous task of find_longest_match does not need to filter on the

    def __init__(self, settings=None):
        super(PyrexGroupCompressor, self).__init__(settings)
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""

        versioned_files.stream.close()


class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """

    def __init__(self, gcvf, locations, get_compressor_settings=None):
        self.gcvf = gcvf
        self.locations = locations
        self.keys = []
        self.batch_memos = {}
        self.memos_to_get = []
        self.total_bytes = 0
        self.last_read_memo = None
        self.manager = None
        self._get_compressor_settings = get_compressor_settings

    def add_key(self, key):
        """Add another key to fetch.

        :return: The estimated number of bytes needed to fetch the batch so
            far.
        """
        self.keys.append(key)
        index_memo, _, _, _ = self.locations[key]
        read_memo = index_memo[0:3]
        # Three possibilities for this read_memo:
        #  - it's already part of this batch; or
        #  - it's not yet part of this batch, but is already cached; or
        #  - it's not yet part of this batch and will need to be fetched.
        if read_memo in self.batch_memos:
            # This read memo is already in this batch.
            return self.total_bytes
        try:
            cached_block = self.gcvf._group_cache[read_memo]
        except KeyError:
            # This read memo is new to this batch, and the data isn't cached
            # either.
            self.batch_memos[read_memo] = None
            self.memos_to_get.append(read_memo)
            byte_length = read_memo[2]
            self.total_bytes += byte_length
        else:
            # This read memo is new to this batch, but cached.
            # Keep a reference to the cached block in batch_memos because it's
            # certain that we'll use it when this batch is processed, but
            # there's a risk that it would fall out of _group_cache between now
            # and then.
            self.batch_memos[read_memo] = cached_block
        return self.total_bytes
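
    # The heart of add_key is that keys sharing a read_memo (the
    # (index, start, length) prefix of their index memo) cost nothing extra to
    # batch.  A stripped-down, hypothetical restatement of that accounting:
    def _estimate_batch_bytes_sketch(keys, locations):
        """Sum the on-disk length of each distinct group the keys live in."""
        seen_memos = set()
        total_bytes = 0
        for key in keys:
            index_memo = locations[key][0]
            read_memo = index_memo[0:3]        # (index, start, length)
            if read_memo not in seen_memos:
                seen_memos.add(read_memo)
                total_bytes += read_memo[2]    # bytes to read for that group
        return total_bytes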

    def _flush_manager(self):
        if self.manager is not None:
            for factory in self.manager.get_record_stream():
                yield factory
            self.manager = None
        self.last_read_memo = None

    def yield_factories(self, full_flush=False):
        """Yield factories for keys added since the last yield. They will be
        returned in the order they were added via add_key.

        :param full_flush: by default, some results may not be returned in case
            they can be part of the next batch. If full_flush is True, then
            all results are returned.
        """
        if self.manager is None and not self.keys:
            return
        # Fetch all memos in this batch.
        blocks = self.gcvf._get_blocks(self.memos_to_get)
        # Turn blocks into factories and yield them.
        memos_to_get_stack = list(self.memos_to_get)
        memos_to_get_stack.reverse()
        for key in self.keys:
            index_memo, _, parents, _ = self.locations[key]
            read_memo = index_memo[:3]
            if self.last_read_memo != read_memo:
                # We are starting a new block. If we have a
                # manager, we have found everything that fits for
                # now, so yield records
                for factory in self._flush_manager():
                    yield factory
                # Now start a new manager.
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
                    # The next block from _get_blocks will be the block we
                    # need.
                    block_read_memo, block = blocks.next()
                    if block_read_memo != read_memo:
                        raise AssertionError(
                            "block_read_memo out of sync with read_memo"
                            "(%r != %r)" % (block_read_memo, read_memo))
                    self.batch_memos[read_memo] = block
                    memos_to_get_stack.pop()
                else:
                    block = self.batch_memos[read_memo]
                self.manager = _LazyGroupContentManager(block,
                    get_compressor_settings=self._get_compressor_settings)
                self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        if full_flush:
            for factory in self._flush_manager():
                yield factory
        del self.keys[:]
        self.batch_memos.clear()
        del self.memos_to_get[:]
        self.total_bytes = 0
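
# The driving loop in get_record_stream (further below) feeds keys into the
# batcher and flushes once enough bytes are pending.  A condensed, hypothetical
# sketch of that driver, using the real _BatchingBlockFetcher and BATCH_SIZE:
def _stream_records_sketch(gcvf, locations, keys):
    """Yield record factories for keys, batching reads by group."""
    batcher = _BatchingBlockFetcher(gcvf, locations)
    for key in keys:
        if batcher.add_key(key) > BATCH_SIZE:
            # This batch is big enough; emit its factories before continuing.
            for factory in batcher.yield_factories():
                yield factory
    # Final flush: emit whatever is still pending.
    for factory in batcher.yield_factories(full_flush=True):
        yield factory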

class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
    """A group-compress based VersionedFiles implementation."""

    # This controls how the GroupCompress DeltaIndex works. Basically, we
    # compute hash pointers into the source blocks (so hash(text) => text).
    # However each of these references costs some memory in trade against a
    # more accurate match result. For very large files, they either are
    # pre-compressed and change in bulk whenever they change, or change in just
    # local blocks. Either way, 'improved resolution' is not very helpful,
    # versus running out of memory trying to track everything. The default max
    # gives 100% sampling of a 1MB file.
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
                                        _DEFAULT_MAX_BYTES_TO_INDEX}
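
    # The settings dict above is consumed defensively: a compressor only reads
    # the keys it understands and falls back to a default, as in
    # PyrexGroupCompressor.__init__ shown earlier.  A hypothetical restatement:
    def _get_compressor_setting_sketch(settings, name, default=0):
        """Read one compressor setting, tolerating None or missing keys."""
        if not settings:
            return default
        return settings.get(name, default)

    # Example:
    #   assert _get_compressor_setting_sketch(
    #       _DEFAULT_COMPRESSOR_SETTINGS, 'max_bytes_to_index') == 1024 * 1024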

    def __init__(self, index, access, delta=True, _unadded_refs=None,
        _group_cache=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        :param _group_cache: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        if _group_cache is None:
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._group_cache = _group_cache
        self._immediate_fallback_vfs = []
        self._max_bytes_to_index = None

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs),
            _group_cache=self._group_cache)

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,

            nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
        """See VersionedFiles._add_text()."""
        self._index._check_write_ok()
        self._check_add(key, None, random_id, check_content=False)
        if text.__class__ is not str:
            raise errors.BzrBadParameterUnicode("text")
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        # double handling for now. Make it work until then.
        length = len(text)
        record = FulltextContentFactory(key, parents, None, text)
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
                                               nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

        :param a_versioned_files: A VersionedFiles object.
        """
        self._immediate_fallback_vfs.append(a_versioned_files)

    def annotate(self, key):
        """See VersionedFiles.annotate."""
        ann = annotate.Annotator(self)
        return ann.annotate_flat(key)

    def get_annotator(self):
        return annotate.Annotator(self)

    def check(self, progress_bar=None, keys=None):
        """See VersionedFiles.check()."""
        if keys is None:
            keys = self.keys()
            for record in self.get_record_stream(keys, 'unordered', True):
                record.get_bytes_as('fulltext')
        else:
            return self.get_record_stream(keys, 'unordered', True)

    def clear_cache(self):
        """See VersionedFiles.clear_cache()"""
        self._group_cache.clear()
        self._index._graph_index.clear_cache()
        self._index._int_cache.clear()

    def _check_add(self, key, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""

            missing.difference_update(set(new_result))
        return result, source_results

    def _get_blocks(self, read_memos):
        """Get GroupCompressBlocks for the given read_memos.

        :returns: a series of (read_memo, block) pairs, in the order they were
            originally passed.
        """
        cached = {}
        for read_memo in read_memos:
            try:
                block = self._group_cache[read_memo]
            except KeyError:
                pass
            else:
                cached[read_memo] = block
        not_cached = []
        not_cached_seen = set()
        for read_memo in read_memos:
            if read_memo in cached:
                # Don't fetch what we already have
                continue
            if read_memo in not_cached_seen:
                # Don't try to fetch the same data twice
                continue
            not_cached.append(read_memo)
            not_cached_seen.add(read_memo)
        raw_records = self._access.get_raw_records(not_cached)
        for read_memo in read_memos:
            try:
                yield read_memo, cached[read_memo]
            except KeyError:
                # Read the block, and cache it.
                zdata = raw_records.next()
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block
                yield read_memo, block
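
    # The pattern above keeps the caller's order while only reading each
    # uncached group once.  A toy, hypothetical restatement of that logic
    # (cache and fetch_missing stand in for _group_cache and get_raw_records):
    def _fetch_blocks_in_order_sketch(read_memos, cache, fetch_missing):
        """Yield (memo, block) in request order, fetching each memo once."""
        to_fetch = []
        seen = set()
        for memo in read_memos:
            if memo not in cache and memo not in seen:
                to_fetch.append(memo)
                seen.add(memo)
        fetched = iter(fetch_missing(to_fetch))
        for memo in read_memos:
            if memo in cache:
                yield memo, cache[memo]
            else:
                block = next(fetched)
                cache[memo] = block
                yield memo, block

    # Example:
    #   cache = {}
    #   pairs = list(_fetch_blocks_in_order_sketch(
    #       ['m1', 'm2', 'm1'], cache,
    #       lambda memos: ['B-%s' % m for m in memos]))
    #   assert pairs == [('m1', 'B-m1'), ('m2', 'B-m2'), ('m1', 'B-m1')]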

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.

            unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations,
            get_compressor_settings=self._get_compressor_settings)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                        continue
                    if batcher.add_key(key) > BATCH_SIZE:
                        # Ok, this batch is big enough. Yield some results.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""

        # This will go up to fulltexts for gc to gc fetching, which isn't
        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []
        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Push 'chunks' down into the _access api, so that we don't
            #       have to double compressed memory here
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            bytes = ''.join(chunks)
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]

                'unordered', True)):
            # XXX: todo - optimise to use less than full texts.
            key = record.key
            if pb is not None:
                pb.update('Walking content', key_idx, total)
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(key, self)
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
            for line in lines:
                yield line, key
        if pb is not None:
            pb.update('Walking content', total, total)

    def keys(self):
        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        sources = [self._index] + self._immediate_fallback_vfs
        result = set()
        for source in sources:
            result.update(source.keys())
        return result


class _GCBuildDetails(object):
    """A blob of data about the build details.

    This stores the minimal data, which then allows compatibility with the old
    api, without taking as much memory.
    """

    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
                 '_delta_end', '_parents')

    method = 'group'
    compression_parent = None

    def __init__(self, parents, position_info):
        self._parents = parents
        (self._index, self._group_start, self._group_end, self._basis_end,
         self._delta_end) = position_info

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__,
            self.index_memo, self._parents)

    @property
    def index_memo(self):
        return (self._index, self._group_start, self._group_end,
                self._basis_end, self._delta_end)

    @property
    def record_details(self):
        return static_tuple.StaticTuple(self.method, None)

    def __getitem__(self, offset):
        """Compatibility thunk to act like a tuple."""
        if offset == 0:
            return self.index_memo
        elif offset == 1:
            return self.compression_parent # Always None
        elif offset == 2:
            return self._parents
        elif offset == 3:
            return self.record_details
        else:
            raise IndexError('offset out of range')
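
# A small usage sketch of the compatibility thunk above: older call sites
# unpack build details as a 4-tuple, and __getitem__ (with the properties)
# keeps that working without materializing one.  The key and position values
# in the example are made up for illustration:
def _unpack_build_details_sketch(details):
    """Unpack a _GCBuildDetails exactly like a legacy 4-tuple consumer."""
    index_memo, compression_parent, parents, record_details = details
    return index_memo, compression_parent, parents, record_details

# Example:
#   details = _GCBuildDetails(parents=(('file-id', 'rev-1'),),
#                             position_info=('index-obj', 0, 120, 0, 120))
#   index_memo, comp_parent, parents, _ = _unpack_build_details_sketch(details)
#   assert comp_parent is None and index_memo == ('index-obj', 0, 120, 0, 120)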


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
        add_callback=None, track_external_parent_refs=False,
        inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        :param track_external_parent_refs: As keys are added, keep track of the
            keys they reference, so that we can query get_missing_parents(),
            etc.
        :param inconsistency_fatal: When asked to add records that are already
            present, and the details are inconsistent with the existing
            record, raise an exception instead of warning (and skipping the
            record).
        """
        self._add_callback = add_callback
        self._graph_index = graph_index
        self._parents = parents
        self.has_graph = parents
        self._is_locked = is_locked
        self._inconsistency_fatal = inconsistency_fatal
        # GroupCompress records tend to have the same 'group' start + offset
        # repeated over and over, this creates a surplus of ints
        self._int_cache = {}
        if track_external_parent_refs:
            self._key_dependencies = _KeyRefs(
                track_new_keys=track_new_keys)
        else:
            self._key_dependencies = None

    def add_records(self, records, random_id=False):
        """Add multiple records to the index.

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # thousands of objects.
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)
        stop = int(bits[1])
        stop = self._int_cache.setdefault(stop, stop)
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
        return (node[0], start, stop, basis_end, delta_end)
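
    # The _int_cache.setdefault(...) calls above are manual interning: parse
    # the int, then reuse one canonical object per distinct value so that
    # thousands of equal group offsets do not each keep their own PyInt alive.
    # A tiny illustration (the helper name is hypothetical):
    def _intern_offsets_sketch(values):
        """Return the values, but sharing one int object per distinct value."""
        cache = {}
        return [cache.setdefault(v, v) for v in values]

    # Example:
    #   parsed = [int(s) for s in ('1048576', '1048576', '1048576')]
    #   interned = _intern_offsets_sketch(parsed)
    #   assert interned[0] is interned[1] is interned[2]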

    def scan_unvalidated_index(self, graph_index):
        """Inform this _GCGraphIndex that there is an unvalidated index.

        This allows this _GCGraphIndex to keep track of any missing
        compression parents we may want to have filled in to make those
        indices valid. It also allows _GCGraphIndex to track any new keys.

        :param graph_index: A GraphIndex
        """
        key_dependencies = self._key_dependencies
        if key_dependencies is None:
            return
        for node in graph_index.iter_all_entries():
            # Add parent refs from graph_index (and discard parent refs
            # that the graph_index has).
            key_dependencies.add_references(node[1], node[3][0])


from bzrlib._groupcompress_py import (