~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/btree_index.py

  • Committer: Frank Aspell
  • Date: 2009-02-17 11:40:05 UTC
  • mto: (4054.1.1 doc)
  • mto: This revision was merged to the branch mainline in revision 4056.
  • Revision ID: frankaspell@googlemail.com-20090217114005-ojufrp6rqht664um
Fixed typos.

Fixed some typos in bzr doc's using "aspell -l en -c FILENAME".

Show diffs side-by-side

added added

removed removed

Lines of Context:
139
139
        self._nodes = {}
140
140
        # Indicate it hasn't been built yet
141
141
        self._nodes_by_key = None
 
142
        self._optimize_for_size = False
142
143
 
143
144
    def add_node(self, key, value, references=()):
144
145
        """Add a node to the index.
276
277
                    length = _PAGE_SIZE
277
278
                    if internal_row.nodes == 0:
278
279
                        length -= _RESERVED_HEADER_BYTES # padded
279
 
                    internal_row.writer = chunk_writer.ChunkWriter(length, 0)
 
280
                    internal_row.writer = chunk_writer.ChunkWriter(length, 0,
 
281
                        optimize_for_size=self._optimize_for_size)
280
282
                    internal_row.writer.write(_INTERNAL_FLAG)
281
283
                    internal_row.writer.write(_INTERNAL_OFFSET +
282
284
                        str(rows[pos + 1].nodes) + "\n")
284
286
            length = _PAGE_SIZE
285
287
            if rows[-1].nodes == 0:
286
288
                length -= _RESERVED_HEADER_BYTES # padded
287
 
            rows[-1].writer = chunk_writer.ChunkWriter(length)
 
289
            rows[-1].writer = chunk_writer.ChunkWriter(length,
 
290
                optimize_for_size=self._optimize_for_size)
288
291
            rows[-1].writer.write(_LEAF_FLAG)
289
292
        if rows[-1].writer.write(line):
290
293
            # this key did not fit in the node:
313
316
                # This will be padded, hence the -100
314
317
                new_row.writer = chunk_writer.ChunkWriter(
315
318
                    _PAGE_SIZE - _RESERVED_HEADER_BYTES,
316
 
                    reserved_bytes)
 
319
                    reserved_bytes,
 
320
                    optimize_for_size=self._optimize_for_size)
317
321
                new_row.writer.write(_INTERNAL_FLAG)
318
322
                new_row.writer.write(_INTERNAL_OFFSET +
319
323
                    str(rows[1].nodes - 1) + "\n")
347
351
                # First key triggers the first row
348
352
                rows.append(_LeafBuilderRow())
349
353
            key_count += 1
350
 
            # TODO: Flattening the node into a string key and a line should
351
 
            #       probably be put into a pyrex function. We can do a quick
352
 
            #       iter over all the entries to determine the final length,
353
 
            #       and then do a single malloc() rather than lots of
354
 
            #       intermediate mallocs as we build everything up.
355
 
            #       ATM 3 / 13s are spent flattening nodes (10s is compressing)
356
354
            string_key, line = _btree_serializer._flatten_node(node,
357
355
                                    self.reference_lists)
358
356
            self._add_key(string_key, line, rows)
433
431
            efficient order for the index (keys iteration order in this case).
434
432
        """
435
433
        keys = set(keys)
 
434
        local_keys = keys.intersection(self._keys)
436
435
        if self.reference_lists:
437
 
            for key in keys.intersection(self._keys):
 
436
            for key in local_keys:
438
437
                node = self._nodes[key]
439
438
                yield self, key, node[1], node[0]
440
439
        else:
441
 
            for key in keys.intersection(self._keys):
 
440
            for key in local_keys:
442
441
                node = self._nodes[key]
443
442
                yield self, key, node[1]
444
 
        keys.difference_update(self._keys)
 
443
        # Find things that are in backing indices that have not been handled
 
444
        # yet.
 
445
        if not self._backing_indices:
 
446
            return # We won't find anything there either
 
447
        # Remove all of the keys that we found locally
 
448
        keys.difference_update(local_keys)
