# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Persistent maps from tuple_of_strings->string using CHK stores.

Overview and current status:

The CHKMap class implements a dict from tuple_of_strings->string by using a trie
with internal nodes of 8-bit fan out; the key tuples are mapped to strings by
joining them by \x00, and \x00 padding shorter keys out to the length of the
longest key. Leaf nodes are packed as densely as possible, and internal nodes
are all an additional 8-bits wide leading to a sparse upper tree.

Updates to a CHKMap are done preferentially via the apply_delta method, to
allow optimisation of the update operation; but individual map/unmap calls are
possible and supported. Individual changes via map/unmap are buffered in memory
until the _save method is called to force serialisation of the tree.
apply_delta records its changes immediately by performing an implicit _save.

TODO:
-----

Densely packed upper nodes.

"""

import heapq
import threading

from bzrlib import lazy_import
lazy_import.lazy_import(globals(), """
from bzrlib import (
    errors,
    lru_cache,
    osutils,
    registry,
    static_tuple,
    trace,
    )
""")
from bzrlib.static_tuple import StaticTuple
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
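# (Worked figure for the comment above, using its own numbers: 255 pages * 255
# entries * 50 bytes = 3,251,250 bytes, i.e. roughly 3.1MiB, so the 4MiB cache
# size leaves some headroom for one full internal layer.)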
# Per thread caches for 2 reasons:
# - in the server we may be serving very different content, so we get less
#   cache thrashing.
# - we avoid locking on every cache lookup.
_thread_caches = threading.local()
_thread_caches.page_cache = None
def _get_cache():
    """Get the per-thread page cache.

    We need a function to do this because in a new thread the _thread_caches
    threading.local object does not have the cache initialized yet.
    """
    page_cache = getattr(_thread_caches, 'page_cache', None)
    if page_cache is None:
        # We are caching bytes so len(value) is perfectly accurate
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
        _thread_caches.page_cache = page_cache
    return page_cache
# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20


def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)


search_key_registry = registry.Registry()
search_key_registry.register('plain', _search_key_plain)
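# Illustrative sketch (not part of the original module): how a key tuple is
# flattened into a search key and how a registered search_key_func is looked
# up again by name.  The key used below is invented for demonstration only.
def _example_plain_search_key():
    key = ('file-id', 'revision-id')
    # 'plain' simply joins the key elements with NUL bytes.
    search_key = _search_key_plain(key)       # 'file-id\x00revision-id'
    # The same callable can be recovered from the registry by its name.
    func = search_key_registry.get('plain')
    return search_key == func(key)            # True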
class CHKMap(object):
    """A persistent map from string to string backed by a CHK store."""

    __slots__ = ('_store', '_root_node', '_search_key_func')

    def __init__(self, store, root_key, search_key_func=None):
        """Create a CHKMap object.

        :param store: The store the CHKMap is stored in.
        :param root_key: The root key of the map. None to create an empty
            CHKMap.
        :param search_key_func: A function mapping a key => bytes. These bytes
            are then used by the internal nodes to split up leaf nodes into
            multiple pages.
        """
        self._store = store
        if search_key_func is None:
            search_key_func = _search_key_plain
        self._search_key_func = search_key_func
        if root_key is None:
            self._root_node = LeafNode(search_key_func=search_key_func)
        else:
            self._root_node = self._node_key(root_key)
    def apply_delta(self, delta):
        """Apply a delta to the map.

        :param delta: An iterable of old_key, new_key, new_value tuples.
            If new_key is not None, then new_key->new_value is inserted
            into the map; if old_key is not None, then the old mapping
            of old_key is removed.
        """
        has_deletes = False
        # Check preconditions first.
        as_st = StaticTuple.from_sequence
        new_items = set([as_st(key) for (old, key, value) in delta
                         if key is not None and old is None])
        existing_new = list(self.iteritems(key_filter=new_items))
        if existing_new:
            raise errors.InconsistentDeltaDelta(delta,
                "New items are already in the map %r." % existing_new)
        for old, new, value in delta:
            if old is not None and old != new:
                self.unmap(old, check_remap=False)
                has_deletes = True
        for old, new, value in delta:
            if new is not None and new != old:
                self.map(new, value)
        if has_deletes:
            self._check_remap()
        return self._save()
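    # Illustrative delta (a sketch, not original code): each element is
    # (old_key, new_key, new_value).  The keys and values below are invented
    # purely for demonstration.
    #
    #   delta = [
    #       (None, ('new-file-id',), 'new value'),     # insertion
    #       (('gone-file-id',), None, None),           # deletion
    #       (('old-name',), ('new-name',), 'value'),   # move to a new key
    #   ]
    #   new_root_key = a_chkmap.apply_delta(delta)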
    def _ensure_root(self):
        """Ensure that the root node is an object not a key."""
        if type(self._root_node) is StaticTuple:
            # Demand-load the root
            self._root_node = self._get_node(self._root_node)

    def _get_node(self, node):
        """Get a node.

        Note that this does not update the _items dict in objects containing a
        reference to this node. As such it does not prevent subsequent IO being
        performed.

        :param node: A tuple key or node object.
        :return: A node object.
        """
        if type(node) is StaticTuple:
            bytes = self._read_bytes(node)
            return _deserialise(bytes, node,
                search_key_func=self._search_key_func)
        else:
            return node

    def _read_bytes(self, key):
        try:
            return _get_cache()[key]
        except KeyError:
            stream = self._store.get_record_stream([key], 'unordered', True)
            bytes = stream.next().get_bytes_as('fulltext')
            _get_cache()[key] = bytes
            return bytes
    def _dump_tree(self, include_keys=False):
        """Return the tree in a string representation."""
        self._ensure_root()
        res = self._dump_tree_node(self._root_node, prefix='', indent='',
                                   include_keys=include_keys)
        res.append('') # Give a trailing '\n'
        return '\n'.join(res)

    def _dump_tree_node(self, node, prefix, indent, include_keys=True):
        """For this node and all children, generate a string representation."""
        result = []
        if not include_keys:
            key_str = ''
        else:
            node_key = node.key()
            if node_key is not None:
                key_str = ' %s' % (node_key[0],)
            else:
                key_str = ' None'
        result.append('%s%r %s%s' % (indent, prefix, node.__class__.__name__,
                                     key_str))
        if type(node) is InternalNode:
            # Trigger all child nodes to get loaded
            list(node._iter_nodes(self._store))
            for prefix, sub in sorted(node._items.iteritems()):
                result.extend(self._dump_tree_node(sub, prefix, indent + '  ',
                                                   include_keys=include_keys))
        else:
            for key, value in sorted(node._items.iteritems()):
                # Don't use prefix nor indent here to line up when used in
                # tests in conjunction with assertEqualDiff
                result.append('      %r %r' % (tuple(key), value))
        return result
224
    @classmethod
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
                  search_key_func=None):
226
"""Create a CHKMap in store with initial_value as the content.
228
:param store: The store to record initial_value in, a VersionedFiles
229
object with 1-tuple keys supporting CHK key generation.
230
        :param initial_value: A dict to store in store. Its keys and values
            must be bytestrings.
232
:param maximum_size: The maximum_size rule to apply to nodes. This
233
determines the size at which no new data is added to a single node.
234
:param key_width: The number of elements in each key_tuple being stored
236
:param search_key_func: A function mapping a key => bytes. These bytes
237
            are then used by the internal nodes to split up leaf nodes into
            multiple pages.
        :return: The root chk of the resulting CHKMap.
        """
241
root_key = klass._create_directly(store, initial_value,
242
maximum_size=maximum_size, key_width=key_width,
243
search_key_func=search_key_func)
244
if type(root_key) is not StaticTuple:
245
            raise AssertionError('we got a %s instead of a StaticTuple'
                                 % (type(root_key),))
        return root_key
250
    @classmethod
    def _create_via_map(klass, store, initial_value, maximum_size=0,
                        key_width=1, search_key_func=None):
252
result = klass(store, None, search_key_func=search_key_func)
253
result._root_node.set_maximum_size(maximum_size)
254
result._root_node._key_width = key_width
256
        delta = []
        for key, value in initial_value.items():
257
delta.append((None, key, value))
258
        root_key = result.apply_delta(delta)
        return root_key
262
    @classmethod
    def _create_directly(klass, store, initial_value, maximum_size=0,
                         key_width=1, search_key_func=None):
264
node = LeafNode(search_key_func=search_key_func)
265
node.set_maximum_size(maximum_size)
266
node._key_width = key_width
267
as_st = StaticTuple.from_sequence
268
node._items = dict([(as_st(key), val) for key, val
269
in initial_value.iteritems()])
270
node._raw_size = sum([node._key_value_len(key, value)
271
for key,value in node._items.iteritems()])
272
node._len = len(node._items)
273
node._compute_search_prefix()
274
node._compute_serialised_prefix()
277
        if (node._len > 1
            and maximum_size
            and node._current_size() > maximum_size):
278
prefix, node_details = node._split(store)
279
if len(node_details) == 1:
280
raise AssertionError('Failed to split using node._split')
281
node = InternalNode(prefix, search_key_func=search_key_func)
282
node.set_maximum_size(maximum_size)
283
node._key_width = key_width
284
for split, subnode in node_details:
285
node.add_node(split, subnode)
286
        keys = list(node.serialise(store))
        return keys[-1]
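    # Illustrative usage sketch (not original code).  'store' stands for any
    # CHK-capable VersionedFiles object and the dict contents are invented:
    #
    #   root_key = CHKMap.from_dict(store, {('file-id',): 'value'},
    #                               maximum_size=4096)
    #   chkmap = CHKMap(store, root_key)
    #   sorted(chkmap.iteritems())   # -> [(('file-id',), 'value')]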
289
def iter_changes(self, basis):
290
"""Iterate over the changes between basis and self.
292
:return: An iterator of tuples: (key, old_value, new_value). Old_value
293
is None for keys only in self; new_value is None for keys only in
297
        # Read both trees in lexicographic, highest-first order.
298
# Any identical nodes we skip
299
# Any unique prefixes we output immediately.
300
# values in a leaf node are treated as single-value nodes in the tree
301
# which allows them to be not-special-cased. We know to output them
302
# because their value is a string, not a key(tuple) or node.
304
# corner cases to beware of when considering this function:
305
# *) common references are at different heights.
306
# consider two trees:
307
# {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
308
# {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
309
# 'ab':LeafNode={'ab':'bar'}}
310
# 'b': LeafNode={'b'}}
311
# the node with aaa/aab will only be encountered in the second tree
312
# after reading the 'a' subtree, but it is encountered in the first
313
# tree immediately. Variations on this may have read internal nodes
314
# like this. we want to cut the entire pending subtree when we
315
# realise we have a common node. For this we use a list of keys -
316
# the path to a node - and check the entire path is clean as we
318
        if self._node_key(self._root_node) == self._node_key(basis._root_node):
            return
        self._ensure_root()
        basis._ensure_root()
322
excluded_keys = set()
323
self_node = self._root_node
324
basis_node = basis._root_node
325
# A heap, each element is prefix, node(tuple/NodeObject/string),
326
        # key_path (a list of tuples, tail-sharing down the tree.)
        self_pending = []
        basis_pending = []
329
def process_node(node, path, a_map, pending):
330
# take a node and expand it
331
node = a_map._get_node(node)
332
if type(node) == LeafNode:
333
path = (node._key, path)
334
for key, value in node._items.items():
335
# For a LeafNode, the key is a serialized_key, rather than
336
# a search_key, but the heap is using search_keys
337
search_key = node._search_key_func(key)
338
heapq.heappush(pending, (search_key, key, value, path))
340
            else:
                # type(node) == InternalNode
341
path = (node._key, path)
342
for prefix, child in node._items.items():
343
heapq.heappush(pending, (prefix, None, child, path))
344
def process_common_internal_nodes(self_node, basis_node):
345
self_items = set(self_node._items.items())
346
basis_items = set(basis_node._items.items())
347
path = (self_node._key, None)
348
for prefix, child in self_items - basis_items:
349
heapq.heappush(self_pending, (prefix, None, child, path))
350
path = (basis_node._key, None)
351
for prefix, child in basis_items - self_items:
352
heapq.heappush(basis_pending, (prefix, None, child, path))
353
def process_common_leaf_nodes(self_node, basis_node):
354
self_items = set(self_node._items.items())
355
basis_items = set(basis_node._items.items())
356
path = (self_node._key, None)
357
for key, value in self_items - basis_items:
358
prefix = self._search_key_func(key)
359
heapq.heappush(self_pending, (prefix, key, value, path))
360
path = (basis_node._key, None)
361
for key, value in basis_items - self_items:
362
prefix = basis._search_key_func(key)
363
heapq.heappush(basis_pending, (prefix, key, value, path))
364
def process_common_prefix_nodes(self_node, self_path,
365
basis_node, basis_path):
366
# Would it be more efficient if we could request both at the same
368
self_node = self._get_node(self_node)
369
basis_node = basis._get_node(basis_node)
370
if (type(self_node) == InternalNode
371
and type(basis_node) == InternalNode):
372
# Matching internal nodes
373
process_common_internal_nodes(self_node, basis_node)
374
elif (type(self_node) == LeafNode
375
and type(basis_node) == LeafNode):
376
process_common_leaf_nodes(self_node, basis_node)
378
process_node(self_node, self_path, self, self_pending)
379
process_node(basis_node, basis_path, basis, basis_pending)
380
process_common_prefix_nodes(self_node, None, basis_node, None)
383
excluded_keys = set()
384
def check_excluded(key_path):
385
# Note that this is N^2, it depends on us trimming trees
386
# aggressively to not become slow.
387
# A better implementation would probably have a reverse map
388
# back to the children of a node, and jump straight to it when
389
            # a common node is detected, then proceed to remove the already
390
# pending children. bzrlib.graph has a searcher module with a
392
while key_path is not None:
393
key, key_path = key_path
394
if key in excluded_keys:
399
while self_pending or basis_pending:
402
# self is exhausted: output remainder of basis
403
for prefix, key, node, path in basis_pending:
404
if check_excluded(path):
406
node = basis._get_node(node)
409
yield (key, node, None)
411
# subtree - fastpath the entire thing.
412
for key, value in node.iteritems(basis._store):
413
yield (key, value, None)
415
elif not basis_pending:
416
# basis is exhausted: output remainder of self.
417
for prefix, key, node, path in self_pending:
418
if check_excluded(path):
420
node = self._get_node(node)
423
yield (key, None, node)
425
# subtree - fastpath the entire thing.
426
for key, value in node.iteritems(self._store):
427
yield (key, None, value)
430
# XXX: future optimisation - yield the smaller items
431
# immediately rather than pushing everything on/off the
432
# heaps. Applies to both internal nodes and leafnodes.
433
if self_pending[0][0] < basis_pending[0][0]:
435
prefix, key, node, path = heapq.heappop(self_pending)
436
if check_excluded(path):
440
yield (key, None, node)
442
process_node(node, path, self, self_pending)
444
elif self_pending[0][0] > basis_pending[0][0]:
446
prefix, key, node, path = heapq.heappop(basis_pending)
447
if check_excluded(path):
451
yield (key, node, None)
453
process_node(node, path, basis, basis_pending)
456
# common prefix: possibly expand both
457
if self_pending[0][1] is None:
462
if basis_pending[0][1] is None:
467
if not read_self and not read_basis:
468
# compare a common value
469
self_details = heapq.heappop(self_pending)
470
basis_details = heapq.heappop(basis_pending)
471
if self_details[2] != basis_details[2]:
472
yield (self_details[1],
473
basis_details[2], self_details[2])
475
# At least one side wasn't a simple value
476
if (self._node_key(self_pending[0][2]) ==
477
self._node_key(basis_pending[0][2])):
478
# Identical pointers, skip (and don't bother adding to
479
# excluded, it won't turn up again.
480
heapq.heappop(self_pending)
481
heapq.heappop(basis_pending)
483
# Now we need to expand this node before we can continue
484
if read_self and read_basis:
485
# Both sides start with the same prefix, so process
487
self_prefix, _, self_node, self_path = heapq.heappop(
489
basis_prefix, _, basis_node, basis_path = heapq.heappop(
491
if self_prefix != basis_prefix:
492
raise AssertionError(
493
'%r != %r' % (self_prefix, basis_prefix))
494
process_common_prefix_nodes(
495
self_node, self_path,
496
basis_node, basis_path)
499
prefix, key, node, path = heapq.heappop(self_pending)
500
if check_excluded(path):
502
process_node(node, path, self, self_pending)
504
prefix, key, node, path = heapq.heappop(basis_pending)
505
if check_excluded(path):
507
process_node(node, path, basis, basis_pending)
510
def iteritems(self, key_filter=None):
511
"""Iterate over the entire CHKMap's contents."""
513
if key_filter is not None:
514
as_st = StaticTuple.from_sequence
515
key_filter = [as_st(key) for key in key_filter]
516
return self._root_node.iteritems(self._store, key_filter=key_filter)
    def key(self):
        """Return the key for this map."""
        if type(self._root_node) is StaticTuple:
            return self._root_node
        else:
            return self._root_node._key

    def __len__(self):
        self._ensure_root()
        return len(self._root_node)
529
def map(self, key, value):
530
"""Map a key tuple to value.
532
:param key: A key to map.
533
        :param value: The value to assign to key.
        """
535
key = StaticTuple.from_sequence(key)
536
        # Need a root object.
        self._ensure_root()
538
prefix, node_details = self._root_node.map(self._store, key, value)
539
if len(node_details) == 1:
540
self._root_node = node_details[0][1]
542
        else:
            self._root_node = InternalNode(prefix,
543
search_key_func=self._search_key_func)
544
self._root_node.set_maximum_size(node_details[0][1].maximum_size)
545
self._root_node._key_width = node_details[0][1]._key_width
546
for split, node in node_details:
547
self._root_node.add_node(split, node)
549
def _node_key(self, node):
550
"""Get the key for a node whether it's a tuple or node."""
551
if type(node) is tuple:
552
node = StaticTuple.from_sequence(node)
553
        if type(node) is StaticTuple:
            return node
        else:
            return node._key
558
def unmap(self, key, check_remap=True):
559
"""remove key from the map."""
560
        key = StaticTuple.from_sequence(key)
        self._ensure_root()
562
if type(self._root_node) is InternalNode:
563
unmapped = self._root_node.unmap(self._store, key,
564
check_remap=check_remap)
566
        else:
            unmapped = self._root_node.unmap(self._store, key)
567
self._root_node = unmapped
569
def _check_remap(self):
        """Check if nodes can be collapsed."""
        self._ensure_root()
572
if type(self._root_node) is InternalNode:
573
self._root_node = self._root_node._check_remap(self._store)
    def _save(self):
        """Save the map completely.

        :return: The key of the root node.
        """
        if type(self._root_node) is StaticTuple:
            return self._root_node
        keys = list(self._root_node.serialise(self._store))
        return keys[-1]


class Node(object):
588
"""Base class defining the protocol for CHK Map nodes.
590
:ivar _raw_size: The total size of the serialized key:value data, before
591
adding the header bytes, and without prefix compression.
594
__slots__ = ('_key', '_len', '_maximum_size', '_key_width',
595
                 '_raw_size', '_items', '_search_prefix', '_search_key_func',
                 )
598
    def __init__(self, key_width=1):
        """Create a node.

        :param key_width: The width of keys for this node.
        """
        self._key = None
        # Current number of elements
        self._len = 0
        self._maximum_size = 0
        self._key_width = key_width
        # current size in bytes
        self._raw_size = 0
        # The pointers/values this node has - meaning defined by child classes.
        self._items = {}
        # The common search prefix
        self._search_prefix = None
616
    def __repr__(self):
        items_str = str(sorted(self._items))
617
if len(items_str) > 20:
618
items_str = items_str[:16] + '...]'
619
return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
620
self.__class__.__name__, self._key, self._len, self._raw_size,
621
self._maximum_size, self._search_prefix, items_str)
630
def maximum_size(self):
631
"""What is the upper limit for adding references to a node."""
632
return self._maximum_size
634
def set_maximum_size(self, new_size):
635
"""Set the size threshold for nodes.
637
        :param new_size: The size at which no data is added to a node. 0 for
            unlimited.
        """
640
self._maximum_size = new_size
643
    @classmethod
    def common_prefix(cls, prefix, key):
        """Given 2 strings, return the longest prefix common to both.

        :param prefix: This has been the common prefix for other keys, so it is
            more likely to be the common prefix in this case as well.
        :param key: Another string to compare to
        """
        if key.startswith(prefix):
            return prefix
        pos = -1
        # Is there a better way to do this?
        for pos, (left, right) in enumerate(zip(prefix, key)):
            if left != right:
                pos -= 1
                break
        common = prefix[:pos+1]
        return common
662
    @classmethod
    def common_prefix_for_keys(cls, keys):
        """Given a list of keys, find their common prefix.

        :param keys: An iterable of strings.
        :return: The longest common prefix of all keys.
        """
        common_prefix = None
        for key in keys:
            if common_prefix is None:
                common_prefix = key
                continue
            common_prefix = cls.common_prefix(common_prefix, key)
            if not common_prefix:
                # if common_prefix is the empty string, then we know it won't
                # match anything else, so return early
                return ''
        return common_prefix
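    # Illustrative behaviour (a sketch based on the definitions above, not
    # original code):
    #
    #   Node.common_prefix('abc', 'abd')                    => 'ab'
    #   Node.common_prefix('abc', 'abcde')                  => 'abc'
    #   Node.common_prefix_for_keys(['aaa', 'aab', 'aac'])  => 'aa'
    #   Node.common_prefix_for_keys(['aaa', 'bbb'])         => ''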
681
# Singleton indicating we have not computed _search_prefix yet
_unknown = object()
684
class LeafNode(Node):
685
"""A node containing actual key:value pairs.
687
:ivar _items: A dict of key->value items. The key is in tuple form.
688
    :ivar _size: The number of bytes that would be used by serializing all of
        the key/value pairs.
    """
692
__slots__ = ('_common_serialised_prefix',)
694
    def __init__(self, search_key_func=None):
        Node.__init__(self)
        # All of the keys in this leaf node share this common prefix
        self._common_serialised_prefix = None
        if search_key_func is None:
            self._search_key_func = _search_key_plain
        else:
            self._search_key_func = search_key_func
704
    def __repr__(self):
        items_str = str(sorted(self._items))
        if len(items_str) > 20:
            items_str = items_str[:16] + '...]'
        return \
            '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
            % (self.__class__.__name__, self._key, self._len, self._raw_size,
            self._maximum_size, self._search_prefix, self._key_width, items_str)
712
    def _current_size(self):
        """Answer the current serialised size of this node.

        This differs from self._raw_size in that it includes the bytes used for
        the header.
        """
        if self._common_serialised_prefix is None:
            bytes_for_items = 0
            prefix_len = 0
        else:
            # We will store a single string with the common prefix
            # And then that common prefix will not be stored in any of the
            # keys or values
            prefix_len = len(self._common_serialised_prefix)
            bytes_for_items = (self._raw_size - (prefix_len * self._len))
        return (9 # 'chkleaf:\n'
            + len(str(self._maximum_size)) + 1
            + len(str(self._key_width)) + 1
            + len(str(self._len)) + 1
            + prefix_len + 1
            + bytes_for_items)

    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
736
"""Deserialise bytes, with key key, into a LeafNode.
738
:param bytes: The bytes of the node.
739
        :param key: The key that the serialised node has.
        """
741
key = static_tuple.expect_static_tuple(key)
742
return _deserialise_leaf_node(bytes, key,
743
search_key_func=search_key_func)
745
def iteritems(self, store, key_filter=None):
746
"""Iterate over items in the node.
748
:param key_filter: A filter to apply to the node. It should be a
749
list/set/dict or similar repeatedly iterable container.
751
if key_filter is not None:
752
# Adjust the filter - short elements go to a prefix filter. All
753
# other items are looked up directly.
754
            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
            filters = {}
756
for key in key_filter:
757
if len(key) == self._key_width:
758
# This filter is meant to match exactly one key, yield it
761
yield key, self._items[key]
763
# This key is not present in this map, continue
766
# Short items, we need to match based on a prefix
767
length_filter = filters.setdefault(len(key), set())
768
length_filter.add(key)
770
filters = filters.items()
771
for item in self._items.iteritems():
772
for length, length_filter in filters:
773
if item[0][:length] in length_filter:
777
for item in self._items.iteritems():
780
def _key_value_len(self, key, value):
781
# TODO: Should probably be done without actually joining the key, but
782
# then that can be done via the C extension
783
return (len(self._serialise_key(key)) + 1
784
                + len(str(value.count('\n'))) + 1
                + len(value) + 1)
787
def _search_key(self, key):
788
return self._search_key_func(key)
790
def _map_no_split(self, key, value):
791
"""Map a key to a value.
793
This assumes either the key does not already exist, or you have already
794
removed its size and length from self.
796
:return: True if adding this node should cause us to split.
798
self._items[key] = value
799
        self._raw_size += self._key_value_len(key, value)
        self._len += 1
801
serialised_key = self._serialise_key(key)
802
if self._common_serialised_prefix is None:
803
self._common_serialised_prefix = serialised_key
805
        else:
            self._common_serialised_prefix = self.common_prefix(
806
self._common_serialised_prefix, serialised_key)
807
search_key = self._search_key(key)
808
if self._search_prefix is _unknown:
809
self._compute_search_prefix()
810
if self._search_prefix is None:
811
self._search_prefix = search_key
813
        else:
            self._search_prefix = self.common_prefix(
814
self._search_prefix, search_key)
816
        if (self._len > 1
            and self._maximum_size
817
and self._current_size() > self._maximum_size):
818
# Check to see if all of the search_keys for this node are
819
# identical. We allow the node to grow under that circumstance
820
# (we could track this as common state, but it is infrequent)
821
if (search_key != self._search_prefix
822
                or not self._are_search_keys_identical()):
                return True
        return False
826
def _split(self, store):
827
"""We have overflowed.
829
Split this node into multiple LeafNodes, return it up the stack so that
830
the next layer creates a new InternalNode and references the new nodes.
832
:return: (common_serialised_prefix, [(node_serialised_prefix, node)])
834
if self._search_prefix is _unknown:
835
raise AssertionError('Search prefix must be known')
836
common_prefix = self._search_prefix
837
split_at = len(common_prefix) + 1
839
for key, value in self._items.iteritems():
840
search_key = self._search_key(key)
841
prefix = search_key[:split_at]
842
# TODO: Generally only 1 key can be exactly the right length,
843
# which means we can only have 1 key in the node pointed
844
# at by the 'prefix\0' key. We might want to consider
845
# folding it into the containing InternalNode rather than
846
# having a fixed length-1 node.
847
# Note this is probably not true for hash keys, as they
848
# may get a '\00' node anywhere, but won't have keys of
850
if len(prefix) < split_at:
851
prefix += '\x00'*(split_at - len(prefix))
852
if prefix not in result:
853
node = LeafNode(search_key_func=self._search_key_func)
854
node.set_maximum_size(self._maximum_size)
855
node._key_width = self._key_width
856
result[prefix] = node
858
node = result[prefix]
859
sub_prefix, node_details = node.map(store, key, value)
860
if len(node_details) > 1:
861
if prefix != sub_prefix:
862
# This node has been split and is now found via a different
865
new_node = InternalNode(sub_prefix,
866
search_key_func=self._search_key_func)
867
new_node.set_maximum_size(self._maximum_size)
868
new_node._key_width = self._key_width
869
for split, node in node_details:
870
new_node.add_node(split, node)
871
result[prefix] = new_node
872
return common_prefix, result.items()
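    # Illustrative sketch of a split (not original code).  With the plain
    # search key function, a leaf holding ('aaa',), ('aab',) and ('bbb',) has
    # the common search prefix '', so split_at is 1 and the items are grouped
    # by the first search-key byte:
    #
    #   common_prefix = '', result = {'a': LeafNode{('aaa',), ('aab',)},
    #                                 'b': LeafNode{('bbb',)}}
    #
    # The caller then builds an InternalNode mapping those one-byte prefixes
    # to the new leaves.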
874
def map(self, store, key, value):
875
"""Map key to value."""
876
if key in self._items:
877
self._raw_size -= self._key_value_len(key, self._items[key])
880
if self._map_no_split(key, value):
881
return self._split(store)
883
if self._search_prefix is _unknown:
884
raise AssertionError('%r must be known' % self._search_prefix)
885
return self._search_prefix, [("", self)]
887
_serialise_key = '\x00'.join
889
def serialise(self, store):
890
"""Serialise the LeafNode to store.
892
:param store: A VersionedFiles honouring the CHK extensions.
893
:return: An iterable of the keys inserted by this operation.
895
lines = ["chkleaf:\n"]
896
lines.append("%d\n" % self._maximum_size)
897
lines.append("%d\n" % self._key_width)
898
lines.append("%d\n" % self._len)
899
        if self._common_serialised_prefix is None:
            lines.append('\n')
            if len(self._items) != 0:
                raise AssertionError('If _common_serialised_prefix is None'
                    ' we should have no items')
        else:
            lines.append('%s\n' % (self._common_serialised_prefix,))
            prefix_len = len(self._common_serialised_prefix)
907
for key, value in sorted(self._items.items()):
908
# Always add a final newline
909
value_lines = osutils.chunks_to_lines([value + '\n'])
910
            serialized = "%s\x00%s\n" % (self._serialise_key(key),
                                         len(value_lines))
912
if not serialized.startswith(self._common_serialised_prefix):
913
raise AssertionError('We thought the common prefix was %r'
914
' but entry %r does not have it in common'
915
% (self._common_serialised_prefix, serialized))
916
lines.append(serialized[prefix_len:])
917
lines.extend(value_lines)
918
sha1, _, _ = store.add_lines((None,), (), lines)
919
self._key = StaticTuple("sha1:" + sha1,).intern()
920
bytes = ''.join(lines)
921
if len(bytes) != self._current_size():
922
raise AssertionError('Invalid _current_size')
923
        _get_cache()[self._key] = bytes
        return [self._key]

    def refs(self):
        """Return the references to other CHK's held by this node."""
        return []
930
def _compute_search_prefix(self):
931
"""Determine the common search prefix for all keys in this node.
933
:return: A bytestring of the longest search key prefix that is
934
unique within this node.
936
search_keys = [self._search_key_func(key) for key in self._items]
937
self._search_prefix = self.common_prefix_for_keys(search_keys)
938
return self._search_prefix
940
def _are_search_keys_identical(self):
941
"""Check to see if the search keys for all entries are the same.
943
When using a hash as the search_key it is possible for non-identical
944
keys to collide. If that happens enough, we may try overflow a
945
LeafNode, but as all are collisions, we must not split.
947
common_search_key = None
948
for key in self._items:
949
search_key = self._search_key(key)
950
if common_search_key is None:
951
common_search_key = search_key
952
elif search_key != common_search_key:
956
def _compute_serialised_prefix(self):
957
"""Determine the common prefix for serialised keys in this node.
959
:return: A bytestring of the longest serialised key prefix that is
960
unique within this node.
962
serialised_keys = [self._serialise_key(key) for key in self._items]
963
        self._common_serialised_prefix = self.common_prefix_for_keys(
            serialised_keys)
965
return self._common_serialised_prefix
967
    def unmap(self, store, key):
        """Unmap key from the node."""
        try:
            self._raw_size -= self._key_value_len(key, self._items[key])
        except KeyError:
            trace.mutter("key %s not found in %r", key, self._items)
            raise
        self._len -= 1
        del self._items[key]
        self._key = None
        # Recompute from scratch
        self._compute_search_prefix()
        self._compute_serialised_prefix()
        return self
983
class InternalNode(Node):
984
"""A node that contains references to other nodes.
986
An InternalNode is responsible for mapping search key prefixes to child
989
:ivar _items: serialised_key => node dictionary. node may be a tuple,
990
LeafNode or InternalNode.
993
__slots__ = ('_node_width',)
995
    def __init__(self, prefix='', search_key_func=None):
        Node.__init__(self)
        # The size of an internalnode with default values and no children.
        # How many octets key prefixes within this node are.
        self._node_width = 0
        self._search_prefix = prefix
1001
if search_key_func is None:
1002
self._search_key_func = _search_key_plain
1004
self._search_key_func = search_key_func
1006
def add_node(self, prefix, node):
1007
"""Add a child node with prefix prefix, and node node.
1009
:param prefix: The search key prefix for node.
1010
:param node: The node being added.
1012
if self._search_prefix is None:
1013
raise AssertionError("_search_prefix should not be None")
1014
if not prefix.startswith(self._search_prefix):
1015
raise AssertionError("prefixes mismatch: %s must start with %s"
1016
% (prefix,self._search_prefix))
1017
if len(prefix) != len(self._search_prefix) + 1:
1018
raise AssertionError("prefix wrong length: len(%s) is not %d" %
1019
(prefix, len(self._search_prefix) + 1))
1020
self._len += len(node)
1021
if not len(self._items):
1022
self._node_width = len(prefix)
1023
if self._node_width != len(self._search_prefix) + 1:
1024
raise AssertionError("node width mismatch: %d is not %d" %
1025
(self._node_width, len(self._search_prefix) + 1))
1026
self._items[prefix] = node
1029
def _current_size(self):
1030
"""Answer the current serialised size of this node."""
1031
return (self._raw_size + len(str(self._len)) + len(str(self._key_width)) +
1032
len(str(self._maximum_size)))
1035
    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
1036
"""Deserialise bytes to an InternalNode, with key key.
1038
:param bytes: The bytes of the node.
1039
:param key: The key that the serialised node has.
1040
:return: An InternalNode instance.
1042
key = static_tuple.expect_static_tuple(key)
1043
return _deserialise_internal_node(bytes, key,
1044
search_key_func=search_key_func)
1046
def iteritems(self, store, key_filter=None):
1047
for node, node_filter in self._iter_nodes(store, key_filter=key_filter):
1048
            for item in node.iteritems(store, key_filter=node_filter):
                yield item
1051
def _iter_nodes(self, store, key_filter=None, batch_size=None):
1052
"""Iterate over node objects which match key_filter.
1054
:param store: A store to use for accessing content.
1055
:param key_filter: A key filter to filter nodes. Only nodes that might
1056
contain a key in key_filter will be returned.
1057
:param batch_size: If not None, then we will return the nodes that had
1058
to be read using get_record_stream in batches, rather than reading
1060
:return: An iterable of nodes. This function does not have to be fully
1061
consumed. (There will be no pending I/O when items are being returned.)
1063
# Map from chk key ('sha1:...',) to (prefix, key_filter)
1064
# prefix is the key in self._items to use, key_filter is the key_filter
1065
        # entries that would match this node
        keys = {}
1068
if key_filter is None:
1069
# yielding all nodes, yield whatever we have, and queue up a read
1070
# for whatever we are missing
1072
for prefix, node in self._items.iteritems():
1073
if node.__class__ is StaticTuple:
1074
                    keys[node] = (prefix, None)
                else:
                    yield node, None
1077
elif len(key_filter) == 1:
1078
# Technically, this path could also be handled by the first check
1079
# in 'self._node_width' in length_filters. However, we can handle
1080
# this case without spending any time building up the
1081
# prefix_to_keys, etc state.
1083
# This is a bit ugly, but TIMEIT showed it to be by far the fastest
1084
# 0.626us list(key_filter)[0]
1085
# is a func() for list(), 2 mallocs, and a getitem
1086
# 0.489us [k for k in key_filter][0]
1087
# still has the mallocs, avoids the func() call
1088
# 0.350us iter(key_filter).next()
1089
# has a func() call, and mallocs an iterator
1090
# 0.125us for key in key_filter: pass
1091
# no func() overhead, might malloc an iterator
1092
# 0.105us for key in key_filter: break
1093
# no func() overhead, might malloc an iterator, probably
1094
# avoids checking an 'else' clause as part of the for
1095
            for key in key_filter:
                break
1097
search_prefix = self._search_prefix_filter(key)
1098
if len(search_prefix) == self._node_width:
1099
# This item will match exactly, so just do a dict lookup, and
1100
# see what we can return
1103
node = self._items[search_prefix]
1105
# A given key can only match 1 child node, if it isn't
1106
# there, then we can just return nothing
1108
if node.__class__ is StaticTuple:
1109
keys[node] = (search_prefix, [key])
1111
# This is loaded, and the only thing that can match,
1116
# First, convert all keys into a list of search prefixes
1117
# Aggregate common prefixes, and track the keys they come from
1120
for key in key_filter:
1121
search_prefix = self._search_prefix_filter(key)
1122
length_filter = length_filters.setdefault(
1123
len(search_prefix), set())
1124
length_filter.add(search_prefix)
1125
prefix_to_keys.setdefault(search_prefix, []).append(key)
1127
if (self._node_width in length_filters
1128
and len(length_filters) == 1):
1129
# all of the search prefixes match exactly _node_width. This
1130
# means that everything is an exact match, and we can do a
1131
# lookup into self._items, rather than iterating over the items
1133
search_prefixes = length_filters[self._node_width]
1134
for search_prefix in search_prefixes:
1136
node = self._items[search_prefix]
1138
# We can ignore this one
1140
node_key_filter = prefix_to_keys[search_prefix]
1141
if node.__class__ is StaticTuple:
1142
keys[node] = (search_prefix, node_key_filter)
1144
yield node, node_key_filter
1146
# The slow way. We walk every item in self._items, and check to
1147
# see if there are any matches
1148
length_filters = length_filters.items()
1149
for prefix, node in self._items.iteritems():
1150
node_key_filter = []
1151
for length, length_filter in length_filters:
1152
sub_prefix = prefix[:length]
1153
if sub_prefix in length_filter:
1154
node_key_filter.extend(prefix_to_keys[sub_prefix])
1155
if node_key_filter: # this key matched something, yield it
1156
if node.__class__ is StaticTuple:
1157
keys[node] = (prefix, node_key_filter)
1159
yield node, node_key_filter
1161
        # Look in the page cache for some more bytes
        found_keys = set()
        for key in keys:
            try:
                bytes = _get_cache()[key]
            except KeyError:
                continue
            else:
                node = _deserialise(bytes, key,
                    search_key_func=self._search_key_func)
                prefix, node_key_filter = keys[key]
                self._items[prefix] = node
                found_keys.add(key)
                yield node, node_key_filter
        for key in found_keys:
            del keys[key]
        if keys:
            # demand load some pages.
1179
if batch_size is None:
1180
# Read all the keys in
1181
batch_size = len(keys)
1182
key_order = list(keys)
1183
for batch_start in range(0, len(key_order), batch_size):
1184
batch = key_order[batch_start:batch_start + batch_size]
1185
# We have to fully consume the stream so there is no pending
1186
# I/O, so we buffer the nodes for now.
1187
stream = store.get_record_stream(batch, 'unordered', True)
1188
node_and_filters = []
1189
for record in stream:
1190
bytes = record.get_bytes_as('fulltext')
1191
node = _deserialise(bytes, record.key,
1192
search_key_func=self._search_key_func)
1193
prefix, node_key_filter = keys[record.key]
1194
node_and_filters.append((node, node_key_filter))
1195
self._items[prefix] = node
1196
_get_cache()[record.key] = bytes
1197
            for info in node_and_filters:
                yield info
1200
def map(self, store, key, value):
1201
"""Map key to value."""
1202
if not len(self._items):
1203
raise AssertionError("can't map in an empty InternalNode.")
1204
search_key = self._search_key(key)
1205
if self._node_width != len(self._search_prefix) + 1:
1206
raise AssertionError("node width mismatch: %d is not %d" %
1207
(self._node_width, len(self._search_prefix) + 1))
1208
if not search_key.startswith(self._search_prefix):
1209
# This key doesn't fit in this index, so we need to split at the
1210
# point where it would fit, insert self into that internal node,
1211
# and then map this key into that node.
1212
            new_prefix = self.common_prefix(self._search_prefix,
                                            search_key)
1214
new_parent = InternalNode(new_prefix,
1215
search_key_func=self._search_key_func)
1216
new_parent.set_maximum_size(self._maximum_size)
1217
new_parent._key_width = self._key_width
1218
            new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
                                self)
1220
return new_parent.map(store, key, value)
1221
children = [node for node, _
1222
                    in self._iter_nodes(store, key_filter=[key])]
        if children:
            child = children[0]
        else:
1227
child = self._new_child(search_key, LeafNode)
1228
old_len = len(child)
1229
if type(child) is LeafNode:
1230
            old_size = child._current_size()
        else:
            old_size = None
1233
prefix, node_details = child.map(store, key, value)
1234
if len(node_details) == 1:
1235
# child may have shrunk, or might be a new node
1236
child = node_details[0][1]
1237
self._len = self._len - old_len + len(child)
1238
            self._items[search_key] = child
            self._key = None
            new_node = self
1241
if type(child) is LeafNode:
1242
if old_size is None:
1243
# The old node was an InternalNode which means it has now
1244
# collapsed, so we need to check if it will chain to a
1245
# collapse at this level.
1246
trace.mutter("checking remap as InternalNode -> LeafNode")
1247
new_node = self._check_remap(store)
1249
                else:
                    # If the LeafNode has shrunk in size, we may want to run
1250
# a remap check. Checking for a remap is expensive though
1251
# and the frequency of a successful remap is very low.
1252
# Shrinkage by small amounts is common, so we only do the
1253
# remap check if the new_size is low or the shrinkage
1254
# amount is over a configurable limit.
1255
new_size = child._current_size()
1256
shrinkage = old_size - new_size
1257
if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
1258
or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
                        trace.mutter(
                            "checking remap as size shrunk by %d to be %d",
1261
shrinkage, new_size)
1262
new_node = self._check_remap(store)
1263
if new_node._search_prefix is None:
1264
raise AssertionError("_search_prefix should not be None")
1265
return new_node._search_prefix, [('', new_node)]
1266
# child has overflown - create a new intermediate node.
1267
# XXX: This is where we might want to try and expand our depth
1268
# to refer to more bytes of every child (which would give us
1269
# multiple pointers to child nodes, but less intermediate nodes)
1270
child = self._new_child(search_key, InternalNode)
1271
child._search_prefix = prefix
1272
for split, node in node_details:
1273
child.add_node(split, node)
1274
self._len = self._len - old_len + len(child)
1276
return self._search_prefix, [("", self)]
1278
def _new_child(self, search_key, klass):
        """Create a new child node of type klass."""
        child = klass()
1281
child.set_maximum_size(self._maximum_size)
1282
child._key_width = self._key_width
1283
child._search_key_func = self._search_key_func
1284
        self._items[search_key] = child
        return child
1287
def serialise(self, store):
1288
"""Serialise the node to store.
1290
:param store: A VersionedFiles honouring the CHK extensions.
1291
:return: An iterable of the keys inserted by this operation.
1293
for node in self._items.itervalues():
1294
if type(node) is StaticTuple:
1295
                # Never deserialised.
                continue
            if node._key is not None:
                continue
            for key in node.serialise(store):
                yield key
1302
lines = ["chknode:\n"]
1303
lines.append("%d\n" % self._maximum_size)
1304
lines.append("%d\n" % self._key_width)
1305
lines.append("%d\n" % self._len)
1306
if self._search_prefix is None:
1307
raise AssertionError("_search_prefix should not be None")
1308
lines.append('%s\n' % (self._search_prefix,))
1309
prefix_len = len(self._search_prefix)
1310
for prefix, node in sorted(self._items.items()):
1311
            if type(node) is StaticTuple:
                key = node[0]
            else:
                key = node._key[0]
1315
serialised = "%s\x00%s\n" % (prefix, key)
1316
if not serialised.startswith(self._search_prefix):
1317
raise AssertionError("prefixes mismatch: %s must start with %s"
1318
% (serialised, self._search_prefix))
1319
lines.append(serialised[prefix_len:])
1320
sha1, _, _ = store.add_lines((None,), (), lines)
1321
self._key = StaticTuple("sha1:" + sha1,).intern()
1322
        _get_cache()[self._key] = ''.join(lines)
        yield self._key
1325
def _search_key(self, key):
1326
"""Return the serialised key for key in this node."""
1327
        # search keys are fixed width. All will be self._node_width wide, so we
        # pad as necessary.
1329
return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]
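    # Illustrative sketch (not original code): assuming _node_width == 4 and
    # the plain search key function, a short key is padded with NULs and a
    # long one is truncated, so every search key in this node is 4 bytes wide:
    #
    #   ('ab',)     -> 'ab\x00\x00'
    #   ('abcdef',) -> 'abcd'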
1331
def _search_prefix_filter(self, key):
1332
"""Serialise key for use as a prefix filter in iteritems."""
1333
return self._search_key_func(key)[:self._node_width]
1335
def _split(self, offset):
1336
"""Split this node into smaller nodes starting at offset.
1338
:param offset: The offset to start the new child nodes at.
1339
:return: An iterable of (prefix, node) tuples. prefix is a byte
1340
prefix for reaching node.
1342
if offset >= self._node_width:
1343
for node in self._items.values():
1344
                for result in node._split(offset):
                    yield result
        else:
1347
for key, node in self._items.items():
    def refs(self):
        """Return the references to other CHK's held by this node."""
1352
if self._key is None:
1353
raise AssertionError("unserialised nodes have no refs.")
1355
        refs = []
        for value in self._items.itervalues():
1356
            if type(value) is StaticTuple:
                refs.append(value)
            else:
                refs.append(value.key())
        return refs
1362
def _compute_search_prefix(self, extra_key=None):
1363
"""Return the unique key prefix for this node.
1365
:return: A bytestring of the longest search key prefix that is
1366
            unique within this node.
        """
1368
self._search_prefix = self.common_prefix_for_keys(self._items)
1369
return self._search_prefix
1371
def unmap(self, store, key, check_remap=True):
1372
"""Remove key from this node and its children."""
1373
if not len(self._items):
1374
raise AssertionError("can't unmap in an empty InternalNode.")
1375
children = [node for node, _
1376
                    in self._iter_nodes(store, key_filter=[key])]
        if children:
            child = children[0]
        else:
            raise KeyError(key)
        self._len -= 1
1382
unmapped = child.unmap(store, key)
1384
        self._key = None
        search_key = self._search_key(key)
1385
if len(unmapped) == 0:
1386
# All child nodes are gone, remove the child:
1387
del self._items[search_key]
1390
            unmapped = None
        else:
            # Stash the returned node
1391
self._items[search_key] = unmapped
1392
if len(self._items) == 1:
1393
# this node is no longer needed:
1394
return self._items.values()[0]
1395
if type(unmapped) is InternalNode:
1398
            return self
        if check_remap:
            return self._check_remap(store)
        else:
            return self
1402
def _check_remap(self, store):
1403
"""Check if all keys contained by children fit in a single LeafNode.
1405
:param store: A store to use for reading more nodes
1406
:return: Either self, or a new LeafNode which should replace self.
1408
# Logic for how we determine when we need to rebuild
1409
# 1) Implicitly unmap() is removing a key which means that the child
1410
# nodes are going to be shrinking by some extent.
1411
# 2) If all children are LeafNodes, it is possible that they could be
1412
# combined into a single LeafNode, which can then completely replace
1413
# this internal node with a single LeafNode
1414
# 3) If *one* child is an InternalNode, we assume it has already done
1415
# all the work to determine that its children cannot collapse, and
1416
# we can then assume that those nodes *plus* the current nodes don't
1417
# have a chance of collapsing either.
1418
# So a very cheap check is to just say if 'unmapped' is an
1419
# InternalNode, we don't have to check further.
1421
# TODO: Another alternative is to check the total size of all known
1422
# LeafNodes. If there is some formula we can use to determine the
1423
# final size without actually having to read in any more
1424
# children, it would be nice to have. However, we have to be
1425
# careful with stuff like nodes that pull out the common prefix
1426
# of each key, as adding a new key can change the common prefix
1427
# and cause size changes greater than the length of one key.
1428
# So for now, we just add everything to a new Leaf until it
1429
# splits, as we know that will give the right answer
1430
new_leaf = LeafNode(search_key_func=self._search_key_func)
1431
new_leaf.set_maximum_size(self._maximum_size)
1432
new_leaf._key_width = self._key_width
1433
# A batch_size of 16 was chosen because:
1434
# a) In testing, a 4k page held 14 times. So if we have more than 16
1435
# leaf nodes we are unlikely to hold them in a single new leaf
1436
# node. This still allows for 1 round trip
1437
# b) With 16-way fan out, we can still do a single round trip
1438
# c) With 255-way fan out, we don't want to read all 255 and destroy
1439
# the page cache, just to determine that we really don't need it.
1440
for node, _ in self._iter_nodes(store, batch_size=16):
1441
if type(node) is InternalNode:
1442
                # Without looking at any leaf nodes, we are sure
                return self
1444
for key, value in node._items.iteritems():
1445
                if new_leaf._map_no_split(key, value):
                    return self
        trace.mutter("remap generated a new LeafNode")
        return new_leaf
1451
def _deserialise(bytes, key, search_key_func):
1452
"""Helper for repositorydetails - convert bytes to a node."""
1453
if bytes.startswith("chkleaf:\n"):
1454
node = LeafNode.deserialise(bytes, key, search_key_func=search_key_func)
1455
elif bytes.startswith("chknode:\n"):
1456
node = InternalNode.deserialise(bytes, key,
1457
search_key_func=search_key_func)
1459
    else:
        raise AssertionError("Unknown node type.")
    return node
1463
class CHKMapDifference(object):
1464
"""Iterate the stored pages and key,value pairs for (new - old).
1466
This class provides a generator over the stored CHK pages and the
1467
(key, value) pairs that are in any of the new maps and not in any of the
1470
Note that it may yield chk pages that are common (especially root nodes),
1471
but it won't yield (key,value) pairs that are common.
1474
def __init__(self, store, new_root_keys, old_root_keys,
1475
search_key_func, pb=None):
1476
# TODO: Should we add a StaticTuple barrier here? It would be nice to
1477
# force callers to use StaticTuple, because there will often be
1478
# lots of keys passed in here. And even if we cast it locally,
1479
        #       that just means that we will have *both* a StaticTuple and a
1480
# tuple() in memory, referring to the same object. (so a net
1481
# increase in memory, not a decrease.)
1483
        self._store = store
        self._new_root_keys = new_root_keys
1484
        self._old_root_keys = old_root_keys
        self._pb = pb
1486
# All uninteresting chks that we have seen. By the time they are added
1487
# here, they should be either fully ignored, or queued up for
1489
# TODO: This might grow to a large size if there are lots of merge
1490
# parents, etc. However, it probably doesn't scale to O(history)
1491
# like _processed_new_refs does.
1492
self._all_old_chks = set(self._old_root_keys)
1493
# All items that we have seen from the old_root_keys
1494
self._all_old_items = set()
1495
# These are interesting items which were either read, or already in the
1496
# interesting queue (so we don't need to walk them again)
1497
# TODO: processed_new_refs becomes O(all_chks), consider switching to
1499
self._processed_new_refs = set()
1500
self._search_key_func = search_key_func
1502
# The uninteresting and interesting nodes to be searched
1503
self._old_queue = []
1504
self._new_queue = []
1505
# Holds the (key, value) items found when processing the root nodes,
1506
# waiting for the uninteresting nodes to be walked
1507
self._new_item_queue = []
1510
def _read_nodes_from_store(self, keys):
1511
# We chose not to use _get_cache(), because we think in
1512
# terms of records to be yielded. Also, we expect to touch each page
1513
# only 1 time during this code. (We may want to evaluate saving the
1514
# raw bytes into the page cache, which would allow a working tree
1515
# update after the fetch to not have to read the bytes again.)
1516
as_st = StaticTuple.from_sequence
1517
stream = self._store.get_record_stream(keys, 'unordered', True)
1518
for record in stream:
1519
            if self._pb is not None:
                self._pb.tick()
1521
if record.storage_kind == 'absent':
1522
raise errors.NoSuchRevision(self._store, record.key)
1523
bytes = record.get_bytes_as('fulltext')
1524
node = _deserialise(bytes, record.key,
1525
search_key_func=self._search_key_func)
1526
if type(node) is InternalNode:
1527
# Note we don't have to do node.refs() because we know that
1528
# there are no children that have been pushed into this node
1529
# Note: Using as_st() here seemed to save 1.2MB, which would
1530
# indicate that we keep 100k prefix_refs around while
1531
# processing. They *should* be shorter lived than that...
1532
# It does cost us ~10s of processing time
1533
#prefix_refs = [as_st(item) for item in node._items.iteritems()]
1534
                prefix_refs = node._items.items()
                items = []
            else:
                prefix_refs = []
1538
# Note: We don't use a StaticTuple here. Profiling showed a
1539
# minor memory improvement (0.8MB out of 335MB peak 0.2%)
1540
# But a significant slowdown (15s / 145s, or 10%)
1541
items = node._items.items()
1542
yield record, node, prefix_refs, items
1544
def _read_old_roots(self):
1545
old_chks_to_enqueue = []
1546
all_old_chks = self._all_old_chks
1547
for record, node, prefix_refs, items in \
1548
self._read_nodes_from_store(self._old_root_keys):
1549
# Uninteresting node
1550
prefix_refs = [p_r for p_r in prefix_refs
1551
if p_r[1] not in all_old_chks]
1552
new_refs = [p_r[1] for p_r in prefix_refs]
1553
all_old_chks.update(new_refs)
1554
# TODO: This might be a good time to turn items into StaticTuple
1555
# instances and possibly intern them. However, this does not
1556
# impact 'initial branch' performance, so I'm not worrying
1558
self._all_old_items.update(items)
1559
# Queue up the uninteresting references
1560
# Don't actually put them in the 'to-read' queue until we have
1561
# finished checking the interesting references
1562
old_chks_to_enqueue.extend(prefix_refs)
1563
return old_chks_to_enqueue
1565
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1566
# At this point, we have read all the uninteresting and interesting
1567
# items, so we can queue up the uninteresting stuff, knowing that we've
1568
# handled the interesting ones
1569
for prefix, ref in old_chks_to_enqueue:
1570
not_interesting = True
1571
for i in xrange(len(prefix), 0, -1):
1572
if prefix[:i] in new_prefixes:
1573
not_interesting = False
1576
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
1578
self._old_queue.append(ref)
1580
def _read_all_roots(self):
1581
"""Read the root pages.
1583
This is structured as a generator, so that the root records can be
1584
yielded up to whoever needs them without any buffering.
1586
# This is the bootstrap phase
1587
if not self._old_root_keys:
1588
# With no old_root_keys we can just shortcut and be ready
1589
# for _flush_new_queue
1590
            self._new_queue = list(self._new_root_keys)
            return
1592
old_chks_to_enqueue = self._read_old_roots()
1593
# filter out any root keys that are already known to be uninteresting
1594
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1595
# These are prefixes that are present in new_keys that we are
1597
new_prefixes = set()
1598
# We are about to yield all of these, so we don't want them getting
1599
# added a second time
1600
processed_new_refs = self._processed_new_refs
1601
processed_new_refs.update(new_keys)
1602
for record, node, prefix_refs, items in \
1603
self._read_nodes_from_store(new_keys):
1604
# At this level, we now know all the uninteresting references
1605
# So we filter and queue up whatever is remaining
1606
prefix_refs = [p_r for p_r in prefix_refs
1607
if p_r[1] not in self._all_old_chks
1608
and p_r[1] not in processed_new_refs]
1609
refs = [p_r[1] for p_r in prefix_refs]
1610
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1611
self._new_queue.extend(refs)
1612
# TODO: We can potentially get multiple items here, however the
1613
# current design allows for this, as callers will do the work
1614
# to make the results unique. We might profile whether we
1615
# gain anything by ensuring unique return values for items
1616
# TODO: This might be a good time to cast to StaticTuple, as
1617
# self._new_item_queue will hold the contents of multiple
1618
# records for an extended lifetime
1619
new_items = [item for item in items
1620
if item not in self._all_old_items]
1621
self._new_item_queue.extend(new_items)
1622
new_prefixes.update([self._search_key_func(item[0])
1623
for item in new_items])
1624
            processed_new_refs.update(refs)
            yield record
1626
# For new_prefixes we have the full length prefixes queued up.
1627
# However, we also need possible prefixes. (If we have a known ref to
1628
# 'ab', then we also need to include 'a'.) So expand the
1629
# new_prefixes to include all shorter prefixes
1630
for prefix in list(new_prefixes):
1631
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1632
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1634
def _flush_new_queue(self):
1635
# No need to maintain the heap invariant anymore, just pull things out
1637
refs = set(self._new_queue)
1638
self._new_queue = []
1639
# First pass, flush all interesting items and convert to using direct refs
1640
all_old_chks = self._all_old_chks
1641
processed_new_refs = self._processed_new_refs
1642
all_old_items = self._all_old_items
1643
new_items = [item for item in self._new_item_queue
1644
if item not in all_old_items]
1645
self._new_item_queue = []
1647
        if new_items:
            yield None, new_items
1648
refs = refs.difference(all_old_chks)
1649
processed_new_refs.update(refs)
1651
        while refs:
            # TODO: Using a SimpleSet for self._processed_new_refs and
1652
# saved as much as 10MB of peak memory. However, it requires
1653
# implementing a non-pyrex version.
1655
            next_refs = set()
            next_refs_update = next_refs.update
1656
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1657
# from 1m54s to 1m51s. Consider it.
1658
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1660
                if all_old_items:
                    # using the 'if' check saves about 145s => 141s, when
1661
# streaming initial branch of Launchpad data.
1662
items = [item for item in items
1663
if item not in all_old_items]
1665
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
1667
# set1.difference(set/dict) walks all of set1, and checks if it
1668
# exists in 'other'.
1669
# set1.difference(iterable) walks all of iterable, and does a
1670
# 'difference_update' on a clone of set1. Pick wisely based on the
1671
# expected sizes of objects.
1672
# in our case it is expected that 'new_refs' will always be quite
1674
next_refs = next_refs.difference(all_old_chks)
1675
next_refs = next_refs.difference(processed_new_refs)
1676
            processed_new_refs.update(next_refs)
            refs = next_refs
1679
def _process_next_old(self):
1680
# Since we don't filter uninteresting any further than during
1681
# _read_all_roots, process the whole queue in a single pass.
1682
refs = self._old_queue
1683
self._old_queue = []
1684
all_old_chks = self._all_old_chks
1685
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1686
# TODO: Use StaticTuple here?
1687
self._all_old_items.update(items)
1688
refs = [r for _,r in prefix_refs if r not in all_old_chks]
1689
self._old_queue.extend(refs)
1690
all_old_chks.update(refs)
1692
def _process_queues(self):
1693
while self._old_queue:
1694
self._process_next_old()
1695
return self._flush_new_queue()
1698
    def process(self):
        for record in self._read_all_roots():
            yield record, []
        for record, items in self._process_queues():
            yield record, items
1704
def iter_interesting_nodes(store, interesting_root_keys,
1705
uninteresting_root_keys, pb=None):
1706
"""Given root keys, find interesting nodes.
1708
Evaluate nodes referenced by interesting_root_keys. Ones that are also
1709
referenced from uninteresting_root_keys are not considered interesting.
1711
:param interesting_root_keys: keys which should be part of the
1712
"interesting" nodes (which will be yielded)
1713
    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :return: Yields
        (interesting record, {interesting key:values})
    """
1718
iterator = CHKMapDifference(store, interesting_root_keys,
1719
uninteresting_root_keys,
1720
                                search_key_func=store._search_key_func,
                                pb=pb)
1722
return iterator.process()
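# Illustrative usage sketch (not original code).  'store', the root keys and
# the two helpers are placeholders; a fetch-like caller streams every page and
# every (key, value) item reachable from the new roots but not the old ones:
#
#   for record, items in iter_interesting_nodes(store, [new_root_key],
#                                               [old_root_key]):
#       if record is not None:
#           copy_page(record)              # hypothetical helper
#       for key, value in items:
#           process_new_item(key, value)   # hypothetical helper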
1726
try:
    from bzrlib._chk_map_pyx import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._chk_map_py import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
1742
search_key_registry.register('hash-16-way', _search_key_16)
1743
search_key_registry.register('hash-255-way', _search_key_255)
1746
def _check_key(key):
1747
"""Helper function to assert that a key is properly formatted.
1749
    This generally shouldn't be used in production code, but it can be helpful
    when debugging.
    """
1752
if type(key) is not StaticTuple:
1753
raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
1755
    if len(key) != 1:
        raise ValueError('key %r should have length 1, not %d' % (key, len(key),))
1756
if type(key[0]) is not str:
1757
raise TypeError('key %r should hold a str, not %r'
1758
% (key, type(key[0])))
1759
if not key[0].startswith('sha1:'):
1760
raise ValueError('key %r should point to a sha1:' % (key,))
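# Illustrative sketch (not original code): a key that passes _check_key is a
# 1-element StaticTuple holding a 'sha1:'-prefixed string, e.g.
#
#   _check_key(StaticTuple('sha1:da39a3ee5e6b4b0d3255bfef95601890afd80709',))
#
# A plain tuple, a multi-element key, or a non-'sha1:' string raises TypeError
# or ValueError respectively.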