# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Persistent maps from tuple_of_strings->string using CHK stores.

Overview and current status:

The CHKMap class implements a dict from tuple_of_strings->string by using a
trie with internal nodes of 8-bit fan out; the key tuples are mapped to
strings by joining them with \x00, and \x00 padding shorter keys out to the
length of the longest key. Leaf nodes are packed as densely as possible, and
internal nodes are all an additional 8 bits wide, leading to a sparse upper
tree.

Updates to a CHKMap are done preferentially via the apply_delta method, to
allow optimisation of the update operation; but individual map/unmap calls are
possible and supported. All changes via map/unmap are buffered in memory until
the _save method is called to force serialisation of the tree. apply_delta
performs a _save implicitly.
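
A minimal usage sketch (illustrative only; ``store`` is assumed to be a
CHK-capable VersionedFiles object, as described in CHKMap.from_dict):

  root_key = CHKMap.from_dict(store, {('name',): 'value'})
  chkmap = CHKMap(store, root_key)
  items = list(chkmap.iteritems())   # [(('name',), 'value')]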

Densely packed upper nodes.
"""

import heapq

from bzrlib import lazy_import
lazy_import.lazy_import(globals(), """
from bzrlib import versionedfile
""")
from bzrlib import errors, lru_cache, osutils, registry, trace

# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
# We are caching bytes so len(value) is perfectly accurate
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)

# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
# If we delete more than this many nodes applying a delta, we check for a remap
_INTERESTING_DELETES_LIMIT = 5


def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)


search_key_registry = registry.Registry()
search_key_registry.register('plain', _search_key_plain)
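
# Illustrative example (not part of the original module): with the 'plain'
# search key function a key tuple is simply NUL-joined, e.g.
#   _search_key_plain(('parent-id', 'basename')) == 'parent-id\x00basename'
# The hash-based functions registered at the bottom of this file map keys to
# fixed-width digests instead.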


class CHKMap(object):
    """A persistent map from string to string backed by a CHK store."""

    def __init__(self, store, root_key, search_key_func=None):
        """Create a CHKMap object.

        :param store: The store the CHKMap is stored in.
        :param root_key: The root key of the map. None to create an empty
            CHKMap.
        :param search_key_func: A function mapping a key => bytes. These bytes
            are then used by the internal nodes to split up leaf nodes into
            smaller pages.
        """
        self._store = store
        if search_key_func is None:
            search_key_func = _search_key_plain
        self._search_key_func = search_key_func
        if root_key is None:
            self._root_node = LeafNode(search_key_func=search_key_func)
        else:
            self._root_node = self._node_key(root_key)

    def apply_delta(self, delta):
        """Apply a delta to the map.

        :param delta: An iterable of old_key, new_key, new_value tuples.
            If new_key is not None, then new_key->new_value is inserted
            into the map; if old_key is not None, then the old mapping
            of old_key is removed.
        """
        delete_count = 0
        for old, new, value in delta:
            if old is not None and old != new:
                self.unmap(old, check_remap=False)
                delete_count += 1
        for old, new, value in delta:
            if new is not None:
                self.map(new, value)
        if delete_count > _INTERESTING_DELETES_LIMIT:
            trace.mutter("checking remap as %d deletions", delete_count)
            self._check_remap()
        return self._save()
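
    # Illustrative delta shape (example added for clarity, not original code):
    # insert ('new',) -> 'v1', delete ('old',), and move ('a',) to ('b',):
    #   chkmap.apply_delta([(None, ('new',), 'v1'),
    #                       (('old',), None, None),
    #                       (('a',), ('b',), 'value')])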

    def _ensure_root(self):
        """Ensure that the root node is an object not a key."""
        if type(self._root_node) == tuple:
            # Demand-load the root
            self._root_node = self._get_node(self._root_node)

    def _get_node(self, node):
        """Get a node, demand-loading it from the store if necessary.

        Note that this does not update the _items dict in objects containing a
        reference to this node. As such it does not prevent subsequent IO being
        performed.

        :param node: A tuple key or node object.
        :return: A node object.
        """
        if type(node) == tuple:
            bytes = self._read_bytes(node)
            return _deserialise(bytes, node,
                search_key_func=self._search_key_func)
        else:
            return node

    def _read_bytes(self, key):
        try:
            return _page_cache[key]
        except KeyError:
            stream = self._store.get_record_stream([key], 'unordered', True)
            bytes = stream.next().get_bytes_as('fulltext')
            _page_cache[key] = bytes
            return bytes

    def _dump_tree(self, include_keys=False):
        """Return the tree in a string representation."""
        res = self._dump_tree_node(self._root_node, prefix='', indent='',
                                   include_keys=include_keys)
        res.append('') # Give a trailing '\n'
        return '\n'.join(res)

    def _dump_tree_node(self, node, prefix, indent, include_keys=True):
        """For this node and all children, generate a string representation."""
        result = []
        node_key = node.key()
        if node_key is not None:
            key_str = ' %s' % (node_key[0],)
        else:
            key_str = ''
        result.append('%s%r %s%s' % (indent, prefix, node.__class__.__name__,
                                     key_str))
        if isinstance(node, InternalNode):
            # Trigger all child nodes to get loaded
            list(node._iter_nodes(self._store))
            for prefix, sub in sorted(node._items.iteritems()):
                result.extend(self._dump_tree_node(sub, prefix, indent + '  ',
                                                   include_keys=include_keys))
        else:
            for key, value in sorted(node._items.iteritems()):
                # Don't use prefix nor indent here to line up when used in
                # tests in conjunction with assertEqualDiff
                result.append('      %r %r' % (key, value))
        return result

    @classmethod
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
                  search_key_func=None):
        """Create a CHKMap in store with initial_value as the content.

        :param store: The store to record initial_value in, a VersionedFiles
            object with 1-tuple keys supporting CHK key generation.
        :param initial_value: A dict to store in store. Its keys and values
            must be strings.
        :param maximum_size: The maximum_size rule to apply to nodes. This
            determines the size at which no new data is added to a single node.
        :param key_width: The number of elements in each key_tuple being stored
            in this map.
        :param search_key_func: A function mapping a key => bytes. These bytes
            are then used by the internal nodes to split up leaf nodes into
            smaller pages.
        :return: The root chk of the resulting CHKMap.
        """
        result = CHKMap(store, None, search_key_func=search_key_func)
        result._root_node.set_maximum_size(maximum_size)
        result._root_node._key_width = key_width
        delta = []
        for key, value in initial_value.items():
            delta.append((None, key, value))
        return result.apply_delta(delta)

    def iter_changes(self, basis):
        """Iterate over the changes between basis and self.

        :return: An iterator of tuples: (key, old_value, new_value). Old_value
            is None for keys only in self; new_value is None for keys only in
            basis.
        """
        # Read both trees in lexicographic, highest-first order.
        # Any identical nodes we skip
        # Any unique prefixes we output immediately.
        # values in a leaf node are treated as single-value nodes in the tree
        # which allows them to be not-special-cased. We know to output them
        # because their value is a string, not a key(tuple) or node.
        # corner cases to beware of when considering this function:
        # *) common references are at different heights.
        #    consider two trees:
        #    {'a': LeafNode={'aaa':'foo', 'aab':'bar'}, 'b': LeafNode={'b'}}
        #    {'a': InternalNode={'aa':LeafNode={'aaa':'foo', 'aab':'bar'},
        #                        'ab':LeafNode={'ab':'bar'}},
        #     'b': LeafNode={'b'}}
        #    the node with aaa/aab will only be encountered in the second tree
        #    after reading the 'a' subtree, but it is encountered in the first
        #    tree immediately. Variations on this may have read internal nodes
        #    like this. We want to cut the entire pending subtree when we
        #    realise we have a common node. For this we use a list of keys -
        #    the path to a node - and check the entire path is clean as we
        #    process each item.
        if self._node_key(self._root_node) == self._node_key(basis._root_node):
            return
        excluded_keys = set()
        self_node = self._root_node
        basis_node = basis._root_node
        # A heap, each element is prefix, node(tuple/NodeObject/string),
        # key_path (a list of tuples, tail-sharing down the tree.)
        self_pending = []
        basis_pending = []
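        # For example (illustrative): a path three levels deep is built up as
        # nested 2-tuples that share their tails, e.g.
        #   (('sha1:child',), (('sha1:parent',), (('sha1:root',), None)))
        # so cutting a whole subtree only needs the head of the shared tail.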

        def process_node(node, path, a_map, pending):
            # take a node and expand it
            node = a_map._get_node(node)
            if type(node) == LeafNode:
                path = (node._key, path)
                for key, value in node._items.items():
                    # For a LeafNode, the key is a serialized_key, rather than
                    # a search_key, but the heap is using search_keys
                    search_key = node._search_key_func(key)
                    heapq.heappush(pending, (search_key, key, value, path))
            else:
                # type(node) == InternalNode
                path = (node._key, path)
                for prefix, child in node._items.items():
                    heapq.heappush(pending, (prefix, None, child, path))

        def process_common_internal_nodes(self_node, basis_node):
            self_items = set(self_node._items.items())
            basis_items = set(basis_node._items.items())
            path = (self_node._key, None)
            for prefix, child in self_items - basis_items:
                heapq.heappush(self_pending, (prefix, None, child, path))
            path = (basis_node._key, None)
            for prefix, child in basis_items - self_items:
                heapq.heappush(basis_pending, (prefix, None, child, path))

        def process_common_leaf_nodes(self_node, basis_node):
            self_items = set(self_node._items.items())
            basis_items = set(basis_node._items.items())
            path = (self_node._key, None)
            for key, value in self_items - basis_items:
                prefix = self._search_key_func(key)
                heapq.heappush(self_pending, (prefix, key, value, path))
            path = (basis_node._key, None)
            for key, value in basis_items - self_items:
                prefix = basis._search_key_func(key)
                heapq.heappush(basis_pending, (prefix, key, value, path))

        def process_common_prefix_nodes(self_node, self_path,
                                        basis_node, basis_path):
            # Would it be more efficient if we could request both at the same
            # time?
            self_node = self._get_node(self_node)
            basis_node = basis._get_node(basis_node)
            if (type(self_node) == InternalNode
                and type(basis_node) == InternalNode):
                # Matching internal nodes
                process_common_internal_nodes(self_node, basis_node)
            elif (type(self_node) == LeafNode
                  and type(basis_node) == LeafNode):
                process_common_leaf_nodes(self_node, basis_node)
            else:
                process_node(self_node, self_path, self, self_pending)
                process_node(basis_node, basis_path, basis, basis_pending)

        process_common_prefix_nodes(self_node, None, basis_node, None)

        excluded_keys = set()

        def check_excluded(key_path):
            # Note that this is N^2, it depends on us trimming trees
            # aggressively to not become slow.
            # A better implementation would probably have a reverse map
            # back to the children of a node, and jump straight to it when
            # a common node is detected, then proceed to remove the already
            # pending children. bzrlib.graph has a searcher module with a
            # similar problem.
            while key_path is not None:
                key, key_path = key_path
                if key in excluded_keys:
                    return True
            return False

        while self_pending or basis_pending:
            if not self_pending:
                # self is exhausted: output remainder of basis
                for prefix, key, node, path in basis_pending:
                    if check_excluded(path):
                        continue
                    node = basis._get_node(node)
                    if key is not None:
                        yield (key, node, None)
                    else:
                        # subtree - fastpath the entire thing.
                        for key, value in node.iteritems(basis._store):
                            yield (key, value, None)
                return
            elif not basis_pending:
                # basis is exhausted: output remainder of self.
                for prefix, key, node, path in self_pending:
                    if check_excluded(path):
                        continue
                    node = self._get_node(node)
                    if key is not None:
                        yield (key, None, node)
                    else:
                        # subtree - fastpath the entire thing.
                        for key, value in node.iteritems(self._store):
                            yield (key, None, value)
                return
            else:
                # XXX: future optimisation - yield the smaller items
                # immediately rather than pushing everything on/off the
                # heaps. Applies to both internal nodes and leafnodes.
                if self_pending[0][0] < basis_pending[0][0]:
                    prefix, key, node, path = heapq.heappop(self_pending)
                    if check_excluded(path):
                        continue
                    if key is not None:
                        yield (key, None, node)
                    else:
                        process_node(node, path, self, self_pending)
                elif self_pending[0][0] > basis_pending[0][0]:
                    prefix, key, node, path = heapq.heappop(basis_pending)
                    if check_excluded(path):
                        continue
                    if key is not None:
                        yield (key, node, None)
                    else:
                        process_node(node, path, basis, basis_pending)
                else:
                    # common prefix: possibly expand both
                    if self_pending[0][1] is None:
                        read_self = True
                    else:
                        read_self = False
                    if basis_pending[0][1] is None:
                        read_basis = True
                    else:
                        read_basis = False
                    if not read_self and not read_basis:
                        # compare a common value
                        self_details = heapq.heappop(self_pending)
                        basis_details = heapq.heappop(basis_pending)
                        if self_details[2] != basis_details[2]:
                            yield (self_details[1],
                                basis_details[2], self_details[2])
                        continue
                    # At least one side wasn't a simple value
                    if (self._node_key(self_pending[0][2]) ==
                        self._node_key(basis_pending[0][2])):
                        # Identical pointers, skip (and don't bother adding to
                        # excluded, it won't turn up again.)
                        heapq.heappop(self_pending)
                        heapq.heappop(basis_pending)
                        continue
                    # Now we need to expand this node before we can continue
                    if read_self and read_basis:
                        # Both sides start with the same prefix, so process
                        # them in parallel
                        self_prefix, _, self_node, self_path = heapq.heappop(
                            self_pending)
                        basis_prefix, _, basis_node, basis_path = heapq.heappop(
                            basis_pending)
                        assert self_prefix == basis_prefix
                        process_common_prefix_nodes(
                            self_node, self_path,
                            basis_node, basis_path)
                        continue
                    if read_self:
                        prefix, key, node, path = heapq.heappop(self_pending)
                        if check_excluded(path):
                            continue
                        process_node(node, path, self, self_pending)
                    if read_basis:
                        prefix, key, node, path = heapq.heappop(basis_pending)
                        if check_excluded(path):
                            continue
                        process_node(node, path, basis, basis_pending)

    def iteritems(self, key_filter=None):
        """Iterate over the entire CHKMap's contents."""
        return self._root_node.iteritems(self._store, key_filter=key_filter)

    def key(self):
        """Return the key for this map."""
        if isinstance(self._root_node, tuple):
            return self._root_node
        else:
            return self._root_node._key

    def __len__(self):
        return len(self._root_node)

    def map(self, key, value):
        """Map a key tuple to value."""
        # Need a root object.
        self._ensure_root()
        prefix, node_details = self._root_node.map(self._store, key, value)
        if len(node_details) == 1:
            self._root_node = node_details[0][1]
        else:
            self._root_node = InternalNode(prefix,
                                search_key_func=self._search_key_func)
            self._root_node.set_maximum_size(node_details[0][1].maximum_size)
            self._root_node._key_width = node_details[0][1]._key_width
            for split, node in node_details:
                self._root_node.add_node(split, node)

    def _node_key(self, node):
        """Get the key for a node whether it's a tuple or node."""
        if type(node) == tuple:
            return node
        else:
            return node._key

    def unmap(self, key, check_remap=True):
        """Remove key from the map."""
        self._ensure_root()
        if isinstance(self._root_node, InternalNode):
            unmapped = self._root_node.unmap(self._store, key,
                check_remap=check_remap)
        else:
            unmapped = self._root_node.unmap(self._store, key)
        self._root_node = unmapped

    def _check_remap(self):
        """Check if nodes can be collapsed."""
        self._ensure_root()
        if isinstance(self._root_node, InternalNode):
            self._root_node._check_remap(self._store)

    def _save(self):
        """Save the map completely.

        :return: The key of the root node.
        """
        if type(self._root_node) == tuple:
            # Already saved.
            return self._root_node
        keys = list(self._root_node.serialise(self._store))
        return keys[-1]


class Node(object):
    """Base class defining the protocol for CHK Map nodes.

    :ivar _raw_size: The total size of the serialized key:value data, before
        adding the header bytes, and without prefix compression.
    """

    def __init__(self, key_width=1):
        """Create a node.

        :param key_width: The width of keys for this node.
        """
        self._key = None
        # Current number of elements
        self._len = 0
        self._maximum_size = 0
        self._key_width = key_width
        # current size in bytes
        self._raw_size = 0
        # The pointers/values this node has - meaning defined by child classes.
        self._items = {}
        # The common search prefix
        self._search_prefix = None

    def __repr__(self):
        items_str = str(sorted(self._items))
        if len(items_str) > 20:
            items_str = items_str[:16] + '...]'
        return '%s(key:%s len:%s size:%s max:%s prefix:%s items:%s)' % (
            self.__class__.__name__, self._key, self._len, self._raw_size,
            self._maximum_size, self._search_prefix, items_str)

    @property
    def maximum_size(self):
        """What is the upper limit for adding references to a node."""
        return self._maximum_size

    def set_maximum_size(self, new_size):
        """Set the size threshold for nodes.

        :param new_size: The size at which no data is added to a node. 0 for
            unlimited.
        """
        self._maximum_size = new_size

    @classmethod
    def common_prefix(cls, prefix, key):
        """Given 2 strings, return the longest prefix common to both.

        :param prefix: This has been the common prefix for other keys, so it is
            more likely to be the common prefix in this case as well.
        :param key: Another string to compare to
        """
        if key.startswith(prefix):
            return prefix
        pos = -1
        # Is there a better way to do this?
        for pos, (left, right) in enumerate(zip(prefix, key)):
            if left != right:
                pos -= 1
                break
        common = prefix[:pos+1]
        return common
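
    # For example (illustrative): common_prefix('abba', 'abcd') -> 'ab', and
    # common_prefix('ab', 'abcd') -> 'ab' via the startswith fast path.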

    @classmethod
    def common_prefix_for_keys(cls, keys):
        """Given a list of keys, find their common prefix.

        :param keys: An iterable of strings.
        :return: The longest common prefix of all keys.
        """
        common_prefix = None
        for key in keys:
            if common_prefix is None:
                common_prefix = key
                continue
            common_prefix = cls.common_prefix(common_prefix, key)
            if not common_prefix:
                # if common_prefix is the empty string, then we know it won't
                # match anything else
                break
        return common_prefix


# Singleton indicating we have not computed _search_prefix yet
_unknown = object()


class LeafNode(Node):
    """A node containing actual key:value pairs.

    :ivar _items: A dict of key->value items. The key is in tuple form.
    :ivar _size: The number of bytes that would be used by serializing all of
        the key/value pairs.
    """

    def __init__(self, search_key_func=None):
        Node.__init__(self)
        # All of the keys in this leaf node share this common prefix
        self._common_serialised_prefix = None
        self._serialise_key = '\x00'.join
        if search_key_func is None:
            self._search_key_func = _search_key_plain
        else:
            self._search_key_func = search_key_func

    def __repr__(self):
        items_str = str(sorted(self._items))
        if len(items_str) > 20:
            items_str = items_str[:16] + '...]'
        return \
            '%s(key:%s len:%s size:%s max:%s prefix:%s keywidth:%s items:%s)' \
            % (self.__class__.__name__, self._key, self._len, self._raw_size,
            self._maximum_size, self._search_prefix, self._key_width, items_str)

    def _current_size(self):
        """Answer the current serialised size of this node.

        This differs from self._raw_size in that it includes the bytes used for
        the header.
        """
        if self._common_serialised_prefix is None:
            bytes_for_items = 0
            prefix_len = 0
        else:
            # We will store a single string with the common prefix
            # And then that common prefix will not be stored in any of the
            # entry lines
            prefix_len = len(self._common_serialised_prefix)
            bytes_for_items = (self._raw_size - (prefix_len * self._len))
        return (9 # 'chkleaf:\n'
            + len(str(self._maximum_size)) + 1
            + len(str(self._key_width)) + 1
            + len(str(self._len)) + 1
            + prefix_len + 1
            + bytes_for_items)

    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """Deserialise bytes, with key key, into a LeafNode.

        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        """
        return _deserialise_leaf_node(bytes, key,
                                      search_key_func=search_key_func)

    def iteritems(self, store, key_filter=None):
        """Iterate over items in the node.

        :param key_filter: A filter to apply to the node. It should be a
            list/set/dict or similar repeatedly iterable container.
        """
        if key_filter is not None:
            # Adjust the filter - short elements go to a prefix filter. Would this
            # be cleaner explicitly? That would be no harder for InternalNode..
            # XXX: perhaps defaultdict? Profiling<rinse and repeat>
            filters = {}
            for key in key_filter:
                length_filter = filters.setdefault(len(key), set())
                length_filter.add(key)
            filters = filters.items()
            for item in self._items.iteritems():
                for length, length_filter in filters:
                    if item[0][:length] in length_filter:
                        yield item
                        break
        else:
            for item in self._items.iteritems():
                yield item

    def _key_value_len(self, key, value):
        # TODO: Should probably be done without actually joining the key, but
        #       then that can be done via the C extension
        return (len(self._serialise_key(key)) + 1
                + len(str(value.count('\n'))) + 1
                + len(value) + 1)

    def _search_key(self, key):
        return self._search_key_func(key)

    def _map_no_split(self, key, value):
        """Map a key to a value.

        This assumes either the key does not already exist, or you have already
        removed its size and length from self.

        :return: True if adding this node should cause us to split.
        """
        self._items[key] = value
        self._raw_size += self._key_value_len(key, value)
        self._len += 1
        serialised_key = self._serialise_key(key)
        if self._common_serialised_prefix is None:
            self._common_serialised_prefix = serialised_key
        else:
            self._common_serialised_prefix = self.common_prefix(
                self._common_serialised_prefix, serialised_key)
        search_key = self._search_key(key)
        if self._search_prefix is _unknown:
            self._compute_search_prefix()
        if self._search_prefix is None:
            self._search_prefix = search_key
        else:
            self._search_prefix = self.common_prefix(
                self._search_prefix, search_key)
        if (self._len > 1
            and self._maximum_size
            and self._current_size() > self._maximum_size):
            # Check to see if all of the search_keys for this node are
            # identical. We allow the node to grow under that circumstance
            # (we could track this as common state, but it is infrequent)
            if (search_key != self._search_prefix
                or not self._are_search_keys_identical()):
                return True
        return False

    def _split(self, store):
        """We have overflowed.

        Split this node into multiple LeafNodes, return it up the stack so that
        the next layer creates a new InternalNode and references the new nodes.

        :return: (common_serialised_prefix, [(node_serialised_prefix, node)])
        """
        assert self._search_prefix is not _unknown
        common_prefix = self._search_prefix
        split_at = len(common_prefix) + 1
        result = {}
        for key, value in self._items.iteritems():
            search_key = self._search_key(key)
            prefix = search_key[:split_at]
            # TODO: Generally only 1 key can be exactly the right length,
            #       which means we can only have 1 key in the node pointed
            #       at by the 'prefix\0' key. We might want to consider
            #       folding it into the containing InternalNode rather than
            #       having a fixed length-1 node.
            #       Note this is probably not true for hash keys, as they
            #       may get a '\00' node anywhere, but won't have keys of
            #       different lengths.
            if len(prefix) < split_at:
                prefix += '\x00'*(split_at - len(prefix))
            if prefix not in result:
                node = LeafNode(search_key_func=self._search_key_func)
                node.set_maximum_size(self._maximum_size)
                node._key_width = self._key_width
                result[prefix] = node
            else:
                node = result[prefix]
            node.map(store, key, value)
        return common_prefix, result.items()
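
    # Illustrative example (not original code): with a common search prefix of
    # 'aa', children produced by _split are keyed by 3-byte prefixes such as
    # 'aab' or 'aac'; a search key that is exactly 'aa' is padded to 'aa\x00'.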

    def map(self, store, key, value):
        """Map key to value."""
        if key in self._items:
            self._raw_size -= self._key_value_len(key, self._items[key])
            self._len -= 1
        self._key = None
        if self._map_no_split(key, value):
            return self._split(store)
        else:
            assert self._search_prefix is not _unknown
            return self._search_prefix, [("", self)]

    def serialise(self, store):
        """Serialise the LeafNode to store.

        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
        lines = ["chkleaf:\n"]
        lines.append("%d\n" % self._maximum_size)
        lines.append("%d\n" % self._key_width)
        lines.append("%d\n" % self._len)
        if self._common_serialised_prefix is None:
            lines.append('\n')
            if len(self._items) != 0:
                raise AssertionError('If _common_serialised_prefix is None'
                    ' we should have no items')
        else:
            lines.append('%s\n' % (self._common_serialised_prefix,))
            prefix_len = len(self._common_serialised_prefix)
        for key, value in sorted(self._items.items()):
            # Always add a final newline
            value_lines = osutils.chunks_to_lines([value + '\n'])
            serialized = "%s\x00%s\n" % (self._serialise_key(key),
                                         len(value_lines))
            if not serialized.startswith(self._common_serialised_prefix):
                raise AssertionError('We thought the common prefix was %r'
                    ' but entry %r does not have it in common'
                    % (self._common_serialised_prefix, serialized))
            lines.append(serialized[prefix_len:])
            lines.extend(value_lines)
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = ("sha1:" + sha1,)
        bytes = ''.join(lines)
        if len(bytes) != self._current_size():
            raise AssertionError('Invalid _current_size')
        _page_cache.add(self._key, bytes)
        return [self._key]

    def refs(self):
        """Return the references to other CHK's held by this node."""
        return []

    def _compute_search_prefix(self):
        """Determine the common search prefix for all keys in this node.

        :return: A bytestring of the longest search key prefix that is
            unique within this node.
        """
        search_keys = [self._search_key_func(key) for key in self._items]
        self._search_prefix = self.common_prefix_for_keys(search_keys)
        return self._search_prefix

    def _are_search_keys_identical(self):
        """Check to see if the search keys for all entries are the same.

        When using a hash as the search_key it is possible for non-identical
        keys to collide. If that happens enough, we may try to overflow a
        LeafNode, but as all are collisions, we must not split.
        """
        common_search_key = None
        for key in self._items:
            search_key = self._search_key(key)
            if common_search_key is None:
                common_search_key = search_key
            elif search_key != common_search_key:
                return False
        return True

    def _compute_serialised_prefix(self):
        """Determine the common prefix for serialised keys in this node.

        :return: A bytestring of the longest serialised key prefix that is
            unique within this node.
        """
        serialised_keys = [self._serialise_key(key) for key in self._items]
        self._common_serialised_prefix = self.common_prefix_for_keys(
            serialised_keys)
        return self._common_serialised_prefix

    def unmap(self, store, key):
        """Unmap key from the node."""
        try:
            self._raw_size -= self._key_value_len(key, self._items[key])
        except KeyError:
            trace.mutter("key %s not found in %r", key, self._items)
            raise
        self._len -= 1
        del self._items[key]
        self._key = None
        # Recompute from scratch
        self._compute_search_prefix()
        self._compute_serialised_prefix()
        return self


class InternalNode(Node):
    """A node that contains references to other nodes.

    An InternalNode is responsible for mapping search key prefixes to child
    nodes.

    :ivar _items: serialised_key => node dictionary. node may be a tuple,
        LeafNode or InternalNode.
    """

    def __init__(self, prefix='', search_key_func=None):
        Node.__init__(self)
        # The size of an internalnode with default values and no children.
        # How many octets key prefixes within this node are.
        self._node_width = 0
        self._search_prefix = prefix
        if search_key_func is None:
            self._search_key_func = _search_key_plain
        else:
            self._search_key_func = search_key_func

    def add_node(self, prefix, node):
        """Add a child node with prefix prefix, and node node.

        :param prefix: The search key prefix for node.
        :param node: The node being added.
        """
        if self._search_prefix is None:
            raise AssertionError("_search_prefix should not be None")
        if not prefix.startswith(self._search_prefix):
            raise AssertionError("prefixes mismatch: %s must start with %s"
                % (prefix, self._search_prefix))
        if len(prefix) != len(self._search_prefix) + 1:
            raise AssertionError("prefix wrong length: len(%s) is not %d" %
                (prefix, len(self._search_prefix) + 1))
        self._len += len(node)
        if not len(self._items):
            self._node_width = len(prefix)
        if self._node_width != len(self._search_prefix) + 1:
            raise AssertionError("node width mismatch: %d is not %d" %
                (self._node_width, len(self._search_prefix) + 1))
        self._items[prefix] = node
        self._key = None

    def _current_size(self):
        """Answer the current serialised size of this node."""
        return (self._raw_size + len(str(self._len)) + len(str(self._key_width)) +
            len(str(self._maximum_size)))

    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """Deserialise bytes to an InternalNode, with key key.

        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        :return: An InternalNode instance.
        """
        return _deserialise_internal_node(bytes, key,
                                          search_key_func=search_key_func)

    def iteritems(self, store, key_filter=None):
        for node in self._iter_nodes(store, key_filter=key_filter):
            for item in node.iteritems(store, key_filter=key_filter):
                yield item

    def _iter_nodes(self, store, key_filter=None, batch_size=None):
        """Iterate over node objects which match key_filter.

        :param store: A store to use for accessing content.
        :param key_filter: A key filter to filter nodes. Only nodes that might
            contain a key in key_filter will be returned.
        :param batch_size: If not None, then we will return the nodes that had
            to be read using get_record_stream in batches, rather than reading
            them all at once.
        :return: An iterable of nodes. This function does not have to be fully
            consumed. (There will be no pending I/O when items are being returned.)
        """
        keys = {}
        if key_filter is None:
            for prefix, node in self._items.iteritems():
                if type(node) == tuple:
                    keys[node] = prefix
                else:
                    yield node
        else:
            length_filters = {}
            for key in key_filter:
                search_key = self._search_prefix_filter(key)
                length_filter = length_filters.setdefault(
                    len(search_key), set())
                length_filter.add(search_key)
            length_filters = length_filters.items()
            for prefix, node in self._items.iteritems():
                for length, length_filter in length_filters:
                    if prefix[:length] in length_filter:
                        if type(node) == tuple:
                            keys[node] = prefix
                        else:
                            yield node
                        break
        if keys:
            # Look in the page cache for some more bytes
            found_keys = set()
            for key in keys:
                try:
                    bytes = _page_cache[key]
                except KeyError:
                    continue
                node = _deserialise(bytes, key,
                    search_key_func=self._search_key_func)
                self._items[keys[key]] = node
                found_keys.add(key)
                yield node
            for key in found_keys:
                del keys[key]
        if keys:
            # demand load some pages.
            if batch_size is None:
                # Read all the keys in
                batch_size = len(keys)
            key_order = list(keys)
            for batch_start in range(0, len(key_order), batch_size):
                batch = key_order[batch_start:batch_start + batch_size]
                # We have to fully consume the stream so there is no pending
                # I/O, so we buffer the nodes for now.
                stream = store.get_record_stream(batch, 'unordered', True)
                nodes = []
                for record in stream:
                    bytes = record.get_bytes_as('fulltext')
                    node = _deserialise(bytes, record.key,
                        search_key_func=self._search_key_func)
                    nodes.append(node)
                    self._items[keys[record.key]] = node
                    _page_cache.add(record.key, bytes)
                for node in nodes:
                    yield node

    def map(self, store, key, value):
        """Map key to value."""
        if not len(self._items):
            raise AssertionError("can't map in an empty InternalNode.")
        search_key = self._search_key(key)
        if self._node_width != len(self._search_prefix) + 1:
            raise AssertionError("node width mismatch: %d is not %d" %
                (self._node_width, len(self._search_prefix) + 1))
        if not search_key.startswith(self._search_prefix):
            # This key doesn't fit in this index, so we need to split at the
            # point where it would fit, insert self into that internal node,
            # and then map this key into that node.
            new_prefix = self.common_prefix(self._search_prefix,
                                            search_key)
            new_parent = InternalNode(new_prefix,
                search_key_func=self._search_key_func)
            new_parent.set_maximum_size(self._maximum_size)
            new_parent._key_width = self._key_width
            new_parent.add_node(self._search_prefix[:len(new_prefix)+1],
                                self)
            return new_parent.map(store, key, value)
        children = list(self._iter_nodes(store, key_filter=[key]))
        if children:
            child = children[0]
        else:
            child = self._new_child(search_key, LeafNode)
        old_len = len(child)
        if isinstance(child, LeafNode):
            old_size = child._current_size()
        else:
            old_size = None
        prefix, node_details = child.map(store, key, value)
        if len(node_details) == 1:
            # child may have shrunk, or might be a new node
            child = node_details[0][1]
            self._len = self._len - old_len + len(child)
            self._items[search_key] = child
            self._key = None
            new_node = self
            if isinstance(child, LeafNode):
                if old_size is None:
                    # The old node was an InternalNode which means it has now
                    # collapsed, so we need to check if it will chain to a
                    # collapse at this level.
                    trace.mutter("checking remap as InternalNode -> LeafNode")
                    new_node = self._check_remap(store)
                else:
                    # If the LeafNode has shrunk in size, we may want to run
                    # a remap check. Checking for a remap is expensive though
                    # and the frequency of a successful remap is very low.
                    # Shrinkage by small amounts is common, so we only do the
                    # remap check if the new_size is low or the shrinkage
                    # amount is over a configurable limit.
                    new_size = child._current_size()
                    shrinkage = old_size - new_size
                    if (shrinkage > 0 and new_size < _INTERESTING_NEW_SIZE
                        or shrinkage > _INTERESTING_SHRINKAGE_LIMIT):
                        trace.mutter(
                            "checking remap as size shrunk by %d to be %d",
                            shrinkage, new_size)
                        new_node = self._check_remap(store)
            if new_node._search_prefix is None:
                raise AssertionError("_search_prefix should not be None")
            return new_node._search_prefix, [('', new_node)]
        # child has overflown - create a new intermediate node.
        # XXX: This is where we might want to try and expand our depth
        # to refer to more bytes of every child (which would give us
        # multiple pointers to child nodes, but less intermediate nodes)
        child = self._new_child(search_key, InternalNode)
        child._search_prefix = prefix
        for split, node in node_details:
            child.add_node(split, node)
        self._len = self._len - old_len + len(child)
        self._key = None
        return self._search_prefix, [("", self)]

    def _new_child(self, search_key, klass):
        """Create a new child node of type klass."""
        child = klass()
        child.set_maximum_size(self._maximum_size)
        child._key_width = self._key_width
        child._search_key_func = self._search_key_func
        self._items[search_key] = child
        return child

    def serialise(self, store):
        """Serialise the node to store.

        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
        for node in self._items.itervalues():
            if type(node) == tuple:
                # Never deserialised.
                continue
            if node._key is not None:
                continue
            for key in node.serialise(store):
                yield key
        lines = ["chknode:\n"]
        lines.append("%d\n" % self._maximum_size)
        lines.append("%d\n" % self._key_width)
        lines.append("%d\n" % self._len)
        if self._search_prefix is None:
            raise AssertionError("_search_prefix should not be None")
        lines.append('%s\n' % (self._search_prefix,))
        prefix_len = len(self._search_prefix)
        for prefix, node in sorted(self._items.items()):
            if type(node) == tuple:
                key = node[0]
            else:
                key = node._key[0]
            serialised = "%s\x00%s\n" % (prefix, key)
            if not serialised.startswith(self._search_prefix):
                raise AssertionError("prefixes mismatch: %s must start with %s"
                    % (serialised, self._search_prefix))
            lines.append(serialised[prefix_len:])
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = ("sha1:" + sha1,)
        _page_cache.add(self._key, ''.join(lines))
        yield self._key

    def _search_key(self, key):
        """Return the serialised key for key in this node."""
        # search keys are fixed width. All will be self._node_width wide, so we
        # pad as necessary.
        return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]
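
    # For example (illustrative): with the plain search key function and a
    # node width of 3, the key ('a',) maps to 'a\x00\x00' and ('abcd',) maps
    # to 'abc'.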

    def _search_prefix_filter(self, key):
        """Serialise key for use as a prefix filter in iteritems."""
        return self._search_key_func(key)[:self._node_width]

    def _split(self, offset):
        """Split this node into smaller nodes starting at offset.

        :param offset: The offset to start the new child nodes at.
        :return: An iterable of (prefix, node) tuples. prefix is a byte
            prefix for reaching node.
        """
        if offset >= self._node_width:
            for node in self._items.values():
                for result in node._split(offset):
                    yield result
            return
        for key, node in self._items.items():
1150
"""Return the references to other CHK's held by this node."""
1151
if self._key is None:
1152
raise AssertionError("unserialised nodes have no refs.")
1154
for value in self._items.itervalues():
1155
if type(value) == tuple:
1158
refs.append(value.key())
1161

    def _compute_search_prefix(self, extra_key=None):
        """Return the unique key prefix for this node.

        :return: A bytestring of the longest search key prefix that is
            unique within this node.
        """
        self._search_prefix = self.common_prefix_for_keys(self._items)
        return self._search_prefix

    def unmap(self, store, key, check_remap=True):
        """Remove key from this node and its children."""
        if not len(self._items):
            raise AssertionError("can't unmap in an empty InternalNode.")
        children = list(self._iter_nodes(store, key_filter=[key]))
        if children:
            child = children[0]
        else:
            raise KeyError(key)
        self._len -= 1
        unmapped = child.unmap(store, key)
        self._key = None
        search_key = self._search_key(key)
        if len(unmapped) == 0:
            # All child nodes are gone, remove the child:
            del self._items[search_key]
        else:
            # Stash the returned node
            self._items[search_key] = unmapped
        if len(self._items) == 1:
            # this node is no longer needed:
            return self._items.values()[0]
        if isinstance(unmapped, InternalNode):
            return self
        if check_remap:
            return self._check_remap(store)
        else:
            return self

    def _check_remap(self, store):
        """Check if all keys contained by children fit in a single LeafNode.

        :param store: A store to use for reading more nodes
        :return: Either self, or a new LeafNode which should replace self.
        """
        # Logic for how we determine when we need to rebuild
        # 1) Implicitly unmap() is removing a key which means that the child
        #    nodes are going to be shrinking by some extent.
        # 2) If all children are LeafNodes, it is possible that they could be
        #    combined into a single LeafNode, which can then completely replace
        #    this internal node with a single LeafNode
        # 3) If *one* child is an InternalNode, we assume it has already done
        #    all the work to determine that its children cannot collapse, and
        #    we can then assume that those nodes *plus* the current nodes don't
        #    have a chance of collapsing either.
        #    So a very cheap check is to just say if 'unmapped' is an
        #    InternalNode, we don't have to check further.

        # TODO: Another alternative is to check the total size of all known
        #       LeafNodes. If there is some formula we can use to determine the
        #       final size without actually having to read in any more
        #       children, it would be nice to have. However, we have to be
        #       careful with stuff like nodes that pull out the common prefix
        #       of each key, as adding a new key can change the common prefix
        #       and cause size changes greater than the length of one key.
        #       So for now, we just add everything to a new Leaf until it
        #       splits, as we know that will give the right answer
        new_leaf = LeafNode(search_key_func=self._search_key_func)
        new_leaf.set_maximum_size(self._maximum_size)
        new_leaf._key_width = self._key_width
        # A batch_size of 16 was chosen because:
        #  a) In testing, a 4k page held 14 times. So if we have more than 16
        #     leaf nodes we are unlikely to hold them in a single new leaf
        #     node. This still allows for 1 round trip
        #  b) With 16-way fan out, we can still do a single round trip
        #  c) With 255-way fan out, we don't want to read all 255 and destroy
        #     the page cache, just to determine that we really don't need it.
        for node in self._iter_nodes(store, batch_size=16):
            if isinstance(node, InternalNode):
                # Without looking at any leaf nodes, we are sure
                # that it will not all fit in a single LeafNode.
                return self
            for key, value in node._items.iteritems():
                if new_leaf._map_no_split(key, value):
                    return self
        trace.mutter("remap generated a new LeafNode")
        return new_leaf


def _deserialise(bytes, key, search_key_func):
    """Helper for repositorydetails - convert bytes to a node."""
    if bytes.startswith("chkleaf:\n"):
        node = LeafNode.deserialise(bytes, key, search_key_func=search_key_func)
    elif bytes.startswith("chknode:\n"):
        node = InternalNode.deserialise(bytes, key,
                                        search_key_func=search_key_func)
    else:
        raise AssertionError("Unknown node type.")
    return node


def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
    """Read the associated records, and determine what is interesting."""
    uninteresting_keys = set(uninteresting_keys)
    chks_to_read = uninteresting_keys.union(interesting_keys)
    next_uninteresting = set()
    next_interesting = set()
    uninteresting_items = set()
    interesting_items = set()
    interesting_records = []
    # records_read = set()
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
        # records_read.add(record.key())
        bytes = record.get_bytes_as('fulltext')
        # We don't care about search_key_func for this code, because we only
        # care about external references.
        node = _deserialise(bytes, record.key, search_key_func=None)
        if record.key in uninteresting_keys:
            if isinstance(node, InternalNode):
                next_uninteresting.update(node.refs())
            else:
                # We know we are at a LeafNode, so we can pass None for the
                # store
                uninteresting_items.update(node.iteritems(None))
        else:
            interesting_records.append(record)
            if isinstance(node, InternalNode):
                next_interesting.update(node.refs())
            else:
                interesting_items.update(node.iteritems(None))
    # TODO: Filter out records that have already been read, as node splitting
    #       can cause us to reference the same nodes via shorter and longer
    #       paths
    return (next_uninteresting, uninteresting_items,
            next_interesting, interesting_records, interesting_items)


def _find_all_uninteresting(store, interesting_root_keys,
                            uninteresting_root_keys, adapter, pb):
    """Determine the full set of uninteresting keys."""
    # What about duplicates between interesting_root_keys and
    # uninteresting_root_keys?
    if not uninteresting_root_keys:
        # Shortcut case. We know there is nothing uninteresting to filter out
        # So we just let the rest of the algorithm do the work
        # We know there is nothing uninteresting, and we didn't have to read
        # any interesting records yet.
        return (set(), set(), set(interesting_root_keys), [], set())
    all_uninteresting_chks = set(uninteresting_root_keys)
    all_uninteresting_items = set()

    # First step, find the direct children of both the interesting and
    # uninteresting set
    (uninteresting_keys, uninteresting_items,
     interesting_keys, interesting_records,
     interesting_items) = _find_children_info(store, interesting_root_keys,
                                              uninteresting_root_keys,
                                              pb)
    all_uninteresting_chks.update(uninteresting_keys)
    all_uninteresting_items.update(uninteresting_items)
    del uninteresting_items
    # Note: Exact matches between interesting and uninteresting do not need
    #       to be searched further. Non-exact matches need to be searched in
    #       case there is a future exact-match
    uninteresting_keys.difference_update(interesting_keys)

    # Second, find the full set of uninteresting bits reachable by the
    # uninteresting roots
    chks_to_read = uninteresting_keys
    while chks_to_read:
        next_chks = set()
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
            # TODO: Handle 'absent'
            try:
                bytes = record.get_bytes_as('fulltext')
            except errors.UnavailableRepresentation:
                bytes = adapter.get_bytes(record)
            # We don't care about search_key_func for this code, because we
            # only care about external references.
            node = _deserialise(bytes, record.key, search_key_func=None)
            if isinstance(node, InternalNode):
                # uninteresting_prefix_chks.update(node._items.iteritems())
                chks = node._items.values()
                # TODO: We remove the entries that are already in
                #       uninteresting_chks ?
                next_chks.update(chks)
                all_uninteresting_chks.update(chks)
            else:
                all_uninteresting_items.update(node._items.iteritems())
        chks_to_read = next_chks
    return (all_uninteresting_chks, all_uninteresting_items,
            interesting_keys, interesting_records, interesting_items)


def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    Evaluate nodes referenced by interesting_root_keys. Ones that are also
    referenced from uninteresting_root_keys are not considered interesting.

    :param interesting_root_keys: keys which should be part of the
        "interesting" nodes (which will be yielded)
    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :return: Yields tuples of
        (interesting records, interesting chk's, interesting key:values)
    """
    # TODO: consider that it may be more memory efficient to use the 20-byte
    #       sha1 string, rather than tuples of hexadecimal sha1 strings.
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
    #       helper function similar to _read_bytes. This function should be
    #       able to use nodes from the _page_cache as well as actually
    #       requesting bytes from the store.

    # A way to adapt from the compressed texts back into fulltexts
    # In a way, this seems like a layering inversion to have CHKMap know the
    # details of versionedfile
    adapter_class = versionedfile.adapter_registry.get(
        ('knit-ft-gz', 'fulltext'))
    adapter = adapter_class(store)

    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
     interesting_records, interesting_items) = _find_all_uninteresting(store,
        interesting_root_keys, uninteresting_root_keys, adapter, pb)

    # Now that we know everything uninteresting, we can yield information from
    # the first layer
    interesting_items.difference_update(all_uninteresting_items)
    records = dict((record.key, record) for record in interesting_records
                   if record.key not in all_uninteresting_chks)
    if records or interesting_items:
        yield records, interesting_items
    interesting_keys.difference_update(all_uninteresting_chks)

    chks_to_read = interesting_keys
    counter = 0
    while chks_to_read:
        next_chks = set()
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
            counter += 1
            if pb is not None:
                pb.update('find chk pages', counter)
            # TODO: Handle 'absent'?
            try:
                bytes = record.get_bytes_as('fulltext')
            except errors.UnavailableRepresentation:
                bytes = adapter.get_bytes(record)
            # We don't care about search_key_func for this code, because we
            # only care about external references.
            node = _deserialise(bytes, record.key, search_key_func=None)
            if isinstance(node, InternalNode):
                chks = set(node.refs())
                chks.difference_update(all_uninteresting_chks)
                # Is set() and .difference_update better than:
                # chks = [chk for chk in node.refs()
                #         if chk not in all_uninteresting_chks]
                next_chks.update(chks)
                # These are now uninteresting everywhere else
                all_uninteresting_chks.update(chks)
                interesting_items = []
            else:
                interesting_items = [item for item in node._items.iteritems()
                                     if item not in all_uninteresting_items]
                # TODO: Do we need to filter out items that we have already
                #       seen on other pages? We don't really want to buffer the
                #       whole thing, but it does mean that callers need to
                #       understand they may get duplicate values.
                # all_uninteresting_items.update(interesting_items)
            yield {record.key: record}, interesting_items
        chks_to_read = next_chks


try:
    from bzrlib._chk_map_pyx import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError:
    from bzrlib._chk_map_py import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
search_key_registry.register('hash-16-way', _search_key_16)
search_key_registry.register('hash-255-way', _search_key_255)
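
# Illustrative usage (not part of the original module): a caller can pick a
# registered search key function by name and pass it to CHKMap, e.g.
#   search_key_func = search_key_registry.get('hash-16-way')
#   chkmap = CHKMap(store, root_key, search_key_func=search_key_func)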