445
449
        for backing in self._backing_indices:
446
450
            if backing is None:
447
451
                continue
607
611
        self._name = name
608
612
        self._size = size
609
613
        self._file = None
610
 
        self._page_size = transport.recommended_page_size()
 
614
        self._recommended_pages = self._compute_recommended_pages()
611
615
        self._root_node = None
612
616
        # Default max size is 100,000 leave values
613
617
        self._leaf_value_cache = None # lru_cache.LRUCache(100*1000)
628
632
    def __ne__(self, other):
629
633
        return not self.__eq__(other)
630
634
 
631
 
    def _get_root_node(self):
632
 
        if self._root_node is None:
633
 
            # We may not have a root node yet
634
 
            self._get_internal_nodes([0])
635
 
        return self._root_node
636
 
 
637
 
    def _cache_nodes(self, nodes, cache):
 
635
    def _get_and_cache_nodes(self, nodes):
638
636
        """Read nodes and cache them in the lru.
639
637
 
640
638
        The nodes list supplied is sorted and then read from disk, each node
647
645
 
648
646
        :return: A dict of {node_pos: node}
649
647
        """
650
 
        if len(nodes) > cache._max_cache:
651
 
            trace.mutter('Requesting %s > %s nodes, not all will be cached',
652
 
                         len(nodes), cache._max_cache)
653
648
        found = {}
654
649
        start_of_leaves = None
655
650
        for node_pos, node in self._read_nodes(sorted(nodes)):
662
657
                    self._internal_node_cache.add(node_pos, node)
663
658
                else:
664
659
                    self._leaf_node_cache.add(node_pos, node)
665
 
                cache.add(node_pos, node)
666
660
            found[node_pos] = node
667
661
        return found
668
662
 
669
 
    def _expand_nodes(self, node_indexes):
670
 
        """Find extra nodes to download.
 
663
    def _compute_recommended_pages(self):
 
664
        """Convert transport's recommended_page_size into btree pages.
 
665
 
 
666
        recommended_page_size is in bytes, we want to know how many _PAGE_SIZE
 
667
        pages fit in that length.
 
668
        """
 
669
        recommended_read = self._transport.recommended_page_size()
 
670
        recommended_pages = int(math.ceil(recommended_read /
 
671
                                          float(_PAGE_SIZE)))
 
672
        return recommended_pages
 
673
 
 
674
    def _compute_total_pages_in_index(self):
 
675
        """How many pages are in the index.
 
676
 
 
677
        If we have read the header we will use the value stored there.
 
678
        Otherwise it will be computed based on the length of the index.
 
679
        """
 
680
        if self._size is None:
 
681
            raise AssertionError('_compute_total_pages_in_index should not be'
 
682
                                 ' called when self._size is None')
 
683
        if self._root_node is not None:
 
684
            # This is the number of pages as defined by the header
 
685
            return self._row_offsets[-1]
 
686
        # This is the number of pages as defined by the size of the index. They
 
687
        # should be indentical.
 
688
        total_pages = int(math.ceil(self._size / float(_PAGE_SIZE)))
 
689
        return total_pages
 
690
 
 
691
    def _expand_offsets(self, offsets):
 
692
        """Find extra pages to download.
671
693
 
672
694
        The idea is that we always want to make big-enough requests (like 64kB
673
695
        for http), so that we don't waste round trips. So given the entries
674
 
        that we already have cached, and the new nodes being downloaded, figure
 
696
        that we already have cached and the new pages being downloaded figure
675
697
        out what other pages we might want to read.
676
698
 
677
 
        :param node_indexes: The nodes that have been requested to read.
678
 
        # :param recommended_size: The size of download we want, this assumes
679
 
        #     that readv() is accomplished in a single round trip (which is true
680
 
        #     for http and bzr+ssh, and sftp uses async to get close
681
 
        #     performance.)
682
 
        # :param max_page: The last possible page to read, possibly max_size is
683
 
        # better?
684
 
        :return: A new list of nodes to download
 
699
        See also doc/developers/btree_index_prefetch.txt for more details.
 
