654
654
list/set/dict or similar repeatedly iterable container.
656
656
if key_filter is not None:
657
# Adjust the filter - short elements go to a prefix filter. Would this
658
# be cleaner explicitly? That would be no harder for InternalNode..
657
# Adjust the filter - short elements go to a prefix filter. All
658
# other items are looked up directly.
659
659
# XXX: perhaps defaultdict? Profiling<rinse and repeat>
661
661
for key in key_filter:
662
length_filter = filters.setdefault(len(key), set())
663
length_filter.add(key)
664
filters = filters.items()
665
for item in self._items.iteritems():
666
for length, length_filter in filters:
667
if item[0][:length] in length_filter:
662
if len(key) == self._key_width:
663
# This filter is meant to match exactly one key, yield it
666
yield key, self._items[key]
668
# This key is not present in this map, continue
671
# Short items, we need to match based on a prefix
672
length_filter = filters.setdefault(len(key), set())
673
length_filter.add(key)
675
filters = filters.items()
676
for item in self._items.iteritems():
677
for length, length_filter in filters:
678
if item[0][:length] in length_filter:
671
682
for item in self._items.iteritems():
919
930
search_key_func=search_key_func)
921
932
def iteritems(self, store, key_filter=None):
922
for node in self._iter_nodes(store, key_filter=key_filter):
923
for item in node.iteritems(store, key_filter=key_filter):
933
for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
934
for item in node.iteritems(store, key_filter=node_filter):
926
937
def _iter_nodes(self, store, key_filter=None, batch_size=None):
935
946
:return: An iterable of nodes. This function does not have to be fully
936
947
consumed. (There will be no pending I/O when items are being returned.)
949
# Map from chk key ('sha1:...',) to (prefix, key_filter)
950
# prefix is the key in self._items to use, key_filter is the key_filter
951
# entries that would match this node
939
953
if key_filter is None:
940
954
for prefix, node in self._items.iteritems():
941
955
if type(node) == tuple:
956
keys[node] = (prefix, None)
946
960
# XXX defaultdict ?
947
962
length_filters = {}
948
963
for key in key_filter:
949
964
search_key = self._search_prefix_filter(key)
950
965
length_filter = length_filters.setdefault(
951
966
len(search_key), set())
952
967
length_filter.add(search_key)
968
prefix_to_keys.setdefault(search_key, []).append(key)
953
969
length_filters = length_filters.items()
954
970
for prefix, node in self._items.iteritems():
955
972
for length, length_filter in length_filters:
956
if prefix[:length] in length_filter:
957
if type(node) == tuple:
973
sub_prefix = prefix[:length]
974
if sub_prefix in length_filter:
975
node_key_filter.extend(prefix_to_keys[sub_prefix])
976
if node_key_filter: # this key matched something, yield it
977
if type(node) == tuple:
978
keys[node] = (prefix, node_key_filter)
980
yield node, node_key_filter
963
982
# Look in the page cache for some more bytes
964
983
found_keys = set()
971
990
node = _deserialise(bytes, key,
972
991
search_key_func=self._search_key_func)
973
self._items[keys[key]] = node
992
prefix, node_key_filter = keys[key]
993
self._items[prefix] = node
974
994
found_keys.add(key)
995
yield node, node_key_filter
976
996
for key in found_keys:
986
1006
# We have to fully consume the stream so there is no pending
987
1007
# I/O, so we buffer the nodes for now.
988
1008
stream = store.get_record_stream(batch, 'unordered', True)
1009
node_and_filters = []
990
1010
for record in stream:
991
1011
bytes = record.get_bytes_as('fulltext')
992
1012
node = _deserialise(bytes, record.key,
993
1013
search_key_func=self._search_key_func)
995
self._items[keys[record.key]] = node
1014
prefix, node_key_filter = keys[record.key]
1015
node_and_filters.append((node, node_key_filter))
1016
self._items[prefix] = node
996
1017
_page_cache.add(record.key, bytes)
1018
for info in node_and_filters:
1000
1021
def map(self, store, key, value):
1001
1022
"""Map key to value."""
1018
1039
new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
1020
1041
return new_parent.map(store, key, value)
1021
children = list(self._iter_nodes(store, key_filter=[key]))
1042
children = [node for node, _
1043
in self._iter_nodes(store, key_filter=[key])]
1023
1045
child = children[0]
1171
1193
"""Remove key from this node and its children."""
1172
1194
if not len(self._items):
1173
1195
raise AssertionError("can't unmap in an empty InternalNode.")
1174
children = list(self._iter_nodes(store, key_filter=[key]))
1196
children = [node for node, _
1197
in self._iter_nodes(store, key_filter=[key])]
1176
1199
child = children[0]
1235
1258
# b) With 16-way fan out, we can still do a single round trip
1236
1259
# c) With 255-way fan out, we don't want to read all 255 and destroy
1237
1260
# the page cache, just to determine that we really don't need it.
1238
for node in self._iter_nodes(store, batch_size=16):
1261
for node, _ in self._iter_nodes(store, batch_size=16):
1239
1262
if isinstance(node, InternalNode):
1240
1263
# Without looking at any leaf nodes, we are sure