~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/fetch.py

  • Committer: Aaron Bentley
  • Date: 2005-08-25 15:23:10 UTC
  • mto: (1092.1.42) (1185.3.4)
  • mto: This revision was merged to the branch mainline in revision 1178.
  • Revision ID: abentley@panoramicfeedback.com-20050825152310-897f26086c7fb592
Fixed negative revision handling

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
import bzrlib.errors
 
17
from bzrlib.selftest.testrevision import make_branches
 
18
from bzrlib.trace import mutter, note
 
19
from bzrlib.branch import Branch
 
20
from bzrlib.progress import ProgressBar
 
21
import sys
 
22
import os
 
23
 
 
24
def greedy_fetch(to_branch, from_branch, revision=None, pb=None):
    """Copy a revision and all available ancestors from one branch to another

    If no revision is specified, uses the last revision in the source branch's
    revision history.

    When revision is given and appears in from_branch's revision history,
    the history up to and including it is fetched; when it does not appear
    there, fetching starts from that single revision id.

    Revisions are installed in rounds.  The first round is the part of the
    wanted history that to_branch lacks; each later round is made of the
    parents of the previous round that to_branch still lacks.  The loop
    stops when a round finds nothing left to fetch.

    pb is passed through unchanged to to_branch.install_revisions().

    Returns a tuple (count, failed): the sum of the install counts reported
    by install_revisions(), and the set of revision ids it reported as
    failed.

    Raises bzrlib.errors.InstallFailed if any revision from from_branch's
    revision history, or the explicitly requested revision, is among the
    failures.  Raises bzrlib.errors.NoSuchRevision if a revision from the
    wanted history cannot be read back from from_branch.
    """
    from_history = from_branch.revision_history()
    # Everything on the source's own history (plus the requested revision)
    # must install; failures for other ancestors are only reported.
    required_revisions = set(from_history)
    all_failed = set()
    if revision is not None:
        required_revisions.add(revision)
        if revision in from_history:
            from_history = from_history[:from_history.index(revision) + 1]
        else:
            from_history = [revision]
    missing = [rev_id for rev_id in from_history
               if not has_revision(to_branch, rev_id)]

    count = 0
    while missing:
        installed, failed = to_branch.install_revisions(from_branch,
                                                        revision_ids=missing,
                                                        pb=pb)
        count += installed
        required_failed = failed.intersection(required_revisions)
        if required_failed:
            raise bzrlib.errors.InstallFailed(required_failed)
        for rev_id in failed:
            note("Failed to install %s" % rev_id)
        all_failed.update(failed)
        new_missing = []
        for rev_id in missing:
            try:
                # Kept in its own name so the `revision` argument is not
                # overwritten while walking ancestors.
                rev = from_branch.get_revision(rev_id)
            except bzrlib.errors.NoSuchRevision:
                # Compare the id that actually failed, not a stale object
                # left over from an earlier iteration.
                if rev_id in from_history:
                    raise
                continue
            for parent_id in [p.revision_id for p in rev.parents]:
                if not has_revision(to_branch, parent_id):
                    new_missing.append(parent_id)
        missing = new_missing
    return count, all_failed
 
74
 
 
75
 
 
76
from testsweet import InTempDir
 
77
from bzrlib.commit import commit
 
78
def has_revision(branch, revision_id):
    """Return True if branch can supply the XML for revision_id.

    The check asks the branch for the revision's XML and discards the
    result.  bzrlib.errors.NoSuchRevision is taken to mean the revision is
    absent and yields False; any other exception propagates to the caller.
    """
    try:
        branch.get_revision_xml(revision_id)
    except bzrlib.errors.NoSuchRevision:
        return False
    return True
 
84
 
 
85
class TestFetch(InTempDir):
    """Exercise greedy_fetch on the sample branches from make_branches()."""

    def runTest(self):
        def new_branch(name):
            # Create the directory and initialise an empty branch in it.
            os.mkdir(name)
            return Branch(name, init=True)

        def a_rev(index):
            # Revision id at `index` in br_a's history, re-read on each call.
            return br_a.revision_history()[index]

        def b_rev(index):
            # Revision id at `index` in br_b's history, re-read on each call.
            return br_b.revision_history()[index]

        #highest indices a: 5, b: 7
        br_a, br_b = make_branches()
        assert not has_revision(br_b, a_rev(3))
        assert has_revision(br_b, a_rev(2))
        assert len(br_b.revision_history()) == 7
        assert greedy_fetch(br_b, br_a, a_rev(2))[0] == 0

        # greedy_fetch is not supposed to alter the revision history
        assert len(br_b.revision_history()) == 7
        assert not has_revision(br_b, a_rev(3))

        assert greedy_fetch(br_b, br_a, a_rev(3))[0] == 1
        assert has_revision(br_b, a_rev(3))
        assert not has_revision(br_a, b_rev(3))
        assert not has_revision(br_a, b_rev(4))

        # When a non-branch ancestor is missing, it should be a failure, not
        # exception
        br_a4 = new_branch('br_a4')
        count, failures = greedy_fetch(br_a4, br_a)
        assert count == 6
        assert failures == set((b_rev(4), b_rev(5)))

        assert greedy_fetch(br_a, br_b)[0] == 4
        assert has_revision(br_a, b_rev(3))
        assert has_revision(br_a, b_rev(4))

        br_b2 = new_branch('br_b2')
        assert greedy_fetch(br_b2, br_b)[0] == 7
        assert has_revision(br_b2, b_rev(4))
        assert has_revision(br_b2, a_rev(2))
        assert not has_revision(br_b2, a_rev(3))

        br_a2 = new_branch('br_a2')
        assert greedy_fetch(br_a2, br_a)[0] == 9
        assert has_revision(br_a2, b_rev(4))
        assert has_revision(br_a2, a_rev(3))

        br_a3 = new_branch('br_a3')
        assert greedy_fetch(br_a3, br_a2)[0] == 0
        for revno in range(4):
            assert not has_revision(br_a3, a_rev(revno))
        assert greedy_fetch(br_a3, br_a2, a_rev(2))[0] == 3
        fetched = greedy_fetch(br_a3, br_a2, a_rev(3))[0]
        assert fetched == 3, "fetched %d instead of 3" % fetched
        # InstallFailed should be raised if the branch is missing the revision
        # that was requested.
        self.assertRaises(bzrlib.errors.InstallFailed, greedy_fetch, br_a3,
                          br_a2, 'pizza')
        # InstallFailed should be raised if the branch is missing a revision
        # from its own revision history
        br_a2.append_revision('a-b-c')
        self.assertRaises(bzrlib.errors.InstallFailed, greedy_fetch, br_a3,
                          br_a2)
 
147
 
 
148
 
 
149
 
 
150
if __name__ == '__main__':
    # Script entry point: build a suite, run it, and hand run_suite's return
    # value to sys.exit().
    # NOTE(review): neither `run_suite` nor `unittest` is imported in the
    # visible part of this file, and unittest.makeSuite() is called without
    # a test case class -- confirm how this entry point is meant to work
    # before relying on it.
    import sys
    sys.exit(run_suite(unittest.makeSuite()))