502
502
pack_writer = pack.ContainerSerialiser()
503
503
yield pack_writer.begin()
504
504
yield pack_writer.bytes_record(src_format.network_name(), '')
507
pb = ui.ui_factory.nested_progress_bar()
506
#rc = RecordCounter()
507
#pb = ui.ui_factory.nested_progress_bar()
508
#pb.update('', rc.current, rc.max)
508
509
for substream_type, substream in stream:
510
511
for record in substream:
512
if substream_type == 'revisions':
514
elif rc.is_initialized():
515
if counter == rc.STEP:
516
rc.increment(counter)
517
pb.update('', rc.current, rc.max)
513
#if substream_type == 'revisions':
515
#elif rc.is_initialized():
516
# if counter == rc.STEP:
517
# rc.increment(counter)
518
# pb.update('', rc.current, rc.max)
519
520
if record.storage_kind in ('chunked', 'fulltext'):
520
521
serialised = record_to_fulltext_bytes(record)
521
522
elif record.storage_kind == 'inventory-delta':
529
530
# representation of the first record, which means that
530
531
# later records have no wire representation: we skip them.
531
532
yield pack_writer.bytes_record(serialised, [(substream_type,)])
532
if substream_type == 'revisions':
533
rc.setup(key_count, current=key_count)
534
pb.update('', rc.max, rc.max)
533
#if substream_type == 'revisions':
534
# rc.setup(key_count, current=key_count)
535
#pb.update('', rc.max, rc.max)
536
537
yield pack_writer.end()
561
562
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
564
def __init__(self, byte_stream):
565
def __init__(self, byte_stream, record_counter):
565
566
"""Create a _ByteStreamDecoder."""
566
567
self.stream_decoder = pack.ContainerPushParser()
567
568
self.current_type = None
568
569
self.first_bytes = None
569
570
self.byte_stream = byte_stream
571
self.record_counter = record_counter
572
self.revisions_processed = False
571
575
def iter_stream_decoder(self):
572
576
"""Iterate the contents of the pack from stream_decoder."""
573
577
# dequeue pending items
574
578
for record in self.stream_decoder.read_pending_records():
582
#rc = RecordCounter()
583
#pb = ui.ui_factory.nested_progress_bar()
576
584
# Pull bytes of the wire, decode them to records, yield those records.
577
585
for bytes in self.byte_stream:
578
586
self.stream_decoder.accept_bytes(bytes)
579
588
for record in self.stream_decoder.read_pending_records():
589
#if self.current_type == 'revisions':
590
# pb.update('Estimating..', key_count)
592
#elif rc.is_initialized():
593
# if counter == rc.STEP:
594
# rc.increment(counter)
595
# pb.update('Estimate', rc.current, rc.max)
599
#if self.current_type == 'revisions':
600
# rc.setup(key_count, current=key_count)
601
#pb.update('Estimate', rc.max, rc.max)
582
604
def iter_substream_bytes(self):
583
605
if self.first_bytes is not None:
598
620
def record_stream(self):
599
621
"""Yield substream_type, substream from the byte stream."""
600
622
self.seed_state()
623
pb = ui.ui_factory.nested_progress_bar()
601
624
# Make and consume sub generators, one per substream type:
602
625
while self.first_bytes is not None:
603
626
substream = NetworkRecordStream(self.iter_substream_bytes())
604
627
# after substream is fully consumed, self.current_type is set to
605
628
# the next type, and self.first_bytes is set to the matching bytes.
606
yield self.current_type, substream.read()
629
def wrap_and_count(substream):
631
if self.record_counter:
632
if self.current_type != 'revisions' and self.key_count != 0:
633
if not self.record_counter.is_initialized():
634
self.record_counter.setup(self.key_count,
636
for record in substream.read():
637
if self.record_counter:
638
if self.record_counter.is_initialized() and \
639
counter == self.record_counter.STEP:
640
self.record_counter.increment(counter)
641
pb.update('Estimate', self.record_counter.current,
642
self.record_counter.max)
644
if self.current_type == 'revisions':
646
if counter == self.record_counter.STEP:
647
pb.update('Estimating..', self.key_count)
652
yield self.current_type, wrap_and_count(substream)
653
if self.record_counter:
654
pb.update('Done', self.record_counter.max, self.record_counter.max)
608
657
def seed_state(self):
609
658
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
614
663
list(self.iter_substream_bytes())
617
def _byte_stream_to_stream(byte_stream):
666
def _byte_stream_to_stream(byte_stream, record_counter=None):
618
667
"""Convert a byte stream into a format and a stream.
620
669
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
621
670
:return: (RepositoryFormat, stream_generator)
623
decoder = _ByteStreamDecoder(byte_stream)
672
decoder = _ByteStreamDecoder(byte_stream, record_counter)
624
673
for bytes in byte_stream:
625
674
decoder.stream_decoder.accept_bytes(bytes)
626
675
for record in decoder.stream_decoder.read_pending_records(max=1):