227
226
# add parents to the result
228
227
result[encoded_id] = parents
229
228
# Approximate the serialized cost of this revision_id.
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
229
line = '%s %s\n' % (encoded_id, ' '.join(parents))
230
estimator.add_content(line)
231
231
# get all the directly asked for parents, and then flesh out to
232
232
# 64K (compressed) or so. We do one level of depth at a time to
233
233
# stay in sync with the client. The 250000 magic number is
234
234
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
235
if self.no_extra_results or (first_loop_done and estimator.full()):
236
trace.mutter('size: %d, z_size: %d'
237
% (estimator._uncompressed_size_added,
238
estimator._compressed_size_added))
237
239
next_revs = set()
239
241
# don't query things we've already queried
240
next_revs.difference_update(queried_revs)
242
next_revs = next_revs.difference(queried_revs)
241
243
first_loop_done = True
246
def _do_repository_request(self, body_bytes):
247
repository = self._repository
248
revision_ids = set(self._revision_ids)
249
include_missing = 'include-missing:' in revision_ids
251
revision_ids.remove('include-missing:')
252
body_lines = body_bytes.split('\n')
253
search_result, error = self.recreate_search_from_recipe(
254
repository, body_lines)
255
if error is not None:
257
# TODO might be nice to start up the search again; but thats not
258
# written or tested yet.
259
client_seen_revs = set(search_result.get_keys())
260
# Always include the requested ids.
261
client_seen_revs.difference_update(revision_ids)
263
repo_graph = repository.get_graph()
264
result = self._expand_requested_revs(repo_graph, revision_ids,
265
client_seen_revs, include_missing)
243
267
# sorting trivially puts lexographically similar revision ids together.
244
268
# Compression FTW.
245
270
for revision, parents in sorted(result.items()):
246
271
lines.append(' '.join((revision, ) + tuple(parents)))
581
701
def record_stream(self):
582
702
"""Yield substream_type, substream from the byte stream."""
703
def wrap_and_count(pb, rc, substream):
704
"""Yield records from stream while showing progress."""
707
if self.current_type != 'revisions' and self.key_count != 0:
708
# As we know the number of revisions now (in self.key_count)
709
# we can setup and use record_counter (rc).
710
if not rc.is_initialized():
711
rc.setup(self.key_count, self.key_count)
712
for record in substream.read():
714
if rc.is_initialized() and counter == rc.STEP:
715
rc.increment(counter)
716
pb.update('Estimate', rc.current, rc.max)
718
if self.current_type == 'revisions':
719
# Total records is proportional to number of revs
720
# to fetch. With remote, we used self.key_count to
721
# track the number of revs. Once we have the revs
722
# counts in self.key_count, the progress bar changes
723
# from 'Estimating..' to 'Estimate' above.
725
if counter == rc.STEP:
726
pb.update('Estimating..', self.key_count)
583
731
self.seed_state()
732
pb = ui.ui_factory.nested_progress_bar()
733
rc = self._record_counter
584
734
# Make and consume sub generators, one per substream type:
585
735
while self.first_bytes is not None:
586
736
substream = NetworkRecordStream(self.iter_substream_bytes())
587
737
# after substream is fully consumed, self.current_type is set to
588
738
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
739
yield self.current_type, wrap_and_count(pb, rc, substream)
741
pb.update('Done', rc.max, rc.max)
591
744
def seed_state(self):
592
745
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
792
958
self.do_insert_stream_request(repository, resume_tokens)
961
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
962
"""Add a revision signature text.
967
def do_repository_request(self, repository, lock_token, revision_id,
968
*write_group_tokens):
969
"""Add a revision signature text.
971
:param repository: Repository to operate on
972
:param lock_token: Lock token
973
:param revision_id: Revision for which to add signature
974
:param write_group_tokens: Write group tokens
976
self._lock_token = lock_token
977
self._revision_id = revision_id
978
self._write_group_tokens = write_group_tokens
981
def do_body(self, body_bytes):
982
"""Add a signature text.
984
:param body_bytes: GPG signature text
985
:return: SuccessfulSmartServerResponse with arguments 'ok' and
986
the list of new write group tokens.
988
self._repository.lock_write(token=self._lock_token)
990
self._repository.resume_write_group(self._write_group_tokens)
992
self._repository.add_signature_text(self._revision_id,
995
new_write_group_tokens = self._repository.suspend_write_group()
997
self._repository.unlock()
998
return SuccessfulSmartServerResponse(
999
('ok', ) + tuple(new_write_group_tokens))
1002
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1003
"""Start a write group.
1008
def do_repository_request(self, repository, lock_token):
1009
"""Start a write group."""
1010
repository.lock_write(token=lock_token)
1012
repository.start_write_group()
1014
tokens = repository.suspend_write_group()
1015
except errors.UnsuspendableWriteGroup:
1016
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1019
return SuccessfulSmartServerResponse(('ok', tokens))
1022
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1023
"""Commit a write group.
1028
def do_repository_request(self, repository, lock_token,
1029
write_group_tokens):
1030
"""Commit a write group."""
1031
repository.lock_write(token=lock_token)
1034
repository.resume_write_group(write_group_tokens)
1035
except errors.UnresumableWriteGroup, e:
1036
return FailedSmartServerResponse(
1037
('UnresumableWriteGroup', e.write_groups, e.reason))
1039
repository.commit_write_group()
1041
write_group_tokens = repository.suspend_write_group()
1042
# FIXME JRV 2011-11-19: What if the write_group_tokens
1047
return SuccessfulSmartServerResponse(('ok', ))
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1051
"""Abort a write group.
1056
def do_repository_request(self, repository, lock_token, write_group_tokens):
1057
"""Abort a write group."""
1058
repository.lock_write(token=lock_token)
1061
repository.resume_write_group(write_group_tokens)
1062
except errors.UnresumableWriteGroup, e:
1063
return FailedSmartServerResponse(
1064
('UnresumableWriteGroup', e.write_groups, e.reason))
1065
repository.abort_write_group()
1068
return SuccessfulSmartServerResponse(('ok', ))
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1072
"""Check that a write group is still valid.
1077
def do_repository_request(self, repository, lock_token, write_group_tokens):
1078
"""Abort a write group."""
1079
repository.lock_write(token=lock_token)
1082
repository.resume_write_group(write_group_tokens)
1083
except errors.UnresumableWriteGroup, e:
1084
return FailedSmartServerResponse(
1085
('UnresumableWriteGroup', e.write_groups, e.reason))
1087
repository.suspend_write_group()
1090
return SuccessfulSmartServerResponse(('ok', ))
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1094
"""Retrieve all of the revision ids in a repository.
1099
def do_repository_request(self, repository):
1100
revids = repository.all_revision_ids()
1101
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1104
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1105
"""Reconcile a repository.
1110
def do_repository_request(self, repository, lock_token):
1112
repository.lock_write(token=lock_token)
1113
except errors.TokenLockingNotSupported, e:
1114
return FailedSmartServerResponse(
1115
('TokenLockingNotSupported', ))
1117
reconciler = repository.reconcile()
1121
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1122
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1124
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1127
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1128
"""Pack a repository.
1133
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1134
self._repository = repository
1135
self._lock_token = lock_token
1136
if clean_obsolete_packs == 'True':
1137
self._clean_obsolete_packs = True
1139
self._clean_obsolete_packs = False
1142
def do_body(self, body_bytes):
1143
if body_bytes == "":
1146
hint = body_bytes.splitlines()
1147
self._repository.lock_write(token=self._lock_token)
1149
self._repository.pack(hint, self._clean_obsolete_packs)
1151
self._repository.unlock()
1152
return SuccessfulSmartServerResponse(("ok", ), )
1155
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1156
"""Iterate over the contents of files.
1158
The client sends a list of desired files to stream, one
1159
per line, and as tuples of file id and revision, separated by
1162
The server replies with a stream. Each entry is preceded by a header,
1163
which can either be:
1165
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1166
list sent by the client. This header is followed by the contents of
1167
the file, bzip2-compressed.
1168
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1169
The client can then raise an appropriate RevisionNotPresent error
1170
or check its fallback repositories.
1175
def body_stream(self, repository, desired_files):
1176
self._repository.lock_read()
1179
for i, key in enumerate(desired_files):
1181
for record in repository.texts.get_record_stream(text_keys,
1183
identifier = text_keys[record.key]
1184
if record.storage_kind == 'absent':
1185
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1186
record.key[1], identifier)
1187
# FIXME: Way to abort early?
1189
yield "ok\0%d\n" % identifier
1190
compressor = zlib.compressobj()
1191
for bytes in record.get_bytes_as('chunked'):
1192
data = compressor.compress(bytes)
1195
data = compressor.flush()
1199
self._repository.unlock()
1201
def do_body(self, body_bytes):
1203
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1204
return SuccessfulSmartServerResponse(('ok', ),
1205
body_stream=self.body_stream(self._repository, desired_files))
1207
def do_repository_request(self, repository):
1208
# Signal that we want a body
1212
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1213
"""Stream a list of revisions.
1215
The client sends a list of newline-separated revision ids in the
1216
body of the request and the server replies with the serializer format,
1217
and a stream of bzip2-compressed revision texts (using the specified
1220
Any revisions the server does not have are omitted from the stream.
1225
def do_repository_request(self, repository):
1226
self._repository = repository
1227
# Signal there is a body
1230
def do_body(self, body_bytes):
1231
revision_ids = body_bytes.split("\n")
1232
return SuccessfulSmartServerResponse(
1233
('ok', self._repository.get_serializer_format()),
1234
body_stream=self.body_stream(self._repository, revision_ids))
1236
def body_stream(self, repository, revision_ids):
1237
self._repository.lock_read()
1239
for record in repository.revisions.get_record_stream(
1240
[(revid,) for revid in revision_ids], 'unordered', True):
1241
if record.storage_kind == 'absent':
1243
yield zlib.compress(record.get_bytes_as('fulltext'))
1245
self._repository.unlock()