662
662
:return: A list or generator of (offset, data) tuples
664
664
if adjust_for_latency:
665
offsets = sorted(offsets)
666
# short circuit empty requests
667
if len(offsets) == 0:
669
# Quick thunk to stop this function becoming a generator
670
# itself, rather we return a generator that has nothing to
674
return empty_yielder()
675
# expand by page size at either end
676
maximum_expansion = self.recommended_page_size()
678
for offset, length in offsets:
679
expansion = maximum_expansion - length
681
# we're asking for more than the minimum read anyway.
683
reduction = expansion / 2
684
new_offset = offset - reduction
685
new_length = length + expansion
687
# don't ask for anything < 0
689
if (upper_limit is not None and
690
new_offset + new_length > upper_limit):
691
new_length = upper_limit - new_offset
692
new_offsets.append((new_offset, new_length))
693
# combine the expanded offsets
695
current_offset, current_length = new_offsets[0]
696
current_finish = current_length + current_offset
697
for offset, length in new_offsets[1:]:
698
finish = offset + length
699
if offset > current_finish:
700
# there is a gap, output the current accumulator and start
701
# a new one for the region we're examining.
702
offsets.append((current_offset, current_length))
703
current_offset = offset
704
current_length = length
705
current_finish = finish
707
if finish > current_finish:
708
# extend the current accumulator to the end of the region
710
current_finish = finish
711
current_length = finish - current_offset
712
offsets.append((current_offset, current_length))
665
# Design note: We may wish to have different algorithms for the
666
# expansion of the offsets per-transport. E.g. for local disk to
667
# use page-aligned expansion. If that is the case consider the following structure:
668
# - a test that transport.readv uses self._offset_expander or some similar attribute, to do the expansion
669
# - a test for each transport that it has some known-good offset expander
670
# - unit tests for each offset expander
671
# - a set of tests for the offset expander interface, giving
672
# baseline behaviour (which the current transport
673
# adjust_for_latency tests could be repurposed to).
674
offsets = self._sort_expand_and_combine(offsets, upper_limit)
713
675
return self._readv(relpath, offsets)
715
677
def _readv(self, relpath, offsets):
766
728
yield cur_offset_and_size[0], this_data
767
729
cur_offset_and_size = offset_stack.next()
731
def _sort_expand_and_combine(self, offsets, upper_limit):
734
:param offsets: A readv vector - (offset, length) tuples.
735
:param upper_limit: The highest byte offset that may be requested.
736
:return: A readv vector that will read all the regions requested by
737
offsets, in start-to-end order, with no duplicated regions,
738
expanded by the transports recommended page size.
740
offsets = sorted(offsets)
741
# short circuit empty requests
742
if len(offsets) == 0:
744
# Quick thunk to stop this function becoming a generator
745
# itself, rather we return a generator that has nothing to
749
return empty_yielder()
750
# expand by page size at either end
751
maximum_expansion = self.recommended_page_size()
753
for offset, length in offsets:
754
expansion = maximum_expansion - length
756
# we're asking for more than the minimum read anyway.
758
reduction = expansion / 2
759
new_offset = offset - reduction
760
new_length = length + expansion
762
# don't ask for anything < 0
764
if (upper_limit is not None and
765
new_offset + new_length > upper_limit):
766
new_length = upper_limit - new_offset
767
new_offsets.append((new_offset, new_length))
768
# combine the expanded offsets
770
current_offset, current_length = new_offsets[0]
771
current_finish = current_length + current_offset
772
for offset, length in new_offsets[1:]:
773
finish = offset + length
774
if offset > current_finish:
775
# there is a gap, output the current accumulator and start
776
# a new one for the region we're examining.
777
offsets.append((current_offset, current_length))
778
current_offset = offset
779
current_length = length
780
current_finish = finish
782
if finish > current_finish:
783
# extend the current accumulator to the end of the region
785
current_finish = finish
786
current_length = finish - current_offset
787
offsets.append((current_offset, current_length))
770
791
def _coalesce_offsets(offsets, limit, fudge_factor):
771
792
"""Yield coalesced offsets.