1
# Copyright (C) 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Indexing facilities."""
23
'GraphIndexPrefixAdapter',
27
from bisect import bisect_right
28
from cStringIO import StringIO
31
from bzrlib.lazy_import import lazy_import
32
lazy_import(globals(), """
33
from bzrlib import trace
34
from bzrlib.bisect_multi import bisect_multi_bytes
35
from bzrlib.revision import NULL_REVISION
36
from bzrlib.trace import mutter
44
_HEADER_READV = (0, 200)
45
_OPTION_KEY_ELEMENTS = "key_elements="
47
_OPTION_NODE_REFS = "node_ref_lists="
48
_SIGNATURE = "Bazaar Graph Index 1\n"
51
_whitespace_re = re.compile('[\t\n\x0b\x0c\r\x00 ]')
52
_newline_null_re = re.compile('[\n\0]')
55
class GraphIndexBuilder(object):
56
"""A builder that can build a GraphIndex.
58
The resulting graph has the structure:
60
_SIGNATURE OPTIONS NODES NEWLINE
61
_SIGNATURE := 'Bazaar Graph Index 1' NEWLINE
62
OPTIONS := 'node_ref_lists=' DIGITS NEWLINE
64
NODE := KEY NULL ABSENT? NULL REFERENCES NULL VALUE NEWLINE
65
KEY := Not-whitespace-utf8
67
REFERENCES := REFERENCE_LIST (TAB REFERENCE_LIST){node_ref_lists - 1}
68
REFERENCE_LIST := (REFERENCE (CR REFERENCE)*)?
69
REFERENCE := DIGITS ; digits is the byte offset in the index of the
71
VALUE := no-newline-no-null-bytes
74
def __init__(self, reference_lists=0, key_elements=1):
75
"""Create a GraphIndex builder.
77
:param reference_lists: The number of node references lists for each
79
:param key_elements: The number of bytestrings in each key.
81
self.reference_lists = reference_lists
83
# A dict of {key: (absent, ref_lists, value)}
85
self._nodes_by_key = None
86
self._key_length = key_elements
88
def _check_key(self, key):
89
"""Raise BadIndexKey if key is not a valid key for this index."""
90
if type(key) != tuple:
91
raise errors.BadIndexKey(key)
92
if self._key_length != len(key):
93
raise errors.BadIndexKey(key)
95
if not element or _whitespace_re.search(element) is not None:
96
raise errors.BadIndexKey(element)
98
def _get_nodes_by_key(self):
99
if self._nodes_by_key is None:
101
if self.reference_lists:
102
for key, (absent, references, value) in self._nodes.iteritems():
105
key_dict = nodes_by_key
106
for subkey in key[:-1]:
107
key_dict = key_dict.setdefault(subkey, {})
108
key_dict[key[-1]] = key, value, references
110
for key, (absent, references, value) in self._nodes.iteritems():
113
key_dict = nodes_by_key
114
for subkey in key[:-1]:
115
key_dict = key_dict.setdefault(subkey, {})
116
key_dict[key[-1]] = key, value
117
self._nodes_by_key = nodes_by_key
118
return self._nodes_by_key
120
def _update_nodes_by_key(self, key, value, node_refs):
121
"""Update the _nodes_by_key dict with a new key.
123
For a key of (foo, bar, baz) create
124
_nodes_by_key[foo][bar][baz] = key_value
126
if self._nodes_by_key is None:
128
key_dict = self._nodes_by_key
129
if self.reference_lists:
130
key_value = key, value, node_refs
132
key_value = key, value
133
for subkey in key[:-1]:
134
key_dict = key_dict.setdefault(subkey, {})
135
key_dict[key[-1]] = key_value
137
def _check_key_ref_value(self, key, references, value):
138
"""Check that 'key' and 'references' are all valid.
140
:param key: A key tuple. Must conform to the key interface (be a tuple,
141
be of the right length, not have any whitespace or nulls in any key
143
:param references: An iterable of reference lists. Something like
144
[[(ref, key)], [(ref, key), (other, key)]]
145
:param value: The value associate with this key. Must not contain
146
newlines or null characters.
147
:return: (node_refs, absent_references)
148
node_refs basically a packed form of 'references' where all
150
absent_references reference keys that are not in self._nodes.
151
This may contain duplicates if the same key is
152
referenced in multiple lists.
155
if _newline_null_re.search(value) is not None:
156
raise errors.BadIndexValue(value)
157
if len(references) != self.reference_lists:
158
raise errors.BadIndexValue(references)
160
absent_references = []
161
for reference_list in references:
162
for reference in reference_list:
163
# If reference *is* in self._nodes, then we know it has already
165
if reference not in self._nodes:
166
self._check_key(reference)
167
absent_references.append(reference)
168
node_refs.append(tuple(reference_list))
169
return tuple(node_refs), absent_references
171
def add_node(self, key, value, references=()):
172
"""Add a node to the index.
174
:param key: The key. keys are non-empty tuples containing
175
as many whitespace-free utf8 bytestrings as the key length
176
defined for this index.
177
:param references: An iterable of iterables of keys. Each is a
178
reference to another key.
179
:param value: The value to associate with the key. It may be any
180
bytes as long as it does not contain \0 or \n.
183
absent_references) = self._check_key_ref_value(key, references, value)
184
if key in self._nodes and self._nodes[key][0] != 'a':
185
raise errors.BadIndexDuplicateKey(key, self)
186
for reference in absent_references:
187
# There may be duplicates, but I don't think it is worth worrying
189
self._nodes[reference] = ('a', (), '')
190
self._nodes[key] = ('', node_refs, value)
192
if self._nodes_by_key is not None and self._key_length > 1:
193
self._update_nodes_by_key(key, value, node_refs)
197
lines.append(_OPTION_NODE_REFS + str(self.reference_lists) + '\n')
198
lines.append(_OPTION_KEY_ELEMENTS + str(self._key_length) + '\n')
199
lines.append(_OPTION_LEN + str(len(self._keys)) + '\n')
200
prefix_length = sum(len(x) for x in lines)
201
# references are byte offsets. To avoid having to do nasty
202
# polynomial work to resolve offsets (references to later in the
203
# file cannot be determined until all the inbetween references have
204
# been calculated too) we pad the offsets with 0's to make them be
205
# of consistent length. Using binary offsets would break the trivial
207
# to calculate the width of zero's needed we do three passes:
208
# one to gather all the non-reference data and the number of references.
209
# one to pad all the data with reference-length and determine entry
213
# forward sorted by key. In future we may consider topological sorting,
214
# at the cost of table scans for direct lookup, or a second index for
216
nodes = sorted(self._nodes.items())
217
# if we do not prepass, we don't know how long it will be up front.
218
expected_bytes = None
219
# we only need to pre-pass if we have reference lists at all.
220
if self.reference_lists:
222
non_ref_bytes = prefix_length
224
# TODO use simple multiplication for the constants in this loop.
225
for key, (absent, references, value) in nodes:
226
# record the offset known *so far* for this key:
227
# the non reference bytes to date, and the total references to
228
# date - saves reaccumulating on the second pass
229
key_offset_info.append((key, non_ref_bytes, total_references))
230
# key is literal, value is literal, there are 3 null's, 1 NL
231
# key is variable length tuple, \x00 between elements
232
non_ref_bytes += sum(len(element) for element in key)
233
if self._key_length > 1:
234
non_ref_bytes += self._key_length - 1
235
# value is literal bytes, there are 3 null's, 1 NL.
236
non_ref_bytes += len(value) + 3 + 1
237
# one byte for absent if set.
240
elif self.reference_lists:
241
# (ref_lists -1) tabs
242
non_ref_bytes += self.reference_lists - 1
243
# (ref-1 cr's per ref_list)
244
for ref_list in references:
245
# how many references across the whole file?
246
total_references += len(ref_list)
247
# accrue reference separators
249
non_ref_bytes += len(ref_list) - 1
250
# how many digits are needed to represent the total byte count?
252
possible_total_bytes = non_ref_bytes + total_references*digits
253
while 10 ** digits < possible_total_bytes:
255
possible_total_bytes = non_ref_bytes + total_references*digits
256
expected_bytes = possible_total_bytes + 1 # terminating newline
257
# resolve key addresses.
259
for key, non_ref_bytes, total_references in key_offset_info:
260
key_addresses[key] = non_ref_bytes + total_references*digits
262
format_string = '%%0%sd' % digits
263
for key, (absent, references, value) in nodes:
264
flattened_references = []
265
for ref_list in references:
267
for reference in ref_list:
268
ref_addresses.append(format_string % key_addresses[reference])
269
flattened_references.append('\r'.join(ref_addresses))
270
string_key = '\x00'.join(key)
271
lines.append("%s\x00%s\x00%s\x00%s\n" % (string_key, absent,
272
'\t'.join(flattened_references), value))
274
result = StringIO(''.join(lines))
275
if expected_bytes and len(result.getvalue()) != expected_bytes:
276
raise errors.BzrError('Failed index creation. Internal error:'
277
' mismatched output length and expected length: %d %d' %
278
(len(result.getvalue()), expected_bytes))
282
class GraphIndex(object):
283
"""An index for data with embedded graphs.
285
The index maps keys to a list of key reference lists, and a value.
286
Each node has the same number of key reference lists. Each key reference
287
list can be empty or an arbitrary length. The value is an opaque NULL
288
terminated string without any newlines. The storage of the index is
289
hidden in the interface: keys and key references are always tuples of
290
bytestrings, never the internal representation (e.g. dictionary offsets).
292
It is presumed that the index will not be mutated - it is static data.
294
Successive iter_all_entries calls will read the entire index each time.
295
Additionally, iter_entries calls will read the index linearly until the
296
desired keys are found. XXX: This must be fixed before the index is
297
suitable for production use. :XXX
300
def __init__(self, transport, name, size):
301
"""Open an index called name on transport.
303
:param transport: A bzrlib.transport.Transport.
304
:param name: A path to provide to transport API calls.
305
:param size: The size of the index in bytes. This is used for bisection
306
logic to perform partial index reads. While the size could be
307
obtained by statting the file this introduced an additional round
308
trip as well as requiring stat'able transports, both of which are
309
avoided by having it supplied. If size is None, then bisection
310
support will be disabled and accessing the index will just stream
313
self._transport = transport
315
# Becomes a dict of key:(value, reference-list-byte-locations) used by
316
# the bisection interface to store parsed but not resolved keys.
317
self._bisect_nodes = None
318
# Becomes a dict of key:(value, reference-list-keys) which are ready to
319
# be returned directly to callers.
321
# a sorted list of slice-addresses for the parsed bytes of the file.
322
# e.g. (0,1) would mean that byte 0 is parsed.
323
self._parsed_byte_map = []
324
# a sorted list of keys matching each slice address for parsed bytes
325
# e.g. (None, 'foo@bar') would mean that the first byte contained no
326
# key, and the end byte of the slice is the of the data for 'foo@bar'
327
self._parsed_key_map = []
328
self._key_count = None
329
self._keys_by_offset = None
330
self._nodes_by_key = None
332
# The number of bytes we've read so far in trying to process this file
335
def __eq__(self, other):
336
"""Equal when self and other were created with the same parameters."""
338
type(self) == type(other) and
339
self._transport == other._transport and
340
self._name == other._name and
341
self._size == other._size)
343
def __ne__(self, other):
344
return not self.__eq__(other)
347
return "%s(%r)" % (self.__class__.__name__,
348
self._transport.abspath(self._name))
350
def _buffer_all(self, stream=None):
351
"""Buffer all the index data.
353
Mutates self._nodes and self.keys_by_offset.
355
if self._nodes is not None:
356
# We already did this
358
if 'index' in debug.debug_flags:
359
mutter('Reading entire index %s', self._transport.abspath(self._name))
361
stream = self._transport.get(self._name)
362
self._read_prefix(stream)
363
self._expected_elements = 3 + self._key_length
365
# raw data keyed by offset
366
self._keys_by_offset = {}
367
# ready-to-return key:value or key:value, node_ref_lists
369
self._nodes_by_key = None
372
lines = stream.read().split('\n')
374
_, _, _, trailers = self._parse_lines(lines, pos)
375
for key, absent, references, value in self._keys_by_offset.itervalues():
378
# resolve references:
379
if self.node_ref_lists:
380
node_value = (value, self._resolve_references(references))
383
self._nodes[key] = node_value
384
# cache the keys for quick set intersections
385
self._keys = set(self._nodes)
387
# there must be one line - the empty trailer line.
388
raise errors.BadIndexData(self)
390
def _get_nodes_by_key(self):
391
if self._nodes_by_key is None:
393
if self.node_ref_lists:
394
for key, (value, references) in self._nodes.iteritems():
395
key_dict = nodes_by_key
396
for subkey in key[:-1]:
397
key_dict = key_dict.setdefault(subkey, {})
398
key_dict[key[-1]] = key, value, references
400
for key, value in self._nodes.iteritems():
401
key_dict = nodes_by_key
402
for subkey in key[:-1]:
403
key_dict = key_dict.setdefault(subkey, {})
404
key_dict[key[-1]] = key, value
405
self._nodes_by_key = nodes_by_key
406
return self._nodes_by_key
408
def iter_all_entries(self):
409
"""Iterate over all keys within the index.
411
:return: An iterable of (index, key, value) or (index, key, value, reference_lists).
412
The former tuple is used when there are no reference lists in the
413
index, making the API compatible with simple key:value index types.
414
There is no defined order for the result iteration - it will be in
415
the most efficient order for the index.
417
if 'evil' in debug.debug_flags:
418
trace.mutter_callsite(3,
419
"iter_all_entries scales with size of history.")
420
if self._nodes is None:
422
if self.node_ref_lists:
423
for key, (value, node_ref_lists) in self._nodes.iteritems():
424
yield self, key, value, node_ref_lists
426
for key, value in self._nodes.iteritems():
427
yield self, key, value
429
def _read_prefix(self, stream):
430
signature = stream.read(len(self._signature()))
431
if not signature == self._signature():
432
raise errors.BadIndexFormatSignature(self._name, GraphIndex)
433
options_line = stream.readline()
434
if not options_line.startswith(_OPTION_NODE_REFS):
435
raise errors.BadIndexOptions(self)
437
self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):-1])
439
raise errors.BadIndexOptions(self)
440
options_line = stream.readline()
441
if not options_line.startswith(_OPTION_KEY_ELEMENTS):
442
raise errors.BadIndexOptions(self)
444
self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):-1])
446
raise errors.BadIndexOptions(self)
447
options_line = stream.readline()
448
if not options_line.startswith(_OPTION_LEN):
449
raise errors.BadIndexOptions(self)
451
self._key_count = int(options_line[len(_OPTION_LEN):-1])
453
raise errors.BadIndexOptions(self)
455
def _resolve_references(self, references):
456
"""Return the resolved key references for references.
458
References are resolved by looking up the location of the key in the
459
_keys_by_offset map and substituting the key name, preserving ordering.
461
:param references: An iterable of iterables of key locations. e.g.
463
:return: A tuple of tuples of keys.
466
for ref_list in references:
467
node_refs.append(tuple([self._keys_by_offset[ref][0] for ref in ref_list]))
468
return tuple(node_refs)
470
def _find_index(self, range_map, key):
471
"""Helper for the _parsed_*_index calls.
473
Given a range map - [(start, end), ...], finds the index of the range
474
in the map for key if it is in the map, and if it is not there, the
475
immediately preceeding range in the map.
477
result = bisect_right(range_map, key) - 1
478
if result + 1 < len(range_map):
479
# check the border condition, it may be in result + 1
480
if range_map[result + 1][0] == key[0]:
484
def _parsed_byte_index(self, offset):
485
"""Return the index of the entry immediately before offset.
487
e.g. if the parsed map has regions 0,10 and 11,12 parsed, meaning that
488
there is one unparsed byte (the 11th, addressed as[10]). then:
489
asking for 0 will return 0
490
asking for 10 will return 0
491
asking for 11 will return 1
492
asking for 12 will return 1
495
return self._find_index(self._parsed_byte_map, key)
497
def _parsed_key_index(self, key):
498
"""Return the index of the entry immediately before key.
500
e.g. if the parsed map has regions (None, 'a') and ('b','c') parsed,
501
meaning that keys from None to 'a' inclusive, and 'b' to 'c' inclusive
502
have been parsed, then:
503
asking for '' will return 0
504
asking for 'a' will return 0
505
asking for 'b' will return 1
506
asking for 'e' will return 1
508
search_key = (key, None)
509
return self._find_index(self._parsed_key_map, search_key)
511
def _is_parsed(self, offset):
512
"""Returns True if offset has been parsed."""
513
index = self._parsed_byte_index(offset)
514
if index == len(self._parsed_byte_map):
515
return offset < self._parsed_byte_map[index - 1][1]
516
start, end = self._parsed_byte_map[index]
517
return offset >= start and offset < end
519
def _iter_entries_from_total_buffer(self, keys):
520
"""Iterate over keys when the entire index is parsed."""
521
keys = keys.intersection(self._keys)
522
if self.node_ref_lists:
524
value, node_refs = self._nodes[key]
525
yield self, key, value, node_refs
528
yield self, key, self._nodes[key]
530
def iter_entries(self, keys):
531
"""Iterate over keys within the index.
533
:param keys: An iterable providing the keys to be retrieved.
534
:return: An iterable as per iter_all_entries, but restricted to the
535
keys supplied. No additional keys will be returned, and every
536
key supplied that is in the index will be returned.
541
if self._size is None and self._nodes is None:
544
# We fit about 20 keys per minimum-read (4K), so if we are looking for
545
# more than 1/20th of the index its likely (assuming homogenous key
546
# spread) that we'll read the entire index. If we're going to do that,
547
# buffer the whole thing. A better analysis might take key spread into
548
# account - but B+Tree indices are better anyway.
549
# We could look at all data read, and use a threshold there, which will
550
# trigger on ancestry walks, but that is not yet fully mapped out.
551
if self._nodes is None and len(keys) * 20 > self.key_count():
553
if self._nodes is not None:
554
return self._iter_entries_from_total_buffer(keys)
556
return (result[1] for result in bisect_multi_bytes(
557
self._lookup_keys_via_location, self._size, keys))
559
def iter_entries_prefix(self, keys):
560
"""Iterate over keys within the index using prefix matching.
562
Prefix matching is applied within the tuple of a key, not to within
563
the bytestring of each key element. e.g. if you have the keys ('foo',
564
'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
565
only the former key is returned.
567
WARNING: Note that this method currently causes a full index parse
568
unconditionally (which is reasonably appropriate as it is a means for
569
thunking many small indices into one larger one and still supplies
570
iter_all_entries at the thunk layer).
572
:param keys: An iterable providing the key prefixes to be retrieved.
573
Each key prefix takes the form of a tuple the length of a key, but
574
with the last N elements 'None' rather than a regular bytestring.
575
The first element cannot be 'None'.
576
:return: An iterable as per iter_all_entries, but restricted to the
577
keys with a matching prefix to those supplied. No additional keys
578
will be returned, and every match that is in the index will be
584
# load data - also finds key lengths
585
if self._nodes is None:
587
if self._key_length == 1:
591
raise errors.BadIndexKey(key)
592
if len(key) != self._key_length:
593
raise errors.BadIndexKey(key)
594
if self.node_ref_lists:
595
value, node_refs = self._nodes[key]
596
yield self, key, value, node_refs
598
yield self, key, self._nodes[key]
600
nodes_by_key = self._get_nodes_by_key()
604
raise errors.BadIndexKey(key)
605
if len(key) != self._key_length:
606
raise errors.BadIndexKey(key)
607
# find what it refers to:
608
key_dict = nodes_by_key
610
# find the subdict whose contents should be returned.
612
while len(elements) and elements[0] is not None:
613
key_dict = key_dict[elements[0]]
616
# a non-existant lookup.
621
key_dict = dicts.pop(-1)
622
# can't be empty or would not exist
623
item, value = key_dict.iteritems().next()
624
if type(value) == dict:
626
dicts.extend(key_dict.itervalues())
629
for value in key_dict.itervalues():
630
# each value is the key:value:node refs tuple
632
yield (self, ) + value
634
# the last thing looked up was a terminal element
635
yield (self, ) + key_dict
638
"""Return an estimate of the number of keys in this index.
640
For GraphIndex the estimate is exact.
642
if self._key_count is None:
643
self._read_and_parse([_HEADER_READV])
644
return self._key_count
646
def _lookup_keys_via_location(self, location_keys):
647
"""Public interface for implementing bisection.
649
If _buffer_all has been called, then all the data for the index is in
650
memory, and this method should not be called, as it uses a separate
651
cache because it cannot pre-resolve all indices, which buffer_all does
654
:param location_keys: A list of location(byte offset), key tuples.
655
:return: A list of (location_key, result) tuples as expected by
656
bzrlib.bisect_multi.bisect_multi_bytes.
658
# Possible improvements:
659
# - only bisect lookup each key once
660
# - sort the keys first, and use that to reduce the bisection window
662
# this progresses in three parts:
665
# attempt to answer the question from the now in memory data.
666
# build the readv request
667
# for each location, ask for 800 bytes - much more than rows we've seen
670
for location, key in location_keys:
671
# can we answer from cache?
672
if self._bisect_nodes and key in self._bisect_nodes:
673
# We have the key parsed.
675
index = self._parsed_key_index(key)
676
if (len(self._parsed_key_map) and
677
self._parsed_key_map[index][0] <= key and
678
(self._parsed_key_map[index][1] >= key or
679
# end of the file has been parsed
680
self._parsed_byte_map[index][1] == self._size)):
681
# the key has been parsed, so no lookup is needed even if its
684
# - if we have examined this part of the file already - yes
685
index = self._parsed_byte_index(location)
686
if (len(self._parsed_byte_map) and
687
self._parsed_byte_map[index][0] <= location and
688
self._parsed_byte_map[index][1] > location):
689
# the byte region has been parsed, so no read is needed.
692
if location + length > self._size:
693
length = self._size - location
694
# todo, trim out parsed locations.
696
readv_ranges.append((location, length))
697
# read the header if needed
698
if self._bisect_nodes is None:
699
readv_ranges.append(_HEADER_READV)
700
self._read_and_parse(readv_ranges)
702
if self._nodes is not None:
703
# _read_and_parse triggered a _buffer_all because we requested the
705
for location, key in location_keys:
706
if key not in self._nodes: # not present
707
result.append(((location, key), False))
708
elif self.node_ref_lists:
709
value, refs = self._nodes[key]
710
result.append(((location, key),
711
(self, key, value, refs)))
713
result.append(((location, key),
714
(self, key, self._nodes[key])))
717
# - figure out <, >, missing, present
718
# - result present references so we can return them.
719
# keys that we cannot answer until we resolve references
720
pending_references = []
721
pending_locations = set()
722
for location, key in location_keys:
723
# can we answer from cache?
724
if key in self._bisect_nodes:
725
# the key has been parsed, so no lookup is needed
726
if self.node_ref_lists:
727
# the references may not have been all parsed.
728
value, refs = self._bisect_nodes[key]
729
wanted_locations = []
730
for ref_list in refs:
732
if ref not in self._keys_by_offset:
733
wanted_locations.append(ref)
735
pending_locations.update(wanted_locations)
736
pending_references.append((location, key))
738
result.append(((location, key), (self, key,
739
value, self._resolve_references(refs))))
741
result.append(((location, key),
742
(self, key, self._bisect_nodes[key])))
745
# has the region the key should be in, been parsed?
746
index = self._parsed_key_index(key)
747
if (self._parsed_key_map[index][0] <= key and
748
(self._parsed_key_map[index][1] >= key or
749
# end of the file has been parsed
750
self._parsed_byte_map[index][1] == self._size)):
751
result.append(((location, key), False))
753
# no, is the key above or below the probed location:
754
# get the range of the probed & parsed location
755
index = self._parsed_byte_index(location)
756
# if the key is below the start of the range, its below
757
if key < self._parsed_key_map[index][0]:
761
result.append(((location, key), direction))
763
# lookup data to resolve references
764
for location in pending_locations:
766
if location + length > self._size:
767
length = self._size - location
768
# TODO: trim out parsed locations (e.g. if the 800 is into the
769
# parsed region trim it, and dont use the adjust_for_latency
772
readv_ranges.append((location, length))
773
self._read_and_parse(readv_ranges)
774
if self._nodes is not None:
775
# The _read_and_parse triggered a _buffer_all, grab the data and
777
for location, key in pending_references:
778
value, refs = self._nodes[key]
779
result.append(((location, key), (self, key, value, refs)))
781
for location, key in pending_references:
782
# answer key references we had to look-up-late.
783
value, refs = self._bisect_nodes[key]
784
result.append(((location, key), (self, key,
785
value, self._resolve_references(refs))))
788
def _parse_header_from_bytes(self, bytes):
789
"""Parse the header from a region of bytes.
791
:param bytes: The data to parse.
792
:return: An offset, data tuple such as readv yields, for the unparsed
793
data. (which may length 0).
795
signature = bytes[0:len(self._signature())]
796
if not signature == self._signature():
797
raise errors.BadIndexFormatSignature(self._name, GraphIndex)
798
lines = bytes[len(self._signature()):].splitlines()
799
options_line = lines[0]
800
if not options_line.startswith(_OPTION_NODE_REFS):
801
raise errors.BadIndexOptions(self)
803
self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
805
raise errors.BadIndexOptions(self)
806
options_line = lines[1]
807
if not options_line.startswith(_OPTION_KEY_ELEMENTS):
808
raise errors.BadIndexOptions(self)
810
self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
812
raise errors.BadIndexOptions(self)
813
options_line = lines[2]
814
if not options_line.startswith(_OPTION_LEN):
815
raise errors.BadIndexOptions(self)
817
self._key_count = int(options_line[len(_OPTION_LEN):])
819
raise errors.BadIndexOptions(self)
820
# calculate the bytes we have processed
821
header_end = (len(signature) + len(lines[0]) + len(lines[1]) +
823
self._parsed_bytes(0, None, header_end, None)
824
# setup parsing state
825
self._expected_elements = 3 + self._key_length
826
# raw data keyed by offset
827
self._keys_by_offset = {}
828
# keys with the value and node references
829
self._bisect_nodes = {}
830
return header_end, bytes[header_end:]
832
def _parse_region(self, offset, data):
833
"""Parse node data returned from a readv operation.
835
:param offset: The byte offset the data starts at.
836
:param data: The data to parse.
840
end = offset + len(data)
843
# Trivial test - if the current index's end is within the
844
# low-matching parsed range, we're done.
845
index = self._parsed_byte_index(high_parsed)
846
if end < self._parsed_byte_map[index][1]:
848
# print "[%d:%d]" % (offset, end), \
849
# self._parsed_byte_map[index:index + 2]
850
high_parsed, last_segment = self._parse_segment(
851
offset, data, end, index)
855
def _parse_segment(self, offset, data, end, index):
856
"""Parse one segment of data.
858
:param offset: Where 'data' begins in the file.
859
:param data: Some data to parse a segment of.
860
:param end: Where data ends
861
:param index: The current index into the parsed bytes map.
862
:return: True if the parsed segment is the last possible one in the
864
:return: high_parsed_byte, last_segment.
865
high_parsed_byte is the location of the highest parsed byte in this
866
segment, last_segment is True if the parsed segment is the last
867
possible one in the data block.
869
# default is to use all data
871
# accomodate overlap with data before this.
872
if offset < self._parsed_byte_map[index][1]:
873
# overlaps the lower parsed region
874
# skip the parsed data
875
trim_start = self._parsed_byte_map[index][1] - offset
876
# don't trim the start for \n
877
start_adjacent = True
878
elif offset == self._parsed_byte_map[index][1]:
879
# abuts the lower parsed region
882
# do not trim anything
883
start_adjacent = True
885
# does not overlap the lower parsed region
888
# but trim the leading \n
889
start_adjacent = False
890
if end == self._size:
891
# lines up to the end of all data:
894
# do not strip to the last \n
897
elif index + 1 == len(self._parsed_byte_map):
898
# at the end of the parsed data
901
# but strip to the last \n
904
elif end == self._parsed_byte_map[index + 1][0]:
905
# buts up against the next parsed region
908
# do not strip to the last \n
911
elif end > self._parsed_byte_map[index + 1][0]:
912
# overlaps into the next parsed region
913
# only consider the unparsed data
914
trim_end = self._parsed_byte_map[index + 1][0] - offset
915
# do not strip to the last \n as we know its an entire record
917
last_segment = end < self._parsed_byte_map[index + 1][1]
919
# does not overlap into the next region
922
# but strip to the last \n
925
# now find bytes to discard if needed
926
if not start_adjacent:
927
# work around python bug in rfind
928
if trim_start is None:
929
trim_start = data.find('\n') + 1
931
trim_start = data.find('\n', trim_start) + 1
932
if not (trim_start != 0):
933
raise AssertionError('no \n was present')
934
# print 'removing start', offset, trim_start, repr(data[:trim_start])
936
# work around python bug in rfind
938
trim_end = data.rfind('\n') + 1
940
trim_end = data.rfind('\n', None, trim_end) + 1
941
if not (trim_end != 0):
942
raise AssertionError('no \n was present')
943
# print 'removing end', offset, trim_end, repr(data[trim_end:])
944
# adjust offset and data to the parseable data.
945
trimmed_data = data[trim_start:trim_end]
946
if not (trimmed_data):
947
raise AssertionError('read unneeded data [%d:%d] from [%d:%d]'
948
% (trim_start, trim_end, offset, offset + len(data)))
951
# print "parsing", repr(trimmed_data)
952
# splitlines mangles the \r delimiters.. don't use it.
953
lines = trimmed_data.split('\n')
956
first_key, last_key, nodes, _ = self._parse_lines(lines, pos)
957
for key, value in nodes:
958
self._bisect_nodes[key] = value
959
self._parsed_bytes(offset, first_key,
960
offset + len(trimmed_data), last_key)
961
return offset + len(trimmed_data), last_segment
963
def _parse_lines(self, lines, pos):
    """Parse index records from lines, updating bisect bookkeeping.

    Each non-empty line is one serialised node record
    (KEY NULL ABSENT? NULL REFERENCES NULL VALUE); an empty line is the
    file trailer.

    :param lines: The record lines (the result of splitting a region of
        the index file on '\\n') to parse.
    :param pos: The byte offset in the index file at which lines[0]
        starts.
    :return: A tuple (first_key, last_key, nodes, trailers) where nodes
        is a list of (key, node_value) for records that are present, and
        trailers counts the empty trailer lines encountered.
    """
    key = None
    first_key = None
    trailers = 0
    nodes = []
    for line in lines:
        if line == '':
            # Empty line: must be the trailer at the very end of the file.
            if self._size:
                if not (self._size == pos + 1):
                    raise AssertionError("%s %s" % (self._size, pos))
            trailers += 1
            continue
        elements = line.split('\0')
        if len(elements) != self._expected_elements:
            raise errors.BadIndexData(self)
        # keys are tuples. Each element is a string that may occur many
        # times, so we intern them to save space. AB, RC, 200807
        key = tuple([intern(element) for element in elements[:self._key_length]])
        if first_key is None:
            first_key = key
        absent, references, value = elements[-3:]
        ref_lists = []
        for ref_string in references.split('\t'):
            ref_lists.append(tuple([
                int(ref) for ref in ref_string.split('\r') if ref
                ]))
        ref_lists = tuple(ref_lists)
        self._keys_by_offset[pos] = (key, absent, ref_lists, value)
        pos += len(line) + 1 # +1 for the \n
        if absent:
            # Tombstone record: tracked by offset but not reported as a node.
            continue
        if self.node_ref_lists:
            node_value = (value, ref_lists)
        else:
            node_value = value
        nodes.append((key, node_value))
        # print "parsed ", key
    return first_key, key, nodes, trailers
def _parsed_bytes(self, start, start_key, end, end_key):
    """Mark the bytes from start to end as parsed.

    Calling self._parsed_bytes(1,2) will mark one byte (the one at offset
    1) as parsed.

    :param start: The start of the parsed region.
    :param start_key: The first key of the parsed region.
    :param end: The end of the parsed region.
    :param end_key: The last key of the parsed region.
    """
    index = self._parsed_byte_index(start)
    new_value = (start, end)
    new_key = (start_key, end_key)
    if index == -1:
        # first range parsed is always the beginning.
        self._parsed_byte_map.insert(index, new_value)
        self._parsed_key_map.insert(index, new_key)
        return
    # four cases:
    # new region
    # extend lower region
    # extend higher region
    # combine two regions
    if (index + 1 < len(self._parsed_byte_map) and
        self._parsed_byte_map[index][1] == start and
        self._parsed_byte_map[index + 1][0] == end):
        # combine two regions
        self._parsed_byte_map[index] = (self._parsed_byte_map[index][0],
            self._parsed_byte_map[index + 1][1])
        self._parsed_key_map[index] = (self._parsed_key_map[index][0],
            self._parsed_key_map[index + 1][1])
        del self._parsed_byte_map[index + 1]
        del self._parsed_key_map[index + 1]
    elif self._parsed_byte_map[index][1] == start:
        # extend the lower entry
        self._parsed_byte_map[index] = (
            self._parsed_byte_map[index][0], end)
        self._parsed_key_map[index] = (
            self._parsed_key_map[index][0], end_key)
    elif (index + 1 < len(self._parsed_byte_map) and
        self._parsed_byte_map[index + 1][0] == end):
        # extend the higher entry
        self._parsed_byte_map[index + 1] = (
            start, self._parsed_byte_map[index + 1][1])
        self._parsed_key_map[index + 1] = (
            start_key, self._parsed_key_map[index + 1][1])
    else:
        # new entry
        self._parsed_byte_map.insert(index + 1, new_value)
        self._parsed_key_map.insert(index + 1, new_key)
def _read_and_parse(self, readv_ranges):
    """Read the ranges and parse the resulting data.

    :param readv_ranges: A prepared readv range list.
    """
    if not readv_ranges:
        return
    if self._nodes is None and self._bytes_read * 2 >= self._size:
        # We've already read more than 50% of the file and we are about to
        # request more data, just _buffer_all() and be done
        self._buffer_all()
        return
    readv_data = self._transport.readv(self._name, readv_ranges, True,
        self._size)
    # parse
    for offset, data in readv_data:
        self._bytes_read += len(data)
        if offset == 0 and len(data) == self._size:
            # We read the whole range, most likely because the
            # Transport upcast our readv ranges into one long request
            # for enough total data to grab the whole index.
            self._buffer_all(StringIO(data))
            return
        if self._bisect_nodes is None:
            # this must be the start
            if not (offset == 0):
                raise AssertionError()
            offset, data = self._parse_header_from_bytes(data)
        # print readv_ranges, "[%d:%d]" % (offset, offset + len(data))
        self._parse_region(offset, data)
def _signature(self):
    """The file signature for this index type."""
    return _SIGNATURE
def validate(self):
    """Validate that everything in the index can be accessed."""
    # iter_all_entries validates completely at the moment, so just do that.
    for node in self.iter_all_entries():
        pass
class CombinedGraphIndex(object):
    """A GraphIndex made up from smaller GraphIndices.

    The backing indices must implement GraphIndex, and are presumed to be
    static data.

    Queries against the combined index will be made against the first index,
    and then the second and so on. The order of index's can thus influence
    performance significantly. For example, if one index is on local disk and a
    second on a remote server, the local disk index should be before the other
    in the index list.
    """

    def __init__(self, indices):
        """Create a CombinedGraphIndex backed by indices.

        :param indices: An ordered list of indices to query for data.
        """
        self._indices = indices

    def __repr__(self):
        return "%s(%s)" % (
                self.__class__.__name__,
                ', '.join(map(repr, self._indices)))

    @symbol_versioning.deprecated_method(symbol_versioning.one_one)
    def get_parents(self, revision_ids):
        """See graph._StackedParentsProvider.get_parents.

        This implementation thunks the graph.Graph.get_parents api across to
        GraphIndex.

        :param revision_ids: An iterable of graph keys for this graph.
        :return: A list of parent details for each key in revision_ids.
            Each parent details will be one of:
             * None when the key was missing
             * (NULL_REVISION,) when the key has no parents.
             * (parent_key, parent_key...) otherwise.
        """
        parent_map = self.get_parent_map(revision_ids)
        return [parent_map.get(r, None) for r in revision_ids]

    def get_parent_map(self, keys):
        """See graph._StackedParentsProvider.get_parent_map"""
        search_keys = set(keys)
        if NULL_REVISION in search_keys:
            search_keys.discard(NULL_REVISION)
            found_parents = {NULL_REVISION:[]}
        else:
            found_parents = {}
        for index, key, value, refs in self.iter_entries(search_keys):
            # An empty first reference list means "no parents": map it to
            # the NULL_REVISION sentinel expected by callers.
            parents = refs[0]
            if not parents:
                parents = (NULL_REVISION,)
            found_parents[key] = parents
        return found_parents

    def insert_index(self, pos, index):
        """Insert a new index in the list of indices to query.

        :param pos: The position to insert the index.
        :param index: The index to insert.
        """
        self._indices.insert(pos, index)

    def iter_all_entries(self):
        """Iterate over all keys within the index

        Duplicate keys across child indices are presumed to have the same
        value and are only reported once.

        :return: An iterable of (index, key, reference_lists, value).
            There is no defined order for the result iteration - it will be
            in the most efficient order for the index.
        """
        seen_keys = set()
        for index in self._indices:
            for node in index.iter_all_entries():
                if node[1] not in seen_keys:
                    yield node
                    seen_keys.add(node[1])

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        Duplicate keys across child indices are presumed to have the same
        value and are only reported once.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable of (index, key, reference_lists, value). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index.
        """
        keys = set(keys)
        for index in self._indices:
            # Stop as soon as every requested key has been found.
            if not keys:
                return
            for node in index.iter_entries(keys):
                keys.remove(node[1])
                yield node

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Duplicate keys across child indices are presumed to have the same
        value and are only reported once.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        keys = set(keys)
        if not keys:
            return
        seen_keys = set()
        for index in self._indices:
            for node in index.iter_entries_prefix(keys):
                if node[1] in seen_keys:
                    continue
                seen_keys.add(node[1])
                yield node

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For CombinedGraphIndex this is approximated by the sum of the keys of
        the child indices. As child indices may have duplicate keys this can
        have a maximum error of the number of child indices * largest number of
        keys in any index.
        """
        return sum((index.key_count() for index in self._indices), 0)

    def validate(self):
        """Validate that everything in the index can be accessed."""
        for index in self._indices:
            index.validate()
class InMemoryGraphIndex(GraphIndexBuilder):
    """A GraphIndex which operates entirely out of memory and is mutable.

    This is designed to allow the accumulation of GraphIndex entries during a
    single write operation, where the accumulated entries need to be immediately
    available - for example via a CombinedGraphIndex.
    """

    def add_nodes(self, nodes):
        """Add nodes to the index.

        :param nodes: An iterable of (key, node_refs, value) entries to add.
        """
        if self.reference_lists:
            for (key, value, node_refs) in nodes:
                self.add_node(key, value, node_refs)
        else:
            for (key, value) in nodes:
                self.add_node(key, value)

    def iter_all_entries(self):
        """Iterate over all keys within the index

        :return: An iterable of (index, key, reference_lists, value). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index (in this case dictionary hash order).
        """
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(3,
                "iter_all_entries scales with size of history.")
        if self.reference_lists:
            for key, (absent, references, value) in self._nodes.iteritems():
                if not absent:
                    yield self, key, value, references
        else:
            for key, (absent, references, value) in self._nodes.iteritems():
                if not absent:
                    yield self, key, value

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable of (index, key, value, reference_lists). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index (keys iteration order in this case).
        """
        keys = set(keys)
        if self.reference_lists:
            for key in keys.intersection(self._keys):
                node = self._nodes[key]
                # node[0] is the 'absent' marker; skip deleted entries.
                if not node[0]:
                    yield self, key, node[2], node[1]
        else:
            for key in keys.intersection(self._keys):
                node = self._nodes[key]
                if not node[0]:
                    yield self, key, node[2]

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        # XXX: To much duplication with the GraphIndex class; consider finding
        # a good place to pull out the actual common logic.
        keys = set(keys)
        if not keys:
            return
        if self._key_length == 1:
            for key in keys:
                # sanity check
                if key[0] is None:
                    raise errors.BadIndexKey(key)
                if len(key) != self._key_length:
                    raise errors.BadIndexKey(key)
                node = self._nodes[key]
                if node[0]:
                    # absent entry
                    continue
                if self.reference_lists:
                    yield self, key, node[2], node[1]
                else:
                    yield self, key, node[2]
            return
        nodes_by_key = self._get_nodes_by_key()
        for key in keys:
            # sanity check
            if key[0] is None:
                raise errors.BadIndexKey(key)
            if len(key) != self._key_length:
                raise errors.BadIndexKey(key)
            # find what it refers to:
            key_dict = nodes_by_key
            elements = list(key)
            # find the subdict to return
            try:
                while len(elements) and elements[0] is not None:
                    key_dict = key_dict[elements[0]]
                    elements.pop(0)
            except KeyError:
                # a non-existant lookup.
                continue
            if len(elements):
                # A wildcard tail: walk every subtree under this prefix.
                dicts = [key_dict]
                while dicts:
                    key_dict = dicts.pop(-1)
                    # can't be empty or would not exist
                    item, value = key_dict.iteritems().next()
                    if type(value) == dict:
                        # push keys
                        dicts.extend(key_dict.itervalues())
                    else:
                        # yield keys
                        for value in key_dict.itervalues():
                            yield (self, ) + value
            else:
                yield (self, ) + key_dict

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For InMemoryGraphIndex the estimate is exact.
        """
        return len(self._keys)

    def validate(self):
        """In memory index's have no known corruption at the moment."""
class GraphIndexPrefixAdapter(object):
    """An adapter between GraphIndex with different key lengths.

    Queries against this will emit queries against the adapted Graph with the
    prefix added, queries for all items use iter_entries_prefix. The returned
    nodes will have their keys and node references adjusted to remove the
    prefix. Finally, an add_nodes_callback can be supplied - when called the
    nodes and references being added will have prefix prepended.
    """

    def __init__(self, adapted, prefix, missing_key_length,
        add_nodes_callback=None):
        """Construct an adapter against adapted with prefix.

        :param adapted: The GraphIndex to adapt.
        :param prefix: The prefix (a key tuple fragment) to prepend/strip.
        :param missing_key_length: The number of key elements the adapted
            index has beyond this adapter's key length.
        :param add_nodes_callback: Called with translated nodes when nodes
            are added via this adapter; None disables adding.
        """
        self.adapted = adapted
        self.prefix_key = prefix + (None,)*missing_key_length
        self.prefix = prefix
        self.prefix_len = len(prefix)
        self.add_nodes_callback = add_nodes_callback

    def add_nodes(self, nodes):
        """Add nodes to the index.

        :param nodes: An iterable of (key, node_refs, value) entries to add.
        """
        # save nodes in case its an iterator
        nodes = tuple(nodes)
        translated_nodes = []
        try:
            # Add prefix_key to each reference node_refs is a tuple of tuples,
            # so split it apart, and add prefix_key to the internal reference
            for (key, value, node_refs) in nodes:
                adjusted_references = (
                    tuple(tuple(self.prefix + ref_node for ref_node in ref_list)
                        for ref_list in node_refs))
                translated_nodes.append((self.prefix + key, value,
                    adjusted_references))
        except ValueError:
            # XXX: TODO add an explicit interface for getting the reference list
            # status, to handle this bit of user-friendliness in the API more
            # explicitly.
            for (key, value) in nodes:
                translated_nodes.append((self.prefix + key, value))
        self.add_nodes_callback(translated_nodes)

    def add_node(self, key, value, references=()):
        """Add a node to the index.

        :param key: The key. keys are non-empty tuples containing
            as many whitespace-free utf8 bytestrings as the key length
            defined for this index.
        :param references: An iterable of iterables of keys. Each is a
            reference to another key.
        :param value: The value to associate with the key. It may be any
            bytes as long as it does not contain \\0 or \\n.
        """
        self.add_nodes(((key, value, references), ))

    def _strip_prefix(self, an_iter):
        """Strip prefix data from nodes and return it."""
        for node in an_iter:
            # cross checks
            if node[1][:self.prefix_len] != self.prefix:
                raise errors.BadIndexData(self)
            for ref_list in node[3]:
                for ref_node in ref_list:
                    if ref_node[:self.prefix_len] != self.prefix:
                        raise errors.BadIndexData(self)
            yield node[0], node[1][self.prefix_len:], node[2], (
                tuple(tuple(ref_node[self.prefix_len:] for ref_node in ref_list)
                for ref_list in node[3]))

    def iter_all_entries(self):
        """Iterate over all keys within the index

        iter_all_entries is implemented against the adapted index using
        iter_entries_prefix.

        :return: An iterable of (index, key, reference_lists, value). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index (in this case dictionary hash order).
        """
        return self._strip_prefix(self.adapted.iter_entries_prefix([self.prefix_key]))

    def iter_entries(self, keys):
        """Iterate over keys within the index.

        :param keys: An iterable providing the keys to be retrieved.
        :return: An iterable of (index, key, value, reference_lists). There is
            no defined order for the result iteration - it will be in the most
            efficient order for the index (keys iteration order in this case).
        """
        return self._strip_prefix(self.adapted.iter_entries(
            self.prefix + key for key in keys))

    def iter_entries_prefix(self, keys):
        """Iterate over keys within the index using prefix matching.

        Prefix matching is applied within the tuple of a key, not to within
        the bytestring of each key element. e.g. if you have the keys ('foo',
        'bar'), ('foobar', 'gam') and do a prefix search for ('foo', None) then
        only the former key is returned.

        :param keys: An iterable providing the key prefixes to be retrieved.
            Each key prefix takes the form of a tuple the length of a key, but
            with the last N elements 'None' rather than a regular bytestring.
            The first element cannot be 'None'.
        :return: An iterable as per iter_all_entries, but restricted to the
            keys with a matching prefix to those supplied. No additional keys
            will be returned, and every match that is in the index will be
            returned.
        """
        return self._strip_prefix(self.adapted.iter_entries_prefix(
            self.prefix + key for key in keys))

    def key_count(self):
        """Return an estimate of the number of keys in this index.

        For GraphIndexPrefixAdapter this is relatively expensive - key
        iteration with the prefix is done.
        """
        return len(list(self.iter_all_entries()))

    def validate(self):
        """Call the adapted's validate."""
        self.adapted.validate()