36
59
def do(self, path, *args):
37
60
"""Execute a repository request.
39
The repository must be at the exact path - no searching is done.
62
All Repository requests take a path to the repository as their first
63
argument. The repository must be at the exact path given by the
64
client - no searching is done.
41
66
The actual logic is delegated to self.do_repository_request.
43
:param path: The path for the repository.
44
:return: A smart server from self.do_repository_request().
68
:param client_path: The path for the repository as received from the
70
:return: A SmartServerResponse from self.do_repository_request().
46
transport = self._backing_transport.clone(path)
72
transport = self.transport_from_client_path(path)
47
73
bzrdir = BzrDir.open_from_transport(transport)
48
repository = bzrdir.open_repository()
49
return self.do_repository_request(repository, *args)
52
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryRequest):
54
def do_repository_request(self, repository, revision_id):
74
# Save the repository for use with do_body.
75
self._repository = bzrdir.open_repository()
76
return self.do_repository_request(self._repository, *args)
78
def do_repository_request(self, repository, *args):
79
"""Override to provide an implementation for a verb."""
80
# No-op for verbs that take bodies (None as a result indicates a body
84
def recreate_search(self, repository, search_bytes, discard_excess=False):
85
"""Recreate a search from its serialised form.
87
:param discard_excess: If True, and the search refers to data we don't
88
have, just silently accept that fact - the verb calling
89
recreate_search trusts that clients will look for missing things
90
they expected and get it from elsewhere.
92
if search_bytes == 'everything':
93
return vf_search.EverythingResult(repository), None
94
lines = search_bytes.split('\n')
95
if lines[0] == 'ancestry-of':
97
search_result = vf_search.PendingAncestryResult(heads, repository)
98
return search_result, None
99
elif lines[0] == 'search':
100
return self.recreate_search_from_recipe(repository, lines[1:],
101
discard_excess=discard_excess)
103
return (None, FailedSmartServerResponse(('BadSearch',)))
105
def recreate_search_from_recipe(self, repository, lines,
106
discard_excess=False):
107
"""Recreate a specific revision search (vs a from-tip search).
109
:param discard_excess: If True, and the search refers to data we don't
110
have, just silently accept that fact - the verb calling
111
recreate_search trusts that clients will look for missing things
112
they expected and get it from elsewhere.
114
start_keys = set(lines[0].split(' '))
115
exclude_keys = set(lines[1].split(' '))
116
revision_count = int(lines[2])
117
repository.lock_read()
119
search = repository.get_graph()._make_breadth_first_searcher(
123
next_revs = search.next()
124
except StopIteration:
126
search.stop_searching_any(exclude_keys.intersection(next_revs))
127
(started_keys, excludes, included_keys) = search.get_state()
128
if (not discard_excess and len(included_keys) != revision_count):
129
# we got back a different amount of data than expected, this
130
# gets reported as NoSuchRevision, because less revisions
131
# indicates missing revisions, and more should never happen as
132
# the excludes list considers ghosts and ensures that ghost
133
# filling races are not a problem.
134
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
135
search_result = vf_search.SearchResult(started_keys, excludes,
136
len(included_keys), included_keys)
137
return (search_result, None)
142
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
143
"""Calls self.do_readlocked_repository_request."""
145
def do_repository_request(self, repository, *args):
146
"""Read lock a repository for do_readlocked_repository_request."""
147
repository.lock_read()
149
return self.do_readlocked_repository_request(repository, *args)
154
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
    """Break a repository lock."""

    def do_repository_request(self, repository):
        """Break any lock held on the repository.

        :param repository: The repository whose lock is to be broken.
        :return: A SuccessfulSmartServerResponse of ('ok',).
        """
        repository.break_lock()
        return SuccessfulSmartServerResponse(('ok', ))
164
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
165
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
167
no_extra_results = False
169
def do_repository_request(self, repository, *revision_ids):
170
"""Get parent details for some revisions.
172
All the parents for revision_ids are returned. Additionally up to 64KB
173
of additional parent data found by performing a breadth first search
174
from revision_ids is returned. The verb takes a body containing the
175
current search state, see do_body for details.
177
If 'include-missing:' is in revision_ids, ghosts encountered in the
178
graph traversal for getting parent data are included in the result with
179
a prefix of 'missing:'.
181
:param repository: The repository to query in.
182
:param revision_ids: The utf8 encoded revision_id to answer for.
184
self._revision_ids = revision_ids
185
return None # Signal that we want a body.
187
def do_body(self, body_bytes):
188
"""Process the current search state and perform the parent lookup.
190
:return: A smart server response where the body contains an utf8
191
encoded flattened list of the parents of the revisions (the same
192
format as Repository.get_revision_graph) which has been bz2
195
repository = self._repository
196
repository.lock_read()
198
return self._do_repository_request(body_bytes)
202
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
203
include_missing, max_size=65536):
206
estimator = estimate_compressed_size.ZLibEstimator(max_size)
207
next_revs = revision_ids
208
first_loop_done = False
210
queried_revs.update(next_revs)
211
parent_map = repo_graph.get_parent_map(next_revs)
212
current_revs = next_revs
214
for revision_id in current_revs:
216
parents = parent_map.get(revision_id)
217
if parents is not None:
218
# adjust for the wire
219
if parents == (_mod_revision.NULL_REVISION,):
221
# prepare the next query
222
next_revs.update(parents)
223
encoded_id = revision_id
226
encoded_id = "missing:" + revision_id
228
if (revision_id not in client_seen_revs and
229
(not missing_rev or include_missing)):
230
# Client does not have this revision, give it to it.
231
# add parents to the result
232
result[encoded_id] = parents
233
# Approximate the serialized cost of this revision_id.
234
line = '%s %s\n' % (encoded_id, ' '.join(parents))
235
estimator.add_content(line)
236
# get all the directly asked for parents, and then flesh out to
237
# 64K (compressed) or so. We do one level of depth at a time to
238
# stay in sync with the client. The 250000 magic number is
239
# estimated compression ratio taken from bzr.dev itself.
240
if self.no_extra_results or (first_loop_done and estimator.full()):
241
trace.mutter('size: %d, z_size: %d'
242
% (estimator._uncompressed_size_added,
243
estimator._compressed_size_added))
246
# don't query things we've already queried
247
next_revs = next_revs.difference(queried_revs)
248
first_loop_done = True
251
def _do_repository_request(self, body_bytes):
252
repository = self._repository
253
revision_ids = set(self._revision_ids)
254
include_missing = 'include-missing:' in revision_ids
256
revision_ids.remove('include-missing:')
257
body_lines = body_bytes.split('\n')
258
search_result, error = self.recreate_search_from_recipe(
259
repository, body_lines)
260
if error is not None:
262
# TODO might be nice to start up the search again; but thats not
263
# written or tested yet.
264
client_seen_revs = set(search_result.get_keys())
265
# Always include the requested ids.
266
client_seen_revs.difference_update(revision_ids)
268
repo_graph = repository.get_graph()
269
result = self._expand_requested_revs(repo_graph, revision_ids,
270
client_seen_revs, include_missing)
272
# sorting trivially puts lexographically similar revision ids together.
275
for revision, parents in sorted(result.items()):
276
lines.append(' '.join((revision, ) + tuple(parents)))
278
return SuccessfulSmartServerResponse(
279
('ok', ), bz2.compress('\n'.join(lines)))
282
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
284
def do_readlocked_repository_request(self, repository, revision_id):
55
285
"""Return the result of repository.get_revision_graph(revision_id).
287
Deprecated as of bzr 1.4, but supported for older clients.
57
289
:param repository: The repository to query in.
58
290
:param revision_id: The utf8 encoded revision_id to get a graph from.
59
291
:return: A smart server response where the body contains an utf8
161
token = repository.lock_write(token=token)
506
token = repository.lock_write(token=token).repository_token
162
507
except errors.LockContention, e:
163
508
return FailedSmartServerResponse(('LockContention',))
164
509
except errors.UnlockableTransport:
165
510
return FailedSmartServerResponse(('UnlockableTransport',))
166
repository.leave_lock_in_place()
511
except errors.LockFailed, e:
512
return FailedSmartServerResponse(('LockFailed',
513
str(e.lock), str(e.why)))
514
if token is not None:
515
repository.leave_lock_in_place()
167
516
repository.unlock()
168
517
if token is None:
170
519
return SuccessfulSmartServerResponse(('ok', token))
522
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
524
def do_repository_request(self, repository, to_network_name):
525
"""Get a stream for inserting into a to_format repository.
527
The request body is 'search_bytes', a description of the revisions
530
In 2.3 this verb added support for search_bytes == 'everything'. Older
531
implementations will respond with a BadSearch error, and clients should
532
catch this and fallback appropriately.
534
:param repository: The repository to stream from.
535
:param to_network_name: The network name of the format of the target
538
self._to_format = network_format_registry.get(to_network_name)
539
if self._should_fake_unknown():
540
return FailedSmartServerResponse(
541
('UnknownMethod', 'Repository.get_stream'))
542
return None # Signal that we want a body.
544
def _should_fake_unknown(self):
545
"""Return True if we should return UnknownMethod to the client.
547
This is a workaround for bugs in pre-1.19 clients that claim to
548
support receiving streams of CHK repositories. The pre-1.19 client
549
expects inventory records to be serialized in the format defined by
550
to_network_name, but in pre-1.19 (at least) that format definition
551
tries to use the xml5 serializer, which does not correctly handle
552
rich-roots. After 1.19 the client can also accept inventory-deltas
553
(which avoids this issue), and those clients will use the
554
Repository.get_stream_1.19 verb instead of this one.
555
So: if this repository is CHK, and the to_format doesn't match,
556
we should just fake an UnknownSmartMethod error so that the client
557
will fallback to VFS, rather than sending it a stream we know it
560
from_format = self._repository._format
561
to_format = self._to_format
562
if not from_format.supports_chks:
563
# Source not CHK: that's ok
565
if (to_format.supports_chks and
566
from_format.repository_class is to_format.repository_class and
567
from_format._serializer == to_format._serializer):
568
# Source is CHK, but target matches: that's ok
569
# (e.g. 2a->2a, or CHK2->2a)
571
# Source is CHK, and target is not CHK or incompatible CHK. We can't
572
# generate a compatible stream.
575
def do_body(self, body_bytes):
576
repository = self._repository
577
repository.lock_read()
579
search_result, error = self.recreate_search(repository, body_bytes,
581
if error is not None:
584
source = repository._get_source(self._to_format)
585
stream = source.get_stream(search_result)
587
exc_info = sys.exc_info()
589
# On non-error, unlocking is done by the body stream handler.
592
raise exc_info[0], exc_info[1], exc_info[2]
593
return SuccessfulSmartServerResponse(('ok',),
594
body_stream=self.body_stream(stream, repository))
596
def body_stream(self, stream, repository):
597
byte_stream = _stream_to_byte_stream(stream, repository._format)
599
for bytes in byte_stream:
601
except errors.RevisionNotPresent, e:
602
# This shouldn't be able to happen, but as we don't buffer
603
# everything it can in theory happen.
605
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
610
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
611
"""The same as Repository.get_stream, but will return stream CHK formats to
614
See SmartServerRepositoryGetStream._should_fake_unknown.
619
def _should_fake_unknown(self):
620
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
624
def _stream_to_byte_stream(stream, src_format):
625
"""Convert a record stream to a self delimited byte stream."""
626
pack_writer = pack.ContainerSerialiser()
627
yield pack_writer.begin()
628
yield pack_writer.bytes_record(src_format.network_name(), '')
629
for substream_type, substream in stream:
630
for record in substream:
631
if record.storage_kind in ('chunked', 'fulltext'):
632
serialised = record_to_fulltext_bytes(record)
633
elif record.storage_kind == 'absent':
634
raise ValueError("Absent factory for %s" % (record.key,))
636
serialised = record.get_bytes_as(record.storage_kind)
638
# Some streams embed the whole stream into the wire
639
# representation of the first record, which means that
640
# later records have no wire representation: we skip them.
641
yield pack_writer.bytes_record(serialised, [(substream_type,)])
642
yield pack_writer.end()
645
class _ByteStreamDecoder(object):
646
"""Helper for _byte_stream_to_stream.
648
The expected usage of this class is via the function _byte_stream_to_stream
649
which creates a _ByteStreamDecoder, pops off the stream format and then
650
yields the output of record_stream(), the main entry point to
653
Broadly this class has to unwrap two layers of iterators:
657
This is complicated by wishing to return type, iterator_for_type, but
658
getting the data for iterator_for_type when we find out type: we can't
659
simply pass a generator down to the NetworkRecordStream parser, instead
660
we have a little local state to seed each NetworkRecordStream instance,
661
and gather the type that we'll be yielding.
663
:ivar byte_stream: The byte stream being decoded.
664
:ivar stream_decoder: A pack parser used to decode the bytestream
665
:ivar current_type: The current type, used to join adjacent records of the
666
same type into a single stream.
667
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
670
def __init__(self, byte_stream, record_counter):
671
"""Create a _ByteStreamDecoder."""
672
self.stream_decoder = pack.ContainerPushParser()
673
self.current_type = None
674
self.first_bytes = None
675
self.byte_stream = byte_stream
676
self._record_counter = record_counter
679
def iter_stream_decoder(self):
680
"""Iterate the contents of the pack from stream_decoder."""
681
# dequeue pending items
682
for record in self.stream_decoder.read_pending_records():
684
# Pull bytes of the wire, decode them to records, yield those records.
685
for bytes in self.byte_stream:
686
self.stream_decoder.accept_bytes(bytes)
687
for record in self.stream_decoder.read_pending_records():
690
def iter_substream_bytes(self):
691
if self.first_bytes is not None:
692
yield self.first_bytes
693
# If we run out of pack records, single the outer layer to stop.
694
self.first_bytes = None
695
for record in self.iter_pack_records:
696
record_names, record_bytes = record
697
record_name, = record_names
698
substream_type = record_name[0]
699
if substream_type != self.current_type:
700
# end of a substream, seed the next substream.
701
self.current_type = substream_type
702
self.first_bytes = record_bytes
706
def record_stream(self):
707
"""Yield substream_type, substream from the byte stream."""
708
def wrap_and_count(pb, rc, substream):
709
"""Yield records from stream while showing progress."""
712
if self.current_type != 'revisions' and self.key_count != 0:
713
# As we know the number of revisions now (in self.key_count)
714
# we can setup and use record_counter (rc).
715
if not rc.is_initialized():
716
rc.setup(self.key_count, self.key_count)
717
for record in substream.read():
719
if rc.is_initialized() and counter == rc.STEP:
720
rc.increment(counter)
721
pb.update('Estimate', rc.current, rc.max)
723
if self.current_type == 'revisions':
724
# Total records is proportional to number of revs
725
# to fetch. With remote, we used self.key_count to
726
# track the number of revs. Once we have the revs
727
# counts in self.key_count, the progress bar changes
728
# from 'Estimating..' to 'Estimate' above.
730
if counter == rc.STEP:
731
pb.update('Estimating..', self.key_count)
737
pb = ui.ui_factory.nested_progress_bar()
738
rc = self._record_counter
739
# Make and consume sub generators, one per substream type:
740
while self.first_bytes is not None:
741
substream = NetworkRecordStream(self.iter_substream_bytes())
742
# after substream is fully consumed, self.current_type is set to
743
# the next type, and self.first_bytes is set to the matching bytes.
744
yield self.current_type, wrap_and_count(pb, rc, substream)
746
pb.update('Done', rc.max, rc.max)
749
def seed_state(self):
    """Prepare the _ByteStreamDecoder to decode from the pack stream."""
    # Set a single generator we can use to get data from the pack stream.
    self.iter_pack_records = self.iter_stream_decoder()
    # Seed the very first subiterator with content; after this each one
    # is seeded when the previous one is finished.
    list(self.iter_substream_bytes())
758
def _byte_stream_to_stream(byte_stream, record_counter=None):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :param record_counter: An optional record counter object; passed through
        to the decoder so progress can be reported while the stream is read.
    :return: (RepositoryFormat, stream_generator)
    """
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
    for bytes in byte_stream:
        decoder.stream_decoder.accept_bytes(bytes)
        # The first record in the pack stream names the source format;
        # return as soon as it has been decoded.
        for record in decoder.stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
173
773
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
175
775
def do_repository_request(self, repository, token):
235
858
dirname = dirname.encode(sys.getfilesystemencoding())
236
859
# python's tarball module includes the whole path by default so
238
assert dirname.endswith('.bzr')
861
if not dirname.endswith('.bzr'):
862
raise ValueError(dirname)
239
863
tarball.add(dirname, '.bzr') # recursive by default
868
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
869
"""Insert a record stream from a RemoteSink into a repository.
871
This gets bytes pushed to it by the network infrastructure and turns that
872
into a bytes iterator using a thread. That is then processed by
873
_byte_stream_to_stream.
878
def do_repository_request(self, repository, resume_tokens, lock_token):
    """StreamSink.insert_stream for a remote repository.

    :param repository: The repository to insert into.
    :param resume_tokens: Space-separated write group tokens to resume.
    :param lock_token: The token for the write lock the client holds.
    """
    repository.lock_write(token=lock_token)
    self.do_insert_stream_request(repository, resume_tokens)
883
def do_insert_stream_request(self, repository, resume_tokens):
884
tokens = [token for token in resume_tokens.split(' ') if token]
886
self.repository = repository
887
self.queue = Queue.Queue()
888
self.insert_thread = threading.Thread(target=self._inserter_thread)
889
self.insert_thread.start()
891
def do_chunk(self, body_stream_chunk):
    """Hand one chunk of the request body to the inserter thread's queue."""
    self.queue.put(body_stream_chunk)
894
def _inserter_thread(self):
896
src_format, stream = _byte_stream_to_stream(
897
self.blocking_byte_stream())
898
self.insert_result = self.repository._get_sink().insert_stream(
899
stream, src_format, self.tokens)
900
self.insert_ok = True
902
self.insert_exception = sys.exc_info()
903
self.insert_ok = False
905
def blocking_byte_stream(self):
907
bytes = self.queue.get()
908
if bytes is StopIteration:
914
self.queue.put(StopIteration)
915
if self.insert_thread is not None:
916
self.insert_thread.join()
917
if not self.insert_ok:
918
exc_info = self.insert_exception
919
raise exc_info[0], exc_info[1], exc_info[2]
920
write_group_tokens, missing_keys = self.insert_result
921
if write_group_tokens or missing_keys:
922
# bzip needed? missing keys should typically be a small set.
923
# Should this be a streaming body response ?
924
missing_keys = sorted(missing_keys)
925
bytes = bencode.bencode((write_group_tokens, missing_keys))
926
self.repository.unlock()
927
return SuccessfulSmartServerResponse(('missing-basis', bytes))
929
self.repository.unlock()
930
return SuccessfulSmartServerResponse(('ok', ))
933
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
934
"""Insert a record stream from a RemoteSink into a repository.
936
Same as SmartServerRepositoryInsertStreamLocked, except:
937
- the lock token argument is optional
938
- servers that implement this verb accept 'inventory-delta' records in the
944
def do_repository_request(self, repository, resume_tokens, lock_token=None):
    """StreamSink.insert_stream for a remote repository.

    Same as the locked variant, but the lock token is optional.
    """
    SmartServerRepositoryInsertStreamLocked.do_repository_request(
        self, repository, resume_tokens, lock_token)
950
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
951
"""Insert a record stream from a RemoteSink into an unlocked repository.
953
This is the same as SmartServerRepositoryInsertStreamLocked, except it
954
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
955
like pack format) repository.
960
def do_repository_request(self, repository, resume_tokens):
    """StreamSink.insert_stream for a remote repository.

    Takes a write lock itself, since no lock token is supplied.
    """
    repository.lock_write()
    self.do_insert_stream_request(repository, resume_tokens)
