~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-12-09 02:53:42 UTC
  • mfrom: (4873.2.3 2.1.0b4-win32-test-imports)
  • Revision ID: pqm@pqm.ubuntu.com-20091209025342-sidvxfcqdgxmuz59
(jam) Get the test suite running again on Windows, (bug #492561)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
import os
21
21
import Queue
22
22
import sys
 
23
import tarfile
23
24
import tempfile
24
25
import threading
25
26
 
26
27
from bzrlib import (
27
28
    bencode,
28
 
    commands,
29
29
    errors,
30
 
    estimate_compressed_size,
31
30
    graph,
32
31
    osutils,
33
32
    pack,
34
 
    trace,
35
 
    ui,
 
33
    versionedfile,
36
34
    )
37
35
from bzrlib.bzrdir import BzrDir
38
36
from bzrlib.smart.request import (
84
82
            recreate_search trusts that clients will look for missing things
85
83
            they expected and get it from elsewhere.
86
84
        """
87
 
        if search_bytes == 'everything':
88
 
            return graph.EverythingResult(repository), None
89
85
        lines = search_bytes.split('\n')
90
86
        if lines[0] == 'ancestry-of':
91
87
            heads = lines[1:]
144
140
        finally:
145
141
            repository.unlock()
146
142
 
147
 
_lsprof_count = 0
148
143
 
149
144
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
150
145
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
184
179
        finally:
185
180
            repository.unlock()
186
181
 
187
 
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
188
 
                               include_missing, max_size=65536):
 
182
    def _do_repository_request(self, body_bytes):
 
183
        repository = self._repository
 
184
        revision_ids = set(self._revision_ids)
 
185
        include_missing = 'include-missing:' in revision_ids
 
186
        if include_missing:
 
187
            revision_ids.remove('include-missing:')
 
188
        body_lines = body_bytes.split('\n')
 
189
        search_result, error = self.recreate_search_from_recipe(
 
190
            repository, body_lines)
 
191
        if error is not None:
 
192
            return error
 
193
        # TODO might be nice to start up the search again; but thats not
 
194
        # written or tested yet.
 
195
        client_seen_revs = set(search_result.get_keys())
 
196
        # Always include the requested ids.
 
197
        client_seen_revs.difference_update(revision_ids)
 
198
        lines = []
 
199
        repo_graph = repository.get_graph()
189
200
        result = {}
190
201
        queried_revs = set()
191
 
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
 
202
        size_so_far = 0
192
203
        next_revs = revision_ids
193
204
        first_loop_done = False
194
205
        while next_revs:
216
227
                    # add parents to the result
217
228
                    result[encoded_id] = parents
218
229
                    # Approximate the serialized cost of this revision_id.
219
 
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
220
 
                    estimator.add_content(line)
 
230
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
221
231
            # get all the directly asked for parents, and then flesh out to
222
232
            # 64K (compressed) or so. We do one level of depth at a time to
223
233
            # stay in sync with the client. The 250000 magic number is
224
234
            # estimated compression ratio taken from bzr.dev itself.
225
 
            if self.no_extra_results or (first_loop_done and estimator.full()):
226
 
                trace.mutter('size: %d, z_size: %d'
227
 
                             % (estimator._uncompressed_size_added,
228
 
                                estimator._compressed_size_added))
 
235
            if self.no_extra_results or (
 
236
                first_loop_done and size_so_far > 250000):
229
237
                next_revs = set()
230
238
                break
231
239
            # don't query things we've already queried
232
 
            next_revs = next_revs.difference(queried_revs)
 
240
            next_revs.difference_update(queried_revs)
233
241
            first_loop_done = True
234
 
        return result
235
 
 
236
 
    def _do_repository_request(self, body_bytes):
237
 
        repository = self._repository
238
 
        revision_ids = set(self._revision_ids)
239
 
        include_missing = 'include-missing:' in revision_ids
240
 
        if include_missing:
241
 
            revision_ids.remove('include-missing:')
242
 
        body_lines = body_bytes.split('\n')
243
 
        search_result, error = self.recreate_search_from_recipe(
244
 
            repository, body_lines)
245
 
        if error is not None:
246
 
            return error
247
 
        # TODO might be nice to start up the search again; but thats not
248
 
        # written or tested yet.
249
 
        client_seen_revs = set(search_result.get_keys())
250
 
        # Always include the requested ids.
251
 
        client_seen_revs.difference_update(revision_ids)
252
 
 
253
 
        repo_graph = repository.get_graph()
254
 
        result = self._expand_requested_revs(repo_graph, revision_ids,
255
 
                                             client_seen_revs, include_missing)
256
242
 
257
243
        # sorting trivially puts lexographically similar revision ids together.
258
244
        # Compression FTW.
259
 
        lines = []
260
245
        for revision, parents in sorted(result.items()):
261
246
            lines.append(' '.join((revision, ) + tuple(parents)))
262
247
 
407
392
        if token == '':
408
393
            token = None
409
394
        try:
410
 
            token = repository.lock_write(token=token).repository_token
 
395
            token = repository.lock_write(token=token)
411
396
        except errors.LockContention, e:
412
397
            return FailedSmartServerResponse(('LockContention',))
413
398
        except errors.UnlockableTransport:
428
413
    def do_repository_request(self, repository, to_network_name):
429
414
        """Get a stream for inserting into a to_format repository.
430
415
 
431
 
        The request body is 'search_bytes', a description of the revisions
432
 
        being requested.
433
 
 
434
 
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
435
 
        implementations will respond with a BadSearch error, and clients should
436
 
        catch this and fallback appropriately.
437
 
 
438
416
        :param repository: The repository to stream from.
439
417
        :param to_network_name: The network name of the format of the target
440
418
            repository.
512
490
 
513
491
 
514
492
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
515
 
    """The same as Repository.get_stream, but will return stream CHK formats to
516
 
    clients.
517
 
 
518
 
    See SmartServerRepositoryGetStream._should_fake_unknown.
519
 
    
520
 
    New in 1.19.
521
 
    """
522
493
 
523
494
    def _should_fake_unknown(self):
524
495
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
534
505
        for record in substream:
535
506
            if record.storage_kind in ('chunked', 'fulltext'):
536
507
                serialised = record_to_fulltext_bytes(record)
 
508
            elif record.storage_kind == 'inventory-delta':
 
509
                serialised = record_to_inventory_delta_bytes(record)
537
510
            elif record.storage_kind == 'absent':
538
511
                raise ValueError("Absent factory for %s" % (record.key,))
539
512
            else:
571
544
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
572
545
    """
573
546
 
574
 
    def __init__(self, byte_stream, record_counter):
 
547
    def __init__(self, byte_stream):
575
548
        """Create a _ByteStreamDecoder."""
576
549
        self.stream_decoder = pack.ContainerPushParser()
577
550
        self.current_type = None
578
551
        self.first_bytes = None
579
552
        self.byte_stream = byte_stream
580
 
        self._record_counter = record_counter
581
 
        self.key_count = 0
582
553
 
583
554
    def iter_stream_decoder(self):
584
555
        """Iterate the contents of the pack from stream_decoder."""
609
580
 
610
581
    def record_stream(self):
611
582
        """Yield substream_type, substream from the byte stream."""
612
 
        def wrap_and_count(pb, rc, substream):
613
 
            """Yield records from stream while showing progress."""
614
 
            counter = 0
615
 
            if rc:
616
 
                if self.current_type != 'revisions' and self.key_count != 0:
617
 
                    # As we know the number of revisions now (in self.key_count)
618
 
                    # we can setup and use record_counter (rc).
619
 
                    if not rc.is_initialized():
620
 
                        rc.setup(self.key_count, self.key_count)
621
 
            for record in substream.read():
622
 
                if rc:
623
 
                    if rc.is_initialized() and counter == rc.STEP:
624
 
                        rc.increment(counter)
625
 
                        pb.update('Estimate', rc.current, rc.max)
626
 
                        counter = 0
627
 
                    if self.current_type == 'revisions':
628
 
                        # Total records is proportional to number of revs
629
 
                        # to fetch. With remote, we used self.key_count to
630
 
                        # track the number of revs. Once we have the revs
631
 
                        # counts in self.key_count, the progress bar changes
632
 
                        # from 'Estimating..' to 'Estimate' above.
633
 
                        self.key_count += 1
634
 
                        if counter == rc.STEP:
635
 
                            pb.update('Estimating..', self.key_count)
636
 
                            counter = 0
637
 
                counter += 1
638
 
                yield record
639
 
 
640
583
        self.seed_state()
641
 
        pb = ui.ui_factory.nested_progress_bar()
642
 
        rc = self._record_counter
643
584
        # Make and consume sub generators, one per substream type:
644
585
        while self.first_bytes is not None:
645
586
            substream = NetworkRecordStream(self.iter_substream_bytes())
646
587
            # after substream is fully consumed, self.current_type is set to
647
588
            # the next type, and self.first_bytes is set to the matching bytes.
648
 
            yield self.current_type, wrap_and_count(pb, rc, substream)
649
 
        if rc:
650
 
            pb.update('Done', rc.max, rc.max)
651
 
        pb.finished()
 
589
            yield self.current_type, substream.read()
652
590
 
653
591
    def seed_state(self):
654
592
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
659
597
        list(self.iter_substream_bytes())
660
598
 
661
599
 
662
 
def _byte_stream_to_stream(byte_stream, record_counter=None):
 
600
def _byte_stream_to_stream(byte_stream):
663
601
    """Convert a byte stream into a format and a stream.
664
602
 
665
603
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
666
604
    :return: (RepositoryFormat, stream_generator)
667
605
    """
668
 
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
 
606
    decoder = _ByteStreamDecoder(byte_stream)
669
607
    for bytes in byte_stream:
670
608
        decoder.stream_decoder.accept_bytes(bytes)
671
609
        for record in decoder.stream_decoder.read_pending_records(max=1):
737
675
            temp.close()
738
676
 
739
677
    def _tarball_of_dir(self, dirname, compression, ofile):
740
 
        import tarfile
741
678
        filename = os.path.basename(ofile.name)
742
679
        tarball = tarfile.open(fileobj=ofile, name=filename,
743
680
            mode='w|' + compression)