120
119
:param num_bytes: Ensure that we have extracted at least num_bytes of
121
120
content. If None, consume everything
123
if self._content_length is None:
124
raise AssertionError('self._content_length should never be None')
122
# TODO: If we re-use the same content block at different times during
123
# get_record_stream(), it is possible that the first pass will
124
# get inserted, triggering an extract/_ensure_content() which
125
# will get rid of _z_content. And then the next use of the block
126
# will try to access _z_content (to send it over the wire), and
127
# fail because it is already extracted. Consider never releasing
128
# _z_content because of this.
125
129
if num_bytes is None:
126
130
num_bytes = self._content_length
127
131
elif (self._content_length is not None
164
163
# 'unconsumed_tail'
166
165
# Do we have enough bytes already?
167
if len(self._content) >= num_bytes:
166
if num_bytes is not None and len(self._content) >= num_bytes:
168
if num_bytes is None and self._z_content_decompressor is None:
169
# We must have already decompressed everything
169
171
# If we got this far, and don't have a decompressor, something is wrong
170
172
if self._z_content_decompressor is None:
171
173
raise AssertionError(
172
174
'No decompressor to decompress %d bytes' % num_bytes)
173
175
remaining_decomp = self._z_content_decompressor.unconsumed_tail
174
if not remaining_decomp:
175
raise AssertionError('Nothing left to decompress')
176
needed_bytes = num_bytes - len(self._content)
177
# We always set max_size to 32kB over the minimum needed, so that
178
# zlib will give us as much as we really want.
179
# TODO: If this isn't good enough, we could make a loop here,
180
# that keeps expanding the request until we get enough
181
self._content += self._z_content_decompressor.decompress(
182
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
183
if len(self._content) < num_bytes:
184
raise AssertionError('%d bytes wanted, only %d available'
185
% (num_bytes, len(self._content)))
186
if not self._z_content_decompressor.unconsumed_tail:
187
# The stream is finished
188
self._z_content_decompressor = None
176
if num_bytes is None:
178
# We don't know how much is left, but we'll decompress it all
179
self._content += self._z_content_decompressor.decompress(
181
# Note: There's what I consider a bug in zlib.decompressobj
182
# If you pass back in the entire unconsumed_tail, only
183
# this time you don't pass a max-size, it doesn't
184
# change the unconsumed_tail back to None/''.
185
# However, we know we are done with the whole stream
186
self._z_content_decompressor = None
187
# XXX: Why is this the only place in this routine we set this?
188
self._content_length = len(self._content)
190
if not remaining_decomp:
191
raise AssertionError('Nothing left to decompress')
192
needed_bytes = num_bytes - len(self._content)
193
# We always set max_size to 32kB over the minimum needed, so that
194
# zlib will give us as much as we really want.
195
# TODO: If this isn't good enough, we could make a loop here,
196
# that keeps expanding the request until we get enough
197
self._content += self._z_content_decompressor.decompress(
198
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
199
if len(self._content) < num_bytes:
200
raise AssertionError('%d bytes wanted, only %d available'
201
% (num_bytes, len(self._content)))
202
if not self._z_content_decompressor.unconsumed_tail:
203
# The stream is finished
204
self._z_content_decompressor = None
190
206
def _parse_bytes(self, bytes, pos):
191
207
"""Read the various lengths from the header.
537
546
# time (self._block._content) is a little expensive.
538
547
self._block._ensure_content(self._last_byte)
540
def _check_rebuild_action(self):
549
def _check_rebuild_block(self):
541
550
"""Check to see if our block should be repacked."""
542
551
total_bytes_used = 0
543
552
last_byte_used = 0
544
553
for factory in self._factories:
545
554
total_bytes_used += factory._end - factory._start
546
if last_byte_used < factory._end:
547
last_byte_used = factory._end
548
# If we are using more than half of the bytes from the block, we have
549
# nothing else to check
555
last_byte_used = max(last_byte_used, factory._end)
556
# If we are using most of the bytes from the block, we have nothing
557
# else to check (currently more than 1/2)
550
558
if total_bytes_used * 2 >= self._block._content_length:
551
return None, last_byte_used, total_bytes_used
552
# We are using less than 50% of the content. Is the content we are
553
# using at the beginning of the block? If so, we can just trim the
554
# tail, rather than rebuilding from scratch.
560
# Can we just strip off the trailing bytes? If we are going to be
561
# transmitting more than 50% of the front of the content, go ahead
555
562
if total_bytes_used * 2 > last_byte_used:
556
return 'trim', last_byte_used, total_bytes_used
563
self._trim_block(last_byte_used)
558
566
# We are using a small amount of the data, and it isn't just packed
559
567
# nicely at the front, so rebuild the content.
566
574
# expanding many deltas into fulltexts, as well.
567
575
# If we build a cheap enough 'strip', then we could try a strip,
568
576
# if that expands the content, we then rebuild.
569
return 'rebuild', last_byte_used, total_bytes_used
571
def check_is_well_utilized(self):
572
"""Is the current block considered 'well utilized'?
574
This heuristic asks if the current block considers itself to be a fully
575
developed group, rather than just a loose collection of data.
577
if len(self._factories) == 1:
578
# A block of length 1 could be improved by combining with other
579
# groups - don't look deeper. Even larger than max size groups
580
# could compress well with adjacent versions of the same thing.
582
action, last_byte_used, total_bytes_used = self._check_rebuild_action()
583
block_size = self._block._content_length
584
if total_bytes_used < block_size * self._max_cut_fraction:
585
# This block wants to trim itself small enough that we want to
586
# consider it under-utilized.
588
# TODO: This code is meant to be the twin of _insert_record_stream's
589
# 'start_new_block' logic. It would probably be better to factor
590
# out that logic into a shared location, so that it stays
592
# We currently assume a block is properly utilized whenever it is >75%
593
# of the size of a 'full' block. In normal operation, a block is
594
# considered full when it hits 4MB of same-file content. So any block
595
# >3MB is 'full enough'.
596
# The only time this isn't true is when a given block has large-object
597
# content. (a single file >4MB, etc.)
598
# Under these circumstances, we allow a block to grow to
599
# 2 x largest_content. Which means that if a given block had a large
600
# object, it may actually be under-utilized. However, given that this
601
# is 'pack-on-the-fly' it is probably reasonable to not repack large
602
# content blobs on-the-fly. Note that because we return False for all
603
# 1-item blobs, we will repack them; we may wish to reevaluate our
604
# treatment of large object blobs in the future.
605
if block_size >= self._full_enough_block_size:
607
# If a block is <3MB, it still may be considered 'full' if it contains
608
# mixed content. The current rule is 2MB of mixed content is considered
609
# full. So check to see if this block contains mixed content, and
610
# set the threshold appropriately.
612
for factory in self._factories:
613
prefix = factory.key[:-1]
614
if common_prefix is None:
615
common_prefix = prefix
616
elif prefix != common_prefix:
617
# Mixed content, check the size appropriately
618
if block_size >= self._full_enough_mixed_block_size:
621
# The content failed both the mixed check and the single-content check
622
# so obviously it is not fully utilized
623
# TODO: there is one other constraint that isn't being checked
624
# namely, that the entries in the block are in the appropriate
625
# order. For example, you could insert the entries in exactly
626
# reverse groupcompress order, and we would think that is ok.
627
# (all the right objects are in one group, and it is fully
628
# utilized, etc.) For now, we assume that case is rare,
629
# especially since we should always fetch in 'groupcompress'
633
def _check_rebuild_block(self):
634
action, last_byte_used, total_bytes_used = self._check_rebuild_action()
638
self._trim_block(last_byte_used)
639
elif action == 'rebuild':
640
self._rebuild_block()
642
raise ValueError('unknown rebuild action: %r' % (action,))
577
self._rebuild_block()
644
579
def _wire_bytes(self):
645
580
"""Return a byte stream suitable for transmitting over the wire."""
1152
1087
class GroupCompressVersionedFiles(VersionedFiles):
1153
1088
"""A group-compress based VersionedFiles implementation."""
1155
def __init__(self, index, access, delta=True, _unadded_refs=None):
1090
def __init__(self, index, access, delta=True):
1156
1091
"""Create a GroupCompressVersionedFiles object.
1158
1093
:param index: The index object storing access and graph data.
1159
1094
:param access: The access object storing raw data.
1160
1095
:param delta: Whether to delta compress or just entropy compress.
1161
:param _unadded_refs: private parameter, don't use.
1163
1097
self._index = index
1164
1098
self._access = access
1165
1099
self._delta = delta
1166
if _unadded_refs is None:
1168
self._unadded_refs = _unadded_refs
1100
self._unadded_refs = {}
1169
1101
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
1102
self._fallback_vfs = []
1172
def without_fallbacks(self):
1173
"""Return a clone of this object without any fallbacks configured."""
1174
return GroupCompressVersionedFiles(self._index, self._access,
1175
self._delta, _unadded_refs=dict(self._unadded_refs))
1177
1104
def add_lines(self, key, parents, lines, parent_texts=None,
1178
1105
left_matching_blocks=None, nostore_sha=None, random_id=False,
1179
1106
check_content=True):
1290
1211
def get_known_graph_ancestry(self, keys):
1291
1212
"""Get a KnownGraph instance with the ancestry of keys."""
1292
# Note that this is identical to
1293
# KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1295
parent_map, missing_keys = self._index.find_ancestry(keys)
1296
for fallback in self._fallback_vfs:
1297
if not missing_keys:
1299
(f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1301
parent_map.update(f_parent_map)
1302
missing_keys = f_missing_keys
1213
parent_map, missing_keys = self._index._graph_index.find_ancestry(keys,
1303
1215
kg = _mod_graph.KnownGraph(parent_map)
1663
1574
if reuse_blocks:
1664
1575
# If the reuse_blocks flag is set, check to see if we can just
1665
1576
# copy a groupcompress block as-is.
1666
# We only check on the first record (groupcompress-block) not
1667
# on all of the (groupcompress-block-ref) entries.
1668
# The reuse_this_block flag is then kept for as long as
1669
if record.storage_kind == 'groupcompress-block':
1670
# Check to see if we really want to re-use this block
1671
insert_manager = record._manager
1672
reuse_this_block = insert_manager.check_is_well_utilized()
1674
reuse_this_block = False
1675
if reuse_this_block:
1676
# We still want to reuse this block
1677
1577
if record.storage_kind == 'groupcompress-block':
1678
1578
# Insert the raw block into the target repo
1679
1579
insert_manager = record._manager
1580
insert_manager._check_rebuild_block()
1680
1581
bytes = record._manager._block.to_bytes()
1681
1582
_, start, length = self._access.add_raw_records(
1682
1583
[(None, len(bytes))], bytes)[0]
1687
1588
'groupcompress-block-ref'):
1688
1589
if insert_manager is None:
1689
1590
raise AssertionError('No insert_manager set')
1690
if insert_manager is not record._manager:
1691
raise AssertionError('insert_manager does not match'
1692
' the current record, we cannot be positive'
1693
' that the appropriate content was inserted.'
1695
1591
value = "%d %d %d %d" % (block_start, block_length,
1696
1592
record._start, record._end)
1697
1593
nodes = [(record.key, value, (record.parents,))]
1883
1770
if not random_id:
1884
1771
present_nodes = self._get_entries(keys)
1885
1772
for (index, key, value, node_refs) in present_nodes:
1886
# Sometimes these are passed as a list rather than a tuple
1887
node_refs = static_tuple.as_tuples(node_refs)
1888
passed = static_tuple.as_tuples(keys[key])
1889
if node_refs != passed[1]:
1890
details = '%s %s %s' % (key, (value, node_refs), passed)
1773
if node_refs != keys[key][1]:
1774
details = '%s %s %s' % (key, (value, node_refs), keys[key])
1891
1775
if self._inconsistency_fatal:
1892
1776
raise errors.KnitCorrupt(self, "inconsistent details"
1893
1777
" in add_records: %s" %
1907
1791
result.append((key, value))
1908
1792
records = result
1909
1793
key_dependencies = self._key_dependencies
1910
if key_dependencies is not None:
1912
for key, value, refs in records:
1914
key_dependencies.add_references(key, parents)
1916
for key, value, refs in records:
1917
new_keys.add_key(key)
1794
if key_dependencies is not None and self._parents:
1795
for key, value, refs in records:
1797
key_dependencies.add_references(key, parents)
1918
1798
self._add_callback(records)
1920
1800
def _check_read(self):
2026
1902
"""Convert an index value to position details."""
2027
1903
bits = node[2].split(' ')
2028
1904
# It would be nice not to read the entire gzip.
2029
# start and stop are put into _int_cache because they are very common.
2030
# They define the 'group' that an entry is in, and many groups can have
2031
# thousands of objects.
2032
# Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2033
# each, or about 7MB. Note that it might be even more when you consider
2034
# how PyInt is allocated in separate slabs. And you can't return a slab
2035
# to the OS if even 1 int on it is in use. Note though that Python uses
2036
# a LIFO when re-using PyInt slots, which probably causes more
2038
1905
start = int(bits[0])
2039
start = self._int_cache.setdefault(start, start)
2040
1906
stop = int(bits[1])
2041
stop = self._int_cache.setdefault(stop, stop)
2042
1907
basis_end = int(bits[2])
2043
1908
delta_end = int(bits[3])
2044
# We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2046
return (node[0], start, stop, basis_end, delta_end)
1909
return node[0], start, stop, basis_end, delta_end
2048
1911
def scan_unvalidated_index(self, graph_index):
2049
1912
"""Inform this _GCGraphIndex that there is an unvalidated index.
2051
1914
This allows this _GCGraphIndex to keep track of any missing
2052
1915
compression parents we may want to have filled in to make those
2053
indices valid. It also allows _GCGraphIndex to track any new keys.
2055
1918
:param graph_index: A GraphIndex
2057
key_dependencies = self._key_dependencies
2058
if key_dependencies is None:
2060
for node in graph_index.iter_all_entries():
2061
# Add parent refs from graph_index (and discard parent refs
2062
# that the graph_index has).
2063
key_dependencies.add_references(node[1], node[3][0])
1920
if self._key_dependencies is not None:
1921
# Add parent refs from graph_index (and discard parent refs that
1922
# the graph_index has).
1923
add_refs = self._key_dependencies.add_references
1924
for node in graph_index.iter_all_entries():
1925
add_refs(node[1], node[3][0])
2066
1929
from bzrlib._groupcompress_py import (