700
 
 
701
        :param offsets: The offsets to be read
 
702
        :return: A list of offsets to download
685
703
        """
686
 
        trace.note('request: %s\tnodes: %s', self._name[-14:], node_indexes)
687
 
        # return node_indexes
 
704
        if 'index' in debug.debug_flags:
 
705
            trace.mutter('expanding: %s\toffsets: %s', self._name, offsets)
688
706
 
689
 
        # TODO: This could be cached
690
 
        recommended_read = self._transport.recommended_page_size() / 4
691
 
        recommended_pages = int(math.ceil(recommended_read /
692
 
                                          float(_PAGE_SIZE)))
693
 
        if len(node_indexes) >= recommended_pages:
 
707
        if len(offsets) >= self._recommended_pages:
694
708
            # Don't add more, we are already requesting more than enough
695
 
            return node_indexes
 
709
            if 'index' in debug.debug_flags:
 
710
                trace.mutter('  not expanding large request (%s >= %s)',
 
711
                             len(offsets), self._recommended_pages)
 
712
            return offsets
696
713
        if self._size is None:
697
714
            # Don't try anything, because we don't know where the file ends
698
 
            return node_indexes
699
 
        # The idea here is to get nodes "next to" the ones we are already
700
 
        # getting.
701
 
        max_page = int(math.ceil(self._size / float(_PAGE_SIZE)))
702
 
        # XXX: Update the LRUCache interface to have a .keys() attribute
703
 
        cached_nodes = set(self._internal_node_cache._cache.keys())
704
 
        cached_nodes.update(self._leaf_node_cache._cache.keys())
705
 
        if self._root_node is not None:
706
 
            cached_nodes.add(0)
707
 
 
 
715
            if 'index' in debug.debug_flags:
 
716
                trace.mutter('  not expanding without knowing index size')
 
717
            return offsets
 
718
        total_pages = self._compute_total_pages_in_index()
 
719
        cached_offsets = self._get_offsets_to_cached_pages()
708
720
        # If reading recommended_pages would read the rest of the index, just
709
721
        # do so.
710
 
        if max_page - len(cached_nodes) <= recommended_pages:
711
 
            # Just read the whole thing
712
 
            # NOTE: this breaks a little bit with the distinction between index
713
 
            # nodes and leaf nodes (as they are cached separately). It is
714
 
            # still worthwhile to read it all, but we need to figure out what
715
 
            # cache we should really put things in when we are done.
716
 
            # However, we may not have read the index header yet to know where
717
 
            # the internal nodes break from where the leaf nodes. We are sure
718
 
            # to know *after* we've done the read.
719
 
            # This also does the wrong thing if there are bloom pages.
720
 
 
721
 
            # Further, this gives the wrong results because we have 2 caches to
722
 
            # worry about...
723
 
            if cached_nodes:
724
 
                return [x for x in xrange(max_page) if x not in cached_nodes]
725
 
            return range(max_page)
726
 
 
727
 
        needed_nodes = recommended_pages - len(node_indexes)
728
 
        final_nodes = set(node_indexes)
729
 
        if node_indexes == [0]:
730
 
            # Special case when we are reading the root node, just read the
731
 
            # first N pages along with the root node
732
 
            final_nodes = node_indexes
733
 
            # for index in xrange(1, max_page):
734
 
            #     if index not in final_nodes and index not in cached_nodes:
735
 
            #         final_nodes.add(index)
736
 
            #         if len(final_nodes) >= recommended_pages:
737
 
            #             break
 
722
        if total_pages - len(cached_offsets) <= self._recommended_pages:
 
723
            # Read whatever is left
 
724
            if cached_offsets:
 
725
                expanded = [x for x in xrange(total_pages)
 
726
                               if x not in cached_offsets]
 
727
            else:
 
728
                expanded = range(total_pages)
 
729
            if 'index' in debug.debug_flags:
 
730
                trace.mutter('  reading all unread pages: %s', expanded)
 
731
            return expanded
 
732
 
 
733
        if self._root_node is None:
 
734
            # ATM on the first read of the root node of a large index, we don't
 
735
            # bother pre-reading any other pages. This is because the
 
736
            # likelyhood of actually reading interesting pages is very low.
 
737
            # See doc/developers/btree_index_prefetch.txt for a discussion, and
 
738
            # a possible implementation when we are guessing that the second
 
739
            # layer index is small
 
740
            final_offsets = offsets
738
741
        else:
739
 
            while len(final_nodes) < recommended_pages:
740
 
                for pos in sorted(final_nodes):
741
 
                    if pos > 0:
742
 
                        previous = pos - 1
743
 
                        if previous not in cached_nodes:
744
 
                            final_nodes.add(previous)
745
 
                    if pos < max_page:
746
 
                        after = pos + 1
747
 
                        if after not in cached_nodes:
748
 
                            final_nodes.add(after)
749
 
                    # if len(final_nodes) > recommended_pages:
750
 
                    #     break
751
 
 
752
 
        final_nodes = sorted(final_nodes)
753
 
        trace.note('\t\t\t\t%s', final_nodes)
754
 
        return final_nodes
 
742
            tree_depth = len(self._row_lengths)
 
743
            if len(cached_offsets) < tree_depth and len(offsets) == 1:
 
744
                # We haven't read enough to justify expansion
 
745
                # If we are only going to read the root node, and 1 leaf node,
 
746
                # then it isn't worth expanding our request. Once we've read at
 
747
                # least 2 nodes, then we are probably doing a search, and we
 
748
                # start expanding our requests.
 
749
                if 'index' in debug.debug_flags:
 
750
                    trace.mutter('  not expanding on first reads')
 
751
                return offsets
 
752
            final_offsets = self._expand_to_neighbors(offsets, cached_offsets,
 
753
                                                      total_pages)
 
754
 
 
755
        final_offsets = sorted(final_offsets)
 
756
        if 'index' in debug.debug_flags:
 
757
            trace.mutter('expanded:  %s', final_offsets)
 
758
        return final_offsets
 
759
 
 
760
    def _expand_to_neighbors(self, offsets, cached_offsets, total_pages):
 
761
        """Expand requests to neighbors until we have enough pages.
 
