# Copyright (C) 2006-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Server-side repository related request implementations."""

from __future__ import absolute_import

import bz2
import os
import Queue
import sys
import tempfile
import threading
import zlib

from bzrlib import (
    bencode,
    errors,
    estimate_compressed_size,
    inventory as _mod_inventory,
    inventory_delta,
    osutils,
    pack,
    trace,
    ui,
    vf_search,
    )
from bzrlib.bzrdir import BzrDir
from bzrlib.smart.request import (
    FailedSmartServerResponse,
    SmartServerRequest,
    SuccessfulSmartServerResponse,
    )
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
from bzrlib import revision as _mod_revision
from bzrlib.versionedfile import (
    ChunkedContentFactory,
    NetworkRecordStream,
    record_to_fulltext_bytes,
    )


class SmartServerRepositoryRequest(SmartServerRequest):
    """Common base class for Repository requests."""

    def do(self, path, *args):
        """Execute a repository request.

        All Repository requests take a path to the repository as their first
        argument.  The repository must be at the exact path given by the
        client - no searching is done.

        The actual logic is delegated to self.do_repository_request.

        :param client_path: The path for the repository as received from the
            client.
        :return: A SmartServerResponse from self.do_repository_request().
        """
        transport = self.transport_from_client_path(path)
        bzrdir = BzrDir.open_from_transport(transport)
        # Save the repository for use with do_body.
        self._repository = bzrdir.open_repository()
        return self.do_repository_request(self._repository, *args)

    def do_repository_request(self, repository, *args):
        """Override to provide an implementation for a verb."""
        # No-op for verbs that take bodies (None as a result indicates a body
        # is expected)

    def recreate_search(self, repository, search_bytes, discard_excess=False):
        """Recreate a search from its serialised form.

        :param discard_excess: If True, and the search refers to data we don't
            have, just silently accept that fact - the verb calling
            recreate_search trusts that clients will look for missing things
            they expected and get it from elsewhere.
        """
        if search_bytes == 'everything':
            return vf_search.EverythingResult(repository), None
        lines = search_bytes.split('\n')
        if lines[0] == 'ancestry-of':
            heads = lines[1:]
            search_result = vf_search.PendingAncestryResult(heads, repository)
            return search_result, None
        elif lines[0] == 'search':
            return self.recreate_search_from_recipe(repository, lines[1:],
                discard_excess=discard_excess)
        else:
            return (None, FailedSmartServerResponse(('BadSearch',)))

    def recreate_search_from_recipe(self, repository, lines,
        discard_excess=False):
        """Recreate a specific revision search (vs a from-tip search).

        :param discard_excess: If True, and the search refers to data we don't
            have, just silently accept that fact - the verb calling
            recreate_search trusts that clients will look for missing things
            they expected and get it from elsewhere.
        """
        start_keys = set(lines[0].split(' '))
        exclude_keys = set(lines[1].split(' '))
        revision_count = int(lines[2])
        repository.lock_read()
        try:
            search = repository.get_graph()._make_breadth_first_searcher(
                start_keys)
            while True:
                try:
                    next_revs = search.next()
                except StopIteration:
                    break
                search.stop_searching_any(exclude_keys.intersection(next_revs))
            (started_keys, excludes, included_keys) = search.get_state()
            if (not discard_excess and len(included_keys) != revision_count):
                # we got back a different amount of data than expected, this
                # gets reported as NoSuchRevision, because fewer revisions
                # indicate missing revisions, and more should never happen as
                # the excludes list considers ghosts and ensures that ghost
                # filling races are not a problem.
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
            search_result = vf_search.SearchResult(started_keys, excludes,
                len(included_keys), included_keys)
            return (search_result, None)
        finally:
            repository.unlock()
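

# Illustrative sketch (not part of the original module): the 'search' body
# accepted by recreate_search() above is a newline-separated recipe -- the
# literal line 'search', then space-joined start keys, space-joined exclude
# keys, and the expected revision count.  The helper below is hypothetical and
# only shows how a client might assemble such a body.
def _example_serialise_search_recipe(start_keys, exclude_keys, revision_count):
    return '\n'.join(['search',
                      ' '.join(sorted(start_keys)),
                      ' '.join(sorted(exclude_keys)),
                      str(revision_count)])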


class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
    """Calls self.do_readlocked_repository_request."""

    def do_repository_request(self, repository, *args):
        """Read lock a repository for do_readlocked_repository_request."""
        repository.lock_read()
        try:
            return self.do_readlocked_repository_request(repository, *args)
        finally:
            repository.unlock()


class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
    """Break a repository lock."""

    def do_repository_request(self, repository):
        repository.break_lock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
    """Bzr 1.2+ - get parent data for revisions during a graph search."""

    no_extra_results = False

    def do_repository_request(self, repository, *revision_ids):
        """Get parent details for some revisions.

        All the parents for revision_ids are returned.  Additionally up to 64KB
        of additional parent data found by performing a breadth first search
        from revision_ids is returned.  The verb takes a body containing the
        current search state, see do_body for details.

        If 'include-missing:' is in revision_ids, ghosts encountered in the
        graph traversal for getting parent data are included in the result with
        a prefix of 'missing:'.

        :param repository: The repository to query in.
        :param revision_ids: The utf8 encoded revision_ids to answer for.
        """
        self._revision_ids = revision_ids
        return None # Signal that we want a body.

    def do_body(self, body_bytes):
        """Process the current search state and perform the parent lookup.

        :return: A smart server response where the body contains a utf8
            encoded flattened list of the parents of the revisions (the same
            format as Repository.get_revision_graph) which has been bz2
            compressed.
        """
        repository = self._repository
        repository.lock_read()
        try:
            return self._do_repository_request(body_bytes)
        finally:
            repository.unlock()

    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
                               include_missing, max_size=65536):
        result = {}
        queried_revs = set()
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
        next_revs = revision_ids
        first_loop_done = False
        while next_revs:
            queried_revs.update(next_revs)
            parent_map = repo_graph.get_parent_map(next_revs)
            current_revs = next_revs
            next_revs = set()
            for revision_id in current_revs:
                missing_rev = False
                parents = parent_map.get(revision_id)
                if parents is not None:
                    # adjust for the wire
                    if parents == (_mod_revision.NULL_REVISION,):
                        parents = ()
                    # prepare the next query
                    next_revs.update(parents)
                    encoded_id = revision_id
                else:
                    missing_rev = True
                    encoded_id = "missing:" + revision_id
                    parents = []
                if (revision_id not in client_seen_revs and
                    (not missing_rev or include_missing)):
                    # Client does not have this revision, give it to it.
                    # add parents to the result
                    result[encoded_id] = parents
                    # Approximate the serialized cost of this revision_id.
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
                    estimator.add_content(line)
            # get all the directly asked for parents, and then flesh out to
            # 64K (compressed) or so.  We do one level of depth at a time to
            # stay in sync with the client.  The 250000 magic number is
            # estimated compression ratio taken from bzr.dev itself.
            if self.no_extra_results or (first_loop_done and estimator.full()):
                trace.mutter('size: %d, z_size: %d'
                             % (estimator._uncompressed_size_added,
                                estimator._compressed_size_added))
                break
            # don't query things we've already queried
            next_revs = next_revs.difference(queried_revs)
            first_loop_done = True
        return result

    def _do_repository_request(self, body_bytes):
        repository = self._repository
        revision_ids = set(self._revision_ids)
        include_missing = 'include-missing:' in revision_ids
        if include_missing:
            revision_ids.remove('include-missing:')
        body_lines = body_bytes.split('\n')
        search_result, error = self.recreate_search_from_recipe(
            repository, body_lines)
        if error is not None:
            return error
        # TODO might be nice to start up the search again; but that's not
        # written or tested yet.
        client_seen_revs = set(search_result.get_keys())
        # Always include the requested ids.
        client_seen_revs.difference_update(revision_ids)

        repo_graph = repository.get_graph()
        result = self._expand_requested_revs(repo_graph, revision_ids,
                                             client_seen_revs, include_missing)

        # sorting trivially puts lexicographically similar revision ids
        # together.  Compression FTW.
        lines = []
        for revision, parents in sorted(result.items()):
            lines.append(' '.join((revision, ) + tuple(parents)))

        return SuccessfulSmartServerResponse(
            ('ok', ), bz2.compress('\n'.join(lines)))
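

# Illustrative sketch (not part of the original module): the body returned by
# SmartServerRepositoryGetParentMap is a bz2-compressed set of lines, one per
# revision, of the form "REVISION_ID PARENT_ID PARENT_ID ...".  A hypothetical
# client-side decoder might undo that flattening like this.
def _example_decode_parent_map_body(body):
    parent_map = {}
    for line in bz2.decompress(body).split('\n'):
        if not line:
            continue
        ids = line.split(' ')
        parent_map[ids[0]] = tuple(ids[1:])
    return parent_map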


class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):

    def do_readlocked_repository_request(self, repository, revision_id):
        """Return the result of repository.get_revision_graph(revision_id).

        Deprecated as of bzr 1.4, but supported for older clients.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to get a graph from.
        :return: A smart server response where the body contains a utf8
            encoded flattened list of the revision graph.
        """
        lines = []
        graph = repository.get_graph()
        if revision_id:
            search_ids = [revision_id]
        else:
            search_ids = repository.all_revision_ids()
        search = graph._make_breadth_first_searcher(search_ids)
        transitive_ids = set()
        map(transitive_ids.update, list(search))
        parent_map = graph.get_parent_map(transitive_ids)
        revision_graph = _strip_NULL_ghosts(parent_map)
        if revision_id and revision_id not in revision_graph:
            # Note that we return an empty body, rather than omitting the body.
            # This way the client knows that it can always expect to find a body
            # in the response for this method, even in the error case.
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')

        for revision, parents in revision_graph.items():
            lines.append(' '.join((revision, ) + tuple(parents)))

        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))


class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):

    def do_readlocked_repository_request(self, repository, revno,
            known_pair):
        """Find the revid for a given revno, given a known revno/revid pair."""
        try:
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
        except errors.RevisionNotPresent, err:
            if err.revision_id != known_pair[1]:
                raise AssertionError(
                    'get_rev_id_for_revno raised RevisionNotPresent for '
                    'non-initial revision: ' + err.revision_id)
            return FailedSmartServerResponse(
                ('nosuchrevision', err.revision_id))
        if found_flag:
            return SuccessfulSmartServerResponse(('ok', result))
        else:
            earliest_revno, earliest_revid = result
            return SuccessfulSmartServerResponse(
                ('history-incomplete', earliest_revno, earliest_revid))


class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the serializer format for this repository.

        :param repository: The repository to query
        :return: A smart server response ('ok', FORMAT)
        """
        serializer = repository.get_serializer_format()
        return SuccessfulSmartServerResponse(('ok', serializer))


class SmartServerRequestHasRevision(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revision_id):
        """Return ok if a specific revision is in the repository at path.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to lookup.
        :return: A smart server response of ('yes', ) if the revision is
            present, ('no', ) if it is missing.
        """
        if repository.has_revision(revision_id):
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRequestHasSignatureForRevisionId(
        SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revision_id):
        """Return ok if a signature is present for a revision.

        Introduced in bzr 2.5.0.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to lookup.
        :return: A smart server response of ('yes', ) if a
            signature for the revision is present,
            ('no', ) if it is missing.
        """
        try:
            if repository.has_signature_for_revision_id(revision_id):
                return SuccessfulSmartServerResponse(('yes', ))
            else:
                return SuccessfulSmartServerResponse(('no', ))
        except errors.NoSuchRevision:
            return FailedSmartServerResponse(
                ('nosuchrevision', revision_id))


class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revid, committers):
        """Return the result of repository.gather_stats().

        :param repository: The repository to query in.
        :param revid: utf8 encoded rev id or an empty string to indicate None
        :param committers: 'yes' or 'no'.

        :return: A SmartServerResponse ('ok',), an encoded body looking like
              latestrev: 345.700 3600

              But containing only fields returned by the gather_stats() call
        """
        if revid == '':
            decoded_revision_id = None
        else:
            decoded_revision_id = revid
        if committers == 'yes':
            decoded_committers = True
        else:
            decoded_committers = None
        try:
            stats = repository.gather_stats(decoded_revision_id,
                committers=decoded_committers)
        except errors.NoSuchRevision:
            return FailedSmartServerResponse(('nosuchrevision', revid))

        body = ''
        if 'committers' in stats:
            body += 'committers: %d\n' % stats['committers']
        if 'firstrev' in stats:
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
        if 'latestrev' in stats:
            body += 'latestrev: %.3f %d\n' % stats['latestrev']
        if 'revisions' in stats:
            body += 'revisions: %d\n' % stats['revisions']
        if 'size' in stats:
            body += 'size: %d\n' % stats['size']

        return SuccessfulSmartServerResponse(('ok', ), body)
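

# Illustrative sketch (not part of the original module): the gather_stats body
# built above is a series of "name: value" lines.  A hypothetical client-side
# parser could recover the numeric fields like this (firstrev/latestrev carry
# two values, seconds and timezone offset).
def _example_parse_gather_stats_body(body):
    stats = {}
    for line in body.splitlines():
        name, value = line.split(': ', 1)
        if name in ('firstrev', 'latestrev'):
            seconds, offset = value.split(' ')
            stats[name] = (float(seconds), int(offset))
        else:
            stats[name] = int(value)
    return stats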


class SmartServerRepositoryGetRevisionSignatureText(
        SmartServerRepositoryRequest):
    """Return the signature text of a revision."""

    def do_repository_request(self, repository, revision_id):
        """Return the result of repository.get_signature_text().

        :param repository: The repository to query in.
        :return: A smart server response with the signature text as
            the body.
        """
        try:
            text = repository.get_signature_text(revision_id)
        except errors.NoSuchRevision, err:
            return FailedSmartServerResponse(
                ('nosuchrevision', err.revision))
        return SuccessfulSmartServerResponse(('ok', ), text)


class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the result of repository.is_shared().

        :param repository: The repository to query in.
        :return: A smart server response of ('yes', ) if the repository is
            shared, and ('no', ) if it is not.
        """
        if repository.is_shared():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the result of repository.make_working_trees().

        Introduced in bzr 2.5.0.

        :param repository: The repository to query in.
        :return: A smart server response of ('yes', ) if the repository uses
            working trees, and ('no', ) if it does not.
        """
        if repository.make_working_trees():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, token=''):
        # XXX: this probably should not have a token.
        if token == '':
            token = None
        try:
            token = repository.lock_write(token=token).repository_token
        except errors.LockContention, e:
            return FailedSmartServerResponse(('LockContention',))
        except errors.UnlockableTransport:
            return FailedSmartServerResponse(('UnlockableTransport',))
        except errors.LockFailed, e:
            return FailedSmartServerResponse(('LockFailed',
                str(e.lock), str(e.why)))
        if token is not None:
            repository.leave_lock_in_place()
        repository.unlock()
        if token is None:
            token = ''
        return SuccessfulSmartServerResponse(('ok', token))
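

# Illustrative sketch (not part of the original module): on the wire an absent
# lock token travels as the empty string, while the Python locking API uses
# None, so SmartServerRepositoryLockWrite above (and, hypothetically, a client)
# has to translate between the two representations in both directions.
def _example_token_to_wire(token):
    if token is None:
        return ''
    return token


def _example_token_from_wire(wire_token):
    if wire_token == '':
        return None
    return wire_token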


class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, to_network_name):
        """Get a stream for inserting into a to_format repository.

        The request body is 'search_bytes', a description of the revisions
        being requested.

        In 2.3 this verb added support for search_bytes == 'everything'.  Older
        implementations will respond with a BadSearch error, and clients should
        catch this and fallback appropriately.

        :param repository: The repository to stream from.
        :param to_network_name: The network name of the format of the target
            repository.
        """
        self._to_format = network_format_registry.get(to_network_name)
        if self._should_fake_unknown():
            return FailedSmartServerResponse(
                ('UnknownMethod', 'Repository.get_stream'))
        return None # Signal that we want a body.

    def _should_fake_unknown(self):
        """Return True if we should return UnknownMethod to the client.

        This is a workaround for bugs in pre-1.19 clients that claim to
        support receiving streams of CHK repositories.  The pre-1.19 client
        expects inventory records to be serialized in the format defined by
        to_network_name, but in pre-1.19 (at least) that format definition
        tries to use the xml5 serializer, which does not correctly handle
        rich-roots.  After 1.19 the client can also accept inventory-deltas
        (which avoids this issue), and those clients will use the
        Repository.get_stream_1.19 verb instead of this one.

        So: if this repository is CHK, and the to_format doesn't match,
        we should just fake an UnknownSmartMethod error so that the client
        will fallback to VFS, rather than sending it a stream we know it
        cannot handle.
        """
        from_format = self._repository._format
        to_format = self._to_format
        if not from_format.supports_chks:
            # Source not CHK: that's ok
            return False
        if (to_format.supports_chks and
            from_format.repository_class is to_format.repository_class and
            from_format._serializer == to_format._serializer):
            # Source is CHK, but target matches: that's ok
            # (e.g. 2a->2a, or CHK2->2a)
            return False
        # Source is CHK, and target is not CHK or incompatible CHK.  We can't
        # generate a compatible stream.
        return True

    def do_body(self, body_bytes):
        repository = self._repository
        repository.lock_read()
        try:
            search_result, error = self.recreate_search(repository, body_bytes,
                discard_excess=True)
            if error is not None:
                repository.unlock()
                return error
            source = repository._get_source(self._to_format)
            stream = source.get_stream(search_result)
        except Exception:
            exc_info = sys.exc_info()
            try:
                # On non-error, unlocking is done by the body stream handler.
                repository.unlock()
            finally:
                raise exc_info[0], exc_info[1], exc_info[2]
        return SuccessfulSmartServerResponse(('ok',),
            body_stream=self.body_stream(stream, repository))

    def body_stream(self, stream, repository):
        byte_stream = _stream_to_byte_stream(stream, repository._format)
        try:
            for bytes in byte_stream:
                yield bytes
        except errors.RevisionNotPresent, e:
            # This shouldn't be able to happen, but as we don't buffer
            # everything it can in theory happen.
            repository.unlock()
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
        else:
            repository.unlock()


class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
    """The same as Repository.get_stream, but will stream CHK formats to
    clients.

    See SmartServerRepositoryGetStream._should_fake_unknown.
    """

    def _should_fake_unknown(self):
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
        return False


def _stream_to_byte_stream(stream, src_format):
    """Convert a record stream to a self delimited byte stream."""
    pack_writer = pack.ContainerSerialiser()
    yield pack_writer.begin()
    yield pack_writer.bytes_record(src_format.network_name(), '')
    for substream_type, substream in stream:
        for record in substream:
            if record.storage_kind in ('chunked', 'fulltext'):
                serialised = record_to_fulltext_bytes(record)
            elif record.storage_kind == 'absent':
                raise ValueError("Absent factory for %s" % (record.key,))
            else:
                serialised = record.get_bytes_as(record.storage_kind)
            if serialised:
                # Some streams embed the whole stream into the wire
                # representation of the first record, which means that
                # later records have no wire representation: we skip them.
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
    yield pack_writer.end()


class _ByteStreamDecoder(object):
    """Helper for _byte_stream_to_stream.

    The expected usage of this class is via the function _byte_stream_to_stream
    which creates a _ByteStreamDecoder, pops off the stream format and then
    yields the output of record_stream(), the main entry point to
    _ByteStreamDecoder.

    Broadly this class has to unwrap two layers of iterators: the outer
    (substream type, substream) pairs, and the records within each substream.

    This is complicated by wishing to return type, iterator_for_type, but
    getting the data for iterator_for_type when we find out type: we can't
    simply pass a generator down to the NetworkRecordStream parser, instead
    we have a little local state to seed each NetworkRecordStream instance,
    and gather the type that we'll be yielding.

    :ivar byte_stream: The byte stream being decoded.
    :ivar stream_decoder: A pack parser used to decode the bytestream
    :ivar current_type: The current type, used to join adjacent records of the
        same type into a single stream.
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
    """

    def __init__(self, byte_stream, record_counter):
        """Create a _ByteStreamDecoder."""
        self.stream_decoder = pack.ContainerPushParser()
        self.current_type = None
        self.first_bytes = None
        self.byte_stream = byte_stream
        self._record_counter = record_counter
        self.key_count = 0

    def iter_stream_decoder(self):
        """Iterate the contents of the pack from stream_decoder."""
        # dequeue pending items
        for record in self.stream_decoder.read_pending_records():
            yield record
        # Pull bytes off the wire, decode them to records, yield those records.
        for bytes in self.byte_stream:
            self.stream_decoder.accept_bytes(bytes)
            for record in self.stream_decoder.read_pending_records():
                yield record

    def iter_substream_bytes(self):
        if self.first_bytes is not None:
            yield self.first_bytes
            # If we run out of pack records, signal the outer layer to stop.
            self.first_bytes = None
        for record in self.iter_pack_records:
            record_names, record_bytes = record
            record_name, = record_names
            substream_type = record_name[0]
            if substream_type != self.current_type:
                # end of a substream, seed the next substream.
                self.current_type = substream_type
                self.first_bytes = record_bytes
                return
            yield record_bytes

    def record_stream(self):
        """Yield substream_type, substream from the byte stream."""
        def wrap_and_count(pb, rc, substream):
            """Yield records from stream while showing progress."""
            counter = 0
            if rc:
                if self.current_type != 'revisions' and self.key_count != 0:
                    # As we know the number of revisions now (in self.key_count)
                    # we can setup and use record_counter (rc).
                    if not rc.is_initialized():
                        rc.setup(self.key_count, self.key_count)
            for record in substream.read():
                if rc:
                    if rc.is_initialized() and counter == rc.STEP:
                        rc.increment(counter)
                        pb.update('Estimate', rc.current, rc.max)
                        counter = 0
                    if self.current_type == 'revisions':
                        # Total records is proportional to number of revs
                        # to fetch.  With remote, we used self.key_count to
                        # track the number of revs.  Once we have the revs
                        # counts in self.key_count, the progress bar changes
                        # from 'Estimating..' to 'Estimate' above.
                        self.key_count += 1
                        if counter == rc.STEP:
                            pb.update('Estimating..', self.key_count)
                            counter = 0
                counter += 1
                yield record

        self.seed_state()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        try:
            # Make and consume sub generators, one per substream type:
            while self.first_bytes is not None:
                substream = NetworkRecordStream(self.iter_substream_bytes())
                # after substream is fully consumed, self.current_type is set
                # to the next type, and self.first_bytes is set to the matching
                # bytes.
                yield self.current_type, wrap_and_count(pb, rc, substream)
        finally:
            if rc:
                pb.update('Done', rc.max, rc.max)
            pb.finished()

    def seed_state(self):
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
        # Set a single generator we can use to get data from the pack stream.
        self.iter_pack_records = self.iter_stream_decoder()
        # Seed the very first subiterator with content; after this each one
        # seeds the next.
        list(self.iter_substream_bytes())


def _byte_stream_to_stream(byte_stream, record_counter=None):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :return: (RepositoryFormat, stream_generator)
    """
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
    for bytes in byte_stream:
        decoder.stream_decoder.accept_bytes(bytes)
        for record in decoder.stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
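

# Illustrative sketch (not part of the original module): _stream_to_byte_stream
# and _byte_stream_to_stream are inverses, so a record stream can be piped
# through the wire encoding and decoded again.  The function below is
# hypothetical; 'repository' and 'search_result' stand in for real objects.
def _example_roundtrip_stream(repository, search_result):
    source = repository._get_source(repository._format)
    stream = source.get_stream(search_result)
    byte_stream = _stream_to_byte_stream(stream, repository._format)
    # On the receiving side the format is popped off the byte stream first,
    # then the typed substreams are rebuilt.
    src_format, decoded = _byte_stream_to_stream(byte_stream)
    return src_format, decoded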


class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, token):
        try:
            repository.lock_write(token=token)
        except errors.TokenMismatch, e:
            return FailedSmartServerResponse(('TokenMismatch',))
        repository.dont_leave_lock_in_place()
        repository.unlock()
        return SuccessfulSmartServerResponse(('ok',))


class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
    """Get the physical lock status for a repository."""

    def do_repository_request(self, repository):
        if repository.get_physical_lock_status():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, str_bool_new_value):
        if str_bool_new_value == 'True':
            new_value = True
        else:
            new_value = False
        repository.set_make_working_trees(new_value)
        return SuccessfulSmartServerResponse(('ok',))


class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
    """Get the raw repository files as a tarball.

    The returned tarball contains a .bzr control directory which in turn
    contains a repository.

    This takes one parameter, compression, which currently must be
    "", "gz", or "bz2".

    This is used to implement the Repository.copy_content_into operation.
    """

    def do_repository_request(self, repository, compression):
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
        try:
            controldir_name = tmp_dirname + '/.bzr'
            return self._tarfile_response(controldir_name, compression)
        finally:
            osutils.rmtree(tmp_dirname)

    def _copy_to_tempdir(self, from_repo):
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
        from_repo.copy_content_into(tmp_repo)
        return tmp_dirname, tmp_repo

    def _tarfile_response(self, tmp_dirname, compression):
        temp = tempfile.NamedTemporaryFile()
        try:
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
            # all finished; write the tempfile out to the network
            temp.seek(0)
            return SuccessfulSmartServerResponse(('ok',), temp.read())
            # FIXME: Don't read the whole thing into memory here; rather stream
            # it out from the file onto the network. mbp 20070411
        finally:
            temp.close()

    def _tarball_of_dir(self, dirname, compression, ofile):
        import tarfile
        filename = os.path.basename(ofile.name)
        tarball = tarfile.open(fileobj=ofile, name=filename,
            mode='w|' + compression)
        try:
            # The tarfile module only accepts ascii names, and (I guess)
            # packs them with their 8bit names.  We know all the files
            # within the repository have ASCII names so they should be safe
            # to pack in.
            dirname = dirname.encode(sys.getfilesystemencoding())
            # python's tarfile module includes the whole path by default so
            # override it
            if not dirname.endswith('.bzr'):
                raise ValueError(dirname)
            tarball.add(dirname, '.bzr') # recursive by default
        finally:
            tarball.close()
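

# Illustrative sketch (not part of the original module): the tarball verb's
# response body is a complete tar archive (optionally gz or bz2 compressed, per
# the 'compression' argument above) containing a '.bzr' directory.  A
# hypothetical client could unpack it from memory like this.
def _example_unpack_tarball_body(body, target_dirname, compression=''):
    import tarfile
    from cStringIO import StringIO
    tarball = tarfile.open(fileobj=StringIO(body), mode='r|' + compression)
    try:
        tarball.extractall(target_dirname)
    finally:
        tarball.close()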


class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
    """Insert a record stream from a RemoteSink into a repository.

    This gets bytes pushed to it by the network infrastructure and turns that
    into a bytes iterator using a thread. That is then processed by
    _byte_stream_to_stream.
    """

    def do_repository_request(self, repository, resume_tokens, lock_token):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write(token=lock_token)
        self.do_insert_stream_request(repository, resume_tokens)

    def do_insert_stream_request(self, repository, resume_tokens):
        tokens = [token for token in resume_tokens.split(' ') if token]
        self.tokens = tokens
        self.repository = repository
        self.queue = Queue.Queue()
        self.insert_thread = threading.Thread(target=self._inserter_thread)
        self.insert_thread.start()

    def do_chunk(self, body_stream_chunk):
        self.queue.put(body_stream_chunk)

    def _inserter_thread(self):
        try:
            src_format, stream = _byte_stream_to_stream(
                self.blocking_byte_stream())
            self.insert_result = self.repository._get_sink().insert_stream(
                stream, src_format, self.tokens)
            self.insert_ok = True
        except:
            self.insert_exception = sys.exc_info()
            self.insert_ok = False

    def blocking_byte_stream(self):
        while True:
            bytes = self.queue.get()
            if bytes is StopIteration:
                return
            else:
                yield bytes

    def do_end(self):
        self.queue.put(StopIteration)
        if self.insert_thread is not None:
            self.insert_thread.join()
        if not self.insert_ok:
            exc_info = self.insert_exception
            raise exc_info[0], exc_info[1], exc_info[2]
        write_group_tokens, missing_keys = self.insert_result
        if write_group_tokens or missing_keys:
            # bzip needed? missing keys should typically be a small set.
            # Should this be a streaming body response ?
            missing_keys = sorted(missing_keys)
            bytes = bencode.bencode((write_group_tokens, missing_keys))
            self.repository.unlock()
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
        else:
            self.repository.unlock()
            return SuccessfulSmartServerResponse(('ok', ))
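

# Illustrative sketch (not part of the original module): when the insert verb
# answers ('missing-basis', body), the body is a bencoded pair of (write group
# tokens, missing keys), mirroring the encoding in do_end() above.  A
# hypothetical client-side decoder:
def _example_decode_missing_basis(body):
    write_group_tokens, missing_keys = bencode.bdecode(body)
    return write_group_tokens, [tuple(key) for key in missing_keys]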


class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
    """Insert a record stream from a RemoteSink into a repository.

    Same as SmartServerRepositoryInsertStreamLocked, except:
     - the lock token argument is optional
     - servers that implement this verb accept 'inventory-delta' records in the
       stream.
    """

    def do_repository_request(self, repository, resume_tokens, lock_token=None):
        """StreamSink.insert_stream for a remote repository."""
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
            self, repository, resume_tokens, lock_token)


class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
    """Insert a record stream from a RemoteSink into an unlocked repository.

    This is the same as SmartServerRepositoryInsertStreamLocked, except it
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
    like pack format) repository.
    """

    def do_repository_request(self, repository, resume_tokens):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write()
        self.do_insert_stream_request(repository, resume_tokens)


class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
    """Add a revision signature text."""

    def do_repository_request(self, repository, lock_token, revision_id,
            *write_group_tokens):
        """Add a revision signature text.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param revision_id: Revision for which to add signature
        :param write_group_tokens: Write group tokens
        """
        self._lock_token = lock_token
        self._revision_id = revision_id
        self._write_group_tokens = write_group_tokens
        return None

    def do_body(self, body_bytes):
        """Add a signature text.

        :param body_bytes: GPG signature text
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
            the list of new write group tokens.
        """
        self._repository.lock_write(token=self._lock_token)
        try:
            self._repository.resume_write_group(self._write_group_tokens)
            try:
                self._repository.add_signature_text(self._revision_id,
                    body_bytes)
            finally:
                new_write_group_tokens = self._repository.suspend_write_group()
        finally:
            self._repository.unlock()
        return SuccessfulSmartServerResponse(
            ('ok', ) + tuple(new_write_group_tokens))


class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
    """Start a write group."""

    def do_repository_request(self, repository, lock_token):
        """Start a write group."""
        repository.lock_write(token=lock_token)
        try:
            repository.start_write_group()
            try:
                tokens = repository.suspend_write_group()
            except errors.UnsuspendableWriteGroup:
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', tokens))


class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
    """Commit a write group."""

    def do_repository_request(self, repository, lock_token,
            write_group_tokens):
        """Commit a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup, e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            try:
                repository.commit_write_group()
            except:
                write_group_tokens = repository.suspend_write_group()
                # FIXME JRV 2011-11-19: What if the write_group_tokens
                # have changed?
                raise
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
    """Abort a write group."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Abort a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup, e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            repository.abort_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
    """Check that a write group is still valid."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Check a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup, e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            repository.suspend_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
    """Retrieve all of the revision ids in a repository."""

    def do_repository_request(self, repository):
        revids = repository.all_revision_ids()
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))


class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
    """Reconcile a repository."""

    def do_repository_request(self, repository, lock_token):
        try:
            repository.lock_write(token=lock_token)
        except errors.TokenLockingNotSupported, e:
            return FailedSmartServerResponse(
                ('TokenLockingNotSupported', ))
        try:
            reconciler = repository.reconcile()
        finally:
            repository.unlock()
        body = [
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
            ]
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))


class SmartServerRepositoryPack(SmartServerRepositoryRequest):
    """Pack a repository."""

    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
        self._repository = repository
        self._lock_token = lock_token
        if clean_obsolete_packs == 'True':
            self._clean_obsolete_packs = True
        else:
            self._clean_obsolete_packs = False
        return None

    def do_body(self, body_bytes):
        if body_bytes == "":
            hint = None
        else:
            hint = body_bytes.splitlines()
        self._repository.lock_write(token=self._lock_token)
        try:
            self._repository.pack(hint, self._clean_obsolete_packs)
        finally:
            self._repository.unlock()
        return SuccessfulSmartServerResponse(("ok", ), )
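

# Illustrative sketch (not part of the original module): the body parsed by
# SmartServerRepositoryPack.do_body() above is either empty (meaning "no hint")
# or a newline-separated list of pack hints.  A hypothetical client-side
# encoder:
def _example_encode_pack_hint(hint):
    if hint is None:
        return ""
    return "".join("%s\n" % h for h in hint)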


class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
    """Iterate over the contents of files.

    The client sends a list of desired files to stream, one
    per line, and as tuples of file id and revision, separated by
    \0.

    The server replies with a stream. Each entry is preceded by a header,
    which can either be:

    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
      list sent by the client. This header is followed by the contents of
      the file, zlib-compressed.
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
      The client can then raise an appropriate RevisionNotPresent error
      or check its fallback repositories.
    """

    def body_stream(self, repository, desired_files):
        self._repository.lock_read()
        try:
            text_keys = {}
            for i, key in enumerate(desired_files):
                text_keys[key] = i
            for record in repository.texts.get_record_stream(text_keys,
                    'unordered', True):
                identifier = text_keys[record.key]
                if record.storage_kind == 'absent':
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
                        record.key[1], identifier)
                    # FIXME: Way to abort early?
                    continue
                yield "ok\0%d\n" % identifier
                compressor = zlib.compressobj()
                for bytes in record.get_bytes_as('chunked'):
                    data = compressor.compress(bytes)
                    if data:
                        yield data
                data = compressor.flush()
                if data:
                    yield data
        finally:
            self._repository.unlock()

    def do_body(self, body_bytes):
        desired_files = [
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, desired_files))

    def do_repository_request(self, repository):
        # Signal that we want a body
        return None
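

# Illustrative sketch (not part of the original module): the request body
# parsed by SmartServerRepositoryIterFilesBytes.do_body() above is one
# "FILEID\0REVISION" pair per line.  A hypothetical client-side encoder:
def _example_encode_desired_files(desired_files):
    return "".join("%s\0%s\n" % (file_id, revision)
                   for file_id, revision in desired_files)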


class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
    """Stream a list of revisions.

    The client sends a list of newline-separated revision ids in the
    body of the request and the server replies with the serializer format,
    and a stream of zlib-compressed revision texts (using the specified
    serialization format).

    Any revisions the server does not have are omitted from the stream.
    """

    def do_repository_request(self, repository):
        self._repository = repository
        # Signal there is a body
        return None

    def do_body(self, body_bytes):
        revision_ids = body_bytes.split("\n")
        return SuccessfulSmartServerResponse(
            ('ok', self._repository.get_serializer_format()),
            body_stream=self.body_stream(self._repository, revision_ids))

    def body_stream(self, repository, revision_ids):
        self._repository.lock_read()
        try:
            for record in repository.revisions.get_record_stream(
                [(revid,) for revid in revision_ids], 'unordered', True):
                if record.storage_kind == 'absent':
                    continue
                yield zlib.compress(record.get_bytes_as('fulltext'))
        finally:
            self._repository.unlock()
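

# Illustrative sketch (not part of the original module): each chunk yielded by
# SmartServerRepositoryIterRevisions.body_stream() above is an independently
# zlib-compressed revision text, so a client that receives the chunks intact
# could (hypothetically) expand them one by one.
def _example_decompress_revision_chunks(chunks):
    for chunk in chunks:
        yield zlib.decompress(chunk)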


class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
    """Get the inventory deltas for a set of revision ids.

    This accepts a list of revision ids, and then sends a chain
    of deltas for the inventories of those revisions. The base for
    the first delta is the empty inventory.

    The server writes back zlibbed serialized inventory deltas,
    in the ordering specified. The base for each delta is the
    inventory generated by the previous delta.
    """

    def _inventory_delta_stream(self, repository, ordering, revids):
        prev_inv = _mod_inventory.Inventory(root_id=None,
            revision_id=_mod_revision.NULL_REVISION)
        serializer = inventory_delta.InventoryDeltaSerializer(
            repository.supports_rich_root(),
            repository._format.supports_tree_reference)
        repository.lock_read()
        try:
            for inv, revid in repository._iter_inventories(revids, ordering):
                if inv is None:
                    continue
                inv_delta = inv._make_delta(prev_inv)
                lines = serializer.delta_to_lines(
                    prev_inv.revision_id, inv.revision_id, inv_delta)
                yield ChunkedContentFactory(inv.revision_id, None, None, lines)
                prev_inv = inv
        finally:
            repository.unlock()

    def body_stream(self, repository, ordering, revids):
        substream = self._inventory_delta_stream(repository,
            ordering, revids)
        return _stream_to_byte_stream([('inventory-deltas', substream)],
            repository._format)

    def do_body(self, body_bytes):
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, self._ordering,
                body_bytes.splitlines()))

    def do_repository_request(self, repository, ordering):
        if ordering == 'unordered':
            # inventory deltas for a topologically sorted stream
            # are likely to be smaller
            ordering = 'topological'
        self._ordering = ordering
        # Signal that we want a body
        return None
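

# Illustrative sketch (not part of the original module): the body produced by
# SmartServerRepositoryGetInventories.body_stream() is the same self-delimited
# byte stream used elsewhere in this module, so it can (hypothetically) be
# turned back into an 'inventory-deltas' record stream with
# _byte_stream_to_stream().
def _example_decode_inventory_delta_records(byte_stream):
    src_format, stream = _byte_stream_to_stream(byte_stream)
    for substream_type, substream in stream:
        for record in substream:
            yield substream_type, record.key, record.get_bytes_as('fulltext')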