662
657
self._internal_node_cache.add(node_pos, node)
664
659
self._leaf_node_cache.add(node_pos, node)
665
cache.add(node_pos, node)
666
660
found[node_pos] = node
669
def _expand_nodes(self, node_indexes):
670
"""Find extra nodes to download.
663
def _compute_recommended_pages(self):
    """Convert transport's recommended_page_size into btree pages.

    recommended_page_size is in bytes, we want to know how many _PAGE_SIZE
    pages fit in that length.

    :return: The number of whole _PAGE_SIZE pages (rounded up) covered by
        the transport's recommended read size.
    """
    recommended_read = self._transport.recommended_page_size()
    # Round up: a partial page still costs a whole page read.
    recommended_pages = int(math.ceil(recommended_read /
                                      float(_PAGE_SIZE)))
    return recommended_pages
674
def _compute_total_pages_in_index(self):
675
"""How many pages are in the index.
677
If we have read the header we will use the value stored there.
678
Otherwise it will be computed based on the length of the index.
680
if self._size is None:
681
raise AssertionError('_compute_total_pages_in_index should not be'
682
' called when self._size is None')
683
if self._root_node is not None:
684
# This is the number of pages as defined by the header
685
return self._row_offsets[-1]
686
# This is the number of pages as defined by the size of the index. They
687
# should be indentical.
688
total_pages = int(math.ceil(self._size / float(_PAGE_SIZE)))
691
def _expand_offsets(self, offsets):
    """Find extra pages to download.

    The idea is that we always want to make big-enough requests (like 64kB
    for http), so that we don't waste round trips. So given the entries
    that we already have cached and the new pages being downloaded figure
    out what other pages we might want to read.

    See also doc/developers/btree_index_prefetch.txt for more details.

    :param offsets: The offsets to be read
    :return: A list of offsets to download
    """
    if 'index' in debug.debug_flags:
        trace.mutter('expanding: %s\toffsets: %s', self._name, offsets)

    if len(offsets) >= self._recommended_pages:
        # Don't add more, we are already requesting more than enough
        if 'index' in debug.debug_flags:
            trace.mutter('  not expanding large request (%s >= %s)',
                         len(offsets), self._recommended_pages)
        return offsets
    if self._size is None:
        # Don't try anything, because we don't know where the file ends
        if 'index' in debug.debug_flags:
            trace.mutter('  not expanding without knowing index size')
        return offsets
    total_pages = self._compute_total_pages_in_index()
    cached_offsets = self._get_offsets_to_cached_pages()
    # If reading recommended_pages would read the rest of the index, just
    # read it all
    if total_pages - len(cached_offsets) <= self._recommended_pages:
        # Read whatever is left
        if cached_offsets:
            expanded = [x for x in xrange(total_pages)
                        if x not in cached_offsets]
        else:
            expanded = range(total_pages)
        if 'index' in debug.debug_flags:
            trace.mutter('  reading all unread pages: %s', expanded)
        return expanded

    if self._root_node is None:
        # ATM on the first read of the root node of a large index, we don't
        # bother pre-reading any other pages. This is because the
        # likelyhood of actually reading interesting pages is very low.
        # See doc/developers/btree_index_prefetch.txt for a discussion, and
        # a possible implementation when we are guessing that the second
        # layer index is small
        final_offsets = offsets
    else:
        tree_depth = len(self._row_lengths)
        if len(cached_offsets) < tree_depth and len(offsets) == 1:
            # We haven't read enough to justify expansion
            # If we are only going to read the root node, and 1 leaf node,
            # then it isn't worth expanding our request. Once we've read at
            # least 2 nodes, then we are probably doing a search, and we
            # start expanding our requests.
            if 'index' in debug.debug_flags:
                trace.mutter('  not expanding on first reads')
            return offsets
        final_offsets = self._expand_to_neighbors(offsets, cached_offsets,
                                                  total_pages)

    final_offsets = sorted(final_offsets)
    if 'index' in debug.debug_flags:
        trace.mutter('expanded:  %s', final_offsets)
    return final_offsets
760
def _expand_to_neighbors(self, offsets, cached_offsets, total_pages):
761
"""Expand requests to neighbors until we have enough pages.
763
This is called from _expand_offsets after policy has determined that we
765
We only want to expand requests within a given layer. We cheat a little
766
bit and assume all requests will be in the same layer. This is true
767
given the current design, but if it changes this algorithm may perform
770
:param offsets: requested offsets
771
:param cached_offsets: offsets for pages we currently have cached
772
:return: A set() of offsets after expansion
774
final_offsets = set(offsets)
776
new_tips = set(final_offsets)
777
while len(final_offsets) < self._recommended_pages and new_tips:
781
first, end = self._find_layer_first_and_end(pos)
784
and previous not in cached_offsets
785
and previous not in final_offsets
786
and previous >= first):
787
next_tips.add(previous)
789
if (after < total_pages
790
and after not in cached_offsets
791
and after not in final_offsets
794
# This would keep us from going bigger than
795
# recommended_pages by only expanding the first offsets.
796
# However, if we are making a 'wide' request, it is
797
# reasonable to expand all points equally.
798
# if len(final_offsets) > recommended_pages:
800
final_offsets.update(next_tips)
804
def _find_layer_first_and_end(self, offset):
805
"""Find the start/stop nodes for the layer corresponding to offset.
807
:return: (first, end)
808
first is the first node in this layer
809
end is the first node of the next layer
812
for roffset in self._row_offsets:
819
def _get_offsets_to_cached_pages(self):
820
"""Determine what nodes we already have cached."""
821
cached_offsets = set(self._internal_node_cache.keys())
822
cached_offsets.update(self._leaf_node_cache.keys())
823
if self._root_node is not None:
824
cached_offsets.add(0)
825
return cached_offsets
827
def _get_root_node(self):
828
if self._root_node is None:
829
# We may not have a root node yet
830
self._get_internal_nodes([0])
831
return self._root_node
756
833
def _get_nodes(self, cache, node_indexes):