~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/btree_index.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-10-28 20:20:57 UTC
  • mfrom: (3763.8.17 btree_buffer)
  • Revision ID: pqm@pqm.ubuntu.com-20081028202057-u3csau9zvf0hapya
(jam) BTreeIndex will now prefetch nearby pages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
351
351
                # First key triggers the first row
352
352
                rows.append(_LeafBuilderRow())
353
353
            key_count += 1
354
 
            # TODO: Flattening the node into a string key and a line should
355
 
            #       probably be put into a pyrex function. We can do a quick
356
 
            #       iter over all the entries to determine the final length,
357
 
            #       and then do a single malloc() rather than lots of
358
 
            #       intermediate mallocs as we build everything up.
359
 
            #       ATM 3 / 13s are spent flattening nodes (10s is compressing)
360
354
            string_key, line = _btree_serializer._flatten_node(node,
361
355
                                    self.reference_lists)
362
356
            self._add_key(string_key, line, rows)
611
605
        self._name = name
612
606
        self._size = size
613
607
        self._file = None
614
 
        self._page_size = transport.recommended_page_size()
 
608
        self._recommended_pages = self._compute_recommended_pages()
615
609
        self._root_node = None
616
610
        # Default max size is 100,000 leave values
617
611
        self._leaf_value_cache = None # lru_cache.LRUCache(100*1000)
632
626
    def __ne__(self, other):
633
627
        return not self.__eq__(other)
634
628
 
635
 
    def _get_root_node(self):
636
 
        if self._root_node is None:
637
 
            # We may not have a root node yet
638
 
            nodes = list(self._read_nodes([0]))
639
 
            if len(nodes):
640
 
                self._root_node = nodes[0][1]
641
 
        return self._root_node
642
 
 
643
 
    def _cache_nodes(self, nodes, cache):
 
629
    def _get_and_cache_nodes(self, nodes):
644
630
        """Read nodes and cache them in the lru.
645
631
 
646
632
        The nodes list supplied is sorted and then read from disk, each node
653
639
 
654
640
        :return: A dict of {node_pos: node}
655
641
        """
656
 
        if len(nodes) > cache._max_cache:
657
 
            trace.mutter('Requesting %s > %s nodes, not all will be cached',
658
 
                         len(nodes), cache._max_cache)
659
642
        found = {}
 
643
        start_of_leaves = None
660
644
        for node_pos, node in self._read_nodes(sorted(nodes)):
661
645
            if node_pos == 0: # Special case
662
646
                self._root_node = node
663
647
            else:
664
 
                cache.add(node_pos, node)
 
648
                if start_of_leaves is None:
 
649
                    start_of_leaves = self._row_offsets[-2]
 
650
                if node_pos < start_of_leaves:
 
651
                    self._internal_node_cache.add(node_pos, node)
 
652
                else:
 
653
                    self._leaf_node_cache.add(node_pos, node)
665
654
            found[node_pos] = node
666
655
        return found
667
656
 
 
657
    def _compute_recommended_pages(self):
 
658
        """Convert transport's recommended_page_size into btree pages.
 
659
 
 
660
        recommended_page_size is in bytes, we want to know how many _PAGE_SIZE
 
661
        pages fit in that length.
 
662
        """
 
663
        recommended_read = self._transport.recommended_page_size()
 
664
        recommended_pages = int(math.ceil(recommended_read /
 
665
                                          float(_PAGE_SIZE)))
 
666
        return recommended_pages
 
667
 
 
668
    def _compute_total_pages_in_index(self):
 
669
        """How many pages are in the index.
 
670
 
 
671
        If we have read the header we will use the value stored there.
 
672
        Otherwise it will be computed based on the length of the index.
 
673
        """
 
674
        if self._size is None:
 
675
            raise AssertionError('_compute_total_pages_in_index should not be'
 
676
                                 ' called when self._size is None')
 
677
        if self._root_node is not None:
 
678
            # This is the number of pages as defined by the header
 
679
            return self._row_offsets[-1]
 
680
        # This is the number of pages as defined by the size of the index. They
 
681
        # should be indentical.
 
682
        total_pages = int(math.ceil(self._size / float(_PAGE_SIZE)))
 
683
        return total_pages
 
684
 
 
685
    def _expand_offsets(self, offsets):
 
