=== modified file 'bzrlib/smart/repository.py'
@@ -39 +39 @@
     SuccessfulSmartServerResponse,
     )
 from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
+from bzrlib.recordcounter import RecordCounter
 from bzrlib import revision as _mod_revision
 from bzrlib.versionedfile import (
     NetworkRecordStream,
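Editorial note: the new RecordCounter import is what the progress-reporting code further down leans on. Only a small surface of it is exercised in this diff (STEP, is_initialized(), setup(), increment(), and the current/max attributes), so here is a rough stand-in sketch of that interface for readers without the bzrlib source at hand; the real bzrlib.recordcounter.RecordCounter may differ in details such as the STEP value.

class RecordCounterSketch(object):
    """Illustrative stand-in for bzrlib.recordcounter.RecordCounter."""

    # How many records to consume between progress-bar updates.
    # The real constant's value is not shown in this diff.
    STEP = 71

    def __init__(self):
        self.current = 0
        self.max = 0
        self._initialized = False

    def is_initialized(self):
        return self._initialized

    def setup(self, max, current=0):
        # Called once the revision count is known: fixes the expected total.
        self.max = max
        self.current = current
        self._initialized = True

    def increment(self, count):
        # Called every STEP records while the stream is being consumed.
        self.current += count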
@@ -395 +396 @@
-            token = repository.lock_write(token=token)
+            token = repository.lock_write(token=token).repository_token
         except errors.LockContention, e:
             return FailedSmartServerResponse(('LockContention',))
         except errors.UnlockableTransport:
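Editorial note: the only functional change here is the trailing .repository_token. As the diff shows, lock_write() evidently no longer hands back the token string itself but a result object that carries it as an attribute, so the smart-server handler has to unwrap it before returning the token over the wire. Schematically, with hypothetical names rather than the actual bzrlib classes:

class WriteLockResultSketch(object):
    """Hypothetical shape of what lock_write() now returns."""

    def __init__(self, unlock, repository_token):
        self.unlock = unlock                      # callable releasing the lock
        self.repository_token = repository_token  # the token the RPC reply needs

# Before: token = repository.lock_write(token=token)
# After:  token = repository.lock_write(token=token).repository_token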
@@ -502 +503 @@
     yield pack_writer.begin()
     yield pack_writer.bytes_record(src_format.network_name(), '')
     for substream_type, substream in stream:
-        if substream_type == 'inventory-deltas':
-            # This doesn't feel like the ideal place to issue this warning;
-            # however we don't want to do it in the Repository that's
-            # generating the stream, because that might be on the server.
-            # Instead we try to observe it as the stream goes by.
-            ui.ui_factory.warn_cross_format_fetch(src_format,
         for record in substream:
             if record.storage_kind in ('chunked', 'fulltext'):
                 serialised = record_to_fulltext_bytes(record)
@@ -551 +545 @@
     :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
     """

-    def __init__(self, byte_stream):
+    def __init__(self, byte_stream, record_counter):
         """Create a _ByteStreamDecoder."""
         self.stream_decoder = pack.ContainerPushParser()
         self.current_type = None
         self.first_bytes = None
         self.byte_stream = byte_stream
+        self._record_counter = record_counter
+        self.key_count = 0

     def iter_stream_decoder(self):
         """Iterate the contents of the pack from stream_decoder."""
@@ -588 +584 @@
     def record_stream(self):
         """Yield substream_type, substream from the byte stream."""
+        def wrap_and_count(pb, rc, substream):
+            """Yield records from stream while showing progress."""
+            counter = 0
+            if rc:
+                if self.current_type != 'revisions' and self.key_count != 0:
+                    # As we know the number of revisions now (in self.key_count)
+                    # we can setup and use record_counter (rc).
+                    if not rc.is_initialized():
+                        rc.setup(self.key_count, self.key_count)
+            for record in substream.read():
+                if rc:
+                    if rc.is_initialized() and counter == rc.STEP:
+                        rc.increment(counter)
+                        pb.update('Estimate', rc.current, rc.max)
+                        counter = 0
+                    if self.current_type == 'revisions':
+                        # Total records is proportional to number of revs
+                        # to fetch. With remote, we used self.key_count to
+                        # track the number of revs. Once we have the revs
+                        # counts in self.key_count, the progress bar changes
+                        # from 'Estimating..' to 'Estimate' above.
+                        self.key_count += 1
+                        if counter == rc.STEP:
+                            pb.update('Estimating..', self.key_count)
+                            counter = 0
+                counter += 1
+                yield record
+
         self.seed_state()
+        pb = ui.ui_factory.nested_progress_bar()
+        rc = self._record_counter
         # Make and consume sub generators, one per substream type:
         while self.first_bytes is not None:
             substream = NetworkRecordStream(self.iter_substream_bytes())
             # after substream is fully consumed, self.current_type is set to
             # the next type, and self.first_bytes is set to the matching bytes.
-            yield self.current_type, substream.read()
+            yield self.current_type, wrap_and_count(pb, rc, substream)
+        if rc:
+            pb.update('Done', rc.max, rc.max)
+        pb.finished()
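Editorial note: wrap_and_count is the core of the progress support. It forwards every record unchanged while counting, and drives the bar in two phases: an open-ended 'Estimating..' phase while revisions are still being counted into self.key_count, then a bounded 'Estimate' phase once the counter has been seeded via rc.setup(). Stripped of the bzrlib specifics, the underlying pattern is just a counting pass-through generator; a minimal standalone sketch follows, with a hypothetical report callback that is not a bzrlib API.

def count_while_yielding(records, report, step=100):
    """Yield records unchanged, reporting progress every `step` records."""
    seen = 0
    for record in records:
        seen += 1
        if seen % step == 0:
            report(seen)
        yield record

# Because this is a generator, nothing is buffered: progress reporting
# piggybacks on whatever the downstream consumer actually pulls, which is
# why record_stream() can wrap each substream without changing its semantics.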
@@ -598 +627 @@
     def seed_state(self):
         """Prepare the _ByteStreamDecoder to decode from the pack stream."""
@@ -604 +633 @@
         list(self.iter_substream_bytes())


-def _byte_stream_to_stream(byte_stream):
+def _byte_stream_to_stream(byte_stream, record_counter=None):
     """Convert a byte stream into a format and a stream.

     :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
     :return: (RepositoryFormat, stream_generator)
     """
-    decoder = _ByteStreamDecoder(byte_stream)
+    decoder = _ByteStreamDecoder(byte_stream, record_counter)
     for bytes in byte_stream:
         decoder.stream_decoder.accept_bytes(bytes)
         for record in decoder.stream_decoder.read_pending_records(max=1):
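Editorial note: _byte_stream_to_stream() still returns the (RepositoryFormat, stream_generator) pair documented above, and record_counter defaults to None so existing callers keep working with no progress output. A hedged sketch of a caller follows; it assumes RecordCounter can be constructed with no arguments, and the consumption loop is illustrative rather than lifted from bzrlib.

from bzrlib.recordcounter import RecordCounter
from bzrlib.smart.repository import _byte_stream_to_stream

def consume_with_progress(byte_stream):
    # byte_stream: a bytes iterator as produced by _stream_to_byte_stream.
    # Passing a RecordCounter turns on the 'Estimating../Estimate' bar;
    # omitting it (record_counter=None) keeps the previous, silent behaviour.
    src_format, stream = _byte_stream_to_stream(byte_stream,
        record_counter=RecordCounter())
    for substream_type, substream in stream:
        for record in substream:
            pass  # hand each record to whatever sink is inserting the stream
    return src_format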