                    # add parents to the result
                    result[encoded_id] = parents
                    # Approximate the serialized cost of this revision_id.
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
                    estimator.add_content(line)
            # get all the directly asked for parents, and then flesh out to
            # 64K (compressed) or so. We do one level of depth at a time to
            # stay in sync with the client. The 250000 magic number is
            # estimated compression ratio taken from bzr.dev itself.
            if self.no_extra_results or (first_loop_done and estimator.full()):
                trace.mutter('size: %d, z_size: %d'
                             % (estimator._uncompressed_size_added,
                                estimator._compressed_size_added))
                next_revs = set()
            # don't query things we've already queried
            next_revs = next_revs.difference(queried_revs)
            first_loop_done = True
    def _do_repository_request(self, body_bytes):
        repository = self._repository
        revision_ids = set(self._revision_ids)
        include_missing = 'include-missing:' in revision_ids
        if include_missing:
            revision_ids.remove('include-missing:')
        body_lines = body_bytes.split('\n')
        search_result, error = self.recreate_search_from_recipe(
            repository, body_lines)
        if error is not None:
            return error
        # TODO might be nice to start up the search again; but that's not
        # written or tested yet.
        client_seen_revs = set(search_result.get_keys())
        # Always include the requested ids.
        client_seen_revs.difference_update(revision_ids)

        repo_graph = repository.get_graph()
        result = self._expand_requested_revs(repo_graph, revision_ids,
            client_seen_revs, include_missing)

        # Sorting trivially puts lexicographically similar revision ids
        # together. Compression FTW.
        lines = []
        for revision, parents in sorted(result.items()):
            lines.append(' '.join((revision, ) + tuple(parents)))

        self.do_insert_stream_request(repository, resume_tokens)
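

# _expand_requested_revs above keeps walking one level of ancestry per loop
# until an "estimator" reports that the reply body is big enough. The
# estimator itself is defined elsewhere in bzrlib; this is only a minimal
# sketch of the interface the loop relies on (add_content(), full(), and the
# two size attributes read by trace.mutter), assuming a zlib-based estimate
# and an illustrative 64K compressed target. It uses the zlib module this
# file already relies on for its streaming verbs.
class _ExampleCompressedSizeEstimator(object):

    def __init__(self, target_size=64 * 1024):
        self._target_size = target_size
        self._compressor = zlib.compressobj()
        self._uncompressed_size_added = 0
        self._compressed_size_added = 0

    def add_content(self, content):
        self._uncompressed_size_added += len(content)
        self._compressed_size_added += len(self._compressor.compress(content))

    def full(self):
        # zlib buffers input, so the compressed figure lags a little behind
        # reality; that is good enough for deciding when to stop adding
        # parents to the reply.
        return self._compressed_size_added >= self._target_size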


class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
    """Add a revision signature text."""

    def do_repository_request(self, repository, lock_token, revision_id,
            *write_group_tokens):
        """Add a revision signature text.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param revision_id: Revision for which to add signature
        :param write_group_tokens: Write group tokens
        """
        self._lock_token = lock_token
        self._revision_id = revision_id
        self._write_group_tokens = write_group_tokens

    def do_body(self, body_bytes):
        """Add a signature text.

        :param body_bytes: GPG signature text
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
            the list of new write group tokens.
        """
        self._repository.lock_write(token=self._lock_token)
        self._repository.resume_write_group(self._write_group_tokens)
        self._repository.add_signature_text(self._revision_id,
            body_bytes)
        new_write_group_tokens = self._repository.suspend_write_group()
        self._repository.unlock()
        return SuccessfulSmartServerResponse(
            ('ok', ) + tuple(new_write_group_tokens))


class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
    """Start a write group."""

    def do_repository_request(self, repository, lock_token):
        """Start a write group."""
        repository.lock_write(token=lock_token)
        repository.start_write_group()
        try:
            tokens = repository.suspend_write_group()
        except errors.UnsuspendableWriteGroup:
            return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
        return SuccessfulSmartServerResponse(('ok', tokens))


class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
    """Commit a write group."""

    def do_repository_request(self, repository, lock_token,
            write_group_tokens):
        """Commit a write group."""
        repository.lock_write(token=lock_token)
        try:
            repository.resume_write_group(write_group_tokens)
        except errors.UnresumableWriteGroup, e:
            return FailedSmartServerResponse(
                ('UnresumableWriteGroup', e.write_groups, e.reason))
        try:
            repository.commit_write_group()
        except:
            write_group_tokens = repository.suspend_write_group()
            # FIXME JRV 2011-11-19: What if the write_group_tokens
            raise
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
    """Abort a write group."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Abort a write group."""
        repository.lock_write(token=lock_token)
        try:
            repository.resume_write_group(write_group_tokens)
        except errors.UnresumableWriteGroup, e:
            return FailedSmartServerResponse(
                ('UnresumableWriteGroup', e.write_groups, e.reason))
        repository.abort_write_group()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
    """Check that a write group is still valid."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Check a write group."""
        repository.lock_write(token=lock_token)
        try:
            repository.resume_write_group(write_group_tokens)
        except errors.UnresumableWriteGroup, e:
            return FailedSmartServerResponse(
                ('UnresumableWriteGroup', e.write_groups, e.reason))
        repository.suspend_write_group()
        return SuccessfulSmartServerResponse(('ok', ))
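

# SmartServerRepositoryAddSignatureText and the four write-group verbs above
# all follow the same discipline: take the repository write lock with the
# client's lock token, resume the write group named by the client's tokens,
# do one piece of work, then suspend (or commit/abort) and unlock. A
# condensed sketch of that shared pattern, using only repository methods that
# already appear above; the helper itself is illustrative and not part of
# bzrlib.
def _example_with_resumed_write_group(repository, lock_token,
                                      write_group_tokens, operation):
    repository.lock_write(token=lock_token)
    try:
        repository.resume_write_group(write_group_tokens)
        operation(repository)
        # Hand freshly minted tokens back so the client can carry on later.
        return repository.suspend_write_group()
    finally:
        repository.unlock()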


class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
    """Retrieve all of the revision ids in a repository."""

    def do_repository_request(self, repository):
        revids = repository.all_revision_ids()
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
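

# The response body above is just the revision ids joined with newlines. A
# sketch of how a hypothetical caller would split it back apart; this is not
# bzrlib's client code.
def _example_parse_all_revision_ids(body_bytes):
    # An empty repository produces an empty body, which should yield no ids.
    if not body_bytes:
        return []
    return body_bytes.split("\n")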


class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
    """Reconcile a repository."""

    def do_repository_request(self, repository, lock_token):
        try:
            repository.lock_write(token=lock_token)
        except errors.TokenLockingNotSupported, e:
            return FailedSmartServerResponse(
                ('TokenLockingNotSupported', ))
        reconciler = repository.reconcile()
        body = [
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
            ]
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))
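

# The reconcile response body is a series of "name: value" counter lines, as
# built above. A minimal sketch of turning it back into a dict; the helper
# name is illustrative, not bzrlib's client code.
def _example_parse_reconcile_body(body_bytes):
    counters = {}
    for line in body_bytes.splitlines():
        name, value = line.split(": ", 1)
        counters[name] = int(value)
    return counters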


class SmartServerRepositoryPack(SmartServerRepositoryRequest):
    """Pack a repository."""

    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
        self._repository = repository
        self._lock_token = lock_token
        if clean_obsolete_packs == 'True':
            self._clean_obsolete_packs = True
        else:
            self._clean_obsolete_packs = False

    def do_body(self, body_bytes):
        if body_bytes == "":
            hint = None
        else:
            hint = body_bytes.splitlines()
        self._repository.lock_write(token=self._lock_token)
        self._repository.pack(hint, self._clean_obsolete_packs)
        self._repository.unlock()
        return SuccessfulSmartServerResponse(("ok", ), )
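

# As far as this handler shows, the clean_obsolete_packs flag travels as the
# literal string 'True' or 'False' and the optional pack hint as
# newline-joined lines in the request body, an empty body meaning "no hint".
# A small illustrative round-trip of that body encoding; the helper names are
# not part of bzrlib.
def _example_encode_pack_hint(hint):
    if hint is None:
        return ""
    return "\n".join(hint)


def _example_decode_pack_hint(body_bytes):
    if body_bytes == "":
        return None
    return body_bytes.splitlines()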


class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
    """Iterate over the contents of files.

    The client sends a list of desired files to stream, one
    per line, and as tuples of file id and revision, separated by
    \0.

    The server replies with a stream. Each entry is preceded by a header,
    which can either be:

    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
        list sent by the client. This header is followed by the contents of
        the file, zlib-compressed.
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
        The client can then raise an appropriate RevisionNotPresent error
        or check its fallback repositories.
    """

    def body_stream(self, repository, desired_files):
        self._repository.lock_read()
        text_keys = {}
        for i, key in enumerate(desired_files):
            text_keys[key] = i
        for record in repository.texts.get_record_stream(text_keys,
                'unordered', True):
            identifier = text_keys[record.key]
            if record.storage_kind == 'absent':
                yield "absent\0%s\0%s\0%d\n" % (record.key[0],
                    record.key[1], identifier)
                # FIXME: Way to abort early?
                continue
            yield "ok\0%d\n" % identifier
            compressor = zlib.compressobj()
            for bytes in record.get_bytes_as('chunked'):
                data = compressor.compress(bytes)
                if data:
                    yield data
            data = compressor.flush()
            if data:
                yield data
        self._repository.unlock()

    def do_body(self, body_bytes):
        desired_files = [
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, desired_files))

    def do_repository_request(self, repository):
        # Signal that we want a body
        return None
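

# Each entry in the body stream above is a header line followed, for hits, by
# a zlib stream of the text's chunks. A condensed sketch of how one "ok"
# entry is framed, mirroring body_stream(); the helper is illustrative only.
# Because the next header follows the compressed data with no length prefix,
# a reader has to use zlib.decompressobj() rather than zlib.decompress() to
# find the boundary.
def _example_frame_one_text(identifier, chunks):
    pieces = ["ok\0%d\n" % identifier]
    compressor = zlib.compressobj()
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:
            pieces.append(data)
    pieces.append(compressor.flush())
    return "".join(pieces)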


class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
    """Stream a list of revisions.

    The client sends a list of newline-separated revision ids in the
    body of the request and the server replies with the serializer format,
    and a stream of zlib-compressed revision texts (using the specified
    serializer).

    Any revisions the server does not have are omitted from the stream.
    """

    def do_repository_request(self, repository):
        self._repository = repository
        # Signal there is a body
        return None

    def do_body(self, body_bytes):
        revision_ids = body_bytes.split("\n")
        return SuccessfulSmartServerResponse(
            ('ok', self._repository.get_serializer_format()),
            body_stream=self.body_stream(self._repository, revision_ids))

    def body_stream(self, repository, revision_ids):
        self._repository.lock_read()
        for record in repository.revisions.get_record_stream(
                [(revid,) for revid in revision_ids], 'unordered', True):
            if record.storage_kind == 'absent':
                continue
            yield zlib.compress(record.get_bytes_as('fulltext'))
        self._repository.unlock()
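

# body_stream() above emits one complete zlib stream per revision, back to
# back. A reader can recover the boundaries with zlib.decompressobj(), whose
# unused_data attribute holds whatever followed the end of the current
# stream. A minimal sketch, not bzrlib's client implementation.
def _example_split_revision_texts(data):
    texts = []
    while data:
        decompressor = zlib.decompressobj()
        texts.append(decompressor.decompress(data))
        data = decompressor.unused_data
    return texts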


class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
    """Get the inventory deltas for a set of revision ids.

    This accepts a list of revision ids, and then sends a chain
    of deltas for the inventories of those revisions. The base for
    the first delta is the empty inventory.

    The server writes back zlibbed serialized inventory deltas,
    in the ordering specified. The base for each delta is the
    inventory generated by the previous delta.
    """

    def _inventory_delta_stream(self, repository, ordering, revids):
        prev_inv = _mod_inventory.Inventory(root_id=None,
            revision_id=_mod_revision.NULL_REVISION)
        serializer = inventory_delta.InventoryDeltaSerializer(
            repository.supports_rich_root(),
            repository._format.supports_tree_reference)
        repository.lock_read()
        for inv, revid in repository._iter_inventories(revids, ordering):
            inv_delta = inv._make_delta(prev_inv)
            lines = serializer.delta_to_lines(
                prev_inv.revision_id, inv.revision_id, inv_delta)
            yield ChunkedContentFactory(inv.revision_id, None, None, lines)
            prev_inv = inv

    def body_stream(self, repository, ordering, revids):
        substream = self._inventory_delta_stream(repository,
            ordering, revids)
        return _stream_to_byte_stream([('inventory-deltas', substream)],
            repository._format)

    def do_body(self, body_bytes):
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, self._ordering,
                body_bytes.splitlines()))

    def do_repository_request(self, repository, ordering):
        if ordering == 'unordered':
            # inventory deltas for a topologically sorted stream
            # are likely to be smaller
            ordering = 'topological'
        self._ordering = ordering
        # Signal that we want a body
        return None
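

# The deltas above are chained: each delta's base is the inventory produced
# by the previous delta, starting from an empty inventory. A bzrlib-free
# sketch of the same idea over plain dicts, showing why a receiver must apply
# the deltas in the order they arrive; every name here is illustrative.
def _example_chain_deltas(snapshots):
    prev = {}
    for snapshot in snapshots:
        added_or_changed = dict(
            (key, value) for key, value in snapshot.items()
            if prev.get(key) != value)
        removed = [key for key in prev if key not in snapshot]
        yield added_or_changed, removed
        prev = snapshot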