471
513
self._buffer_all()
472
514
return self._key_count
516
def lookup_keys_via_location(self, location_keys):
517
"""Public interface for implementing bisection.
519
:param location_keys: A list of location, key tuples.
520
:return: A list of (location_key, result) tuples as expected by
521
bzrlib.bisect_multi.bisect_multi_bytes.
523
# Possible improvements:
524
# - only bisect lookup each key once
525
# - sort the keys first, and use that to reduce the bisection window
527
# this progresses in three parts:
530
# attempt to answer the question from the now in memory data.
531
# build the readv request
532
# for each location, ask for 800 bytes - much more than rows we've seen
535
for location, key in location_keys:
536
# can we answer from cache?
537
index = self._parsed_key_index(key)
538
if (len(self._parsed_key_map) and
539
self._parsed_key_map[index][0] <= key and
540
self._parsed_key_map[index][1] > key):
541
# the key has been parsed, so no lookup is needed
544
if location + length > self._size:
545
length = self._size - location
546
# todo, trim out parsed locations.
548
readv_ranges.append((location, length))
549
# read the header if needed
550
if self._nodes is None:
551
readv_ranges.append((0, 200))
553
readv_data = self._transport.readv(self._name, readv_ranges, True,
556
for offset, data in readv_data:
557
if self._nodes is None:
558
# this must be the start
560
offset, data = self._parse_header_from_bytes(data)
561
self._parse_region(offset, data)
562
# print offset, len(data), data
564
# - figure out <, >, missing, present
565
# - result present references so we can return them.
567
for location, key in location_keys:
568
# can we answer from cache?
569
index = self._parsed_key_index(key)
570
if (self._parsed_key_map[index][0] <= key and
571
(self._parsed_key_map[index][1] > key or
572
# end of the file has been parsed
573
self._parsed_byte_map[index][1] == self._size)):
574
# the key has been parsed, so no lookup is needed
575
if key in self._nodes:
576
if self.node_ref_lists:
577
value, refs = self._nodes[key]
578
result.append(((location, key), (self, key, value, refs)))
580
result.append(((location, key), (self, key, self._nodes[key])))
582
result.append(((location, key), False))
584
# no, is the key above or below the probed location:
585
# get the range of the probed & parsed location
586
index = self._parsed_byte_index(location)
587
# if the key is below the start of the range, its below
588
if key < self._parsed_key_map[index][0]:
592
result.append(((location, key), direction))
595
def _parse_header_from_bytes(self, bytes):
596
"""Parse the header from a region of bytes.
598
:param bytes: The data to parse.
599
:return: An offset, data tuple such as readv yields, for the unparsed
600
data. (which may length 0).
602
signature = bytes[0:len(self._signature())]
603
if not signature == self._signature():
604
raise errors.BadIndexFormatSignature(self._name, GraphIndex)
605
lines = bytes[len(self._signature()):].splitlines()
606
options_line = lines[0]
607
if not options_line.startswith(_OPTION_NODE_REFS):
608
raise errors.BadIndexOptions(self)
610
self.node_ref_lists = int(options_line[len(_OPTION_NODE_REFS):])
612
raise errors.BadIndexOptions(self)
613
options_line = lines[1]
614
if not options_line.startswith(_OPTION_KEY_ELEMENTS):
615
raise errors.BadIndexOptions(self)
617
self._key_length = int(options_line[len(_OPTION_KEY_ELEMENTS):])
619
raise errors.BadIndexOptions(self)
620
options_line = lines[2]
621
if not options_line.startswith(_OPTION_LEN):
622
raise errors.BadIndexOptions(self)
624
self._key_count = int(options_line[len(_OPTION_LEN):])
626
raise errors.BadIndexOptions(self)
627
# calculate the bytes we have processed
628
header_end = (len(signature) + len(lines[0]) + len(lines[1]) +
630
self._parsed_bytes(0, None, header_end, None)
631
# setup parsing state
632
self._expected_elements = 3 + self._key_length
633
# raw data keyed by offset
634
self._keys_by_offset = {}
635
# ready-to-return key:value or key:value, node_ref_lists
637
self._nodes_by_key = {}
638
return header_end, bytes[header_end:]
640
def _parse_region(self, offset, data):
641
"""Parse node data returned from a readv operation.
643
:param offset: The byte offset the data starts at.
644
:param data: The data to parse.
648
end = offset + len(data)
649
index = self._parsed_byte_index(offset)
650
# default is to use all data
652
# trivial check for entirely parsed data:
653
if end < self._parsed_byte_map[index][1]:
655
# accomodate overlap with data before this.
656
if offset < self._parsed_byte_map[index][1]:
657
# overlaps the lower parsed region
658
# skip the parsed data
659
trim_start = self._parsed_byte_map[index][1] - offset
660
# don't trim the start for \n
661
start_adjacent = True
662
elif offset == self._parsed_byte_map[index][1]:
663
# abuts the lower parsed region
666
# do not trim anything
667
start_adjacent = True
669
# does not overlap the lower parsed region
672
# but trim the leading \n
673
start_adjacent = False
674
if end == self._size:
675
# lines up to the end of all data:
678
# do not strip to the last \n
680
elif index + 1 == len(self._parsed_byte_map):
681
# at the end of the parsed data
684
# but strip to the last \n
686
elif end == self._parsed_byte_map[index + 1][0]:
687
# buts up against the next parsed region
690
# do not strip to the last \n
692
elif end > self._parsed_byte_map[index + 1][0]:
693
# overlaps into the next parsed region
694
# only consider the unparsed data
695
trim_end = self._parsed_byte_map[index + 1][0] - offset
696
# do not strip to the last \n as we know its an entire record
699
# does not overlap into the next region
702
# but strip to the last \n
704
# now find bytes to discard if needed
705
if not start_adjacent:
706
# work around python bug in rfind
707
if trim_start is None:
708
trim_start = data.find('\n') + 1
710
trim_start = data.find('\n', trim_start) + 1
711
assert trim_start != 0, 'no \n was present'
712
# print 'removing start', offset, trim_start, repr(data[:trim_start])
714
# work around python bug in rfind
716
trim_end = data.rfind('\n') + 1
718
trim_end = data.rfind('\n', None, trim_end) + 1
719
assert trim_end != 0, 'no \n was present'
720
# print 'removing end', offset, trim_end, repr(data[trim_end:])
721
# adjust offset and data to the parseable data.
722
data = data[trim_start:trim_end]
725
# print "parsing", repr(data)
726
lines = data.splitlines()
733
assert self._size == pos + 1, "%s %s" % (self._size, pos)
735
elements = line.split('\0')
736
if len(elements) != self._expected_elements:
737
raise errors.BadIndexData(self)
739
key = tuple(elements[:self._key_length])
740
if first_key is None:
742
absent, references, value = elements[-3:]
744
for ref_string in references.split('\t'):
745
ref_lists.append(tuple([
746
int(ref) for ref in ref_string.split('\r') if ref
748
ref_lists = tuple(ref_lists)
749
self._keys_by_offset[pos] = (key, absent, ref_lists, value)
750
pos += len(line) + 1 # +1 for the \n
751
self._parsed_bytes(offset, first_key, offset + len(data), key)
752
# XXXXXX repeated work here.
753
for key, absent, references, value in self._keys_by_offset.itervalues():
756
# resolve references:
757
if self.node_ref_lists:
759
for ref_list in references:
760
node_refs.append(tuple([self._keys_by_offset[ref][0] for ref in ref_list]))
761
node_value = (value, tuple(node_refs))
764
self._nodes[key] = node_value
765
if self._key_length > 1:
766
subkey = list(reversed(key[:-1]))
767
key_dict = self._nodes_by_key
768
if self.node_ref_lists:
769
key_value = key, node_value[0], node_value[1]
771
key_value = key, node_value
772
# possibly should do this on-demand, but it seems likely it is
774
# For a key of (foo, bar, baz) create
775
# _nodes_by_key[foo][bar][baz] = key_value
776
for subkey in key[:-1]:
777
key_dict = key_dict.setdefault(subkey, {})
778
key_dict[key[-1]] = key_value
780
def _parsed_bytes(self, start, start_key, end, end_key):
781
"""Mark the bytes from start to end as parsed.
783
Calling self._parsed_bytes(1,2) will mark one byte (the one at offset
786
:param start: The start of the parsed region.
787
:param end: The end of the parsed region.
790
index = bisect_right(self._parsed_byte_map, key)
791
new_value = (start, end)
792
new_key = (start_key, end_key)
794
# first range parsed is always the beginning.
795
self._parsed_byte_map.insert(index, new_value)
796
self._parsed_key_map.insert(index, new_key)
797
elif index == len(self._parsed_byte_map):
798
# may be adjacent to the prior entry
799
if start == self._parsed_byte_map[index - 1][1]:
800
self._parsed_byte_map[index - 1] = (
801
self._parsed_byte_map[index - 1][0], end)
802
self._parsed_key_map[index - 1] = (
803
self._parsed_key_map[index - 1][0], end_key)
806
self._parsed_byte_map.insert(index, new_value)
807
self._parsed_key_map.insert(index, new_key)
809
# may be adjacent to the next entry
810
if end == self._parsed_byte_map[index][0]:
811
# move the next entry down to this range
812
self._parsed_byte_map[index] = (
813
start, self._parsed_byte_map[index][1])
814
self._parsed_key_map[index] = (
815
start_key, self._parsed_key_map[index][1])
818
self._parsed_byte_map.insert(index, new_value)
819
self._parsed_key_map.insert(index, new_key)
474
821
def _signature(self):
475
822
"""The file signature for this index type."""
476
823
return _SIGNATURE