762
 
 
763
        This is called from _expand_offsets after policy has determined that we
 
764
        want to expand.
 
765
        We only want to expand requests within a given layer. We cheat a little
 
766
        bit and assume all requests will be in the same layer. This is true
 
767
        given the current design, but if it changes this algorithm may perform
 
768
        oddly.
 
769
 
 
770
        :param offsets: requested offsets
 
771
        :param cached_offsets: offsets for pages we currently have cached
 
772
        :return: A set() of offsets after expansion
 
773
        """
 
774
        final_offsets = set(offsets)
 
775
        first = end = None
 
776
        new_tips = set(final_offsets)
 
777
        while len(final_offsets) < self._recommended_pages and new_tips:
 
778
            next_tips = set()
 
779
            for pos in new_tips:
 
780
                if first is None:
 
781
                    first, end = self._find_layer_first_and_end(pos)
 
782
                previous = pos - 1
 
783
                if (previous > 0
 
784
                    and previous not in cached_offsets
 
785
                    and previous not in final_offsets
 
786
                    and previous >= first):
 
787
                    next_tips.add(previous)
 
788
                after = pos + 1
 
789
                if (after < total_pages
 
790
                    and after not in cached_offsets
 
791
                    and after not in final_offsets
 
792
                    and after < end):
 
793
                    next_tips.add(after)
 
794
                # This would keep us from going bigger than
 
795
                # recommended_pages by only expanding the first offsets.
 
796
                # However, if we are making a 'wide' request, it is
 
797
                # reasonable to expand all points equally.
 
798
                # if len(final_offsets) > recommended_pages:
 
799
                #     break
 
800
            final_offsets.update(next_tips)
 
801
            new_tips = next_tips
 
802
        return final_offsets
 
803
 
 
804
    def _find_layer_first_and_end(self, offset):
 
805
        """Find the start/stop nodes for the layer corresponding to offset.
 
806
 
 
807
        :return: (first, end)
 
808
            first is the first node in this layer
 
