        self.row_lengths = []
        # Loop over all nodes adding them to the bottom row
        # (rows[-1]). When we finish a chunk in a row,
        # propagate the key that didn't fit (comes after the chunk) to the
        # row above, transitively.
        for node in node_iterator:
            if key_count == 0:
                # First key triggers the first row
                rows.append(_LeafBuilderRow())
            key_count += 1
            # TODO: Flattening the node into a string key and a line should
            #       probably be put into a pyrex function. We can do a quick
            #       iter over all the entries to determine the final length,
            #       and then do a single malloc() rather than lots of
            #       intermediate mallocs as we build everything up.
            #       ATM 3 / 13s are spent flattening nodes (10s is compressing)
            string_key, line = _btree_serializer._flatten_node(node,
                self.reference_lists)
            self._add_key(string_key, line, rows, allow_optimize=allow_optimize)
        for row in reversed(rows):
            pad = (type(row) != _LeafBuilderRow)
            row.finish_node(pad=pad)
        lines = [_BTSIGNATURE]
        lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
        lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
        lines.append(_OPTION_LEN + str(key_count) + '\n')
        row_lengths = [row.nodes for row in rows]
        lines.append(_OPTION_ROW_LENGTHS + ','.join(map(str, row_lengths)) + '\n')
        if row_lengths and row_lengths[-1] > 1:
            result = tempfile.NamedTemporaryFile(prefix='bzr-index-')
        else:
            result = cStringIO.StringIO()
        result.writelines(lines)
        position = sum(map(len, lines))
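        # For illustration (field values hypothetical): the header written
        # above is plain text, so a freshly built two-level index begins
        # roughly as
        #
        #   B+Tree Graph Index 2\n
        #   node_ref_lists=1\n
        #   key_elements=1\n
        #   len=1234\n
        #   row_lengths=1,4\n
        #
        # after which the serialised node pages follow.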
        For InMemoryGraphIndex the estimate is exact.
        """
        return len(self._nodes) + sum(backing.key_count() for backing in
            self._backing_indices if backing is not None)

    def validate(self):
        """In memory indexes have no known corruption at the moment."""
class _LeafNode(dict):
    """A leaf node for a serialised B+Tree index."""

    __slots__ = ('min_key', 'max_key', '_keys')

    def __init__(self, bytes, key_length, ref_list_length):
        """Parse bytes to create a leaf node object."""
        # splitlines mangles the \r delimiters; don't use it.
        key_list = _btree_serializer._parse_leaf_lines(bytes,
            key_length, ref_list_length)
        if key_list:
            self.min_key = key_list[0][0]
            self.max_key = key_list[-1][0]
        else:
            self.min_key = self.max_key = None
        super(_LeafNode, self).__init__(key_list)
        self._keys = dict(self)

    def all_items(self):
        """Return a sorted list of (key, (value, refs)) items"""
        return sorted(self.items())

    def all_keys(self):
        """Return a sorted list of all keys."""
        return sorted(self.keys())
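# Usage sketch (hypothetical data): a _LeafNode behaves as a plain dict of
# {key: (value, refs)} with cached key bounds:
#
#   node = _LeafNode(page_bytes, key_length=1, ref_list_length=1)
#   ('rev-1',) in node           # membership is an ordinary dict lookup
#   node.min_key, node.max_key   # first and last key stored on the page
#
# The cached bounds are what _find_ancestors() uses later to decide whether
# a parent key could even live on the page it is currently looking at.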
class _InternalNode(object):
    """An internal node for a serialised B+Tree index."""

    __slots__ = ('keys', 'offset')

    def __init__(self, bytes):
        """Parse bytes to create an internal node object."""
        # splitlines mangles the \r delimiters; don't use it.
        the initial read (to read the root node header) can be done
        without over-reading even on empty indices, and on small indices
        allows single-IO to read the entire index.
        :param unlimited_cache: If set to True, then instead of using an
            LRUCache with size _NODE_CACHE_SIZE, we will use a dict and always
            cache all leaf nodes.
        :param offset: The start of the btree index data isn't byte 0 of the
            file. Instead it starts at some point later.
        """
        self._transport = transport
        self._name = name
        self._size = size
        self._file = None
        self._recommended_pages = self._compute_recommended_pages()
        self._root_node = None
        self._base_offset = offset
        self._leaf_factory = _LeafNode
        # Default max size is 100,000 leaf values
        self._leaf_value_cache = None # lru_cache.LRUCache(100*1000)
        if unlimited_cache:
            self._leaf_node_cache = {}
            self._internal_node_cache = {}
        else:
            self._leaf_node_cache = lru_cache.LRUCache(_NODE_CACHE_SIZE)
            # We use a FIFO here just to prevent possible blowout. However, a
            # 300k record btree has only 3k leaf nodes, and only 20 internal
            # nodes. A value of 100 scales to ~100*100*100 = 1M records.
            self._internal_node_cache = fifo_cache.FIFOCache(100)
        self._key_count = None
        self._row_lengths = None
        self._row_offsets = None # Start of each row, [-1] is the end
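    # Construction sketch (transport/name/size hypothetical): a long-lived
    # process that will touch most pages can skip LRU eviction entirely:
    #
    #   index = BTreeGraphIndex(transport, 'pack-names', size,
    #                           unlimited_cache=True)
    #
    # trading memory (every ~4kB leaf page stays resident) for never
    # re-reading a page.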
        :return: A dict of {node_pos: node}
        """
        found = {}
        start_of_leaves = None
        for node_pos, node in self._read_nodes(sorted(nodes)):
            if node_pos == 0: # Special case
                self._root_node = node
            else:
                if start_of_leaves is None:
                    start_of_leaves = self._row_offsets[-2]
                if node_pos < start_of_leaves:
                    self._internal_node_cache[node_pos] = node
                else:
                    self._leaf_node_cache[node_pos] = node
            found[node_pos] = node
        return found
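    # Worked example (illustrative numbers): with row_lengths=[1, 4, 100] the
    # row offsets are [0, 1, 5, 105], so _row_offsets[-2] == 5 is the first
    # leaf page; pages 1-4 land in the internal-node cache and pages 5-104
    # in the leaf cache.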
    def _compute_recommended_pages(self):
        """Convert transport's recommended_page_size into btree pages.

        recommended_page_size is in bytes; we want to know how many _PAGE_SIZE
        pages fit in that length.
        """
        recommended_read = self._transport.recommended_page_size()
        recommended_pages = int(math.ceil(recommended_read /
                                          float(_PAGE_SIZE)))
        return recommended_pages
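    # e.g. a transport recommending 64kB reads combined with 4096-byte index
    # pages gives int(math.ceil(65536 / 4096.0)) == 16 pages per request
    # (numbers illustrative of an http transport).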
    def _compute_total_pages_in_index(self):
        """How many pages are in the index.

        If we have read the header we will use the value stored there.
        Otherwise it will be computed based on the length of the index.
        """
        if self._size is None:
            raise AssertionError('_compute_total_pages_in_index should not be'
                                 ' called when self._size is None')
        if self._root_node is not None:
            # This is the number of pages as defined by the header
            return self._row_offsets[-1]
        # This is the number of pages as defined by the size of the index. They
        # should be identical.
        total_pages = int(math.ceil(self._size / float(_PAGE_SIZE)))
        return total_pages
    def _expand_offsets(self, offsets):
        """Find extra pages to download.

        The idea is that we always want to make big-enough requests (like 64kB
        for http), so that we don't waste round trips. So given the entries
        that we already have cached and the new pages being downloaded figure
        out what other pages we might want to read.

        See also doc/developers/btree_index_prefetch.txt for more details.

        :param offsets: The offsets to be read
        :return: A list of offsets to download
        """
        if 'index' in debug.debug_flags:
            trace.mutter('expanding: %s\toffsets: %s', self._name, offsets)

        if len(offsets) >= self._recommended_pages:
            # Don't add more, we are already requesting more than enough
            if 'index' in debug.debug_flags:
                trace.mutter('  not expanding large request (%s >= %s)',
                             len(offsets), self._recommended_pages)
            return offsets
        if self._size is None:
            # Don't try anything, because we don't know where the file ends
            if 'index' in debug.debug_flags:
                trace.mutter('  not expanding without knowing index size')
            return offsets
        total_pages = self._compute_total_pages_in_index()
        cached_offsets = self._get_offsets_to_cached_pages()
        # If reading recommended_pages would read the rest of the index, just
        # read it all
        if total_pages - len(cached_offsets) <= self._recommended_pages:
            # Read whatever is left
            if cached_offsets:
                expanded = [x for x in xrange(total_pages)
                            if x not in cached_offsets]
            else:
                expanded = range(total_pages)
            if 'index' in debug.debug_flags:
                trace.mutter('  reading all unread pages: %s', expanded)
            return expanded

        if self._root_node is None:
            # ATM on the first read of the root node of a large index, we don't
            # bother pre-reading any other pages. This is because the
            # likelihood of actually reading interesting pages is very low.
            # See doc/developers/btree_index_prefetch.txt for a discussion, and
            # a possible implementation when we are guessing that the second
            # layer index is small
            final_offsets = offsets
        else:
            tree_depth = len(self._row_lengths)
            if len(cached_offsets) < tree_depth and len(offsets) == 1:
                # We haven't read enough to justify expansion
                # If we are only going to read the root node, and 1 leaf node,
                # then it isn't worth expanding our request. Once we've read at
                # least 2 nodes, then we are probably doing a search, and we
                # start expanding our requests.
                if 'index' in debug.debug_flags:
                    trace.mutter('  not expanding on first reads')
                return offsets
            final_offsets = self._expand_to_neighbors(offsets, cached_offsets,
                                                      total_pages)

        final_offsets = sorted(final_offsets)
        if 'index' in debug.debug_flags:
            trace.mutter('expanded: %s', final_offsets)
        return final_offsets
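    # Expansion sketch (numbers hypothetical): with _recommended_pages == 16,
    # a lone request for page [35] against a fully-read header and a 100-page
    # leaf layer grows sideways one neighbor at a time, ending near
    #   [28, 29, ..., 42, 43]
    # always clamped to the layer that holds page 35.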
    def _expand_to_neighbors(self, offsets, cached_offsets, total_pages):
        """Expand requests to neighbors until we have enough pages.

        This is called from _expand_offsets after policy has determined that we
        want to expand.
        We only want to expand requests within a given layer. We cheat a little
        bit and assume all requests will be in the same layer. This is true
        given the current design, but if it changes this algorithm may perform
        oddly.

        :param offsets: requested offsets
        :param cached_offsets: offsets for pages we currently have cached
        :return: A set() of offsets after expansion
        """
        final_offsets = set(offsets)
        first = end = None
        new_tips = set(final_offsets)
        while len(final_offsets) < self._recommended_pages and new_tips:
            next_tips = set()
            for pos in new_tips:
                if first is None:
                    first, end = self._find_layer_first_and_end(pos)
                previous = pos - 1
                if (previous > 0
                    and previous not in cached_offsets
                    and previous not in final_offsets
                    and previous >= first):
                    next_tips.add(previous)
                after = pos + 1
                if (after < total_pages
                    and after not in cached_offsets
                    and after not in final_offsets
                    and after < end):
                    next_tips.add(after)
                # This would keep us from going bigger than
                # recommended_pages by only expanding the first offsets.
                # However, if we are making a 'wide' request, it is
                # reasonable to expand all points equally.
                # if len(final_offsets) > recommended_pages:
                #     break
            final_offsets.update(next_tips)
            new_tips = next_tips
        return final_offsets
    def clear_cache(self):
        """Clear out any cached/memoized values.

        This can be called at any time, but generally it is used when we have
        extracted some information, but don't expect to be requesting any more
        from this index.
        """
        # Note that we don't touch self._root_node or self._internal_node_cache
        # We don't expect either of those to be big, and it can save
        # round-trips in the future. We may re-evaluate this if InternalNode
        # memory starts to be an issue.
        self._leaf_node_cache.clear()
    def external_references(self, ref_list_num):
        if self._root_node is None:
            self._get_root_node()
        if ref_list_num + 1 > self.node_ref_lists:
            raise ValueError('No ref list %d, index has %d ref lists'
                             % (ref_list_num, self.node_ref_lists))
        keys = set()
        refs = set()
        for node in self.iter_all_entries():
            keys.add(node[1])
            refs.update(node[3][ref_list_num])
        return refs - keys
    def _find_layer_first_and_end(self, offset):
        """Find the start/stop nodes for the layer corresponding to offset.

        :return: (first, end)
            first is the first node in this layer
            end is the first node of the next layer
        """
        first = end = 0
        for roffset in self._row_offsets:
            first = end
            end = roffset
            if offset < roffset:
                break
        return first, end
    def _get_offsets_to_cached_pages(self):
        """Determine what nodes we already have cached."""
        cached_offsets = set(self._internal_node_cache.keys())
        cached_offsets.update(self._leaf_node_cache.keys())
        if self._root_node is not None:
            cached_offsets.add(0)
        return cached_offsets
    def _get_root_node(self):
        if self._root_node is None:
            # We may not have a root node yet
            self._get_internal_nodes([0])
        return self._root_node

    def _get_nodes(self, cache, node_indexes):
                "iter_all_entries scales with size of history.")
        if not self.key_count():
            return
        if self._row_offsets[-1] == 1:
            # There is only the root node, and we read that via key_count()
            if self.node_ref_lists:
                for key, (value, refs) in self._root_node.all_items():
                    yield (self, key, value, refs)
            else:
                for key, (value, refs) in self._root_node.all_items():
                    yield (self, key, value)
            return
        start_of_leaves = self._row_offsets[-2]
        end_of_leaves = self._row_offsets[-1]
        needed_offsets = range(start_of_leaves, end_of_leaves)
        if needed_offsets == [0]:
            # Special case when we only have a root node, as we have already
            # read it above
            nodes = [(0, self._root_node)]
        else:
            nodes = self._read_nodes(needed_offsets)
        # We iterate strictly in-order so that we can use this function
        # for spilling index builds to disk.
        if self.node_ref_lists:
            for _, node in nodes:
                for key, (value, refs) in node.all_items():
                    yield (self, key, value, refs)
        else:
            for _, node in nodes:
                for key, (value, refs) in node.all_items():
                    yield (self, key, value)
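    # Each yielded tuple is (index, key, value) when there are no reference
    # lists, or (index, key, value, refs) when there are; e.g. a hypothetical
    # entry with one ref list:
    #   (<BTreeGraphIndex ...>, ('rev-1',), 'value-bytes', ((('rev-0',),),))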
        output.append(cur_out)
    def _walk_through_internal_nodes(self, keys):
        """Take the given set of keys, and find the corresponding LeafNodes.

        :param keys: An unsorted iterable of keys to search for
        :return: (nodes, keys_at_index)
            nodes is a dict mapping {index: LeafNode}
            keys_at_index is a list of tuples of [(index, [keys for Leaf])]
        """
        # 6 seconds spent in miss_torture using the sorted() line.
        # Even with out of order disk IO it seems faster not to sort it when
        # large queries are being made.
        keys_at_index = [(0, sorted(keys))]

        for row_pos, next_row_start in enumerate(self._row_offsets[1:-1]):
            node_indexes = [idx for idx, s_keys in keys_at_index]
            nodes = self._get_internal_nodes(node_indexes)

            next_nodes_and_keys = []
            for node_index, sub_keys in keys_at_index:
                node = nodes[node_index]
                positions = self._multi_bisect_right(sub_keys, node.keys)
                node_offset = next_row_start + node.offset
                next_nodes_and_keys.extend([(node_offset + pos, s_keys)
                                            for pos, s_keys in positions])
            keys_at_index = next_nodes_and_keys
        # We should now be at the _LeafNodes
        node_indexes = [idx for idx, s_keys in keys_at_index]

        # TODO: We may *not* want to always read all the nodes in one
        #       big go. Consider setting a max size on this.
        nodes = self._get_leaf_nodes(node_indexes)
        return nodes, keys_at_index
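    # Bisect sketch (keys hypothetical): _multi_bisect_right positions every
    # search key against a node's separator keys in one pass:
    #   _multi_bisect_right([('a',), ('m',)], [('f',), ('t',)])
    #   => [(0, [('a',)]), (1, [('m',)])]
    # so ('a',) descends into child page 0 and ('m',) into child page 1.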
    def iter_entries(self, keys):
        """Iterate over keys within the index.

        needed_keys = keys
        if not needed_keys:
            return
        nodes, nodes_and_keys = self._walk_through_internal_nodes(needed_keys)
        for node_index, sub_keys in nodes_and_keys:
            if not sub_keys:
                continue
            node = nodes[node_index]
            for next_sub_key in sub_keys:
                if next_sub_key in node:
                    value, refs = node[next_sub_key]
                    if self.node_ref_lists:
                        yield (self, next_sub_key, value, refs)
                    else:
                        yield (self, next_sub_key, value)
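    # e.g. (keys hypothetical): fetch several keys with a single batch of
    # page reads:
    #   for entry in index.iter_entries([('rev-1',), ('rev-9',)]):
    #       idx, key, value = entry[:3]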
    def _find_ancestors(self, keys, ref_list_num, parent_map, missing_keys):
        """Find the parent_map information for the set of keys.

        This populates the parent_map dict and missing_keys set based on the
        queried keys. It also can fill out an arbitrary number of parents that
        it finds while searching for the supplied keys.

        It is unlikely that you want to call this directly. See
        "CombinedGraphIndex.find_ancestry()" for a more appropriate API.

        :param keys: The keys whose ancestry we want to return
            Every key will either end up in 'parent_map' or 'missing_keys'.
        :param ref_list_num: This index in the ref_lists is the parents we
            care about.
        :param parent_map: {key: parent_keys} for keys that are present in this
            index. This may contain more entries than were in 'keys', that are
            reachable ancestors of the keys requested.
        :param missing_keys: keys which are known to be missing in this index.
            This may include parents that were not directly requested, but we
            were able to determine that they are not present in this index.
        :return: search_keys parents that were found but not queried to know
            if they are missing or present. Callers can re-query this index for
            those keys, and they will be placed into parent_map or missing_keys
        """
        if not self.key_count():
            # We use key_count() to trigger reading the root node and
            # determining info about this BTreeGraphIndex
            # If we don't have any keys, then everything is missing
            missing_keys.update(keys)
            return set()
        if ref_list_num >= self.node_ref_lists:
            raise ValueError('No ref list %d, index has %d ref lists'
                             % (ref_list_num, self.node_ref_lists))

        # The main trick we are trying to accomplish is that when we find a
        # key listing its parents, we expect that the parent key is also likely
        # to sit on the same page. Allowing us to expand parents quickly
        # without suffering the full stack of bisecting, etc.
        nodes, nodes_and_keys = self._walk_through_internal_nodes(keys)

        # These are parent keys which could not be immediately resolved on the
        # page where the child was present. Note that we may already be
        # searching for that key, and it may actually be present [or known
        # missing] on one of the other pages we are reading.
        # TODO:
        #   We could try searching for them in the immediate previous or next
        #   page. If they occur "later" we could put them in a pending lookup
        #   set, and then for each node we read thereafter we could check to
        #   see if they are present.
        #   However, we don't know the impact of keeping this list of things
        #   that I'm going to search for every node I come across from here
        #   on out.
        #   It doesn't handle the case when the parent key is missing on a
        #   page that we *don't* read. So we already have to handle being
        #   re-entrant for that.
        #   Since most keys contain a date string, they are more likely to be
        #   found earlier in the file than later, but we would know that right
        #   away (key < min_key), and wouldn't keep searching it on every other
        #   page that we read.
        #   Mostly, it is an idea, one which should be benchmarked.
        parents_not_on_page = set()

        for node_index, sub_keys in nodes_and_keys:
            if not sub_keys:
                continue
            # sub_keys is all of the keys we are looking for that should exist
            # on this page, if they aren't here, then they won't be found
            node = nodes[node_index]
            parents_to_check = set()
            for next_sub_key in sub_keys:
                if next_sub_key not in node:
                    # This one is just not present in the index at all
                    missing_keys.add(next_sub_key)
                else:
                    value, refs = node[next_sub_key]
                    parent_keys = refs[ref_list_num]
                    parent_map[next_sub_key] = parent_keys
                    parents_to_check.update(parent_keys)
            # Don't look for things we've already found
            parents_to_check = parents_to_check.difference(parent_map)
            # this can be used to test the benefit of having the check loop
            # parents_not_on_page.update(parents_to_check)
            while parents_to_check:
                next_parents_to_check = set()
                for key in parents_to_check:
                    if key in node:
                        value, refs = node[key]
                        parent_keys = refs[ref_list_num]
                        parent_map[key] = parent_keys
                        next_parents_to_check.update(parent_keys)
                    else:
                        # This parent either is genuinely missing, or should be
                        # found on another page. Perf test whether it is better
                        # to check if this node should fit on this page or not.
                        # in the 'everything-in-one-pack' scenario, this *not*
                        # doing the check is 237ms vs 243ms.
                        # So slightly better, but I assume the standard 'lots
                        # of packs' is going to show a reasonable improvement
                        # from the check, because it avoids 'going around
                        # again' for everything that is in another index
                        # parents_not_on_page.add(key)
                        # Missing for some reason
                        if key < node.min_key:
                            # in the case of bzr.dev, 3.4k/5.3k misses are
                            # 'earlier' misses (65%)
                            parents_not_on_page.add(key)
                        elif key > node.max_key:
                            # This parent key would be present on a different
                            # page.
                            parents_not_on_page.add(key)
                        else:
                            # assert key != node.min_key and key != node.max_key
                            # If it was going to be present, it would be on
                            # *this* page, so mark it missing.
                            missing_keys.add(key)
                parents_to_check = next_parents_to_check.difference(parent_map)
                # Might want to do another .difference() from missing_keys
        # parents_not_on_page could have been found on a different page, or be
        # known to be missing. So cull out everything that has already been
        # found.
        search_keys = parents_not_on_page.difference(
            parent_map).difference(missing_keys)
        return search_keys
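    # Re-query sketch (key hypothetical): callers drain the returned
    # search_keys until the whole ancestry is classified:
    #
    #   parent_map, missing_keys = {}, set()
    #   search = index._find_ancestors([('tip',)], 0, parent_map, missing_keys)
    #   while search:
    #       search = index._find_ancestors(search, 0, parent_map,
    #                                      missing_keys)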
    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.
    def _read_nodes(self, nodes):
        """Read some nodes from disk into the LRU cache.

        This performs a readv to get the node data into memory, and parses each
        node, then yields it to the caller. The nodes are requested in the
        supplied order. If possible doing sort() on the list before requesting
        a read may improve performance.

        :param nodes: The nodes to read. 0 - first node, 1 - second node etc.
        :return: None
        """
        # may be the byte string of the whole file
        bytes = None
        # list of (offset, length) regions of the file that should, eventually
        # be read in to data_ranges, either from 'bytes' or from the transport
        ranges = []
        base_offset = self._base_offset
        for index in nodes:
            offset = (index * _PAGE_SIZE)
            size = _PAGE_SIZE
            if index == 0:
                # Root node - special case
                if self._size:
                    size = min(_PAGE_SIZE, self._size)
                else:
                    # The only case where we don't know the size, is for very
                    # small indexes. So we read the whole thing
                    bytes = self._transport.get_bytes(self._name)
                    num_bytes = len(bytes)
                    self._size = num_bytes - base_offset
                    # the whole thing should be parsed out of 'bytes'
                    ranges = [(start, min(_PAGE_SIZE, num_bytes - start))
                        for start in xrange(base_offset, num_bytes, _PAGE_SIZE)]
                    break
            else:
                if offset > self._size:
                    raise AssertionError('tried to read past the end'
                                         ' of the file %s > %s'
                                         % (offset, self._size))
                size = min(size, self._size - offset)
            ranges.append((base_offset + offset, size))
        if not ranges:
            return
        elif bytes is not None:
            # already have the whole file
            data_ranges = [(start, bytes[start:start+size])
                           for start, size in ranges]
        elif self._file is None:
            data_ranges = self._transport.readv(self._name, ranges)
        else:
            data_ranges = []
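            # readv sketch (numbers illustrative): requesting pages [0, 3, 4]
            # of a 20000-byte index with base_offset == 0 builds
            #   ranges == [(0, 4096), (12288, 4096), (16384, 3616)]
            # i.e. whole 4kB pages, with the final partial page clamped to
            # the file size before being handed to transport.readv().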