# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Persistent maps from tuple_of_strings->string using CHK stores.

Overview and current status:

The CHKMap class implements a dict from tuple_of_strings->string by using a trie
with internal nodes of 8-bit fan out; the key tuples are mapped to strings by
joining them by \x00, and \x00 padding shorter keys out to the length of the
longest key. Leaf nodes are packed as densely as possible, and internal nodes
are all an additional 8-bits wide leading to a sparse upper tree.

Updates to a CHKMap are done preferentially via the apply_delta method, to
allow optimisation of the update operation; but individual map/unmap calls are
possible and supported. Individual changes via map/unmap are buffered in memory
until the _save method is called to force serialisation of the tree.
apply_delta records its changes immediately by performing an implicit _save.

TODO:
-----

Densely packed upper nodes.

"""

import heapq

from bzrlib import lazy_import
lazy_import.lazy_import(globals(), """
from bzrlib import (
    errors,
    lru_cache,
    osutils,
    registry,
    trace,
    )
""")

# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
# We are caching bytes so len(value) is perfectly accurate
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)

# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
# If we delete more than this many nodes applying a delta, we check for a remap
_INTERESTING_DELETES_LIMIT = 5


def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)


search_key_registry = registry.Registry()
search_key_registry.register('plain', _search_key_plain)
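

# Illustrative usage sketch (comments only, not executed here). `store` is
# assumed to be a VersionedFiles-like object supporting CHK key generation.
# The 'plain' search key function simply joins the key tuple with NUL bytes:
#
#   _search_key_plain(('parent-id', 'name'))  =>  'parent-id\x00name'
#
# A typical round trip through a CHKMap:
#
#   root_key = CHKMap.from_dict(store, {('foo',): 'bar'}, maximum_size=255)
#   chkmap = CHKMap(store, root_key)
#   dict(chkmap.iteritems())                        # {('foo',): 'bar'}
#   chkmap.apply_delta([(None, ('baz',), 'quux')])  # returns the new root key
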
class CHKMap(object):
    """A persistent map from string to string backed by a CHK store."""
86
def __init__(self, store, root_key, search_key_func=None):
87
"""Create a CHKMap object.
89
:param store: The store the CHKMap is stored in.
90
:param root_key: The root key of the map. None to create an empty
92
:param search_key_func: A function mapping a key => bytes. These bytes
93
are then used by the internal nodes to split up leaf nodes into
97
if search_key_func is None:
98
search_key_func = _search_key_plain
99
self._search_key_func = search_key_func
101
self._root_node = LeafNode(search_key_func=search_key_func)
103
self._root_node = self._node_key(root_key)
105
def apply_delta(self, delta):
106
"""Apply a delta to the map.
108
:param delta: An iterable of old_key, new_key, new_value tuples.
109
If new_key is not None, then new_key->new_value is inserted
110
into the map; if old_key is not None, then the old mapping
111
of old_key is removed.
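
For example (an illustrative delta; keys are tuples)::

    chkmap.apply_delta([
        (None, ('new-id',), 'new value'),   # insert new-id -> 'new value'
        (('old-id',), None, None),          # remove the old-id entry
        ])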
114
# Check preconditions first.
115
new_items = set([key for (old, key, value) in delta if key is not None
117
existing_new = list(self.iteritems(key_filter=new_items))
119
raise errors.InconsistentDeltaDelta(delta,
120
"New items are already in the map %r." % existing_new)
122
for old, new, value in delta:
123
if old is not None and old != new:
124
self.unmap(old, check_remap=False)
126
for old, new, value in delta:
129
if delete_count > _INTERESTING_DELETES_LIMIT:
130
trace.mutter("checking remap as %d deletions", delete_count)
134
def _ensure_root(self):
"""Ensure that the root node is an object, not a key."""
136
if type(self._root_node) is tuple:
137
# Demand-load the root
138
self._root_node = self._get_node(self._root_node)
140
def _get_node(self, node):
143
Note that this does not update the _items dict in objects containing a
144
reference to this node. As such it does not prevent subsequent IO being
147
:param node: A tuple key or node object.
148
:return: A node object.
150
if type(node) is tuple:
151
bytes = self._read_bytes(node)
152
return _deserialise(bytes, node,
153
search_key_func=self._search_key_func)
157
def _read_bytes(self, key):
159
return _page_cache[key]
161
stream = self._store.get_record_stream([key], 'unordered', True)
162
bytes = stream.next().get_bytes_as('fulltext')
163
_page_cache[key] = bytes
166
def _dump_tree(self, include_keys=False):
167
"""Return the tree in a string representation."""
169
res = self._dump_tree_node(self._root_node, prefix='', indent='',
170
include_keys=include_keys)
171
res.append('') # Give a trailing '\n'
172
return '\n'.join(res)
174
def _dump_tree_node(self, node, prefix, indent, include_keys=True):
175
"""For this node and all children, generate a string representation."""
180
node_key = node.key()
181
if node_key is not None:
182
key_str = ' %s' % (node_key[0],)
185
result.append('%s%r %s%s' % (indent, prefix, node.__class__.__name__,
187
if type(node) is InternalNode:
188
# Trigger all child nodes to get loaded
189
list(node._iter_nodes(self._store))
190
for prefix, sub in sorted(node._items.iteritems()):
191
result.extend(self._dump_tree_node(sub, prefix, indent + ' ',
192
include_keys=include_keys))
194
for key, value in sorted(node._items.iteritems()):
195
# Don't use prefix nor indent here to line up when used in
196
# tests in conjunction with assertEqualDiff
197
result.append(' %r %r' % (key, value))
201
def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
202
search_key_func=None):
203
"""Create a CHKMap in store with initial_value as the content.
205
:param store: The store to record initial_value in, a VersionedFiles
206
object with 1-tuple keys supporting CHK key generation.
207
:param initial_value: A dict to store in store. Its keys and values
209
:param maximum_size: The maximum_size rule to apply to nodes. This
210
determines the size at which no new data is added to a single node.
211
:param key_width: The number of elements in each key_tuple being stored
213
:param search_key_func: A function mapping a key => bytes. These bytes
214
are then used by the internal nodes to split up leaf nodes into
216
:return: The root chk of the resulting CHKMap.
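
For example (illustrative), storing 2-element keys::

    root_key = CHKMap.from_dict(
        store, {('parent-id', 'basename'): 'file-id'},
        maximum_size=255, key_width=2)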
218
root_key = klass._create_directly(store, initial_value,
219
maximum_size=maximum_size, key_width=key_width,
220
search_key_func=search_key_func)
224
def _create_via_map(klass, store, initial_value, maximum_size=0,
225
key_width=1, search_key_func=None):
226
result = klass(store, None, search_key_func=search_key_func)
227
result._root_node.set_maximum_size(maximum_size)
228
result._root_node._key_width = key_width
230
for key, value in initial_value.items():
231
delta.append((None, key, value))
232
root_key = result.apply_delta(delta)
236
def _create_directly(klass, store, initial_value, maximum_size=0,
237
key_width=1, search_key_func=None):
238
node = LeafNode(search_key_func=search_key_func)
239
node.set_maximum_size(maximum_size)
240
node._key_width = key_width
241
node._items = dict(initial_value)
242
node._raw_size = sum([node._key_value_len(key, value)
243
for key,value in initial_value.iteritems()])
244
node._len = len(node._items)
245
node._compute_search_prefix()
246
node._compute_serialised_prefix()
249
and node._current_size() > maximum_size):
250
prefix, node_details = node._split(store)
251
if len(node_details) == 1:
252
raise AssertionError('Failed to split using node._split')
253
node = InternalNode(prefix, search_key_func=search_key_func)
254
node.set_maximum_size(maximum_size)
255
node._key_width = key_width
256
for split, subnode in node_details:
257
node.add_node(split, subnode)
258
keys = list(node.serialise(store))
261
def iter_changes(self, basis):
262
"""Iterate over the changes between basis and self.
264
:return: An iterator of tuples: (key, old_value, new_value). Old_value
265
is None for keys only in self; new_value is None for keys only in
# Read both trees in lexicographic, highest-first order.
# Any identical nodes we skip
271
# Any unique prefixes we output immediately.
272
# values in a leaf node are treated as single-value nodes in the tree
273
# which allows them to be not-special-cased. We know to output them
274
# because their value is a string, not a key(tuple) or node.
276
# corner cases to beware of when considering this function:
277
# *) common references are at different heights.
278
# consider two trees:
279
# {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
280
# {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
281
# 'ab':LeafNode={'ab':'bar'}}
282
# 'b': LeafNode={'b'}}
283
# the node with aaa/aab will only be encountered in the second tree
284
# after reading the 'a' subtree, but it is encountered in the first
285
# tree immediately. Variations on this may have read internal nodes
286
# like this. we want to cut the entire pending subtree when we
287
# realise we have a common node. For this we use a list of keys -
288
# the path to a node - and check the entire path is clean as we
290
if self._node_key(self._root_node) == self._node_key(basis._root_node):
294
excluded_keys = set()
295
self_node = self._root_node
296
basis_node = basis._root_node
297
# A heap, each element is prefix, node(tuple/NodeObject/string),
298
# key_path (a list of tuples, tail-sharing down the tree.)
301
def process_node(node, path, a_map, pending):
302
# take a node and expand it
303
node = a_map._get_node(node)
304
if type(node) == LeafNode:
305
path = (node._key, path)
306
for key, value in node._items.items():
307
# For a LeafNode, the key is a serialized_key, rather than
308
# a search_key, but the heap is using search_keys
309
search_key = node._search_key_func(key)
310
heapq.heappush(pending, (search_key, key, value, path))
312
# type(node) == InternalNode
313
path = (node._key, path)
314
for prefix, child in node._items.items():
315
heapq.heappush(pending, (prefix, None, child, path))
316
def process_common_internal_nodes(self_node, basis_node):
317
self_items = set(self_node._items.items())
318
basis_items = set(basis_node._items.items())
319
path = (self_node._key, None)
320
for prefix, child in self_items - basis_items:
321
heapq.heappush(self_pending, (prefix, None, child, path))
322
path = (basis_node._key, None)
323
for prefix, child in basis_items - self_items:
324
heapq.heappush(basis_pending, (prefix, None, child, path))
325
def process_common_leaf_nodes(self_node, basis_node):
326
self_items = set(self_node._items.items())
327
basis_items = set(basis_node._items.items())
328
path = (self_node._key, None)
329
for key, value in self_items - basis_items:
330
prefix = self._search_key_func(key)
331
heapq.heappush(self_pending, (prefix, key, value, path))
332
path = (basis_node._key, None)
333
for key, value in basis_items - self_items:
334
prefix = basis._search_key_func(key)
335
heapq.heappush(basis_pending, (prefix, key, value, path))
336
def process_common_prefix_nodes(self_node, self_path,
337
basis_node, basis_path):
338
# Would it be more efficient if we could request both at the same
340
self_node = self._get_node(self_node)
341
basis_node = basis._get_node(basis_node)
342
if (type(self_node) == InternalNode
343
and type(basis_node) == InternalNode):
344
# Matching internal nodes
345
process_common_internal_nodes(self_node, basis_node)
346
elif (type(self_node) == LeafNode
347
and type(basis_node) == LeafNode):
348
process_common_leaf_nodes(self_node, basis_node)
350
process_node(self_node, self_path, self, self_pending)
351
process_node(basis_node, basis_path, basis, basis_pending)
352
process_common_prefix_nodes(self_node, None, basis_node, None)
355
excluded_keys = set()
356
def check_excluded(key_path):
357
# Note that this is N^2, it depends on us trimming trees
358
# aggressively to not become slow.
359
# A better implementation would probably have a reverse map
360
# back to the children of a node, and jump straight to it when
# a common node is detected, then proceed to remove the already
362
# pending children. bzrlib.graph has a searcher module with a
364
while key_path is not None:
365
key, key_path = key_path
366
if key in excluded_keys:
371
while self_pending or basis_pending:
374
# self is exhausted: output remainder of basis
375
for prefix, key, node, path in basis_pending:
376
if check_excluded(path):
378
node = basis._get_node(node)
381
yield (key, node, None)
383
# subtree - fastpath the entire thing.
384
for key, value in node.iteritems(basis._store):
385
yield (key, value, None)
387
elif not basis_pending:
388
# basis is exhausted: output remainder of self.
389
for prefix, key, node, path in self_pending:
390
if check_excluded(path):
392
node = self._get_node(node)
395
yield (key, None, node)
397
# subtree - fastpath the entire thing.
398
for key, value in node.iteritems(self._store):
399
yield (key, None, value)
402
# XXX: future optimisation - yield the smaller items
403
# immediately rather than pushing everything on/off the
404
# heaps. Applies to both internal nodes and leafnodes.
405
if self_pending[0][0] < basis_pending[0][0]:
407
prefix, key, node, path = heapq.heappop(self_pending)
408
if check_excluded(path):
412
yield (key, None, node)
414
process_node(node, path, self, self_pending)
416
elif self_pending[0][0] > basis_pending[0][0]:
418
prefix, key, node, path = heapq.heappop(basis_pending)
419
if check_excluded(path):
423
yield (key, node, None)
425
process_node(node, path, basis, basis_pending)
428
# common prefix: possibly expand both
429
if self_pending[0][1] is None:
434
if basis_pending[0][1] is None:
439
if not read_self and not read_basis:
440
# compare a common value
441
self_details = heapq.heappop(self_pending)
442
basis_details = heapq.heappop(basis_pending)
443
if self_details[2] != basis_details[2]:
444
yield (self_details[1],
445
basis_details[2], self_details[2])
447
# At least one side wasn't a simple value
448
if (self._node_key(self_pending[0][2]) ==
449
self._node_key(basis_pending[0][2])):
450
# Identical pointers, skip (and don't bother adding to
# excluded, it won't turn up again.)
452
heapq.heappop(self_pending)
453
heapq.heappop(basis_pending)
455
# Now we need to expand this node before we can continue
456
if read_self and read_basis:
457
# Both sides start with the same prefix, so process
459
self_prefix, _, self_node, self_path = heapq.heappop(
461
basis_prefix, _, basis_node, basis_path = heapq.heappop(
463
if self_prefix != basis_prefix:
464
raise AssertionError(
465
'%r != %r' % (self_prefix, basis_prefix))
466
process_common_prefix_nodes(
467
self_node, self_path,
468
basis_node, basis_path)
471
prefix, key, node, path = heapq.heappop(self_pending)
472
if check_excluded(path):
474
process_node(node, path, self, self_pending)
476
prefix, key, node, path = heapq.heappop(basis_pending)
477
if check_excluded(path):
479
process_node(node, path, basis, basis_pending)
482
def iteritems(self, key_filter=None):
483
"""Iterate over the entire CHKMap's contents."""
485
return self._root_node.iteritems(self._store, key_filter=key_filter)
488
"""Return the key for this map."""
489
if type(self._root_node) is tuple:
490
return self._root_node
492
return self._root_node._key
496
return len(self._root_node)
498
def map(self, key, value):
499
"""Map a key tuple to value.
501
:param key: A key to map.
502
:param value: The value to assign to key.
504
# Need a root object.
506
prefix, node_details = self._root_node.map(self._store, key, value)
507
if len(node_details) == 1:
508
self._root_node = node_details[0][1]
510
self._root_node = InternalNode(prefix,
511
search_key_func=self._search_key_func)
512
self._root_node.set_maximum_size(node_details[0][1].maximum_size)
513
self._root_node._key_width = node_details[0][1]._key_width
514
for split, node in node_details:
515
self._root_node.add_node(split, node)
517
def _node_key(self, node):
518
"""Get the key for a node whether it's a tuple or node."""
519
if type(node) is tuple:
524
def unmap(self, key, check_remap=True):
"""Remove key from the map."""
527
if type(self._root_node) is InternalNode:
528
unmapped = self._root_node.unmap(self._store, key,
529
check_remap=check_remap)
531
unmapped = self._root_node.unmap(self._store, key)
532
self._root_node = unmapped
534
def _check_remap(self):
535
"""Check if nodes can be collapsed."""
537
if type(self._root_node) is InternalNode:
538
self._root_node._check_remap(self._store)
541
"""Save the map completely.
543
:return: The key of the root node.
545
if type(self._root_node) is tuple:
547
return self._root_node
548
keys = list(self._root_node.serialise(self._store))
class Node(object):
    """Base class defining the protocol for CHK Map nodes.
555
:ivar _raw_size: The total size of the serialized key:value data, before
556
adding the header bytes, and without prefix compression.
559
def __init__(self, key_width=1):
562
:param key_width: The width of keys for this node.
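    (For example, with key_width=2 every stored key is a 2-tuple, such as
    ('parent-id', 'basename').)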
565
# Current number of elements
567
self._maximum_size = 0
568
self._key_width = key_width
569
# current size in bytes
571
# The pointers/values this node has - meaning defined by child classes.
573
# The common search prefix
574
self._search_prefix = None
577
items_str = str(sorted(self._items))
578
if len(items_str) > 20:
579
items_str = items_str[:16] + '...]'
580
return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
581
self.__class__.__name__, self._key, self._len, self._raw_size,
582
self._maximum_size, self._search_prefix, items_str)
591
def maximum_size(self):
592
"""What is the upper limit for adding references to a node."""
593
return self._maximum_size
595
def set_maximum_size(self, new_size):
596
"""Set the size threshold for nodes.
598
:param new_size: The size at which no data is added to a node. 0 for
601
self._maximum_size = new_size
604
def common_prefix(cls, prefix, key):
605
"""Given 2 strings, return the longest prefix common to both.
607
:param prefix: This has been the common prefix for other keys, so it is
608
more likely to be the common prefix in this case as well.
609
:param key: Another string to compare to
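
For example (illustrative)::

    Node.common_prefix('abcde', 'abcfg')  # => 'abc'
    Node.common_prefix('abc', 'abcdef')   # => 'abc'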
611
if key.startswith(prefix):
614
# Is there a better way to do this?
615
for pos, (left, right) in enumerate(zip(prefix, key)):
619
common = prefix[:pos+1]
623
def common_prefix_for_keys(cls, keys):
624
"""Given a list of keys, find their common prefix.
626
:param keys: An iterable of strings.
627
:return: The longest common prefix of all keys.
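
For example (illustrative)::

    Node.common_prefix_for_keys(['aaa', 'aab', 'abc'])  # => 'a'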
631
if common_prefix is None:
634
common_prefix = cls.common_prefix(common_prefix, key)
635
if not common_prefix:
636
# if common_prefix is the empty string, then we know it won't
642
# Singleton indicating we have not computed _search_prefix yet
645
class LeafNode(Node):
646
"""A node containing actual key:value pairs.
648
:ivar _items: A dict of key->value items. The key is in tuple form.
649
:ivar _size: The number of bytes that would be used by serializing all of
653
def __init__(self, search_key_func=None):
655
# All of the keys in this leaf node share this common prefix
656
self._common_serialised_prefix = None
657
self._serialise_key = '\x00'.join
658
if search_key_func is None:
659
self._search_key_func = _search_key_plain
661
self._search_key_func = search_key_func
664
items_str = str(sorted(self._items))
665
if len(items_str) > 20:
666
items_str = items_str[:16] + '...]'
668
'%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
669
% (self.__class__.__name__, self._key, self._len, self._raw_size,
670
self._maximum_size, self._search_prefix, self._key_width, items_str)
672
def _current_size(self):
673
"""Answer the current serialised size of this node.
675
This differs from self._raw_size in that it includes the bytes used for
678
if self._common_serialised_prefix is None:
682
# We will store a single string with the common prefix
683
# And then that common prefix will not be stored in any of the
685
prefix_len = len(self._common_serialised_prefix)
686
bytes_for_items = (self._raw_size - (prefix_len * self._len))
687
return (9 # 'chkleaf:\n'
688
+ len(str(self._maximum_size)) + 1
689
+ len(str(self._key_width)) + 1
690
+ len(str(self._len)) + 1
695
def deserialise(klass, bytes, key, search_key_func=None):
696
"""Deserialise bytes, with key key, into a LeafNode.
698
:param bytes: The bytes of the node.
699
:param key: The key that the serialised node has.
701
return _deserialise_leaf_node(bytes, key,
702
search_key_func=search_key_func)
704
def iteritems(self, store, key_filter=None):
705
"""Iterate over items in the node.
707
:param key_filter: A filter to apply to the node. It should be a
708
list/set/dict or similar repeatedly iterable container.
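
For example (illustrative, with key_width=2): a filter entry the full width
of a key matches exactly, while a shorter entry acts as a prefix filter::

    node.iteritems(store, key_filter=[('parent-id', 'basename')])  # exact
    node.iteritems(store, key_filter=[('parent-id',)])             # prefix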
710
if key_filter is not None:
711
# Adjust the filter - short elements go to a prefix filter. All
712
# other items are looked up directly.
713
# XXX: perhaps defaultdict? Profiling<rinse and repeat>
715
for key in key_filter:
716
if len(key) == self._key_width:
717
# This filter is meant to match exactly one key, yield it
720
yield key, self._items[key]
722
# This key is not present in this map, continue
725
# Short items, we need to match based on a prefix
726
length_filter = filters.setdefault(len(key), set())
727
length_filter.add(key)
729
filters = filters.items()
730
for item in self._items.iteritems():
731
for length, length_filter in filters:
732
if item[0][:length] in length_filter:
736
for item in self._items.iteritems():
739
def _key_value_len(self, key, value):
740
# TODO: Should probably be done without actually joining the key, but
741
# then that can be done via the C extension
742
return (len(self._serialise_key(key)) + 1
743
+ len(str(value.count('\n'))) + 1
746
def _search_key(self, key):
747
return self._search_key_func(key)
749
def _map_no_split(self, key, value):
750
"""Map a key to a value.
752
This assumes either the key does not already exist, or you have already
753
removed its size and length from self.
755
:return: True if adding this node should cause us to split.
757
self._items[key] = value
758
self._raw_size += self._key_value_len(key, value)
760
serialised_key = self._serialise_key(key)
761
if self._common_serialised_prefix is None:
762
self._common_serialised_prefix = serialised_key
764
self._common_serialised_prefix = self.common_prefix(
765
self._common_serialised_prefix, serialised_key)
766
search_key = self._search_key(key)
767
if self._search_prefix is _unknown:
768
self._compute_search_prefix()
769
if self._search_prefix is None:
770
self._search_prefix = search_key
772
self._search_prefix = self.common_prefix(
773
self._search_prefix, search_key)
775
and self._maximum_size
776
and self._current_size() > self._maximum_size):
777
# Check to see if all of the search_keys for this node are
778
# identical. We allow the node to grow under that circumstance
779
# (we could track this as common state, but it is infrequent)
780
if (search_key != self._search_prefix
781
or not self._are_search_keys_identical()):
785
def _split(self, store):
786
"""We have overflowed.
788
Split this node into multiple LeafNodes, return it up the stack so that
789
the next layer creates a new InternalNode and references the new nodes.
791
:return: (common_serialised_prefix, [(node_serialised_prefix, node)])
793
if self._search_prefix is _unknown:
794
raise AssertionError('Search prefix must be known')
795
common_prefix = self._search_prefix
796
split_at = len(common_prefix) + 1
798
for key, value in self._items.iteritems():
799
search_key = self._search_key(key)
800
prefix = search_key[:split_at]
801
# TODO: Generally only 1 key can be exactly the right length,
802
# which means we can only have 1 key in the node pointed
803
# at by the 'prefix\0' key. We might want to consider
804
# folding it into the containing InternalNode rather than
805
# having a fixed length-1 node.
806
# Note this is probably not true for hash keys, as they
# may get a '\x00' node anywhere, but won't have keys of
809
if len(prefix) < split_at:
810
prefix += '\x00'*(split_at - len(prefix))
811
if prefix not in result:
812
node = LeafNode(search_key_func=self._search_key_func)
813
node.set_maximum_size(self._maximum_size)
814
node._key_width = self._key_width
815
result[prefix] = node
817
node = result[prefix]
818
sub_prefix, node_details = node.map(store, key, value)
819
if len(node_details) > 1:
820
if prefix != sub_prefix:
821
# This node has been split and is now found via a different
824
new_node = InternalNode(sub_prefix,
825
search_key_func=self._search_key_func)
826
new_node.set_maximum_size(self._maximum_size)
827
new_node._key_width = self._key_width
828
for split, node in node_details:
829
new_node.add_node(split, node)
830
result[prefix] = new_node
831
return common_prefix, result.items()
833
def map(self, store, key, value):
834
"""Map key to value."""
835
if key in self._items:
836
self._raw_size -= self._key_value_len(key, self._items[key])
839
if self._map_no_split(key, value):
840
return self._split(store)
842
if self._search_prefix is _unknown:
843
raise AssertionError('%r must be known' % self._search_prefix)
844
return self._search_prefix, [("", self)]
846
def serialise(self, store):
847
"""Serialise the LeafNode to store.
849
:param store: A VersionedFiles honouring the CHK extensions.
850
:return: An iterable of the keys inserted by this operation.
852
lines = ["chkleaf:\n"]
853
lines.append("%d\n" % self._maximum_size)
854
lines.append("%d\n" % self._key_width)
855
lines.append("%d\n" % self._len)
856
if self._common_serialised_prefix is None:
858
if len(self._items) != 0:
859
raise AssertionError('If _common_serialised_prefix is None'
860
' we should have no items')
862
lines.append('%s\n' % (self._common_serialised_prefix,))
863
prefix_len = len(self._common_serialised_prefix)
864
for key, value in sorted(self._items.items()):
865
# Always add a final newline
866
value_lines = osutils.chunks_to_lines([value + '\n'])
867
serialized = "%s\x00%s\n" % (self._serialise_key(key),
869
if not serialized.startswith(self._common_serialised_prefix):
870
raise AssertionError('We thought the common prefix was %r'
871
' but entry %r does not have it in common'
872
% (self._common_serialised_prefix, serialized))
873
lines.append(serialized[prefix_len:])
874
lines.extend(value_lines)
875
sha1, _, _ = store.add_lines((None,), (), lines)
876
self._key = ("sha1:" + sha1,)
877
bytes = ''.join(lines)
878
if len(bytes) != self._current_size():
879
raise AssertionError('Invalid _current_size')
880
_page_cache.add(self._key, bytes)
884
"""Return the references to other CHK's held by this node."""
887
def _compute_search_prefix(self):
888
"""Determine the common search prefix for all keys in this node.
890
:return: A bytestring of the longest search key prefix that is
891
unique within this node.
893
search_keys = [self._search_key_func(key) for key in self._items]
894
self._search_prefix = self.common_prefix_for_keys(search_keys)
895
return self._search_prefix
897
def _are_search_keys_identical(self):
898
"""Check to see if the search keys for all entries are the same.
900
When using a hash as the search_key it is possible for non-identical
keys to collide. If that happens enough, we may try to overflow a
902
LeafNode, but as all are collisions, we must not split.
904
common_search_key = None
905
for key in self._items:
906
search_key = self._search_key(key)
907
if common_search_key is None:
908
common_search_key = search_key
909
elif search_key != common_search_key:
913
def _compute_serialised_prefix(self):
914
"""Determine the common prefix for serialised keys in this node.
916
:return: A bytestring of the longest serialised key prefix that is
917
unique within this node.
919
serialised_keys = [self._serialise_key(key) for key in self._items]
920
self._common_serialised_prefix = self.common_prefix_for_keys(
922
return self._common_serialised_prefix
924
def unmap(self, store, key):
925
"""Unmap key from the node."""
927
self._raw_size -= self._key_value_len(key, self._items[key])
929
trace.mutter("key %s not found in %r", key, self._items)
934
# Recompute from scratch
935
self._compute_search_prefix()
936
self._compute_serialised_prefix()
940
class InternalNode(Node):
941
"""A node that contains references to other nodes.
943
An InternalNode is responsible for mapping search key prefixes to child
946
:ivar _items: serialised_key => node dictionary. node may be a tuple,
947
LeafNode or InternalNode.
950
def __init__(self, prefix='', search_key_func=None):
952
# The size of an internalnode with default values and no children.
# How many octets wide the key prefixes within this node are.
955
self._search_prefix = prefix
956
if search_key_func is None:
957
self._search_key_func = _search_key_plain
959
self._search_key_func = search_key_func
961
def add_node(self, prefix, node):
962
"""Add a child node with prefix prefix, and node node.
964
:param prefix: The search key prefix for node.
965
:param node: The node being added.
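
For example (illustrative), with self._search_prefix == 'ab'::

    node.add_node('abc', child)   # ok: one byte longer, starts with 'ab'
    node.add_node('xyz', child)   # AssertionError: prefixes mismatch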
967
if self._search_prefix is None:
968
raise AssertionError("_search_prefix should not be None")
969
if not prefix.startswith(self._search_prefix):
970
raise AssertionError("prefixes mismatch: %s must start with %s"
971
% (prefix,self._search_prefix))
972
if len(prefix) != len(self._search_prefix) + 1:
973
raise AssertionError("prefix wrong length: len(%s) is not %d" %
974
(prefix, len(self._search_prefix) + 1))
975
self._len += len(node)
976
if not len(self._items):
977
self._node_width = len(prefix)
978
if self._node_width != len(self._search_prefix) + 1:
979
raise AssertionError("node width mismatch: %d is not %d" %
980
(self._node_width, len(self._search_prefix) + 1))
981
self._items[prefix] = node
984
def _current_size(self):
985
"""Answer the current serialised size of this node."""
986
return (self._raw_size + len(str(self._len)) + len(str(self._key_width)) +
987
len(str(self._maximum_size)))
990
def deserialise(klass, bytes, key, search_key_func=None):
991
"""Deserialise bytes to an InternalNode, with key key.
993
:param bytes: The bytes of the node.
994
:param key: The key that the serialised node has.
995
:return: An InternalNode instance.
997
return _deserialise_internal_node(bytes, key,
998
search_key_func=search_key_func)
1000
def iteritems(self, store, key_filter=None):
1001
for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
1002
for item in node.iteritems(store, key_filter=node_filter):
1005
def _iter_nodes(self, store, key_filter=None, batch_size=None):
1006
"""Iterate over node objects which match key_filter.
1008
:param store: A store to use for accessing content.
1009
:param key_filter: A key filter to filter nodes. Only nodes that might
1010
contain a key in key_filter will be returned.
1011
:param batch_size: If not None, then we will return the nodes that had
1012
to be read using get_record_stream in batches, rather than reading
1014
:return: An iterable of nodes. This function does not have to be fully
1015
consumed. (There will be no pending I/O when items are being returned.)
1017
# Map from chk key ('sha1:...',) to (prefix, key_filter)
1018
# prefix is the key in self._items to use, key_filter is the key_filter
1019
# entries that would match this node
1022
if key_filter is None:
1023
# yielding all nodes, yield whatever we have, and queue up a read
1024
# for whatever we are missing
1026
for prefix, node in self._items.iteritems():
1027
if node.__class__ is tuple:
1028
keys[node] = (prefix, None)
1031
elif len(key_filter) == 1:
1032
# Technically, this path could also be handled by the first check
# in 'self._node_width in length_filters'. However, we can handle
1034
# this case without spending any time building up the
1035
# prefix_to_keys, etc state.
1037
# This is a bit ugly, but TIMEIT showed it to be by far the fastest
1038
# 0.626us list(key_filter)[0]
1039
# is a func() for list(), 2 mallocs, and a getitem
1040
# 0.489us [k for k in key_filter][0]
1041
# still has the mallocs, avoids the func() call
1042
# 0.350us iter(key_filter).next()
1043
# has a func() call, and mallocs an iterator
1044
# 0.125us for key in key_filter: pass
1045
# no func() overhead, might malloc an iterator
1046
# 0.105us for key in key_filter: break
1047
# no func() overhead, might malloc an iterator, probably
1048
# avoids checking an 'else' clause as part of the for
1049
for key in key_filter:
1051
search_prefix = self._search_prefix_filter(key)
1052
if len(search_prefix) == self._node_width:
1053
# This item will match exactly, so just do a dict lookup, and
1054
# see what we can return
1057
node = self._items[search_prefix]
1059
# A given key can only match 1 child node, if it isn't
1060
# there, then we can just return nothing
1062
if node.__class__ is tuple:
1063
keys[node] = (search_prefix, [key])
1065
# This is loaded, and the only thing that can match,
1070
# First, convert all keys into a list of search prefixes
1071
# Aggregate common prefixes, and track the keys they come from
1074
for key in key_filter:
1075
search_prefix = self._search_prefix_filter(key)
1076
length_filter = length_filters.setdefault(
1077
len(search_prefix), set())
1078
length_filter.add(search_prefix)
1079
prefix_to_keys.setdefault(search_prefix, []).append(key)
1081
if (self._node_width in length_filters
1082
and len(length_filters) == 1):
1083
# all of the search prefixes match exactly _node_width. This
1084
# means that everything is an exact match, and we can do a
1085
# lookup into self._items, rather than iterating over the items
1087
search_prefixes = length_filters[self._node_width]
1088
for search_prefix in search_prefixes:
1090
node = self._items[search_prefix]
1092
# We can ignore this one
1094
node_key_filter = prefix_to_keys[search_prefix]
1095
if node.__class__ is tuple:
1096
keys[node] = (search_prefix, node_key_filter)
1098
yield node, node_key_filter
1100
# The slow way. We walk every item in self._items, and check to
1101
# see if there are any matches
1102
length_filters = length_filters.items()
1103
for prefix, node in self._items.iteritems():
1104
node_key_filter = []
1105
for length, length_filter in length_filters:
1106
sub_prefix = prefix[:length]
1107
if sub_prefix in length_filter:
1108
node_key_filter.extend(prefix_to_keys[sub_prefix])
1109
if node_key_filter: # this key matched something, yield it
1110
if node.__class__ is tuple:
1111
keys[node] = (prefix, node_key_filter)
1113
yield node, node_key_filter
1115
# Look in the page cache for some more bytes
1119
bytes = _page_cache[key]
1123
node = _deserialise(bytes, key,
1124
search_key_func=self._search_key_func)
1125
prefix, node_key_filter = keys[key]
1126
self._items[prefix] = node
1128
yield node, node_key_filter
1129
for key in found_keys:
1132
# demand load some pages.
1133
if batch_size is None:
1134
# Read all the keys in
1135
batch_size = len(keys)
1136
key_order = list(keys)
1137
for batch_start in range(0, len(key_order), batch_size):
1138
batch = key_order[batch_start:batch_start + batch_size]
1139
# We have to fully consume the stream so there is no pending
1140
# I/O, so we buffer the nodes for now.
1141
stream = store.get_record_stream(batch, 'unordered', True)
1142
node_and_filters = []
1143
for record in stream:
1144
bytes = record.get_bytes_as('fulltext')
1145
node = _deserialise(bytes, record.key,
1146
search_key_func=self._search_key_func)
1147
prefix, node_key_filter = keys[record.key]
1148
node_and_filters.append((node, node_key_filter))
1149
self._items[prefix] = node
1150
_page_cache.add(record.key, bytes)
1151
for info in node_and_filters:
1154
def map(self, store, key, value):
1155
"""Map key to value."""
1156
if not len(self._items):
1157
raise AssertionError("can't map in an empty InternalNode.")
1158
search_key = self._search_key(key)
1159
if self._node_width != len(self._search_prefix) + 1:
1160
raise AssertionError("node width mismatch: %d is not %d" %
1161
(self._node_width, len(self._search_prefix) + 1))
1162
if not search_key.startswith(self._search_prefix):
1163
# This key doesn't fit in this index, so we need to split at the
1164
# point where it would fit, insert self into that internal node,
1165
# and then map this key into that node.
1166
new_prefix = self.common_prefix(self._search_prefix,
1168
new_parent = InternalNode(new_prefix,
1169
search_key_func=self._search_key_func)
1170
new_parent.set_maximum_size(self._maximum_size)
1171
new_parent._key_width = self._key_width
1172
new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
1174
return new_parent.map(store, key, value)
1175
children = [node for node, _
1176
in self._iter_nodes(store, key_filter=[key])]
1181
child = self._new_child(search_key, LeafNode)
1182
old_len = len(child)
1183
if type(child) is LeafNode:
1184
old_size = child._current_size()
1187
prefix, node_details = child.map(store, key, value)
1188
if len(node_details) == 1:
1189
# child may have shrunk, or might be a new node
1190
child = node_details[0][1]
1191
self._len = self._len - old_len + len(child)
1192
self._items[search_key] = child
1195
if type(child) is LeafNode:
1196
if old_size is None:
1197
# The old node was an InternalNode which means it has now
1198
# collapsed, so we need to check if it will chain to a
1199
# collapse at this level.
1200
trace.mutter("checking remap as InternalNode -> LeafNode")
1201
new_node = self._check_remap(store)
1203
# If the LeafNode has shrunk in size, we may want to run
1204
# a remap check. Checking for a remap is expensive though
1205
# and the frequency of a successful remap is very low.
1206
# Shrinkage by small amounts is common, so we only do the
1207
# remap check if the new_size is low or the shrinkage
1208
# amount is over a configurable limit.
1209
new_size = child._current_size()
1210
shrinkage = old_size - new_size
1211
if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
1212
or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1214
"checking remap as size shrunk by %d to be %d",
1215
shrinkage, new_size)
1216
new_node = self._check_remap(store)
1217
if new_node._search_prefix is None:
1218
raise AssertionError("_search_prefix should not be None")
1219
return new_node._search_prefix, [('', new_node)]
# child has overflowed - create a new intermediate node.
1221
# XXX: This is where we might want to try and expand our depth
1222
# to refer to more bytes of every child (which would give us
1223
# multiple pointers to child nodes, but less intermediate nodes)
1224
child = self._new_child(search_key, InternalNode)
1225
child._search_prefix = prefix
1226
for split, node in node_details:
1227
child.add_node(split, node)
1228
self._len = self._len - old_len + len(child)
1230
return self._search_prefix, [("", self)]
1232
def _new_child(self, search_key, klass):
1233
"""Create a new child node of type klass."""
1235
child.set_maximum_size(self._maximum_size)
1236
child._key_width = self._key_width
1237
child._search_key_func = self._search_key_func
1238
self._items[search_key] = child
1241
def serialise(self, store):
1242
"""Serialise the node to store.
1244
:param store: A VersionedFiles honouring the CHK extensions.
1245
:return: An iterable of the keys inserted by this operation.
1247
for node in self._items.itervalues():
1248
if type(node) is tuple:
1249
# Never deserialised.
1251
if node._key is not None:
1254
for key in node.serialise(store):
1256
lines = ["chknode:\n"]
1257
lines.append("%d\n" % self._maximum_size)
1258
lines.append("%d\n" % self._key_width)
1259
lines.append("%d\n" % self._len)
1260
if self._search_prefix is None:
1261
raise AssertionError("_search_prefix should not be None")
1262
lines.append('%s\n' % (self._search_prefix,))
1263
prefix_len = len(self._search_prefix)
1264
for prefix, node in sorted(self._items.items()):
1265
if type(node) is tuple:
1269
serialised = "%s\x00%s\n" % (prefix, key)
1270
if not serialised.startswith(self._search_prefix):
1271
raise AssertionError("prefixes mismatch: %s must start with %s"
1272
% (serialised, self._search_prefix))
1273
lines.append(serialised[prefix_len:])
1274
sha1, _, _ = store.add_lines((None,), (), lines)
1275
self._key = ("sha1:" + sha1,)
1276
_page_cache.add(self._key, ''.join(lines))
1279
def _search_key(self, key):
1280
"""Return the serialised key for key in this node."""
1281
# search keys are fixed width. All will be self._node_width wide, so we
1283
return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]
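# For example (illustrative): with the plain search key function and
# _node_width == 5, ('foo',) maps to 'foo\x00\x00' (padded with NULs),
# while ('foobar',) maps to 'fooba' (truncated to the node width).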
1285
def _search_prefix_filter(self, key):
1286
"""Serialise key for use as a prefix filter in iteritems."""
1287
return self._search_key_func(key)[:self._node_width]
1289
def _split(self, offset):
1290
"""Split this node into smaller nodes starting at offset.
1292
:param offset: The offset to start the new child nodes at.
1293
:return: An iterable of (prefix, node) tuples. prefix is a byte
1294
prefix for reaching node.
1296
if offset >= self._node_width:
1297
for node in self._items.values():
1298
for result in node._split(offset):
1301
for key, node in self._items.items():
1305
"""Return the references to other CHK's held by this node."""
1306
if self._key is None:
1307
raise AssertionError("unserialised nodes have no refs.")
1309
for value in self._items.itervalues():
1310
if type(value) is tuple:
1313
refs.append(value.key())
1316
def _compute_search_prefix(self, extra_key=None):
1317
"""Return the unique key prefix for this node.
1319
:return: A bytestring of the longest search key prefix that is
1320
unique within this node.
1322
self._search_prefix = self.common_prefix_for_keys(self._items)
1323
return self._search_prefix
1325
def unmap(self, store, key, check_remap=True):
"""Remove key from this node and its children."""
1327
if not len(self._items):
1328
raise AssertionError("can't unmap in an empty InternalNode.")
1329
children = [node for node, _
1330
in self._iter_nodes(store, key_filter=[key])]
1336
unmapped = child.unmap(store, key)
1338
search_key = self._search_key(key)
1339
if len(unmapped) == 0:
1340
# All child nodes are gone, remove the child:
1341
del self._items[search_key]
1344
# Stash the returned node
1345
self._items[search_key] = unmapped
1346
if len(self._items) == 1:
1347
# this node is no longer needed:
1348
return self._items.values()[0]
1349
if type(unmapped) is InternalNode:
1352
return self._check_remap(store)
1356
def _check_remap(self, store):
1357
"""Check if all keys contained by children fit in a single LeafNode.
1359
:param store: A store to use for reading more nodes
1360
:return: Either self, or a new LeafNode which should replace self.
1362
# Logic for how we determine when we need to rebuild
1363
# 1) Implicitly unmap() is removing a key which means that the child
1364
# nodes are going to be shrinking by some extent.
1365
# 2) If all children are LeafNodes, it is possible that they could be
1366
# combined into a single LeafNode, which can then completely replace
1367
# this internal node with a single LeafNode
1368
# 3) If *one* child is an InternalNode, we assume it has already done
1369
# all the work to determine that its children cannot collapse, and
1370
# we can then assume that those nodes *plus* the current nodes don't
1371
# have a chance of collapsing either.
1372
# So a very cheap check is to just say if 'unmapped' is an
1373
# InternalNode, we don't have to check further.
1375
# TODO: Another alternative is to check the total size of all known
1376
# LeafNodes. If there is some formula we can use to determine the
1377
# final size without actually having to read in any more
1378
# children, it would be nice to have. However, we have to be
1379
# careful with stuff like nodes that pull out the common prefix
1380
# of each key, as adding a new key can change the common prefix
1381
# and cause size changes greater than the length of one key.
1382
# So for now, we just add everything to a new Leaf until it
1383
# splits, as we know that will give the right answer
1384
new_leaf = LeafNode(search_key_func=self._search_key_func)
1385
new_leaf.set_maximum_size(self._maximum_size)
1386
new_leaf._key_width = self._key_width
1387
# A batch_size of 16 was chosen because:
1388
# a) In testing, a 4k page held 14 times. So if we have more than 16
1389
# leaf nodes we are unlikely to hold them in a single new leaf
1390
# node. This still allows for 1 round trip
1391
# b) With 16-way fan out, we can still do a single round trip
1392
# c) With 255-way fan out, we don't want to read all 255 and destroy
1393
# the page cache, just to determine that we really don't need it.
1394
for node, _ in self._iter_nodes(store, batch_size=16):
1395
if type(node) is InternalNode:
1396
# Without looking at any leaf nodes, we are sure
1398
for key, value in node._items.iteritems():
1399
if new_leaf._map_no_split(key, value):
1401
trace.mutter("remap generated a new LeafNode")
1405
def _deserialise(bytes, key, search_key_func):
"""Helper for repository details - convert bytes to a node."""
1407
if bytes.startswith("chkleaf:\n"):
1408
node = LeafNode.deserialise(bytes, key, search_key_func=search_key_func)
1409
elif bytes.startswith("chknode:\n"):
1410
node = InternalNode.deserialise(bytes, key,
1411
search_key_func=search_key_func)
1413
raise AssertionError("Unknown node type.")
1417
class CHKMapDifference(object):
1418
"""Iterate the stored pages and key,value pairs for (new - old).
1420
This class provides a generator over the stored CHK pages and the
1421
(key, value) pairs that are in any of the new maps and not in any of the
1424
Note that it may yield chk pages that are common (especially root nodes),
1425
but it won't yield (key,value) pairs that are common.
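
Illustrative usage (assumes CHK root keys and a compatible store)::

    diff = CHKMapDifference(store, new_root_keys, old_root_keys,
                            search_key_func=_search_key_plain)
    for record, items in diff.process():
        # record is a stored page reachable only from the new roots (it may
        # be None for the batch of loose items); items are (key, value)
        # pairs present only in the new maps.
        ...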
1428
def __init__(self, store, new_root_keys, old_root_keys,
1429
search_key_func, pb=None):
1431
self._new_root_keys = new_root_keys
1432
self._old_root_keys = old_root_keys
1434
# All uninteresting chks that we have seen. By the time they are added
1435
# here, they should be either fully ignored, or queued up for
1437
self._all_old_chks = set(self._old_root_keys)
1438
# All items that we have seen from the old_root_keys
1439
self._all_old_items = set()
1440
# These are interesting items which were either read, or already in the
1441
# interesting queue (so we don't need to walk them again)
1442
self._processed_new_refs = set()
1443
self._search_key_func = search_key_func
1445
# The uninteresting and interesting nodes to be searched
1446
self._old_queue = []
1447
self._new_queue = []
1448
# Holds the (key, value) items found when processing the root nodes,
1449
# waiting for the uninteresting nodes to be walked
1450
self._new_item_queue = []
1453
def _read_nodes_from_store(self, keys):
1454
# We chose not to use _page_cache, because we think in terms of records
1455
# to be yielded. Also, we expect to touch each page only 1 time during
1456
# this code. (We may want to evaluate saving the raw bytes into the
1457
# page cache, which would allow a working tree update after the fetch
1458
# to not have to read the bytes again.)
1459
stream = self._store.get_record_stream(keys, 'unordered', True)
1460
for record in stream:
1461
if self._pb is not None:
1463
if record.storage_kind == 'absent':
1464
raise errors.NoSuchRevision(self._store, record.key)
1465
bytes = record.get_bytes_as('fulltext')
1466
node = _deserialise(bytes, record.key,
1467
search_key_func=self._search_key_func)
1468
if type(node) is InternalNode:
1469
# Note we don't have to do node.refs() because we know that
1470
# there are no children that have been pushed into this node
1471
prefix_refs = node._items.items()
1475
items = node._items.items()
1476
yield record, node, prefix_refs, items
1478
def _read_old_roots(self):
1479
old_chks_to_enqueue = []
1480
all_old_chks = self._all_old_chks
1481
for record, node, prefix_refs, items in \
1482
self._read_nodes_from_store(self._old_root_keys):
1483
# Uninteresting node
1484
prefix_refs = [p_r for p_r in prefix_refs
1485
if p_r[1] not in all_old_chks]
1486
new_refs = [p_r[1] for p_r in prefix_refs]
1487
all_old_chks.update(new_refs)
1488
self._all_old_items.update(items)
1489
# Queue up the uninteresting references
1490
# Don't actually put them in the 'to-read' queue until we have
1491
# finished checking the interesting references
1492
old_chks_to_enqueue.extend(prefix_refs)
1493
return old_chks_to_enqueue
1495
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1496
# At this point, we have read all the uninteresting and interesting
1497
# items, so we can queue up the uninteresting stuff, knowing that we've
1498
# handled the interesting ones
1499
for prefix, ref in old_chks_to_enqueue:
1500
not_interesting = True
1501
for i in xrange(len(prefix), 0, -1):
1502
if prefix[:i] in new_prefixes:
1503
not_interesting = False
1506
# This prefix is not part of the remaining 'interesting set'
1508
self._old_queue.append(ref)
1510
def _read_all_roots(self):
1511
"""Read the root pages.
1513
This is structured as a generator, so that the root records can be
1514
yielded up to whoever needs them without any buffering.
1516
# This is the bootstrap phase
1517
if not self._old_root_keys:
1518
# With no old_root_keys we can just shortcut and be ready
1519
# for _flush_new_queue
1520
self._new_queue = list(self._new_root_keys)
1522
old_chks_to_enqueue = self._read_old_roots()
1523
# filter out any root keys that are already known to be uninteresting
1524
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1525
# These are prefixes that are present in new_keys that we are
1527
new_prefixes = set()
1528
# We are about to yield all of these, so we don't want them getting
1529
# added a second time
1530
processed_new_refs = self._processed_new_refs
1531
processed_new_refs.update(new_keys)
1532
for record, node, prefix_refs, items in \
1533
self._read_nodes_from_store(new_keys):
1534
# At this level, we now know all the uninteresting references
1535
# So we filter and queue up whatever is remaining
1536
prefix_refs = [p_r for p_r in prefix_refs
1537
if p_r[1] not in self._all_old_chks
1538
and p_r[1] not in processed_new_refs]
1539
refs = [p_r[1] for p_r in prefix_refs]
1540
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1541
self._new_queue.extend(refs)
1542
# TODO: We can potentially get multiple items here, however the
1543
# current design allows for this, as callers will do the work
1544
# to make the results unique. We might profile whether we
1545
# gain anything by ensuring unique return values for items
1546
new_items = [item for item in items
1547
if item not in self._all_old_items]
1548
self._new_item_queue.extend(new_items)
1549
new_prefixes.update([self._search_key_func(item[0])
1550
for item in new_items])
1551
processed_new_refs.update(refs)
1553
# For new_prefixes we have the full length prefixes queued up.
1554
# However, we also need possible prefixes. (If we have a known ref to
1555
# 'ab', then we also need to include 'a'.) So expand the
1556
# new_prefixes to include all shorter prefixes
1557
for prefix in list(new_prefixes):
1558
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1559
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1561
def _flush_new_queue(self):
1562
# No need to maintain the heap invariant anymore, just pull things out
1564
refs = set(self._new_queue)
1565
self._new_queue = []
1566
# First pass, flush all interesting items and convert to using direct refs
1567
all_old_chks = self._all_old_chks
1568
processed_new_refs = self._processed_new_refs
1569
all_old_items = self._all_old_items
1570
new_items = [item for item in self._new_item_queue
1571
if item not in all_old_items]
1572
self._new_item_queue = []
1574
yield None, new_items
1575
refs = refs.difference(all_old_chks)
1578
next_refs_update = next_refs.update
1579
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1580
# from 1m54s to 1m51s. Consider it.
1581
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1582
items = [item for item in items
1583
if item not in all_old_items]
1585
next_refs_update([p_r[1] for p_r in p_refs])
1586
next_refs = next_refs.difference(all_old_chks)
1587
next_refs = next_refs.difference(processed_new_refs)
1588
processed_new_refs.update(next_refs)
1591
def _process_next_old(self):
1592
# Since we don't filter uninteresting any further than during
1593
# _read_all_roots, process the whole queue in a single pass.
1594
refs = self._old_queue
1595
self._old_queue = []
1596
all_old_chks = self._all_old_chks
1597
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1598
self._all_old_items.update(items)
1599
refs = [r for _,r in prefix_refs if r not in all_old_chks]
1600
self._old_queue.extend(refs)
1601
all_old_chks.update(refs)
1603
def _process_queues(self):
1604
while self._old_queue:
1605
self._process_next_old()
1606
return self._flush_new_queue()
1609
for record in self._read_all_roots():
1611
for record, items in self._process_queues():
1615
def iter_interesting_nodes(store, interesting_root_keys,
1616
uninteresting_root_keys, pb=None):
1617
"""Given root keys, find interesting nodes.
1619
Evaluate nodes referenced by interesting_root_keys. Ones that are also
1620
referenced from uninteresting_root_keys are not considered interesting.
1622
:param interesting_root_keys: keys which should be part of the
1623
"interesting" nodes (which will be yielded)
1624
:param uninteresting_root_keys: keys which should be filtered out of the
1627
(interesting record, {interesting key:values})
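
Illustrative usage::

    for record, items in iter_interesting_nodes(store, [new_root_key],
                                                [old_root_key]):
        ...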
1629
iterator = CHKMapDifference(store, interesting_root_keys,
1630
uninteresting_root_keys,
1631
search_key_func=store._search_key_func,
1633
return iterator.process()
try:
    from bzrlib._chk_map_pyx import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError:
    from bzrlib._chk_map_py import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
search_key_registry.register('hash-16-way', _search_key_16)
search_key_registry.register('hash-255-way', _search_key_255)