# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Persistent maps from tuple_of_strings->string using CHK stores.

Overview and current status:

The CHKMap class implements a dict from tuple_of_strings->string by using a trie
with internal nodes of 8-bit fan out; the key tuples are mapped to strings by
joining them by \x00, and \x00 padding shorter keys out to the length of the
longest key. Leaf nodes are packed as densely as possible, and internal nodes
are all an additional 8-bits wide leading to a sparse upper tree.

Updates to a CHKMap are done preferentially via the apply_delta method, to
allow optimisation of the update operation; but individual map/unmap calls are
possible and supported. Individual changes via map/unmap are buffered in memory
until the _save method is called to force serialisation of the tree.
apply_delta records its changes immediately by performing an implicit _save.

TODO:
-----

Densely packed upper nodes.

"""
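# Illustrative sketch (not part of the original code): how a key tuple is
# flattened into the search string used by the trie, using the default 'plain'
# search key function defined below.
#
#   key = ('parent-id', 'basename')
#   '\x00'.join(key)    # -> 'parent-id\x00basename'
#
# Shorter keys are padded with '\x00' so that all search keys compared at a
# given internal node have the same width.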
from bzrlib import lazy_import
lazy_import.lazy_import(globals(), """

# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
# We are caching bytes so len(value) is perfectly accurate
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)

# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
# If we delete more than this many nodes applying a delta, we check for a remap
_INTERESTING_DELETES_LIMIT = 5


def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)


search_key_registry = registry.Registry()
search_key_registry.register('plain', _search_key_plain)
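# Illustrative example (assumes the standard Registry.get() lookup): other
# search key functions, such as the hash-based ones registered at the bottom
# of this module, are looked up by name in the same way.
#
#   func = search_key_registry.get('plain')
#   func(('file-id', 'revision-id'))    # -> 'file-id\x00revision-id'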
84
"""A persistent map from string to string backed by a CHK store."""
86
__slots__ = ('_store', '_root_node', '_search_key_func')
88
def __init__(self, store, root_key, search_key_func=None):
89
"""Create a CHKMap object.
91
:param store: The store the CHKMap is stored in.
92
:param root_key: The root key of the map. None to create an empty
94
:param search_key_func: A function mapping a key => bytes. These bytes
95
are then used by the internal nodes to split up leaf nodes into
99
if search_key_func is None:
100
search_key_func = _search_key_plain
101
self._search_key_func = search_key_func
103
self._root_node = LeafNode(search_key_func=search_key_func)
105
self._root_node = self._node_key(root_key)
107
def apply_delta(self, delta):
108
"""Apply a delta to the map.
110
:param delta: An iterable of old_key, new_key, new_value tuples.
111
If new_key is not None, then new_key->new_value is inserted
112
into the map; if old_key is not None, then the old mapping
113
of old_key is removed.
116
# Check preconditions first.
117
new_items = set([key for (old, key, value) in delta if key is not None
119
existing_new = list(self.iteritems(key_filter=new_items))
121
raise errors.InconsistentDeltaDelta(delta,
122
"New items are already in the map %r." % existing_new)
124
for old, new, value in delta:
125
if old is not None and old != new:
126
self.unmap(old, check_remap=False)
128
for old, new, value in delta:
131
if delete_count > _INTERESTING_DELETES_LIMIT:
132
trace.mutter("checking remap as %d deletions", delete_count)
136
def _ensure_root(self):
137
"""Ensure that the root node is an object not a key."""
138
if type(self._root_node) is tuple:
139
# Demand-load the root
140
self._root_node = self._get_node(self._root_node)
142
def _get_node(self, node):
145
Note that this does not update the _items dict in objects containing a
146
reference to this node. As such it does not prevent subsequent IO being
149
:param node: A tuple key or node object.
150
:return: A node object.
152
if type(node) is tuple:
153
bytes = self._read_bytes(node)
154
return _deserialise(bytes, node,
155
search_key_func=self._search_key_func)
159
def _read_bytes(self, key):
161
return _page_cache[key]
163
stream = self._store.get_record_stream([key], 'unordered', True)
164
bytes = stream.next().get_bytes_as('fulltext')
165
_page_cache[key] = bytes
168
def _dump_tree(self, include_keys=False):
169
"""Return the tree in a string representation."""
171
res = self._dump_tree_node(self._root_node, prefix='', indent='',
172
include_keys=include_keys)
173
res.append('') # Give a trailing '\n'
174
return '\n'.join(res)
176
def _dump_tree_node(self, node, prefix, indent, include_keys=True):
177
"""For this node and all children, generate a string representation."""
182
node_key = node.key()
183
if node_key is not None:
184
key_str = ' %s' % (node_key[0],)
187
result.append('%s%r %s%s' % (indent, prefix, node.__class__.__name__,
189
if type(node) is InternalNode:
190
# Trigger all child nodes to get loaded
191
list(node._iter_nodes(self._store))
192
for prefix, sub in sorted(node._items.iteritems()):
193
result.extend(self._dump_tree_node(sub, prefix, indent + ' ',
194
include_keys=include_keys))
196
for key, value in sorted(node._items.iteritems()):
197
# Don't use prefix nor indent here to line up when used in
198
# tests in conjunction with assertEqualDiff
199
result.append(' %r %r' % (key, value))
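# Rough shape of the text _dump_tree produces (an illustrative assumption based
# on the code above, not captured output): each node appears on its own line as
# "<prefix> <ClassName>", with children indented below it and leaf items shown
# as "<key tuple> <value>", e.g.
#
#   '' InternalNode
#     'a' LeafNode
#       ('aaa',) 'value1'
#     'b' LeafNode
#       ('bbb',) 'value2'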
203
def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
204
search_key_func=None):
205
"""Create a CHKMap in store with initial_value as the content.
207
:param store: The store to record initial_value in, a VersionedFiles
208
object with 1-tuple keys supporting CHK key generation.
209
:param initial_value: A dict to store in store. Its keys and values
211
:param maximum_size: The maximum_size rule to apply to nodes. This
212
determines the size at which no new data is added to a single node.
213
:param key_width: The number of elements in each key_tuple being stored
215
:param search_key_func: A function mapping a key => bytes. These bytes
216
are then used by the internal nodes to split up leaf nodes into
218
:return: The root chk of the resulting CHKMap.
220
root_key = klass._create_directly(store, initial_value,
221
maximum_size=maximum_size, key_width=key_width,
222
search_key_func=search_key_func)
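# Usage sketch (illustrative; `store` is assumed to be a CHK-capable
# VersionedFiles): build a map in one call and reopen it from its root key.
#
#   root_key = CHKMap.from_dict(store, {('file-id',): 'value'},
#                               maximum_size=4096)
#   chkmap = CHKMap(store, root_key)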
226
def _create_via_map(klass, store, initial_value, maximum_size=0,
227
key_width=1, search_key_func=None):
228
result = klass(store, None, search_key_func=search_key_func)
229
result._root_node.set_maximum_size(maximum_size)
230
result._root_node._key_width = key_width
232
for key, value in initial_value.items():
233
delta.append((None, key, value))
234
root_key = result.apply_delta(delta)
238
def _create_directly(klass, store, initial_value, maximum_size=0,
239
key_width=1, search_key_func=None):
240
node = LeafNode(search_key_func=search_key_func)
241
node.set_maximum_size(maximum_size)
242
node._key_width = key_width
243
node._items = dict(initial_value)
244
node._raw_size = sum([node._key_value_len(key, value)
245
for key,value in initial_value.iteritems()])
246
node._len = len(node._items)
247
node._compute_search_prefix()
248
node._compute_serialised_prefix()
251
and node._current_size() > maximum_size):
252
prefix, node_details = node._split(store)
253
if len(node_details) == 1:
254
raise AssertionError('Failed to split using node._split')
255
node = InternalNode(prefix, search_key_func=search_key_func)
256
node.set_maximum_size(maximum_size)
257
node._key_width = key_width
258
for split, subnode in node_details:
259
node.add_node(split, subnode)
260
keys = list(node.serialise(store))
263
def iter_changes(self, basis):
264
"""Iterate over the changes between basis and self.
266
:return: An iterator of tuples: (key, old_value, new_value). Old_value
267
is None for keys only in self; new_value is None for keys only in
271
# Read both trees in lexicographic, highest-first order.
# Any identical nodes we skip
# Any unique prefixes we output immediately.
# values in a leaf node are treated as single-value nodes in the tree
# which allows them to be not-special-cased. We know to output them
# because their value is a string, not a key(tuple) or node.
#
# corner cases to beware of when considering this function:
# *) common references are at different heights.
#    consider two trees:
#    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
#    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
#                        'ab':LeafNode={'ab':'bar'}}
#     'b': LeafNode={'b'}}
#    the node with aaa/aab will only be encountered in the second tree
#    after reading the 'a' subtree, but it is encountered in the first
#    tree immediately. Variations on this may have read internal nodes
#    like this. We want to cut the entire pending subtree when we
#    realise we have a common node. For this we use a list of keys -
#    the path to a node - and check the entire path is clean as we
292
if self._node_key(self._root_node) == self._node_key(basis._root_node):
296
excluded_keys = set()
297
self_node = self._root_node
298
basis_node = basis._root_node
299
# A heap, each element is prefix, node(tuple/NodeObject/string),
300
# key_path (a list of tuples, tail-sharing down the tree.)
303
def process_node(node, path, a_map, pending):
304
# take a node and expand it
305
node = a_map._get_node(node)
306
if type(node) == LeafNode:
307
path = (node._key, path)
308
for key, value in node._items.items():
309
# For a LeafNode, the key is a serialized_key, rather than
310
# a search_key, but the heap is using search_keys
311
search_key = node._search_key_func(key)
312
heapq.heappush(pending, (search_key, key, value, path))
314
# type(node) == InternalNode
315
path = (node._key, path)
316
for prefix, child in node._items.items():
317
heapq.heappush(pending, (prefix, None, child, path))
318
def process_common_internal_nodes(self_node, basis_node):
319
self_items = set(self_node._items.items())
320
basis_items = set(basis_node._items.items())
321
path = (self_node._key, None)
322
for prefix, child in self_items - basis_items:
323
heapq.heappush(self_pending, (prefix, None, child, path))
324
path = (basis_node._key, None)
325
for prefix, child in basis_items - self_items:
326
heapq.heappush(basis_pending, (prefix, None, child, path))
327
def process_common_leaf_nodes(self_node, basis_node):
328
self_items = set(self_node._items.items())
329
basis_items = set(basis_node._items.items())
330
path = (self_node._key, None)
331
for key, value in self_items - basis_items:
332
prefix = self._search_key_func(key)
333
heapq.heappush(self_pending, (prefix, key, value, path))
334
path = (basis_node._key, None)
335
for key, value in basis_items - self_items:
336
prefix = basis._search_key_func(key)
337
heapq.heappush(basis_pending, (prefix, key, value, path))
338
def process_common_prefix_nodes(self_node, self_path,
339
basis_node, basis_path):
340
# Would it be more efficient if we could request both at the same
342
self_node = self._get_node(self_node)
343
basis_node = basis._get_node(basis_node)
344
if (type(self_node) == InternalNode
345
and type(basis_node) == InternalNode):
346
# Matching internal nodes
347
process_common_internal_nodes(self_node, basis_node)
348
elif (type(self_node) == LeafNode
349
and type(basis_node) == LeafNode):
350
process_common_leaf_nodes(self_node, basis_node)
352
process_node(self_node, self_path, self, self_pending)
353
process_node(basis_node, basis_path, basis, basis_pending)
354
process_common_prefix_nodes(self_node, None, basis_node, None)
357
excluded_keys = set()
358
def check_excluded(key_path):
359
# Note that this is N^2, it depends on us trimming trees
360
# aggressively to not become slow.
361
# A better implementation would probably have a reverse map
362
# back to the children of a node, and jump straight to it when
363
# a common node is detected, then proceed to remove the already
364
# pending children. bzrlib.graph has a searcher module with a
366
while key_path is not None:
367
key, key_path = key_path
368
if key in excluded_keys:
373
while self_pending or basis_pending:
376
# self is exhausted: output remainder of basis
377
for prefix, key, node, path in basis_pending:
378
if check_excluded(path):
380
node = basis._get_node(node)
383
yield (key, node, None)
385
# subtree - fastpath the entire thing.
386
for key, value in node.iteritems(basis._store):
387
yield (key, value, None)
389
elif not basis_pending:
390
# basis is exhausted: output remainder of self.
391
for prefix, key, node, path in self_pending:
392
if check_excluded(path):
394
node = self._get_node(node)
397
yield (key, None, node)
399
# subtree - fastpath the entire thing.
400
for key, value in node.iteritems(self._store):
401
yield (key, None, value)
404
# XXX: future optimisation - yield the smaller items
405
# immediately rather than pushing everything on/off the
406
# heaps. Applies to both internal nodes and leafnodes.
407
if self_pending[0][0] < basis_pending[0][0]:
409
prefix, key, node, path = heapq.heappop(self_pending)
410
if check_excluded(path):
414
yield (key, None, node)
416
process_node(node, path, self, self_pending)
418
elif self_pending[0][0] > basis_pending[0][0]:
420
prefix, key, node, path = heapq.heappop(basis_pending)
421
if check_excluded(path):
425
yield (key, node, None)
427
process_node(node, path, basis, basis_pending)
430
# common prefix: possibly expand both
431
if self_pending[0][1] is None:
436
if basis_pending[0][1] is None:
441
if not read_self and not read_basis:
442
# compare a common value
443
self_details = heapq.heappop(self_pending)
444
basis_details = heapq.heappop(basis_pending)
445
if self_details[2] != basis_details[2]:
446
yield (self_details[1],
447
basis_details[2], self_details[2])
449
# At least one side wasn't a simple value
450
if (self._node_key(self_pending[0][2]) ==
451
self._node_key(basis_pending[0][2])):
452
# Identical pointers, skip (and don't bother adding to
453
# excluded, it won't turn up again.)
454
heapq.heappop(self_pending)
455
heapq.heappop(basis_pending)
457
# Now we need to expand this node before we can continue
458
if read_self and read_basis:
459
# Both sides start with the same prefix, so process
461
self_prefix, _, self_node, self_path = heapq.heappop(
463
basis_prefix, _, basis_node, basis_path = heapq.heappop(
465
if self_prefix != basis_prefix:
466
raise AssertionError(
467
'%r != %r' % (self_prefix, basis_prefix))
468
process_common_prefix_nodes(
469
self_node, self_path,
470
basis_node, basis_path)
473
prefix, key, node, path = heapq.heappop(self_pending)
474
if check_excluded(path):
476
process_node(node, path, self, self_pending)
478
prefix, key, node, path = heapq.heappop(basis_pending)
479
if check_excluded(path):
481
process_node(node, path, basis, basis_pending)
484
def iteritems(self, key_filter=None):
485
"""Iterate over the entire CHKMap's contents."""
487
return self._root_node.iteritems(self._store, key_filter=key_filter)
490
"""Return the key for this map."""
491
if type(self._root_node) is tuple:
492
return self._root_node
494
return self._root_node._key
498
return len(self._root_node)
500
def map(self, key, value):
501
"""Map a key tuple to value.
503
:param key: A key to map.
504
:param value: The value to assign to key.
506
# Need a root object.
508
prefix, node_details = self._root_node.map(self._store, key, value)
509
if len(node_details) == 1:
510
self._root_node = node_details[0][1]
512
self._root_node = InternalNode(prefix,
513
search_key_func=self._search_key_func)
514
self._root_node.set_maximum_size(node_details[0][1].maximum_size)
515
self._root_node._key_width = node_details[0][1]._key_width
516
for split, node in node_details:
517
self._root_node.add_node(split, node)
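# Usage sketch (illustrative): individual map/unmap calls are buffered in
# memory until _save() serialises the tree and returns the new root key.
#
#   chkmap.map(('file-id',), 'new value')
#   chkmap.unmap(('stale-file-id',))
#   new_root_key = chkmap._save()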
519
def _node_key(self, node):
520
"""Get the key for a node whether it's a tuple or node."""
521
if type(node) is tuple:
526
def unmap(self, key, check_remap=True):
527
"""remove key from the map."""
529
if type(self._root_node) is InternalNode:
530
unmapped = self._root_node.unmap(self._store, key,
531
check_remap=check_remap)
533
unmapped = self._root_node.unmap(self._store, key)
534
self._root_node = unmapped
536
def _check_remap(self):
537
"""Check if nodes can be collapsed."""
539
if type(self._root_node) is InternalNode:
540
self._root_node._check_remap(self._store)
543
"""Save the map completely.
545
:return: The key of the root node.
547
if type(self._root_node) is tuple:
549
return self._root_node
550
keys = list(self._root_node.serialise(self._store))
555
"""Base class defining the protocol for CHK Map nodes.
557
:ivar _raw_size: The total size of the serialized key:value data, before
558
adding the header bytes, and without prefix compression.
561
__slots__ = ('_key', '_len', '_maximum_size', '_key_width',
562
'_raw_size', '_items', '_search_prefix', '_search_key_func'
565
def __init__(self, key_width=1):
568
:param key_width: The width of keys for this node.
571
# Current number of elements
573
self._maximum_size = 0
574
self._key_width = key_width
575
# current size in bytes
577
# The pointers/values this node has - meaning defined by child classes.
579
# The common search prefix
580
self._search_prefix = None
583
items_str = str(sorted(self._items))
584
if len(items_str) > 20:
585
items_str = items_str[:16] + '...]'
586
return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
587
self.__class__.__name__, self._key, self._len, self._raw_size,
588
self._maximum_size, self._search_prefix, items_str)
597
def maximum_size(self):
598
"""What is the upper limit for adding references to a node."""
599
return self._maximum_size
601
def set_maximum_size(self, new_size):
602
"""Set the size threshold for nodes.
604
:param new_size: The size at which no data is added to a node. 0 for
607
self._maximum_size = new_size
610
def common_prefix(cls, prefix, key):
611
"""Given 2 strings, return the longest prefix common to both.
613
:param prefix: This has been the common prefix for other keys, so it is
614
more likely to be the common prefix in this case as well.
615
:param key: Another string to compare to
617
if key.startswith(prefix):
620
# Is there a better way to do this?
621
for pos, (left, right) in enumerate(zip(prefix, key)):
625
common = prefix[:pos+1]
629
def common_prefix_for_keys(cls, keys):
630
"""Given a list of keys, find their common prefix.
632
:param keys: An iterable of strings.
633
:return: The longest common prefix of all keys.
637
if common_prefix is None:
640
common_prefix = cls.common_prefix(common_prefix, key)
641
if not common_prefix:
642
# if common_prefix is the empty string, then we know it won't
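# Behaviour sketch for the two helpers above (illustrative values, assuming
# they are used as classmethods, as the cls parameter suggests):
#
#   Node.common_prefix('abc', 'abd')                   # -> 'ab'
#   Node.common_prefix_for_keys(['abc', 'abd', 'ab'])  # -> 'ab'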
648
# Singleton indicating we have not computed _search_prefix yet
651
class LeafNode(Node):
652
"""A node containing actual key:value pairs.
654
:ivar _items: A dict of key->value items. The key is in tuple form.
655
:ivar _size: The number of bytes that would be used by serializing all of
659
__slots__ = ('_common_serialised_prefix', '_serialise_key')
661
def __init__(self, search_key_func=None):
663
# All of the keys in this leaf node share this common prefix
664
self._common_serialised_prefix = None
665
self._serialise_key = '\x00'.join
666
if search_key_func is None:
667
self._search_key_func = _search_key_plain
669
self._search_key_func = search_key_func
672
items_str = str(sorted(self._items))
673
if len(items_str) > 20:
674
items_str = items_str[:16] + '...]'
676
'%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
677
% (self.__class__.__name__, self._key, self._len, self._raw_size,
678
self._maximum_size, self._search_prefix, self._key_width, items_str)
680
def _current_size(self):
681
"""Answer the current serialised size of this node.
683
This differs from self._raw_size in that it includes the bytes used for
686
if self._common_serialised_prefix is None:
690
# We will store a single string with the common prefix
691
# And then that common prefix will not be stored in any of the
693
prefix_len = len(self._common_serialised_prefix)
694
bytes_for_items = (self._raw_size - (prefix_len * self._len))
695
return (9 # 'chkleaf:\n'
696
+ len(str(self._maximum_size)) + 1
697
+ len(str(self._key_width)) + 1
698
+ len(str(self._len)) + 1
703
def deserialise(klass, bytes, key, search_key_func=None):
704
"""Deserialise bytes, with key key, into a LeafNode.
706
:param bytes: The bytes of the node.
707
:param key: The key that the serialised node has.
709
return _deserialise_leaf_node(bytes, key,
710
search_key_func=search_key_func)
712
def iteritems(self, store, key_filter=None):
713
"""Iterate over items in the node.
715
:param key_filter: A filter to apply to the node. It should be a
716
list/set/dict or similar repeatedly iterable container.
718
if key_filter is not None:
719
# Adjust the filter - short elements go to a prefix filter. All
720
# other items are looked up directly.
721
# XXX: perhaps defaultdict? Profiling<rinse and repeat>
723
for key in key_filter:
724
if len(key) == self._key_width:
725
# This filter is meant to match exactly one key, yield it
728
yield key, self._items[key]
730
# This key is not present in this map, continue
733
# Short items, we need to match based on a prefix
734
length_filter = filters.setdefault(len(key), set())
735
length_filter.add(key)
737
filters = filters.items()
738
for item in self._items.iteritems():
739
for length, length_filter in filters:
740
if item[0][:length] in length_filter:
744
for item in self._items.iteritems():
747
def _key_value_len(self, key, value):
748
# TODO: Should probably be done without actually joining the key, but
749
# then that can be done via the C extension
750
return (len(self._serialise_key(key)) + 1
751
+ len(str(value.count('\n'))) + 1
754
def _search_key(self, key):
755
return self._search_key_func(key)
757
def _map_no_split(self, key, value):
758
"""Map a key to a value.
760
This assumes either the key does not already exist, or you have already
761
removed its size and length from self.
763
:return: True if adding this node should cause us to split.
765
self._items[key] = value
766
self._raw_size += self._key_value_len(key, value)
768
serialised_key = self._serialise_key(key)
769
if self._common_serialised_prefix is None:
770
self._common_serialised_prefix = serialised_key
772
self._common_serialised_prefix = self.common_prefix(
773
self._common_serialised_prefix, serialised_key)
774
search_key = self._search_key(key)
775
if self._search_prefix is _unknown:
776
self._compute_search_prefix()
777
if self._search_prefix is None:
778
self._search_prefix = search_key
780
self._search_prefix = self.common_prefix(
781
self._search_prefix, search_key)
783
and self._maximum_size
784
and self._current_size() > self._maximum_size):
785
# Check to see if all of the search_keys for this node are
786
# identical. We allow the node to grow under that circumstance
787
# (we could track this as common state, but it is infrequent)
788
if (search_key != self._search_prefix
789
or not self._are_search_keys_identical()):
793
def _split(self, store):
794
"""We have overflowed.
796
Split this node into multiple LeafNodes, return it up the stack so that
797
the next layer creates a new InternalNode and references the new nodes.
799
:return: (common_serialised_prefix, [(node_serialised_prefix, node)])
801
if self._search_prefix is _unknown:
802
raise AssertionError('Search prefix must be known')
803
common_prefix = self._search_prefix
804
split_at = len(common_prefix) + 1
806
for key, value in self._items.iteritems():
807
search_key = self._search_key(key)
808
prefix = search_key[:split_at]
809
# TODO: Generally only 1 key can be exactly the right length,
810
# which means we can only have 1 key in the node pointed
811
# at by the 'prefix\0' key. We might want to consider
812
# folding it into the containing InternalNode rather than
813
# having a fixed length-1 node.
814
# Note this is probably not true for hash keys, as they
815
# may get a '\00' node anywhere, but won't have keys of
817
if len(prefix) < split_at:
818
prefix += '\x00'*(split_at - len(prefix))
819
if prefix not in result:
820
node = LeafNode(search_key_func=self._search_key_func)
821
node.set_maximum_size(self._maximum_size)
822
node._key_width = self._key_width
823
result[prefix] = node
825
node = result[prefix]
826
sub_prefix, node_details = node.map(store, key, value)
827
if len(node_details) > 1:
828
if prefix != sub_prefix:
829
# This node has been split and is now found via a different
832
new_node = InternalNode(sub_prefix,
833
search_key_func=self._search_key_func)
834
new_node.set_maximum_size(self._maximum_size)
835
new_node._key_width = self._key_width
836
for split, node in node_details:
837
new_node.add_node(split, node)
838
result[prefix] = new_node
839
return common_prefix, result.items()
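# Shape sketch of the value returned by _split (illustrative): the common
# search prefix plus one (child prefix, node) pair per new child, e.g.
#
#   ('a', [('ab', <LeafNode...>), ('ac', <LeafNode...>)])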
841
def map(self, store, key, value):
842
"""Map key to value."""
843
if key in self._items:
844
self._raw_size -= self._key_value_len(key, self._items[key])
847
if self._map_no_split(key, value):
848
return self._split(store)
850
if self._search_prefix is _unknown:
851
raise AssertionError('%r must be known' % self._search_prefix)
852
return self._search_prefix, [("", self)]
854
def serialise(self, store):
855
"""Serialise the LeafNode to store.
857
:param store: A VersionedFiles honouring the CHK extensions.
858
:return: An iterable of the keys inserted by this operation.
860
lines = ["chkleaf:\n"]
861
lines.append("%d\n" % self._maximum_size)
862
lines.append("%d\n" % self._key_width)
863
lines.append("%d\n" % self._len)
864
if self._common_serialised_prefix is None:
866
if len(self._items) != 0:
867
raise AssertionError('If _common_serialised_prefix is None'
868
' we should have no items')
870
lines.append('%s\n' % (self._common_serialised_prefix,))
871
prefix_len = len(self._common_serialised_prefix)
872
for key, value in sorted(self._items.items()):
873
# Always add a final newline
874
value_lines = osutils.chunks_to_lines([value + '\n'])
875
serialized = "%s\x00%s\n" % (self._serialise_key(key),
877
if not serialized.startswith(self._common_serialised_prefix):
878
raise AssertionError('We thought the common prefix was %r'
879
' but entry %r does not have it in common'
880
% (self._common_serialised_prefix, serialized))
881
lines.append(serialized[prefix_len:])
882
lines.extend(value_lines)
883
sha1, _, _ = store.add_lines((None,), (), lines)
884
self._key = ("sha1:" + sha1,)
885
bytes = ''.join(lines)
886
if len(bytes) != self._current_size():
887
raise AssertionError('Invalid _current_size')
888
_page_cache.add(self._key, bytes)
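# Rough on-disk layout produced by serialise() (an informal sketch derived
# from the code above; the per-entry count field is an inferred detail):
#
#   chkleaf:
#   <maximum_size>
#   <key_width>
#   <len>
#   <common serialised prefix>
#   <key suffix>\x00<N>
#   <N value lines>
#   ...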
892
"""Return the references to other CHK's held by this node."""
895
def _compute_search_prefix(self):
896
"""Determine the common search prefix for all keys in this node.
898
:return: A bytestring of the longest search key prefix that is
899
unique within this node.
901
search_keys = [self._search_key_func(key) for key in self._items]
902
self._search_prefix = self.common_prefix_for_keys(search_keys)
903
return self._search_prefix
905
def _are_search_keys_identical(self):
906
"""Check to see if the search keys for all entries are the same.
908
When using a hash as the search_key it is possible for non-identical
909
keys to collide. If that happens enough, we may try to overflow a
910
LeafNode, but as all are collisions, we must not split.
912
common_search_key = None
913
for key in self._items:
914
search_key = self._search_key(key)
915
if common_search_key is None:
916
common_search_key = search_key
917
elif search_key != common_search_key:
921
def _compute_serialised_prefix(self):
922
"""Determine the common prefix for serialised keys in this node.
924
:return: A bytestring of the longest serialised key prefix that is
925
unique within this node.
927
serialised_keys = [self._serialise_key(key) for key in self._items]
928
self._common_serialised_prefix = self.common_prefix_for_keys(
930
return self._common_serialised_prefix
932
def unmap(self, store, key):
933
"""Unmap key from the node."""
935
self._raw_size -= self._key_value_len(key, self._items[key])
937
trace.mutter("key %s not found in %r", key, self._items)
942
# Recompute from scratch
943
self._compute_search_prefix()
944
self._compute_serialised_prefix()
948
class InternalNode(Node):
949
"""A node that contains references to other nodes.
951
An InternalNode is responsible for mapping search key prefixes to child
954
:ivar _items: serialised_key => node dictionary. node may be a tuple,
955
LeafNode or InternalNode.
958
__slots__ = ('_node_width',)
960
def __init__(self, prefix='', search_key_func=None):
962
# The size of an internalnode with default values and no children.
963
# How many octets key prefixes within this node are.
965
self._search_prefix = prefix
966
if search_key_func is None:
967
self._search_key_func = _search_key_plain
969
self._search_key_func = search_key_func
971
def add_node(self, prefix, node):
972
"""Add a child node with prefix prefix, and node node.
974
:param prefix: The search key prefix for node.
975
:param node: The node being added.
977
if self._search_prefix is None:
978
raise AssertionError("_search_prefix should not be None")
979
if not prefix.startswith(self._search_prefix):
980
raise AssertionError("prefixes mismatch: %s must start with %s"
981
% (prefix,self._search_prefix))
982
if len(prefix) != len(self._search_prefix) + 1:
983
raise AssertionError("prefix wrong length: len(%s) is not %d" %
984
(prefix, len(self._search_prefix) + 1))
985
self._len += len(node)
986
if not len(self._items):
987
self._node_width = len(prefix)
988
if self._node_width != len(self._search_prefix) + 1:
989
raise AssertionError("node width mismatch: %d is not %d" %
990
(self._node_width, len(self._search_prefix) + 1))
991
self._items[prefix] = node
994
def _current_size(self):
995
"""Answer the current serialised size of this node."""
996
return (self._raw_size + len(str(self._len)) + len(str(self._key_width)) +
997
len(str(self._maximum_size)))
1000
def deserialise(klass, bytes, key, search_key_func=None):
1001
"""Deserialise bytes to an InternalNode, with key key.
1003
:param bytes: The bytes of the node.
1004
:param key: The key that the serialised node has.
1005
:return: An InternalNode instance.
1007
return _deserialise_internal_node(bytes, key,
1008
search_key_func=search_key_func)
1010
def iteritems(self, store, key_filter=None):
1011
for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
1012
for item in node.iteritems(store, key_filter=node_filter):
1015
def _iter_nodes(self, store, key_filter=None, batch_size=None):
1016
"""Iterate over node objects which match key_filter.
1018
:param store: A store to use for accessing content.
1019
:param key_filter: A key filter to filter nodes. Only nodes that might
1020
contain a key in key_filter will be returned.
1021
:param batch_size: If not None, then we will return the nodes that had
1022
to be read using get_record_stream in batches, rather than reading
1024
:return: An iterable of nodes. This function does not have to be fully
1025
consumed. (There will be no pending I/O when items are being returned.)
1027
# Map from chk key ('sha1:...',) to (prefix, key_filter)
1028
# prefix is the key in self._items to use, key_filter is the key_filter
1029
# entries that would match this node
1032
if key_filter is None:
1033
# yielding all nodes, yield whatever we have, and queue up a read
1034
# for whatever we are missing
1036
for prefix, node in self._items.iteritems():
1037
if node.__class__ is tuple:
1038
keys[node] = (prefix, None)
1041
elif len(key_filter) == 1:
1042
# Technically, this path could also be handled by the first check
1043
# in 'self._node_width' in length_filters. However, we can handle
1044
# this case without spending any time building up the
1045
# prefix_to_keys, etc state.
1047
# This is a bit ugly, but TIMEIT showed it to be by far the fastest
1048
# 0.626us list(key_filter)[0]
1049
# is a func() for list(), 2 mallocs, and a getitem
1050
# 0.489us [k for k in key_filter][0]
1051
# still has the mallocs, avoids the func() call
1052
# 0.350us iter(key_filter).next()
1053
# has a func() call, and mallocs an iterator
1054
# 0.125us for key in key_filter: pass
1055
# no func() overhead, might malloc an iterator
1056
# 0.105us for key in key_filter: break
1057
# no func() overhead, might malloc an iterator, probably
1058
# avoids checking an 'else' clause as part of the for
1059
for key in key_filter:
1061
search_prefix = self._search_prefix_filter(key)
1062
if len(search_prefix) == self._node_width:
1063
# This item will match exactly, so just do a dict lookup, and
1064
# see what we can return
1067
node = self._items[search_prefix]
1069
# A given key can only match 1 child node, if it isn't
1070
# there, then we can just return nothing
1072
if node.__class__ is tuple:
1073
keys[node] = (search_prefix, [key])
1075
# This is loaded, and the only thing that can match,
1080
# First, convert all keys into a list of search prefixes
1081
# Aggregate common prefixes, and track the keys they come from
1084
for key in key_filter:
1085
search_prefix = self._search_prefix_filter(key)
1086
length_filter = length_filters.setdefault(
1087
len(search_prefix), set())
1088
length_filter.add(search_prefix)
1089
prefix_to_keys.setdefault(search_prefix, []).append(key)
1091
if (self._node_width in length_filters
1092
and len(length_filters) == 1):
1093
# all of the search prefixes match exactly _node_width. This
1094
# means that everything is an exact match, and we can do a
1095
# lookup into self._items, rather than iterating over the items
1097
search_prefixes = length_filters[self._node_width]
1098
for search_prefix in search_prefixes:
1100
node = self._items[search_prefix]
1102
# We can ignore this one
1104
node_key_filter = prefix_to_keys[search_prefix]
1105
if node.__class__ is tuple:
1106
keys[node] = (search_prefix, node_key_filter)
1108
yield node, node_key_filter
1110
# The slow way. We walk every item in self._items, and check to
1111
# see if there are any matches
1112
length_filters = length_filters.items()
1113
for prefix, node in self._items.iteritems():
1114
node_key_filter = []
1115
for length, length_filter in length_filters:
1116
sub_prefix = prefix[:length]
1117
if sub_prefix in length_filter:
1118
node_key_filter.extend(prefix_to_keys[sub_prefix])
1119
if node_key_filter: # this key matched something, yield it
1120
if node.__class__ is tuple:
1121
keys[node] = (prefix, node_key_filter)
1123
yield node, node_key_filter
1125
# Look in the page cache for some more bytes
1129
bytes = _page_cache[key]
1133
node = _deserialise(bytes, key,
1134
search_key_func=self._search_key_func)
1135
prefix, node_key_filter = keys[key]
1136
self._items[prefix] = node
1138
yield node, node_key_filter
1139
for key in found_keys:
1142
# demand load some pages.
1143
if batch_size is None:
1144
# Read all the keys in
1145
batch_size = len(keys)
1146
key_order = list(keys)
1147
for batch_start in range(0, len(key_order), batch_size):
1148
batch = key_order[batch_start:batch_start + batch_size]
1149
# We have to fully consume the stream so there is no pending
1150
# I/O, so we buffer the nodes for now.
1151
stream = store.get_record_stream(batch, 'unordered', True)
1152
node_and_filters = []
1153
for record in stream:
1154
bytes = record.get_bytes_as('fulltext')
1155
node = _deserialise(bytes, record.key,
1156
search_key_func=self._search_key_func)
1157
prefix, node_key_filter = keys[record.key]
1158
node_and_filters.append((node, node_key_filter))
1159
self._items[prefix] = node
1160
_page_cache.add(record.key, bytes)
1161
for info in node_and_filters:
1164
def map(self, store, key, value):
1165
"""Map key to value."""
1166
if not len(self._items):
1167
raise AssertionError("can't map in an empty InternalNode.")
1168
search_key = self._search_key(key)
1169
if self._node_width != len(self._search_prefix) + 1:
1170
raise AssertionError("node width mismatch: %d is not %d" %
1171
(self._node_width, len(self._search_prefix) + 1))
1172
if not search_key.startswith(self._search_prefix):
1173
# This key doesn't fit in this index, so we need to split at the
1174
# point where it would fit, insert self into that internal node,
1175
# and then map this key into that node.
1176
new_prefix = self.common_prefix(self._search_prefix,
1178
new_parent = InternalNode(new_prefix,
1179
search_key_func=self._search_key_func)
1180
new_parent.set_maximum_size(self._maximum_size)
1181
new_parent._key_width = self._key_width
1182
new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
1184
return new_parent.map(store, key, value)
1185
children = [node for node, _
1186
in self._iter_nodes(store, key_filter=[key])]
1191
child = self._new_child(search_key, LeafNode)
1192
old_len = len(child)
1193
if type(child) is LeafNode:
1194
old_size = child._current_size()
1197
prefix, node_details = child.map(store, key, value)
1198
if len(node_details) == 1:
1199
# child may have shrunk, or might be a new node
1200
child = node_details[0][1]
1201
self._len = self._len - old_len + len(child)
1202
self._items[search_key] = child
1205
if type(child) is LeafNode:
1206
if old_size is None:
1207
# The old node was an InternalNode which means it has now
1208
# collapsed, so we need to check if it will chain to a
1209
# collapse at this level.
1210
trace.mutter("checking remap as InternalNode -> LeafNode")
1211
new_node = self._check_remap(store)
1213
# If the LeafNode has shrunk in size, we may want to run
1214
# a remap check. Checking for a remap is expensive though
1215
# and the frequency of a successful remap is very low.
1216
# Shrinkage by small amounts is common, so we only do the
1217
# remap check if the new_size is low or the shrinkage
1218
# amount is over a configurable limit.
1219
new_size = child._current_size()
1220
shrinkage = old_size - new_size
1221
if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
1222
or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
1224
"checking remap as size shrunk by %d to be %d",
1225
shrinkage, new_size)
1226
new_node = self._check_remap(store)
1227
if new_node._search_prefix is None:
1228
raise AssertionError("_search_prefix should not be None")
1229
return new_node._search_prefix, [('', new_node)]
1230
# child has overflown - create a new intermediate node.
1231
# XXX: This is where we might want to try and expand our depth
1232
# to refer to more bytes of every child (which would give us
1233
# multiple pointers to child nodes, but less intermediate nodes)
1234
child = self._new_child(search_key, InternalNode)
1235
child._search_prefix = prefix
1236
for split, node in node_details:
1237
child.add_node(split, node)
1238
self._len = self._len - old_len + len(child)
1240
return self._search_prefix, [("", self)]
1242
def _new_child(self, search_key, klass):
1243
"""Create a new child node of type klass."""
1245
child.set_maximum_size(self._maximum_size)
1246
child._key_width = self._key_width
1247
child._search_key_func = self._search_key_func
1248
self._items[search_key] = child
1251
def serialise(self, store):
1252
"""Serialise the node to store.
1254
:param store: A VersionedFiles honouring the CHK extensions.
1255
:return: An iterable of the keys inserted by this operation.
1257
for node in self._items.itervalues():
1258
if type(node) is tuple:
1259
# Never deserialised.
1261
if node._key is not None:
1264
for key in node.serialise(store):
1266
lines = ["chknode:\n"]
1267
lines.append("%d\n" % self._maximum_size)
1268
lines.append("%d\n" % self._key_width)
1269
lines.append("%d\n" % self._len)
1270
if self._search_prefix is None:
1271
raise AssertionError("_search_prefix should not be None")
1272
lines.append('%s\n' % (self._search_prefix,))
1273
prefix_len = len(self._search_prefix)
1274
for prefix, node in sorted(self._items.items()):
1275
if type(node) is tuple:
1279
serialised = "%s\x00%s\n" % (prefix, key)
1280
if not serialised.startswith(self._search_prefix):
1281
raise AssertionError("prefixes mismatch: %s must start with %s"
1282
% (serialised, self._search_prefix))
1283
lines.append(serialised[prefix_len:])
1284
sha1, _, _ = store.add_lines((None,), (), lines)
1285
self._key = ("sha1:" + sha1,)
1286
_page_cache.add(self._key, ''.join(lines))
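# Rough on-disk layout produced by serialise() (informal sketch inferred from
# the code above):
#
#   chknode:
#   <maximum_size>
#   <key_width>
#   <len>
#   <search prefix>
#   <prefix suffix>\x00<child CHK key, e.g. sha1:...>
#   ...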
1289
def _search_key(self, key):
1290
"""Return the serialised key for key in this node."""
1291
# search keys are fixed width. All will be self._node_width wide, so we
1293
return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]
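# Illustrative example: with a node width of 4 and the plain search key
# function, a short key is padded and a long one truncated:
#
#   ('ab',)     -> 'ab\x00\x00'
#   ('abcdef',) -> 'abcd'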
1295
def _search_prefix_filter(self, key):
1296
"""Serialise key for use as a prefix filter in iteritems."""
1297
return self._search_key_func(key)[:self._node_width]
1299
def _split(self, offset):
1300
"""Split this node into smaller nodes starting at offset.
1302
:param offset: The offset to start the new child nodes at.
1303
:return: An iterable of (prefix, node) tuples. prefix is a byte
1304
prefix for reaching node.
1306
if offset >= self._node_width:
1307
for node in self._items.values():
1308
for result in node._split(offset):
1311
for key, node in self._items.items():
1315
"""Return the references to other CHK's held by this node."""
1316
if self._key is None:
1317
raise AssertionError("unserialised nodes have no refs.")
1319
for value in self._items.itervalues():
1320
if type(value) is tuple:
1323
refs.append(value.key())
1326
def _compute_search_prefix(self, extra_key=None):
1327
"""Return the unique key prefix for this node.
1329
:return: A bytestring of the longest search key prefix that is
1330
unique within this node.
1332
self._search_prefix = self.common_prefix_for_keys(self._items)
1333
return self._search_prefix
1335
def unmap(self, store, key, check_remap=True):
1336
"""Remove key from this node and it's children."""
1337
if not len(self._items):
1338
raise AssertionError("can't unmap in an empty InternalNode.")
1339
children = [node for node, _
1340
in self._iter_nodes(store, key_filter=[key])]
1346
unmapped = child.unmap(store, key)
1348
search_key = self._search_key(key)
1349
if len(unmapped) == 0:
1350
# All child nodes are gone, remove the child:
1351
del self._items[search_key]
1354
# Stash the returned node
1355
self._items[search_key] = unmapped
1356
if len(self._items) == 1:
1357
# this node is no longer needed:
1358
return self._items.values()[0]
1359
if type(unmapped) is InternalNode:
1362
return self._check_remap(store)
1366
def _check_remap(self, store):
1367
"""Check if all keys contained by children fit in a single LeafNode.
1369
:param store: A store to use for reading more nodes
1370
:return: Either self, or a new LeafNode which should replace self.
1372
# Logic for how we determine when we need to rebuild
1373
# 1) Implicitly unmap() is removing a key which means that the child
1374
# nodes are going to be shrinking by some extent.
1375
# 2) If all children are LeafNodes, it is possible that they could be
1376
# combined into a single LeafNode, which can then completely replace
1377
# this internal node with a single LeafNode
1378
# 3) If *one* child is an InternalNode, we assume it has already done
1379
# all the work to determine that its children cannot collapse, and
1380
# we can then assume that those nodes *plus* the current nodes don't
1381
# have a chance of collapsing either.
1382
# So a very cheap check is to just say if 'unmapped' is an
1383
# InternalNode, we don't have to check further.
1385
# TODO: Another alternative is to check the total size of all known
1386
# LeafNodes. If there is some formula we can use to determine the
1387
# final size without actually having to read in any more
1388
# children, it would be nice to have. However, we have to be
1389
# careful with stuff like nodes that pull out the common prefix
1390
# of each key, as adding a new key can change the common prefix
1391
# and cause size changes greater than the length of one key.
1392
# So for now, we just add everything to a new Leaf until it
1393
# splits, as we know that will give the right answer
1394
new_leaf = LeafNode(search_key_func=self._search_key_func)
1395
new_leaf.set_maximum_size(self._maximum_size)
1396
new_leaf._key_width = self._key_width
1397
# A batch_size of 16 was chosen because:
1398
# a) In testing, a 4k page held 14 times. So if we have more than 16
1399
# leaf nodes we are unlikely to hold them in a single new leaf
1400
# node. This still allows for 1 round trip
1401
# b) With 16-way fan out, we can still do a single round trip
1402
# c) With 255-way fan out, we don't want to read all 255 and destroy
1403
# the page cache, just to determine that we really don't need it.
1404
for node, _ in self._iter_nodes(store, batch_size=16):
1405
if type(node) is InternalNode:
1406
# Without looking at any leaf nodes, we are sure
1408
for key, value in node._items.iteritems():
1409
if new_leaf._map_no_split(key, value):
1411
trace.mutter("remap generated a new LeafNode")
1415
def _deserialise(bytes, key, search_key_func):
1416
"""Helper for repositorydetails - convert bytes to a node."""
1417
if bytes.startswith("chkleaf:\n"):
1418
node = LeafNode.deserialise(bytes, key, search_key_func=search_key_func)
1419
elif bytes.startswith("chknode:\n"):
1420
node = InternalNode.deserialise(bytes, key,
1421
search_key_func=search_key_func)
1423
raise AssertionError("Unknown node type.")
1427
class CHKMapDifference(object):
1428
"""Iterate the stored pages and key,value pairs for (new - old).
1430
This class provides a generator over the stored CHK pages and the
1431
(key, value) pairs that are in any of the new maps and not in any of the
1434
Note that it may yield chk pages that are common (especially root nodes),
1435
but it won't yield (key,value) pairs that are common.
1438
def __init__(self, store, new_root_keys, old_root_keys,
1439
search_key_func, pb=None):
1441
self._new_root_keys = new_root_keys
1442
self._old_root_keys = old_root_keys
1444
# All uninteresting chks that we have seen. By the time they are added
1445
# here, they should be either fully ignored, or queued up for
1447
self._all_old_chks = set(self._old_root_keys)
1448
# All items that we have seen from the old_root_keys
1449
self._all_old_items = set()
1450
# These are interesting items which were either read, or already in the
1451
# interesting queue (so we don't need to walk them again)
1452
self._processed_new_refs = set()
1453
self._search_key_func = search_key_func
1455
# The uninteresting and interesting nodes to be searched
1456
self._old_queue = []
1457
self._new_queue = []
1458
# Holds the (key, value) items found when processing the root nodes,
1459
# waiting for the uninteresting nodes to be walked
1460
self._new_item_queue = []
1463
def _read_nodes_from_store(self, keys):
1464
# We chose not to use _page_cache, because we think in terms of records
1465
# to be yielded. Also, we expect to touch each page only 1 time during
1466
# this code. (We may want to evaluate saving the raw bytes into the
1467
# page cache, which would allow a working tree update after the fetch
1468
# to not have to read the bytes again.)
1469
stream = self._store.get_record_stream(keys, 'unordered', True)
1470
for record in stream:
1471
if self._pb is not None:
1473
if record.storage_kind == 'absent':
1474
raise errors.NoSuchRevision(self._store, record.key)
1475
bytes = record.get_bytes_as('fulltext')
1476
node = _deserialise(bytes, record.key,
1477
search_key_func=self._search_key_func)
1478
if type(node) is InternalNode:
1479
# Note we don't have to do node.refs() because we know that
1480
# there are no children that have been pushed into this node
1481
prefix_refs = node._items.items()
1485
items = node._items.items()
1486
yield record, node, prefix_refs, items
1488
def _read_old_roots(self):
1489
old_chks_to_enqueue = []
1490
all_old_chks = self._all_old_chks
1491
for record, node, prefix_refs, items in \
1492
self._read_nodes_from_store(self._old_root_keys):
1493
# Uninteresting node
1494
prefix_refs = [p_r for p_r in prefix_refs
1495
if p_r[1] not in all_old_chks]
1496
new_refs = [p_r[1] for p_r in prefix_refs]
1497
all_old_chks.update(new_refs)
1498
self._all_old_items.update(items)
1499
# Queue up the uninteresting references
1500
# Don't actually put them in the 'to-read' queue until we have
1501
# finished checking the interesting references
1502
old_chks_to_enqueue.extend(prefix_refs)
1503
return old_chks_to_enqueue
1505
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1506
# At this point, we have read all the uninteresting and interesting
1507
# items, so we can queue up the uninteresting stuff, knowing that we've
1508
# handled the interesting ones
1509
for prefix, ref in old_chks_to_enqueue:
1510
not_interesting = True
1511
for i in xrange(len(prefix), 0, -1):
1512
if prefix[:i] in new_prefixes:
1513
not_interesting = False
1516
# This prefix is not part of the remaining 'interesting set'
1518
self._old_queue.append(ref)
1520
def _read_all_roots(self):
1521
"""Read the root pages.
1523
This is structured as a generator, so that the root records can be
1524
yielded up to whoever needs them without any buffering.
1526
# This is the bootstrap phase
1527
if not self._old_root_keys:
1528
# With no old_root_keys we can just shortcut and be ready
1529
# for _flush_new_queue
1530
self._new_queue = list(self._new_root_keys)
1532
old_chks_to_enqueue = self._read_old_roots()
1533
# filter out any root keys that are already known to be uninteresting
1534
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1535
# These are prefixes that are present in new_keys that we are
1537
new_prefixes = set()
1538
# We are about to yield all of these, so we don't want them getting
1539
# added a second time
1540
processed_new_refs = self._processed_new_refs
1541
processed_new_refs.update(new_keys)
1542
for record, node, prefix_refs, items in \
1543
self._read_nodes_from_store(new_keys):
1544
# At this level, we now know all the uninteresting references
1545
# So we filter and queue up whatever is remaining
1546
prefix_refs = [p_r for p_r in prefix_refs
1547
if p_r[1] not in self._all_old_chks
1548
and p_r[1] not in processed_new_refs]
1549
refs = [p_r[1] for p_r in prefix_refs]
1550
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1551
self._new_queue.extend(refs)
1552
# TODO: We can potentially get multiple items here, however the
1553
# current design allows for this, as callers will do the work
1554
# to make the results unique. We might profile whether we
1555
# gain anything by ensuring unique return values for items
1556
new_items = [item for item in items
1557
if item not in self._all_old_items]
1558
self._new_item_queue.extend(new_items)
1559
new_prefixes.update([self._search_key_func(item[0])
1560
for item in new_items])
1561
processed_new_refs.update(refs)
1563
# For new_prefixes we have the full length prefixes queued up.
1564
# However, we also need possible prefixes. (If we have a known ref to
1565
# 'ab', then we also need to include 'a'.) So expand the
1566
# new_prefixes to include all shorter prefixes
1567
for prefix in list(new_prefixes):
1568
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1569
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1571
def _flush_new_queue(self):
1572
# No need to maintain the heap invariant anymore, just pull things out
1574
refs = set(self._new_queue)
1575
self._new_queue = []
1576
# First pass, flush all interesting items and convert to using direct refs
1577
all_old_chks = self._all_old_chks
1578
processed_new_refs = self._processed_new_refs
1579
all_old_items = self._all_old_items
1580
new_items = [item for item in self._new_item_queue
1581
if item not in all_old_items]
1582
self._new_item_queue = []
1584
yield None, new_items
1585
refs = refs.difference(all_old_chks)
1588
next_refs_update = next_refs.update
1589
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1590
# from 1m54s to 1m51s. Consider it.
1591
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1592
items = [item for item in items
1593
if item not in all_old_items]
1595
next_refs_update([p_r[1] for p_r in p_refs])
1596
next_refs = next_refs.difference(all_old_chks)
1597
next_refs = next_refs.difference(processed_new_refs)
1598
processed_new_refs.update(next_refs)
1601
def _process_next_old(self):
1602
# Since we don't filter uninteresting any further than during
1603
# _read_all_roots, process the whole queue in a single pass.
1604
refs = self._old_queue
1605
self._old_queue = []
1606
all_old_chks = self._all_old_chks
1607
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1608
self._all_old_items.update(items)
1609
refs = [r for _,r in prefix_refs if r not in all_old_chks]
1610
self._old_queue.extend(refs)
1611
all_old_chks.update(refs)
1613
def _process_queues(self):
1614
while self._old_queue:
1615
self._process_next_old()
1616
return self._flush_new_queue()
1619
for record in self._read_all_roots():
1621
for record, items in self._process_queues():
1625
def iter_interesting_nodes(store, interesting_root_keys,
1626
uninteresting_root_keys, pb=None):
1627
"""Given root keys, find interesting nodes.
1629
Evaluate nodes referenced by interesting_root_keys. Ones that are also
1630
referenced from uninteresting_root_keys are not considered interesting.
1632
:param interesting_root_keys: keys which should be part of the
1633
"interesting" nodes (which will be yielded)
1634
:param uninteresting_root_keys: keys which should be filtered out of the
1637
(interesting record, {interesting key:values})
1639
iterator = CHKMapDifference(store, interesting_root_keys,
1640
uninteresting_root_keys,
1641
search_key_func=store._search_key_func,
1643
return iterator.process()
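# Usage sketch (illustrative): stream the pages and key/value pairs present in
# the new maps but absent from the old ones, e.g. during a fetch.
#
#   for record, items in iter_interesting_nodes(store, [new_root_key],
#                                               [old_root_key]):
#       ...  # e.g. copy `record` to the target repo, process `items`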
1647
from bzrlib._chk_map_pyx import (
1650
_deserialise_leaf_node,
1651
_deserialise_internal_node,
1653
except ImportError, e:
1654
osutils.failed_to_load_extension(e)
1655
from bzrlib._chk_map_py import (
1658
_deserialise_leaf_node,
1659
_deserialise_internal_node,
1661
search_key_registry.register('hash-16-way', _search_key_16)
1662
search_key_registry.register('hash-255-way', _search_key_255)