35
59
def do(self, path, *args):
    """Execute a repository request.

    All Repository requests take a path to the repository as their first
    argument.  The repository must be at the exact path given by the
    client - no searching is done.

    The actual logic is delegated to self.do_repository_request.

    :param path: The path for the repository as received from the client.
    :param args: Any remaining request arguments; they are passed through
        unchanged to self.do_repository_request().
    :return: A SmartServerResponse from self.do_repository_request().
    """
    transport = self.transport_from_client_path(path)
    bzrdir = BzrDir.open_from_transport(transport)
    # Save the repository for use with do_body.
    self._repository = bzrdir.open_repository()
    return self.do_repository_request(self._repository, *args)
78
def do_repository_request(self, repository, *args):
79
"""Override to provide an implementation for a verb."""
80
# No-op for verbs that take bodies (None as a result indicates a body
84
def recreate_search(self, repository, search_bytes, discard_excess=False):
85
"""Recreate a search from its serialised form.
87
:param discard_excess: If True, and the search refers to data we don't
88
have, just silently accept that fact - the verb calling
89
recreate_search trusts that clients will look for missing things
90
they expected and get it from elsewhere.
92
if search_bytes == 'everything':
93
return vf_search.EverythingResult(repository), None
94
lines = search_bytes.split('\n')
95
if lines[0] == 'ancestry-of':
97
search_result = vf_search.PendingAncestryResult(heads, repository)
98
return search_result, None
99
elif lines[0] == 'search':
100
return self.recreate_search_from_recipe(repository, lines[1:],
101
discard_excess=discard_excess)
103
return (None, FailedSmartServerResponse(('BadSearch',)))
105
def recreate_search_from_recipe(self, repository, lines,
106
discard_excess=False):
107
"""Recreate a specific revision search (vs a from-tip search).
109
:param discard_excess: If True, and the search refers to data we don't
110
have, just silently accept that fact - the verb calling
111
recreate_search trusts that clients will look for missing things
112
they expected and get it from elsewhere.
114
start_keys = set(lines[0].split(' '))
115
exclude_keys = set(lines[1].split(' '))
116
revision_count = int(lines[2])
117
repository.lock_read()
119
search = repository.get_graph()._make_breadth_first_searcher(
123
next_revs = search.next()
124
except StopIteration:
126
search.stop_searching_any(exclude_keys.intersection(next_revs))
127
(started_keys, excludes, included_keys) = search.get_state()
128
if (not discard_excess and len(included_keys) != revision_count):
129
# we got back a different amount of data than expected, this
130
# gets reported as NoSuchRevision, because less revisions
131
# indicates missing revisions, and more should never happen as
132
# the excludes list considers ghosts and ensures that ghost
133
# filling races are not a problem.
134
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
135
search_result = vf_search.SearchResult(started_keys, excludes,
136
len(included_keys), included_keys)
137
return (search_result, None)
142
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
143
"""Calls self.do_readlocked_repository_request."""
145
def do_repository_request(self, repository, *args):
146
"""Read lock a repository for do_readlocked_repository_request."""
147
repository.lock_read()
149
return self.do_readlocked_repository_request(repository, *args)
154
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
    """Break a repository lock."""

    def do_repository_request(self, repository):
        """Break the lock held on ``repository``.

        :param repository: The repository whose lock should be broken.
        :return: A SuccessfulSmartServerResponse whose only argument is
            'ok'.
        """
        repository.break_lock()
        return SuccessfulSmartServerResponse(('ok', ))
164
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
165
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
167
no_extra_results = False
169
def do_repository_request(self, repository, *revision_ids):
    """Get parent details for some revisions.

    All the parents for revision_ids are returned. Additionally up to 64KB
    of additional parent data found by performing a breadth first search
    from revision_ids is returned. The verb takes a body containing the
    current search state, see do_body for details.

    If 'include-missing:' is in revision_ids, ghosts encountered in the
    graph traversal for getting parent data are included in the result with
    a prefix of 'missing:'.

    This method only records the requested ids on the request object; the
    lookup itself happens once the body has arrived.

    :param repository: The repository to query in.
    :param revision_ids: The utf8 encoded revision_id to answer for.
    :return: None, which signals that a request body is expected.
    """
    self._revision_ids = revision_ids
    return None  # Signal that we want a body.
187
def do_body(self, body_bytes):
188
"""Process the current search state and perform the parent lookup.
190
:return: A smart server response where the body contains an utf8
191
encoded flattened list of the parents of the revisions (the same
192
format as Repository.get_revision_graph) which has been bz2
195
repository = self._repository
196
repository.lock_read()
198
return self._do_repository_request(body_bytes)
202
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
203
include_missing, max_size=65536):
206
estimator = estimate_compressed_size.ZLibEstimator(max_size)
207
next_revs = revision_ids
208
first_loop_done = False
210
queried_revs.update(next_revs)
211
parent_map = repo_graph.get_parent_map(next_revs)
212
current_revs = next_revs
214
for revision_id in current_revs:
216
parents = parent_map.get(revision_id)
217
if parents is not None:
218
# adjust for the wire
219
if parents == (_mod_revision.NULL_REVISION,):
221
# prepare the next query
222
next_revs.update(parents)
223
encoded_id = revision_id
226
encoded_id = "missing:" + revision_id
228
if (revision_id not in client_seen_revs and
229
(not missing_rev or include_missing)):
230
# Client does not have this revision, give it to it.
231
# add parents to the result
232
result[encoded_id] = parents
233
# Approximate the serialized cost of this revision_id.
234
line = '%s %s\n' % (encoded_id, ' '.join(parents))
235
estimator.add_content(line)
236
# get all the directly asked for parents, and then flesh out to
237
# 64K (compressed) or so. We do one level of depth at a time to
238
# stay in sync with the client. The 250000 magic number is
239
# estimated compression ratio taken from bzr.dev itself.
240
if self.no_extra_results or (first_loop_done and estimator.full()):
241
trace.mutter('size: %d, z_size: %d'
242
% (estimator._uncompressed_size_added,
243
estimator._compressed_size_added))
246
# don't query things we've already queried
247
next_revs = next_revs.difference(queried_revs)
248
first_loop_done = True
251
def _do_repository_request(self, body_bytes):
252
repository = self._repository
253
revision_ids = set(self._revision_ids)
254
include_missing = 'include-missing:' in revision_ids
256
revision_ids.remove('include-missing:')
257
body_lines = body_bytes.split('\n')
258
search_result, error = self.recreate_search_from_recipe(
259
repository, body_lines)
260
if error is not None:
262
# TODO might be nice to start up the search again; but thats not
263
# written or tested yet.
264
client_seen_revs = set(search_result.get_keys())
265
# Always include the requested ids.
266
client_seen_revs.difference_update(revision_ids)
268
repo_graph = repository.get_graph()
269
result = self._expand_requested_revs(repo_graph, revision_ids,
270
client_seen_revs, include_missing)
272
# sorting trivially puts lexographically similar revision ids together.
275
for revision, parents in sorted(result.items()):
276
lines.append(' '.join((revision, ) + tuple(parents)))
278
return SuccessfulSmartServerResponse(
279
('ok', ), bz2.compress('\n'.join(lines)))
282
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
284
def do_readlocked_repository_request(self, repository, revision_id):
54
285
"""Return the result of repository.get_revision_graph(revision_id).
287
Deprecated as of bzr 1.4, but supported for older clients.
56
289
:param repository: The repository to query in.
57
290
:param revision_id: The utf8 encoded revision_id to get a graph from.
58
291
:return: A smart server response where the body contains an utf8
160
token = repository.lock_write(token=token)
506
token = repository.lock_write(token=token).repository_token
161
507
except errors.LockContention, e:
162
508
return FailedSmartServerResponse(('LockContention',))
163
509
except errors.UnlockableTransport:
164
510
return FailedSmartServerResponse(('UnlockableTransport',))
165
repository.leave_lock_in_place()
511
except errors.LockFailed, e:
512
return FailedSmartServerResponse(('LockFailed',
513
str(e.lock), str(e.why)))
514
if token is not None:
515
repository.leave_lock_in_place()
166
516
repository.unlock()
167
517
if token is None:
169
519
return SuccessfulSmartServerResponse(('ok', token))
522
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
524
def do_repository_request(self, repository, to_network_name):
    """Get a stream for inserting into a to_format repository.

    The request body is 'search_bytes', a description of the revisions;
    it is consumed later by do_body.

    In 2.3 this verb added support for search_bytes == 'everything'.  Older
    implementations will respond with a BadSearch error, and clients should
    catch this and fallback appropriately.

    :param repository: The repository to stream from.
    :param to_network_name: The network name of the format of the target
        repository; it is looked up in network_format_registry.
    :return: A FailedSmartServerResponse of
        ('UnknownMethod', 'Repository.get_stream') when
        self._should_fake_unknown() is true, otherwise None to signal that
        a request body is expected.
    """
    # Remember the target format: _should_fake_unknown reads it.
    self._to_format = network_format_registry.get(to_network_name)
    if self._should_fake_unknown():
        return FailedSmartServerResponse(
            ('UnknownMethod', 'Repository.get_stream'))
    return None  # Signal that we want a body.
544
def _should_fake_unknown(self):
545
"""Return True if we should return UnknownMethod to the client.
547
This is a workaround for bugs in pre-1.19 clients that claim to
548
support receiving streams of CHK repositories. The pre-1.19 client
549
expects inventory records to be serialized in the format defined by
550
to_network_name, but in pre-1.19 (at least) that format definition
551
tries to use the xml5 serializer, which does not correctly handle
552
rich-roots. After 1.19 the client can also accept inventory-deltas
553
(which avoids this issue), and those clients will use the
554
Repository.get_stream_1.19 verb instead of this one.
555
So: if this repository is CHK, and the to_format doesn't match,
556
we should just fake an UnknownSmartMethod error so that the client
557
will fallback to VFS, rather than sending it a stream we know it
560
from_format = self._repository._format
561
to_format = self._to_format
562
if not from_format.supports_chks:
563
# Source not CHK: that's ok
565
if (to_format.supports_chks and
566
from_format.repository_class is to_format.repository_class and
567
from_format._serializer == to_format._serializer):
568
# Source is CHK, but target matches: that's ok
569
# (e.g. 2a->2a, or CHK2->2a)
571
# Source is CHK, and target is not CHK or incompatible CHK. We can't
572
# generate a compatible stream.
575
def do_body(self, body_bytes):
576
repository = self._repository
577
repository.lock_read()
579
search_result, error = self.recreate_search(repository, body_bytes,
581
if error is not None:
584
source = repository._get_source(self._to_format)
585
stream = source.get_stream(search_result)
587
exc_info = sys.exc_info()
589
# On non-error, unlocking is done by the body stream handler.
592
raise exc_info[0], exc_info[1], exc_info[2]
593
return SuccessfulSmartServerResponse(('ok',),
594
body_stream=self.body_stream(stream, repository))
596
def body_stream(self, stream, repository):
597
byte_stream = _stream_to_byte_stream(stream, repository._format)
599
for bytes in byte_stream:
601
except errors.RevisionNotPresent, e:
602
# This shouldn't be able to happen, but as we don't buffer
603
# everything it can in theory happen.
605
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
610
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
611
"""The same as Repository.get_stream, but will return stream CHK formats to
614
See SmartServerRepositoryGetStream._should_fake_unknown.
619
def _should_fake_unknown(self):
620
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
624
def _stream_to_byte_stream(stream, src_format):
625
"""Convert a record stream to a self delimited byte stream."""
626
pack_writer = pack.ContainerSerialiser()
627
yield pack_writer.begin()
628
yield pack_writer.bytes_record(src_format.network_name(), '')
629
for substream_type, substream in stream:
630
for record in substream:
631
if record.storage_kind in ('chunked', 'fulltext'):
632
serialised = record_to_fulltext_bytes(record)
633
elif record.storage_kind == 'absent':
634
raise ValueError("Absent factory for %s" % (record.key,))
636
serialised = record.get_bytes_as(record.storage_kind)
638
# Some streams embed the whole stream into the wire
639
# representation of the first record, which means that
640
# later records have no wire representation: we skip them.
641
yield pack_writer.bytes_record(serialised, [(substream_type,)])
642
yield pack_writer.end()
645
class _ByteStreamDecoder(object):
646
"""Helper for _byte_stream_to_stream.
648
The expected usage of this class is via the function _byte_stream_to_stream
649
which creates a _ByteStreamDecoder, pops off the stream format and then
650
yields the output of record_stream(), the main entry point to
653
Broadly this class has to unwrap two layers of iterators:
657
This is complicated by wishing to return type, iterator_for_type, but
658
getting the data for iterator_for_type when we find out type: we can't
659
simply pass a generator down to the NetworkRecordStream parser, instead
660
we have a little local state to seed each NetworkRecordStream instance,
661
and gather the type that we'll be yielding.
663
:ivar byte_stream: The byte stream being decoded.
664
:ivar stream_decoder: A pack parser used to decode the bytestream
665
:ivar current_type: The current type, used to join adjacent records of the
666
same type into a single stream.
667
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
670
def __init__(self, byte_stream, record_counter):
671
"""Create a _ByteStreamDecoder."""
672
self.stream_decoder = pack.ContainerPushParser()
673
self.current_type = None
674
self.first_bytes = None
675
self.byte_stream = byte_stream
676
self._record_counter = record_counter
679
def iter_stream_decoder(self):
680
"""Iterate the contents of the pack from stream_decoder."""
681
# dequeue pending items
682
for record in self.stream_decoder.read_pending_records():
684
# Pull bytes of the wire, decode them to records, yield those records.
685
for bytes in self.byte_stream:
686
self.stream_decoder.accept_bytes(bytes)
687
for record in self.stream_decoder.read_pending_records():
690
def iter_substream_bytes(self):
691
if self.first_bytes is not None:
692
yield self.first_bytes
693
# If we run out of pack records, single the outer layer to stop.
694
self.first_bytes = None
695
for record in self.iter_pack_records:
696
record_names, record_bytes = record
697
record_name, = record_names
698
substream_type = record_name[0]
699
if substream_type != self.current_type:
700
# end of a substream, seed the next substream.
701
self.current_type = substream_type
702
self.first_bytes = record_bytes
706
def record_stream(self):
707
"""Yield substream_type, substream from the byte stream."""
708
def wrap_and_count(pb, rc, substream):
709
"""Yield records from stream while showing progress."""
712
if self.current_type != 'revisions' and self.key_count != 0:
713
# As we know the number of revisions now (in self.key_count)
714
# we can setup and use record_counter (rc).
715
if not rc.is_initialized():
716
rc.setup(self.key_count, self.key_count)
717
for record in substream.read():
719
if rc.is_initialized() and counter == rc.STEP:
720
rc.increment(counter)
721
pb.update('Estimate', rc.current, rc.max)
723
if self.current_type == 'revisions':
724
# Total records is proportional to number of revs
725
# to fetch. With remote, we used self.key_count to
726
# track the number of revs. Once we have the revs
727
# counts in self.key_count, the progress bar changes
728
# from 'Estimating..' to 'Estimate' above.
730
if counter == rc.STEP:
731
pb.update('Estimating..', self.key_count)
737
pb = ui.ui_factory.nested_progress_bar()
738
rc = self._record_counter
740
# Make and consume sub generators, one per substream type:
741
while self.first_bytes is not None:
742
substream = NetworkRecordStream(self.iter_substream_bytes())
743
# after substream is fully consumed, self.current_type is set
744
# to the next type, and self.first_bytes is set to the matching
746
yield self.current_type, wrap_and_count(pb, rc, substream)
749
pb.update('Done', rc.max, rc.max)
752
def seed_state(self):
    """Prepare the _ByteStreamDecoder to decode from the pack stream."""
    # Set a single generator we can use to get data from the pack stream.
    self.iter_pack_records = self.iter_stream_decoder()
    # Seed the very first subiterator with content.  Draining
    # iter_substream_bytes() once is done purely for its side effects on
    # the decoder's state; the bytes it yields here are discarded.
    list(self.iter_substream_bytes())
761
def _byte_stream_to_stream(byte_stream, record_counter=None):
    """Convert a byte stream into a format and a stream.

    Bytes are fed to the decoder's pack parser until the first record is
    available; that record carries the network name of the source format.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :param record_counter: Handed unchanged to _ByteStreamDecoder.
    :return: (RepositoryFormat, stream_generator).  If byte_stream is
        exhausted before any record can be parsed the function falls off
        the end and returns None.
    """
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
    for chunk in byte_stream:
        decoder.stream_decoder.accept_bytes(chunk)
        # max=1: only the leading format record is wanted here; everything
        # after it is left for decoder.record_stream() to consume.
        for record in decoder.stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
172
776
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
174
778
def do_repository_request(self, repository, token):
232
861
dirname = dirname.encode(sys.getfilesystemencoding())
233
862
# python's tarball module includes the whole path by default so
235
assert dirname.endswith('.bzr')
864
if not dirname.endswith('.bzr'):
865
raise ValueError(dirname)
236
866
tarball.add(dirname, '.bzr') # recursive by default
871
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
872
"""Insert a record stream from a RemoteSink into a repository.
874
This gets bytes pushed to it by the network infrastructure and turns that
875
into a bytes iterator using a thread. That is then processed by
876
_byte_stream_to_stream.
881
def do_repository_request(self, repository, resume_tokens, lock_token):
    """StreamSink.insert_stream for a remote repository.

    :param repository: The repository to insert into; it is write locked
        here using lock_token.
    :param resume_tokens: Passed unchanged to do_insert_stream_request.
    :param lock_token: The token given to repository.lock_write().
    :return: None (there is no explicit return value).
    """
    repository.lock_write(token=lock_token)
    self.do_insert_stream_request(repository, resume_tokens)
886
def do_insert_stream_request(self, repository, resume_tokens):
887
tokens = [token for token in resume_tokens.split(' ') if token]
889
self.repository = repository
890
self.queue = Queue.Queue()
891
self.insert_thread = threading.Thread(target=self._inserter_thread)
892
self.insert_thread.start()
894
def do_chunk(self, body_stream_chunk):
    """Queue one chunk of the request body.

    :param body_stream_chunk: The chunk to put on self.queue.
    """
    self.queue.put(body_stream_chunk)
897
def _inserter_thread(self):
899
src_format, stream = _byte_stream_to_stream(
900
self.blocking_byte_stream())
901
self.insert_result = self.repository._get_sink().insert_stream(
902
stream, src_format, self.tokens)
903
self.insert_ok = True
905
self.insert_exception = sys.exc_info()
906
self.insert_ok = False
908
def blocking_byte_stream(self):
910
bytes = self.queue.get()
911
if bytes is StopIteration:
917
self.queue.put(StopIteration)
918
if self.insert_thread is not None:
919
self.insert_thread.join()
920
if not self.insert_ok:
921
exc_info = self.insert_exception
922
raise exc_info[0], exc_info[1], exc_info[2]
923
write_group_tokens, missing_keys = self.insert_result
924
if write_group_tokens or missing_keys:
925
# bzip needed? missing keys should typically be a small set.
926
# Should this be a streaming body response ?
927
missing_keys = sorted(missing_keys)
928
bytes = bencode.bencode((write_group_tokens, missing_keys))
929
self.repository.unlock()
930
return SuccessfulSmartServerResponse(('missing-basis', bytes))
932
self.repository.unlock()
933
return SuccessfulSmartServerResponse(('ok', ))
936
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
937
"""Insert a record stream from a RemoteSink into a repository.
939
Same as SmartServerRepositoryInsertStreamLocked, except:
940
- the lock token argument is optional
941
- servers that implement this verb accept 'inventory-delta' records in the
947
def do_repository_request(self, repository, resume_tokens, lock_token=None):
    """StreamSink.insert_stream for a remote repository.

    Identical to SmartServerRepositoryInsertStreamLocked's implementation,
    to which it delegates, except that lock_token may be omitted.

    :param repository: The repository to insert into.
    :param resume_tokens: Passed unchanged to the base implementation.
    :param lock_token: Optional lock token; defaults to None.
    """
    SmartServerRepositoryInsertStreamLocked.do_repository_request(
        self, repository, resume_tokens, lock_token)
953
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
954
"""Insert a record stream from a RemoteSink into an unlocked repository.
956
This is the same as SmartServerRepositoryInsertStreamLocked, except it
957
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
958
like pack format) repository.
963
def do_repository_request(self, repository, resume_tokens):
    """StreamSink.insert_stream for a remote repository.

    :param repository: The repository to insert into; it is write locked
        here without a token.
    :param resume_tokens: Passed unchanged to do_insert_stream_request.
    :return: None (there is no explicit return value).
    """
    repository.lock_write()
    self.do_insert_stream_request(repository, resume_tokens)
