~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repository.py

  • Committer: Robert Collins
  • Date: 2009-02-27 13:05:36 UTC
  • mto: This revision was merged to the branch mainline in revision 4067.
  • Revision ID: robertc@robertcollins.net-20090227130536-wsqoyhyt3n11nc8d
Implement the separate source component for fetch - repository.StreamSource.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1226
1226
        """Return a sink for streaming into this repository."""
1227
1227
        return StreamSink(self)
1228
1228
 
 
1229
    def _get_source(self, to_format):
 
1230
        """Return a source for streaming from this repository."""
 
1231
        return StreamSource(self, to_format)
 
1232
 
1229
1233
    @needs_read_lock
1230
1234
    def has_revision(self, revision_id):
1231
1235
        """True if this repository has a copy of the revision."""
2559
2563
        self.target_get_graph = self.target.get_graph
2560
2564
        self.target_get_parent_map = self.target.get_parent_map
2561
2565
 
 
2566
    @needs_write_lock
2562
2567
    def copy_content(self, revision_id=None):
2563
 
        raise NotImplementedError(self.copy_content)
 
2568
        """Make a complete copy of the content in self into destination.
 
2569
 
 
2570
        This is a destructive operation! Do not use it on existing
 
2571
        repositories.
 
2572
 
 
2573
        :param revision_id: Only copy the content needed to construct
 
2574
                            revision_id and its parents.
 
2575
        """
 
2576
        try:
 
2577
            self.target.set_make_working_trees(self.source.make_working_trees())
 
2578
        except NotImplementedError:
 
2579
            pass
 
2580
        self.target.fetch(self.source, revision_id=revision_id)
2564
2581
 
2565
2582
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
2566
2583
        """Fetch the content required to construct revision_id.
2574
2591
 
2575
2592
        :returns: (copied_revision_count, failures).
2576
2593
        """
2577
 
        # Normally we should find a specific InterRepository subclass to do
2578
 
        # the fetch; if nothing else then at least InterSameDataRepository.
2579
 
        # If none of them is suitable it looks like fetching is not possible;
2580
 
        # we try to give a good message why.  _assert_same_model will probably
2581
 
        # give a helpful message; otherwise a generic one.
2582
 
        self._assert_same_model(self.source, self.target)
2583
 
        raise errors.IncompatibleRepositories(self.source, self.target,
2584
 
            "no suitableInterRepository found")
 
2594
        from bzrlib.fetch import RepoFetcher
 
2595
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
 
2596
               self.source, self.source._format, self.target,
 
2597
               self.target._format)
 
2598
        f = RepoFetcher(to_repository=self.target,
 
2599
                               from_repository=self.source,
 
2600
                               last_revision=revision_id,
 
2601
                               pb=pb, find_ghosts=find_ghosts)
 
2602
        return f.count_copied, f.failed_revisions
2585
2603
 
2586
2604
    def _walk_to_common_revisions(self, revision_ids):
2587
2605
        """Walk out from revision_ids in source to revisions target has.
2727
2745
    def is_compatible(source, target):
2728
2746
        return InterRepository._same_model(source, target)
2729
2747
 
2730
 
    @needs_write_lock
2731
 
    def copy_content(self, revision_id=None):
2732
 
        """Make a complete copy of the content in self into destination.
2733
 
 
2734
 
        This copies both the repository's revision data, and configuration information
2735
 
        such as the make_working_trees setting.
2736
 
 
2737
 
        This is a destructive operation! Do not use it on existing
2738
 
        repositories.
2739
 
 
2740
 
        :param revision_id: Only copy the content needed to construct
2741
 
                            revision_id and its parents.
2742
 
        """
2743
 
        try:
2744
 
            self.target.set_make_working_trees(self.source.make_working_trees())
2745
 
        except NotImplementedError:
2746
 
            pass
2747
 
        # but don't bother fetching if we have the needed data now.
2748
 
        if (revision_id not in (None, _mod_revision.NULL_REVISION) and
2749
 
            self.target.has_revision(revision_id)):
2750
 
            return
2751
 
        self.target.fetch(self.source, revision_id=revision_id)
2752
 
 
2753
 
    @needs_write_lock
2754
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
2755
 
        """See InterRepository.fetch()."""
2756
 
        from bzrlib.fetch import RepoFetcher
2757
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
2758
 
               self.source, self.source._format, self.target,
2759
 
               self.target._format)
2760
 
        f = RepoFetcher(to_repository=self.target,
2761
 
                               from_repository=self.source,
2762
 
                               last_revision=revision_id,
2763
 
                               pb=pb, find_ghosts=find_ghosts)
2764
 
        return f.count_copied, f.failed_revisions
2765
 
 
2766
2748
 
2767
2749
class InterWeaveRepo(InterSameDataRepository):
2768
2750
    """Optimised code paths between Weave based repositories.