809
            end is the first node of the next layer
 
810
        """
 
811
        first = end = 0
 
812
        for roffset in self._row_offsets:
 
813
            first = end
 
814
            end = roffset
 
815
            if offset < roffset:
 
816
                break
 
817
        return first, end
 
818
 
 
819
    def _get_offsets_to_cached_pages(self):
 
820
        """Determine what nodes we already have cached."""
 
821
        cached_offsets = set(self._internal_node_cache.keys())
 
822
        cached_offsets.update(self._leaf_node_cache.keys())
 
823
        if self._root_node is not None:
 
824
            cached_offsets.add(0)
 
825
        return cached_offsets
 
826
 
 
827
    def _get_root_node(self):
 
828
        if self._root_node is None:
 
829
            # We may not have a root node yet
 
830
            self._get_internal_nodes([0])
 
831
        return self._root_node
755
832
 
756
833
    def _get_nodes(self, cache, node_indexes):
757
834
        found = {}
766
843
                needed.append(idx)
767
844
        if not needed:
768
845
            return found
769
 
        needed = self._expand_nodes(needed)
770
 
        found.update(self._cache_nodes(needed, cache))
 
846
        needed = self._expand_offsets(needed)
 
847
        found.update(self._get_and_cache_nodes(needed))
771
848
        return found
772
849
 
773
850
    def _get_internal_nodes(self, node_indexes):
777
854
        """
778
855
        return self._get_nodes(self._internal_node_cache, node_indexes)
779
856
 
780
 
    def _get_leaf_nodes(self, node_indexes):
781
 
        """Get a bunch of nodes, from cache or disk."""
782
 
        found = self._get_nodes(self._leaf_node_cache, node_indexes)
 
857
    def _cache_leaf_values(self, nodes):
 
858
        """Cache directly from key => value, skipping the btree."""
783
859
        if self._leaf_value_cache is not None:
784
 
            for node in found.itervalues():
 
860
            for node in nodes.itervalues():
785
861
                for key, value in node.keys.iteritems():
786
862
                    if key in self._leaf_value_cache:
787
863
                        # Don't add the rest of the keys, we've seen this node
788
864
                        # before.
789
865
                        break
790
866
                    self._leaf_value_cache[key] = value
 
867
 
 
868
    def _get_leaf_nodes(self, node_indexes):
 
869
        """Get a bunch of nodes, from cache or disk."""
 
870
        found = self._get_nodes(self._leaf_node_cache, node_indexes)
 
871
        self._cache_leaf_values(found)
791
872
        return found
792
873
 
793
874
    def iter_all_entries(self):
804
885
                "iter_all_entries scales with size of history.")
805
886
        if not self.key_count():
806
887
            return
 
888
        if self._row_offsets[-1] == 1:
 
889
            # There is only the root node, and we read that via key_count()
 
890
            if self.node_ref_lists:
 
891
                for key, (value, refs) in sorted(self._root_node.keys.items()):
 
892
                    yield (self, key, value, refs)
 
893
            else:
 
894
                for key, (value, refs) in sorted(self._root_node.keys.items()):
 
895
                    yield (self, key, value)
 
896
            return
807
897
        start_of_leaves = self._row_offsets[-2]
808
898
        end_of_leaves = self._row_offsets[-1]
809
 
        needed_nodes = range(start_of_leaves, end_of_leaves)
 
899
        needed_offsets = range(start_of_leaves, end_of_leaves)
 
900
        if needed_offsets == [0]:
 
901
            # Special case when we only have a root node, as we have already
 
902
            # read everything
 
903
            nodes = [(0, self._root_node)]
 
904
        else:
 
905
            nodes = self._read_nodes(needed_offsets)
810
906
        # We iterate strictly in-order so that we can use this function
811
907
        # for spilling index builds to disk.
812
908
        if self.node_ref_lists:
813
 
            for _, node in self._read_nodes(needed_nodes):
 
909
            for _, node in nodes:
814
910
                for key, (value, refs) in sorted(node.keys.items()):
815
911
                    yield (self, key, value, refs)