966
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
967
"""Add a revision signature text.
972
def do_repository_request(self, repository, lock_token, revision_id,
973
*write_group_tokens):
974
"""Add a revision signature text.
976
:param repository: Repository to operate on
977
:param lock_token: Lock token
978
:param revision_id: Revision for which to add signature
979
:param write_group_tokens: Write group tokens
981
self._lock_token = lock_token
982
self._revision_id = revision_id
983
self._write_group_tokens = write_group_tokens
986
def do_body(self, body_bytes):
987
"""Add a signature text.
989
:param body_bytes: GPG signature text
990
:return: SuccessfulSmartServerResponse with arguments 'ok' and
991
the list of new write group tokens.
993
self._repository.lock_write(token=self._lock_token)
995
self._repository.resume_write_group(self._write_group_tokens)
997
self._repository.add_signature_text(self._revision_id,
1000
new_write_group_tokens = self._repository.suspend_write_group()
1002
self._repository.unlock()
1003
return SuccessfulSmartServerResponse(
1004
('ok', ) + tuple(new_write_group_tokens))
1007
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1008
"""Start a write group.
1013
def do_repository_request(self, repository, lock_token):
1014
"""Start a write group."""
1015
repository.lock_write(token=lock_token)
1017
repository.start_write_group()
1019
tokens = repository.suspend_write_group()
1020
except errors.UnsuspendableWriteGroup:
1021
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1024
return SuccessfulSmartServerResponse(('ok', tokens))
1027
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1028
"""Commit a write group.
1033
def do_repository_request(self, repository, lock_token,
1034
write_group_tokens):
1035
"""Commit a write group."""
1036
repository.lock_write(token=lock_token)
1039
repository.resume_write_group(write_group_tokens)
1040
except errors.UnresumableWriteGroup, e:
1041
return FailedSmartServerResponse(
1042
('UnresumableWriteGroup', e.write_groups, e.reason))
1044
repository.commit_write_group()
1046
write_group_tokens = repository.suspend_write_group()
1047
# FIXME JRV 2011-11-19: What if the write_group_tokens
1052
return SuccessfulSmartServerResponse(('ok', ))
1055
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1056
"""Abort a write group.
1061
def do_repository_request(self, repository, lock_token, write_group_tokens):
1062
"""Abort a write group."""
1063
repository.lock_write(token=lock_token)
1066
repository.resume_write_group(write_group_tokens)
1067
except errors.UnresumableWriteGroup, e:
1068
return FailedSmartServerResponse(
1069
('UnresumableWriteGroup', e.write_groups, e.reason))
1070
repository.abort_write_group()
1073
return SuccessfulSmartServerResponse(('ok', ))
1076
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1077
"""Check that a write group is still valid.
1082
def do_repository_request(self, repository, lock_token, write_group_tokens):
1083
"""Abort a write group."""
1084
repository.lock_write(token=lock_token)
1087
repository.resume_write_group(write_group_tokens)
1088
except errors.UnresumableWriteGroup, e:
1089
return FailedSmartServerResponse(
1090
('UnresumableWriteGroup', e.write_groups, e.reason))
1092
repository.suspend_write_group()
1095
return SuccessfulSmartServerResponse(('ok', ))
1098
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1099
"""Retrieve all of the revision ids in a repository.
1104
def do_repository_request(self, repository):
    """Return all revision ids in the repository.

    :return: A SuccessfulSmartServerResponse of ('ok',) with a body of
        newline-joined revision ids.
    """
    revids = repository.all_revision_ids()
    return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1109
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1110
"""Reconcile a repository.
1115
def do_repository_request(self, repository, lock_token):
1117
repository.lock_write(token=lock_token)
1118
except errors.TokenLockingNotSupported, e:
1119
return FailedSmartServerResponse(
1120
('TokenLockingNotSupported', ))
1122
reconciler = repository.reconcile()
1126
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1127
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1129
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1132
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1133
"""Pack a repository.
1138
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1139
self._repository = repository
1140
self._lock_token = lock_token
1141
if clean_obsolete_packs == 'True':
1142
self._clean_obsolete_packs = True
1144
self._clean_obsolete_packs = False
1147
def do_body(self, body_bytes):
1148
if body_bytes == "":
1151
hint = body_bytes.splitlines()
1152
self._repository.lock_write(token=self._lock_token)
1154
self._repository.pack(hint, self._clean_obsolete_packs)
1156
self._repository.unlock()
1157
return SuccessfulSmartServerResponse(("ok", ), )
1160
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1161
"""Iterate over the contents of files.
1163
The client sends a list of desired files to stream, one
1164
per line, and as tuples of file id and revision, separated by
1167
The server replies with a stream. Each entry is preceded by a header,
1168
which can either be:
1170
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1171
list sent by the client. This header is followed by the contents of
1172
the file, bzip2-compressed.
1173
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1174
The client can then raise an appropriate RevisionNotPresent error
1175
or check its fallback repositories.
1180
def body_stream(self, repository, desired_files):
1181
self._repository.lock_read()
1184
for i, key in enumerate(desired_files):
1186
for record in repository.texts.get_record_stream(text_keys,
1188
identifier = text_keys[record.key]
1189
if record.storage_kind == 'absent':
1190
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1191
record.key[1], identifier)
1192
# FIXME: Way to abort early?
1194
yield "ok\0%d\n" % identifier
1195
compressor = zlib.compressobj()
1196
for bytes in record.get_bytes_as('chunked'):
1197
data = compressor.compress(bytes)
1200
data = compressor.flush()
1204
self._repository.unlock()
1206
def do_body(self, body_bytes):
1208
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1209
return SuccessfulSmartServerResponse(('ok', ),
1210
body_stream=self.body_stream(self._repository, desired_files))
1212
def do_repository_request(self, repository):
1213
# Signal that we want a body
1217
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1218
"""Stream a list of revisions.
1220
The client sends a list of newline-separated revision ids in the
1221
body of the request and the server replies with the serializer format,
1222
and a stream of bzip2-compressed revision texts (using the specified
1225
Any revisions the server does not have are omitted from the stream.
1230
def do_repository_request(self, repository):
1231
self._repository = repository
1232
# Signal there is a body
1235
def do_body(self, body_bytes):
    """Parse the newline-separated revision ids and begin streaming.

    :param body_bytes: Newline-separated revision ids requested.
    :return: A SuccessfulSmartServerResponse of ('ok', serializer_format)
        with a body stream of the revision texts.
    """
    revision_ids = body_bytes.split("\n")
    return SuccessfulSmartServerResponse(
        ('ok', self._repository.get_serializer_format()),
        body_stream=self.body_stream(self._repository, revision_ids))
1241
def body_stream(self, repository, revision_ids):
1242
self._repository.lock_read()
1244
for record in repository.revisions.get_record_stream(
1245
[(revid,) for revid in revision_ids], 'unordered', True):
1246
if record.storage_kind == 'absent':
1248
yield zlib.compress(record.get_bytes_as('fulltext'))
1250
self._repository.unlock()
1253
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1254
"""Get the inventory deltas for a set of revision ids.
1256
This accepts a list of revision ids, and then sends a chain
1257
of deltas for the inventories of those revisions. The first
1258
revision will be empty.
1260
The server writes back zlibbed serialized inventory deltas,
1261
in the ordering specified. The base for each delta is the
1262
inventory generated by the previous delta.
1267
def _inventory_delta_stream(self, repository, ordering, revids):
1268
prev_inv = _mod_inventory.Inventory(root_id=None,
1269
revision_id=_mod_revision.NULL_REVISION)
1270
serializer = inventory_delta.InventoryDeltaSerializer(
1271
repository.supports_rich_root(),
1272
repository._format.supports_tree_reference)
1273
repository.lock_read()
1275
for inv, revid in repository._iter_inventories(revids, ordering):
1278
inv_delta = inv._make_delta(prev_inv)
1279
lines = serializer.delta_to_lines(
1280
prev_inv.revision_id, inv.revision_id, inv_delta)
1281
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1286
def body_stream(self, repository, ordering, revids):
1287
substream = self._inventory_delta_stream(repository,
1289
return _stream_to_byte_stream([('inventory-deltas', substream)],
1292
def do_body(self, body_bytes):
1293
return SuccessfulSmartServerResponse(('ok', ),
1294
body_stream=self.body_stream(self._repository, self._ordering,
1295
body_bytes.splitlines()))
1297
def do_repository_request(self, repository, ordering):
1298
if ordering == 'unordered':
1299
# inventory deltas for a topologically sorted stream
1300
# are likely to be smaller
1301
ordering = 'topological'
1302
self._ordering = ordering
1303
# Signal that we want a body