1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Server-side repository related request implementations."""
30
estimate_compressed_size,
37
from bzrlib.bzrdir import BzrDir
38
from bzrlib.smart.request import (
39
FailedSmartServerResponse,
41
SuccessfulSmartServerResponse,
43
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
44
from bzrlib import revision as _mod_revision
45
from bzrlib.versionedfile import (
47
record_to_fulltext_bytes,
51
class SmartServerRepositoryRequest(SmartServerRequest):
52
"""Common base class for Repository requests."""
54
def do(self, path, *args):
55
"""Execute a repository request.
57
All Repository requests take a path to the repository as their first
58
argument. The repository must be at the exact path given by the
59
client - no searching is done.
61
The actual logic is delegated to self.do_repository_request.
63
:param client_path: The path for the repository as received from the
65
:return: A SmartServerResponse from self.do_repository_request().
67
transport = self.transport_from_client_path(path)
68
bzrdir = BzrDir.open_from_transport(transport)
69
# Save the repository for use with do_body.
70
self._repository = bzrdir.open_repository()
71
return self.do_repository_request(self._repository, *args)
73
def do_repository_request(self, repository, *args):
74
"""Override to provide an implementation for a verb."""
75
# No-op for verbs that take bodies (None as a result indicates a body
79
def recreate_search(self, repository, search_bytes, discard_excess=False):
80
"""Recreate a search from its serialised form.
82
:param discard_excess: If True, and the search refers to data we don't
83
have, just silently accept that fact - the verb calling
84
recreate_search trusts that clients will look for missing things
85
they expected and get it from elsewhere.
87
if search_bytes == 'everything':
88
return graph.EverythingResult(repository), None
89
lines = search_bytes.split('\n')
90
if lines[0] == 'ancestry-of':
92
search_result = graph.PendingAncestryResult(heads, repository)
93
return search_result, None
94
elif lines[0] == 'search':
95
return self.recreate_search_from_recipe(repository, lines[1:],
96
discard_excess=discard_excess)
98
return (None, FailedSmartServerResponse(('BadSearch',)))
100
def recreate_search_from_recipe(self, repository, lines,
101
discard_excess=False):
102
"""Recreate a specific revision search (vs a from-tip search).
104
:param discard_excess: If True, and the search refers to data we don't
105
have, just silently accept that fact - the verb calling
106
recreate_search trusts that clients will look for missing things
107
they expected and get it from elsewhere.
109
start_keys = set(lines[0].split(' '))
110
exclude_keys = set(lines[1].split(' '))
111
revision_count = int(lines[2])
112
repository.lock_read()
114
search = repository.get_graph()._make_breadth_first_searcher(
118
next_revs = search.next()
119
except StopIteration:
121
search.stop_searching_any(exclude_keys.intersection(next_revs))
122
search_result = search.get_result()
123
if (not discard_excess and
124
search_result.get_recipe()[3] != revision_count):
125
# we got back a different amount of data than expected, this
126
# gets reported as NoSuchRevision, because fewer revisions
127
# indicates missing revisions, and more should never happen as
128
# the excludes list considers ghosts and ensures that ghost
129
# filling races are not a problem.
130
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
131
return (search_result, None)
136
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
137
"""Calls self.do_readlocked_repository_request."""
139
def do_repository_request(self, repository, *args):
140
"""Read lock a repository for do_readlocked_repository_request."""
141
repository.lock_read()
143
return self.do_readlocked_repository_request(repository, *args)
148
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
    """Forcibly break the lock held on a repository."""

    def do_repository_request(self, repository):
        """Break the lock on ``repository`` and acknowledge the request.

        :param repository: The repository whose lock is to be broken.
        :return: A SuccessfulSmartServerResponse carrying ('ok', ).
        """
        repository.break_lock()
        ok_args = ('ok', )
        return SuccessfulSmartServerResponse(ok_args)
158
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
159
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
161
no_extra_results = False
163
def do_repository_request(self, repository, *revision_ids):
    """Record the requested revisions and ask for the request body.

    The reply carries every parent of revision_ids, plus up to 64KB of
    further parent data discovered by a breadth first search starting at
    revision_ids.  The current search state arrives in the request body;
    see do_body for details.

    When 'include-missing:' appears in revision_ids, ghosts met while
    walking the graph for parent data are reported too, with a prefix of
    'missing:'.

    :param repository: The repository to query in.
    :param revision_ids: The utf8 encoded revision ids to answer for.
    :return: None, which signals that a body is wanted.
    """
    # Nothing can be answered yet: stash the ids until the body arrives.
    self._revision_ids = revision_ids
    return None
181
def do_body(self, body_bytes):
182
"""Process the current search state and perform the parent lookup.
184
:return: A smart server response where the body contains an utf8
185
encoded flattened list of the parents of the revisions (the same
186
format as Repository.get_revision_graph) which has been bz2
189
repository = self._repository
190
repository.lock_read()
192
return self._do_repository_request(body_bytes)
196
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
197
include_missing, max_size=65536):
200
estimator = estimate_compressed_size.ZLibEstimator(max_size)
201
next_revs = revision_ids
202
first_loop_done = False
204
queried_revs.update(next_revs)
205
parent_map = repo_graph.get_parent_map(next_revs)
206
current_revs = next_revs
208
for revision_id in current_revs:
210
parents = parent_map.get(revision_id)
211
if parents is not None:
212
# adjust for the wire
213
if parents == (_mod_revision.NULL_REVISION,):
215
# prepare the next query
216
next_revs.update(parents)
217
encoded_id = revision_id
220
encoded_id = "missing:" + revision_id
222
if (revision_id not in client_seen_revs and
223
(not missing_rev or include_missing)):
224
# Client does not have this revision, give it to it.
225
# add parents to the result
226
result[encoded_id] = parents
227
# Approximate the serialized cost of this revision_id.
228
line = '%s %s\n' % (encoded_id, ' '.join(parents))
229
estimator.add_content(line)
230
# get all the directly asked for parents, and then flesh out to
231
# 64K (compressed) or so. We do one level of depth at a time to
232
# stay in sync with the client. The 250000 magic number is
233
# estimated compression ratio taken from bzr.dev itself.
234
if self.no_extra_results or (first_loop_done and estimator.full()):
235
trace.mutter('size: %d, z_size: %d'
236
% (estimator._uncompressed_size_added,
237
estimator._compressed_size_added))
240
# don't query things we've already queried
241
next_revs = next_revs.difference(queried_revs)
242
first_loop_done = True
245
def _do_repository_request(self, body_bytes):
246
repository = self._repository
247
revision_ids = set(self._revision_ids)
248
include_missing = 'include-missing:' in revision_ids
250
revision_ids.remove('include-missing:')
251
body_lines = body_bytes.split('\n')
252
search_result, error = self.recreate_search_from_recipe(
253
repository, body_lines)
254
if error is not None:
256
# TODO might be nice to start up the search again; but that's not
257
# written or tested yet.
258
client_seen_revs = set(search_result.get_keys())
259
# Always include the requested ids.
260
client_seen_revs.difference_update(revision_ids)
262
repo_graph = repository.get_graph()
263
result = self._expand_requested_revs(repo_graph, revision_ids,
264
client_seen_revs, include_missing)
266
# sorting trivially puts lexicographically similar revision ids together.
269
for revision, parents in sorted(result.items()):
270
lines.append(' '.join((revision, ) + tuple(parents)))
272
return SuccessfulSmartServerResponse(
273
('ok', ), bz2.compress('\n'.join(lines)))
276
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
278
def do_readlocked_repository_request(self, repository, revision_id):
279
"""Return the result of repository.get_revision_graph(revision_id).
281
Deprecated as of bzr 1.4, but supported for older clients.
283
:param repository: The repository to query in.
284
:param revision_id: The utf8 encoded revision_id to get a graph from.
285
:return: A smart server response where the body contains an utf8
286
encoded flattened list of the revision graph.
292
graph = repository.get_graph()
294
search_ids = [revision_id]
296
search_ids = repository.all_revision_ids()
297
search = graph._make_breadth_first_searcher(search_ids)
298
transitive_ids = set()
299
map(transitive_ids.update, list(search))
300
parent_map = graph.get_parent_map(transitive_ids)
301
revision_graph = _strip_NULL_ghosts(parent_map)
302
if revision_id and revision_id not in revision_graph:
303
# Note that we return an empty body, rather than omitting the body.
304
# This way the client knows that it can always expect to find a body
305
# in the response for this method, even in the error case.
306
return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
308
for revision, parents in revision_graph.items():
309
lines.append(' '.join((revision, ) + tuple(parents)))
311
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
314
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
316
def do_readlocked_repository_request(self, repository, revno,
318
"""Find the revid for a given revno, given a known revno/revid pair.
323
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
324
except errors.RevisionNotPresent, err:
325
if err.revision_id != known_pair[1]:
326
raise AssertionError(
327
'get_rev_id_for_revno raised RevisionNotPresent for '
328
'non-initial revision: ' + err.revision_id)
329
return FailedSmartServerResponse(
330
('nosuchrevision', err.revision_id))
332
return SuccessfulSmartServerResponse(('ok', result))
334
earliest_revno, earliest_revid = result
335
return SuccessfulSmartServerResponse(
336
('history-incomplete', earliest_revno, earliest_revid))
339
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
341
def do_repository_request(self, repository):
342
"""Return the serializer format for this repository.
346
:param repository: The repository to query
347
:return: A smart server response ('ok', FORMAT)
349
serializer = repository.get_serializer_format()
350
return SuccessfulSmartServerResponse(('ok', serializer))
353
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
355
def do_repository_request(self, repository, revision_id):
356
"""Return ok if a specific revision is in the repository at path.
358
:param repository: The repository to query in.
359
:param revision_id: The utf8 encoded revision_id to lookup.
360
:return: A smart server response of ('yes', ) if the revision is
361
present. ('no', ) if it is missing.
363
if repository.has_revision(revision_id):
364
return SuccessfulSmartServerResponse(('yes', ))
366
return SuccessfulSmartServerResponse(('no', ))
369
class SmartServerRequestHasSignatureForRevisionId(
370
SmartServerRepositoryRequest):
372
def do_repository_request(self, repository, revision_id):
373
"""Return ok if a signature is present for a revision.
375
Introduced in bzr 2.5.0.
377
:param repository: The repository to query in.
378
:param revision_id: The utf8 encoded revision_id to lookup.
379
:return: A smart server response of ('yes', ) if a
380
signature for the revision is present,
381
('no', ) if it is missing.
384
if repository.has_signature_for_revision_id(revision_id):
385
return SuccessfulSmartServerResponse(('yes', ))
387
return SuccessfulSmartServerResponse(('no', ))
388
except errors.NoSuchRevision:
389
return FailedSmartServerResponse(
390
('nosuchrevision', revision_id))
393
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
395
def do_repository_request(self, repository, revid, committers):
396
"""Return the result of repository.gather_stats().
398
:param repository: The repository to query in.
399
:param revid: utf8 encoded rev id or an empty string to indicate None
400
:param committers: 'yes' or 'no'.
402
:return: A SmartServerResponse ('ok',), an encoded body looking like
405
latestrev: 345.700 3600
408
But containing only fields returned by the gather_stats() call
411
decoded_revision_id = None
413
decoded_revision_id = revid
414
if committers == 'yes':
415
decoded_committers = True
417
decoded_committers = None
419
stats = repository.gather_stats(decoded_revision_id,
421
except errors.NoSuchRevision:
422
return FailedSmartServerResponse(('nosuchrevision', revid))
425
if stats.has_key('committers'):
426
body += 'committers: %d\n' % stats['committers']
427
if stats.has_key('firstrev'):
428
body += 'firstrev: %.3f %d\n' % stats['firstrev']
429
if stats.has_key('latestrev'):
430
body += 'latestrev: %.3f %d\n' % stats['latestrev']
431
if stats.has_key('revisions'):
432
body += 'revisions: %d\n' % stats['revisions']
433
if stats.has_key('size'):
434
body += 'size: %d\n' % stats['size']
436
return SuccessfulSmartServerResponse(('ok', ), body)
439
class SmartServerRepositoryGetRevisionSignatureText(
440
SmartServerRepositoryRequest):
441
"""Return the signature text of a revision.
446
def do_repository_request(self, repository, revision_id):
447
"""Return the result of repository.get_signature_text().
449
:param repository: The repository to query in.
450
:return: A smart server response with the signature text as
454
text = repository.get_signature_text(revision_id)
455
except errors.NoSuchRevision, err:
456
return FailedSmartServerResponse(
457
('nosuchrevision', err.revision))
458
return SuccessfulSmartServerResponse(('ok', ), text)
461
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
463
def do_repository_request(self, repository):
464
"""Return the result of repository.is_shared().
466
:param repository: The repository to query in.
467
:return: A smart server response of ('yes', ) if the repository is
468
shared, and ('no', ) if it is not.
470
if repository.is_shared():
471
return SuccessfulSmartServerResponse(('yes', ))
473
return SuccessfulSmartServerResponse(('no', ))
476
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
478
def do_repository_request(self, repository):
479
"""Return the result of repository.make_working_trees().
481
Introduced in bzr 2.5.0.
483
:param repository: The repository to query in.
484
:return: A smart server response of ('yes', ) if the repository uses
485
working trees, and ('no', ) if it is not.
487
if repository.make_working_trees():
488
return SuccessfulSmartServerResponse(('yes', ))
490
return SuccessfulSmartServerResponse(('no', ))
493
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
495
def do_repository_request(self, repository, token=''):
496
# XXX: this probably should not have a token.
500
token = repository.lock_write(token=token).repository_token
501
except errors.LockContention, e:
502
return FailedSmartServerResponse(('LockContention',))
503
except errors.UnlockableTransport:
504
return FailedSmartServerResponse(('UnlockableTransport',))
505
except errors.LockFailed, e:
506
return FailedSmartServerResponse(('LockFailed',
507
str(e.lock), str(e.why)))
508
if token is not None:
509
repository.leave_lock_in_place()
513
return SuccessfulSmartServerResponse(('ok', token))
516
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
518
def do_repository_request(self, repository, to_network_name):
    """Note the target format and ask for the request body.

    The request body is 'search_bytes', which describes the revisions to
    stream.

    In 2.3 this verb added support for search_bytes == 'everything'.  Older
    implementations will respond with a BadSearch error, and clients should
    catch this and fallback appropriately.

    :param repository: The repository to stream from.
    :param to_network_name: The network name of the target format, looked
        up in network_format_registry.
    :return: None to signal that a body is wanted, or a
        FailedSmartServerResponse of ('UnknownMethod',
        'Repository.get_stream') when _should_fake_unknown() says so.
    """
    # _should_fake_unknown() reads self._to_format, so set it first.
    target_format = network_format_registry.get(to_network_name)
    self._to_format = target_format
    if not self._should_fake_unknown():
        # Signal that we want a body.
        return None
    return FailedSmartServerResponse(
        ('UnknownMethod', 'Repository.get_stream'))
538
def _should_fake_unknown(self):
539
"""Return True if we should return UnknownMethod to the client.
541
This is a workaround for bugs in pre-1.19 clients that claim to
542
support receiving streams of CHK repositories. The pre-1.19 client
543
expects inventory records to be serialized in the format defined by
544
to_network_name, but in pre-1.19 (at least) that format definition
545
tries to use the xml5 serializer, which does not correctly handle
546
rich-roots. After 1.19 the client can also accept inventory-deltas
547
(which avoids this issue), and those clients will use the
548
Repository.get_stream_1.19 verb instead of this one.
549
So: if this repository is CHK, and the to_format doesn't match,
550
we should just fake an UnknownSmartMethod error so that the client
551
will fallback to VFS, rather than sending it a stream we know it
554
from_format = self._repository._format
555
to_format = self._to_format
556
if not from_format.supports_chks:
557
# Source not CHK: that's ok
559
if (to_format.supports_chks and
560
from_format.repository_class is to_format.repository_class and
561
from_format._serializer == to_format._serializer):
562
# Source is CHK, but target matches: that's ok
563
# (e.g. 2a->2a, or CHK2->2a)
565
# Source is CHK, and target is not CHK or incompatible CHK. We can't
566
# generate a compatible stream.
569
def do_body(self, body_bytes):
570
repository = self._repository
571
repository.lock_read()
573
search_result, error = self.recreate_search(repository, body_bytes,
575
if error is not None:
578
source = repository._get_source(self._to_format)
579
stream = source.get_stream(search_result)
581
exc_info = sys.exc_info()
583
# On non-error, unlocking is done by the body stream handler.
586
raise exc_info[0], exc_info[1], exc_info[2]
587
return SuccessfulSmartServerResponse(('ok',),
588
body_stream=self.body_stream(stream, repository))
590
def body_stream(self, stream, repository):
591
byte_stream = _stream_to_byte_stream(stream, repository._format)
593
for bytes in byte_stream:
595
except errors.RevisionNotPresent, e:
596
# This shouldn't be able to happen, but as we don't buffer
597
# everything it can in theory happen.
599
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
604
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
605
"""The same as Repository.get_stream, but will return stream CHK formats to
608
See SmartServerRepositoryGetStream._should_fake_unknown.
613
def _should_fake_unknown(self):
614
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
618
def _stream_to_byte_stream(stream, src_format):
619
"""Convert a record stream to a self delimited byte stream."""
620
pack_writer = pack.ContainerSerialiser()
621
yield pack_writer.begin()
622
yield pack_writer.bytes_record(src_format.network_name(), '')
623
for substream_type, substream in stream:
624
for record in substream:
625
if record.storage_kind in ('chunked', 'fulltext'):
626
serialised = record_to_fulltext_bytes(record)
627
elif record.storage_kind == 'absent':
628
raise ValueError("Absent factory for %s" % (record.key,))
630
serialised = record.get_bytes_as(record.storage_kind)
632
# Some streams embed the whole stream into the wire
633
# representation of the first record, which means that
634
# later records have no wire representation: we skip them.
635
yield pack_writer.bytes_record(serialised, [(substream_type,)])
636
yield pack_writer.end()
639
class _ByteStreamDecoder(object):
640
"""Helper for _byte_stream_to_stream.
642
The expected usage of this class is via the function _byte_stream_to_stream
643
which creates a _ByteStreamDecoder, pops off the stream format and then
644
yields the output of record_stream(), the main entry point to
647
Broadly this class has to unwrap two layers of iterators:
651
This is complicated by wishing to return type, iterator_for_type, but
652
getting the data for iterator_for_type when we find out type: we can't
653
simply pass a generator down to the NetworkRecordStream parser, instead
654
we have a little local state to seed each NetworkRecordStream instance,
655
and gather the type that we'll be yielding.
657
:ivar byte_stream: The byte stream being decoded.
658
:ivar stream_decoder: A pack parser used to decode the bytestream
659
:ivar current_type: The current type, used to join adjacent records of the
660
same type into a single stream.
661
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
664
def __init__(self, byte_stream, record_counter):
665
"""Create a _ByteStreamDecoder."""
666
self.stream_decoder = pack.ContainerPushParser()
667
self.current_type = None
668
self.first_bytes = None
669
self.byte_stream = byte_stream
670
self._record_counter = record_counter
673
def iter_stream_decoder(self):
674
"""Iterate the contents of the pack from stream_decoder."""
675
# dequeue pending items
676
for record in self.stream_decoder.read_pending_records():
678
# Pull bytes off the wire, decode them to records, yield those records.
679
for bytes in self.byte_stream:
680
self.stream_decoder.accept_bytes(bytes)
681
for record in self.stream_decoder.read_pending_records():
684
def iter_substream_bytes(self):
685
if self.first_bytes is not None:
686
yield self.first_bytes
687
# If we run out of pack records, signal the outer layer to stop.
688
self.first_bytes = None
689
for record in self.iter_pack_records:
690
record_names, record_bytes = record
691
record_name, = record_names
692
substream_type = record_name[0]
693
if substream_type != self.current_type:
694
# end of a substream, seed the next substream.
695
self.current_type = substream_type
696
self.first_bytes = record_bytes
700
def record_stream(self):
701
"""Yield substream_type, substream from the byte stream."""
702
def wrap_and_count(pb, rc, substream):
703
"""Yield records from stream while showing progress."""
706
if self.current_type != 'revisions' and self.key_count != 0:
707
# As we know the number of revisions now (in self.key_count)
708
# we can setup and use record_counter (rc).
709
if not rc.is_initialized():
710
rc.setup(self.key_count, self.key_count)
711
for record in substream.read():
713
if rc.is_initialized() and counter == rc.STEP:
714
rc.increment(counter)
715
pb.update('Estimate', rc.current, rc.max)
717
if self.current_type == 'revisions':
718
# Total records is proportional to number of revs
719
# to fetch. With remote, we used self.key_count to
720
# track the number of revs. Once we have the revs
721
# counts in self.key_count, the progress bar changes
722
# from 'Estimating..' to 'Estimate' above.
724
if counter == rc.STEP:
725
pb.update('Estimating..', self.key_count)
731
pb = ui.ui_factory.nested_progress_bar()
732
rc = self._record_counter
733
# Make and consume sub generators, one per substream type:
734
while self.first_bytes is not None:
735
substream = NetworkRecordStream(self.iter_substream_bytes())
736
# after substream is fully consumed, self.current_type is set to
737
# the next type, and self.first_bytes is set to the matching bytes.
738
yield self.current_type, wrap_and_count(pb, rc, substream)
740
pb.update('Done', rc.max, rc.max)
743
def seed_state(self):
    """Prepare the _ByteStreamDecoder to decode from the pack stream.

    Creates the single pack-record generator that every substream draws
    from, then drains one pass of iter_substream_bytes() purely for its
    side effects, so that the first substream is seeded with content.
    """
    # One shared generator supplies all data from the pack stream.
    pack_records = self.iter_stream_decoder()
    self.iter_pack_records = pack_records
    # Exhaust the first sub-iterator; its output is deliberately discarded.
    for _ in self.iter_substream_bytes():
        pass
752
def _byte_stream_to_stream(byte_stream, record_counter=None):
    """Split a byte stream into its source format and a record stream.

    Bytes are fed to the decoder's pack parser until the first pack record
    is available; that record's payload is the network name of the source
    format.  The remainder of byte_stream is left for the returned
    generator to consume.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :param record_counter: Handed straight to _ByteStreamDecoder.
    :return: (RepositoryFormat, stream_generator).
        NOTE(review): if byte_stream is exhausted before any record is
        decoded, the function falls off the end and returns None.
    """
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
    parser = decoder.stream_decoder
    for chunk in byte_stream:
        parser.accept_bytes(chunk)
        for _names, src_format_name in parser.read_pending_records(max=1):
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
767
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
769
def do_repository_request(self, repository, token):
771
repository.lock_write(token=token)
772
except errors.TokenMismatch, e:
773
return FailedSmartServerResponse(('TokenMismatch',))
774
repository.dont_leave_lock_in_place()
776
return SuccessfulSmartServerResponse(('ok',))
779
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
780
"""Get the physical lock status for a repository.
785
def do_repository_request(self, repository):
786
if repository.get_physical_lock_status():
787
return SuccessfulSmartServerResponse(('yes', ))
789
return SuccessfulSmartServerResponse(('no', ))
792
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
794
def do_repository_request(self, repository, str_bool_new_value):
795
if str_bool_new_value == 'True':
799
repository.set_make_working_trees(new_value)
800
return SuccessfulSmartServerResponse(('ok',))
803
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
804
"""Get the raw repository files as a tarball.
806
The returned tarball contains a .bzr control directory which in turn
807
contains a repository.
809
This takes one parameter, compression, which currently must be
812
This is used to implement the Repository.copy_content_into operation.
815
def do_repository_request(self, repository, compression):
816
tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
818
controldir_name = tmp_dirname + '/.bzr'
819
return self._tarfile_response(controldir_name, compression)
821
osutils.rmtree(tmp_dirname)
823
def _copy_to_tempdir(self, from_repo):
    """Clone from_repo into a freshly created temporary directory.

    :param from_repo: The repository whose content is copied.
    :return: (tmp_dirname, tmp_repo) - the path of the temporary directory
        and the new repository initialised inside it.  The caller owns the
        directory and is expected to remove it when finished.
    :raises: Whatever initialising or copying raises.  The temporary
        directory is removed before the error propagates, so a failure
        here does not leak it.
    """
    tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
    copied = False
    try:
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
        from_repo.copy_content_into(tmp_repo)
        copied = True
    finally:
        # The caller only learns the directory name on success, so nobody
        # else can clean up after a failure.  A flag plus 'finally' is used
        # (rather than except/raise) so the in-flight exception is
        # propagated untouched.
        if not copied:
            osutils.rmtree(tmp_dirname)
    return tmp_dirname, tmp_repo
830
def _tarfile_response(self, tmp_dirname, compression):
831
temp = tempfile.NamedTemporaryFile()
833
self._tarball_of_dir(tmp_dirname, compression, temp.file)
834
# all finished; write the tempfile out to the network
836
return SuccessfulSmartServerResponse(('ok',), temp.read())
837
# FIXME: Don't read the whole thing into memory here; rather stream
838
# it out from the file onto the network. mbp 20070411
842
def _tarball_of_dir(self, dirname, compression, ofile):
844
filename = os.path.basename(ofile.name)
845
tarball = tarfile.open(fileobj=ofile, name=filename,
846
mode='w|' + compression)
848
# The tarfile module only accepts ASCII names, and (I guess)
849
# packs them with their 8bit names. We know all the files
850
# within the repository have ASCII names so they should be safe
852
dirname = dirname.encode(sys.getfilesystemencoding())
853
# Python's tarfile module includes the whole path by default so
855
if not dirname.endswith('.bzr'):
856
raise ValueError(dirname)
857
tarball.add(dirname, '.bzr') # recursive by default
862
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
863
"""Insert a record stream from a RemoteSink into a repository.
865
This gets bytes pushed to it by the network infrastructure and turns that
866
into a bytes iterator using a thread. That is then processed by
867
_byte_stream_to_stream.
872
def do_repository_request(self, repository, resume_tokens, lock_token):
    """StreamSink.insert_stream for a remote repository.

    :param repository: The repository to insert into; it is write locked
        here using lock_token.
    :param resume_tokens: Passed on unchanged to
        do_insert_stream_request.
    :param lock_token: The token handed to repository.lock_write.
    """
    # Take the write lock first, then start consuming the stream.
    repository.lock_write(token=lock_token)
    self.do_insert_stream_request(repository, resume_tokens)
877
def do_insert_stream_request(self, repository, resume_tokens):
878
tokens = [token for token in resume_tokens.split(' ') if token]
880
self.repository = repository
881
self.queue = Queue.Queue()
882
self.insert_thread = threading.Thread(target=self._inserter_thread)
883
self.insert_thread.start()
885
def do_chunk(self, body_stream_chunk):
    """Queue one chunk of the request body for the inserter thread.

    :param body_stream_chunk: The chunk to append to self.queue.
    """
    pending = self.queue
    pending.put(body_stream_chunk)
888
def _inserter_thread(self):
890
src_format, stream = _byte_stream_to_stream(
891
self.blocking_byte_stream())
892
self.insert_result = self.repository._get_sink().insert_stream(
893
stream, src_format, self.tokens)
894
self.insert_ok = True
896
self.insert_exception = sys.exc_info()
897
self.insert_ok = False
899
def blocking_byte_stream(self):
901
bytes = self.queue.get()
902
if bytes is StopIteration:
908
self.queue.put(StopIteration)
909
if self.insert_thread is not None:
910
self.insert_thread.join()
911
if not self.insert_ok:
912
exc_info = self.insert_exception
913
raise exc_info[0], exc_info[1], exc_info[2]
914
write_group_tokens, missing_keys = self.insert_result
915
if write_group_tokens or missing_keys:
916
# bzip needed? missing keys should typically be a small set.
917
# Should this be a streaming body response ?
918
missing_keys = sorted(missing_keys)
919
bytes = bencode.bencode((write_group_tokens, missing_keys))
920
self.repository.unlock()
921
return SuccessfulSmartServerResponse(('missing-basis', bytes))
923
self.repository.unlock()
924
return SuccessfulSmartServerResponse(('ok', ))
927
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
928
"""Insert a record stream from a RemoteSink into a repository.
930
Same as SmartServerRepositoryInsertStreamLocked, except:
931
- the lock token argument is optional
932
- servers that implement this verb accept 'inventory-delta' records in the
938
def do_repository_request(self, repository, resume_tokens, lock_token=None):
    """StreamSink.insert_stream for a remote repository.

    Identical to the SmartServerRepositoryInsertStreamLocked version it
    delegates to, except that lock_token may be omitted.

    :param repository: The repository to insert into.
    :param resume_tokens: Passed on unchanged to the parent implementation.
    :param lock_token: Optional lock token; defaults to None.
    """
    locked_verb = SmartServerRepositoryInsertStreamLocked
    locked_verb.do_repository_request(
        self, repository, resume_tokens, lock_token)
944
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
945
"""Insert a record stream from a RemoteSink into an unlocked repository.
947
This is the same as SmartServerRepositoryInsertStreamLocked, except it
948
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
949
like pack format) repository.
954
def do_repository_request(self, repository, resume_tokens):
    """StreamSink.insert_stream for a remote repository.

    :param repository: The repository to insert into; it is write locked
        here without a token.
    :param resume_tokens: Passed on unchanged to
        do_insert_stream_request.
    """
    # No lock token is supplied by this verb, so lock without one.
    repository.lock_write()
    self.do_insert_stream_request(repository, resume_tokens)
960
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
    """Add a revision signature text."""

    def do_repository_request(self, repository, lock_token, revision_id,
            *write_group_tokens):
        """Remember the request arguments for use once the body arrives.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param revision_id: Revision for which to add signature
        :param write_group_tokens: Write group tokens
        """
        self._write_group_tokens = write_group_tokens
        self._revision_id = revision_id
        self._lock_token = lock_token
        # No response yet; the signature text is handled in do_body.
        return None

    def do_body(self, body_bytes):
        """Store the signature text carried in the request body.

        :param body_bytes: GPG signature text
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
            the list of new write group tokens.
        """
        repo = self._repository
        repo.lock_write(token=self._lock_token)
        try:
            repo.resume_write_group(self._write_group_tokens)
            try:
                repo.add_signature_text(self._revision_id, body_bytes)
            finally:
                # The resumed write group is suspended again whether or not
                # adding the signature succeeded.
                new_write_group_tokens = repo.suspend_write_group()
        finally:
            repo.unlock()
        response_args = ('ok', ) + tuple(new_write_group_tokens)
        return SuccessfulSmartServerResponse(response_args)
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
    """Start a write group."""

    def do_repository_request(self, repository, lock_token):
        """Start a write group and immediately suspend it.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :return: SuccessfulSmartServerResponse ('ok', tokens) carrying the
            tokens returned by suspend_write_group, or
            FailedSmartServerResponse ('UnsuspendableWriteGroup',) when the
            new group cannot be suspended.
        """
        repository.lock_write(token=lock_token)
        try:
            repository.start_write_group()
            try:
                tokens = repository.suspend_write_group()
            except errors.UnsuspendableWriteGroup:
                failure_args = ('UnsuspendableWriteGroup',)
                return FailedSmartServerResponse(failure_args)
        finally:
            # Runs on both the success and the failure path.
            repository.unlock()
        success_args = ('ok', tokens)
        return SuccessfulSmartServerResponse(success_args)
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
    """Commit a write group."""

    def do_repository_request(self, repository, lock_token,
            write_group_tokens):
        """Commit a write group.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param write_group_tokens: Tokens of the suspended write group
        :return: SuccessfulSmartServerResponse ('ok',) once committed, or
            FailedSmartServerResponse ('UnresumableWriteGroup', write_groups,
            reason) when the group cannot be resumed.
        """
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            try:
                repository.commit_write_group()
            except:
                # Deliberately bare: whatever went wrong, suspend the group
                # again before propagating the original error.
                write_group_tokens = repository.suspend_write_group()
                # FIXME JRV 2011-11-19: What if the write_group_tokens
                # have changed?
                raise
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
    """Abort a write group."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Abort a write group.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param write_group_tokens: Tokens of the suspended write group
        :return: SuccessfulSmartServerResponse ('ok',) once aborted, or
            FailedSmartServerResponse ('UnresumableWriteGroup', write_groups,
            reason) when the group cannot be resumed.
        """
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            else:
                # Only reached when the resume succeeded; the abort must not
                # sit behind the handler's return or it would never run.
                repository.abort_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
    """Check that a write group is still valid."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Check a write group by resuming it and suspending it again.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param write_group_tokens: Tokens of the suspended write group
        :return: SuccessfulSmartServerResponse ('ok',) when the group could
            be resumed, or FailedSmartServerResponse
            ('UnresumableWriteGroup', write_groups, reason) when it could not.
        """
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            else:
                # The resume was only a probe; put the group back.
                repository.suspend_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
    """Retrieve all of the revision ids in a repository."""

    def do_repository_request(self, repository):
        """Return every revision id of the repository in the response body.

        :param repository: Repository to query
        :return: SuccessfulSmartServerResponse ('ok',) whose body is the
            revision ids joined with newlines.
        """
        body = "\n".join(repository.all_revision_ids())
        return SuccessfulSmartServerResponse(("ok", ), body)
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
    """Pack a repository."""

    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
        """Remember the request arguments for use once the body arrives.

        :param repository: Repository to pack
        :param lock_token: Lock token
        :param clean_obsolete_packs: The string 'True' to request cleaning of
            obsolete packs; any other value means False
        """
        self._repository = repository
        self._lock_token = lock_token
        # The flag arrives as text; only the exact string 'True' enables it.
        self._clean_obsolete_packs = (clean_obsolete_packs == 'True')
        return None

    def do_body(self, body_bytes):
        """Pack the repository using the hint carried in the body.

        :param body_bytes: Pack hint, one entry per line; an empty body
            means no hint
        :return: SuccessfulSmartServerResponse ('ok',)
        """
        hint = None if body_bytes == "" else body_bytes.splitlines()
        repo = self._repository
        repo.lock_write(token=self._lock_token)
        try:
            repo.pack(hint, self._clean_obsolete_packs)
        finally:
            repo.unlock()
        return SuccessfulSmartServerResponse(("ok", ), )
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
    """Iterate over the contents of files.

    The client sends a list of desired files to stream, one
    per line, and as tuples of file id and revision, separated by
    a NUL byte.

    The server replies with a stream. Each entry is preceded by a header,
    which can either be:

    * "ok\\x00IDX\\n" where IDX is the index of the entry in the desired files
        list sent by the client. This header is followed by the contents of
        the file, zlib-compressed.
    * "absent\\x00FILEID\\x00REVISION\\x00IDX\\n" to indicate a text is missing.
        The client can then raise an appropriate RevisionNotPresent error
        or check its fallback repositories.
    """

    def body_stream(self, repository, desired_files):
        """Generate the response stream for the requested texts.

        :param repository: Repository whose texts are read
        :param desired_files: Sequence of (file id, revision) tuples, in the
            order the client sent them
        :return: Generator of header strings and compressed data chunks
        """
        self._repository.lock_read()
        try:
            # Map each requested key to its position in the client's list so
            # that records, which arrive unordered, can be labelled by index.
            text_keys = dict(
                (key, i) for i, key in enumerate(desired_files))
            for record in repository.texts.get_record_stream(text_keys,
                'unordered', True):
                identifier = text_keys[record.key]
                if record.storage_kind == 'absent':
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
                        record.key[1], identifier)
                    # FIXME: Way to abort early?
                    continue
                yield "ok\0%d\n" % identifier
                # One compressor per text, so every entry is an independent
                # zlib stream.
                compressor = zlib.compressobj()
                for chunk in record.get_bytes_as('chunked'):
                    data = compressor.compress(chunk)
                    # The compressor may buffer and return nothing yet.
                    if data:
                        yield data
                data = compressor.flush()
                if data:
                    yield data
        finally:
            self._repository.unlock()

    def do_body(self, body_bytes):
        """Parse the list of desired files and start streaming them.

        :param body_bytes: One request per line, fields separated by NUL
        :return: SuccessfulSmartServerResponse ('ok',) with a body stream
        """
        desired_files = [
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, desired_files))

    def do_repository_request(self, repository):
        """Defer all work to do_body."""
        # Signal that we want a body
        return None
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
    """Stream a list of revisions.

    The client sends a list of newline-separated revision ids in the
    body of the request and the server replies with the serializer format,
    and a stream of zlib-compressed revision texts (using the specified
    serializer format).

    Any revisions the server does not have are omitted from the stream.
    """

    def do_repository_request(self, repository):
        """Remember the repository; the revision ids arrive in the body."""
        self._repository = repository
        # Signal there is a body
        return None

    def do_body(self, body_bytes):
        """Parse the requested revision ids and start streaming them.

        :param body_bytes: Newline-separated revision ids
        :return: SuccessfulSmartServerResponse ('ok', serializer format)
            with a body stream of compressed revision texts
        """
        revision_ids = body_bytes.split("\n")
        return SuccessfulSmartServerResponse(
            ('ok', self._repository.get_serializer_format()),
            body_stream=self.body_stream(self._repository, revision_ids))

    def body_stream(self, repository, revision_ids):
        """Generate one zlib-compressed fulltext per available revision.

        :param repository: Repository whose revisions are read
        :param revision_ids: Revision ids requested by the client
        :return: Generator of compressed revision texts
        """
        self._repository.lock_read()
        try:
            for record in repository.revisions.get_record_stream(
                [(revid,) for revid in revision_ids], 'unordered', True):
                if record.storage_kind == 'absent':
                    # Revisions the server lacks are silently left out.
                    continue
                yield zlib.compress(record.get_bytes_as('fulltext'))
        finally:
            self._repository.unlock()