~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Andrew Bennetts
  • Date: 2010-02-12 04:33:05 UTC
  • mfrom: (5031 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5032.
  • Revision ID: andrew.bennetts@canonical.com-20100212043305-ujdbsdoviql2t7i3
Merge lp:bzr

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
30
30
    osutils,
31
31
    pack,
32
32
    ui,
 
33
    versionedfile,
33
34
    )
34
35
from bzrlib.bzrdir import BzrDir
35
36
from bzrlib.smart.request import (
81
82
            recreate_search trusts that clients will look for missing things
82
83
            they expected and get it from elsewhere.
83
84
        """
84
 
        if search_bytes == 'everything':
85
 
            return graph.EverythingResult(repository), None
86
85
        lines = search_bytes.split('\n')
87
86
        if lines[0] == 'ancestry-of':
88
87
            heads = lines[1:]
393
392
        if token == '':
394
393
            token = None
395
394
        try:
396
 
            token = repository.lock_write(token=token).repository_token
 
395
            token = repository.lock_write(token=token)
397
396
        except errors.LockContention, e:
398
397
            return FailedSmartServerResponse(('LockContention',))
399
398
        except errors.UnlockableTransport:
414
413
    def do_repository_request(self, repository, to_network_name):
415
414
        """Get a stream for inserting into a to_format repository.
416
415
 
417
 
        The request body is 'search_bytes', a description of the revisions
418
 
        being requested.
419
 
 
420
 
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
421
 
        implementations will respond with a BadSearch error, and clients should
422
 
        catch this and fallback appropriately.
423
 
 
424
416
        :param repository: The repository to stream from.
425
417
        :param to_network_name: The network name of the format of the target
426
418
            repository.
498
490
 
499
491
 
500
492
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
501
 
    """The same as Repository.get_stream, but will return stream CHK formats to
502
 
    clients.
503
 
 
504
 
    See SmartServerRepositoryGetStream._should_fake_unknown.
505
 
    
506
 
    New in 1.19.
507
 
    """
508
493
 
509
494
    def _should_fake_unknown(self):
510
495
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
517
502
    yield pack_writer.begin()
518
503
    yield pack_writer.bytes_record(src_format.network_name(), '')
519
504
    for substream_type, substream in stream:
 
505
        if substream_type == 'inventory-deltas':
 
506
            # This doesn't feel like the ideal place to issue this warning;
 
507
            # however we don't want to do it in the Repository that's
 
508
            # generating the stream, because that might be on the server.
 
509
            # Instead we try to observe it as the stream goes by.
 
510
            ui.ui_factory.warn_cross_format_fetch(src_format,
 
511
                '(remote)')
520
512
        for record in substream:
521
513
            if record.storage_kind in ('chunked', 'fulltext'):
522
514
                serialised = record_to_fulltext_bytes(record)
 
515
            elif record.storage_kind == 'inventory-delta':
 
516
                serialised = record_to_inventory_delta_bytes(record)
523
517
            elif record.storage_kind == 'absent':
524
518
                raise ValueError("Absent factory for %s" % (record.key,))
525
519
            else:
557
551
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
558
552
    """
559
553
 
560
 
    def __init__(self, byte_stream, record_counter):
 
554
    def __init__(self, byte_stream):
561
555
        """Create a _ByteStreamDecoder."""
562
556
        self.stream_decoder = pack.ContainerPushParser()
563
557
        self.current_type = None
564
558
        self.first_bytes = None
565
559
        self.byte_stream = byte_stream
566
 
        self._record_counter = record_counter
567
 
        self.key_count = 0
568
560
 
569
561
    def iter_stream_decoder(self):
570
562
        """Iterate the contents of the pack from stream_decoder."""
595
587
 
596
588
    def record_stream(self):
597
589
        """Yield substream_type, substream from the byte stream."""
598
 
        def wrap_and_count(pb, rc, substream):
599
 
            """Yield records from stream while showing progress."""
600
 
            counter = 0
601
 
            if rc:
602
 
                if self.current_type != 'revisions' and self.key_count != 0:
603
 
                    # As we know the number of revisions now (in self.key_count)
604
 
                    # we can setup and use record_counter (rc).
605
 
                    if not rc.is_initialized():
606
 
                        rc.setup(self.key_count, self.key_count)
607
 
            for record in substream.read():
608
 
                if rc:
609
 
                    if rc.is_initialized() and counter == rc.STEP:
610
 
                        rc.increment(counter)
611
 
                        pb.update('Estimate', rc.current, rc.max)
612
 
                        counter = 0
613
 
                    if self.current_type == 'revisions':
614
 
                        # Total records is proportional to number of revs
615
 
                        # to fetch. With remote, we used self.key_count to
616
 
                        # track the number of revs. Once we have the revs
617
 
                        # counts in self.key_count, the progress bar changes
618
 
                        # from 'Estimating..' to 'Estimate' above.
619
 
                        self.key_count += 1
620
 
                        if counter == rc.STEP:
621
 
                            pb.update('Estimating..', self.key_count)
622
 
                            counter = 0
623
 
                counter += 1
624
 
                yield record
625
 
 
626
590
        self.seed_state()
627
 
        pb = ui.ui_factory.nested_progress_bar()
628
 
        rc = self._record_counter
629
591
        # Make and consume sub generators, one per substream type:
630
592
        while self.first_bytes is not None:
631
593
            substream = NetworkRecordStream(self.iter_substream_bytes())
632
594
            # after substream is fully consumed, self.current_type is set to
633
595
            # the next type, and self.first_bytes is set to the matching bytes.
634
 
            yield self.current_type, wrap_and_count(pb, rc, substream)
635
 
        if rc:
636
 
            pb.update('Done', rc.max, rc.max)
637
 
        pb.finished()
 
596
            yield self.current_type, substream.read()
638
597
 
639
598
    def seed_state(self):
640
599
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
645
604
        list(self.iter_substream_bytes())
646
605
 
647
606
 
648
 
def _byte_stream_to_stream(byte_stream, record_counter=None):
 
607
def _byte_stream_to_stream(byte_stream):
649
608
    """Convert a byte stream into a format and a stream.
650
609
 
651
610
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
652
611
    :return: (RepositoryFormat, stream_generator)
653
612
    """
654
 
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
 
613
    decoder = _ByteStreamDecoder(byte_stream)
655
614
    for bytes in byte_stream:
656
615
        decoder.stream_decoder.accept_bytes(bytes)
657
616
        for record in decoder.stream_decoder.read_pending_records(max=1):