1
# Copyright (C) 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
from bisect import bisect_right
23
from copy import deepcopy
38
from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
39
from bzrlib.transport import get_transport
42
_BTSIGNATURE = "B+Tree Graph Index 2\n"
43
_OPTION_ROW_LENGTHS = "row_lengths="
44
_LEAF_FLAG = "type=leaf\n"
45
_INTERNAL_FLAG = "type=internal\n"
46
_INTERNAL_OFFSET = "offset="
48
_RESERVED_HEADER_BYTES = 120
51
# 4K per page: 4MB - 1000 entries
52
_NODE_CACHE_SIZE = 1000
55
class _BuilderRow(object):
56
"""The stored state accumulated while writing out a row in the index.
58
:ivar spool: A temporary file used to accumulate nodes for this row
60
:ivar nodes: The count of nodes emitted so far.
64
"""Create a _BuilderRow."""
66
self.spool = tempfile.TemporaryFile()
69
def finish_node(self, pad=True):
70
byte_lines, _, padding = self.writer.finish()
73
self.spool.write("\x00" * _RESERVED_HEADER_BYTES)
75
if not pad and padding:
77
skipped_bytes = padding
78
self.spool.writelines(byte_lines)
79
remainder = (self.spool.tell() + skipped_bytes) % _PAGE_SIZE
81
raise AssertionError("incorrect node length: %d, %d"
82
% (self.spool.tell(), remainder))
87
class _InternalBuilderRow(_BuilderRow):
    """The stored state accumulated while writing out internal rows."""

    def finish_node(self, pad=True):
        """Close out the current node.

        Internal nodes are always written padded to a full page, so
        callers must not pass pad=False.

        :param pad: Must be True for internal rows.
        :raises AssertionError: if pad is False.
        """
        # Guard: an unpadded internal node would corrupt the page layout.
        if not pad:
            raise AssertionError("Must pad internal nodes only.")
        _BuilderRow.finish_node(self)
96
class _LeafBuilderRow(_BuilderRow):
    """The stored state accumulated while writing out leaf rows."""
100
class BTreeBuilder(index.GraphIndexBuilder):
101
"""A Builder for B+Tree based Graph indices.
103
The resulting graph has the structure:
105
_SIGNATURE OPTIONS NODES
106
_SIGNATURE := 'B+Tree Graph Index 1' NEWLINE
107
OPTIONS := REF_LISTS KEY_ELEMENTS LENGTH
108
REF_LISTS := 'node_ref_lists=' DIGITS NEWLINE
109
KEY_ELEMENTS := 'key_elements=' DIGITS NEWLINE
110
LENGTH := 'len=' DIGITS NEWLINE
111
ROW_LENGTHS := 'row_lengths' DIGITS (COMMA DIGITS)*
112
NODES := NODE_COMPRESSED*
113
NODE_COMPRESSED:= COMPRESSED_BYTES{4096}
114
NODE_RAW := INTERNAL | LEAF
115
INTERNAL := INTERNAL_FLAG POINTERS
116
LEAF := LEAF_FLAG ROWS
117
KEY_ELEMENT := Not-whitespace-utf8
118
KEY := KEY_ELEMENT (NULL KEY_ELEMENT)*
120
ROW := KEY NULL ABSENT? NULL REFERENCES NULL VALUE NEWLINE
122
REFERENCES := REFERENCE_LIST (TAB REFERENCE_LIST){node_ref_lists - 1}
123
REFERENCE_LIST := (REFERENCE (CR REFERENCE)*)?
125
VALUE := no-newline-no-null-bytes
128
def __init__(self, reference_lists=0, key_elements=1, spill_at=100000):
129
"""See GraphIndexBuilder.__init__.
131
:param spill_at: Optional parameter controlling the maximum number
132
of nodes that BTreeBuilder will hold in memory.
134
index.GraphIndexBuilder.__init__(self, reference_lists=reference_lists,
135
key_elements=key_elements)
136
self._spill_at = spill_at
137
self._backing_indices = []
138
# A map of {key: (node_refs, value)}
140
# Indicate it hasn't been built yet
141
self._nodes_by_key = None
143
def add_node(self, key, value, references=()):
144
"""Add a node to the index.
146
If adding the node causes the builder to reach its spill_at threshold,
147
disk spilling will be triggered.
149
:param key: The key. keys are non-empty tuples containing
150
as many whitespace-free utf8 bytestrings as the key length
151
defined for this index.
152
:param references: An iterable of iterables of keys. Each is a
153
reference to another key.
154
:param value: The value to associate with the key. It may be any
155
bytes as long as it does not contain \0 or \n.
157
# we don't care about absent_references
158
node_refs, _ = self._check_key_ref_value(key, references, value)
159
if key in self._nodes:
160
raise errors.BadIndexDuplicateKey(key, self)
161
self._nodes[key] = (node_refs, value)
163
if self._nodes_by_key is not None and self._key_length > 1:
164
self._update_nodes_by_key(key, value, node_refs)
165
if len(self._keys) < self._spill_at:
167
self._spill_mem_keys_to_disk()
169
def _spill_mem_keys_to_disk(self):
170
"""Write the in memory keys down to disk to cap memory consumption.
172
If we already have some keys written to disk, we will combine them so
173
as to preserve the sorted order. The algorithm for combining uses
174
powers of two. So on the first spill, write all mem nodes into a
175
single index. On the second spill, combine the mem nodes with the nodes
176
on disk to create a 2x sized disk index and get rid of the first index.
177
On the third spill, create a single new disk index, which will contain
178
the mem nodes, and preserve the existing 2x sized index. On the fourth,
179
combine mem with the first and second indexes, creating a new one of
180
size 4x. On the fifth create a single new one, etc.
182
iterators_to_combine = [self._iter_mem_nodes()]
184
for pos, backing in enumerate(self._backing_indices):
188
iterators_to_combine.append(backing.iter_all_entries())
189
backing_pos = pos + 1
190
new_backing_file, size = \
191
self._write_nodes(self._iter_smallest(iterators_to_combine))
192
dir_path, base_name = osutils.split(new_backing_file.name)
193
# Note: The transport here isn't strictly needed, because we will use
194
# direct access to the new_backing._file object
195
new_backing = BTreeGraphIndex(get_transport(dir_path),
197
# GC will clean up the file
198
new_backing._file = new_backing_file
199
if len(self._backing_indices) == backing_pos:
200
self._backing_indices.append(None)
201
self._backing_indices[backing_pos] = new_backing
202
for pos in range(backing_pos):
203
self._backing_indices[pos] = None
206
self._nodes_by_key = None
208
def add_nodes(self, nodes):
209
"""Add nodes to the index.
211
:param nodes: An iterable of (key, node_refs, value) entries to add.
213
if self.reference_lists:
214
for (key, value, node_refs) in nodes:
215
self.add_node(key, value, node_refs)
217
for (key, value) in nodes:
218
self.add_node(key, value)
220
def _iter_mem_nodes(self):
221
"""Iterate over the nodes held in memory."""
223
if self.reference_lists:
224
for key in sorted(nodes):
225
references, value = nodes[key]
226
yield self, key, value, references
228
for key in sorted(nodes):
229
references, value = nodes[key]
230
yield self, key, value
232
def _iter_smallest(self, iterators_to_combine):
233
if len(iterators_to_combine) == 1:
234
for value in iterators_to_combine[0]:
238
for iterator in iterators_to_combine:
240
current_values.append(iterator.next())
241
except StopIteration:
242
current_values.append(None)
245
# Decorate candidates with the value to allow 2.4's min to be used.
246
candidates = [(item[1][1], item) for item
247
in enumerate(current_values) if item[1] is not None]
248
if not len(candidates):
250
selected = min(candidates)
251
# undecorate back to (pos, node)
252
selected = selected[1]
253
if last == selected[1][1]:
254
raise errors.BadIndexDuplicateKey(last, self)
255
last = selected[1][1]
256
# Yield, with self as the index
257
yield (self,) + selected[1][1:]
260
current_values[pos] = iterators_to_combine[pos].next()
261
except StopIteration:
262
current_values[pos] = None
264
def _add_key(self, string_key, line, rows):
265
"""Add a key to the current chunk.
267
:param string_key: The key to add.
268
:param line: The fully serialised key and value.
270
if rows[-1].writer is None:
271
# opening a new leaf chunk;
272
for pos, internal_row in enumerate(rows[:-1]):
273
# flesh out any internal nodes that are needed to
274
# preserve the height of the tree
275
if internal_row.writer is None:
277
if internal_row.nodes == 0:
278
length -= _RESERVED_HEADER_BYTES # padded
279
internal_row.writer = chunk_writer.ChunkWriter(length, 0)
280
internal_row.writer.write(_INTERNAL_FLAG)
281
internal_row.writer.write(_INTERNAL_OFFSET +
282
str(rows[pos + 1].nodes) + "\n")
285
if rows[-1].nodes == 0:
286
length -= _RESERVED_HEADER_BYTES # padded
287
rows[-1].writer = chunk_writer.ChunkWriter(length)
288
rows[-1].writer.write(_LEAF_FLAG)
289
if rows[-1].writer.write(line):
290
# this key did not fit in the node:
291
rows[-1].finish_node()
292
key_line = string_key + "\n"
294
for row in reversed(rows[:-1]):
295
# Mark the start of the next node in the node above. If it
296
# doesn't fit then propogate upwards until we find one that
298
if row.writer.write(key_line):
301
# We've found a node that can handle the pointer.
304
# If we reached the current root without being able to mark the
305
# division point, then we need a new root:
308
if 'index' in debug.debug_flags:
309
trace.mutter('Inserting new global row.')
310
new_row = _InternalBuilderRow()
312
rows.insert(0, new_row)
313
# This will be padded, hence the -100
314
new_row.writer = chunk_writer.ChunkWriter(
315
_PAGE_SIZE - _RESERVED_HEADER_BYTES,
317
new_row.writer.write(_INTERNAL_FLAG)
318
new_row.writer.write(_INTERNAL_OFFSET +
319
str(rows[1].nodes - 1) + "\n")
320
new_row.writer.write(key_line)
321
self._add_key(string_key, line, rows)
323
def _write_nodes(self, node_iterator):
324
"""Write node_iterator out as a B+Tree.
326
:param node_iterator: An iterator of sorted nodes. Each node should
327
match the output given by iter_all_entries.
328
:return: A file handle for a temporary file containing a B+Tree for
331
# The index rows - rows[0] is the root, rows[1] is the layer under it
334
# forward sorted by key. In future we may consider topological sorting,
335
# at the cost of table scans for direct lookup, or a second index for
338
# A stack with the number of nodes of each size. 0 is the root node
339
# and must always be 1 (if there are any nodes in the tree).
340
self.row_lengths = []
341
# Loop over all nodes adding them to the bottom row
342
# (rows[-1]). When we finish a chunk in a row,
343
# propogate the key that didn't fit (comes after the chunk) to the
344
# row above, transitively.
345
for node in node_iterator:
347
# First key triggers the first row
348
rows.append(_LeafBuilderRow())
350
# TODO: Flattening the node into a string key and a line should
351
# probably be put into a pyrex function. We can do a quick
352
# iter over all the entries to determine the final length,
353
# and then do a single malloc() rather than lots of
354
# intermediate mallocs as we build everything up.
355
# ATM 3 / 13s are spent flattening nodes (10s is compressing)
356
string_key, line = _btree_serializer._flatten_node(node,
357
self.reference_lists)
358
self._add_key(string_key, line, rows)
359
for row in reversed(rows):
360
pad = (type(row) != _LeafBuilderRow)
361
row.finish_node(pad=pad)
362
result = tempfile.NamedTemporaryFile()
363
lines = [_BTSIGNATURE]
364
lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
365
lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
366
lines.append(_OPTION_LEN + str(key_count) + '\n')
367
row_lengths = [row.nodes for row in rows]
368
lines.append(_OPTION_ROW_LENGTHS + ','.join(map(str, row_lengths)) + '\n')
369
result.writelines(lines)
370
position = sum(map(len, lines))
372
if position > _RESERVED_HEADER_BYTES:
373
raise AssertionError("Could not fit the header in the"
374
" reserved space: %d > %d"
375
% (position, _RESERVED_HEADER_BYTES))
376
# write the rows out:
378
reserved = _RESERVED_HEADER_BYTES # reserved space for first node
381
# copy nodes to the finalised file.
382
# Special case the first node as it may be prefixed
383
node = row.spool.read(_PAGE_SIZE)
384
result.write(node[reserved:])
385
result.write("\x00" * (reserved - position))
386
position = 0 # Only the root row actually has an offset
387
copied_len = osutils.pumpfile(row.spool, result)
388
if copied_len != (row.nodes - 1) * _PAGE_SIZE:
389
if type(row) != _LeafBuilderRow:
390
raise AssertionError("Incorrect amount of data copied"
391
" expected: %d, got: %d"
392
% ((row.nodes - 1) * _PAGE_SIZE,
400
"""Finalise the index.
402
:return: A file handle for a temporary file containing the nodes added
405
return self._write_nodes(self.iter_all_entries())[0]
407
def iter_all_entries(self):
408
"""Iterate over all keys within the index
410
:return: An iterable of (index, key, reference_lists, value). There is no
411
defined order for the result iteration - it will be in the most
412
efficient order for the index (in this case dictionary hash order).
414
if 'evil' in debug.debug_flags:
415
trace.mutter_callsite(3,
416
"iter_all_entries scales with size of history.")
417
# Doing serial rather than ordered would be faster; but this shouldn't
418
# be getting called routinely anyway.
419
iterators = [self._iter_mem_nodes()]
420
for backing in self._backing_indices:
421
if backing is not None:
422
iterators.append(backing.iter_all_entries())
423
if len(iterators) == 1:
425
return self._iter_smallest(iterators)
427
def iter_entries(self, keys):
428
"""Iterate over keys within the index.
430
:param keys: An iterable providing the keys to be retrieved.
431
:return: An iterable of (index, key, value, reference_lists). There is no
432
defined order for the result iteration - it will be in the most
433
efficient order for the index (keys iteration order in this case).
436
if self.reference_lists:
437
for key in keys.intersection(self._keys):
438
node = self._nodes[key]
439
yield self, key, node[1], node[0]
441
for key in keys.intersection(self._keys):
442
node = self._nodes[key]
443
yield self, key, node[1]
444
keys.difference_update(self._keys)
445
for backing in self._backing_indices:
450
for node in backing.iter_entries(keys):
452
yield (self,) + node[1:]
454
def iter_entries_prefix(self, keys):
455
"""Iterate over keys within the index using prefix matching.
457
Prefix matching is applied within the tuple of a key, not to within
458
the bytestring of each key element. e.g. if you have the keys ('foo',
459
'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
460
only the former key is returned.
462
:param keys: An iterable providing the key prefixes to be retrieved.
463
Each key prefix takes the form of a tuple the length of a key, but
464
with the last N elements 'None' rather than a regular bytestring.
465
The first element cannot be 'None'.
466
:return: An iterable as per iter_all_entries, but restricted to the
467
keys with a matching prefix to those supplied. No additional keys
468
will be returned, and every match that is in the index will be
471
# XXX: To much duplication with the GraphIndex class; consider finding
472
# a good place to pull out the actual common logic.
476
for backing in self._backing_indices:
479
for node in backing.iter_entries_prefix(keys):
480
yield (self,) + node[1:]
481
if self._key_length == 1:
485
raise errors.BadIndexKey(key)
486
if len(key) != self._key_length:
487
raise errors.BadIndexKey(key)
489
node = self._nodes[key]
492
if self.reference_lists:
493
yield self, key, node[1], node[0]
495
yield self, key, node[1]
500
raise errors.BadIndexKey(key)
501
if len(key) != self._key_length:
502
raise errors.BadIndexKey(key)
503
# find what it refers to:
504
key_dict = self._get_nodes_by_key()
506
# find the subdict to return
508
while len(elements) and elements[0] is not None:
509
key_dict = key_dict[elements[0]]
512
# a non-existant lookup.
517
key_dict = dicts.pop(-1)
518
# can't be empty or would not exist
519
item, value = key_dict.iteritems().next()
520
if type(value) == dict:
522
dicts.extend(key_dict.itervalues())
525
for value in key_dict.itervalues():
526
yield (self, ) + value
528
yield (self, ) + key_dict
530
def _get_nodes_by_key(self):
531
if self._nodes_by_key is None:
533
if self.reference_lists:
534
for key, (references, value) in self._nodes.iteritems():
535
key_dict = nodes_by_key
536
for subkey in key[:-1]:
537
key_dict = key_dict.setdefault(subkey, {})
538
key_dict[key[-1]] = key, value, references
540
for key, (references, value) in self._nodes.iteritems():
541
key_dict = nodes_by_key
542
for subkey in key[:-1]:
543
key_dict = key_dict.setdefault(subkey, {})
544
key_dict[key[-1]] = key, value
545
self._nodes_by_key = nodes_by_key
546
return self._nodes_by_key
549
"""Return an estimate of the number of keys in this index.
551
For InMemoryGraphIndex the estimate is exact.
553
return len(self._keys) + sum(backing.key_count() for backing in
554
self._backing_indices if backing is not None)
557
"""In memory index's have no known corruption at the moment."""
560
class _LeafNode(object):
    """A leaf node for a serialised B+Tree index."""

    def __init__(self, bytes, key_length, ref_list_length):
        """Parse bytes to create a leaf node object."""
        # splitlines mangles the \r delimiters.. don't use it.
        parsed = _btree_serializer._parse_leaf_lines(
            bytes, key_length, ref_list_length)
        self.keys = dict(parsed)
570
class _InternalNode(object):
571
"""An internal node for a serialised B+Tree index."""
573
def __init__(self, bytes):
574
"""Parse bytes to create an internal node object."""
575
# splitlines mangles the \r delimiters.. don't use it.
576
self.keys = self._parse_lines(bytes.split('\n'))
578
def _parse_lines(self, lines):
580
self.offset = int(lines[1][7:])
581
for line in lines[2:]:
584
nodes.append(tuple(line.split('\0')))
588
class BTreeGraphIndex(object):
589
"""Access to nodes via the standard GraphIndex interface for B+Tree's.
591
Individual nodes are held in a LRU cache. This holds the root node in
592
memory except when very large walks are done.
595
def __init__(self, transport, name, size):
596
"""Create a B+Tree index object on the index name.
598
:param transport: The transport to read data for the index from.
599
:param name: The file name of the index on transport.
600
:param size: Optional size of the index in bytes. This allows
601
compatibility with the GraphIndex API, as well as ensuring that
602
the initial read (to read the root node header) can be done
603
without over-reading even on empty indices, and on small indices
604
allows single-IO to read the entire index.
606
self._transport = transport
610
self._page_size = transport.recommended_page_size()
611
self._root_node = None
612
# Default max size is 100,000 leave values
613
self._leaf_value_cache = None # lru_cache.LRUCache(100*1000)
614
self._leaf_node_cache = lru_cache.LRUCache(_NODE_CACHE_SIZE)
615
self._internal_node_cache = lru_cache.LRUCache()
616
self._key_count = None
617
self._row_lengths = None
618
self._row_offsets = None # Start of each row, [-1] is the end
620
def __eq__(self, other):
621
"""Equal when self and other were created with the same parameters."""
623
type(self) == type(other) and
624
self._transport == other._transport and
625
self._name == other._name and
626
self._size == other._size)
628
def __ne__(self, other):
629
return not self.__eq__(other)
631
def _get_root_node(self):
632
if self._root_node is None:
633
# We may not have a root node yet
634
nodes = list(self._read_nodes([0]))
636
self._root_node = nodes[0][1]
637
return self._root_node
639
def _cache_nodes(self, nodes, cache):
640
"""Read nodes and cache them in the lru.
642
The nodes list supplied is sorted and then read from disk, each node
643
being inserted it into the _node_cache.
645
Note: Asking for more nodes than the _node_cache can contain will
646
result in some of the results being immediately discarded, to prevent
647
this an assertion is raised if more nodes are asked for than are
650
:return: A dict of {node_pos: node}
652
if len(nodes) > cache._max_cache:
653
trace.mutter('Requesting %s > %s nodes, not all will be cached',
654
len(nodes), cache._max_cache)
656
for node_pos, node in self._read_nodes(sorted(nodes)):
657
if node_pos == 0: # Special case
658
self._root_node = node
660
cache.add(node_pos, node)
661
found[node_pos] = node
664
def _get_nodes(self, cache, node_indexes):
667
for idx in node_indexes:
668
if idx == 0 and self._root_node is not None:
669
found[0] = self._root_node
672
found[idx] = cache[idx]
675
found.update(self._cache_nodes(needed, cache))
678
def _get_internal_nodes(self, node_indexes):
679
"""Get a node, from cache or disk.
681
After getting it, the node will be cached.
683
return self._get_nodes(self._internal_node_cache, node_indexes)
685
def _get_leaf_nodes(self, node_indexes):
686
"""Get a bunch of nodes, from cache or disk."""
687
found = self._get_nodes(self._leaf_node_cache, node_indexes)
688
if self._leaf_value_cache is not None:
689
for node in found.itervalues():
690
for key, value in node.keys.iteritems():
691
if key in self._leaf_value_cache:
692
# Don't add the rest of the keys, we've seen this node
695
self._leaf_value_cache[key] = value
698
def iter_all_entries(self):
699
"""Iterate over all keys within the index.
701
:return: An iterable of (index, key, value) or (index, key, value, reference_lists).
702
The former tuple is used when there are no reference lists in the
703
index, making the API compatible with simple key:value index types.
704
There is no defined order for the result iteration - it will be in
705
the most efficient order for the index.
707
if 'evil' in debug.debug_flags:
708
trace.mutter_callsite(3,
709
"iter_all_entries scales with size of history.")
710
if not self.key_count():
712
start_of_leaves = self._row_offsets[-2]
713
end_of_leaves = self._row_offsets[-1]
714
needed_nodes = range(start_of_leaves, end_of_leaves)
715
# We iterate strictly in-order so that we can use this function
716
# for spilling index builds to disk.
717
if self.node_ref_lists:
718
for _, node in self._read_nodes(needed_nodes):
719
for key, (value, refs) in sorted(node.keys.items()):
720
yield (self, key, value, refs)
722
for _, node in self._read_nodes(needed_nodes):
723
for key, (value, refs) in sorted(node.keys.items()):
724
yield (self, key, value)
727
def _multi_bisect_right(in_keys, fixed_keys):
728
"""Find the positions where each 'in_key' would fit in fixed_keys.
730
This is equivalent to doing "bisect_right" on each in_key into
733
:param in_keys: A sorted list of keys to match with fixed_keys
734
:param fixed_keys: A sorted list of keys to match against
735
:return: A list of (integer position, [key list]) tuples.
740
# no pointers in the fixed_keys list, which means everything must
742
return [(0, in_keys)]
744
# TODO: Iterating both lists will generally take M + N steps
745
# Bisecting each key will generally take M * log2 N steps.
746
# If we had an efficient way to compare, we could pick the method
747
# based on which has the fewer number of steps.
748
# There is also the argument that bisect_right is a compiled
749
# function, so there is even more to be gained.
750
# iter_steps = len(in_keys) + len(fixed_keys)
751
# bisect_steps = len(in_keys) * math.log(len(fixed_keys), 2)
752
if len(in_keys) == 1: # Bisect will always be faster for M = 1
753
return [(bisect_right(fixed_keys, in_keys[0]), in_keys)]
754
# elif bisect_steps < iter_steps:
756
# for key in in_keys:
757
# offsets.setdefault(bisect_right(fixed_keys, key),
759
# return [(o, offsets[o]) for o in sorted(offsets)]
760
in_keys_iter = iter(in_keys)
761
fixed_keys_iter = enumerate(fixed_keys)
762
cur_in_key = in_keys_iter.next()
763
cur_fixed_offset, cur_fixed_key = fixed_keys_iter.next()
765
class InputDone(Exception): pass
766
class FixedDone(Exception): pass
771
# TODO: Another possibility is that rather than iterating on each side,
772
# we could use a combination of bisecting and iterating. For
773
# example, while cur_in_key < fixed_key, bisect to find its
774
# point, then iterate all matching keys, then bisect (restricted
775
# to only the remainder) for the next one, etc.
778
if cur_in_key < cur_fixed_key:
780
cur_out = (cur_fixed_offset, cur_keys)
781
output.append(cur_out)
782
while cur_in_key < cur_fixed_key:
783
cur_keys.append(cur_in_key)
785
cur_in_key = in_keys_iter.next()
786
except StopIteration:
788
# At this point cur_in_key must be >= cur_fixed_key
789
# step the cur_fixed_key until we pass the cur key, or walk off
791
while cur_in_key >= cur_fixed_key:
793
cur_fixed_offset, cur_fixed_key = fixed_keys_iter.next()
794
except StopIteration:
797
# We consumed all of the input, nothing more to do
800
# There was some input left, but we consumed all of fixed, so we
801
# have to add one more for the tail
802
cur_keys = [cur_in_key]
803
cur_keys.extend(in_keys_iter)
804
cur_out = (len(fixed_keys), cur_keys)
805
output.append(cur_out)
808
def iter_entries(self, keys):
809
"""Iterate over keys within the index.
811
:param keys: An iterable providing the keys to be retrieved.
812
:return: An iterable as per iter_all_entries, but restricted to the
813
keys supplied. No additional keys will be returned, and every
814
key supplied that is in the index will be returned.
816
# 6 seconds spent in miss_torture using the sorted() line.
817
# Even with out of order disk IO it seems faster not to sort it when
818
# large queries are being made.
819
# However, now that we are doing multi-way bisecting, we need the keys
820
# in sorted order anyway. We could change the multi-way code to not
821
# require sorted order. (For example, it bisects for the first node,
822
# does an in-order search until a key comes before the current point,
823
# which it then bisects for, etc.)
824
keys = frozenset(keys)
828
if not self.key_count():
832
if self._leaf_value_cache is None:
836
value = self._leaf_value_cache.get(key, None)
837
if value is not None:
838
# This key is known not to be here, skip it
840
if self.node_ref_lists:
841
yield (self, key, value, refs)
843
yield (self, key, value)
845
needed_keys.append(key)
851
# 6 seconds spent in miss_torture using the sorted() line.
852
# Even with out of order disk IO it seems faster not to sort it when
853
# large queries are being made.
854
needed_keys = sorted(needed_keys)
856
nodes_and_keys = [(0, needed_keys)]
858
for row_pos, next_row_start in enumerate(self._row_offsets[1:-1]):
859
node_indexes = [idx for idx, s_keys in nodes_and_keys]
860
nodes = self._get_internal_nodes(node_indexes)
862
next_nodes_and_keys = []
863
for node_index, sub_keys in nodes_and_keys:
864
node = nodes[node_index]
865
positions = self._multi_bisect_right(sub_keys, node.keys)
866
node_offset = next_row_start + node.offset
867
next_nodes_and_keys.extend([(node_offset + pos, s_keys)
868
for pos, s_keys in positions])
869
nodes_and_keys = next_nodes_and_keys
870
# We should now be at the _LeafNodes
871
node_indexes = [idx for idx, s_keys in nodes_and_keys]
873
# TODO: We may *not* want to always read all the nodes in one
874
# big go. Consider setting a max size on this.
876
nodes = self._get_leaf_nodes(node_indexes)
877
for node_index, sub_keys in nodes_and_keys:
880
node = nodes[node_index]
881
for next_sub_key in sub_keys:
882
if next_sub_key in node.keys:
883
value, refs = node.keys[next_sub_key]
884
if self.node_ref_lists:
885
yield (self, next_sub_key, value, refs)
887
yield (self, next_sub_key, value)
889
def iter_entries_prefix(self, keys):
890
"""Iterate over keys within the index using prefix matching.
892
Prefix matching is applied within the tuple of a key, not to within
893
the bytestring of each key element. e.g. if you have the keys ('foo',
894
'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
895
only the former key is returned.
897
WARNING: Note that this method currently causes a full index parse
898
unconditionally (which is reasonably appropriate as it is a means for
899
thunking many small indices into one larger one and still supplies
900
iter_all_entries at the thunk layer).
902
:param keys: An iterable providing the key prefixes to be retrieved.
903
Each key prefix takes the form of a tuple the length of a key, but
904
with the last N elements 'None' rather than a regular bytestring.
905
The first element cannot be 'None'.
906
:return: An iterable as per iter_all_entries, but restricted to the
907
keys with a matching prefix to those supplied. No additional keys
908
will be returned, and every match that is in the index will be
911
keys = sorted(set(keys))
914
# Load if needed to check key lengths
915
if self._key_count is None:
916
self._get_root_node()
917
# TODO: only access nodes that can satisfy the prefixes we are looking
918
# for. For now, to meet API usage (as this function is not used by
919
# current bzrlib) just suck the entire index and iterate in memory.
921
if self.node_ref_lists:
922
if self._key_length == 1:
923
for _1, key, value, refs in self.iter_all_entries():
924
nodes[key] = value, refs
927
for _1, key, value, refs in self.iter_all_entries():
928
key_value = key, value, refs
929
# For a key of (foo, bar, baz) create
930
# _nodes_by_key[foo][bar][baz] = key_value
931
key_dict = nodes_by_key
932
for subkey in key[:-1]:
933
key_dict = key_dict.setdefault(subkey, {})
934
key_dict[key[-1]] = key_value
936
if self._key_length == 1:
937
for _1, key, value in self.iter_all_entries():
941
for _1, key, value in self.iter_all_entries():
942
key_value = key, value
943
# For a key of (foo, bar, baz) create
944
# _nodes_by_key[foo][bar][baz] = key_value
945
key_dict = nodes_by_key
946
for subkey in key[:-1]:
947
key_dict = key_dict.setdefault(subkey, {})
948
key_dict[key[-1]] = key_value
949
if self._key_length == 1:
953
raise errors.BadIndexKey(key)
954
if len(key) != self._key_length:
955
raise errors.BadIndexKey(key)
957
if self.node_ref_lists:
958
value, node_refs = nodes[key]
959
yield self, key, value, node_refs
961
yield self, key, nodes[key]
968
raise errors.BadIndexKey(key)
969
if len(key) != self._key_length:
970
raise errors.BadIndexKey(key)
971
# find what it refers to:
972
key_dict = nodes_by_key
974
# find the subdict whose contents should be returned.
976
while len(elements) and elements[0] is not None:
977
key_dict = key_dict[elements[0]]
980
# a non-existant lookup.
985
key_dict = dicts.pop(-1)
986
# can't be empty or would not exist
987
item, value = key_dict.iteritems().next()
988
if type(value) == dict:
990
dicts.extend(key_dict.itervalues())
993
for value in key_dict.itervalues():
994
# each value is the key:value:node refs tuple
996
yield (self, ) + value
998
# the last thing looked up was a terminal element
999
yield (self, ) + key_dict
1001
def key_count(self):
1002
"""Return an estimate of the number of keys in this index.
1004
For BTreeGraphIndex the estimate is exact as it is contained in the
1007
if self._key_count is None:
1008
self._get_root_node()
1009
return self._key_count
1011
def _parse_header_from_bytes(self, bytes):
1012
"""Parse the header from a region of bytes.
1014
:param bytes: The data to parse.
1015
:return: An offset, data tuple such as readv yields, for the unparsed
1016
data. (which may be of length 0).
1018
signature = bytes[0:len(self._signature())]
1019
if not signature == self._signature():
1020
raise errors.BadIndexFormatSignature(self._name, BTreeGraphIndex)
1021
lines = bytes[len(self._signature()):].splitlines()
1022
options_line = lines[0]
1023
if not options_line.startswith(_OPTION_NODE_REFS):
1024
raise errors.BadIndexOptions(self)
1026
self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
1028
raise errors.BadIndexOptions(self)
1029
options_line = lines[1]
1030
if not options_line.startswith(_OPTION_KEY_ELEMENTS):
1031
raise errors.BadIndexOptions(self)
1033
self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
1035
raise errors.BadIndexOptions(self)
1036
options_line = lines[2]
1037
if not options_line.startswith(_OPTION_LEN):
1038
raise errors.BadIndexOptions(self)
1040
self._key_count = int(options_line[len(_OPTION_LEN):])
1042
raise errors.BadIndexOptions(self)
1043
options_line = lines[3]
1044
if not options_line.startswith(_OPTION_ROW_LENGTHS):
1045
raise errors.BadIndexOptions(self)
1047
self._row_lengths = map(int, [length for length in
1048
options_line[len(_OPTION_ROW_LENGTHS):].split(',')
1051
raise errors.BadIndexOptions(self)
1054
for row in self._row_lengths:
1055
offsets.append(row_offset)
1057
offsets.append(row_offset)
1058
self._row_offsets = offsets
1060
# calculate the bytes we have processed
1061
header_end = (len(signature) + sum(map(len, lines[0:4])) + 4)
1062
return header_end, bytes[header_end:]
1064
def _read_nodes(self, nodes):
1065
"""Read some nodes from disk into the LRU cache.
1067
This performs a readv to get the node data into memory, and parses each
1068
node, the yields it to the caller. The nodes are requested in the
1069
supplied order. If possible doing sort() on the list before requesting
1070
a read may improve performance.
1072
:param nodes: The nodes to read. 0 - first node, 1 - second node etc.
1077
offset = index * _PAGE_SIZE
1080
# Root node - special case
1082
size = min(_PAGE_SIZE, self._size)
1084
stream = self._transport.get(self._name)
1085
start = stream.read(_PAGE_SIZE)
1086
# Avoid doing this again
1087
self._size = len(start)
1088
size = min(_PAGE_SIZE, self._size)
1090
size = min(size, self._size - offset)
1091
ranges.append((offset, size))
1094
if self._file is None:
1095
data_ranges = self._transport.readv(self._name, ranges)
1098
for offset, size in ranges:
1099
self._file.seek(offset)
1100
data_ranges.append((offset, self._file.read(size)))
1101
for offset, data in data_ranges:
1103
# extract the header
1104
offset, data = self._parse_header_from_bytes(data)
1107
bytes = zlib.decompress(data)
1108
if bytes.startswith(_LEAF_FLAG):
1109
node = _LeafNode(bytes, self._key_length, self.node_ref_lists)
1110
elif bytes.startswith(_INTERNAL_FLAG):
1111
node = _InternalNode(bytes)
1113
raise AssertionError("Unknown node type for %r" % bytes)
1114
yield offset / _PAGE_SIZE, node
1116
def _signature(self):
1117
"""The file signature for this index type."""
1121
"""Validate that everything in the index can be accessed."""
1122
# just read and parse every node.
1123
self._get_root_node()
1124
if len(self._row_lengths) > 1:
1125
start_node = self._row_offsets[1]
1127
# We shouldn't be reading anything anyway
1129
node_end = self._row_offsets[-1]
1130
for node in self._read_nodes(range(start_node, node_end)):
1135
from bzrlib import _btree_serializer_c as _btree_serializer
1137
from bzrlib import _btree_serializer_py as _btree_serializer