969
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
970
"""Add a revision signature text.
975
def do_repository_request(self, repository, lock_token, revision_id,
976
*write_group_tokens):
977
"""Add a revision signature text.
979
:param repository: Repository to operate on
980
:param lock_token: Lock token
981
:param revision_id: Revision for which to add signature
982
:param write_group_tokens: Write group tokens
984
self._lock_token = lock_token
985
self._revision_id = revision_id
986
self._write_group_tokens = write_group_tokens
989
def do_body(self, body_bytes):
990
"""Add a signature text.
992
:param body_bytes: GPG signature text
993
:return: SuccessfulSmartServerResponse with arguments 'ok' and
994
the list of new write group tokens.
996
self._repository.lock_write(token=self._lock_token)
998
self._repository.resume_write_group(self._write_group_tokens)
1000
self._repository.add_signature_text(self._revision_id,
1003
new_write_group_tokens = self._repository.suspend_write_group()
1005
self._repository.unlock()
1006
return SuccessfulSmartServerResponse(
1007
('ok', ) + tuple(new_write_group_tokens))
1010
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1011
"""Start a write group.
1016
def do_repository_request(self, repository, lock_token):
1017
"""Start a write group."""
1018
repository.lock_write(token=lock_token)
1020
repository.start_write_group()
1022
tokens = repository.suspend_write_group()
1023
except errors.UnsuspendableWriteGroup:
1024
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1027
return SuccessfulSmartServerResponse(('ok', tokens))
1030
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1031
"""Commit a write group.
1036
def do_repository_request(self, repository, lock_token,
1037
write_group_tokens):
1038
"""Commit a write group."""
1039
repository.lock_write(token=lock_token)
1042
repository.resume_write_group(write_group_tokens)
1043
except errors.UnresumableWriteGroup, e:
1044
return FailedSmartServerResponse(
1045
('UnresumableWriteGroup', e.write_groups, e.reason))
1047
repository.commit_write_group()
1049
write_group_tokens = repository.suspend_write_group()
1050
# FIXME JRV 2011-11-19: What if the write_group_tokens
1055
return SuccessfulSmartServerResponse(('ok', ))
1058
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1059
"""Abort a write group.
1064
def do_repository_request(self, repository, lock_token, write_group_tokens):
1065
"""Abort a write group."""
1066
repository.lock_write(token=lock_token)
1069
repository.resume_write_group(write_group_tokens)
1070
except errors.UnresumableWriteGroup, e:
1071
return FailedSmartServerResponse(
1072
('UnresumableWriteGroup', e.write_groups, e.reason))
1073
repository.abort_write_group()
1076
return SuccessfulSmartServerResponse(('ok', ))
1079
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1080
"""Check that a write group is still valid.
1085
def do_repository_request(self, repository, lock_token, write_group_tokens):
1086
"""Abort a write group."""
1087
repository.lock_write(token=lock_token)
1090
repository.resume_write_group(write_group_tokens)
1091
except errors.UnresumableWriteGroup, e:
1092
return FailedSmartServerResponse(
1093
('UnresumableWriteGroup', e.write_groups, e.reason))
1095
repository.suspend_write_group()
1098
return SuccessfulSmartServerResponse(('ok', ))
1101
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1102
"""Retrieve all of the revision ids in a repository.
1107
def do_repository_request(self, repository):
    """Return every revision id known to ``repository``.

    :param repository: The repository to query.
    :return: A SuccessfulSmartServerResponse with the argument 'ok' and a
        body made of the ids from repository.all_revision_ids() joined
        with newlines.
    """
    revids = repository.all_revision_ids()
    return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1112
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1113
"""Reconcile a repository.
1118
def do_repository_request(self, repository, lock_token):
1120
repository.lock_write(token=lock_token)
1121
except errors.TokenLockingNotSupported, e:
1122
return FailedSmartServerResponse(
1123
('TokenLockingNotSupported', ))
1125
reconciler = repository.reconcile()
1129
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1130
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1132
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1135
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1136
"""Pack a repository.
1141
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1142
self._repository = repository
1143
self._lock_token = lock_token
1144
if clean_obsolete_packs == 'True':
1145
self._clean_obsolete_packs = True
1147
self._clean_obsolete_packs = False
1150
def do_body(self, body_bytes):
1151
if body_bytes == "":
1154
hint = body_bytes.splitlines()
1155
self._repository.lock_write(token=self._lock_token)
1157
self._repository.pack(hint, self._clean_obsolete_packs)
1159
self._repository.unlock()
1160
return SuccessfulSmartServerResponse(("ok", ), )
1163
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1164
"""Iterate over the contents of files.
1166
The client sends a list of desired files to stream, one
1167
per line, and as tuples of file id and revision, separated by
1170
The server replies with a stream. Each entry is preceded by a header,
1171
which can either be:
1173
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1174
list sent by the client. This header is followed by the contents of
1175
the file, bzip2-compressed.
1176
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1177
The client can then raise an appropriate RevisionNotPresent error
1178
or check its fallback repositories.
1183
def body_stream(self, repository, desired_files):
1184
self._repository.lock_read()
1187
for i, key in enumerate(desired_files):
1189
for record in repository.texts.get_record_stream(text_keys,
1191
identifier = text_keys[record.key]
1192
if record.storage_kind == 'absent':
1193
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1194
record.key[1], identifier)
1195
# FIXME: Way to abort early?
1197
yield "ok\0%d\n" % identifier
1198
compressor = zlib.compressobj()
1199
for bytes in record.get_bytes_as('chunked'):
1200
data = compressor.compress(bytes)
1203
data = compressor.flush()
1207
self._repository.unlock()
1209
def do_body(self, body_bytes):
1211
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1212
return SuccessfulSmartServerResponse(('ok', ),
1213
body_stream=self.body_stream(self._repository, desired_files))
1215
def do_repository_request(self, repository):
1216
# Signal that we want a body
1220
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1221
"""Stream a list of revisions.
1223
The client sends a list of newline-separated revision ids in the
1224
body of the request and the server replies with the serializer format,
1225
and a stream of bzip2-compressed revision texts (using the specified
1228
Any revisions the server does not have are omitted from the stream.
1233
def do_repository_request(self, repository):
1234
self._repository = repository
1235
# Signal there is a body
1238
def do_body(self, body_bytes):
    """Start streaming the revisions named in the request body.

    :param body_bytes: Newline-separated revision ids.
    :return: A SuccessfulSmartServerResponse whose arguments are 'ok' and
        the repository's serializer format, and whose body stream comes
        from self.body_stream().
    """
    revision_ids = body_bytes.split("\n")
    return SuccessfulSmartServerResponse(
        ('ok', self._repository.get_serializer_format()),
        body_stream=self.body_stream(self._repository, revision_ids))
1244
def body_stream(self, repository, revision_ids):
1245
self._repository.lock_read()
1247
for record in repository.revisions.get_record_stream(
1248
[(revid,) for revid in revision_ids], 'unordered', True):
1249
if record.storage_kind == 'absent':
1251
yield zlib.compress(record.get_bytes_as('fulltext'))
1253
self._repository.unlock()
1256
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1257
"""Get the inventory deltas for a set of revision ids.
1259
This accepts a list of revision ids, and then sends a chain
1260
of deltas for the inventories of those revisions. The first
1261
revision will be empty.
1263
The server writes back zlibbed serialized inventory deltas,
1264
in the ordering specified. The base for each delta is the
1265
inventory generated by the previous delta.
1270
def _inventory_delta_stream(self, repository, ordering, revids):
1271
prev_inv = _mod_inventory.Inventory(root_id=None,
1272
revision_id=_mod_revision.NULL_REVISION)
1273
serializer = inventory_delta.InventoryDeltaSerializer(
1274
repository.supports_rich_root(),
1275
repository._format.supports_tree_reference)
1276
repository.lock_read()
1278
for inv, revid in repository._iter_inventories(revids, ordering):
1281
inv_delta = inv._make_delta(prev_inv)
1282
lines = serializer.delta_to_lines(
1283
prev_inv.revision_id, inv.revision_id, inv_delta)
1284
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1289
def body_stream(self, repository, ordering, revids):
1290
substream = self._inventory_delta_stream(repository,
1292
return _stream_to_byte_stream([('inventory-deltas', substream)],
1295
def do_body(self, body_bytes):
    """Start streaming inventory deltas for the ids in the request body.

    :param body_bytes: The request body; each line is handed to
        self.body_stream() as one revision id.
    :return: A SuccessfulSmartServerResponse with the argument 'ok' and a
        body stream built by self.body_stream() from the stored repository
        and ordering.
    """
    return SuccessfulSmartServerResponse(('ok', ),
        body_stream=self.body_stream(self._repository, self._ordering,
            body_bytes.splitlines()))
1300
def do_repository_request(self, repository, ordering):
1301
if ordering == 'unordered':
1302
# inventory deltas for a topologically sorted stream
1303
# are likely to be smaller
1304
ordering = 'topological'
1305
self._ordering = ordering
1306
# Signal that we want a body