                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = ''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # chunk
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = ''.join(self._z_content_chunks)
            if z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
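        # A quick illustration of the zlib API the branch above relies on:
        # zlib.decompressobj().decompress(data, max_length) returns at most
        # max_length bytes of plaintext and stores whatever compressed input
        # it did not consume in .unconsumed_tail, so a later call can feed
        # that tail back in and resume where it stopped, e.g. (names are only
        # illustrative):
        #     d = zlib.decompressobj()
        #     first_4k = d.decompress(z_content, 4096)
        #     the_rest = d.decompress(d.unconsumed_tail)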
        # Any bytes remaining to be decompressed will be in the decompressors
        # 'unconsumed_tail'

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
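        # In other words, _ensure_content(n) only guarantees that
        # self._content holds at least the first n bytes of the group;
        # anything past that may still be sitting, compressed, in the
        # decompressor's unconsumed_tail. Callers that need the whole group
        # call _ensure_content() with no argument, which defaults num_bytes
        # to the full content length.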

    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.

            bytes = apply_delta_to_source(self._content, content_start, end)

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        # If we have lots of short lines, it may be more efficient to join
        # the content ahead of time. If the content is <10MiB, we don't really
        # care about the extra memory consumption, so we can just pack it and
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
        # mysql, which is below the noise margin
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_using_lzma(self):
        if self._content_chunks is not None:
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        if self._content is None:
            raise AssertionError('Nothing to compress')
        z_content = pylzma.compress(self._content)
        self._z_content_chunks = (z_content,)
        self._z_content_length = len(z_content)

    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = map(compressor.compress, chunks)
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))
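        # A zlib compressobj buffers input internally, so compressor.compress()
        # can legitimately return '' for small chunks; the real output only
        # appears once enough data has accumulated (or at flush()). That is
        # why empty strings are filtered out above, and why the final
        # compressor.flush() result must be appended before filtering.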

    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if _USE_LZMA:
            self._create_z_content_using_lzma()
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                 ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks
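        # Rough shape of what to_chunks() produces, assuming the zlib magic in
        # GCB_HEADER, 1000 bytes of compressed data and 4000 bytes of content:
        #     chunks[0]  -> header + '1000\n4000\n'  (both lengths, in bytes)
        #     chunks[1:] -> the zlib-compressed group content
        # total_len is simply the byte count of all of those chunks combined.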

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return ''.join(chunks)

    def _dump(self, include_text=False):
        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used
        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
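        # To make the thresholds concrete: with a 100kB block, factories using
        # 60kB in total take the early "use as-is" exit (60*2 >= 100). If they
        # use only 30kB but all of it sits in the first 50kB of the block
        # (30*2 > 50), the cheaper 'trim' is chosen; 30kB scattered across the
        # whole 100kB falls through to 'rebuild'.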

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       together.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False

    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""

        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return ''.join(lines)
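        # For reference, the stream assembled here is: a storage-kind header
        # line, then three length lines ('%d\n%d\n%d\n' for the compressed
        # header, the uncompressed header, and the block), then the
        # zlib-compressed header describing each factory, and finally the
        # GroupCompressBlock chunks themselves. from_bytes() below splits on
        # the first few '\n's to undo exactly this layout.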

    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
            versioned_files.stream.close()


class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """

    def __init__(self, gcvf, locations):
        self.gcvf = gcvf
        self.locations = locations
        self.keys = []
        self.batch_memos = {}
        self.memos_to_get = []
        self.total_bytes = 0
        self.last_read_memo = None
        self.manager = None
def add_key(self, key):
1081
"""Add another to key to fetch.
1083
:return: The estimated number of bytes needed to fetch the batch so
1086
self.keys.append(key)
1087
index_memo, _, _, _ = self.locations[key]
1088
read_memo = index_memo[0:3]
1089
# Three possibilities for this read_memo:
1090
# - it's already part of this batch; or
1091
# - it's not yet part of this batch, but is already cached; or
1092
# - it's not yet part of this batch and will need to be fetched.
1093
if read_memo in self.batch_memos:
1094
# This read memo is already in this batch.
1095
return self.total_bytes
1097
cached_block = self.gcvf._group_cache[read_memo]
1099
# This read memo is new to this batch, and the data isn't cached
1101
self.batch_memos[read_memo] = None
1102
self.memos_to_get.append(read_memo)
1103
byte_length = read_memo[2]
1104
self.total_bytes += byte_length
1106
# This read memo is new to this batch, but cached.
1107
# Keep a reference to the cached block in batch_memos because it's
1108
# certain that we'll use it when this batch is processed, but
1109
# there's a risk that it would fall out of _group_cache between now
1111
self.batch_memos[read_memo] = cached_block
1112
return self.total_bytes
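        # read_memo is (roughly) the (index, offset, length) triple that
        # identifies one compressed block in the pack, so total_bytes is an
        # estimate based on compressed block sizes, not expanded text sizes.
        # The caller compares the return value against BATCH_SIZE and flushes
        # the batch once enough compressed data has been queued up.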

    def _flush_manager(self):
        if self.manager is not None:
            for factory in self.manager.get_record_stream():
                yield factory
            self.manager = None
        self.last_read_memo = None

    def yield_factories(self, full_flush=False):
        """Yield factories for keys added since the last yield. They will be
        returned in the order they were added via add_key.

        :param full_flush: by default, some results may not be returned in case
            they can be part of the next batch. If full_flush is True, then
            all results are returned.
        """
        if self.manager is None and not self.keys:
            return
        # Fetch all memos in this batch.
        blocks = self.gcvf._get_blocks(self.memos_to_get)
        # Turn blocks into factories and yield them.
        memos_to_get_stack = list(self.memos_to_get)
        memos_to_get_stack.reverse()
        for key in self.keys:
            index_memo, _, parents, _ = self.locations[key]
            read_memo = index_memo[:3]
            if self.last_read_memo != read_memo:
                # We are starting a new block. If we have a
                # manager, we have found everything that fits for
                # now, so yield records
                for factory in self._flush_manager():
                    yield factory
                # Now start a new manager.
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
                    # The next block from _get_blocks will be the block we
                    # need.
                    block_read_memo, block = blocks.next()
                    if block_read_memo != read_memo:
                        raise AssertionError(
                            "block_read_memo out of sync with read_memo"
                            "(%r != %r)" % (block_read_memo, read_memo))
                    self.batch_memos[read_memo] = block
                    memos_to_get_stack.pop()
                else:
                    block = self.batch_memos[read_memo]
                self.manager = _LazyGroupContentManager(block)
                self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        if full_flush:
            for factory in self._flush_manager():
                yield factory
        del self.keys[:]
        self.batch_memos.clear()
        del self.memos_to_get[:]
        self.total_bytes = 0
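        # Usage sketch (as used by get_record_stream below): the caller
        # repeatedly calls add_key() and, whenever the running total passes
        # BATCH_SIZE, iterates yield_factories() to drain complete blocks; a
        # final yield_factories(full_flush=True) hands back whatever partial
        # batch is left. Because _get_blocks() is a generator, blocks are only
        # read from disk as this loop walks onto them.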


class GroupCompressVersionedFiles(VersionedFiles):
    """A group-compress based VersionedFiles implementation."""

    def __init__(self, index, access, delta=True, _unadded_refs=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._fallback_vfs = []

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs))
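        # The _unadded_refs dict maps keys that have been compressed but not
        # yet flushed to the pack onto their parents; passing a copy of it
        # here lets the fallback-free clone still answer questions about those
        # pending texts. The LRUSizeCache above keeps up to ~50MB of recently
        # used GroupCompressBlocks so repeated extractions from the same group
        # do not re-read and re-parse the compressed data.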

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):

            nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
        """See VersionedFiles._add_text()."""
        self._index._check_write_ok()
        self._check_add(key, None, random_id, check_content=False)
        if text.__class__ is not str:
            raise errors.BzrBadParameterUnicode("text")
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        # double handling for now. Make it work until then.
        length = len(text)
        record = FulltextContentFactory(key, parents, None, text)
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
            nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

    def annotate(self, key):
        """See VersionedFiles.annotate."""
        ann = annotate.Annotator(self)
        return ann.annotate_flat(key)

    def get_annotator(self):
        return annotate.Annotator(self)

    def check(self, progress_bar=None, keys=None):
        """See VersionedFiles.check()."""
        if keys is None:
            keys = self.keys()
            for record in self.get_record_stream(keys, 'unordered', True):
                record.get_bytes_as('fulltext')
        else:
            return self.get_record_stream(keys, 'unordered', True)

    def clear_cache(self):
        """See VersionedFiles.clear_cache()"""
        self._group_cache.clear()
        self._index._graph_index.clear_cache()
        self._index._int_cache.clear()

    def _check_add(self, key, lines, random_id, check_content):
        """check that version_id and lines are safe to add."""

            missing.difference_update(set(new_result))
        return result, source_results

    def _get_blocks(self, read_memos):
        """Get GroupCompressBlocks for the given read_memos.

        :returns: a series of (read_memo, block) pairs, in the order they were
            originally passed.
        """
        cached = {}
        for read_memo in read_memos:
            try:
                block = self._group_cache[read_memo]
            except KeyError:
                pass
            else:
                cached[read_memo] = block
        not_cached = []
        not_cached_seen = set()
        for read_memo in read_memos:
            if read_memo in cached:
                # Don't fetch what we already have
                continue
            if read_memo in not_cached_seen:
                # Don't try to fetch the same data twice
                continue
            not_cached.append(read_memo)
            not_cached_seen.add(read_memo)
        raw_records = self._access.get_raw_records(not_cached)
        for read_memo in read_memos:
            try:
                yield read_memo, cached[read_memo]
            except KeyError:
                # Read the block, and cache it.
                zdata = raw_records.next()
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block
                yield read_memo, block
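        # Note the shape of this generator: all uncached memos are requested
        # in a single get_raw_records() call (one pass over the pack), but the
        # results are yielded back in the caller's original order, with
        # duplicates served from the 'cached' dict after their first read.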

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.

                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                        continue
                    if batcher.add_key(key) > BATCH_SIZE:
                        # Ok, this batch is big enough. Yield some results.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                    include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
        self._unadded_refs = {}
        keys_to_add = []
        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = GroupCompressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Push 'chunks' down into the _access api, so that we don't
            #       have to double compressed memory here
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            bytes = ''.join(chunks)
            del chunks
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]
            if reuse_blocks:
                # If the reuse_blocks flag is set, check to see if we can just
                # copy a groupcompress block as-is.
                # We only check on the first record (groupcompress-block) not
                # on all of the (groupcompress-block-ref) entries.
                # The reuse_this_block flag is then kept for as long as
                if record.storage_kind == 'groupcompress-block':
                    # Check to see if we really want to re-use this block
                    insert_manager = record._manager
                    reuse_this_block = insert_manager.check_is_well_utilized()
            else:
                reuse_this_block = False
            if reuse_this_block:
                # We still want to reuse this block
                if record.storage_kind == 'groupcompress-block':
                    # Insert the raw block into the target repo
                    insert_manager = record._manager
                    bytes = record._manager._block.to_bytes()
                    _, start, length = self._access.add_raw_records(
                        [(None, len(bytes))], bytes)[0]
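                    # In short, a block coming off the wire is copied into the
                    # target pack verbatim only when check_is_well_utilized()
                    # said the group is worth keeping as-is; otherwise
                    # reuse_this_block is False and the records fall through
                    # to the normal path, which re-compresses their texts into
                    # the current self._compressor group.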

class _GCBuildDetails(object):
    """A blob of data about the build details.

    This stores the minimal data, which then allows compatibility with the old
    api, without taking as much memory.
    """

    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
                 '_delta_end', '_parents')

    method = 'group'
    compression_parent = None

    def __init__(self, parents, position_info):
        self._parents = parents
        (self._index, self._group_start, self._group_end, self._basis_end,
         self._delta_end) = position_info

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__,
            self.index_memo, self._parents)

    @property
    def index_memo(self):
        return (self._index, self._group_start, self._group_end,
                self._basis_end, self._delta_end)

    @property
    def record_details(self):
        return static_tuple.StaticTuple(self.method, None)

    def __getitem__(self, offset):
        """Compatibility thunk to act like a tuple."""
        if offset == 0:
            return self.index_memo
        elif offset == 1:
            return self.compression_parent # Always None
        elif offset == 2:
            return self._parents
        elif offset == 3:
            return self.record_details
        else:
            raise IndexError('offset out of range')
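    # The tuple being emulated is the familiar build-details 4-tuple:
    #     details[0] -> index_memo, details[1] -> compression parent (always
    #     None for groupcompress), details[2] -> parents,
    #     details[3] -> record_details
    # so existing callers that index into the tuple keep working while this
    # object stores only the position info plus the parents.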


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
        add_callback=None, track_external_parent_refs=False,
        inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.

        :param track_external_parent_refs: As keys are added, keep track of the
            keys they reference, so that we can query get_missing_parents(),
            etc.
        :param inconsistency_fatal: When asked to add records that are already
            present, and the details are inconsistent with the existing
            record, raise an exception instead of warning (and skipping the
            record).
        """
        self._add_callback = add_callback
        self._graph_index = graph_index
        self._parents = parents
        self.has_graph = parents
        self._is_locked = is_locked
        self._inconsistency_fatal = inconsistency_fatal
        # GroupCompress records tend to have the same 'group' start + offset
        # repeated over and over, this creates a surplus of ints
        self._int_cache = {}
        if track_external_parent_refs:
            self._key_dependencies = knit._KeyRefs(
                track_new_keys=track_new_keys)
        else:
            self._key_dependencies = None
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                # Sometimes these are passed as a list rather than a tuple
                node_refs = static_tuple.as_tuples(node_refs)
                passed = static_tuple.as_tuples(keys[key])
                if node_refs != passed[1]:
                    details = '%s %s %s' % (key, (value, node_refs), passed)
                    if self._inconsistency_fatal:
                        raise errors.KnitCorrupt(self, "inconsistent details"
                                                 " in add_records: %s" %
                                                 details)
                    else:
                        trace.warning("inconsistent details in skipped"
                                      " record: %s", details)

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # thousands of objects.
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)
        stop = int(bits[1])
        stop = self._int_cache.setdefault(stop, stop)
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
        return (node[0], start, stop, basis_end, delta_end)
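        # The caching trick above is plain dict-based interning:
        # d.setdefault(n, n) returns the int object already stored under that
        # value if one exists, so every entry in the same group ends up
        # sharing one PyInt for 'start' and one for 'stop' instead of
        # allocating fresh ones per index row.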

    def scan_unvalidated_index(self, graph_index):
        """Inform this _GCGraphIndex that there is an unvalidated index.

        This allows this _GCGraphIndex to keep track of any missing
        compression parents we may want to have filled in to make those
        indices valid. It also allows _GCGraphIndex to track any new keys.

        :param graph_index: A GraphIndex
        """
        key_dependencies = self._key_dependencies
        if key_dependencies is None:
            return
        for node in graph_index.iter_all_entries():
            # Add parent refs from graph_index (and discard parent refs
            # that the graph_index has).
            key_dependencies.add_references(node[1], node[3][0])


from bzrlib._groupcompress_py import (