816
912
        else:
817
 
            for _, node in self._read_nodes(needed_nodes):
 
913
            for _, node in nodes:
818
914
                for key, (value, refs) in sorted(node.keys.items()):
819
915
                    yield (self, key, value)
820
916
 
1103
1199
            self._get_root_node()
1104
1200
        return self._key_count
1105
1201
 
 
1202
    def _compute_row_offsets(self):
 
1203
        """Fill out the _row_offsets attribute based on _row_lengths."""
 
1204
        offsets = []
 
1205
        row_offset = 0
 
1206
        for row in self._row_lengths:
 
1207
            offsets.append(row_offset)
 
1208
            row_offset += row
 
1209
        offsets.append(row_offset)
 
1210
        self._row_offsets = offsets
 
1211
 
1106
1212
    def _parse_header_from_bytes(self, bytes):
1107
1213
        """Parse the header from a region of bytes.
1108
1214
 
1144
1250
                if len(length)])
1145
1251
        except ValueError:
1146
1252
            raise errors.BadIndexOptions(self)
1147
 
        offsets = []
1148
 
        row_offset = 0
1149
 
        for row in self._row_lengths:
1150
 
            offsets.append(row_offset)
1151
 
            row_offset += row
1152
 
        offsets.append(row_offset)
1153
 
        self._row_offsets = offsets
 
1253
        self._compute_row_offsets()
1154
1254
 
1155
1255
        # calculate the bytes we have processed
1156
1256
        header_end = (len(signature) + sum(map(len, lines[0:4])) + 4)
1160
1260
        """Read some nodes from disk into the LRU cache.
1161
1261
 
1162
1262
        This performs a readv to get the node data into memory, and parses each
1163
 
        node, the yields it to the caller. The nodes are requested in the
 
1263
        node, then yields it to the caller. The nodes are requested in the
1164
1264
        supplied order. If possible doing sort() on the list before requesting
1165
1265
        a read may improve performance.
1166
1266
 
1167
1267
        :param nodes: The nodes to read. 0 - first node, 1 - second node etc.
1168
1268
        :return: None
1169
1269
        """
 
1270
        # may be the byte string of the whole file
 
1271
        bytes = None
 
1272
        # list of (offset, length) regions of the file that should, evenually
 
1273
        # be read in to data_ranges, either from 'bytes' or from the transport
1170
1274
        ranges = []
1171
1275
        for index in nodes:
1172
1276
            offset = index * _PAGE_SIZE
1176
1280
                if self._size:
1177
1281
                    size = min(_PAGE_SIZE, self._size)
1178
1282
                else:
1179
 
                    stream = self._transport.get(self._name)
1180
 
                    start = stream.read(_PAGE_SIZE)
1181
 
                    # Avoid doing this again
1182
 
                    self._size = len(start)
1183
 
                    size = min(_PAGE_SIZE, self._size)
 
1283
                    # The only case where we don't know the size, is for very
 
1284
                    # small indexes. So we read the whole thing
 
1285
                    bytes = self._transport.get_bytes(self._name)
 
1286
                    self._size = len(bytes)
 
1287
                    # the whole thing should be parsed out of 'bytes'
 
1288
                    ranges.append((0, len(bytes)))
 
1289
                    break
1184
1290
            else:
 
1291
                if offset > self._size:
 
1292
                    raise AssertionError('tried to read past the end'
 
1293
                                         ' of the file %s > %s'
 
1294
                                         % (offset, self._size))
1185
1295
                size = min(size, self._size - offset)
1186
1296
            ranges.append((offset, size))
1187
1297
        if not ranges:
1188
1298
            return
1189
 
        if self._file is None:
 
1299
        elif bytes is not None:
 
1300
            # already have the whole file
 
1301
            data_ranges = [(start, bytes[start:start+_PAGE_SIZE])
 
1302
                           for start in xrange(0, len(bytes), _PAGE_SIZE)]
 
1303
        elif self._file is None:
1190
1304
            data_ranges = self._transport.readv(self._name, ranges)
1191
1305
        else:
1192
1306
            data_ranges = []