333
333
return SuccessfulSmartServerResponse(('ok', token))
336
class SmartServerRepositoryStreamSourceGetStream(SmartServerRepositoryRequest):
338
def do_repository_request(self, repository, to_network_name):
339
"""Get a stream for inserting into a to_format repository.
341
:param repository: The repository to stream from.
342
:param to_network_name: The network name of the format of the target
345
self._to_format = network_format_registry.get(to_network_name)
346
return None # Signal that we want a body.
348
def do_body(self, body_bytes):
349
repository = self._repository
350
repository.lock_read()
352
search, error = self.recreate_search(repository, body_bytes)
353
if error is not None:
356
search = search.get_result()
357
source = repository._get_source(self._to_format)
358
stream = source.get_stream(search)
360
exc_info = sys.exc_info()
362
# On non-error, unlocking is done by the body stream handler.
365
raise exc_info[0], exc_info[1], exc_info[2]
366
return SuccessfulSmartServerResponse(('ok',),
367
body_stream=self.body_stream(stream, repository))
369
def body_stream(self, stream, repository):
370
byte_stream = _stream_to_byte_stream(stream, repository._format)
372
for bytes in byte_stream:
374
except errors.RevisionNotPresent, e:
375
# This shouldn't be able to happen, but as we don't buffer
376
# everything it can in theory happen.
378
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
383
def _stream_to_byte_stream(stream, src_format):
384
"""Convert a record stream to a self delimited byte stream."""
385
pack_writer = pack.ContainerSerialiser()
386
yield pack_writer.begin()
387
yield pack_writer.bytes_record(src_format.network_name(), '')
388
for substream_type, substream in stream:
389
for record in substream:
390
if record.storage_kind in ('chunked', 'fulltext'):
391
serialised = record_to_fulltext_bytes(record)
393
serialised = record.get_bytes_as(record.storage_kind)
395
# Some streams embed the whole stream into the wire
396
# representation of the first record, which means that
397
# later records have no wire representation: we skip them.
398
yield pack_writer.bytes_record(serialised, [(substream_type,)])
399
yield pack_writer.end()
402
def _byte_stream_to_stream(byte_stream):
403
"""Convert a byte stream into a format and a StreamSource stream.
405
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
406
:return: (RepositoryFormat, stream_generator)
408
stream_decoder = pack.ContainerPushParser()
410
"""Closure to return the substreams."""
411
# May have fully parsed records already.
412
for record in stream_decoder.read_pending_records():
413
record_names, record_bytes = record
414
record_name, = record_names
415
substream_type = record_name[0]
416
substream = NetworkRecordStream([record_bytes])
417
yield substream_type, substream.read()
418
for bytes in byte_stream:
419
stream_decoder.accept_bytes(bytes)
420
for record in stream_decoder.read_pending_records():
421
record_names, record_bytes = record
422
record_name, = record_names
423
substream_type = record_name[0]
424
substream = NetworkRecordStream([record_bytes])
425
yield substream_type, substream.read()
426
for bytes in byte_stream:
427
stream_decoder.accept_bytes(bytes)
428
for record in stream_decoder.read_pending_records(max=1):
429
record_names, src_format_name = record
430
src_format = network_format_registry.get(src_format_name)
431
return src_format, record_stream()
336
434
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
338
436
def do_repository_request(self, repository, token):
422
526
tokens = [token for token in resume_tokens.split(' ') if token]
423
527
self.tokens = tokens
424
528
self.repository = repository
425
self.stream_decoder = pack.ContainerPushParser()
426
self.src_format = None
427
529
self.queue = Queue.Queue()
428
self.insert_thread = None
530
self.insert_thread = threading.Thread(target=self._inserter_thread)
531
self.insert_thread.start()
430
533
def do_chunk(self, body_stream_chunk):
431
self.stream_decoder.accept_bytes(body_stream_chunk)
432
for record in self.stream_decoder.read_pending_records():
433
record_names, record_bytes = record
434
if self.src_format is None:
435
src_format_name = record_bytes
436
src_format = network_format_registry.get(src_format_name)
437
self.src_format = src_format
438
self.insert_thread = threading.Thread(target=self._inserter_thread)
439
self.insert_thread.start()
441
record_name, = record_names
442
substream_type = record_name[0]
443
stream = NetworkRecordStream([record_bytes])
444
for record in stream.read():
445
self.queue.put((substream_type, [record]))
534
self.queue.put(body_stream_chunk)
447
536
def _inserter_thread(self):
538
src_format, stream = _byte_stream_to_stream(
539
self.blocking_byte_stream())
449
540
self.insert_result = self.repository._get_sink().insert_stream(
450
self.blocking_read_stream(), self.src_format, self.tokens)
541
stream, src_format, self.tokens)
451
542
self.insert_ok = True
453
544
self.insert_exception = sys.exc_info()
454
545
self.insert_ok = False
456
def blocking_read_stream(self):
547
def blocking_byte_stream(self):
458
item = self.queue.get()
459
if item is StopIteration:
549
bytes = self.queue.get()
550
if bytes is StopIteration:
464
555
def do_end(self):
465
556
self.queue.put(StopIteration)