228
231
# add parents to the result
229
232
result[encoded_id] = parents
230
233
# Approximate the serialized cost of this revision_id.
231
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
234
line = '%s %s\n' % (encoded_id, ' '.join(parents))
235
estimator.add_content(line)
232
236
# get all the directly asked for parents, and then flesh out to
233
237
# 64K (compressed) or so. We do one level of depth at a time to
234
238
# stay in sync with the client. The 250000 magic number is
235
239
# estimated compression ratio taken from bzr.dev itself.
236
if self.no_extra_results or (
237
first_loop_done and size_so_far > 250000):
240
if self.no_extra_results or (first_loop_done and estimator.full()):
241
trace.mutter('size: %d, z_size: %d'
242
% (estimator._uncompressed_size_added,
243
estimator._compressed_size_added))
238
244
next_revs = set()
240
246
# don't query things we've already queried
241
next_revs.difference_update(queried_revs)
247
next_revs = next_revs.difference(queried_revs)
242
248
first_loop_done = True
251
def _do_repository_request(self, body_bytes):
252
repository = self._repository
253
revision_ids = set(self._revision_ids)
254
include_missing = 'include-missing:' in revision_ids
256
revision_ids.remove('include-missing:')
257
body_lines = body_bytes.split('\n')
258
search_result, error = self.recreate_search_from_recipe(
259
repository, body_lines)
260
if error is not None:
262
# TODO might be nice to start up the search again; but thats not
263
# written or tested yet.
264
client_seen_revs = set(search_result.get_keys())
265
# Always include the requested ids.
266
client_seen_revs.difference_update(revision_ids)
268
repo_graph = repository.get_graph()
269
result = self._expand_requested_revs(repo_graph, revision_ids,
270
client_seen_revs, include_missing)
244
272
# sorting trivially puts lexographically similar revision ids together.
245
273
# Compression FTW.
246
275
for revision, parents in sorted(result.items()):
247
276
lines.append(' '.join((revision, ) + tuple(parents)))
828
963
self.do_insert_stream_request(repository, resume_tokens)
966
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
967
"""Add a revision signature text.
972
def do_repository_request(self, repository, lock_token, revision_id,
973
*write_group_tokens):
974
"""Add a revision signature text.
976
:param repository: Repository to operate on
977
:param lock_token: Lock token
978
:param revision_id: Revision for which to add signature
979
:param write_group_tokens: Write group tokens
981
self._lock_token = lock_token
982
self._revision_id = revision_id
983
self._write_group_tokens = write_group_tokens
986
def do_body(self, body_bytes):
987
"""Add a signature text.
989
:param body_bytes: GPG signature text
990
:return: SuccessfulSmartServerResponse with arguments 'ok' and
991
the list of new write group tokens.
993
self._repository.lock_write(token=self._lock_token)
995
self._repository.resume_write_group(self._write_group_tokens)
997
self._repository.add_signature_text(self._revision_id,
1000
new_write_group_tokens = self._repository.suspend_write_group()
1002
self._repository.unlock()
1003
return SuccessfulSmartServerResponse(
1004
('ok', ) + tuple(new_write_group_tokens))
1007
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1008
"""Start a write group.
1013
def do_repository_request(self, repository, lock_token):
1014
"""Start a write group."""
1015
repository.lock_write(token=lock_token)
1017
repository.start_write_group()
1019
tokens = repository.suspend_write_group()
1020
except errors.UnsuspendableWriteGroup:
1021
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1024
return SuccessfulSmartServerResponse(('ok', tokens))
1027
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1028
"""Commit a write group.
1033
def do_repository_request(self, repository, lock_token,
1034
write_group_tokens):
1035
"""Commit a write group."""
1036
repository.lock_write(token=lock_token)
1039
repository.resume_write_group(write_group_tokens)
1040
except errors.UnresumableWriteGroup, e:
1041
return FailedSmartServerResponse(
1042
('UnresumableWriteGroup', e.write_groups, e.reason))
1044
repository.commit_write_group()
1046
write_group_tokens = repository.suspend_write_group()
1047
# FIXME JRV 2011-11-19: What if the write_group_tokens
1052
return SuccessfulSmartServerResponse(('ok', ))
1055
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1056
"""Abort a write group.
1061
def do_repository_request(self, repository, lock_token, write_group_tokens):
1062
"""Abort a write group."""
1063
repository.lock_write(token=lock_token)
1066
repository.resume_write_group(write_group_tokens)
1067
except errors.UnresumableWriteGroup, e:
1068
return FailedSmartServerResponse(
1069
('UnresumableWriteGroup', e.write_groups, e.reason))
1070
repository.abort_write_group()
1073
return SuccessfulSmartServerResponse(('ok', ))
1076
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1077
"""Check that a write group is still valid.
1082
def do_repository_request(self, repository, lock_token, write_group_tokens):
1083
"""Abort a write group."""
1084
repository.lock_write(token=lock_token)
1087
repository.resume_write_group(write_group_tokens)
1088
except errors.UnresumableWriteGroup, e:
1089
return FailedSmartServerResponse(
1090
('UnresumableWriteGroup', e.write_groups, e.reason))
1092
repository.suspend_write_group()
1095
return SuccessfulSmartServerResponse(('ok', ))
1098
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1099
"""Retrieve all of the revision ids in a repository.
1104
def do_repository_request(self, repository):
1105
revids = repository.all_revision_ids()
1106
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1109
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1110
"""Reconcile a repository.
1115
def do_repository_request(self, repository, lock_token):
1117
repository.lock_write(token=lock_token)
1118
except errors.TokenLockingNotSupported, e:
1119
return FailedSmartServerResponse(
1120
('TokenLockingNotSupported', ))
1122
reconciler = repository.reconcile()
1126
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1127
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1129
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1132
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1133
"""Pack a repository.
1138
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1139
self._repository = repository
1140
self._lock_token = lock_token
1141
if clean_obsolete_packs == 'True':
1142
self._clean_obsolete_packs = True
1144
self._clean_obsolete_packs = False
1147
def do_body(self, body_bytes):
1148
if body_bytes == "":
1151
hint = body_bytes.splitlines()
1152
self._repository.lock_write(token=self._lock_token)
1154
self._repository.pack(hint, self._clean_obsolete_packs)
1156
self._repository.unlock()
1157
return SuccessfulSmartServerResponse(("ok", ), )
1160
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1161
"""Iterate over the contents of files.
1163
The client sends a list of desired files to stream, one
1164
per line, and as tuples of file id and revision, separated by
1167
The server replies with a stream. Each entry is preceded by a header,
1168
which can either be:
1170
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1171
list sent by the client. This header is followed by the contents of
1172
the file, bzip2-compressed.
1173
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1174
The client can then raise an appropriate RevisionNotPresent error
1175
or check its fallback repositories.
1180
def body_stream(self, repository, desired_files):
1181
self._repository.lock_read()
1184
for i, key in enumerate(desired_files):
1186
for record in repository.texts.get_record_stream(text_keys,
1188
identifier = text_keys[record.key]
1189
if record.storage_kind == 'absent':
1190
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1191
record.key[1], identifier)
1192
# FIXME: Way to abort early?
1194
yield "ok\0%d\n" % identifier
1195
compressor = zlib.compressobj()
1196
for bytes in record.get_bytes_as('chunked'):
1197
data = compressor.compress(bytes)
1200
data = compressor.flush()
1204
self._repository.unlock()
1206
def do_body(self, body_bytes):
1208
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1209
return SuccessfulSmartServerResponse(('ok', ),
1210
body_stream=self.body_stream(self._repository, desired_files))
1212
def do_repository_request(self, repository):
1213
# Signal that we want a body
1217
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1218
"""Stream a list of revisions.
1220
The client sends a list of newline-separated revision ids in the
1221
body of the request and the server replies with the serializer format,
1222
and a stream of bzip2-compressed revision texts (using the specified
1225
Any revisions the server does not have are omitted from the stream.
1230
def do_repository_request(self, repository):
1231
self._repository = repository
1232
# Signal there is a body
1235
def do_body(self, body_bytes):
1236
revision_ids = body_bytes.split("\n")
1237
return SuccessfulSmartServerResponse(
1238
('ok', self._repository.get_serializer_format()),
1239
body_stream=self.body_stream(self._repository, revision_ids))
1241
def body_stream(self, repository, revision_ids):
1242
self._repository.lock_read()
1244
for record in repository.revisions.get_record_stream(
1245
[(revid,) for revid in revision_ids], 'unordered', True):
1246
if record.storage_kind == 'absent':
1248
yield zlib.compress(record.get_bytes_as('fulltext'))
1250
self._repository.unlock()
1253
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1254
"""Get the inventory deltas for a set of revision ids.
1256
This accepts a list of revision ids, and then sends a chain
1257
of deltas for the inventories of those revisions. The first
1258
revision will be empty.
1260
The server writes back zlibbed serialized inventory deltas,
1261
in the ordering specified. The base for each delta is the
1262
inventory generated by the previous delta.
1267
def _inventory_delta_stream(self, repository, ordering, revids):
1268
prev_inv = _mod_inventory.Inventory(root_id=None,
1269
revision_id=_mod_revision.NULL_REVISION)
1270
serializer = inventory_delta.InventoryDeltaSerializer(
1271
repository.supports_rich_root(),
1272
repository._format.supports_tree_reference)
1273
repository.lock_read()
1275
for inv, revid in repository._iter_inventories(revids, ordering):
1278
inv_delta = inv._make_delta(prev_inv)
1279
lines = serializer.delta_to_lines(
1280
prev_inv.revision_id, inv.revision_id, inv_delta)
1281
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1286
def body_stream(self, repository, ordering, revids):
1287
substream = self._inventory_delta_stream(repository,
1289
return _stream_to_byte_stream([('inventory-deltas', substream)],
1292
def do_body(self, body_bytes):
1293
return SuccessfulSmartServerResponse(('ok', ),
1294
body_stream=self.body_stream(self._repository, self._ordering,
1295
body_bytes.splitlines()))
1297
def do_repository_request(self, repository, ordering):
1298
if ordering == 'unordered':
1299
# inventory deltas for a topologically sorted stream
1300
# are likely to be smaller
1301
ordering = 'topological'
1302
self._ordering = ordering
1303
# Signal that we want a body