686
        """Find extra pages to download.
 
687
 
 
688
        The idea is that we always want to make big-enough requests (like 64kB
 
689
        for http), so that we don't waste round trips. So given the entries
 
690
        that we already have cached and the new pages being downloaded figure
 
691
        out what other pages we might want to read.
 
692
 
 
693
        See also doc/developers/btree_index_prefetch.txt for more details.
 
694
 
 
695
        :param offsets: The offsets to be read
 
696
        :return: A list of offsets to download
 
697
        """
 
698
        if 'index' in debug.debug_flags:
 
699
            trace.mutter('expanding: %s\toffsets: %s', self._name, offsets)
 
700
 
 
701
        if len(offsets) >= self._recommended_pages:
 
702
            # Don't add more, we are already requesting more than enough
 
703
            if 'index' in debug.debug_flags:
 
704
                trace.mutter('  not expanding large request (%s >= %s)',
 
705
                             len(offsets), self._recommended_pages)
 
706
            return offsets
 
707
        if self._size is None:
 
708
            # Don't try anything, because we don't know where the file ends
 
709
            if 'index' in debug.debug_flags:
 
710
                trace.mutter('  not expanding without knowing index size')
 
711
            return offsets
 
712
        total_pages = self._compute_total_pages_in_index()
 
713
        cached_offsets = self._get_offsets_to_cached_pages()
 
714
        # If reading recommended_pages would read the rest of the index, just
 
715
        # do so.
 
716
        if total_pages - len(cached_offsets) <= self._recommended_pages:
 
717
            # Read whatever is left
 
718
            if cached_offsets:
 
719
                expanded = [x for x in xrange(total_pages)
 
720
                               if x not in cached_offsets]
 
721
            else:
 
722
                expanded = range(total_pages)
 
723
            if 'index' in debug.debug_flags:
 
724
                trace.mutter('  reading all unread pages: %s', expanded)
 
725
            return expanded
 
726
 
 
727
        if self._root_node is None:
 
728
            # ATM on the first read of the root node of a large index, we don't
 
729
            # bother pre-reading any other pages. This is because the
 
730
            # likelyhood of actually reading interesting pages is very low.
 
731
            # See doc/developers/btree_index_prefetch.txt for a discussion, and
 
732
            # a possible implementation when we are guessing that the second
 
733
            # layer index is small
 
734
            final_offsets = offsets
 
735
        else:
 
736
            tree_depth = len(self._row_lengths)
 
737
            if len(cached_offsets) < tree_depth and len(offsets) == 1:
 
738
                # We haven't read enough to justify expansion
 
739
                # If we are only going to read the root node, and 1 leaf node,
 
740
                # then it isn't worth expanding our request. Once we've read at
 
741
                # least 2 nodes, then we are probably doing a search, and we
 
742
                # start expanding our requests.
 
743
                if 'index' in debug.debug_flags:
 
744
                    trace.mutter('  not expanding on first reads')
 
745
                return offsets
 
746
            final_offsets = self._expand_to_neighbors(offsets, cached_offsets,
 
747
                                                      total_pages)
 
748
 
 
749
        final_offsets = sorted(final_offsets)
 
750
        if 'index' in debug.debug_flags:
 
751
            trace.mutter('expanded:  %s', final_offsets)
 
752
        return final_offsets
 
753
 
 
754
    def _expand_to_neighbors(self, offsets, cached_offsets, total_pages):
 
755
        """Expand requests to neighbors until we have enough pages.
 
756
 
 
757
        This is called from _expand_offsets after policy has determined that we
 
758
        want to expand.
 
759
        We only want to expand requests within a given layer. We cheat a little
 
760
        bit and assume all requests will be in the same layer. This is true
 
761
        given the current design, but if it changes this algorithm may perform
 
762
        oddly.
 
763
 
 
764
        :param offsets: requested offsets
 
765
        :param cached_offsets: offsets for pages we currently have cached
 
766
        :return: A set() of offsets after expansion
 
767
        """
 
768
        final_offsets = set(offsets)
 
769
        first = end = None
 
770
        new_tips = set(final_offsets)
 
771
        while len(final_offsets) < self._recommended_pages and new_tips:
 
772
            next_tips = set()
 
773
            for pos in new_tips:
 
774
                if first is None:
 
775
                    first, end = self._find_layer_first_and_end(pos)
 
