519
519
yield pack_writer.end()
522
class _ByteStreamDecoder(object):
523
"""Helper for _byte_stream_to_stream.
525
The expected usage of this class is via the function _byte_stream_to_stream
526
which creates a _ByteStreamDecoder, pops off the stream format and then
527
yields the output of record_stream(), the main entry point to
530
Broadly this class has to unwrap two layers of iterators:
534
This is complicated by wishing to return type, iterator_for_type, but
535
getting the data for iterator_for_type when we find out type: we can't
536
simply pass a generator down to the NetworkRecordStream parser, instead
537
we have a little local state to seed each NetworkRecordStream instance,
538
and gather the type that we'll be yielding.
540
:ivar byte_stream: The byte stream being decoded.
541
:ivar stream_decoder: A pack parser used to decode the bytestream
542
:ivar current_type: The current type, used to join adjacent records of the
543
same type into a single stream.
544
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
547
def __init__(self, byte_stream):
548
"""Create a _ByteStreamDecoder."""
549
self.stream_decoder = pack.ContainerPushParser()
550
self.current_type = None
551
self.first_bytes = None
552
self.byte_stream = byte_stream
554
def iter_stream_decoder(self):
555
"""Iterate the contents of the pack from stream_decoder."""
556
# dequeue pending items
557
for record in self.stream_decoder.read_pending_records():
559
# Pull bytes of the wire, decode them to records, yield those records.
560
for bytes in self.byte_stream:
561
self.stream_decoder.accept_bytes(bytes)
562
for record in self.stream_decoder.read_pending_records():
565
def iter_substream_bytes(self):
566
if self.first_bytes is not None:
567
yield self.first_bytes
568
# If we run out of pack records, single the outer layer to stop.
569
self.first_bytes = None
570
for record in self.iter_pack_records:
571
record_names, record_bytes = record
572
record_name, = record_names
573
substream_type = record_name[0]
574
if substream_type != self.current_type:
575
# end of a substream, seed the next substream.
576
self.current_type = substream_type
577
self.first_bytes = record_bytes
581
def record_stream(self):
582
"""Yield substream_type, substream from the byte stream."""
584
# Make and consume sub generators, one per substream type:
585
while self.first_bytes is not None:
586
substream = NetworkRecordStream(self.iter_substream_bytes())
587
# after substream is fully consumed, self.current_type is set to
588
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
591
def seed_state(self):
592
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
593
# Set a single generator we can use to get data from the pack stream.
594
self.iter_pack_records = self.iter_stream_decoder()
595
# Seed the very first subiterator with content; after this each one
597
list(self.iter_substream_bytes())
522
600
def _byte_stream_to_stream(byte_stream):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :return: (RepositoryFormat, stream_generator)
    """
    decoder = _ByteStreamDecoder(byte_stream)
    # Feed bytes to the decoder's own parser until the first record (the
    # source format name) is complete.  Records parsed beyond it stay queued
    # in that parser, and the rest of byte_stream is left unconsumed, so
    # decoder.record_stream() continues from exactly this point.
    for chunk in byte_stream:
        decoder.stream_decoder.accept_bytes(chunk)
        for record in decoder.stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
    # NOTE: if byte_stream ends before a complete format record arrives, the
    # loop falls through and this function implicitly returns None.
554
615
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):