        # Any bytes remaining to be decompressed will be in the decompressors
        # 'unconsumed_tail'

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
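
    @staticmethod
    def _demo_partial_decompress():
        # Editor's sketch (hypothetical helper, not part of bzrlib's API):
        # a minimal, self-contained illustration of the zlib pattern that
        # _ensure_content relies on. decompress() with a max_length stops
        # early and parks the unread compressed bytes in unconsumed_tail,
        # which must be fed back in on the next call.
        data = zlib.compress('x' * 100000)
        decomp = zlib.decompressobj()
        out = decomp.decompress(data, 1000)  # at most ~1000 bytes of output
        while decomp.unconsumed_tail:
            # Keep feeding the leftover compressed input back in.
            out += decomp.decompress(decomp.unconsumed_tail, 1000)
        return out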

    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.
        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used

        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        # Note: This would be *nicer* as a strip-data-from-group, rather than
        #       building it up again from scratch
        #       It might be reasonable to consider the fulltext sizes for
        #       different bits when deciding this, too. As you may have a small
        #       fulltext, and a trivial delta, and you are just trading around
        #       for another fulltext. If we do a simple 'prune' you may end up
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
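
    @staticmethod
    def _demo_rebuild_action(content_length, spans):
        # Editor's sketch (hypothetical helper, not bzrlib API): the same
        # trim/rebuild heuristic as _check_rebuild_action above, applied to
        # plain (start, end) byte spans instead of factory objects.
        # e.g. _demo_rebuild_action(1000, [(0, 400)]) -> 'trim'
        #      _demo_rebuild_action(1000, [(600, 900)]) -> 'rebuild'
        total_bytes_used = 0
        last_byte_used = 0
        for start, end in spans:
            total_bytes_used += end - start
            last_byte_used = max(last_byte_used, end)
        if total_bytes_used * 2 >= content_length:
            return None      # block is already well utilized
        if total_bytes_used * 2 > last_byte_used:
            return 'trim'    # live data is packed at the front
        return 'rebuild'     # sparse and scattered: repack from scratch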

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       together.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False
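
    # Editor's note (illustrative, using the thresholds described in the
    # comments above): a 2.5MB block holding a single file's texts fails the
    # ~3MB single-content bar, while the same 2.5MB block holding texts for
    # several files passes the ~2MB mixed-content bar, so (assuming it also
    # survives the trim check) check_is_well_utilized() returns True for it.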

    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""

    versioned_files.stream.close()


class _BatchingBlockFetcher(object):
    """Fetch group compress blocks in batches.

    :ivar total_bytes: int of expected number of bytes needed to fetch the
        currently pending batch.
    """

    def __init__(self, gcvf, locations):
        self.gcvf = gcvf
        self.locations = locations
        self.keys = []
        self.batch_memos = {}
        self.memos_to_get = []
        self.total_bytes = 0
        self.last_read_memo = None
        self.manager = None

    def add_key(self, key):
        """Add another key to fetch.

        :return: The estimated number of bytes needed to fetch the batch so
            far.
        """
        self.keys.append(key)
        index_memo, _, _, _ = self.locations[key]
        read_memo = index_memo[0:3]
        # Three possibilities for this read_memo:
        #  - it's already part of this batch; or
        #  - it's not yet part of this batch, but is already cached; or
        #  - it's not yet part of this batch and will need to be fetched.
        if read_memo in self.batch_memos:
            # This read memo is already in this batch.
            return self.total_bytes
        try:
            cached_block = self.gcvf._group_cache[read_memo]
        except KeyError:
            # This read memo is new to this batch, and the data isn't cached
            # either.
            self.batch_memos[read_memo] = None
            self.memos_to_get.append(read_memo)
            byte_length = read_memo[2]
            self.total_bytes += byte_length
        else:
            # This read memo is new to this batch, but cached.
            # Keep a reference to the cached block in batch_memos because it's
            # certain that we'll use it when this batch is processed, but
            # there's a risk that it would fall out of _group_cache between now
            # and then.
            self.batch_memos[read_memo] = cached_block
        return self.total_bytes

    def _flush_manager(self):
        if self.manager is not None:
            for factory in self.manager.get_record_stream():
                yield factory
            self.manager = None
            self.last_read_memo = None

    def yield_factories(self, full_flush=False):
        """Yield factories for keys added since the last yield. They will be
        returned in the order they were added via add_key.

        :param full_flush: by default, some results may not be returned in case
            they can be part of the next batch. If full_flush is True, then
            all results are returned.
        """
        if self.manager is None and not self.keys:
            return
        # Fetch all memos in this batch.
        blocks = self.gcvf._get_blocks(self.memos_to_get)
        # Turn blocks into factories and yield them.
        memos_to_get_stack = list(self.memos_to_get)
        memos_to_get_stack.reverse()
        for key in self.keys:
            index_memo, _, parents, _ = self.locations[key]
            read_memo = index_memo[:3]
            if self.last_read_memo != read_memo:
                # We are starting a new block. If we have a
                # manager, we have found everything that fits for
                # now, so yield records
                for factory in self._flush_manager():
                    yield factory
                # Now start a new manager.
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
                    # The next block from _get_blocks will be the block we
                    # need.
                    block_read_memo, block = blocks.next()
                    if block_read_memo != read_memo:
                        raise AssertionError(
                            "block_read_memo out of sync with read_memo "
                            "(%r != %r)" % (block_read_memo, read_memo))
                    self.batch_memos[read_memo] = block
                    memos_to_get_stack.pop()
                else:
                    block = self.batch_memos[read_memo]
                self.manager = _LazyGroupContentManager(block)
                self.last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        if full_flush:
            for factory in self._flush_manager():
                yield factory
        del self.keys[:]
        self.batch_memos.clear()
        del self.memos_to_get[:]
        self.total_bytes = 0
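

def _demo_batcher_usage(gcvf, locations, keys):
    # Editor's sketch (hypothetical helper, not part of bzrlib): the driving
    # pattern _BatchingBlockFetcher is designed for, mirroring the loop in
    # GroupCompressVersionedFiles.get_record_stream below -- add keys until
    # the pending batch exceeds BATCH_SIZE, yield, then do a final flush.
    batcher = _BatchingBlockFetcher(gcvf, locations)
    for key in keys:
        if batcher.add_key(key) > BATCH_SIZE:
            # This batch is big enough; yield what has accumulated so far.
            for factory in batcher.yield_factories():
                yield factory
    # Nothing more will be added; return everything still pending.
    for factory in batcher.yield_factories(full_flush=True):
        yield factory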


class GroupCompressVersionedFiles(VersionedFiles):
    """A group-compress based VersionedFiles implementation."""

    def __init__(self, index, access, delta=True, _unadded_refs=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._fallback_vfs = []

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
            self._delta, _unadded_refs=dict(self._unadded_refs))

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):
            missing.difference_update(set(new_result))
        return result, source_results

    def _get_blocks(self, read_memos):
        """Get GroupCompressBlocks for the given read_memos.

        :returns: a series of (read_memo, block) pairs, in the order they were
            originally passed.
        """
        cached = {}
        for read_memo in read_memos:
            try:
                block = self._group_cache[read_memo]
            except KeyError:
                pass
            else:
                cached[read_memo] = block
        not_cached = []
        not_cached_seen = set()
        for read_memo in read_memos:
            if read_memo in cached:
                # Don't fetch what we already have
                continue
            if read_memo in not_cached_seen:
                # Don't try to fetch the same data twice
                continue
            not_cached.append(read_memo)
            not_cached_seen.add(read_memo)
        raw_records = self._access.get_raw_records(not_cached)
        for read_memo in read_memos:
            try:
                yield read_memo, cached[read_memo]
            except KeyError:
                # Read the block, and cache it.
                zdata = raw_records.next()
                # decompress - whole thing - this is not a bug, as it
                # permits caching. We might want to store the partially
                # decompressed group and decompress object, so that recent
                # texts are not penalised by big groups.
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block
                yield read_memo, block
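
    # Editor's note (illustrative): _get_blocks preserves the order of
    # read_memos even though each distinct memo is read at most once --
    # passing [m1, m2, m1] yields (m1, block1), (m2, block2), (m1, block1)
    # while fetching m1's raw bytes a single time.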

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.
            unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                        continue
                    if batcher.add_key(key) > BATCH_SIZE:
                        # Ok, this batch is big enough. Yield some results.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                        include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
            if reuse_blocks:
                # If the reuse_blocks flag is set, check to see if we can just
                # copy a groupcompress block as-is.
                # We only check on the first record (groupcompress-block) not
                # on all of the (groupcompress-block-ref) entries.
                # The reuse_this_block flag is then kept for as long as
                # (_insert_manager == insert_manager) is true
                if record.storage_kind == 'groupcompress-block':
                    # Check to see if we really want to re-use this block
                    insert_manager = record._manager
                    reuse_this_block = insert_manager.check_is_well_utilized()
            else:
                reuse_this_block = False
            if reuse_this_block:
                # We still want to reuse this block
                if record.storage_kind == 'groupcompress-block':
                    # Insert the raw block into the target repo
                    insert_manager = record._manager
                    bytes = record._manager._block.to_bytes()
                    _, start, length = self._access.add_raw_records(
                        [(None, len(bytes))], bytes)[0]
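                    # Editor's note (descriptive, added for clarity): reusing
                    # the block copies the already-compressed bytes wholesale
                    # via add_raw_records, avoiding a decompress/recompress
                    # cycle for every text inside the block.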


class _GCBuildDetails(object):
    """A blob of data about the build details.

    This stores the minimal data, which then allows compatibility with the old
    api, without taking as much memory.
    """

    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
                 '_delta_end', '_parents')

    method = 'group'
    compression_parent = None

    def __init__(self, parents, position_info):
        self._parents = parents
        (self._index, self._group_start, self._group_end, self._basis_end,
         self._delta_end) = position_info

    def __repr__(self):
        return '%s(%s, %s)' % (self.__class__.__name__,
            self.index_memo, self._parents)

    @property
    def index_memo(self):
        return (self._index, self._group_start, self._group_end,
                self._basis_end, self._delta_end)

    @property
    def record_details(self):
        return static_tuple.StaticTuple(self.method, None)

    def __getitem__(self, offset):
        """Compatibility thunk to act like a tuple."""
        if offset == 0:
            return self.index_memo
        elif offset == 1:
            return self.compression_parent  # Always None
        elif offset == 2:
            return self._parents
        elif offset == 3:
            return self.record_details
        else:
            raise IndexError('offset out of range')
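
# Editor's sketch (illustrative usage): because __getitem__ serves offsets
# 0-3 and then raises IndexError, a _GCBuildDetails can be unpacked exactly
# like the old 4-tuple it replaces:
#
#   details = _GCBuildDetails(parents, position_info)
#   index_memo, compression_parent, parents, record_details = details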


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
        add_callback=None, track_external_parent_refs=False,
        inconsistency_fatal=True, track_new_keys=False):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        # start and stop are put into _int_cache because they are very common.
        # They define the 'group' that an entry is in, and many groups can have
        # thousands of objects.
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
        # each, or about 7MB. Note that it might be even more when you consider
        # how PyInt is allocated in separate slabs. And you can't return a slab
        # to the OS if even 1 int on it is in use. Note though that Python uses
        # a LIFO when re-using PyInt slots, which might cause more
        # fragmentation.
        start = int(bits[0])
        start = self._int_cache.setdefault(start, start)
        stop = int(bits[1])
        stop = self._int_cache.setdefault(stop, stop)
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
        # instance...
        return (node[0], start, stop, basis_end, delta_end)
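
    # Editor's sketch (illustrative, not part of the class): the interning
    # trick above works because dict.setdefault returns the first object
    # stored under a value, so thousands of equal offsets collapse to a
    # single PyInt:
    #
    #   _int_cache = {}
    #   a = _int_cache.setdefault(int('12345'), int('12345'))
    #   b = _int_cache.setdefault(int('12345'), int('12345'))
    #   assert a is b  # only one 12345 object stays alive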

    def scan_unvalidated_index(self, graph_index):
        """Inform this _GCGraphIndex that there is an unvalidated index.

        This allows this _GCGraphIndex to keep track of any missing
        compression parents we may want to have filled in to make those
        indices valid. It also allows _GCGraphIndex to track any new keys.

        :param graph_index: A GraphIndex
        """
        key_dependencies = self._key_dependencies
        if key_dependencies is None:
            return
        for node in graph_index.iter_all_entries():
            # Add parent refs from graph_index (and discard parent refs
            # that the graph_index has).
            key_dependencies.add_references(node[1], node[3][0])

    from bzrlib._groupcompress_py import (