776
                previous = pos - 1
 
777
                if (previous > 0
 
778
                    and previous not in cached_offsets
 
779
                    and previous not in final_offsets
 
780
                    and previous >= first):
 
781
                    next_tips.add(previous)
 
782
                after = pos + 1
 
783
                if (after < total_pages
 
784
                    and after not in cached_offsets
 
785
                    and after not in final_offsets
 
786
                    and after < end):
 
787
                    next_tips.add(after)
 
788
                # This would keep us from going bigger than
 
789
                # recommended_pages by only expanding the first offsets.
 
790
                # However, if we are making a 'wide' request, it is
 
791
                # reasonable to expand all points equally.
 
792
                # if len(final_offsets) > recommended_pages:
 
793
                #     break
 
794
            final_offsets.update(next_tips)
 
795
            new_tips = next_tips
 
796
        return final_offsets
 
797
 
 
798
    def _find_layer_first_and_end(self, offset):
 
799
        """Find the start/stop nodes for the layer corresponding to offset.
 
800
 
 
801
        :return: (first, end)
 
802
            first is the first node in this layer
 
803
            end is the first node of the next layer
 
804
        """
 
805
        first = end = 0
 
806
        for roffset in self._row_offsets:
 
807
            first = end
 
808
            end = roffset
 
809
            if offset < roffset:
 
810
                break
 
811
        return first, end
 
812
 
 
813
    def _get_offsets_to_cached_pages(self):
 
814
        """Determine what nodes we already have cached."""
 
815
        cached_offsets = set(self._internal_node_cache.keys())
 
816
        cached_offsets.update(self._leaf_node_cache.keys())
 
817
        if self._root_node is not None:
 
818
            cached_offsets.add(0)
 
819
        return cached_offsets
 
820
 
 
821
    def _get_root_node(self):
 
822
        if self._root_node is None:
 
823
            # We may not have a root node yet
 
824
            self._get_internal_nodes([0])
 
825
        return self._root_node
 
826
 
668
827
    def _get_nodes(self, cache, node_indexes):
669
828
        found = {}
670
829
        needed = []
676
835
                found[idx] = cache[idx]
677
836
            except KeyError:
678
837
                needed.append(idx)
679
 
        found.update(self._cache_nodes(needed, cache))
 
838
        if not needed:
 
839
            return found
 
840
        needed = self._expand_offsets(needed)
 
841
        found.update(self._get_and_cache_nodes(needed))
680
842
        return found
681
843
 
682
844
    def _get_internal_nodes(self, node_indexes):
1012
1174
            self._get_root_node()
1013
1175
        return self._key_count
1014
1176
 
 
1177
    def _compute_row_offsets(self):
 
1178
        """Fill out the _row_offsets attribute based on _row_lengths."""
 
1179
        offsets = []
 
1180
        row_offset = 0
 
1181
        for row in self._row_lengths:
 
1182
            offsets.append(row_offset)
 
1183
            row_offset += row
 
1184
        offsets.append(row_offset)
 
1185
        self._row_offsets = offsets
 
1186
 
1015
1187
    def _parse_header_from_bytes(self, bytes):
1016
1188
        """Parse the header from a region of bytes.
1017
1189
 
1053
1225
                if len(length)])
1054
1226
        except ValueError:
1055
1227
            raise errors.BadIndexOptions(self)
1056
 
        offsets = []
1057
 
        row_offset = 0
1058
 
        for row in self._row_lengths:
1059
 
            offsets.append(row_offset)
1060
 
            row_offset += row
1061
 
        offsets.append(row_offset)
1062
 
        self._row_offsets = offsets
 
1228
        self._compute_row_offsets()
1063
1229
 
1064
1230
        # calculate the bytes we have processed
1065
1231
        header_end = (len(signature) + sum(map(len, lines[0:4])) + 4)
1091
1257
                    self._size = len(start)
1092
1258
                    size = min(_PAGE_SIZE, self._size)
1093
1259
            else:
 
1260
                if offset > self._size:
 
1261
                    raise AssertionError('tried to read past the end'
 
1262
                                         ' of the file %s > %s'
 
1263
                                         % (offset, self._size))
1094
1264
                size = min(size, self._size - offset)
1095
1265
            ranges.append((offset, size))
1096
1266
        if not ranges: