502
502
pack_writer = pack.ContainerSerialiser()
503
503
yield pack_writer.begin()
504
504
yield pack_writer.bytes_record(src_format.network_name(), '')
506
#rc = RecordCounter()
507
#pb = ui.ui_factory.nested_progress_bar()
508
#pb.update('', rc.current, rc.max)
509
505
for substream_type, substream in stream:
511
506
for record in substream:
513
#if substream_type == 'revisions':
515
#elif rc.is_initialized():
516
# if counter == rc.STEP:
517
# rc.increment(counter)
518
# pb.update('', rc.current, rc.max)
520
507
if record.storage_kind in ('chunked', 'fulltext'):
521
508
serialised = record_to_fulltext_bytes(record)
522
509
elif record.storage_kind == 'inventory-delta':
530
517
# representation of the first record, which means that
531
518
# later records have no wire representation: we skip them.
532
519
yield pack_writer.bytes_record(serialised, [(substream_type,)])
533
#if substream_type == 'revisions':
534
# rc.setup(key_count, current=key_count)
535
#pb.update('', rc.max, rc.max)
537
520
yield pack_writer.end()
578
561
for record in self.stream_decoder.read_pending_records():
582
#rc = RecordCounter()
583
#pb = ui.ui_factory.nested_progress_bar()
584
564
# Pull bytes of the wire, decode them to records, yield those records.
585
565
for bytes in self.byte_stream:
586
566
self.stream_decoder.accept_bytes(bytes)
588
567
for record in self.stream_decoder.read_pending_records():
589
#if self.current_type == 'revisions':
590
# pb.update('Estimating..', key_count)
592
#elif rc.is_initialized():
593
# if counter == rc.STEP:
594
# rc.increment(counter)
595
# pb.update('Estimate', rc.current, rc.max)
599
#if self.current_type == 'revisions':
600
# rc.setup(key_count, current=key_count)
601
#pb.update('Estimate', rc.max, rc.max)
604
570
def iter_substream_bytes(self):
605
571
if self.first_bytes is not None:
620
586
def record_stream(self):
621
587
"""Yield substream_type, substream from the byte stream."""
588
def wrap_and_count(pb, rc, substream):
589
"""Yield records from stream while showing progress."""
592
if self.current_type != 'revisions' and self.key_count != 0:
593
# As we know the number of revisions now (in self.key_count)
594
# we can setup and use record_counter (rc).
595
if not rc.is_initialized():
596
rc.setup(self.key_count, self.key_count)
597
for record in substream.read():
599
if rc.is_initialized() and counter == rc.STEP:
600
rc.increment(counter)
601
pb.update('Estimate', rc.current, rc.max)
603
if self.current_type == 'revisions':
604
# Total records is proportional to number of revs
605
# to fetch. With remote, we used self.key_count to
606
# track the number of revs. Once we have the revs
607
# counts in self.key_count, the progress bar changes
608
# from 'Estimating..' to 'Estimate' above.
610
if counter == rc.STEP:
611
pb.update('Estimating..', self.key_count)
622
616
self.seed_state()
623
617
pb = ui.ui_factory.nested_progress_bar()
618
rc = self._record_counter
624
619
# Make and consume sub generators, one per substream type:
625
620
while self.first_bytes is not None:
626
621
substream = NetworkRecordStream(self.iter_substream_bytes())
627
622
# after substream is fully consumed, self.current_type is set to
628
623
# the next type, and self.first_bytes is set to the matching bytes.
629
def wrap_and_count(substream):
631
if self.record_counter:
632
if self.current_type != 'revisions' and self.key_count != 0:
633
if not self.record_counter.is_initialized():
634
self.record_counter.setup(self.key_count,
636
for record in substream.read():
637
if self.record_counter:
638
if self.record_counter.is_initialized() and \
639
counter == self.record_counter.STEP:
640
self.record_counter.increment(counter)
641
pb.update('Estimate', self.record_counter.current,
642
self.record_counter.max)
644
if self.current_type == 'revisions':
646
if counter == self.record_counter.STEP:
647
pb.update('Estimating..', self.key_count)
652
yield self.current_type, wrap_and_count(substream)
653
if self.record_counter:
654
pb.update('Done', self.record_counter.max, self.record_counter.max)
624
yield self.current_type, wrap_and_count(pb, rc, substream)
626
pb.update('Done', rc.max, rc.max)
657
629
def seed_state(self):