~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Martin von Gagern
  • Date: 2011-09-19 08:49:15 UTC
  • mto: This revision was merged to the branch mainline in revision 6148.
  • Revision ID: martin.vgagern@gmx.net-20110919084915-vbumflqq3xqm1vez
Avoid using deprecated api in the unit tests for bzrlib.missing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
25
25
 
26
26
from bzrlib import (
27
27
    bencode,
 
28
    commands,
28
29
    errors,
 
30
    estimate_compressed_size,
29
31
    graph,
30
32
    osutils,
31
33
    pack,
 
34
    trace,
32
35
    ui,
33
 
    versionedfile,
34
36
    )
35
37
from bzrlib.bzrdir import BzrDir
36
38
from bzrlib.smart.request import (
82
84
            recreate_search trusts that clients will look for missing things
83
85
            they expected and get it from elsewhere.
84
86
        """
 
87
        if search_bytes == 'everything':
 
88
            return graph.EverythingResult(repository), None
85
89
        lines = search_bytes.split('\n')
86
90
        if lines[0] == 'ancestry-of':
87
91
            heads = lines[1:]
140
144
        finally:
141
145
            repository.unlock()
142
146
 
 
147
_lsprof_count = 0
143
148
 
144
149
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
145
150
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
179
184
        finally:
180
185
            repository.unlock()
181
186
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
 
187
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
188
                               include_missing, max_size=65536):
200
189
        result = {}
201
190
        queried_revs = set()
202
 
        size_so_far = 0
 
191
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
203
192
        next_revs = revision_ids
204
193
        first_loop_done = False
205
194
        while next_revs:
227
216
                    # add parents to the result
228
217
                    result[encoded_id] = parents
229
218
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
219
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
220
                    estimator.add_content(line)
231
221
            # get all the directly asked for parents, and then flesh out to
232
222
            # 64K (compressed) or so. We do one level of depth at a time to
233
223
            # stay in sync with the client. The 250000 magic number is
234
224
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
 
225
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
226
                trace.mutter('size: %d, z_size: %d'
 
227
                             % (estimator._uncompressed_size_added,
 
228
                                estimator._compressed_size_added))
237
229
                next_revs = set()
238
230
                break
239
231
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
 
232
            next_revs = next_revs.difference(queried_revs)
241
233
            first_loop_done = True
 
234
        return result
 
235
 
 
236
    def _do_repository_request(self, body_bytes):
 
237
        repository = self._repository
 
238
        revision_ids = set(self._revision_ids)
 
239
        include_missing = 'include-missing:' in revision_ids
 
240
        if include_missing:
 
241
            revision_ids.remove('include-missing:')
 
242
        body_lines = body_bytes.split('\n')
 
243
        search_result, error = self.recreate_search_from_recipe(
 
244
            repository, body_lines)
 
245
        if error is not None:
 
246
            return error
 
247
        # TODO might be nice to start up the search again; but thats not
 
248
        # written or tested yet.
 
249
        client_seen_revs = set(search_result.get_keys())
 
250
        # Always include the requested ids.
 
251
        client_seen_revs.difference_update(revision_ids)
 
252
 
 
253
        repo_graph = repository.get_graph()
 
254
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
255
                                             client_seen_revs, include_missing)
242
256
 
243
257
        # sorting trivially puts lexographically similar revision ids together.
244
258
        # Compression FTW.
 
259
        lines = []
245
260
        for revision, parents in sorted(result.items()):
246
261
            lines.append(' '.join((revision, ) + tuple(parents)))
247
262
 
392
407
        if token == '':
393
408
            token = None
394
409
        try:
395
 
            token = repository.lock_write(token=token)
 
410
            token = repository.lock_write(token=token).repository_token
396
411
        except errors.LockContention, e:
397
412
            return FailedSmartServerResponse(('LockContention',))
398
413
        except errors.UnlockableTransport:
413
428
    def do_repository_request(self, repository, to_network_name):
414
429
        """Get a stream for inserting into a to_format repository.
415
430
 
 
431
        The request body is 'search_bytes', a description of the revisions
 
432
        being requested.
 
433
 
 
434
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
435
        implementations will respond with a BadSearch error, and clients should
 
436
        catch this and fallback appropriately.
 
437
 
416
438
        :param repository: The repository to stream from.
417
439
        :param to_network_name: The network name of the format of the target
418
440
            repository.
490
512
 
491
513
 
492
514
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
515
    """The same as Repository.get_stream, but will return stream CHK formats to
 
516
    clients.
 
517
 
 
518
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
519
    
 
520
    New in 1.19.
 
521
    """
493
522
 
494
523
    def _should_fake_unknown(self):
495
524
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
505
534
        for record in substream:
506
535
            if record.storage_kind in ('chunked', 'fulltext'):
507
536
                serialised = record_to_fulltext_bytes(record)
508
 
            elif record.storage_kind == 'inventory-delta':
509
 
                serialised = record_to_inventory_delta_bytes(record)
510
537
            elif record.storage_kind == 'absent':
511
538
                raise ValueError("Absent factory for %s" % (record.key,))
512
539
            else:
544
571
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
545
572
    """
546
573
 
547
 
    def __init__(self, byte_stream):
 
574
    def __init__(self, byte_stream, record_counter):
548
575
        """Create a _ByteStreamDecoder."""
549
576
        self.stream_decoder = pack.ContainerPushParser()
550
577
        self.current_type = None
551
578
        self.first_bytes = None
552
579
        self.byte_stream = byte_stream
 
580
        self._record_counter = record_counter
 
581
        self.key_count = 0
553
582
 
554
583
    def iter_stream_decoder(self):
555
584
        """Iterate the contents of the pack from stream_decoder."""
580
609
 
581
610
    def record_stream(self):
582
611
        """Yield substream_type, substream from the byte stream."""
 
612
        def wrap_and_count(pb, rc, substream):
 
613
            """Yield records from stream while showing progress."""
 
614
            counter = 0
 
615
            if rc:
 
616
                if self.current_type != 'revisions' and self.key_count != 0:
 
617
                    # As we know the number of revisions now (in self.key_count)
 
618
                    # we can setup and use record_counter (rc).
 
619
                    if not rc.is_initialized():
 
620
                        rc.setup(self.key_count, self.key_count)
 
621
            for record in substream.read():
 
622
                if rc:
 
623
                    if rc.is_initialized() and counter == rc.STEP:
 
624
                        rc.increment(counter)
 
625
                        pb.update('Estimate', rc.current, rc.max)
 
626
                        counter = 0
 
627
                    if self.current_type == 'revisions':
 
628
                        # Total records is proportional to number of revs
 
629
                        # to fetch. With remote, we used self.key_count to
 
630
                        # track the number of revs. Once we have the revs
 
631
                        # counts in self.key_count, the progress bar changes
 
632
                        # from 'Estimating..' to 'Estimate' above.
 
633
                        self.key_count += 1
 
634
                        if counter == rc.STEP:
 
635
                            pb.update('Estimating..', self.key_count)
 
636
                            counter = 0
 
637
                counter += 1
 
638
                yield record
 
639
 
583
640
        self.seed_state()
 
641
        pb = ui.ui_factory.nested_progress_bar()
 
642
        rc = self._record_counter
584
643
        # Make and consume sub generators, one per substream type:
585
644
        while self.first_bytes is not None:
586
645
            substream = NetworkRecordStream(self.iter_substream_bytes())
587
646
            # after substream is fully consumed, self.current_type is set to
588
647
            # the next type, and self.first_bytes is set to the matching bytes.
589
 
            yield self.current_type, substream.read()
 
648
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
649
        if rc:
 
650
            pb.update('Done', rc.max, rc.max)
 
651
        pb.finished()
590
652
 
591
653
    def seed_state(self):
592
654
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
659
        list(self.iter_substream_bytes())
598
660
 
599
661
 
600
 
def _byte_stream_to_stream(byte_stream):
 
662
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
663
    """Convert a byte stream into a format and a stream.
602
664
 
603
665
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
666
    :return: (RepositoryFormat, stream_generator)
605
667
    """
606
 
    decoder = _ByteStreamDecoder(byte_stream)
 
668
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
669
    for bytes in byte_stream:
608
670
        decoder.stream_decoder.accept_bytes(bytes)
609
671
        for record in decoder.stream_decoder.read_pending_records(max=1):