                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = ''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # chunk
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = ''.join(self._z_content_chunks)
            if z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decompression yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
        # Any bytes remaining to be decompressed will be in the decompressor's
        # 'unconsumed_tail'

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
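# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The zlib.decompressobj() pattern used by _ensure_content above: decompress
# only roughly as many bytes as the caller needs, and keep the rest of the
# compressed stream in 'unconsumed_tail' for later. The 64kB window stands in
# for _ZLIB_DECOMP_WINDOW and is an assumption of this sketch.

def _example_partial_decompress(z_bytes, num_bytes, window=64 * 1024):
    import zlib
    decompressor = zlib.decompressobj()
    # Ask for a little more than needed, so zlib hands back a full buffer.
    content = decompressor.decompress(z_bytes, num_bytes + window)
    # 'unconsumed_tail' is compressed data not yet fed through zlib; feed it
    # back in (with a new max_length) when more plaintext is required.
    return content, decompressor.unconsumed_tail, decompressor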
    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header."""

            bytes = apply_delta_to_source(self._content, content_start, end)

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        # If we have lots of short lines, it may be more efficient to join
        # the content ahead of time. If the content is <10MiB, we don't really
        # care about the extra memory consumption, so we can just pack it and
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
        # mysql, which is below the noise margin
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_using_lzma(self):
        if self._content_chunks is not None:
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        if self._content is None:
            raise AssertionError('Nothing to compress')
        z_content = pylzma.compress(self._content)
        self._z_content_chunks = (z_content,)
        self._z_content_length = len(z_content)

    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = map(compressor.compress, chunks)
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))

    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if _USE_LZMA:
            self._create_z_content_using_lzma()
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                 ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return ''.join(chunks)
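# --- Illustrative sketch (not bzrlib code) ---------------------------------
# Compressing a list of chunks through a single zlib.compressobj(), as
# _create_z_content_from_chunks does, and framing the result with a header
# that records both lengths, as to_chunks does. The header label below is a
# placeholder, not the real GCB header value.

def _example_compress_chunks(chunks, header='example-header\n'):
    import zlib
    compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
    compressed = [compressor.compress(c) for c in chunks]
    compressed.append(compressor.flush())
    compressed = [c for c in compressed if c]   # ignore empty chunks
    z_len = sum(len(c) for c in compressed)
    content_len = sum(len(c) for c in chunks)
    out = ['%s%d\n%d\n' % (header, z_len, content_len)]
    out.extend(compressed)
    return sum(len(c) for c in out), out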
    def _dump(self, include_text=False):


class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
                             # current size, and still be considered
                             # 'well utilized'
    _full_block_size = 4*1024*1024
    _full_mixed_block_size = 2*1024*1024
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

    def __init__(self, block, get_compressor_settings=None):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0
        self._get_settings = get_compressor_settings
        self._compressor_settings = None

    def _get_compressor_settings(self):
        if self._compressor_settings is not None:
            return self._compressor_settings
        settings = None
        if self._get_settings is not None:
            settings = self._get_settings()
        if settings is None:
            vf = GroupCompressVersionedFiles
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
        self._compressor_settings = settings
        return self._compressor_settings

    def add_factory(self, key, parents, start, end):
        if not self._factories:

        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used

        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        # expanding many deltas into fulltexts, as well.
        # If we build a cheap enough 'strip', then we could try a strip,
        # if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
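# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The same trim/rebuild heuristic as _check_rebuild_action, written over plain
# (start, end) ranges: None means the block is already well used, 'trim' means
# the live data sits at the front so the tail can be dropped, and 'rebuild'
# means repack from scratch.

def _example_rebuild_action(ranges, block_length):
    total_bytes_used = 0
    last_byte_used = 0
    for start, end in ranges:
        total_bytes_used += end - start
        if last_byte_used < end:
            last_byte_used = end
    if total_bytes_used * 2 >= block_length:
        return None, last_byte_used, total_bytes_used
    if total_bytes_used * 2 > last_byte_used:
        return 'trim', last_byte_used, total_bytes_used
    return 'rebuild', last_byte_used, total_bytes_used

# e.g. using 1000 bytes of a 10000 byte block, all near the front:
#   _example_rebuild_action([(0, 600), (600, 1000)], 10000)
#   -> ('trim', 1000, 1000)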

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       in sync.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False

    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))
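# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The mixed-content test performed by check_is_well_utilized: keys are tuples
# whose last element is the revision id, so key[:-1] identifies the file, and
# two different prefixes in one block mean the block holds mixed content.

def _example_is_mixed(keys):
    common_prefix = None
    for key in keys:
        prefix = key[:-1]
        if common_prefix is None:
            common_prefix = prefix
        elif prefix != common_prefix:
            return True
    return False

# _example_is_mixed([('file-a', 'rev1'), ('file-a', 'rev2')])  -> False
# _example_is_mixed([('file-a', 'rev1'), ('file-b', 'rev1')])  -> True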

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""

        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return ''.join(lines)
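# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The framing used by _wire_bytes above: a leading label, three decimal
# lengths (compressed header, uncompressed header, block), the zlib-compressed
# header, then the block chunks. 'example-kind' is a stand-in label, not the
# exact bzrlib storage kind.

def _example_wire_frame(header_bytes, block_chunks, storage_kind='example-kind'):
    import zlib
    z_header_bytes = zlib.compress(header_bytes)
    block_bytes_len = sum(len(c) for c in block_chunks)
    lines = ['%s\n' % storage_kind]
    lines.append('%d\n%d\n%d\n' % (len(z_header_bytes), len(header_bytes),
                                   block_bytes_len))
    lines.append(z_header_bytes)
    lines.extend(block_chunks)
    return ''.join(lines)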

    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
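# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The decode side of the frame above, mirroring the split('\n', 4) in
# from_bytes: peel off the label and the three lengths, then slice the
# compressed header and the block out of the remainder.

def _example_parse_wire_frame(data):
    import zlib
    (storage_kind, z_header_len, header_len,
     block_len, rest) = data.split('\n', 4)
    z_header_len = int(z_header_len)
    header_len = int(header_len)
    block_len = int(block_len)
    z_header = rest[:z_header_len]
    block_bytes = rest[z_header_len:z_header_len + block_len]
    header = zlib.decompress(z_header)
    assert len(header) == header_len
    return storage_kind, header, block_bytes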

    It contains code very similar to SequenceMatcher because of having a similar
    task. However some key differences apply:

    * there is no junk, we want a minimal edit not a human readable diff.
    * we don't filter very common lines (because we don't know where a good
      range will start, and after the first text we want to be emitting minimal
      edits anyway.
    * we chain the left side, not the right side
    * we incrementally update the adjacency matrix as new lines are provided.
    * we look for matches in all of the left side, so the routine which does
      the analogous task of find_longest_match does not need to filter on the
      left side.
    """

    def __init__(self, settings=None):
        super(PyrexGroupCompressor, self).__init__(settings)
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
            versioned_files.stream.close()


class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """

    def __init__(self, gcvf, locations, get_compressor_settings=None):
        self.gcvf = gcvf
        self.locations = locations
        self.keys = []
        self.batch_memos = {}
        self.memos_to_get = []
        self.total_bytes = 0
        self.last_read_memo = None
        self.manager = None
        self._get_compressor_settings = get_compressor_settings

    def add_key(self, key):
        """Add another key to fetch.

        :return: The estimated number of bytes needed to fetch the batch so
            far.
        """
        self.keys.append(key)
        index_memo, _, _, _ = self.locations[key]
        read_memo = index_memo[0:3]
        # Three possibilities for this read_memo:
        #  - it's already part of this batch; or
        #  - it's not yet part of this batch, but is already cached; or
        #  - it's not yet part of this batch and will need to be fetched.
        if read_memo in self.batch_memos:
            # This read memo is already in this batch.
            return self.total_bytes
        try:
            cached_block = self.gcvf._group_cache[read_memo]
        except KeyError:
            # This read memo is new to this batch, and the data isn't cached
            # either.
            self.batch_memos[read_memo] = None
            self.memos_to_get.append(read_memo)
            byte_length = read_memo[2]
            self.total_bytes += byte_length
        else:
            # This read memo is new to this batch, but cached.
            # Keep a reference to the cached block in batch_memos because it's
            # certain that we'll use it when this batch is processed, but
            # there's a risk that it would fall out of _group_cache between now
            # and then.
            self.batch_memos[read_memo] = cached_block
        return self.total_bytes
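# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The batch-or-reuse decision made by add_key above, for a generic cache: a
# memo already in the batch costs nothing extra, a cached memo is pinned into
# the batch, and anything else is queued for fetching with its byte length
# added to the running total. Memos are assumed to look like
# (index, offset, length).

def _example_add_memo(read_memo, batch_memos, memos_to_get, cache, total_bytes):
    if read_memo in batch_memos:
        return total_bytes
    try:
        cached_block = cache[read_memo]
    except KeyError:
        batch_memos[read_memo] = None
        memos_to_get.append(read_memo)
        total_bytes += read_memo[2]
    else:
        batch_memos[read_memo] = cached_block
    return total_bytes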

    def _flush_manager(self):
        if self.manager is not None:
            for factory in self.manager.get_record_stream():
                yield factory
            self.manager = None
            self.last_read_memo = None

    def yield_factories(self, full_flush=False):
        """Yield factories for keys added since the last yield.  They will be
        returned in the order they were added via add_key.

        :param full_flush: by default, some results may not be returned in case
            they can be part of the next batch. If full_flush is True, then
            all results are returned.
        """
        if self.manager is None and not self.keys:
            return
        # Fetch all memos in this batch.
        blocks = self.gcvf._get_blocks(self.memos_to_get)
        # Turn blocks into factories and yield them.
        memos_to_get_stack = list(self.memos_to_get)
        memos_to_get_stack.reverse()
        for key in self.keys:
            index_memo, _, parents, _ = self.locations[key]
            read_memo = index_memo[:3]
            if self.last_read_memo != read_memo:
                # We are starting a new block. If we have a
                # manager, we have found everything that fits for
                # now, so yield records
                for factory in self._flush_manager():
                    yield factory
                # Now start a new manager.
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
                    # The next block from _get_blocks will be the block we
                    # need.
                    block_read_memo, block = blocks.next()
                    if block_read_memo != read_memo:
                        raise AssertionError(
                            "block_read_memo out of sync with read_memo "
                            "(%r != %r)" % (block_read_memo, read_memo))
                    self.batch_memos[read_memo] = block
                    memos_to_get_stack.pop()
                else:
                    block = self.batch_memos[read_memo]
                self.manager = _LazyGroupContentManager(block,
                    get_compressor_settings=self._get_compressor_settings)
                self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        if full_flush:
            for factory in self._flush_manager():
                yield factory
        del self.keys[:]
        self.batch_memos.clear()
        del self.memos_to_get[:]
        self.total_bytes = 0
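# --- Illustrative sketch (not bzrlib code) ---------------------------------
# How a caller drives a batching fetcher like the one above: accumulate keys
# until the estimated byte total crosses a threshold, drain the batch, and
# flush whatever is left at the end. The 256kB threshold is an assumption of
# this sketch, standing in for BATCH_SIZE.

def _example_drive_batcher(batcher, keys, batch_size=256 * 1024):
    for key in keys:
        if batcher.add_key(key) > batch_size:
            # This batch is big enough; yield what we have so far.
            for factory in batcher.yield_factories():
                yield factory
    for factory in batcher.yield_factories(full_flush=True):
        yield factory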


class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
    """A group-compress based VersionedFiles implementation."""

    # This controls how the GroupCompress DeltaIndex works. Basically, we
    # compute hash pointers into the source blocks (so hash(text) => text).
    # However each of these references costs some memory in trade against a
    # more accurate match result. For very large files, they either are
    # pre-compressed and change in bulk whenever they change, or change in just
    # local blocks. Either way, 'improved resolution' is not very helpful,
    # versus running out of memory trying to track everything. The default max
    # gives 100% sampling of a 1MB file.
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
                                        _DEFAULT_MAX_BYTES_TO_INDEX}

    def __init__(self, index, access, delta=True, _unadded_refs=None,
                 _group_cache=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        :param _group_cache: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        if _group_cache is None:
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._group_cache = _group_cache
        self._immediate_fallback_vfs = []
        self._max_bytes_to_index = None

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs),
            _group_cache=self._group_cache)

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):

                                               nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
        """See VersionedFiles._add_text()."""
        self._index._check_write_ok()
        self._check_add(key, None, random_id, check_content=False)
        if text.__class__ is not str:
            raise errors.BzrBadParameterUnicode("text")
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        # double handling for now. Make it work until then.
        length = len(text)
        record = FulltextContentFactory(key, parents, None, text)
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
                                               nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

        :param a_versioned_files: A VersionedFiles object.
        """
        self._immediate_fallback_vfs.append(a_versioned_files)

    def annotate(self, key):
        """See VersionedFiles.annotate."""
        ann = annotate.Annotator(self)
        return ann.annotate_flat(key)

    def get_annotator(self):
        return annotate.Annotator(self)

    def check(self, progress_bar=None, keys=None):
        """See VersionedFiles.check()."""
        if keys is None:
            keys = self.keys()
            for record in self.get_record_stream(keys, 'unordered', True):
                record.get_bytes_as('fulltext')
        else:
            return self.get_record_stream(keys, 'unordered', True)

    def clear_cache(self):
        """See VersionedFiles.clear_cache()"""
        self._group_cache.clear()
        self._index._graph_index.clear_cache()
        self._index._int_cache.clear()

    def _check_add(self, key, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""

            missing.difference_update(set(new_result))
        return result, source_results

    def _get_blocks(self, read_memos):
        """Get GroupCompressBlocks for the given read_memos.

        :returns: a series of (read_memo, block) pairs, in the order they were
            originally passed.
        """
        cached = {}
        for read_memo in read_memos:
            try:
                block = self._group_cache[read_memo]
            except KeyError:
                pass
            else:
                cached[read_memo] = block
        not_cached = []
        not_cached_seen = set()
        for read_memo in read_memos:
            if read_memo in cached:
                # Don't fetch what we already have
                continue
            if read_memo in not_cached_seen:
                # Don't try to fetch the same data twice
                continue
            not_cached.append(read_memo)
            not_cached_seen.add(read_memo)
        raw_records = self._access.get_raw_records(not_cached)
        for read_memo in read_memos:
            if read_memo in cached:
                yield read_memo, cached[read_memo]
            else:
                # Read the block, and cache it.
                zdata = raw_records.next()
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block
                yield read_memo, block
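# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The same two-pass pattern as _get_blocks above, for a generic cache and a
# fetch function that returns results in request order: reuse what is cached,
# fetch each missing key at most once, and yield results in the caller's
# original order.

def _example_get_in_order(keys, cache, fetch_many):
    cached = dict((k, cache[k]) for k in keys if k in cache)
    to_fetch = []
    seen = set()
    for k in keys:
        if k in cached or k in seen:
            continue
        to_fetch.append(k)
        seen.add(k)
    fetched = iter(fetch_many(to_fetch))
    for k in keys:
        if k in cached:
            yield k, cached[k]
        else:
            value = fetched.next()
            cache[k] = value
            cached[k] = value
            yield k, value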

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.
                                                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations,
            get_compressor_settings=self._get_compressor_settings)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                        continue
                    if batcher.add_key(key) > BATCH_SIZE:
                        # Ok, this batch is big enough. Yield some results.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
        # This will go up to fulltexts for gc to gc fetching, which isn't
        # ideal.
        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []

            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Push 'chunks' down into the _access api, so that we don't
            #       have to double compressed memory here
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            bytes = ''.join(chunks)
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]
                                               'unordered', True)):
            # XXX: todo - optimise to use less than full texts.
            key = record.key
            pb.update('Walking content', key_idx, total)
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(key, self)
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
            for line in lines:
                yield line, key
        pb.update('Walking content', total, total)

    def keys(self):
        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        result = set()
        sources = [self._index] + self._immediate_fallback_vfs
        for source in sources:
            result.update(source.keys())
        return result


class _GCBuildDetails(object):
    """A blob of data about the build details.

    This stores the minimal data, which then allows compatibility with the old
    api, without taking as much memory.
    """

    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
                 '_delta_end', '_parents')

    method = 'group'
    compression_parent = None

    def __init__(self, parents, position_info):
        self._parents = parents
        (self._index, self._group_start, self._group_end, self._basis_end,
         self._delta_end) = position_info

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__,
            self.index_memo, self._parents)

    @property
    def index_memo(self):
        return (self._index, self._group_start, self._group_end,
                self._basis_end, self._delta_end)

    @property
    def record_details(self):
        return static_tuple.StaticTuple(self.method, None)

    def __getitem__(self, offset):
        """Compatibility thunk to act like a tuple."""
        if offset == 0:
            return self.index_memo
        elif offset == 1:
            return self.compression_parent # Always None
        elif offset == 2:
            return self._parents
        elif offset == 3:
            return self.record_details
        else:
            raise IndexError('offset out of range')
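# --- Illustrative sketch (not bzrlib code) ---------------------------------
# The tuple-compatibility trick used by _GCBuildDetails above: __slots__ keeps
# per-instance memory down, while __getitem__ (raising IndexError past the
# last offset) lets old callers keep unpacking the object as if it were a
# plain tuple.

class _ExamplePositionDetails(object):

    __slots__ = ('_start', '_end', '_parents')

    def __init__(self, start, end, parents):
        self._start = start
        self._end = end
        self._parents = parents

    def __getitem__(self, offset):
        if offset == 0:
            return (self._start, self._end)
        elif offset == 1:
            return None            # no compression parent in this sketch
        elif offset == 2:
            return self._parents
        else:
            raise IndexError('offset out of range')

# Unpacking still works because IndexError terminates the iteration:
#   span, comp_parent, parents = _ExamplePositionDetails(0, 10, ('rev-1',))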


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
        add_callback=None, track_external_parent_refs=False,
        inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # thousands of objects.
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)
        stop = int(bits[1])
        stop = self._int_cache.setdefault(stop, stop)
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
        # instance.
        return (node[0], start, stop, basis_end, delta_end)
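# --- Illustrative sketch (not bzrlib code) ---------------------------------
# Interning repeated integers with dict.setdefault, as _node_to_position does
# above: many index entries share the same group start/stop offsets, so
# keeping one canonical int object per value avoids allocating thousands of
# duplicate PyInt objects.

_example_int_cache = {}

def _example_intern_int(value):
    # Return the cached object for this value, inserting it on first use.
    return _example_int_cache.setdefault(value, value)

# _example_intern_int(123456) is _example_intern_int(123456)  ->  True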

    def scan_unvalidated_index(self, graph_index):
        """Inform this _GCGraphIndex that there is an unvalidated index.

        This allows this _GCGraphIndex to keep track of any missing
        compression parents we may want to have filled in to make those
        indices valid. It also allows _GCGraphIndex to track any new keys.

        :param graph_index: A GraphIndex
        """
        key_dependencies = self._key_dependencies
        if key_dependencies is None:
            return
        for node in graph_index.iter_all_entries():
            # Add parent refs from graph_index (and discard parent refs
            # that the graph_index has).
            key_dependencies.add_references(node[1], node[3][0])


from bzrlib._groupcompress_py import (