        :return: A dict of {node_pos: node}
        """
        if len(nodes) > cache._max_cache:
            trace.mutter('Requesting %s > %s nodes, not all will be cached',
                         len(nodes), cache._max_cache)
        found = {}
        start_of_leaves = None
        for node_pos, node in self._read_nodes(sorted(nodes)):
            if node_pos == 0: # Special case
                self._root_node = node
            else:
                # Route the node to the cache for its tree layer: offsets
                # before the first leaf offset are internal nodes.
                if start_of_leaves is None:
                    start_of_leaves = self._row_offsets[-2]
                if node_pos < start_of_leaves:
                    self._internal_node_cache.add(node_pos, node)
                else:
                    self._leaf_node_cache.add(node_pos, node)
            found[node_pos] = node
        return found

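    # Illustrative sketch (not from the original source): assuming
    # _row_offsets = [0, 1, 11, 111], leaves start at offset 11
    # (_row_offsets[-2]), so a call like
    #   index._cache_nodes([0, 5, 42], index._internal_node_cache)
    # stores node 0 as self._root_node, node 5 in _internal_node_cache,
    # and node 42 in _leaf_node_cache, returning all three in the dict.
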
    def _compute_recommended_pages(self):
        """Convert transport's recommended_page_size into btree pages.

        recommended_page_size is in bytes; we want to know how many
        _PAGE_SIZE pages fit in that length.
        """
        recommended_read = self._transport.recommended_page_size()
        recommended_pages = int(math.ceil(recommended_read /
                                          float(_PAGE_SIZE)))
        return recommended_pages

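    # Worked example (illustrative; assumes the module-level _PAGE_SIZE of
    # 4096 bytes): a transport recommending 64kB reads gives
    # int(math.ceil(65536 / 4096.0)) == 16 pages per request.
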
    def _compute_total_pages_in_index(self):
        """How many pages are in the index.

        If we have read the header we will use the value stored there.
        Otherwise it will be computed based on the length of the index.
        """
        if self._size is None:
            raise AssertionError('_compute_total_pages_in_index should not be'
                                 ' called when self._size is None')
        if self._root_node is not None:
            # This is the number of pages as defined by the header
            return self._row_offsets[-1]
        # This is the number of pages as defined by the size of the index.
        # They should be identical.
        total_pages = int(math.ceil(self._size / float(_PAGE_SIZE)))
        return total_pages

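    # Worked example (illustrative; assumes _PAGE_SIZE == 4096): an index
    # file of 28000 bytes, sized before the header has been parsed, is
    # int(math.ceil(28000 / 4096.0)) == 7 pages.
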
    def _expand_offsets(self, offsets):
        """Find extra pages to download.

        The idea is that we always want to make big-enough requests (like
        64kB for http), so that we don't waste round trips. So given the
        entries that we already have cached, and the new pages being
        downloaded, figure out what other pages we might want to read.

        See also doc/developers/btree_index_prefetch.txt for more details.

        :param offsets: The offsets to be read
        :return: A list of offsets to download
        """
        if 'index' in debug.debug_flags:
            trace.mutter('expanding: %s\toffsets: %s', self._name, offsets)

        if len(offsets) >= self._recommended_pages:
            # Don't add more, we are already requesting more than enough
            if 'index' in debug.debug_flags:
                trace.mutter('  not expanding large request (%s >= %s)',
                             len(offsets), self._recommended_pages)
            return offsets
        if self._size is None:
            # Don't try anything, because we don't know where the file ends
            if 'index' in debug.debug_flags:
                trace.mutter('  not expanding without knowing index size')
            return offsets
        total_pages = self._compute_total_pages_in_index()
        cached_offsets = self._get_offsets_to_cached_pages()
        # If reading recommended_pages would read the rest of the index, just
        # do so.
        if total_pages - len(cached_offsets) <= self._recommended_pages:
            # Read whatever is left
            if cached_offsets:
                expanded = [x for x in xrange(total_pages)
                               if x not in cached_offsets]
            else:
                expanded = range(total_pages)
            if 'index' in debug.debug_flags:
                trace.mutter('  reading all unread pages: %s', expanded)
            return expanded

        if self._root_node is None:
            # ATM on the first read of the root node of a large index, we
            # don't bother pre-reading any other pages. This is because the
            # likelihood of actually reading interesting pages is very low.
            # See doc/developers/btree_index_prefetch.txt for a discussion,
            # and a possible implementation when we are guessing that the
            # second layer index is small
            final_offsets = offsets
        else:
            tree_depth = len(self._row_lengths)
            if len(cached_offsets) < tree_depth and len(offsets) == 1:
                # We haven't read enough to justify expansion
                # If we are only going to read the root node, and 1 leaf node,
                # then it isn't worth expanding our request. Once we've read
                # at least 2 nodes, then we are probably doing a search, and
                # we start expanding our requests.
                if 'index' in debug.debug_flags:
                    trace.mutter('  not expanding on first reads')
                return offsets
            final_offsets = self._expand_to_neighbors(offsets, cached_offsets,
                                                      total_pages)

        final_offsets = sorted(final_offsets)
        if 'index' in debug.debug_flags:
            trace.mutter('expanded:  %s', final_offsets)

        return final_offsets

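    # Behavior sketch (illustrative, not from the original source): with
    # _recommended_pages == 16 and a 100-page, three-level index where only
    # the root and one internal node are cached, _expand_offsets([35])
    # returns [35] unexpanded (a single offset with fewer cached pages than
    # the tree depth), while _expand_offsets([35, 62]) expands both offsets
    # toward their neighbors via _expand_to_neighbors until roughly 16 pages
    # have been gathered.
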
    def _expand_to_neighbors(self, offsets, cached_offsets, total_pages):
        """Expand requests to neighbors until we have enough pages.

        This is called from _expand_offsets after policy has determined that
        we want to expand.
        We only want to expand requests within a given layer. We cheat a
        little bit and assume all requests will be in the same layer. This
        is true given the current design, but if it changes this algorithm
        may perform oddly.

        :param offsets: requested offsets
        :param cached_offsets: offsets for pages we currently have cached
        :return: A set() of offsets after expansion
        """
        final_offsets = set(offsets)
        first = end = None
        new_tips = set(final_offsets)
        while len(final_offsets) < self._recommended_pages and new_tips:
            next_tips = set()
            for pos in new_tips:
                if first is None:
                    first, end = self._find_layer_first_and_end(pos)
                previous = pos - 1
                if (previous > 0
                    and previous not in cached_offsets
                    and previous not in final_offsets
                    and previous >= first):
                    next_tips.add(previous)
                after = pos + 1
                if (after < total_pages
                    and after not in cached_offsets
                    and after not in final_offsets
                    and after < end):
                    next_tips.add(after)
                # This would keep us from going bigger than
                # recommended_pages by only expanding the first offsets.
                # However, if we are making a 'wide' request, it is
                # reasonable to expand all points equally.
                # if len(final_offsets) > recommended_pages:
                #     break
            final_offsets.update(next_tips)
            new_tips = next_tips
        return final_offsets

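    # Worked example (illustrative): expanding [50] within a leaf layer
    # spanning offsets 11-111, with nothing cached and
    # _recommended_pages == 7, adds {49, 51}, then {48, 52}, then {47, 53},
    # returning {47, 48, 49, 50, 51, 52, 53}.
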
    def _find_layer_first_and_end(self, offset):
        """Find the start/stop nodes for the layer corresponding to offset.

        :return: (first, end)
            first is the first node in this layer
            end is the first node of the next layer
        """
        first = end = 0
        for roffset in self._row_offsets:
            first = end
            end = roffset
            if offset < roffset:
                break
        return first, end

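    # Worked example (illustrative): with _row_offsets = [0, 1, 11, 111],
    # _find_layer_first_and_end(0) == (0, 1) (root layer),
    # _find_layer_first_and_end(5) == (1, 11) (internal layer), and
    # _find_layer_first_and_end(42) == (11, 111) (leaf layer).
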
    def _get_offsets_to_cached_pages(self):
        """Determine what nodes we already have cached."""
        cached_offsets = set(self._internal_node_cache.keys())
        cached_offsets.update(self._leaf_node_cache.keys())
        if self._root_node is not None:
            cached_offsets.add(0)
        return cached_offsets

    def _get_root_node(self):
        if self._root_node is None:
            # We may not have a root node yet
            self._get_internal_nodes([0])
        return self._root_node

    def _get_nodes(self, cache, node_indexes):