84
82
recreate_search trusts that clients will look for missing things
85
83
they expected and get it from elsewhere.
87
if search_bytes == 'everything':
88
return graph.EverythingResult(repository), None
89
85
lines = search_bytes.split('\n')
90
86
if lines[0] == 'ancestry-of':
185
180
repository.unlock()
187
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
188
include_missing, max_size=65536):
182
def _do_repository_request(self, body_bytes):
183
repository = self._repository
184
revision_ids = set(self._revision_ids)
185
include_missing = 'include-missing:' in revision_ids
187
revision_ids.remove('include-missing:')
188
body_lines = body_bytes.split('\n')
189
search_result, error = self.recreate_search_from_recipe(
190
repository, body_lines)
191
if error is not None:
193
# TODO might be nice to start up the search again; but thats not
194
# written or tested yet.
195
client_seen_revs = set(search_result.get_keys())
196
# Always include the requested ids.
197
client_seen_revs.difference_update(revision_ids)
199
repo_graph = repository.get_graph()
190
201
queried_revs = set()
191
estimator = estimate_compressed_size.ZLibEstimator(max_size)
192
203
next_revs = revision_ids
193
204
first_loop_done = False
216
227
# add parents to the result
217
228
result[encoded_id] = parents
218
229
# Approximate the serialized cost of this revision_id.
219
line = '%s %s\n' % (encoded_id, ' '.join(parents))
220
estimator.add_content(line)
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
221
231
# get all the directly asked for parents, and then flesh out to
222
232
# 64K (compressed) or so. We do one level of depth at a time to
223
233
# stay in sync with the client. The 250000 magic number is
224
234
# estimated compression ratio taken from bzr.dev itself.
225
if self.no_extra_results or (first_loop_done and estimator.full()):
226
trace.mutter('size: %d, z_size: %d'
227
% (estimator._uncompressed_size_added,
228
estimator._compressed_size_added))
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
229
237
next_revs = set()
231
239
# don't query things we've already queried
232
next_revs = next_revs.difference(queried_revs)
240
next_revs.difference_update(queried_revs)
233
241
first_loop_done = True
236
def _do_repository_request(self, body_bytes):
237
repository = self._repository
238
revision_ids = set(self._revision_ids)
239
include_missing = 'include-missing:' in revision_ids
241
revision_ids.remove('include-missing:')
242
body_lines = body_bytes.split('\n')
243
search_result, error = self.recreate_search_from_recipe(
244
repository, body_lines)
245
if error is not None:
247
# TODO might be nice to start up the search again; but thats not
248
# written or tested yet.
249
client_seen_revs = set(search_result.get_keys())
250
# Always include the requested ids.
251
client_seen_revs.difference_update(revision_ids)
253
repo_graph = repository.get_graph()
254
result = self._expand_requested_revs(repo_graph, revision_ids,
255
client_seen_revs, include_missing)
257
243
# sorting trivially puts lexographically similar revision ids together.
258
244
# Compression FTW.
260
245
for revision, parents in sorted(result.items()):
261
246
lines.append(' '.join((revision, ) + tuple(parents)))
410
token = repository.lock_write(token=token).repository_token
395
token = repository.lock_write(token=token)
411
396
except errors.LockContention, e:
412
397
return FailedSmartServerResponse(('LockContention',))
413
398
except errors.UnlockableTransport:
428
413
def do_repository_request(self, repository, to_network_name):
429
414
"""Get a stream for inserting into a to_format repository.
431
The request body is 'search_bytes', a description of the revisions
434
In 2.3 this verb added support for search_bytes == 'everything'. Older
435
implementations will respond with a BadSearch error, and clients should
436
catch this and fallback appropriately.
438
416
:param repository: The repository to stream from.
439
417
:param to_network_name: The network name of the format of the target
514
492
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
515
"""The same as Repository.get_stream, but will return stream CHK formats to
518
See SmartServerRepositoryGetStream._should_fake_unknown.
523
494
def _should_fake_unknown(self):
524
495
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
534
505
for record in substream:
535
506
if record.storage_kind in ('chunked', 'fulltext'):
536
507
serialised = record_to_fulltext_bytes(record)
508
elif record.storage_kind == 'inventory-delta':
509
serialised = record_to_inventory_delta_bytes(record)
537
510
elif record.storage_kind == 'absent':
538
511
raise ValueError("Absent factory for %s" % (record.key,))
571
544
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
574
def __init__(self, byte_stream, record_counter):
547
def __init__(self, byte_stream):
575
548
"""Create a _ByteStreamDecoder."""
576
549
self.stream_decoder = pack.ContainerPushParser()
577
550
self.current_type = None
578
551
self.first_bytes = None
579
552
self.byte_stream = byte_stream
580
self._record_counter = record_counter
583
554
def iter_stream_decoder(self):
584
555
"""Iterate the contents of the pack from stream_decoder."""
610
581
def record_stream(self):
611
582
"""Yield substream_type, substream from the byte stream."""
612
def wrap_and_count(pb, rc, substream):
613
"""Yield records from stream while showing progress."""
616
if self.current_type != 'revisions' and self.key_count != 0:
617
# As we know the number of revisions now (in self.key_count)
618
# we can setup and use record_counter (rc).
619
if not rc.is_initialized():
620
rc.setup(self.key_count, self.key_count)
621
for record in substream.read():
623
if rc.is_initialized() and counter == rc.STEP:
624
rc.increment(counter)
625
pb.update('Estimate', rc.current, rc.max)
627
if self.current_type == 'revisions':
628
# Total records is proportional to number of revs
629
# to fetch. With remote, we used self.key_count to
630
# track the number of revs. Once we have the revs
631
# counts in self.key_count, the progress bar changes
632
# from 'Estimating..' to 'Estimate' above.
634
if counter == rc.STEP:
635
pb.update('Estimating..', self.key_count)
640
583
self.seed_state()
641
pb = ui.ui_factory.nested_progress_bar()
642
rc = self._record_counter
643
584
# Make and consume sub generators, one per substream type:
644
585
while self.first_bytes is not None:
645
586
substream = NetworkRecordStream(self.iter_substream_bytes())
646
587
# after substream is fully consumed, self.current_type is set to
647
588
# the next type, and self.first_bytes is set to the matching bytes.
648
yield self.current_type, wrap_and_count(pb, rc, substream)
650
pb.update('Done', rc.max, rc.max)
589
yield self.current_type, substream.read()
653
591
def seed_state(self):
654
592
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
659
597
list(self.iter_substream_bytes())
662
def _byte_stream_to_stream(byte_stream, record_counter=None):
600
def _byte_stream_to_stream(byte_stream):
663
601
"""Convert a byte stream into a format and a stream.
665
603
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
666
604
:return: (RepositoryFormat, stream_generator)
668
decoder = _ByteStreamDecoder(byte_stream, record_counter)
606
decoder = _ByteStreamDecoder(byte_stream)
669
607
for bytes in byte_stream:
670
608
decoder.stream_decoder.accept_bytes(bytes)
671
609
for record in decoder.stream_decoder.read_pending_records(max=1):
739
677
def _tarball_of_dir(self, dirname, compression, ofile):
741
678
filename = os.path.basename(ofile.name)
742
679
tarball = tarfile.open(fileobj=ofile, name=filename,
743
680
mode='w|' + compression)