336
342
return SuccessfulSmartServerResponse(('ok',))
345
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
    """Set whether a repository creates working trees on checkout/update."""

    def do_repository_request(self, repository, str_bool_new_value):
        """Set repository.make_working_trees from a smart-verb argument.

        :param repository: The repository to configure.
        :param str_bool_new_value: The smart protocol transmits booleans as
            strings: exactly 'True' enables working trees, anything else
            disables them.
        :return: A SuccessfulSmartServerResponse of ('ok',).
        """
        if str_bool_new_value == 'True':
            new_value = True
        else:
            new_value = False
        repository.set_make_working_trees(new_value)
        return SuccessfulSmartServerResponse(('ok',))
339
356
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
    """Get the raw repository files as a tarball.

    The returned tarball contains a .bzr control directory which in turn
    contains a repository.

    This takes one parameter, compression, which currently must be
    "", "gz", or "bz2".

    This is used to implement the Repository.copy_content_into operation.
    """

    def do_repository_request(self, repository, compression):
        """Return the repository as a tarball response body.

        :param repository: The repository to tar up.
        :param compression: '', 'gz' or 'bz2' -- appended to the tarfile
            stream mode.
        :return: A SuccessfulSmartServerResponse of ('ok',) whose body is
            the tarball bytes.
        """
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
        try:
            controldir_name = tmp_dirname + '/.bzr'
            return self._tarfile_response(controldir_name, compression)
        finally:
            # Always remove the scratch clone, even if tarring failed.
            osutils.rmtree(tmp_dirname)

    def _copy_to_tempdir(self, from_repo):
        # Clone the repository into a scratch directory so the tarball sees
        # a consistent, standalone .bzr tree.
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
        from_repo.copy_content_into(tmp_repo)
        return tmp_dirname, tmp_repo

    def _tarfile_response(self, tmp_dirname, compression):
        # Spool the tarball through a temporary file before replying.
        temp = tempfile.NamedTemporaryFile()
        try:
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
            # all finished; write the tempfile out to the network
            temp.seek(0)
            # FIXME: don't read the whole tarball into memory here; rather
            # stream it out from the file onto the network.
            return SuccessfulSmartServerResponse(('ok',), temp.read())
        finally:
            temp.close()

    def _tarball_of_dir(self, dirname, compression, ofile):
        # 'w|' opens a non-seekable write stream; compression must be '',
        # 'gz' or 'bz2' for tarfile.open to accept the mode.
        tarball = tarfile.open(fileobj=ofile, name=ofile.name,
            mode='w|' + compression)
        try:
            # The tarfile module only accepts ascii names; all files within
            # the repository have ASCII names, so encoding is safe here.
            dirname = dirname.encode(sys.getfilesystemencoding())
            # tarfile includes the whole path by default, so anchor the
            # archive contents at '.bzr' instead.
            if not dirname.endswith('.bzr'):
                raise ValueError(dirname)
            tarball.add(dirname, '.bzr') # recursive by default
        finally:
            tarball.close()
414
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
    """Insert a record stream sent by a client into a repository.

    Body chunks are decoded as they arrive; once the source format is known,
    decoded records are handed through a queue to a worker thread that feeds
    the repository's stream sink in parallel with further decoding.
    """

    def do_repository_request(self, repository, resume_tokens):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write()
        tokens = [token for token in resume_tokens.split(' ') if token]
        if tokens:
            # The client is resuming a previously suspended write group.
            repository.resume_write_group(tokens)
        else:
            repository.start_write_group()
        self.repository = repository
        self.stream_decoder = pack.ContainerPushParser()
        # The first record in the stream names the source repository format;
        # nothing can be inserted until it has been seen.
        self.src_format = None
        # Decoded records flow to the inserter thread through this queue.
        self.queue = Queue.Queue()
        self.insert_thread = None

    def do_chunk(self, body_stream_chunk):
        """Feed one body chunk to the decoder and queue completed records."""
        self.stream_decoder.accept_bytes(body_stream_chunk)
        for record in self.stream_decoder.read_pending_records():
            record_names, record_bytes = record
            if self.src_format is None:
                # First record: the network name of the source format.
                src_format_name = record_bytes
                src_format = network_format_registry.get(src_format_name)
                self.src_format = src_format
                # The format is known now, so insertion can begin in
                # parallel with decoding the rest of the stream.
                self.insert_thread = threading.Thread(target=self._inserter_thread)
                self.insert_thread.start()
            else:
                record_name, = record_names
                substream_type = record_name[0]
                stream = NetworkRecordStream([record_bytes])
                for record in stream.read():
                    self.queue.put((substream_type, [record]))

    def _inserter_thread(self):
        # Runs on the worker thread; blocks on the queue inside
        # blocking_read_stream until the end-of-stream sentinel arrives.
        self.repository._get_sink().insert_stream(self.blocking_read_stream(),
            self.src_format)

    def blocking_read_stream(self):
        """Yield queued (substream_type, [record]) pairs.

        Terminates when the StopIteration sentinel object is queued.
        """
        while True:
            item = self.queue.get()
            if item is StopIteration:
                return
            else:
                yield item

    def do_end(self):
        """Finish the stream: join the inserter, then commit or suspend.

        :return: ('ok',) on commit, or ('missing-basis', bencoded tokens and
            missing keys) when compression parents are absent and the write
            group was suspended for a follow-up stream.
        """
        # Wake the inserter thread with the end-of-stream sentinel.
        self.queue.put(StopIteration)
        if self.insert_thread is not None:
            self.insert_thread.join()
        try:
            missing_keys = set()
            for prefix, versioned_file in (
                ('texts', self.repository.texts),
                ('inventories', self.repository.inventories),
                ('revisions', self.repository.revisions),
                ('signatures', self.repository.signatures),
                ):
                missing_keys.update((prefix,) + key for key in
                    versioned_file.get_missing_compression_parent_keys())
        except NotImplementedError:
            # cannot even attempt suspending; fall through and commit.
            pass
        else:
            if missing_keys:
                # suspend the write group and tell the caller what we are
                # missing. We know we can suspend or else we would not have
                # entered this code path. (All repositories that can handle
                # missing keys can handle suspending a write group).
                write_group_tokens = self.repository.suspend_write_group()
                # bzip needed? missing keys should typically be a small set.
                # Should this be a streaming body response ?
                missing_keys = sorted(missing_keys)
                bytes = bencode.bencode((write_group_tokens, missing_keys))
                return SuccessfulSmartServerResponse(('missing-basis', bytes))
        self.repository.commit_write_group()
        self.repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))