1
# Copyright (C) 2006-2010 Canonical Ltd
1
# Copyright (C) 2006, 2007, 2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
81
83
recreate_search trusts that clients will look for missing things
82
84
they expected and get it from elsewhere.
84
if search_bytes == 'everything':
85
return graph.EverythingResult(repository), None
86
86
lines = search_bytes.split('\n')
87
87
if lines[0] == 'ancestry-of':
396
token = repository.lock_write(token=token).repository_token
396
token = repository.lock_write(token=token)
397
397
except errors.LockContention, e:
398
398
return FailedSmartServerResponse(('LockContention',))
399
399
except errors.UnlockableTransport:
414
414
def do_repository_request(self, repository, to_network_name):
415
415
"""Get a stream for inserting into a to_format repository.
417
The request body is 'search_bytes', a description of the revisions
420
In 2.3 this verb added support for search_bytes == 'everything'. Older
421
implementations will respond with a BadSearch error, and clients should
422
catch this and fallback appropriately.
424
417
:param repository: The repository to stream from.
425
418
:param to_network_name: The network name of the format of the target
500
493
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
501
"""The same as Repository.get_stream, but will return stream CHK formats to
504
See SmartServerRepositoryGetStream._should_fake_unknown.
509
495
def _should_fake_unknown(self):
510
496
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
517
503
yield pack_writer.begin()
518
504
yield pack_writer.bytes_record(src_format.network_name(), '')
519
505
for substream_type, substream in stream:
506
if substream_type == 'inventory-deltas':
507
# This doesn't feel like the ideal place to issue this warning;
508
# however we don't want to do it in the Repository that's
509
# generating the stream, because that might be on the server.
510
# Instead we try to observe it as the stream goes by.
511
ui.ui_factory.warn_cross_format_fetch(src_format,
520
513
for record in substream:
521
514
if record.storage_kind in ('chunked', 'fulltext'):
522
515
serialised = record_to_fulltext_bytes(record)
516
elif record.storage_kind == 'inventory-delta':
517
serialised = record_to_inventory_delta_bytes(record)
523
518
elif record.storage_kind == 'absent':
524
519
raise ValueError("Absent factory for %s" % (record.key,))
557
552
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
560
def __init__(self, byte_stream, record_counter):
555
def __init__(self, byte_stream):
561
556
"""Create a _ByteStreamDecoder."""
562
557
self.stream_decoder = pack.ContainerPushParser()
563
558
self.current_type = None
564
559
self.first_bytes = None
565
560
self.byte_stream = byte_stream
566
self._record_counter = record_counter
569
562
def iter_stream_decoder(self):
570
563
"""Iterate the contents of the pack from stream_decoder."""
596
589
def record_stream(self):
597
590
"""Yield substream_type, substream from the byte stream."""
598
def wrap_and_count(pb, rc, substream):
599
"""Yield records from stream while showing progress."""
602
if self.current_type != 'revisions' and self.key_count != 0:
603
# As we know the number of revisions now (in self.key_count)
604
# we can setup and use record_counter (rc).
605
if not rc.is_initialized():
606
rc.setup(self.key_count, self.key_count)
607
for record in substream.read():
609
if rc.is_initialized() and counter == rc.STEP:
610
rc.increment(counter)
611
pb.update('Estimate', rc.current, rc.max)
613
if self.current_type == 'revisions':
614
# Total records is proportional to number of revs
615
# to fetch. With remote, we used self.key_count to
616
# track the number of revs. Once we have the revs
617
# counts in self.key_count, the progress bar changes
618
# from 'Estimating..' to 'Estimate' above.
620
if counter == rc.STEP:
621
pb.update('Estimating..', self.key_count)
626
591
self.seed_state()
627
pb = ui.ui_factory.nested_progress_bar()
628
rc = self._record_counter
629
592
# Make and consume sub generators, one per substream type:
630
593
while self.first_bytes is not None:
631
594
substream = NetworkRecordStream(self.iter_substream_bytes())
632
595
# after substream is fully consumed, self.current_type is set to
633
596
# the next type, and self.first_bytes is set to the matching bytes.
634
yield self.current_type, wrap_and_count(pb, rc, substream)
636
pb.update('Done', rc.max, rc.max)
597
yield self.current_type, substream.read()
639
599
def seed_state(self):
640
600
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
645
605
list(self.iter_substream_bytes())
648
def _byte_stream_to_stream(byte_stream, record_counter=None):
608
def _byte_stream_to_stream(byte_stream):
649
609
"""Convert a byte stream into a format and a stream.
651
611
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
652
612
:return: (RepositoryFormat, stream_generator)
654
decoder = _ByteStreamDecoder(byte_stream, record_counter)
614
decoder = _ByteStreamDecoder(byte_stream)
655
615
for bytes in byte_stream:
656
616
decoder.stream_decoder.accept_bytes(bytes)
657
617
for record in decoder.stream_decoder.read_pending_records(max=1):
725
685
def _tarball_of_dir(self, dirname, compression, ofile):
727
686
filename = os.path.basename(ofile.name)
728
687
tarball = tarfile.open(fileobj=ofile, name=filename,
729
688
mode='w|' + compression)