        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used
        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content. Note that rebuilding
        # can be expensive, since it may mean expanding many deltas into
        # fulltexts, as well. If we build a cheap enough 'strip', then we
        # could try a strip; if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
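    # A worked example for _check_rebuild_action (hypothetical numbers, not
    # from the original source): with a 100kB block, factories covering 60kB
    # give (None, ...); 30kB that all falls within the first 50kB gives
    # 'trim'; 30kB scattered across the whole block gives 'rebuild'.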

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       in sync.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content (a single file >4MB, etc.).
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check,
        # so obviously it is not fully utilized.
        # TODO: there is one other constraint that isn't being checked:
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False
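    # Illustrative outcomes (numbers taken from the comments above, assuming
    # the usual 3MB/2MB thresholds): a 3.5MB block that passes the trim check
    # is well utilized; a 2.5MB block whose keys all share one prefix is not;
    # a 2.5MB block with mixed prefixes is well utilized once it crosses
    # _full_enough_mixed_block_size.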

    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""

    versioned_files.stream.close()


class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """
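
    # A sketch of the intended calling pattern (this mirrors how the record
    # stream code below drives the batcher; BATCH_SIZE is the byte threshold
    # used there):
    #
    #   batcher = _BatchingBlockFetcher(gcvf, locations)
    #   for key in keys:
    #       if batcher.add_key(key) > BATCH_SIZE:
    #           for factory in batcher.yield_factories():
    #               yield factory
    #   for factory in batcher.yield_factories(full_flush=True):
    #       yield factory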

    def __init__(self, gcvf, locations):
        self.gcvf = gcvf
        self.locations = locations
        self.keys = []
        self.batch_memos = {}
        self.memos_to_get = []
        self.total_bytes = 0
        self.last_read_memo = None
        self.manager = None

    def add_key(self, key):
        """Add another key to fetch.

        :return: The estimated number of bytes needed to fetch the batch so
            far.
        """
        self.keys.append(key)
        index_memo, _, _, _ = self.locations[key]
        read_memo = index_memo[0:3]
        # Three possibilities for this read_memo:
        #  - it's already part of this batch; or
        #  - it's not yet part of this batch, but is already cached; or
        #  - it's not yet part of this batch and will need to be fetched.
        if read_memo in self.batch_memos:
            # This read memo is already in this batch.
            return self.total_bytes
        try:
            cached_block = self.gcvf._group_cache[read_memo]
        except KeyError:
            # This read memo is new to this batch, and the data isn't cached
            # either.
            self.batch_memos[read_memo] = None
            self.memos_to_get.append(read_memo)
            byte_length = read_memo[2]
            self.total_bytes += byte_length
        else:
            # This read memo is new to this batch, but cached.
            # Keep a reference to the cached block in batch_memos because it's
            # certain that we'll use it when this batch is processed, but
            # there's a risk that it would fall out of _group_cache between now
            # and then.
            self.batch_memos[read_memo] = cached_block
        return self.total_bytes
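
    # Note: total_bytes only grows for read memos that actually need to be
    # fetched, so blocks already held in gcvf._group_cache never push a batch
    # over the BATCH_SIZE threshold on their own.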

    def _flush_manager(self):
        if self.manager is not None:
            for factory in self.manager.get_record_stream():
                yield factory
            self.manager = None
            self.last_read_memo = None

    def yield_factories(self, full_flush=False):
        """Yield factories for keys added since the last yield.  They will be
        returned in the order they were added via add_key.

        :param full_flush: by default, some results may not be returned in case
            they can be part of the next batch.  If full_flush is True, then
            all results are returned.
        """
        if self.manager is None and not self.keys:
            return
        # Fetch all memos in this batch.
        blocks = self.gcvf._get_blocks(self.memos_to_get)
        # Turn blocks into factories and yield them.
        memos_to_get_stack = list(self.memos_to_get)
        memos_to_get_stack.reverse()
        for key in self.keys:
            index_memo, _, parents, _ = self.locations[key]
            read_memo = index_memo[:3]
            if self.last_read_memo != read_memo:
                # We are starting a new block.  If we have a manager, we have
                # found everything that fits for now, so yield records.
                for factory in self._flush_manager():
                    yield factory
                # Now start a new manager.
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
                    # The next block from _get_blocks will be the block we
                    # need.
                    block_read_memo, block = blocks.next()
                    if block_read_memo != read_memo:
                        raise AssertionError(
                            "block_read_memo out of sync with read_memo"
                            " (%r != %r)" % (block_read_memo, read_memo))
                    self.batch_memos[read_memo] = block
                    memos_to_get_stack.pop()
                else:
                    block = self.batch_memos[read_memo]
                self.manager = _LazyGroupContentManager(block)
                self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        if full_flush:
            for factory in self._flush_manager():
                yield factory
        del self.keys[:]
        self.batch_memos.clear()
        del self.memos_to_get[:]
        self.total_bytes = 0


class GroupCompressVersionedFiles(VersionedFiles):
    """A group-compress based VersionedFiles implementation."""

    def __init__(self, index, access, delta=True, _unadded_refs=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._fallback_vfs = []

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs))
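        # Note: dict(...) above takes a copy, so texts added to either object
        # afterwards are not visible through the other's _unadded_refs.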

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):

            missing.difference_update(set(new_result))
        return result, source_results

    def _get_blocks(self, read_memos):
        """Get GroupCompressBlocks for the given read_memos.

        :returns: a series of (read_memo, block) pairs, in the order they were
            originally passed.
        """
        cached = {}
        for read_memo in read_memos:
            try:
                block = self._group_cache[read_memo]
            except KeyError:
                pass
            else:
                cached[read_memo] = block
        not_cached = []
        not_cached_seen = set()
        for read_memo in read_memos:
            if read_memo in cached:
                # Don't fetch what we already have
                continue
            if read_memo in not_cached_seen:
                # Don't try to fetch the same data twice
                continue
            not_cached.append(read_memo)
            not_cached_seen.add(read_memo)
        raw_records = self._access.get_raw_records(not_cached)
        for read_memo in read_memos:
            try:
                yield read_memo, cached[read_memo]
            except KeyError:
                # Read the block, and cache it.
                zdata = raw_records.next()
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block
                yield read_memo, block
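        # For example (hypothetical memos m1/m2): _get_blocks([m1, m2, m1])
        # yields (m1, block1), (m2, block2), (m1, block1), reading each
        # uncached block from self._access at most once.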

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.

                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                    else:
                        if batcher.add_key(key) > BATCH_SIZE:
                            # Ok, this batch is big enough.  Yield some results.
                            for factory in batcher.yield_factories():
                                yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""

            if reuse_blocks:
                # If the reuse_blocks flag is set, check to see if we can just
                # copy a groupcompress block as-is.
                # We only check on the first record (groupcompress-block) not
                # on all of the (groupcompress-block-ref) entries.
                # The reuse_this_block flag is then kept for as long as the
                # stream keeps yielding (groupcompress-block-ref) records for
                # the same block.
                if record.storage_kind == 'groupcompress-block':
                    # Check to see if we really want to re-use this block
                    insert_manager = record._manager
                    reuse_this_block = insert_manager.check_is_well_utilized()
            else:
                reuse_this_block = False
            if reuse_this_block:
                # We still want to reuse this block
                if record.storage_kind == 'groupcompress-block':
                    # Insert the raw block into the target repo
                    insert_manager = record._manager
                    bytes = record._manager._block.to_bytes()
                    _, start, length = self._access.add_raw_records(
                        [(None, len(bytes))], bytes)[0]

        This allows this _GCGraphIndex to keep track of any missing
        compression parents we may want to have filled in to make those
        indices valid.  It also allows _GCGraphIndex to track any new keys.

        :param graph_index: A GraphIndex
        """
        key_dependencies = self._key_dependencies
        if key_dependencies is None:
            return
        for node in graph_index.iter_all_entries():
            # Add parent refs from graph_index (and discard parent refs
            # that the graph_index has).
            key_dependencies.add_references(node[1], node[3][0])
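        # Note (an assumption about the GraphIndex node layout, based on how
        # it is unpacked here): iter_all_entries() yields
        # (index, key, value, references) tuples, so node[1] is the key and
        # node[3][0] is its first reference list, i.e. the parent keys.
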
from bzrlib._groupcompress_py import (