227
231
# add parents to the result
228
232
result[encoded_id] = parents
229
233
# Approximate the serialized cost of this revision_id.
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
234
line = '%s %s\n' % (encoded_id, ' '.join(parents))
235
estimator.add_content(line)
231
236
# get all the directly asked for parents, and then flesh out to
232
237
# 64K (compressed) or so. We do one level of depth at a time to
233
238
# stay in sync with the client. The 250000 magic number is
234
239
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
240
if self.no_extra_results or (first_loop_done and estimator.full()):
241
trace.mutter('size: %d, z_size: %d'
242
% (estimator._uncompressed_size_added,
243
estimator._compressed_size_added))
237
244
next_revs = set()
239
246
# don't query things we've already queried
240
next_revs.difference_update(queried_revs)
247
next_revs = next_revs.difference(queried_revs)
241
248
first_loop_done = True
251
def _do_repository_request(self, body_bytes):
252
repository = self._repository
253
revision_ids = set(self._revision_ids)
254
include_missing = 'include-missing:' in revision_ids
256
revision_ids.remove('include-missing:')
257
body_lines = body_bytes.split('\n')
258
search_result, error = self.recreate_search_from_recipe(
259
repository, body_lines)
260
if error is not None:
262
# TODO might be nice to start up the search again; but thats not
263
# written or tested yet.
264
client_seen_revs = set(search_result.get_keys())
265
# Always include the requested ids.
266
client_seen_revs.difference_update(revision_ids)
268
repo_graph = repository.get_graph()
269
result = self._expand_requested_revs(repo_graph, revision_ids,
270
client_seen_revs, include_missing)
243
272
# sorting trivially puts lexographically similar revision ids together.
244
273
# Compression FTW.
245
275
for revision, parents in sorted(result.items()):
246
276
lines.append(' '.join((revision, ) + tuple(parents)))
581
706
def record_stream(self):
582
707
"""Yield substream_type, substream from the byte stream."""
708
def wrap_and_count(pb, rc, substream):
709
"""Yield records from stream while showing progress."""
712
if self.current_type != 'revisions' and self.key_count != 0:
713
# As we know the number of revisions now (in self.key_count)
714
# we can setup and use record_counter (rc).
715
if not rc.is_initialized():
716
rc.setup(self.key_count, self.key_count)
717
for record in substream.read():
719
if rc.is_initialized() and counter == rc.STEP:
720
rc.increment(counter)
721
pb.update('Estimate', rc.current, rc.max)
723
if self.current_type == 'revisions':
724
# Total records is proportional to number of revs
725
# to fetch. With remote, we used self.key_count to
726
# track the number of revs. Once we have the revs
727
# counts in self.key_count, the progress bar changes
728
# from 'Estimating..' to 'Estimate' above.
730
if counter == rc.STEP:
731
pb.update('Estimating..', self.key_count)
583
736
self.seed_state()
584
# Make and consume sub generators, one per substream type:
585
while self.first_bytes is not None:
586
substream = NetworkRecordStream(self.iter_substream_bytes())
587
# after substream is fully consumed, self.current_type is set to
588
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
737
pb = ui.ui_factory.nested_progress_bar()
738
rc = self._record_counter
740
# Make and consume sub generators, one per substream type:
741
while self.first_bytes is not None:
742
substream = NetworkRecordStream(self.iter_substream_bytes())
743
# after substream is fully consumed, self.current_type is set
744
# to the next type, and self.first_bytes is set to the matching
746
yield self.current_type, wrap_and_count(pb, rc, substream)
749
pb.update('Done', rc.max, rc.max)
591
752
def seed_state(self):
592
753
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
792
966
self.do_insert_stream_request(repository, resume_tokens)
969
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
970
"""Add a revision signature text.
975
def do_repository_request(self, repository, lock_token, revision_id,
976
*write_group_tokens):
977
"""Add a revision signature text.
979
:param repository: Repository to operate on
980
:param lock_token: Lock token
981
:param revision_id: Revision for which to add signature
982
:param write_group_tokens: Write group tokens
984
self._lock_token = lock_token
985
self._revision_id = revision_id
986
self._write_group_tokens = write_group_tokens
989
def do_body(self, body_bytes):
990
"""Add a signature text.
992
:param body_bytes: GPG signature text
993
:return: SuccessfulSmartServerResponse with arguments 'ok' and
994
the list of new write group tokens.
996
self._repository.lock_write(token=self._lock_token)
998
self._repository.resume_write_group(self._write_group_tokens)
1000
self._repository.add_signature_text(self._revision_id,
1003
new_write_group_tokens = self._repository.suspend_write_group()
1005
self._repository.unlock()
1006
return SuccessfulSmartServerResponse(
1007
('ok', ) + tuple(new_write_group_tokens))
1010
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1011
"""Start a write group.
1016
def do_repository_request(self, repository, lock_token):
1017
"""Start a write group."""
1018
repository.lock_write(token=lock_token)
1020
repository.start_write_group()
1022
tokens = repository.suspend_write_group()
1023
except errors.UnsuspendableWriteGroup:
1024
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1027
return SuccessfulSmartServerResponse(('ok', tokens))
1030
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1031
"""Commit a write group.
1036
def do_repository_request(self, repository, lock_token,
1037
write_group_tokens):
1038
"""Commit a write group."""
1039
repository.lock_write(token=lock_token)
1042
repository.resume_write_group(write_group_tokens)
1043
except errors.UnresumableWriteGroup, e:
1044
return FailedSmartServerResponse(
1045
('UnresumableWriteGroup', e.write_groups, e.reason))
1047
repository.commit_write_group()
1049
write_group_tokens = repository.suspend_write_group()
1050
# FIXME JRV 2011-11-19: What if the write_group_tokens
1055
return SuccessfulSmartServerResponse(('ok', ))
1058
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1059
"""Abort a write group.
1064
def do_repository_request(self, repository, lock_token, write_group_tokens):
1065
"""Abort a write group."""
1066
repository.lock_write(token=lock_token)
1069
repository.resume_write_group(write_group_tokens)
1070
except errors.UnresumableWriteGroup, e:
1071
return FailedSmartServerResponse(
1072
('UnresumableWriteGroup', e.write_groups, e.reason))
1073
repository.abort_write_group()
1076
return SuccessfulSmartServerResponse(('ok', ))
1079
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1080
"""Check that a write group is still valid.
1085
def do_repository_request(self, repository, lock_token, write_group_tokens):
1086
"""Abort a write group."""
1087
repository.lock_write(token=lock_token)
1090
repository.resume_write_group(write_group_tokens)
1091
except errors.UnresumableWriteGroup, e:
1092
return FailedSmartServerResponse(
1093
('UnresumableWriteGroup', e.write_groups, e.reason))
1095
repository.suspend_write_group()
1098
return SuccessfulSmartServerResponse(('ok', ))
1101
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1102
"""Retrieve all of the revision ids in a repository.
1107
def do_repository_request(self, repository):
1108
revids = repository.all_revision_ids()
1109
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1112
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1113
"""Reconcile a repository.
1118
def do_repository_request(self, repository, lock_token):
1120
repository.lock_write(token=lock_token)
1121
except errors.TokenLockingNotSupported, e:
1122
return FailedSmartServerResponse(
1123
('TokenLockingNotSupported', ))
1125
reconciler = repository.reconcile()
1129
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1130
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1132
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1135
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1136
"""Pack a repository.
1141
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1142
self._repository = repository
1143
self._lock_token = lock_token
1144
if clean_obsolete_packs == 'True':
1145
self._clean_obsolete_packs = True
1147
self._clean_obsolete_packs = False
1150
def do_body(self, body_bytes):
1151
if body_bytes == "":
1154
hint = body_bytes.splitlines()
1155
self._repository.lock_write(token=self._lock_token)
1157
self._repository.pack(hint, self._clean_obsolete_packs)
1159
self._repository.unlock()
1160
return SuccessfulSmartServerResponse(("ok", ), )
1163
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1164
"""Iterate over the contents of files.
1166
The client sends a list of desired files to stream, one
1167
per line, and as tuples of file id and revision, separated by
1170
The server replies with a stream. Each entry is preceded by a header,
1171
which can either be:
1173
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1174
list sent by the client. This header is followed by the contents of
1175
the file, bzip2-compressed.
1176
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1177
The client can then raise an appropriate RevisionNotPresent error
1178
or check its fallback repositories.
1183
def body_stream(self, repository, desired_files):
1184
self._repository.lock_read()
1187
for i, key in enumerate(desired_files):
1189
for record in repository.texts.get_record_stream(text_keys,
1191
identifier = text_keys[record.key]
1192
if record.storage_kind == 'absent':
1193
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1194
record.key[1], identifier)
1195
# FIXME: Way to abort early?
1197
yield "ok\0%d\n" % identifier
1198
compressor = zlib.compressobj()
1199
for bytes in record.get_bytes_as('chunked'):
1200
data = compressor.compress(bytes)
1203
data = compressor.flush()
1207
self._repository.unlock()
1209
def do_body(self, body_bytes):
1211
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1212
return SuccessfulSmartServerResponse(('ok', ),
1213
body_stream=self.body_stream(self._repository, desired_files))
1215
def do_repository_request(self, repository):
1216
# Signal that we want a body
1220
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1221
"""Stream a list of revisions.
1223
The client sends a list of newline-separated revision ids in the
1224
body of the request and the server replies with the serializer format,
1225
and a stream of bzip2-compressed revision texts (using the specified
1228
Any revisions the server does not have are omitted from the stream.
1233
def do_repository_request(self, repository):
1234
self._repository = repository
1235
# Signal there is a body
1238
def do_body(self, body_bytes):
1239
revision_ids = body_bytes.split("\n")
1240
return SuccessfulSmartServerResponse(
1241
('ok', self._repository.get_serializer_format()),
1242
body_stream=self.body_stream(self._repository, revision_ids))
1244
def body_stream(self, repository, revision_ids):
1245
self._repository.lock_read()
1247
for record in repository.revisions.get_record_stream(
1248
[(revid,) for revid in revision_ids], 'unordered', True):
1249
if record.storage_kind == 'absent':
1251
yield zlib.compress(record.get_bytes_as('fulltext'))
1253
self._repository.unlock()
1256
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1257
"""Get the inventory deltas for a set of revision ids.
1259
This accepts a list of revision ids, and then sends a chain
1260
of deltas for the inventories of those revisions. The first
1261
revision will be empty.
1263
The server writes back zlibbed serialized inventory deltas,
1264
in the ordering specified. The base for each delta is the
1265
inventory generated by the previous delta.
1270
def _inventory_delta_stream(self, repository, ordering, revids):
1271
prev_inv = _mod_inventory.Inventory(root_id=None,
1272
revision_id=_mod_revision.NULL_REVISION)
1273
serializer = inventory_delta.InventoryDeltaSerializer(
1274
repository.supports_rich_root(),
1275
repository._format.supports_tree_reference)
1276
repository.lock_read()
1278
for inv, revid in repository._iter_inventories(revids, ordering):
1281
inv_delta = inv._make_delta(prev_inv)
1282
lines = serializer.delta_to_lines(
1283
prev_inv.revision_id, inv.revision_id, inv_delta)
1284
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1289
def body_stream(self, repository, ordering, revids):
1290
substream = self._inventory_delta_stream(repository,
1292
return _stream_to_byte_stream([('inventory-deltas', substream)],
1295
def do_body(self, body_bytes):
1296
return SuccessfulSmartServerResponse(('ok', ),
1297
body_stream=self.body_stream(self._repository, self._ordering,
1298
body_bytes.splitlines()))
1300
def do_repository_request(self, repository, ordering):
1301
if ordering == 'unordered':
1302
# inventory deltas for a topologically sorted stream
1303
# are likely to be smaller
1304
ordering = 'topological'
1305
self._ordering = ordering
1306
# Signal that we want a body