~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

(robertc) Fix lp: urls behind an https proxy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
 
26
26
from bzrlib import (
27
27
    bencode,
28
 
    commands,
29
28
    errors,
30
 
    estimate_compressed_size,
31
29
    graph,
32
30
    osutils,
33
31
    pack,
34
 
    trace,
35
32
    ui,
 
33
    versionedfile,
36
34
    )
37
35
from bzrlib.bzrdir import BzrDir
38
36
from bzrlib.smart.request import (
84
82
            recreate_search trusts that clients will look for missing things
85
83
            they expected and get it from elsewhere.
86
84
        """
87
 
        if search_bytes == 'everything':
88
 
            return graph.EverythingResult(repository), None
89
85
        lines = search_bytes.split('\n')
90
86
        if lines[0] == 'ancestry-of':
91
87
            heads = lines[1:]
144
140
        finally:
145
141
            repository.unlock()
146
142
 
147
 
_lsprof_count = 0
148
143
 
149
144
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
150
145
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
184
179
        finally:
185
180
            repository.unlock()
186
181
 
187
 
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
188
 
                               include_missing, max_size=65536):
 
182
    def _do_repository_request(self, body_bytes):
 
183
        repository = self._repository
 
184
        revision_ids = set(self._revision_ids)
 
185
        include_missing = 'include-missing:' in revision_ids
 
186
        if include_missing:
 
187
            revision_ids.remove('include-missing:')
 
188
        body_lines = body_bytes.split('\n')
 
189
        search_result, error = self.recreate_search_from_recipe(
 
190
            repository, body_lines)
 
191
        if error is not None:
 
192
            return error
 
193
        # TODO might be nice to start up the search again; but thats not
 
194
        # written or tested yet.
 
195
        client_seen_revs = set(search_result.get_keys())
 
196
        # Always include the requested ids.
 
197
        client_seen_revs.difference_update(revision_ids)
 
198
        lines = []
 
199
        repo_graph = repository.get_graph()
189
200
        result = {}
190
201
        queried_revs = set()
191
 
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
 
202
        size_so_far = 0
192
203
        next_revs = revision_ids
193
204
        first_loop_done = False
194
205
        while next_revs:
216
227
                    # add parents to the result
217
228
                    result[encoded_id] = parents
218
229
                    # Approximate the serialized cost of this revision_id.
219
 
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
220
 
                    estimator.add_content(line)
 
230
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
221
231
            # get all the directly asked for parents, and then flesh out to
222
232
            # 64K (compressed) or so. We do one level of depth at a time to
223
233
            # stay in sync with the client. The 250000 magic number is
224
234
            # estimated compression ratio taken from bzr.dev itself.
225
 
            if self.no_extra_results or (first_loop_done and estimator.full()):
226
 
                trace.mutter('size: %d, z_size: %d'
227
 
                             % (estimator._uncompressed_size_added,
228
 
                                estimator._compressed_size_added))
 
235
            if self.no_extra_results or (
 
236
                first_loop_done and size_so_far > 250000):
229
237
                next_revs = set()
230
238
                break
231
239
            # don't query things we've already queried
232
 
            next_revs = next_revs.difference(queried_revs)
 
240
            next_revs.difference_update(queried_revs)
233
241
            first_loop_done = True
234
 
        return result
235
 
 
236
 
    def _do_repository_request(self, body_bytes):
237
 
        repository = self._repository
238
 
        revision_ids = set(self._revision_ids)
239
 
        include_missing = 'include-missing:' in revision_ids
240
 
        if include_missing:
241
 
            revision_ids.remove('include-missing:')
242
 
        body_lines = body_bytes.split('\n')
243
 
        search_result, error = self.recreate_search_from_recipe(
244
 
            repository, body_lines)
245
 
        if error is not None:
246
 
            return error
247
 
        # TODO might be nice to start up the search again; but thats not
248
 
        # written or tested yet.
249
 
        client_seen_revs = set(search_result.get_keys())
250
 
        # Always include the requested ids.
251
 
        client_seen_revs.difference_update(revision_ids)
252
 
 
253
 
        repo_graph = repository.get_graph()
254
 
        result = self._expand_requested_revs(repo_graph, revision_ids,
255
 
                                             client_seen_revs, include_missing)
256
242
 
257
243
        # sorting trivially puts lexographically similar revision ids together.
258
244
        # Compression FTW.
259
 
        lines = []
260
245
        for revision, parents in sorted(result.items()):
261
246
            lines.append(' '.join((revision, ) + tuple(parents)))
262
247
 
407
392
        if token == '':
408
393
            token = None
409
394
        try:
410
 
            token = repository.lock_write(token=token).repository_token
 
395
            token = repository.lock_write(token=token)
411
396
        except errors.LockContention, e:
412
397
            return FailedSmartServerResponse(('LockContention',))
413
398
        except errors.UnlockableTransport:
428
413
    def do_repository_request(self, repository, to_network_name):
429
414
        """Get a stream for inserting into a to_format repository.
430
415
 
431
 
        The request body is 'search_bytes', a description of the revisions
432
 
        being requested.
433
 
 
434
 
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
435
 
        implementations will respond with a BadSearch error, and clients should
436
 
        catch this and fallback appropriately.
437
 
 
438
416
        :param repository: The repository to stream from.
439
417
        :param to_network_name: The network name of the format of the target
440
418
            repository.
512
490
 
513
491
 
514
492
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
515
 
    """The same as Repository.get_stream, but will return stream CHK formats to
516
 
    clients.
517
 
 
518
 
    See SmartServerRepositoryGetStream._should_fake_unknown.
519
 
    
520
 
    New in 1.19.
521
 
    """
522
493
 
523
494
    def _should_fake_unknown(self):
524
495
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
534
505
        for record in substream:
535
506
            if record.storage_kind in ('chunked', 'fulltext'):
536
507
                serialised = record_to_fulltext_bytes(record)
 
508
            elif record.storage_kind == 'inventory-delta':
 
509
                serialised = record_to_inventory_delta_bytes(record)
537
510
            elif record.storage_kind == 'absent':
538
511
                raise ValueError("Absent factory for %s" % (record.key,))
539
512
            else:
571
544
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
572
545
    """
573
546
 
574
 
    def __init__(self, byte_stream, record_counter):
 
547
    def __init__(self, byte_stream):
575
548
        """Create a _ByteStreamDecoder."""
576
549
        self.stream_decoder = pack.ContainerPushParser()
577
550
        self.current_type = None
578
551
        self.first_bytes = None
579
552
        self.byte_stream = byte_stream
580
 
        self._record_counter = record_counter
581
 
        self.key_count = 0
582
553
 
583
554
    def iter_stream_decoder(self):
584
555
        """Iterate the contents of the pack from stream_decoder."""
609
580
 
610
581
    def record_stream(self):
611
582
        """Yield substream_type, substream from the byte stream."""
612
 
        def wrap_and_count(pb, rc, substream):
613
 
            """Yield records from stream while showing progress."""
614
 
            counter = 0
615
 
            if rc:
616
 
                if self.current_type != 'revisions' and self.key_count != 0:
617
 
                    # As we know the number of revisions now (in self.key_count)
618
 
                    # we can setup and use record_counter (rc).
619
 
                    if not rc.is_initialized():
620
 
                        rc.setup(self.key_count, self.key_count)
621
 
            for record in substream.read():
622
 
                if rc:
623
 
                    if rc.is_initialized() and counter == rc.STEP:
624
 
                        rc.increment(counter)
625
 
                        pb.update('Estimate', rc.current, rc.max)
626
 
                        counter = 0
627
 
                    if self.current_type == 'revisions':
628
 
                        # Total records is proportional to number of revs
629
 
                        # to fetch. With remote, we used self.key_count to
630
 
                        # track the number of revs. Once we have the revs
631
 
                        # counts in self.key_count, the progress bar changes
632
 
                        # from 'Estimating..' to 'Estimate' above.
633
 
                        self.key_count += 1
634
 
                        if counter == rc.STEP:
635
 
                            pb.update('Estimating..', self.key_count)
636
 
                            counter = 0
637
 
                counter += 1
638
 
                yield record
639
 
 
640
583
        self.seed_state()
641
 
        pb = ui.ui_factory.nested_progress_bar()
642
 
        rc = self._record_counter
643
584
        # Make and consume sub generators, one per substream type:
644
585
        while self.first_bytes is not None:
645
586
            substream = NetworkRecordStream(self.iter_substream_bytes())
646
587
            # after substream is fully consumed, self.current_type is set to
647
588
            # the next type, and self.first_bytes is set to the matching bytes.
648
 
            yield self.current_type, wrap_and_count(pb, rc, substream)
649
 
        if rc:
650
 
            pb.update('Done', rc.max, rc.max)
651
 
        pb.finished()
 
589
            yield self.current_type, substream.read()
652
590
 
653
591
    def seed_state(self):
654
592
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
659
597
        list(self.iter_substream_bytes())
660
598
 
661
599
 
662
 
def _byte_stream_to_stream(byte_stream, record_counter=None):
 
600
def _byte_stream_to_stream(byte_stream):
663
601
    """Convert a byte stream into a format and a stream.
664
602
 
665
603
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
666
604
    :return: (RepositoryFormat, stream_generator)
667
605
    """
668
 
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
 
606
    decoder = _ByteStreamDecoder(byte_stream)
669
607
    for bytes in byte_stream:
670
608
        decoder.stream_decoder.accept_bytes(bytes)
671
609
        for record in decoder.stream_decoder.read_pending_records(max=1):