~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-08-17 18:13:57 UTC
  • mfrom: (5268.7.29 transport-segments)
  • Revision ID: pqm@pqm.ubuntu.com-20110817181357-y5q5eth1hk8bl3om
(jelmer) Allow specifying the colocated branch to use in the branch URL,
 and retrieving the branch name using ControlDir._get_selected_branch.
 (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007, 2010 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
import os
21
21
import Queue
22
22
import sys
23
 
import tarfile
24
23
import tempfile
25
24
import threading
26
25
 
31
30
    osutils,
32
31
    pack,
33
32
    ui,
34
 
    versionedfile,
35
33
    )
36
34
from bzrlib.bzrdir import BzrDir
37
35
from bzrlib.smart.request import (
83
81
            recreate_search trusts that clients will look for missing things
84
82
            they expected and get it from elsewhere.
85
83
        """
 
84
        if search_bytes == 'everything':
 
85
            return graph.EverythingResult(repository), None
86
86
        lines = search_bytes.split('\n')
87
87
        if lines[0] == 'ancestry-of':
88
88
            heads = lines[1:]
393
393
        if token == '':
394
394
            token = None
395
395
        try:
396
 
            token = repository.lock_write(token=token)
 
396
            token = repository.lock_write(token=token).repository_token
397
397
        except errors.LockContention, e:
398
398
            return FailedSmartServerResponse(('LockContention',))
399
399
        except errors.UnlockableTransport:
414
414
    def do_repository_request(self, repository, to_network_name):
415
415
        """Get a stream for inserting into a to_format repository.
416
416
 
 
417
        The request body is 'search_bytes', a description of the revisions
 
418
        being requested.
 
419
 
 
420
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
421
        implementations will respond with a BadSearch error, and clients should
 
422
        catch this and fallback appropriately.
 
423
 
417
424
        :param repository: The repository to stream from.
418
425
        :param to_network_name: The network name of the format of the target
419
426
            repository.
491
498
 
492
499
 
493
500
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
501
    """The same as Repository.get_stream, but will return stream CHK formats to
 
502
    clients.
 
503
 
 
504
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
505
    
 
506
    New in 1.19.
 
507
    """
494
508
 
495
509
    def _should_fake_unknown(self):
496
510
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
503
517
    yield pack_writer.begin()
504
518
    yield pack_writer.bytes_record(src_format.network_name(), '')
505
519
    for substream_type, substream in stream:
506
 
        if substream_type == 'inventory-deltas':
507
 
            # This doesn't feel like the ideal place to issue this warning;
508
 
            # however we don't want to do it in the Repository that's
509
 
            # generating the stream, because that might be on the server.
510
 
            # Instead we try to observe it as the stream goes by.
511
 
            ui.ui_factory.warn_cross_format_fetch(src_format,
512
 
                '(remote)')
513
520
        for record in substream:
514
521
            if record.storage_kind in ('chunked', 'fulltext'):
515
522
                serialised = record_to_fulltext_bytes(record)
516
 
            elif record.storage_kind == 'inventory-delta':
517
 
                serialised = record_to_inventory_delta_bytes(record)
518
523
            elif record.storage_kind == 'absent':
519
524
                raise ValueError("Absent factory for %s" % (record.key,))
520
525
            else:
552
557
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
553
558
    """
554
559
 
555
 
    def __init__(self, byte_stream):
 
560
    def __init__(self, byte_stream, record_counter):
556
561
        """Create a _ByteStreamDecoder."""
557
562
        self.stream_decoder = pack.ContainerPushParser()
558
563
        self.current_type = None
559
564
        self.first_bytes = None
560
565
        self.byte_stream = byte_stream
 
566
        self._record_counter = record_counter
 
567
        self.key_count = 0
561
568
 
562
569
    def iter_stream_decoder(self):
563
570
        """Iterate the contents of the pack from stream_decoder."""
588
595
 
589
596
    def record_stream(self):
590
597
        """Yield substream_type, substream from the byte stream."""
 
598
        def wrap_and_count(pb, rc, substream):
 
599
            """Yield records from stream while showing progress."""
 
600
            counter = 0
 
601
            if rc:
 
602
                if self.current_type != 'revisions' and self.key_count != 0:
 
603
                    # As we know the number of revisions now (in self.key_count)
 
604
                    # we can setup and use record_counter (rc).
 
605
                    if not rc.is_initialized():
 
606
                        rc.setup(self.key_count, self.key_count)
 
607
            for record in substream.read():
 
608
                if rc:
 
609
                    if rc.is_initialized() and counter == rc.STEP:
 
610
                        rc.increment(counter)
 
611
                        pb.update('Estimate', rc.current, rc.max)
 
612
                        counter = 0
 
613
                    if self.current_type == 'revisions':
 
614
                        # Total records is proportional to number of revs
 
615
                        # to fetch. With remote, we used self.key_count to
 
616
                        # track the number of revs. Once we have the revs
 
617
                        # counts in self.key_count, the progress bar changes
 
618
                        # from 'Estimating..' to 'Estimate' above.
 
619
                        self.key_count += 1
 
620
                        if counter == rc.STEP:
 
621
                            pb.update('Estimating..', self.key_count)
 
622
                            counter = 0
 
623
                counter += 1
 
624
                yield record
 
625
 
591
626
        self.seed_state()
 
627
        pb = ui.ui_factory.nested_progress_bar()
 
628
        rc = self._record_counter
592
629
        # Make and consume sub generators, one per substream type:
593
630
        while self.first_bytes is not None:
594
631
            substream = NetworkRecordStream(self.iter_substream_bytes())
595
632
            # after substream is fully consumed, self.current_type is set to
596
633
            # the next type, and self.first_bytes is set to the matching bytes.
597
 
            yield self.current_type, substream.read()
 
634
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
635
        if rc:
 
636
            pb.update('Done', rc.max, rc.max)
 
637
        pb.finished()
598
638
 
599
639
    def seed_state(self):
600
640
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
605
645
        list(self.iter_substream_bytes())
606
646
 
607
647
 
608
 
def _byte_stream_to_stream(byte_stream):
 
648
def _byte_stream_to_stream(byte_stream, record_counter=None):
609
649
    """Convert a byte stream into a format and a stream.
610
650
 
611
651
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
612
652
    :return: (RepositoryFormat, stream_generator)
613
653
    """
614
 
    decoder = _ByteStreamDecoder(byte_stream)
 
654
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
615
655
    for bytes in byte_stream:
616
656
        decoder.stream_decoder.accept_bytes(bytes)
617
657
        for record in decoder.stream_decoder.read_pending_records(max=1):
683
723
            temp.close()
684
724
 
685
725
    def _tarball_of_dir(self, dirname, compression, ofile):
 
726
        import tarfile
686
727
        filename = os.path.basename(ofile.name)
687
728
        tarball = tarfile.open(fileobj=ofile, name=filename,
688
729
            mode='w|' + compression)