~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Gary van der Merwe
  • Date: 2010-08-02 19:56:52 UTC
  • mfrom: (5050.3.18 2.2)
  • mto: (5050.3.19 2.2)
  • mto: This revision was merged to the branch mainline in revision 5371.
  • Revision ID: garyvdm@gmail.com-20100802195652-o1ppjemhwrr98i61
Merge lp:bzr/2.2.

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
    SuccessfulSmartServerResponse,
40
40
    )
41
41
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
42
from bzrlib.recordcounter import RecordCounter
42
43
from bzrlib import revision as _mod_revision
43
44
from bzrlib.versionedfile import (
44
45
    NetworkRecordStream,
392
393
        if token == '':
393
394
            token = None
394
395
        try:
395
 
            token = repository.lock_write(token=token)
 
396
            token = repository.lock_write(token=token).repository_token
396
397
        except errors.LockContention, e:
397
398
            return FailedSmartServerResponse(('LockContention',))
398
399
        except errors.UnlockableTransport:
502
503
    yield pack_writer.begin()
503
504
    yield pack_writer.bytes_record(src_format.network_name(), '')
504
505
    for substream_type, substream in stream:
505
 
        if substream_type == 'inventory-deltas':
506
 
            # This doesn't feel like the ideal place to issue this warning;
507
 
            # however we don't want to do it in the Repository that's
508
 
            # generating the stream, because that might be on the server.
509
 
            # Instead we try to observe it as the stream goes by.
510
 
            ui.ui_factory.warn_cross_format_fetch(src_format,
511
 
                '(remote)')
512
506
        for record in substream:
513
507
            if record.storage_kind in ('chunked', 'fulltext'):
514
508
                serialised = record_to_fulltext_bytes(record)
551
545
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
552
546
    """
553
547
 
554
 
    def __init__(self, byte_stream):
 
548
    def __init__(self, byte_stream, record_counter):
555
549
        """Create a _ByteStreamDecoder."""
556
550
        self.stream_decoder = pack.ContainerPushParser()
557
551
        self.current_type = None
558
552
        self.first_bytes = None
559
553
        self.byte_stream = byte_stream
 
554
        self._record_counter = record_counter
 
555
        self.key_count = 0
560
556
 
561
557
    def iter_stream_decoder(self):
562
558
        """Iterate the contents of the pack from stream_decoder."""
587
583
 
588
584
    def record_stream(self):
589
585
        """Yield substream_type, substream from the byte stream."""
 
586
        def wrap_and_count(pb, rc, substream):
 
587
            """Yield records from stream while showing progress."""
 
588
            counter = 0
 
589
            if rc:
 
590
                if self.current_type != 'revisions' and self.key_count != 0:
 
591
                    # As we know the number of revisions now (in self.key_count)
 
592
                    # we can setup and use record_counter (rc).
 
593
                    if not rc.is_initialized():
 
594
                        rc.setup(self.key_count, self.key_count)
 
595
            for record in substream.read():
 
596
                if rc:
 
597
                    if rc.is_initialized() and counter == rc.STEP:
 
598
                        rc.increment(counter)
 
599
                        pb.update('Estimate', rc.current, rc.max)
 
600
                        counter = 0
 
601
                    if self.current_type == 'revisions':
 
602
                        # Total records is proportional to number of revs
 
603
                        # to fetch. With remote, we used self.key_count to
 
604
                        # track the number of revs. Once we have the revs
 
605
                        # counts in self.key_count, the progress bar changes
 
606
                        # from 'Estimating..' to 'Estimate' above.
 
607
                        self.key_count += 1
 
608
                        if counter == rc.STEP:
 
609
                            pb.update('Estimating..', self.key_count)
 
610
                            counter = 0
 
611
                counter += 1
 
612
                yield record
 
613
 
590
614
        self.seed_state()
 
615
        pb = ui.ui_factory.nested_progress_bar()
 
616
        rc = self._record_counter
591
617
        # Make and consume sub generators, one per substream type:
592
618
        while self.first_bytes is not None:
593
619
            substream = NetworkRecordStream(self.iter_substream_bytes())
594
620
            # after substream is fully consumed, self.current_type is set to
595
621
            # the next type, and self.first_bytes is set to the matching bytes.
596
 
            yield self.current_type, substream.read()
 
622
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
623
        if rc:
 
624
            pb.update('Done', rc.max, rc.max)
 
625
        pb.finished()
597
626
 
598
627
    def seed_state(self):
599
628
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
604
633
        list(self.iter_substream_bytes())
605
634
 
606
635
 
607
 
def _byte_stream_to_stream(byte_stream):
 
636
def _byte_stream_to_stream(byte_stream, record_counter=None):
608
637
    """Convert a byte stream into a format and a stream.
609
638
 
610
639
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
611
640
    :return: (RepositoryFormat, stream_generator)
612
641
    """
613
 
    decoder = _ByteStreamDecoder(byte_stream)
 
642
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
614
643
    for bytes in byte_stream:
615
644
        decoder.stream_decoder.accept_bytes(bytes)
616
645
        for record in decoder.stream_decoder.read_pending_records(max=1):