2997
2979
            return fetcher.count_copied, fetcher.failed_revisions
2998
2980
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
2999
2981
               self.source, self.source._format, self.target, self.target._format)
3000
 
        self.count_copied = 0
3001
2982
        if revision_id is None:
3002
2983
            # TODO:
3003
2984
            # everything to do - use pack logic
3096
3077
        return self.source.revision_ids_to_search_result(result_set)
3097
3078
 
3098
3079
 
3099
 
class InterModel1and2(InterRepository):
3100
 
 
3101
 
    @classmethod
3102
 
    def _get_repo_format_to_test(self):
3103
 
        return None
3104
 
 
3105
 
    @staticmethod
3106
 
    def is_compatible(source, target):
3107
 
        if not source.supports_rich_root() and target.supports_rich_root():
3108
 
            return True
3109
 
        else:
3110
 
            return False
3111
 
 
3112
 
    @needs_write_lock
3113
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3114
 
        """See InterRepository.fetch()."""
3115
 
        from bzrlib.fetch import Model1toKnit2Fetcher
3116
 
        f = Model1toKnit2Fetcher(to_repository=self.target,
3117
 
                                 from_repository=self.source,
3118
 
                                 last_revision=revision_id,
3119
 
                                 pb=pb, find_ghosts=find_ghosts)
3120
 
        return f.count_copied, f.failed_revisions
3121
 
 
3122
 
    @needs_write_lock
3123
 
    def copy_content(self, revision_id=None):
3124
 
        """Make a complete copy of the content in self into destination.
3125
 
 
3126
 
        This is a destructive operation! Do not use it on existing
3127
 
        repositories.
3128
 
 
3129
 
        :param revision_id: Only copy the content needed to construct
3130
 
                            revision_id and its parents.
3131
 
        """
3132
 
        try:
3133
 
            self.target.set_make_working_trees(self.source.make_working_trees())
3134
 
        except NotImplementedError:
3135
 
            pass
3136
 
        # but don't bother fetching if we have the needed data now.
3137
 
        if (revision_id not in (None, _mod_revision.NULL_REVISION) and
3138
 
            self.target.has_revision(revision_id)):
3139
 
            return
3140
 
        self.target.fetch(self.source, revision_id=revision_id)
3141
 
 
3142
 
 
3143
 
class InterKnit1and2(InterKnitRepo):
3144
 
 
3145
 
    @classmethod
3146
 
    def _get_repo_format_to_test(self):
3147
 
        return None
3148
 
 
3149
 
    @staticmethod
3150
 
    def is_compatible(source, target):
3151
 
        """Be compatible with Knit1 source and Knit3 target"""
3152
 
        try:
3153
 
            from bzrlib.repofmt.knitrepo import (
3154
 
                RepositoryFormatKnit1,
3155
 
                RepositoryFormatKnit3,
3156
 
                )
3157
 
            from bzrlib.repofmt.pack_repo import (
3158
 
                RepositoryFormatKnitPack1,
3159
 
                RepositoryFormatKnitPack3,
3160
 
                RepositoryFormatKnitPack4,
3161
 
                RepositoryFormatKnitPack5,
3162
 
                RepositoryFormatKnitPack5RichRoot,
3163
 
                RepositoryFormatKnitPack6,
3164
 
                RepositoryFormatKnitPack6RichRoot,
3165
 
                RepositoryFormatPackDevelopment2,
3166
 
                RepositoryFormatPackDevelopment2Subtree,
3167
 
                )
3168
 
            norichroot = (
3169
 
                RepositoryFormatKnit1,            # no rr, no subtree
3170
 
                RepositoryFormatKnitPack1,        # no rr, no subtree
3171
 
                RepositoryFormatPackDevelopment2, # no rr, no subtree
3172
 
                RepositoryFormatKnitPack5,        # no rr, no subtree
3173
 
                RepositoryFormatKnitPack6,        # no rr, no subtree
3174
 
                )
3175
 
            richroot = (
3176
 
                RepositoryFormatKnit3,            # rr, subtree
3177
 
                RepositoryFormatKnitPack3,        # rr, subtree
3178
 
                RepositoryFormatKnitPack4,        # rr, no subtree
3179
 
                RepositoryFormatKnitPack5RichRoot,# rr, no subtree
3180
 
                RepositoryFormatKnitPack6RichRoot,# rr, no subtree
3181
 
                RepositoryFormatPackDevelopment2Subtree, # rr, subtree
3182
 
                )
