1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Server-side repository related request implementations."""
19
from __future__ import absolute_import
32
estimate_compressed_size,
33
inventory as _mod_inventory,
41
from bzrlib.bzrdir import BzrDir
42
from bzrlib.smart.request import (
43
FailedSmartServerResponse,
45
SuccessfulSmartServerResponse,
47
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
48
from bzrlib import revision as _mod_revision
49
from bzrlib.versionedfile import (
50
ChunkedContentFactory,
52
record_to_fulltext_bytes,
56
class SmartServerRepositoryRequest(SmartServerRequest):
57
"""Common base class for Repository requests."""
59
def do(self, path, *args):
60
"""Execute a repository request.
62
All Repository requests take a path to the repository as their first
63
argument. The repository must be at the exact path given by the
64
client - no searching is done.
66
The actual logic is delegated to self.do_repository_request.
68
:param client_path: The path for the repository as received from the
70
:return: A SmartServerResponse from self.do_repository_request().
72
transport = self.transport_from_client_path(path)
73
bzrdir = BzrDir.open_from_transport(transport)
74
# Save the repository for use with do_body.
75
self._repository = bzrdir.open_repository()
76
return self.do_repository_request(self._repository, *args)
78
def do_repository_request(self, repository, *args):
79
"""Override to provide an implementation for a verb."""
80
# No-op for verbs that take bodies (None as a result indicates a body
84
def recreate_search(self, repository, search_bytes, discard_excess=False):
85
"""Recreate a search from its serialised form.
87
:param discard_excess: If True, and the search refers to data we don't
88
have, just silently accept that fact - the verb calling
89
recreate_search trusts that clients will look for missing things
90
they expected and get it from elsewhere.
92
if search_bytes == 'everything':
93
return vf_search.EverythingResult(repository), None
94
lines = search_bytes.split('\n')
95
if lines[0] == 'ancestry-of':
97
search_result = vf_search.PendingAncestryResult(heads, repository)
98
return search_result, None
99
elif lines[0] == 'search':
100
return self.recreate_search_from_recipe(repository, lines[1:],
101
discard_excess=discard_excess)
103
return (None, FailedSmartServerResponse(('BadSearch',)))
105
def recreate_search_from_recipe(self, repository, lines,
106
discard_excess=False):
107
"""Recreate a specific revision search (vs a from-tip search).
109
:param discard_excess: If True, and the search refers to data we don't
110
have, just silently accept that fact - the verb calling
111
recreate_search trusts that clients will look for missing things
112
they expected and get it from elsewhere.
114
start_keys = set(lines[0].split(' '))
115
exclude_keys = set(lines[1].split(' '))
116
revision_count = int(lines[2])
117
repository.lock_read()
119
search = repository.get_graph()._make_breadth_first_searcher(
123
next_revs = search.next()
124
except StopIteration:
126
search.stop_searching_any(exclude_keys.intersection(next_revs))
127
(started_keys, excludes, included_keys) = search.get_state()
128
if (not discard_excess and len(included_keys) != revision_count):
129
# we got back a different amount of data than expected, this
130
# gets reported as NoSuchRevision, because less revisions
131
# indicates missing revisions, and more should never happen as
132
# the excludes list considers ghosts and ensures that ghost
133
# filling races are not a problem.
134
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
135
search_result = vf_search.SearchResult(started_keys, excludes,
136
len(included_keys), included_keys)
137
return (search_result, None)
142
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
143
"""Calls self.do_readlocked_repository_request."""
145
def do_repository_request(self, repository, *args):
146
"""Read lock a repository for do_readlocked_repository_request."""
147
repository.lock_read()
149
return self.do_readlocked_repository_request(repository, *args)
154
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
155
"""Break a repository lock."""
157
def do_repository_request(self, repository):
158
repository.break_lock()
159
return SuccessfulSmartServerResponse(('ok', ))
164
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
165
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
167
no_extra_results = False
169
def do_repository_request(self, repository, *revision_ids):
170
"""Get parent details for some revisions.
172
All the parents for revision_ids are returned. Additionally up to 64KB
173
of additional parent data found by performing a breadth first search
174
from revision_ids is returned. The verb takes a body containing the
175
current search state, see do_body for details.
177
If 'include-missing:' is in revision_ids, ghosts encountered in the
178
graph traversal for getting parent data are included in the result with
179
a prefix of 'missing:'.
181
:param repository: The repository to query in.
182
:param revision_ids: The utf8 encoded revision_id to answer for.
184
self._revision_ids = revision_ids
185
return None # Signal that we want a body.
187
def do_body(self, body_bytes):
188
"""Process the current search state and perform the parent lookup.
190
:return: A smart server response where the body contains an utf8
191
encoded flattened list of the parents of the revisions (the same
192
format as Repository.get_revision_graph) which has been bz2
195
repository = self._repository
196
repository.lock_read()
198
return self._do_repository_request(body_bytes)
202
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
203
include_missing, max_size=65536):
206
estimator = estimate_compressed_size.ZLibEstimator(max_size)
207
next_revs = revision_ids
208
first_loop_done = False
210
queried_revs.update(next_revs)
211
parent_map = repo_graph.get_parent_map(next_revs)
212
current_revs = next_revs
214
for revision_id in current_revs:
216
parents = parent_map.get(revision_id)
217
if parents is not None:
218
# adjust for the wire
219
if parents == (_mod_revision.NULL_REVISION,):
221
# prepare the next query
222
next_revs.update(parents)
223
encoded_id = revision_id
226
encoded_id = "missing:" + revision_id
228
if (revision_id not in client_seen_revs and
229
(not missing_rev or include_missing)):
230
# Client does not have this revision, give it to it.
231
# add parents to the result
232
result[encoded_id] = parents
233
# Approximate the serialized cost of this revision_id.
234
line = '%s %s\n' % (encoded_id, ' '.join(parents))
235
estimator.add_content(line)
236
# get all the directly asked for parents, and then flesh out to
237
# 64K (compressed) or so. We do one level of depth at a time to
238
# stay in sync with the client. The 250000 magic number is
239
# estimated compression ratio taken from bzr.dev itself.
240
if self.no_extra_results or (first_loop_done and estimator.full()):
241
trace.mutter('size: %d, z_size: %d'
242
% (estimator._uncompressed_size_added,
243
estimator._compressed_size_added))
246
# don't query things we've already queried
247
next_revs = next_revs.difference(queried_revs)
248
first_loop_done = True
251
def _do_repository_request(self, body_bytes):
252
repository = self._repository
253
revision_ids = set(self._revision_ids)
254
include_missing = 'include-missing:' in revision_ids
256
revision_ids.remove('include-missing:')
257
body_lines = body_bytes.split('\n')
258
search_result, error = self.recreate_search_from_recipe(
259
repository, body_lines)
260
if error is not None:
262
# TODO might be nice to start up the search again; but thats not
263
# written or tested yet.
264
client_seen_revs = set(search_result.get_keys())
265
# Always include the requested ids.
266
client_seen_revs.difference_update(revision_ids)
268
repo_graph = repository.get_graph()
269
result = self._expand_requested_revs(repo_graph, revision_ids,
270
client_seen_revs, include_missing)
272
# sorting trivially puts lexographically similar revision ids together.
275
for revision, parents in sorted(result.items()):
276
lines.append(' '.join((revision, ) + tuple(parents)))
278
return SuccessfulSmartServerResponse(
279
('ok', ), bz2.compress('\n'.join(lines)))
282
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
284
def do_readlocked_repository_request(self, repository, revision_id):
285
"""Return the result of repository.get_revision_graph(revision_id).
287
Deprecated as of bzr 1.4, but supported for older clients.
289
:param repository: The repository to query in.
290
:param revision_id: The utf8 encoded revision_id to get a graph from.
291
:return: A smart server response where the body contains an utf8
292
encoded flattened list of the revision graph.
298
graph = repository.get_graph()
300
search_ids = [revision_id]
302
search_ids = repository.all_revision_ids()
303
search = graph._make_breadth_first_searcher(search_ids)
304
transitive_ids = set()
305
map(transitive_ids.update, list(search))
306
parent_map = graph.get_parent_map(transitive_ids)
307
revision_graph = _strip_NULL_ghosts(parent_map)
308
if revision_id and revision_id not in revision_graph:
309
# Note that we return an empty body, rather than omitting the body.
310
# This way the client knows that it can always expect to find a body
311
# in the response for this method, even in the error case.
312
return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
314
for revision, parents in revision_graph.items():
315
lines.append(' '.join((revision, ) + tuple(parents)))
317
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
320
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
322
def do_readlocked_repository_request(self, repository, revno,
324
"""Find the revid for a given revno, given a known revno/revid pair.
329
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
330
except errors.RevisionNotPresent, err:
331
if err.revision_id != known_pair[1]:
332
raise AssertionError(
333
'get_rev_id_for_revno raised RevisionNotPresent for '
334
'non-initial revision: ' + err.revision_id)
335
return FailedSmartServerResponse(
336
('nosuchrevision', err.revision_id))
338
return SuccessfulSmartServerResponse(('ok', result))
340
earliest_revno, earliest_revid = result
341
return SuccessfulSmartServerResponse(
342
('history-incomplete', earliest_revno, earliest_revid))
345
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
347
def do_repository_request(self, repository):
348
"""Return the serializer format for this repository.
352
:param repository: The repository to query
353
:return: A smart server response ('ok', FORMAT)
355
serializer = repository.get_serializer_format()
356
return SuccessfulSmartServerResponse(('ok', serializer))
359
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
361
def do_repository_request(self, repository, revision_id):
362
"""Return ok if a specific revision is in the repository at path.
364
:param repository: The repository to query in.
365
:param revision_id: The utf8 encoded revision_id to lookup.
366
:return: A smart server response of ('yes', ) if the revision is
367
present. ('no', ) if it is missing.
369
if repository.has_revision(revision_id):
370
return SuccessfulSmartServerResponse(('yes', ))
372
return SuccessfulSmartServerResponse(('no', ))
375
class SmartServerRequestHasSignatureForRevisionId(
376
SmartServerRepositoryRequest):
378
def do_repository_request(self, repository, revision_id):
379
"""Return ok if a signature is present for a revision.
381
Introduced in bzr 2.5.0.
383
:param repository: The repository to query in.
384
:param revision_id: The utf8 encoded revision_id to lookup.
385
:return: A smart server response of ('yes', ) if a
386
signature for the revision is present,
387
('no', ) if it is missing.
390
if repository.has_signature_for_revision_id(revision_id):
391
return SuccessfulSmartServerResponse(('yes', ))
393
return SuccessfulSmartServerResponse(('no', ))
394
except errors.NoSuchRevision:
395
return FailedSmartServerResponse(
396
('nosuchrevision', revision_id))
399
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
401
def do_repository_request(self, repository, revid, committers):
402
"""Return the result of repository.gather_stats().
404
:param repository: The repository to query in.
405
:param revid: utf8 encoded rev id or an empty string to indicate None
406
:param committers: 'yes' or 'no'.
408
:return: A SmartServerResponse ('ok',), a encoded body looking like
411
latestrev: 345.700 3600
414
But containing only fields returned by the gather_stats() call
417
decoded_revision_id = None
419
decoded_revision_id = revid
420
if committers == 'yes':
421
decoded_committers = True
423
decoded_committers = None
425
stats = repository.gather_stats(decoded_revision_id,
427
except errors.NoSuchRevision:
428
return FailedSmartServerResponse(('nosuchrevision', revid))
431
if stats.has_key('committers'):
432
body += 'committers: %d\n' % stats['committers']
433
if stats.has_key('firstrev'):
434
body += 'firstrev: %.3f %d\n' % stats['firstrev']
435
if stats.has_key('latestrev'):
436
body += 'latestrev: %.3f %d\n' % stats['latestrev']
437
if stats.has_key('revisions'):
438
body += 'revisions: %d\n' % stats['revisions']
439
if stats.has_key('size'):
440
body += 'size: %d\n' % stats['size']
442
return SuccessfulSmartServerResponse(('ok', ), body)
445
class SmartServerRepositoryGetRevisionSignatureText(
446
SmartServerRepositoryRequest):
447
"""Return the signature text of a revision.
452
def do_repository_request(self, repository, revision_id):
453
"""Return the result of repository.get_signature_text().
455
:param repository: The repository to query in.
456
:return: A smart server response of with the signature text as
460
text = repository.get_signature_text(revision_id)
461
except errors.NoSuchRevision, err:
462
return FailedSmartServerResponse(
463
('nosuchrevision', err.revision))
464
return SuccessfulSmartServerResponse(('ok', ), text)
467
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
469
def do_repository_request(self, repository):
470
"""Return the result of repository.is_shared().
472
:param repository: The repository to query in.
473
:return: A smart server response of ('yes', ) if the repository is
474
shared, and ('no', ) if it is not.
476
if repository.is_shared():
477
return SuccessfulSmartServerResponse(('yes', ))
479
return SuccessfulSmartServerResponse(('no', ))
482
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
484
def do_repository_request(self, repository):
485
"""Return the result of repository.make_working_trees().
487
Introduced in bzr 2.5.0.
489
:param repository: The repository to query in.
490
:return: A smart server response of ('yes', ) if the repository uses
491
working trees, and ('no', ) if it is not.
493
if repository.make_working_trees():
494
return SuccessfulSmartServerResponse(('yes', ))
496
return SuccessfulSmartServerResponse(('no', ))
499
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
501
def do_repository_request(self, repository, token=''):
502
# XXX: this probably should not have a token.
506
token = repository.lock_write(token=token).repository_token
507
except errors.LockContention, e:
508
return FailedSmartServerResponse(('LockContention',))
509
except errors.UnlockableTransport:
510
return FailedSmartServerResponse(('UnlockableTransport',))
511
except errors.LockFailed, e:
512
return FailedSmartServerResponse(('LockFailed',
513
str(e.lock), str(e.why)))
514
if token is not None:
515
repository.leave_lock_in_place()
519
return SuccessfulSmartServerResponse(('ok', token))
522
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
524
def do_repository_request(self, repository, to_network_name):
525
"""Get a stream for inserting into a to_format repository.
527
The request body is 'search_bytes', a description of the revisions
530
In 2.3 this verb added support for search_bytes == 'everything'. Older
531
implementations will respond with a BadSearch error, and clients should
532
catch this and fallback appropriately.
534
:param repository: The repository to stream from.
535
:param to_network_name: The network name of the format of the target
538
self._to_format = network_format_registry.get(to_network_name)
539
if self._should_fake_unknown():
540
return FailedSmartServerResponse(
541
('UnknownMethod', 'Repository.get_stream'))
542
return None # Signal that we want a body.
544
def _should_fake_unknown(self):
545
"""Return True if we should return UnknownMethod to the client.
547
This is a workaround for bugs in pre-1.19 clients that claim to
548
support receiving streams of CHK repositories. The pre-1.19 client
549
expects inventory records to be serialized in the format defined by
550
to_network_name, but in pre-1.19 (at least) that format definition
551
tries to use the xml5 serializer, which does not correctly handle
552
rich-roots. After 1.19 the client can also accept inventory-deltas
553
(which avoids this issue), and those clients will use the
554
Repository.get_stream_1.19 verb instead of this one.
555
So: if this repository is CHK, and the to_format doesn't match,
556
we should just fake an UnknownSmartMethod error so that the client
557
will fallback to VFS, rather than sending it a stream we know it
560
from_format = self._repository._format
561
to_format = self._to_format
562
if not from_format.supports_chks:
563
# Source not CHK: that's ok
565
if (to_format.supports_chks and
566
from_format.repository_class is to_format.repository_class and
567
from_format._serializer == to_format._serializer):
568
# Source is CHK, but target matches: that's ok
569
# (e.g. 2a->2a, or CHK2->2a)
571
# Source is CHK, and target is not CHK or incompatible CHK. We can't
572
# generate a compatible stream.
575
def do_body(self, body_bytes):
576
repository = self._repository
577
repository.lock_read()
579
search_result, error = self.recreate_search(repository, body_bytes,
581
if error is not None:
584
source = repository._get_source(self._to_format)
585
stream = source.get_stream(search_result)
587
exc_info = sys.exc_info()
589
# On non-error, unlocking is done by the body stream handler.
592
raise exc_info[0], exc_info[1], exc_info[2]
593
return SuccessfulSmartServerResponse(('ok',),
594
body_stream=self.body_stream(stream, repository))
596
def body_stream(self, stream, repository):
597
byte_stream = _stream_to_byte_stream(stream, repository._format)
599
for bytes in byte_stream:
601
except errors.RevisionNotPresent, e:
602
# This shouldn't be able to happen, but as we don't buffer
603
# everything it can in theory happen.
605
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
610
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
611
"""The same as Repository.get_stream, but will return stream CHK formats to
614
See SmartServerRepositoryGetStream._should_fake_unknown.
619
def _should_fake_unknown(self):
620
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
624
def _stream_to_byte_stream(stream, src_format):
625
"""Convert a record stream to a self delimited byte stream."""
626
pack_writer = pack.ContainerSerialiser()
627
yield pack_writer.begin()
628
yield pack_writer.bytes_record(src_format.network_name(), '')
629
for substream_type, substream in stream:
630
for record in substream:
631
if record.storage_kind in ('chunked', 'fulltext'):
632
serialised = record_to_fulltext_bytes(record)
633
elif record.storage_kind == 'absent':
634
raise ValueError("Absent factory for %s" % (record.key,))
636
serialised = record.get_bytes_as(record.storage_kind)
638
# Some streams embed the whole stream into the wire
639
# representation of the first record, which means that
640
# later records have no wire representation: we skip them.
641
yield pack_writer.bytes_record(serialised, [(substream_type,)])
642
yield pack_writer.end()
645
class _ByteStreamDecoder(object):
646
"""Helper for _byte_stream_to_stream.
648
The expected usage of this class is via the function _byte_stream_to_stream
649
which creates a _ByteStreamDecoder, pops off the stream format and then
650
yields the output of record_stream(), the main entry point to
653
Broadly this class has to unwrap two layers of iterators:
657
This is complicated by wishing to return type, iterator_for_type, but
658
getting the data for iterator_for_type when we find out type: we can't
659
simply pass a generator down to the NetworkRecordStream parser, instead
660
we have a little local state to seed each NetworkRecordStream instance,
661
and gather the type that we'll be yielding.
663
:ivar byte_stream: The byte stream being decoded.
664
:ivar stream_decoder: A pack parser used to decode the bytestream
665
:ivar current_type: The current type, used to join adjacent records of the
666
same type into a single stream.
667
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
670
def __init__(self, byte_stream, record_counter):
671
"""Create a _ByteStreamDecoder."""
672
self.stream_decoder = pack.ContainerPushParser()
673
self.current_type = None
674
self.first_bytes = None
675
self.byte_stream = byte_stream
676
self._record_counter = record_counter
679
def iter_stream_decoder(self):
680
"""Iterate the contents of the pack from stream_decoder."""
681
# dequeue pending items
682
for record in self.stream_decoder.read_pending_records():
684
# Pull bytes of the wire, decode them to records, yield those records.
685
for bytes in self.byte_stream:
686
self.stream_decoder.accept_bytes(bytes)
687
for record in self.stream_decoder.read_pending_records():
690
def iter_substream_bytes(self):
691
if self.first_bytes is not None:
692
yield self.first_bytes
693
# If we run out of pack records, single the outer layer to stop.
694
self.first_bytes = None
695
for record in self.iter_pack_records:
696
record_names, record_bytes = record
697
record_name, = record_names
698
substream_type = record_name[0]
699
if substream_type != self.current_type:
700
# end of a substream, seed the next substream.
701
self.current_type = substream_type
702
self.first_bytes = record_bytes
706
def record_stream(self):
707
"""Yield substream_type, substream from the byte stream."""
708
def wrap_and_count(pb, rc, substream):
709
"""Yield records from stream while showing progress."""
712
if self.current_type != 'revisions' and self.key_count != 0:
713
# As we know the number of revisions now (in self.key_count)
714
# we can setup and use record_counter (rc).
715
if not rc.is_initialized():
716
rc.setup(self.key_count, self.key_count)
717
for record in substream.read():
719
if rc.is_initialized() and counter == rc.STEP:
720
rc.increment(counter)
721
pb.update('Estimate', rc.current, rc.max)
723
if self.current_type == 'revisions':
724
# Total records is proportional to number of revs
725
# to fetch. With remote, we used self.key_count to
726
# track the number of revs. Once we have the revs
727
# counts in self.key_count, the progress bar changes
728
# from 'Estimating..' to 'Estimate' above.
730
if counter == rc.STEP:
731
pb.update('Estimating..', self.key_count)
737
pb = ui.ui_factory.nested_progress_bar()
738
rc = self._record_counter
739
# Make and consume sub generators, one per substream type:
740
while self.first_bytes is not None:
741
substream = NetworkRecordStream(self.iter_substream_bytes())
742
# after substream is fully consumed, self.current_type is set to
743
# the next type, and self.first_bytes is set to the matching bytes.
744
yield self.current_type, wrap_and_count(pb, rc, substream)
746
pb.update('Done', rc.max, rc.max)
749
def seed_state(self):
750
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
751
# Set a single generator we can use to get data from the pack stream.
752
self.iter_pack_records = self.iter_stream_decoder()
753
# Seed the very first subiterator with content; after this each one
755
list(self.iter_substream_bytes())
758
def _byte_stream_to_stream(byte_stream, record_counter=None):
759
"""Convert a byte stream into a format and a stream.
761
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
762
:return: (RepositoryFormat, stream_generator)
764
decoder = _ByteStreamDecoder(byte_stream, record_counter)
765
for bytes in byte_stream:
766
decoder.stream_decoder.accept_bytes(bytes)
767
for record in decoder.stream_decoder.read_pending_records(max=1):
768
record_names, src_format_name = record
769
src_format = network_format_registry.get(src_format_name)
770
return src_format, decoder.record_stream()
773
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
775
def do_repository_request(self, repository, token):
777
repository.lock_write(token=token)
778
except errors.TokenMismatch, e:
779
return FailedSmartServerResponse(('TokenMismatch',))
780
repository.dont_leave_lock_in_place()
782
return SuccessfulSmartServerResponse(('ok',))
785
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
786
"""Get the physical lock status for a repository.
791
def do_repository_request(self, repository):
792
if repository.get_physical_lock_status():
793
return SuccessfulSmartServerResponse(('yes', ))
795
return SuccessfulSmartServerResponse(('no', ))
798
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
800
def do_repository_request(self, repository, str_bool_new_value):
801
if str_bool_new_value == 'True':
805
repository.set_make_working_trees(new_value)
806
return SuccessfulSmartServerResponse(('ok',))
809
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
810
"""Get the raw repository files as a tarball.
812
The returned tarball contains a .bzr control directory which in turn
813
contains a repository.
815
This takes one parameter, compression, which currently must be
818
This is used to implement the Repository.copy_content_into operation.
821
def do_repository_request(self, repository, compression):
822
tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
824
controldir_name = tmp_dirname + '/.bzr'
825
return self._tarfile_response(controldir_name, compression)
827
osutils.rmtree(tmp_dirname)
829
def _copy_to_tempdir(self, from_repo):
830
tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
831
tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
832
tmp_repo = from_repo._format.initialize(tmp_bzrdir)
833
from_repo.copy_content_into(tmp_repo)
834
return tmp_dirname, tmp_repo
836
def _tarfile_response(self, tmp_dirname, compression):
837
temp = tempfile.NamedTemporaryFile()
839
self._tarball_of_dir(tmp_dirname, compression, temp.file)
840
# all finished; write the tempfile out to the network
842
return SuccessfulSmartServerResponse(('ok',), temp.read())
843
# FIXME: Don't read the whole thing into memory here; rather stream
844
# it out from the file onto the network. mbp 20070411
848
def _tarball_of_dir(self, dirname, compression, ofile):
850
filename = os.path.basename(ofile.name)
851
tarball = tarfile.open(fileobj=ofile, name=filename,
852
mode='w|' + compression)
854
# The tarball module only accepts ascii names, and (i guess)
855
# packs them with their 8bit names. We know all the files
856
# within the repository have ASCII names so the should be safe
858
dirname = dirname.encode(sys.getfilesystemencoding())
859
# python's tarball module includes the whole path by default so
861
if not dirname.endswith('.bzr'):
862
raise ValueError(dirname)
863
tarball.add(dirname, '.bzr') # recursive by default
868
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
869
"""Insert a record stream from a RemoteSink into a repository.
871
This gets bytes pushed to it by the network infrastructure and turns that
872
into a bytes iterator using a thread. That is then processed by
873
_byte_stream_to_stream.
878
def do_repository_request(self, repository, resume_tokens, lock_token):
879
"""StreamSink.insert_stream for a remote repository."""
880
repository.lock_write(token=lock_token)
881
self.do_insert_stream_request(repository, resume_tokens)
883
def do_insert_stream_request(self, repository, resume_tokens):
884
tokens = [token for token in resume_tokens.split(' ') if token]
886
self.repository = repository
887
self.queue = Queue.Queue()
888
self.insert_thread = threading.Thread(target=self._inserter_thread)
889
self.insert_thread.start()
891
def do_chunk(self, body_stream_chunk):
892
self.queue.put(body_stream_chunk)
894
def _inserter_thread(self):
896
src_format, stream = _byte_stream_to_stream(
897
self.blocking_byte_stream())
898
self.insert_result = self.repository._get_sink().insert_stream(
899
stream, src_format, self.tokens)
900
self.insert_ok = True
902
self.insert_exception = sys.exc_info()
903
self.insert_ok = False
905
def blocking_byte_stream(self):
907
bytes = self.queue.get()
908
if bytes is StopIteration:
914
self.queue.put(StopIteration)
915
if self.insert_thread is not None:
916
self.insert_thread.join()
917
if not self.insert_ok:
918
exc_info = self.insert_exception
919
raise exc_info[0], exc_info[1], exc_info[2]
920
write_group_tokens, missing_keys = self.insert_result
921
if write_group_tokens or missing_keys:
922
# bzip needed? missing keys should typically be a small set.
923
# Should this be a streaming body response ?
924
missing_keys = sorted(missing_keys)
925
bytes = bencode.bencode((write_group_tokens, missing_keys))
926
self.repository.unlock()
927
return SuccessfulSmartServerResponse(('missing-basis', bytes))
929
self.repository.unlock()
930
return SuccessfulSmartServerResponse(('ok', ))
933
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
934
"""Insert a record stream from a RemoteSink into a repository.
936
Same as SmartServerRepositoryInsertStreamLocked, except:
937
- the lock token argument is optional
938
- servers that implement this verb accept 'inventory-delta' records in the
944
def do_repository_request(self, repository, resume_tokens, lock_token=None):
945
"""StreamSink.insert_stream for a remote repository."""
946
SmartServerRepositoryInsertStreamLocked.do_repository_request(
947
self, repository, resume_tokens, lock_token)
950
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
951
"""Insert a record stream from a RemoteSink into an unlocked repository.
953
This is the same as SmartServerRepositoryInsertStreamLocked, except it
954
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
955
like pack format) repository.
960
def do_repository_request(self, repository, resume_tokens):
961
"""StreamSink.insert_stream for a remote repository."""
962
repository.lock_write()
963
self.do_insert_stream_request(repository, resume_tokens)
966
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
967
"""Add a revision signature text.
972
def do_repository_request(self, repository, lock_token, revision_id,
973
*write_group_tokens):
974
"""Add a revision signature text.
976
:param repository: Repository to operate on
977
:param lock_token: Lock token
978
:param revision_id: Revision for which to add signature
979
:param write_group_tokens: Write group tokens
981
self._lock_token = lock_token
982
self._revision_id = revision_id
983
self._write_group_tokens = write_group_tokens
986
def do_body(self, body_bytes):
987
"""Add a signature text.
989
:param body_bytes: GPG signature text
990
:return: SuccessfulSmartServerResponse with arguments 'ok' and
991
the list of new write group tokens.
993
self._repository.lock_write(token=self._lock_token)
995
self._repository.resume_write_group(self._write_group_tokens)
997
self._repository.add_signature_text(self._revision_id,
1000
new_write_group_tokens = self._repository.suspend_write_group()
1002
self._repository.unlock()
1003
return SuccessfulSmartServerResponse(
1004
('ok', ) + tuple(new_write_group_tokens))
1007
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1008
"""Start a write group.
1013
def do_repository_request(self, repository, lock_token):
1014
"""Start a write group."""
1015
repository.lock_write(token=lock_token)
1017
repository.start_write_group()
1019
tokens = repository.suspend_write_group()
1020
except errors.UnsuspendableWriteGroup:
1021
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1024
return SuccessfulSmartServerResponse(('ok', tokens))
1027
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1028
"""Commit a write group.
1033
def do_repository_request(self, repository, lock_token,
1034
write_group_tokens):
1035
"""Commit a write group."""
1036
repository.lock_write(token=lock_token)
1039
repository.resume_write_group(write_group_tokens)
1040
except errors.UnresumableWriteGroup, e:
1041
return FailedSmartServerResponse(
1042
('UnresumableWriteGroup', e.write_groups, e.reason))
1044
repository.commit_write_group()
1046
write_group_tokens = repository.suspend_write_group()
1047
# FIXME JRV 2011-11-19: What if the write_group_tokens
1052
return SuccessfulSmartServerResponse(('ok', ))
1055
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1056
"""Abort a write group.
1061
def do_repository_request(self, repository, lock_token, write_group_tokens):
1062
"""Abort a write group."""
1063
repository.lock_write(token=lock_token)
1066
repository.resume_write_group(write_group_tokens)
1067
except errors.UnresumableWriteGroup, e:
1068
return FailedSmartServerResponse(
1069
('UnresumableWriteGroup', e.write_groups, e.reason))
1070
repository.abort_write_group()
1073
return SuccessfulSmartServerResponse(('ok', ))
1076
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1077
"""Check that a write group is still valid.
1082
def do_repository_request(self, repository, lock_token, write_group_tokens):
1083
"""Abort a write group."""
1084
repository.lock_write(token=lock_token)
1087
repository.resume_write_group(write_group_tokens)
1088
except errors.UnresumableWriteGroup, e:
1089
return FailedSmartServerResponse(
1090
('UnresumableWriteGroup', e.write_groups, e.reason))
1092
repository.suspend_write_group()
1095
return SuccessfulSmartServerResponse(('ok', ))
1098
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1099
"""Retrieve all of the revision ids in a repository.
1104
def do_repository_request(self, repository):
1105
revids = repository.all_revision_ids()
1106
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1109
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1110
"""Reconcile a repository.
1115
def do_repository_request(self, repository, lock_token):
1117
repository.lock_write(token=lock_token)
1118
except errors.TokenLockingNotSupported, e:
1119
return FailedSmartServerResponse(
1120
('TokenLockingNotSupported', ))
1122
reconciler = repository.reconcile()
1126
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1127
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1129
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1132
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1133
"""Pack a repository.
1138
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1139
self._repository = repository
1140
self._lock_token = lock_token
1141
if clean_obsolete_packs == 'True':
1142
self._clean_obsolete_packs = True
1144
self._clean_obsolete_packs = False
1147
def do_body(self, body_bytes):
1148
if body_bytes == "":
1151
hint = body_bytes.splitlines()
1152
self._repository.lock_write(token=self._lock_token)
1154
self._repository.pack(hint, self._clean_obsolete_packs)
1156
self._repository.unlock()
1157
return SuccessfulSmartServerResponse(("ok", ), )
1160
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1161
"""Iterate over the contents of files.
1163
The client sends a list of desired files to stream, one
1164
per line, and as tuples of file id and revision, separated by
1167
The server replies with a stream. Each entry is preceded by a header,
1168
which can either be:
1170
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1171
list sent by the client. This header is followed by the contents of
1172
the file, bzip2-compressed.
1173
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1174
The client can then raise an appropriate RevisionNotPresent error
1175
or check its fallback repositories.
1180
def body_stream(self, repository, desired_files):
1181
self._repository.lock_read()
1184
for i, key in enumerate(desired_files):
1186
for record in repository.texts.get_record_stream(text_keys,
1188
identifier = text_keys[record.key]
1189
if record.storage_kind == 'absent':
1190
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1191
record.key[1], identifier)
1192
# FIXME: Way to abort early?
1194
yield "ok\0%d\n" % identifier
1195
compressor = zlib.compressobj()
1196
for bytes in record.get_bytes_as('chunked'):
1197
data = compressor.compress(bytes)
1200
data = compressor.flush()
1204
self._repository.unlock()
1206
def do_body(self, body_bytes):
1208
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1209
return SuccessfulSmartServerResponse(('ok', ),
1210
body_stream=self.body_stream(self._repository, desired_files))
1212
def do_repository_request(self, repository):
1213
# Signal that we want a body
1217
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1218
"""Stream a list of revisions.
1220
The client sends a list of newline-separated revision ids in the
1221
body of the request and the server replies with the serializer format,
1222
and a stream of bzip2-compressed revision texts (using the specified
1225
Any revisions the server does not have are omitted from the stream.
1230
def do_repository_request(self, repository):
1231
self._repository = repository
1232
# Signal there is a body
1235
def do_body(self, body_bytes):
1236
revision_ids = body_bytes.split("\n")
1237
return SuccessfulSmartServerResponse(
1238
('ok', self._repository.get_serializer_format()),
1239
body_stream=self.body_stream(self._repository, revision_ids))
1241
def body_stream(self, repository, revision_ids):
1242
self._repository.lock_read()
1244
for record in repository.revisions.get_record_stream(
1245
[(revid,) for revid in revision_ids], 'unordered', True):
1246
if record.storage_kind == 'absent':
1248
yield zlib.compress(record.get_bytes_as('fulltext'))
1250
self._repository.unlock()
1253
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1254
"""Get the inventory deltas for a set of revision ids.
1256
This accepts a list of revision ids, and then sends a chain
1257
of deltas for the inventories of those revisions. The first
1258
revision will be empty.
1260
The server writes back zlibbed serialized inventory deltas,
1261
in the ordering specified. The base for each delta is the
1262
inventory generated by the previous delta.
1267
def _inventory_delta_stream(self, repository, ordering, revids):
1268
prev_inv = _mod_inventory.Inventory(root_id=None,
1269
revision_id=_mod_revision.NULL_REVISION)
1270
serializer = inventory_delta.InventoryDeltaSerializer(
1271
repository.supports_rich_root(),
1272
repository._format.supports_tree_reference)
1273
repository.lock_read()
1275
for inv, revid in repository._iter_inventories(revids, ordering):
1278
inv_delta = inv._make_delta(prev_inv)
1279
lines = serializer.delta_to_lines(
1280
prev_inv.revision_id, inv.revision_id, inv_delta)
1281
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1286
def body_stream(self, repository, ordering, revids):
1287
substream = self._inventory_delta_stream(repository,
1289
return _stream_to_byte_stream([('inventory-deltas', substream)],
1292
def do_body(self, body_bytes):
1293
return SuccessfulSmartServerResponse(('ok', ),
1294
body_stream=self.body_stream(self._repository, self._ordering,
1295
body_bytes.splitlines()))
1297
def do_repository_request(self, repository, ordering):
1298
if ordering == 'unordered':
1299
# inventory deltas for a topologically sorted stream
1300
# are likely to be smaller
1301
ordering = 'topological'
1302
self._ordering = ordering
1303
# Signal that we want a body