~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-02 07:00:21 UTC
  • mfrom: (4060.1.4 branch.roundtrips)
  • Revision ID: pqm@pqm.ubuntu.com-20090302070021-dvcjdpf47t2aeari
(robertc) Streaming fetch from non-stacked branches on smart servers.
        (Robert Collins, Andrew Bennetts_

Show diffs side-by-side

added added

removed removed

Lines of Context:
39
39
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
40
40
from bzrlib import revision as _mod_revision
41
41
from bzrlib.util import bencode
42
 
from bzrlib.versionedfile import NetworkRecordStream
 
42
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
43
43
 
44
44
 
45
45
class SmartServerRepositoryRequest(SmartServerRequest):
333
333
        return SuccessfulSmartServerResponse(('ok', token))
334
334
 
335
335
 
 
336
class SmartServerRepositoryStreamSourceGetStream(SmartServerRepositoryRequest):
 
337
 
 
338
    def do_repository_request(self, repository, to_network_name):
 
339
        """Get a stream for inserting into a to_format repository.
 
340
 
 
341
        :param repository: The repository to stream from.
 
342
        :param to_network_name: The network name of the format of the target
 
343
            repository.
 
344
        """
 
345
        self._to_format = network_format_registry.get(to_network_name)
 
346
        return None # Signal that we want a body.
 
347
 
 
348
    def do_body(self, body_bytes):
 
349
        repository = self._repository
 
350
        repository.lock_read()
 
351
        try:
 
352
            search, error = self.recreate_search(repository, body_bytes)
 
353
            if error is not None:
 
354
                repository.unlock()
 
355
                return error
 
356
            search = search.get_result()
 
357
            source = repository._get_source(self._to_format)
 
358
            stream = source.get_stream(search)
 
359
        except Exception:
 
360
            exc_info = sys.exc_info()
 
361
            try:
 
362
                # On non-error, unlocking is done by the body stream handler.
 
363
                repository.unlock()
 
364
            finally:
 
365
                raise exc_info[0], exc_info[1], exc_info[2]
 
366
        return SuccessfulSmartServerResponse(('ok',),
 
367
            body_stream=self.body_stream(stream, repository))
 
368
 
 
369
    def body_stream(self, stream, repository):
 
370
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
371
        try:
 
372
            for bytes in byte_stream:
 
373
                yield bytes
 
374
        except errors.RevisionNotPresent, e:
 
375
            # This shouldn't be able to happen, but as we don't buffer
 
376
            # everything it can in theory happen.
 
377
            repository.unlock()
 
378
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
379
        else:
 
380
            repository.unlock()
 
381
 
 
382
 
 
383
def _stream_to_byte_stream(stream, src_format):
 
384
    """Convert a record stream to a self delimited byte stream."""
 
385
    pack_writer = pack.ContainerSerialiser()
 
386
    yield pack_writer.begin()
 
387
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
388
    for substream_type, substream in stream:
 
389
        for record in substream:
 
390
            if record.storage_kind in ('chunked', 'fulltext'):
 
391
                serialised = record_to_fulltext_bytes(record)
 
392
            else:
 
393
                serialised = record.get_bytes_as(record.storage_kind)
 
394
            if serialised:
 
395
                # Some streams embed the whole stream into the wire
 
396
                # representation of the first record, which means that
 
397
                # later records have no wire representation: we skip them.
 
398
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
399
    yield pack_writer.end()
 
400
 
 
401
 
 
402
def _byte_stream_to_stream(byte_stream):
 
403
    """Convert a byte stream into a format and a StreamSource stream.
 
404
 
 
405
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
406
    :return: (RepositoryFormat, stream_generator)
 
407
    """
 
408
    stream_decoder = pack.ContainerPushParser()
 
409
    def record_stream():
 
410
        """Closure to return the substreams."""
 
411
        # May have fully parsed records already.
 
412
        for record in stream_decoder.read_pending_records():
 
413
            record_names, record_bytes = record
 
414
            record_name, = record_names
 
415
            substream_type = record_name[0]
 
416
            substream = NetworkRecordStream([record_bytes])
 
417
            yield substream_type, substream.read()
 
418
        for bytes in byte_stream:
 
419
            stream_decoder.accept_bytes(bytes)
 
420
            for record in stream_decoder.read_pending_records():
 
421
                record_names, record_bytes = record
 
422
                record_name, = record_names
 
423
                substream_type = record_name[0]
 
424
                substream = NetworkRecordStream([record_bytes])
 
425
                yield substream_type, substream.read()
 
426
    for bytes in byte_stream:
 
427
        stream_decoder.accept_bytes(bytes)
 
428
        for record in stream_decoder.read_pending_records(max=1):
 
429
            record_names, src_format_name = record
 
430
            src_format = network_format_registry.get(src_format_name)
 
431
            return src_format, record_stream()
 
432
 
 
433
 
336
434
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
337
435
 
338
436
    def do_repository_request(self, repository, token):
415
513
 
416
514
 
417
515
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
 
516
    """Insert a record stream from a RemoteSink into a repository.
 
517
 
 
518
    This gets bytes pushed to it by the network infrastructure and turns that
 
519
    into a bytes iterator using a thread. That is then processed by
 
520
    _byte_stream_to_stream.
 
521
    """
418
522
 
419
523
    def do_repository_request(self, repository, resume_tokens):
420
524
        """StreamSink.insert_stream for a remote repository."""
422
526
        tokens = [token for token in resume_tokens.split(' ') if token]
423
527
        self.tokens = tokens
424
528
        self.repository = repository
425
 
        self.stream_decoder = pack.ContainerPushParser()
426
 
        self.src_format = None
427
529
        self.queue = Queue.Queue()
428
 
        self.insert_thread = None
 
530
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
531
        self.insert_thread.start()
429
532
 
430
533
    def do_chunk(self, body_stream_chunk):
431
 
        self.stream_decoder.accept_bytes(body_stream_chunk)
432
 
        for record in self.stream_decoder.read_pending_records():
433
 
            record_names, record_bytes = record
434
 
            if self.src_format is None:
435
 
                src_format_name = record_bytes
436
 
                src_format = network_format_registry.get(src_format_name)
437
 
                self.src_format = src_format
438
 
                self.insert_thread = threading.Thread(target=self._inserter_thread)
439
 
                self.insert_thread.start()
440
 
            else:
441
 
                record_name, = record_names
442
 
                substream_type = record_name[0]
443
 
                stream = NetworkRecordStream([record_bytes])
444
 
                for record in stream.read():
445
 
                    self.queue.put((substream_type, [record]))
 
534
        self.queue.put(body_stream_chunk)
446
535
 
447
536
    def _inserter_thread(self):
448
537
        try:
 
538
            src_format, stream = _byte_stream_to_stream(
 
539
                self.blocking_byte_stream())
449
540
            self.insert_result = self.repository._get_sink().insert_stream(
450
 
                self.blocking_read_stream(), self.src_format, self.tokens)
 
541
                stream, src_format, self.tokens)
451
542
            self.insert_ok = True
452
543
        except:
453
544
            self.insert_exception = sys.exc_info()
454
545
            self.insert_ok = False
455
546
 
456
 
    def blocking_read_stream(self):
 
547
    def blocking_byte_stream(self):
457
548
        while True:
458
 
            item = self.queue.get()
459
 
            if item is StopIteration:
 
549
            bytes = self.queue.get()
 
550
            if bytes is StopIteration:
460
551
                return
461
552
            else:
462
 
                yield item
 
553
                yield bytes
463
554
 
464
555
    def do_end(self):
465
556
        self.queue.put(StopIteration)