3183
 
            for format in norichroot:
3184
 
                if format.rich_root_data:
3185
 
                    raise AssertionError('Format %s is a rich-root format'
3186
 
                        ' but is included in the non-rich-root list'
3187
 
                        % (format,))
3188
 
            for format in richroot:
3189
 
                if not format.rich_root_data:
3190
 
                    raise AssertionError('Format %s is not a rich-root format'
3191
 
                        ' but is included in the rich-root list'
3192
 
                        % (format,))
3193
 
            # TODO: One alternative is to just check format.rich_root_data,
3194
 
            #       instead of keeping membership lists. However, the formats
3195
 
            #       *also* have to use the same 'Knit' style of storage
3196
 
            #       (line-deltas, fulltexts, etc.)
3197
 
            return (isinstance(source._format, norichroot) and
3198
 
                    isinstance(target._format, richroot))
3199
 
        except AttributeError:
3200
 
            return False
3201
 
 
3202
 
    @needs_write_lock
3203
 
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
3204
 
        """See InterRepository.fetch()."""
3205
 
        from bzrlib.fetch import Knit1to2Fetcher
3206
 
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
3207
 
               self.source, self.source._format, self.target,
3208
 
               self.target._format)
3209
 
        f = Knit1to2Fetcher(to_repository=self.target,
3210
 
                            from_repository=self.source,
3211
 
                            last_revision=revision_id,
3212
 
                            pb=pb, find_ghosts=find_ghosts)
3213
 
        return f.count_copied, f.failed_revisions
3214
 
 
3215
 
 
3216
3080
class InterDifferingSerializer(InterKnitRepo):
3217
3081
 
3218
3082
    @classmethod
3507
3371
InterRepository.register_optimiser(InterSameDataRepository)
3508
3372
InterRepository.register_optimiser(InterWeaveRepo)
3509
3373
InterRepository.register_optimiser(InterKnitRepo)
3510
 
InterRepository.register_optimiser(InterModel1and2)
3511
 
InterRepository.register_optimiser(InterKnit1and2)
3512
3374
InterRepository.register_optimiser(InterPackRepo)
3513
3375
InterRepository.register_optimiser(InterOtherToRemote)
3514
3376
InterRepository.register_optimiser(InterRemoteToOther)
3784
3646
        if self.target_repo._format._fetch_reconcile:
3785
3647
            self.target_repo.reconcile()
3786
3648
 
 
3649
 
 
3650
class StreamSource(object):
 
3651
    """A source of a stream for fetching between repositories.
 
3652
 
 
3653
    :ivar count_copied: number of revisions streamed.
 
3654
    """
 
3655
 
 
3656
    def __init__(self, from_repository, to_format):
 
3657
        """Create a StreamSource streaming from from_repository."""
 
3658
        self.from_repository = from_repository
 
3659
        self.to_format = to_format
 
3660
        self.count_copied = 0
 
3661
 
 
3662
    def delta_on_metadata(self):
 
3663
        """Return True if delta's are permitted on metadata streams.
 
3664
 
 
3665
        That is on revisions and signatures.
 
3666
        """
 
3667
        src_serializer = self.from_repository._format._serializer
 
3668
        target_serializer = self.to_format._serializer
 
3669
        return (self.to_format._fetch_uses_deltas and
 
3670
            src_serializer == target_serializer)
 
3671
 
 
3672
    def _fetch_revision_texts(self, revs):
 
3673
        # fetch signatures first and then the revision texts
 
3674
        # may need to be a InterRevisionStore call here.
 
3675
        from_sf = self.from_repository.signatures
 
3676
        # A missing signature is just skipped.
 
3677
        keys = [(rev_id,) for rev_id in revs]
 
3678
        signatures = filter_absent(from_sf.get_record_stream(
 
3679
            keys,
 
3680
            self.to_format._fetch_order,
 
3681
            not self.to_format._fetch_uses_deltas))
 
3682
        # If a revision has a delta, this is actually expanded inside the
 
3683
        # insert_record_stream code now, which is an alternate fix for
 
3684
        # bug #261339
 
3685
        from_rf = self.from_repository.revisions
 
3686
        revisions = from_rf.get_record_stream(
 
3687
            keys,
 
3688
            self.to_format._fetch_order,
 
3689
            not self.delta_on_metadata())
 
3690
        return [('signatures', signatures), ('revisions', revisions)]
 
3691
 
 
3692
    def _generate_root_texts(self, revs):
 
