1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Server-side repository related request implmentations."""
30
estimate_compressed_size,
37
from bzrlib.bzrdir import BzrDir
38
from bzrlib.smart.request import (
39
FailedSmartServerResponse,
41
SuccessfulSmartServerResponse,
43
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
44
from bzrlib import revision as _mod_revision
45
from bzrlib.versionedfile import (
47
record_to_fulltext_bytes,
51
class SmartServerRepositoryRequest(SmartServerRequest):
52
"""Common base class for Repository requests."""
54
def do(self, path, *args):
55
"""Execute a repository request.
57
All Repository requests take a path to the repository as their first
58
argument. The repository must be at the exact path given by the
59
client - no searching is done.
61
The actual logic is delegated to self.do_repository_request.
63
:param client_path: The path for the repository as received from the
65
:return: A SmartServerResponse from self.do_repository_request().
67
transport = self.transport_from_client_path(path)
68
bzrdir = BzrDir.open_from_transport(transport)
69
# Save the repository for use with do_body.
70
self._repository = bzrdir.open_repository()
71
return self.do_repository_request(self._repository, *args)
73
def do_repository_request(self, repository, *args):
74
"""Override to provide an implementation for a verb."""
75
# No-op for verbs that take bodies (None as a result indicates a body
79
def recreate_search(self, repository, search_bytes, discard_excess=False):
80
"""Recreate a search from its serialised form.
82
:param discard_excess: If True, and the search refers to data we don't
83
have, just silently accept that fact - the verb calling
84
recreate_search trusts that clients will look for missing things
85
they expected and get it from elsewhere.
87
if search_bytes == 'everything':
88
return vf_search.EverythingResult(repository), None
89
lines = search_bytes.split('\n')
90
if lines[0] == 'ancestry-of':
92
search_result = vf_search.PendingAncestryResult(heads, repository)
93
return search_result, None
94
elif lines[0] == 'search':
95
return self.recreate_search_from_recipe(repository, lines[1:],
96
discard_excess=discard_excess)
98
return (None, FailedSmartServerResponse(('BadSearch',)))
100
def recreate_search_from_recipe(self, repository, lines,
101
discard_excess=False):
102
"""Recreate a specific revision search (vs a from-tip search).
104
:param discard_excess: If True, and the search refers to data we don't
105
have, just silently accept that fact - the verb calling
106
recreate_search trusts that clients will look for missing things
107
they expected and get it from elsewhere.
109
start_keys = set(lines[0].split(' '))
110
exclude_keys = set(lines[1].split(' '))
111
revision_count = int(lines[2])
112
repository.lock_read()
114
search = repository.get_graph()._make_breadth_first_searcher(
118
next_revs = search.next()
119
except StopIteration:
121
search.stop_searching_any(exclude_keys.intersection(next_revs))
122
(started_keys, excludes, included_keys) = search.get_state()
123
if (not discard_excess and len(included_keys) != revision_count):
124
# we got back a different amount of data than expected, this
125
# gets reported as NoSuchRevision, because less revisions
126
# indicates missing revisions, and more should never happen as
127
# the excludes list considers ghosts and ensures that ghost
128
# filling races are not a problem.
129
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
130
search_result = vf_search.SearchResult(started_keys, excludes,
131
len(included_keys), included_keys)
132
return (search_result, None)
137
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
138
"""Calls self.do_readlocked_repository_request."""
140
def do_repository_request(self, repository, *args):
141
"""Read lock a repository for do_readlocked_repository_request."""
142
repository.lock_read()
144
return self.do_readlocked_repository_request(repository, *args)
149
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
150
"""Break a repository lock."""
152
def do_repository_request(self, repository):
153
repository.break_lock()
154
return SuccessfulSmartServerResponse(('ok', ))
159
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
160
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
162
no_extra_results = False
164
def do_repository_request(self, repository, *revision_ids):
165
"""Get parent details for some revisions.
167
All the parents for revision_ids are returned. Additionally up to 64KB
168
of additional parent data found by performing a breadth first search
169
from revision_ids is returned. The verb takes a body containing the
170
current search state, see do_body for details.
172
If 'include-missing:' is in revision_ids, ghosts encountered in the
173
graph traversal for getting parent data are included in the result with
174
a prefix of 'missing:'.
176
:param repository: The repository to query in.
177
:param revision_ids: The utf8 encoded revision_id to answer for.
179
self._revision_ids = revision_ids
180
return None # Signal that we want a body.
182
def do_body(self, body_bytes):
183
"""Process the current search state and perform the parent lookup.
185
:return: A smart server response where the body contains an utf8
186
encoded flattened list of the parents of the revisions (the same
187
format as Repository.get_revision_graph) which has been bz2
190
repository = self._repository
191
repository.lock_read()
193
return self._do_repository_request(body_bytes)
197
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
198
include_missing, max_size=65536):
201
estimator = estimate_compressed_size.ZLibEstimator(max_size)
202
next_revs = revision_ids
203
first_loop_done = False
205
queried_revs.update(next_revs)
206
parent_map = repo_graph.get_parent_map(next_revs)
207
current_revs = next_revs
209
for revision_id in current_revs:
211
parents = parent_map.get(revision_id)
212
if parents is not None:
213
# adjust for the wire
214
if parents == (_mod_revision.NULL_REVISION,):
216
# prepare the next query
217
next_revs.update(parents)
218
encoded_id = revision_id
221
encoded_id = "missing:" + revision_id
223
if (revision_id not in client_seen_revs and
224
(not missing_rev or include_missing)):
225
# Client does not have this revision, give it to it.
226
# add parents to the result
227
result[encoded_id] = parents
228
# Approximate the serialized cost of this revision_id.
229
line = '%s %s\n' % (encoded_id, ' '.join(parents))
230
estimator.add_content(line)
231
# get all the directly asked for parents, and then flesh out to
232
# 64K (compressed) or so. We do one level of depth at a time to
233
# stay in sync with the client. The 250000 magic number is
234
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (first_loop_done and estimator.full()):
236
trace.mutter('size: %d, z_size: %d'
237
% (estimator._uncompressed_size_added,
238
estimator._compressed_size_added))
241
# don't query things we've already queried
242
next_revs = next_revs.difference(queried_revs)
243
first_loop_done = True
246
def _do_repository_request(self, body_bytes):
247
repository = self._repository
248
revision_ids = set(self._revision_ids)
249
include_missing = 'include-missing:' in revision_ids
251
revision_ids.remove('include-missing:')
252
body_lines = body_bytes.split('\n')
253
search_result, error = self.recreate_search_from_recipe(
254
repository, body_lines)
255
if error is not None:
257
# TODO might be nice to start up the search again; but thats not
258
# written or tested yet.
259
client_seen_revs = set(search_result.get_keys())
260
# Always include the requested ids.
261
client_seen_revs.difference_update(revision_ids)
263
repo_graph = repository.get_graph()
264
result = self._expand_requested_revs(repo_graph, revision_ids,
265
client_seen_revs, include_missing)
267
# sorting trivially puts lexographically similar revision ids together.
270
for revision, parents in sorted(result.items()):
271
lines.append(' '.join((revision, ) + tuple(parents)))
273
return SuccessfulSmartServerResponse(
274
('ok', ), bz2.compress('\n'.join(lines)))
277
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
279
def do_readlocked_repository_request(self, repository, revision_id):
280
"""Return the result of repository.get_revision_graph(revision_id).
282
Deprecated as of bzr 1.4, but supported for older clients.
284
:param repository: The repository to query in.
285
:param revision_id: The utf8 encoded revision_id to get a graph from.
286
:return: A smart server response where the body contains an utf8
287
encoded flattened list of the revision graph.
293
graph = repository.get_graph()
295
search_ids = [revision_id]
297
search_ids = repository.all_revision_ids()
298
search = graph._make_breadth_first_searcher(search_ids)
299
transitive_ids = set()
300
map(transitive_ids.update, list(search))
301
parent_map = graph.get_parent_map(transitive_ids)
302
revision_graph = _strip_NULL_ghosts(parent_map)
303
if revision_id and revision_id not in revision_graph:
304
# Note that we return an empty body, rather than omitting the body.
305
# This way the client knows that it can always expect to find a body
306
# in the response for this method, even in the error case.
307
return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
309
for revision, parents in revision_graph.items():
310
lines.append(' '.join((revision, ) + tuple(parents)))
312
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
315
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
317
def do_readlocked_repository_request(self, repository, revno,
319
"""Find the revid for a given revno, given a known revno/revid pair.
324
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
325
except errors.RevisionNotPresent, err:
326
if err.revision_id != known_pair[1]:
327
raise AssertionError(
328
'get_rev_id_for_revno raised RevisionNotPresent for '
329
'non-initial revision: ' + err.revision_id)
330
return FailedSmartServerResponse(
331
('nosuchrevision', err.revision_id))
333
return SuccessfulSmartServerResponse(('ok', result))
335
earliest_revno, earliest_revid = result
336
return SuccessfulSmartServerResponse(
337
('history-incomplete', earliest_revno, earliest_revid))
340
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
342
def do_repository_request(self, repository):
343
"""Return the serializer format for this repository.
347
:param repository: The repository to query
348
:return: A smart server response ('ok', FORMAT)
350
serializer = repository.get_serializer_format()
351
return SuccessfulSmartServerResponse(('ok', serializer))
354
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
356
def do_repository_request(self, repository, revision_id):
357
"""Return ok if a specific revision is in the repository at path.
359
:param repository: The repository to query in.
360
:param revision_id: The utf8 encoded revision_id to lookup.
361
:return: A smart server response of ('yes', ) if the revision is
362
present. ('no', ) if it is missing.
364
if repository.has_revision(revision_id):
365
return SuccessfulSmartServerResponse(('yes', ))
367
return SuccessfulSmartServerResponse(('no', ))
370
class SmartServerRequestHasSignatureForRevisionId(
371
SmartServerRepositoryRequest):
373
def do_repository_request(self, repository, revision_id):
374
"""Return ok if a signature is present for a revision.
376
Introduced in bzr 2.5.0.
378
:param repository: The repository to query in.
379
:param revision_id: The utf8 encoded revision_id to lookup.
380
:return: A smart server response of ('yes', ) if a
381
signature for the revision is present,
382
('no', ) if it is missing.
385
if repository.has_signature_for_revision_id(revision_id):
386
return SuccessfulSmartServerResponse(('yes', ))
388
return SuccessfulSmartServerResponse(('no', ))
389
except errors.NoSuchRevision:
390
return FailedSmartServerResponse(
391
('nosuchrevision', revision_id))
394
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
396
def do_repository_request(self, repository, revid, committers):
397
"""Return the result of repository.gather_stats().
399
:param repository: The repository to query in.
400
:param revid: utf8 encoded rev id or an empty string to indicate None
401
:param committers: 'yes' or 'no'.
403
:return: A SmartServerResponse ('ok',), a encoded body looking like
406
latestrev: 345.700 3600
409
But containing only fields returned by the gather_stats() call
412
decoded_revision_id = None
414
decoded_revision_id = revid
415
if committers == 'yes':
416
decoded_committers = True
418
decoded_committers = None
420
stats = repository.gather_stats(decoded_revision_id,
422
except errors.NoSuchRevision:
423
return FailedSmartServerResponse(('nosuchrevision', revid))
426
if stats.has_key('committers'):
427
body += 'committers: %d\n' % stats['committers']
428
if stats.has_key('firstrev'):
429
body += 'firstrev: %.3f %d\n' % stats['firstrev']
430
if stats.has_key('latestrev'):
431
body += 'latestrev: %.3f %d\n' % stats['latestrev']
432
if stats.has_key('revisions'):
433
body += 'revisions: %d\n' % stats['revisions']
434
if stats.has_key('size'):
435
body += 'size: %d\n' % stats['size']
437
return SuccessfulSmartServerResponse(('ok', ), body)
440
class SmartServerRepositoryGetRevisionSignatureText(
441
SmartServerRepositoryRequest):
442
"""Return the signature text of a revision.
447
def do_repository_request(self, repository, revision_id):
448
"""Return the result of repository.get_signature_text().
450
:param repository: The repository to query in.
451
:return: A smart server response of with the signature text as
455
text = repository.get_signature_text(revision_id)
456
except errors.NoSuchRevision, err:
457
return FailedSmartServerResponse(
458
('nosuchrevision', err.revision))
459
return SuccessfulSmartServerResponse(('ok', ), text)
462
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
464
def do_repository_request(self, repository):
465
"""Return the result of repository.is_shared().
467
:param repository: The repository to query in.
468
:return: A smart server response of ('yes', ) if the repository is
469
shared, and ('no', ) if it is not.
471
if repository.is_shared():
472
return SuccessfulSmartServerResponse(('yes', ))
474
return SuccessfulSmartServerResponse(('no', ))
477
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
479
def do_repository_request(self, repository):
480
"""Return the result of repository.make_working_trees().
482
Introduced in bzr 2.5.0.
484
:param repository: The repository to query in.
485
:return: A smart server response of ('yes', ) if the repository uses
486
working trees, and ('no', ) if it is not.
488
if repository.make_working_trees():
489
return SuccessfulSmartServerResponse(('yes', ))
491
return SuccessfulSmartServerResponse(('no', ))
494
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
496
def do_repository_request(self, repository, token=''):
497
# XXX: this probably should not have a token.
501
token = repository.lock_write(token=token).repository_token
502
except errors.LockContention, e:
503
return FailedSmartServerResponse(('LockContention',))
504
except errors.UnlockableTransport:
505
return FailedSmartServerResponse(('UnlockableTransport',))
506
except errors.LockFailed, e:
507
return FailedSmartServerResponse(('LockFailed',
508
str(e.lock), str(e.why)))
509
if token is not None:
510
repository.leave_lock_in_place()
514
return SuccessfulSmartServerResponse(('ok', token))
517
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
519
def do_repository_request(self, repository, to_network_name):
520
"""Get a stream for inserting into a to_format repository.
522
The request body is 'search_bytes', a description of the revisions
525
In 2.3 this verb added support for search_bytes == 'everything'. Older
526
implementations will respond with a BadSearch error, and clients should
527
catch this and fallback appropriately.
529
:param repository: The repository to stream from.
530
:param to_network_name: The network name of the format of the target
533
self._to_format = network_format_registry.get(to_network_name)
534
if self._should_fake_unknown():
535
return FailedSmartServerResponse(
536
('UnknownMethod', 'Repository.get_stream'))
537
return None # Signal that we want a body.
539
def _should_fake_unknown(self):
540
"""Return True if we should return UnknownMethod to the client.
542
This is a workaround for bugs in pre-1.19 clients that claim to
543
support receiving streams of CHK repositories. The pre-1.19 client
544
expects inventory records to be serialized in the format defined by
545
to_network_name, but in pre-1.19 (at least) that format definition
546
tries to use the xml5 serializer, which does not correctly handle
547
rich-roots. After 1.19 the client can also accept inventory-deltas
548
(which avoids this issue), and those clients will use the
549
Repository.get_stream_1.19 verb instead of this one.
550
So: if this repository is CHK, and the to_format doesn't match,
551
we should just fake an UnknownSmartMethod error so that the client
552
will fallback to VFS, rather than sending it a stream we know it
555
from_format = self._repository._format
556
to_format = self._to_format
557
if not from_format.supports_chks:
558
# Source not CHK: that's ok
560
if (to_format.supports_chks and
561
from_format.repository_class is to_format.repository_class and
562
from_format._serializer == to_format._serializer):
563
# Source is CHK, but target matches: that's ok
564
# (e.g. 2a->2a, or CHK2->2a)
566
# Source is CHK, and target is not CHK or incompatible CHK. We can't
567
# generate a compatible stream.
570
def do_body(self, body_bytes):
571
repository = self._repository
572
repository.lock_read()
574
search_result, error = self.recreate_search(repository, body_bytes,
576
if error is not None:
579
source = repository._get_source(self._to_format)
580
stream = source.get_stream(search_result)
582
exc_info = sys.exc_info()
584
# On non-error, unlocking is done by the body stream handler.
587
raise exc_info[0], exc_info[1], exc_info[2]
588
return SuccessfulSmartServerResponse(('ok',),
589
body_stream=self.body_stream(stream, repository))
591
def body_stream(self, stream, repository):
592
byte_stream = _stream_to_byte_stream(stream, repository._format)
594
for bytes in byte_stream:
596
except errors.RevisionNotPresent, e:
597
# This shouldn't be able to happen, but as we don't buffer
598
# everything it can in theory happen.
600
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
605
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
606
"""The same as Repository.get_stream, but will return stream CHK formats to
609
See SmartServerRepositoryGetStream._should_fake_unknown.
614
def _should_fake_unknown(self):
615
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
619
def _stream_to_byte_stream(stream, src_format):
620
"""Convert a record stream to a self delimited byte stream."""
621
pack_writer = pack.ContainerSerialiser()
622
yield pack_writer.begin()
623
yield pack_writer.bytes_record(src_format.network_name(), '')
624
for substream_type, substream in stream:
625
for record in substream:
626
if record.storage_kind in ('chunked', 'fulltext'):
627
serialised = record_to_fulltext_bytes(record)
628
elif record.storage_kind == 'absent':
629
raise ValueError("Absent factory for %s" % (record.key,))
631
serialised = record.get_bytes_as(record.storage_kind)
633
# Some streams embed the whole stream into the wire
634
# representation of the first record, which means that
635
# later records have no wire representation: we skip them.
636
yield pack_writer.bytes_record(serialised, [(substream_type,)])
637
yield pack_writer.end()
640
class _ByteStreamDecoder(object):
641
"""Helper for _byte_stream_to_stream.
643
The expected usage of this class is via the function _byte_stream_to_stream
644
which creates a _ByteStreamDecoder, pops off the stream format and then
645
yields the output of record_stream(), the main entry point to
648
Broadly this class has to unwrap two layers of iterators:
652
This is complicated by wishing to return type, iterator_for_type, but
653
getting the data for iterator_for_type when we find out type: we can't
654
simply pass a generator down to the NetworkRecordStream parser, instead
655
we have a little local state to seed each NetworkRecordStream instance,
656
and gather the type that we'll be yielding.
658
:ivar byte_stream: The byte stream being decoded.
659
:ivar stream_decoder: A pack parser used to decode the bytestream
660
:ivar current_type: The current type, used to join adjacent records of the
661
same type into a single stream.
662
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
665
def __init__(self, byte_stream, record_counter):
666
"""Create a _ByteStreamDecoder."""
667
self.stream_decoder = pack.ContainerPushParser()
668
self.current_type = None
669
self.first_bytes = None
670
self.byte_stream = byte_stream
671
self._record_counter = record_counter
674
def iter_stream_decoder(self):
675
"""Iterate the contents of the pack from stream_decoder."""
676
# dequeue pending items
677
for record in self.stream_decoder.read_pending_records():
679
# Pull bytes of the wire, decode them to records, yield those records.
680
for bytes in self.byte_stream:
681
self.stream_decoder.accept_bytes(bytes)
682
for record in self.stream_decoder.read_pending_records():
685
def iter_substream_bytes(self):
686
if self.first_bytes is not None:
687
yield self.first_bytes
688
# If we run out of pack records, single the outer layer to stop.
689
self.first_bytes = None
690
for record in self.iter_pack_records:
691
record_names, record_bytes = record
692
record_name, = record_names
693
substream_type = record_name[0]
694
if substream_type != self.current_type:
695
# end of a substream, seed the next substream.
696
self.current_type = substream_type
697
self.first_bytes = record_bytes
701
def record_stream(self):
702
"""Yield substream_type, substream from the byte stream."""
703
def wrap_and_count(pb, rc, substream):
704
"""Yield records from stream while showing progress."""
707
if self.current_type != 'revisions' and self.key_count != 0:
708
# As we know the number of revisions now (in self.key_count)
709
# we can setup and use record_counter (rc).
710
if not rc.is_initialized():
711
rc.setup(self.key_count, self.key_count)
712
for record in substream.read():
714
if rc.is_initialized() and counter == rc.STEP:
715
rc.increment(counter)
716
pb.update('Estimate', rc.current, rc.max)
718
if self.current_type == 'revisions':
719
# Total records is proportional to number of revs
720
# to fetch. With remote, we used self.key_count to
721
# track the number of revs. Once we have the revs
722
# counts in self.key_count, the progress bar changes
723
# from 'Estimating..' to 'Estimate' above.
725
if counter == rc.STEP:
726
pb.update('Estimating..', self.key_count)
732
pb = ui.ui_factory.nested_progress_bar()
733
rc = self._record_counter
734
# Make and consume sub generators, one per substream type:
735
while self.first_bytes is not None:
736
substream = NetworkRecordStream(self.iter_substream_bytes())
737
# after substream is fully consumed, self.current_type is set to
738
# the next type, and self.first_bytes is set to the matching bytes.
739
yield self.current_type, wrap_and_count(pb, rc, substream)
741
pb.update('Done', rc.max, rc.max)
744
def seed_state(self):
745
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
746
# Set a single generator we can use to get data from the pack stream.
747
self.iter_pack_records = self.iter_stream_decoder()
748
# Seed the very first subiterator with content; after this each one
750
list(self.iter_substream_bytes())
753
def _byte_stream_to_stream(byte_stream, record_counter=None):
754
"""Convert a byte stream into a format and a stream.
756
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
757
:return: (RepositoryFormat, stream_generator)
759
decoder = _ByteStreamDecoder(byte_stream, record_counter)
760
for bytes in byte_stream:
761
decoder.stream_decoder.accept_bytes(bytes)
762
for record in decoder.stream_decoder.read_pending_records(max=1):
763
record_names, src_format_name = record
764
src_format = network_format_registry.get(src_format_name)
765
return src_format, decoder.record_stream()
768
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
770
def do_repository_request(self, repository, token):
772
repository.lock_write(token=token)
773
except errors.TokenMismatch, e:
774
return FailedSmartServerResponse(('TokenMismatch',))
775
repository.dont_leave_lock_in_place()
777
return SuccessfulSmartServerResponse(('ok',))
780
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
781
"""Get the physical lock status for a repository.
786
def do_repository_request(self, repository):
787
if repository.get_physical_lock_status():
788
return SuccessfulSmartServerResponse(('yes', ))
790
return SuccessfulSmartServerResponse(('no', ))
793
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
795
def do_repository_request(self, repository, str_bool_new_value):
796
if str_bool_new_value == 'True':
800
repository.set_make_working_trees(new_value)
801
return SuccessfulSmartServerResponse(('ok',))
804
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
805
"""Get the raw repository files as a tarball.
807
The returned tarball contains a .bzr control directory which in turn
808
contains a repository.
810
This takes one parameter, compression, which currently must be
813
This is used to implement the Repository.copy_content_into operation.
816
def do_repository_request(self, repository, compression):
817
tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
819
controldir_name = tmp_dirname + '/.bzr'
820
return self._tarfile_response(controldir_name, compression)
822
osutils.rmtree(tmp_dirname)
824
def _copy_to_tempdir(self, from_repo):
825
tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
826
tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
827
tmp_repo = from_repo._format.initialize(tmp_bzrdir)
828
from_repo.copy_content_into(tmp_repo)
829
return tmp_dirname, tmp_repo
831
def _tarfile_response(self, tmp_dirname, compression):
832
temp = tempfile.NamedTemporaryFile()
834
self._tarball_of_dir(tmp_dirname, compression, temp.file)
835
# all finished; write the tempfile out to the network
837
return SuccessfulSmartServerResponse(('ok',), temp.read())
838
# FIXME: Don't read the whole thing into memory here; rather stream
839
# it out from the file onto the network. mbp 20070411
843
def _tarball_of_dir(self, dirname, compression, ofile):
845
filename = os.path.basename(ofile.name)
846
tarball = tarfile.open(fileobj=ofile, name=filename,
847
mode='w|' + compression)
849
# The tarball module only accepts ascii names, and (i guess)
850
# packs them with their 8bit names. We know all the files
851
# within the repository have ASCII names so the should be safe
853
dirname = dirname.encode(sys.getfilesystemencoding())
854
# python's tarball module includes the whole path by default so
856
if not dirname.endswith('.bzr'):
857
raise ValueError(dirname)
858
tarball.add(dirname, '.bzr') # recursive by default
863
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
864
"""Insert a record stream from a RemoteSink into a repository.
866
This gets bytes pushed to it by the network infrastructure and turns that
867
into a bytes iterator using a thread. That is then processed by
868
_byte_stream_to_stream.
873
def do_repository_request(self, repository, resume_tokens, lock_token):
874
"""StreamSink.insert_stream for a remote repository."""
875
repository.lock_write(token=lock_token)
876
self.do_insert_stream_request(repository, resume_tokens)
878
def do_insert_stream_request(self, repository, resume_tokens):
879
tokens = [token for token in resume_tokens.split(' ') if token]
881
self.repository = repository
882
self.queue = Queue.Queue()
883
self.insert_thread = threading.Thread(target=self._inserter_thread)
884
self.insert_thread.start()
886
def do_chunk(self, body_stream_chunk):
887
self.queue.put(body_stream_chunk)
889
def _inserter_thread(self):
891
src_format, stream = _byte_stream_to_stream(
892
self.blocking_byte_stream())
893
self.insert_result = self.repository._get_sink().insert_stream(
894
stream, src_format, self.tokens)
895
self.insert_ok = True
897
self.insert_exception = sys.exc_info()
898
self.insert_ok = False
900
def blocking_byte_stream(self):
902
bytes = self.queue.get()
903
if bytes is StopIteration:
909
self.queue.put(StopIteration)
910
if self.insert_thread is not None:
911
self.insert_thread.join()
912
if not self.insert_ok:
913
exc_info = self.insert_exception
914
raise exc_info[0], exc_info[1], exc_info[2]
915
write_group_tokens, missing_keys = self.insert_result
916
if write_group_tokens or missing_keys:
917
# bzip needed? missing keys should typically be a small set.
918
# Should this be a streaming body response ?
919
missing_keys = sorted(missing_keys)
920
bytes = bencode.bencode((write_group_tokens, missing_keys))
921
self.repository.unlock()
922
return SuccessfulSmartServerResponse(('missing-basis', bytes))
924
self.repository.unlock()
925
return SuccessfulSmartServerResponse(('ok', ))
928
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
929
"""Insert a record stream from a RemoteSink into a repository.
931
Same as SmartServerRepositoryInsertStreamLocked, except:
932
- the lock token argument is optional
933
- servers that implement this verb accept 'inventory-delta' records in the
939
def do_repository_request(self, repository, resume_tokens, lock_token=None):
940
"""StreamSink.insert_stream for a remote repository."""
941
SmartServerRepositoryInsertStreamLocked.do_repository_request(
942
self, repository, resume_tokens, lock_token)
945
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
946
"""Insert a record stream from a RemoteSink into an unlocked repository.
948
This is the same as SmartServerRepositoryInsertStreamLocked, except it
949
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
950
like pack format) repository.
955
def do_repository_request(self, repository, resume_tokens):
956
"""StreamSink.insert_stream for a remote repository."""
957
repository.lock_write()
958
self.do_insert_stream_request(repository, resume_tokens)
961
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
962
"""Add a revision signature text.
967
def do_repository_request(self, repository, lock_token, revision_id,
968
*write_group_tokens):
969
"""Add a revision signature text.
971
:param repository: Repository to operate on
972
:param lock_token: Lock token
973
:param revision_id: Revision for which to add signature
974
:param write_group_tokens: Write group tokens
976
self._lock_token = lock_token
977
self._revision_id = revision_id
978
self._write_group_tokens = write_group_tokens
981
def do_body(self, body_bytes):
982
"""Add a signature text.
984
:param body_bytes: GPG signature text
985
:return: SuccessfulSmartServerResponse with arguments 'ok' and
986
the list of new write group tokens.
988
self._repository.lock_write(token=self._lock_token)
990
self._repository.resume_write_group(self._write_group_tokens)
992
self._repository.add_signature_text(self._revision_id,
995
new_write_group_tokens = self._repository.suspend_write_group()
997
self._repository.unlock()
998
return SuccessfulSmartServerResponse(
999
('ok', ) + tuple(new_write_group_tokens))
1002
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1003
"""Start a write group.
1008
def do_repository_request(self, repository, lock_token):
1009
"""Start a write group."""
1010
repository.lock_write(token=lock_token)
1012
repository.start_write_group()
1014
tokens = repository.suspend_write_group()
1015
except errors.UnsuspendableWriteGroup:
1016
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1019
return SuccessfulSmartServerResponse(('ok', tokens))
1022
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1023
"""Commit a write group.
1028
def do_repository_request(self, repository, lock_token,
1029
write_group_tokens):
1030
"""Commit a write group."""
1031
repository.lock_write(token=lock_token)
1034
repository.resume_write_group(write_group_tokens)
1035
except errors.UnresumableWriteGroup, e:
1036
return FailedSmartServerResponse(
1037
('UnresumableWriteGroup', e.write_groups, e.reason))
1039
repository.commit_write_group()
1041
write_group_tokens = repository.suspend_write_group()
1042
# FIXME JRV 2011-11-19: What if the write_group_tokens
1047
return SuccessfulSmartServerResponse(('ok', ))
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1051
"""Abort a write group.
1056
def do_repository_request(self, repository, lock_token, write_group_tokens):
1057
"""Abort a write group."""
1058
repository.lock_write(token=lock_token)
1061
repository.resume_write_group(write_group_tokens)
1062
except errors.UnresumableWriteGroup, e:
1063
return FailedSmartServerResponse(
1064
('UnresumableWriteGroup', e.write_groups, e.reason))
1065
repository.abort_write_group()
1068
return SuccessfulSmartServerResponse(('ok', ))
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1072
"""Check that a write group is still valid.
1077
def do_repository_request(self, repository, lock_token, write_group_tokens):
1078
"""Abort a write group."""
1079
repository.lock_write(token=lock_token)
1082
repository.resume_write_group(write_group_tokens)
1083
except errors.UnresumableWriteGroup, e:
1084
return FailedSmartServerResponse(
1085
('UnresumableWriteGroup', e.write_groups, e.reason))
1087
repository.suspend_write_group()
1090
return SuccessfulSmartServerResponse(('ok', ))
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1094
"""Retrieve all of the revision ids in a repository.
1099
def do_repository_request(self, repository):
1100
revids = repository.all_revision_ids()
1101
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1104
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1105
"""Pack a repository.
1110
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1111
self._repository = repository
1112
self._lock_token = lock_token
1113
if clean_obsolete_packs == 'True':
1114
self._clean_obsolete_packs = True
1116
self._clean_obsolete_packs = False
1119
def do_body(self, body_bytes):
1120
if body_bytes == "":
1123
hint = body_bytes.splitlines()
1124
self._repository.lock_write(token=self._lock_token)
1126
self._repository.pack(hint, self._clean_obsolete_packs)
1128
self._repository.unlock()
1129
return SuccessfulSmartServerResponse(("ok", ), )
1132
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1133
"""Iterate over the contents of files.
1135
The client sends a list of desired files to stream, one
1136
per line, and as tuples of file id and revision, separated by
1139
The server replies with a stream. Each entry is preceded by a header,
1140
which can either be:
1142
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1143
list sent by the client. This header is followed by the contents of
1144
the file, bzip2-compressed.
1145
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1146
The client can then raise an appropriate RevisionNotPresent error
1147
or check its fallback repositories.
1152
def body_stream(self, repository, desired_files):
1153
self._repository.lock_read()
1156
for i, key in enumerate(desired_files):
1158
for record in repository.texts.get_record_stream(text_keys,
1160
identifier = text_keys[record.key]
1161
if record.storage_kind == 'absent':
1162
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1163
record.key[1], identifier)
1164
# FIXME: Way to abort early?
1166
yield "ok\0%d\n" % identifier
1167
compressor = zlib.compressobj()
1168
for bytes in record.get_bytes_as('chunked'):
1169
data = compressor.compress(bytes)
1172
data = compressor.flush()
1176
self._repository.unlock()
1178
def do_body(self, body_bytes):
1180
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1181
return SuccessfulSmartServerResponse(('ok', ),
1182
body_stream=self.body_stream(self._repository, desired_files))
1184
def do_repository_request(self, repository):
1185
# Signal that we want a body
1189
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1190
"""Stream a list of revisions.
1192
The client sends a list of newline-separated revision ids in the
1193
body of the request and the server replies with the serializer format,
1194
and a stream of bzip2-compressed revision texts (using the specified
1197
Any revisions the server does not have are omitted from the stream.
1202
def do_repository_request(self, repository):
1203
self._repository = repository
1204
# Signal there is a body
1207
def do_body(self, body_bytes):
1208
revision_ids = body_bytes.split("\n")
1209
return SuccessfulSmartServerResponse(
1210
('ok', self._repository.get_serializer_format()),
1211
body_stream=self.body_stream(self._repository, revision_ids))
1213
def body_stream(self, repository, revision_ids):
1214
self._repository.lock_read()
1216
for record in repository.revisions.get_record_stream(
1217
[(revid,) for revid in revision_ids], 'unordered', True):
1218
if record.storage_kind == 'absent':
1220
yield zlib.compress(record.get_bytes_as('fulltext'))
1222
self._repository.unlock()