13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Server-side repository related request implementations."""
20
from cStringIO import StringIO
26
from bzrlib import errors
27
34
from bzrlib.bzrdir import BzrDir
28
from bzrlib.pack import ContainerSerialiser
29
35
from bzrlib.smart.request import (
30
36
FailedSmartServerResponse,
31
37
SmartServerRequest,
32
38
SuccessfulSmartServerResponse,
34
from bzrlib.repository import _strip_NULL_ghosts
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
35
41
from bzrlib import revision as _mod_revision
42
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
38
45
class SmartServerRepositoryRequest(SmartServerRequest):
66
def recreate_search(self, repository, recipe_bytes):
67
lines = recipe_bytes.split('\n')
73
def recreate_search(self, repository, search_bytes, discard_excess=False):
74
"""Recreate a search from its serialised form.
76
:param discard_excess: If True, and the search refers to data we don't
77
have, just silently accept that fact - the verb calling
78
recreate_search trusts that clients will look for missing things
79
they expected and get it from elsewhere.
81
lines = search_bytes.split('\n')
82
if lines[0] == 'ancestry-of':
84
search_result = graph.PendingAncestryResult(heads, repository)
85
return search_result, None
86
elif lines[0] == 'search':
87
return self.recreate_search_from_recipe(repository, lines[1:],
88
discard_excess=discard_excess)
90
return (None, FailedSmartServerResponse(('BadSearch',)))
92
def recreate_search_from_recipe(self, repository, lines,
93
discard_excess=False):
94
"""Recreate a specific revision search (vs a from-tip search).
96
:param discard_excess: If True, and the search refers to data we don't
97
have, just silently accept that fact - the verb calling
98
recreate_search trusts that clients will look for missing things
99
they expected and get it from elsewhere.
68
101
start_keys = set(lines[0].split(' '))
69
102
exclude_keys = set(lines[1].split(' '))
70
103
revision_count = int(lines[2])
157
202
queried_revs.update(next_revs)
158
203
parent_map = repo_graph.get_parent_map(next_revs)
204
current_revs = next_revs
159
205
next_revs = set()
160
for revision_id, parents in parent_map.iteritems():
161
# adjust for the wire
162
if parents == (_mod_revision.NULL_REVISION,):
164
# prepare the next query
165
next_revs.update(parents)
166
if revision_id not in client_seen_revs:
206
for revision_id in current_revs:
208
parents = parent_map.get(revision_id)
209
if parents is not None:
210
# adjust for the wire
211
if parents == (_mod_revision.NULL_REVISION,):
213
# prepare the next query
214
next_revs.update(parents)
215
encoded_id = revision_id
218
encoded_id = "missing:" + revision_id
220
if (revision_id not in client_seen_revs and
221
(not missing_rev or include_missing)):
167
222
# Client does not have this revision, give it to it.
168
223
# add parents to the result
169
result[revision_id] = parents
224
result[encoded_id] = parents
170
225
# Approximate the serialized cost of this revision_id.
171
size_so_far += 2 + len(revision_id) + sum(map(len, parents))
226
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
172
227
# get all the directly asked for parents, and then flesh out to
173
228
# 64K (compressed) or so. We do one level of depth at a time to
174
229
# stay in sync with the client. The 250000 magic number is an
175
230
# estimated compression ratio taken from bzr.dev itself.
176
if first_loop_done and size_so_far > 250000:
231
if self.no_extra_results or (
232
first_loop_done and size_so_far > 250000):
177
233
next_revs = set()
179
235
# don't query things we've already queried
227
283
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
230
class SmartServerRepositoryGraphHeads(SmartServerRepositoryRequest):
286
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
232
def do_repository_request(self, repository, *keys):
233
repository.lock_read()
288
def do_readlocked_repository_request(self, repository, revno,
290
"""Find the revid for a given revno, given a known revno/revid pair.
235
graph = repository.get_graph()
236
heads = tuple(graph.heads(keys))
239
return SuccessfulSmartServerResponse(heads)
295
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
296
except errors.RevisionNotPresent, err:
297
if err.revision_id != known_pair[1]:
298
raise AssertionError(
299
'get_rev_id_for_revno raised RevisionNotPresent for '
300
'non-initial revision: ' + err.revision_id)
301
return FailedSmartServerResponse(
302
('nosuchrevision', err.revision_id))
304
return SuccessfulSmartServerResponse(('ok', result))
306
earliest_revno, earliest_revid = result
307
return SuccessfulSmartServerResponse(
308
('history-incomplete', earliest_revno, earliest_revid))
242
311
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
336
404
return SuccessfulSmartServerResponse(('ok', token))
407
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
409
def do_repository_request(self, repository, to_network_name):
410
"""Get a stream for inserting into a to_format repository.
412
:param repository: The repository to stream from.
413
:param to_network_name: The network name of the format of the target
416
self._to_format = network_format_registry.get(to_network_name)
417
return None # Signal that we want a body.
419
def do_body(self, body_bytes):
420
repository = self._repository
421
repository.lock_read()
423
search_result, error = self.recreate_search(repository, body_bytes,
425
if error is not None:
428
source = repository._get_source(self._to_format)
429
stream = source.get_stream(search_result)
431
exc_info = sys.exc_info()
433
# On non-error, unlocking is done by the body stream handler.
436
raise exc_info[0], exc_info[1], exc_info[2]
437
return SuccessfulSmartServerResponse(('ok',),
438
body_stream=self.body_stream(stream, repository))
440
def body_stream(self, stream, repository):
441
byte_stream = _stream_to_byte_stream(stream, repository._format)
443
for bytes in byte_stream:
445
except errors.RevisionNotPresent, e:
446
# This shouldn't be able to happen, but as we don't buffer
447
# everything it can in theory happen.
449
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
454
def _stream_to_byte_stream(stream, src_format):
455
"""Convert a record stream to a self delimited byte stream."""
456
pack_writer = pack.ContainerSerialiser()
457
yield pack_writer.begin()
458
yield pack_writer.bytes_record(src_format.network_name(), '')
459
for substream_type, substream in stream:
460
for record in substream:
461
if record.storage_kind in ('chunked', 'fulltext'):
462
serialised = record_to_fulltext_bytes(record)
463
elif record.storage_kind == 'absent':
464
raise ValueError("Absent factory for %s" % (record.key,))
466
serialised = record.get_bytes_as(record.storage_kind)
468
# Some streams embed the whole stream into the wire
469
# representation of the first record, which means that
470
# later records have no wire representation: we skip them.
471
yield pack_writer.bytes_record(serialised, [(substream_type,)])
472
yield pack_writer.end()
475
def _byte_stream_to_stream(byte_stream):
476
"""Convert a byte stream into a format and a stream.
478
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
479
:return: (RepositoryFormat, stream_generator)
481
stream_decoder = pack.ContainerPushParser()
483
"""Closure to return the substreams."""
484
# May have fully parsed records already.
485
for record in stream_decoder.read_pending_records():
486
record_names, record_bytes = record
487
record_name, = record_names
488
substream_type = record_name[0]
489
substream = NetworkRecordStream([record_bytes])
490
yield substream_type, substream.read()
491
for bytes in byte_stream:
492
stream_decoder.accept_bytes(bytes)
493
for record in stream_decoder.read_pending_records():
494
record_names, record_bytes = record
495
record_name, = record_names
496
substream_type = record_name[0]
497
substream = NetworkRecordStream([record_bytes])
498
yield substream_type, substream.read()
499
for bytes in byte_stream:
500
stream_decoder.accept_bytes(bytes)
501
for record in stream_decoder.read_pending_records(max=1):
502
record_names, src_format_name = record
503
src_format = network_format_registry.get(src_format_name)
504
return src_format, record_stream()
339
507
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
341
509
def do_repository_request(self, repository, token):
348
516
return SuccessfulSmartServerResponse(('ok',))
519
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
521
def do_repository_request(self, repository, str_bool_new_value):
522
if str_bool_new_value == 'True':
526
repository.set_make_working_trees(new_value)
527
return SuccessfulSmartServerResponse(('ok',))
351
530
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
352
531
"""Get the raw repository files as a tarball.
354
533
The returned tarball contains a .bzr control directory which in turn
355
534
contains a repository.
357
This takes one parameter, compression, which currently must be
536
This takes one parameter, compression, which currently must be
358
537
"", "gz", or "bz2".
360
539
This is used to implement the Repository.copy_content_into operation.
363
542
def do_repository_request(self, repository, compression):
364
from bzrlib import osutils
365
repo_transport = repository.control_files._transport
366
543
tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
368
545
controldir_name = tmp_dirname + '/.bzr'
411
class SmartServerRepositoryStreamKnitDataForRevisions(SmartServerRepositoryRequest):
412
"""Bzr <= 1.1 streaming pull, buffers all data on server."""
414
def do_repository_request(self, repository, *revision_ids):
415
repository.lock_read()
417
return self._do_repository_request(repository, revision_ids)
421
def _do_repository_request(self, repository, revision_ids):
422
stream = repository.get_data_stream_for_search(
423
repository.revision_ids_to_search_result(set(revision_ids)))
425
pack = ContainerSerialiser()
426
buffer.write(pack.begin())
428
for name_tuple, bytes in stream:
429
buffer.write(pack.bytes_record(bytes, [name_tuple]))
430
except errors.RevisionNotPresent, e:
431
return FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
432
buffer.write(pack.end())
433
return SuccessfulSmartServerResponse(('ok',), buffer.getvalue())
436
class SmartServerRepositoryStreamRevisionsChunked(SmartServerRepositoryRequest):
437
"""Bzr 1.1+ streaming pull."""
439
def do_body(self, body_bytes):
440
repository = self._repository
441
repository.lock_read()
443
search, error = self.recreate_search(repository, body_bytes)
444
if error is not None:
447
stream = repository.get_data_stream_for_search(search.get_result())
449
# On non-error, unlocking is done by the body stream handler.
452
return SuccessfulSmartServerResponse(('ok',),
453
body_stream=self.body_stream(stream, repository))
455
def body_stream(self, stream, repository):
456
pack = ContainerSerialiser()
460
for name_tuple, bytes in stream:
461
yield pack.bytes_record(bytes, [name_tuple])
463
# Undo the lock_read that happens once the iterator from
464
# get_data_stream is started.
467
except errors.RevisionNotPresent, e:
468
# This shouldn't be able to happen, but as we don't buffer
469
# everything it can in theory happen.
471
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
588
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
589
"""Insert a record stream from a RemoteSink into a repository.
591
This gets bytes pushed to it by the network infrastructure and turns that
592
into a bytes iterator using a thread. That is then processed by
593
_byte_stream_to_stream.
598
def do_repository_request(self, repository, resume_tokens, lock_token):
599
"""StreamSink.insert_stream for a remote repository."""
600
repository.lock_write(token=lock_token)
601
self.do_insert_stream_request(repository, resume_tokens)
603
def do_insert_stream_request(self, repository, resume_tokens):
604
tokens = [token for token in resume_tokens.split(' ') if token]
606
self.repository = repository
607
self.queue = Queue.Queue()
608
self.insert_thread = threading.Thread(target=self._inserter_thread)
609
self.insert_thread.start()
611
def do_chunk(self, body_stream_chunk):
612
self.queue.put(body_stream_chunk)
614
def _inserter_thread(self):
616
src_format, stream = _byte_stream_to_stream(
617
self.blocking_byte_stream())
618
self.insert_result = self.repository._get_sink().insert_stream(
619
stream, src_format, self.tokens)
620
self.insert_ok = True
622
self.insert_exception = sys.exc_info()
623
self.insert_ok = False
625
def blocking_byte_stream(self):
627
bytes = self.queue.get()
628
if bytes is StopIteration:
634
self.queue.put(StopIteration)
635
if self.insert_thread is not None:
636
self.insert_thread.join()
637
if not self.insert_ok:
638
exc_info = self.insert_exception
639
raise exc_info[0], exc_info[1], exc_info[2]
640
write_group_tokens, missing_keys = self.insert_result
641
if write_group_tokens or missing_keys:
642
# bzip needed? missing keys should typically be a small set.
643
# Should this be a streaming body response ?
644
missing_keys = sorted(missing_keys)
645
bytes = bencode.bencode((write_group_tokens, missing_keys))
646
self.repository.unlock()
647
return SuccessfulSmartServerResponse(('missing-basis', bytes))
649
self.repository.unlock()
650
return SuccessfulSmartServerResponse(('ok', ))
653
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
654
"""Insert a record stream from a RemoteSink into an unlocked repository.
656
This is the same as SmartServerRepositoryInsertStreamLocked, except it
657
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
658
like pack format) repository.
663
def do_repository_request(self, repository, resume_tokens):
664
"""StreamSink.insert_stream for a remote repository."""
665
repository.lock_write()
666
self.do_insert_stream_request(repository, resume_tokens)