3693
        """This will be called by __fetch between fetching weave texts and
 
3694
        fetching the inventory weave.
 
3695
 
 
3696
        Subclasses should override this if they need to generate root texts
 
3697
        after fetching weave texts.
 
3698
        """
 
3699
        if self._rich_root_upgrade():
 
3700
            import bzrlib.fetch
 
3701
            return bzrlib.fetch.Inter1and2Helper(
 
3702
                self.from_repository).generate_root_texts(revs)
 
3703
        else:
 
3704
            return []
 
3705
 
 
3706
    def get_stream(self, search):
 
3707
        phase = 'file'
 
3708
        revs = search.get_keys()
 
3709
        graph = self.from_repository.get_graph()
 
3710
        revs = list(graph.iter_topo_order(revs))
 
3711
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
 
3712
        text_keys = []
 
3713
        for knit_kind, file_id, revisions in data_to_fetch:
 
3714
            if knit_kind != phase:
 
3715
                phase = knit_kind
 
3716
                # Make a new progress bar for this phase
 
3717
            if knit_kind == "file":
 
3718
                # Accumulate file texts
 
3719
                text_keys.extend([(file_id, revision) for revision in
 
3720
                    revisions])
 
3721
            elif knit_kind == "inventory":
 
3722
                # Now copy the file texts.
 
3723
                from_texts = self.from_repository.texts
 
3724
                yield ('texts', from_texts.get_record_stream(
 
3725
                    text_keys, self.to_format._fetch_order,
 
3726
                    not self.to_format._fetch_uses_deltas))
 
3727
                # Cause an error if a text occurs after we have done the
 
3728
                # copy.
 
3729
                text_keys = None
 
3730
                # Before we process the inventory we generate the root
 
3731
                # texts (if necessary) so that the inventories references
 
3732
                # will be valid.
 
3733
                for _ in self._generate_root_texts(revs):
 
3734
                    yield _
 
3735
                # NB: This currently reopens the inventory weave in source;
 
3736
                # using a single stream interface instead would avoid this.
 
3737
                from_weave = self.from_repository.inventories
 
3738
                # we fetch only the referenced inventories because we do not
 
3739
                # know for unselected inventories whether all their required
 
3740
                # texts are present in the other repository - it could be
 
3741
                # corrupt.
 
3742
                yield ('inventories', from_weave.get_record_stream(
 
3743
                    [(rev_id,) for rev_id in revs],
 
3744
                    self.inventory_fetch_order(),
 
3745
                    not self.delta_on_metadata()))
 
3746
            elif knit_kind == "signatures":
 
3747
                # Nothing to do here; this will be taken care of when
 
3748
                # _fetch_revision_texts happens.
 
3749
                pass
 
3750
            elif knit_kind == "revisions":
 
3751
                for record in self._fetch_revision_texts(revs):
 
3752
                    yield record
 
3753
            else:
 
3754
                raise AssertionError("Unknown knit kind %r" % knit_kind)
 
3755
        self.count_copied += len(revs)
 
3756
 
 
3757
    def get_stream_for_missing_keys(self, missing_keys):
 
3758
        # missing keys can only occur when we are byte copying and not
 
3759
        # translating (because translation means we don't send
 
3760
        # unreconstructable deltas ever).
 
3761
        keys = {}
 
3762
        keys['texts'] = set()
 
3763
        keys['revisions'] = set()
 
3764
        keys['inventories'] = set()
 
3765
        keys['signatures'] = set()
 
3766
        for key in missing_keys:
 
3767
            keys[key[0]].add(key[1:])
 
3768
        if len(keys['revisions']):
 
3769
            # If we allowed copying revisions at this point, we could end up
 
3770
            # copying a revision without copying its required texts: a
 
3771
            # violation of the requirements for repository integrity.
 
3772
            raise AssertionError(
 
3773
                'cannot copy revisions to fill in missing deltas %s' % (
 
3774
                    keys['revisions'],))
 
3775
        for substream_kind, keys in keys.iteritems():
 
3776
            vf = getattr(self.from_repository, substream_kind)
 
3777
            # Ask for full texts always so that we don't need more round trips
 
3778
            # after this stream.
 
3779
            stream = vf.get_record_stream(keys,
 
3780
                self.to_format._fetch_order, True)
 
3781
            yield substream_kind, stream
 
3782
 
 
3783
    def inventory_fetch_order(self):
 
3784
        if self._rich_root_upgrade():
 
3785
            return 'topological'
 
3786
        else:
 
3787
            return self.to_format._fetch_order
 
3788
 
 
3789
    def _rich_root_upgrade(self):
 
3790
        return (not self.from_repository._format.rich_root_data and
 
3791
            self.to_format.rich_root_data)
 
3792