~bzr-pqm/bzr/bzr.dev

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Copyright (C) 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


import webbrowser

from bzrlib import (
    errors,
    msgeditor,
)
from bzrlib.hooks import HookPoint, Hooks
from bzrlib.plugins.launchpad import (
    lp_api,
    lp_registration,
)

from lazr.restfulclient import errors as restful_errors


class ProposeMergeHooks(Hooks):
    """Hooks for proposing a merge on Launchpad."""

    def __init__(self):
        Hooks.__init__(self)
        self.create_hook(
            HookPoint(
                'get_prerequisite',
                "Return the prerequisite branch for proposing as merge.",
                (2, 1), None),
        )
        self.create_hook(
            HookPoint(
                'merge_proposal_body',
                "Return an initial body for the merge proposal message.",
                (2, 1), None),
        )


class Proposer(object):

    hooks = ProposeMergeHooks()

    def __init__(self, tree, source_branch, target_branch, message, reviews,
                 staging=False):
        """Constructor.

        :param tree: The working tree for the source branch.
        :param source_branch: The branch to propose for merging.
        :param target_branch: The branch to merge into.
        :param message: The commit message to use.  (May be None.)
        :param reviews: A list of tuples of reviewer, review type.
        :param staging: If True, propose the merge against staging instead of
            production.
        """
        self.tree = tree
        if staging:
            lp_instance = 'staging'
        else:
            lp_instance = 'edge'
        service = lp_registration.LaunchpadService(lp_instance=lp_instance)
        self.launchpad = lp_api.login(service)
        self.source_branch = lp_api.LaunchpadBranch.from_bzr(
            self.launchpad, source_branch)
        if target_branch is None:
            self.target_branch = self.source_branch.get_dev_focus()
        else:
            self.target_branch = lp_api.LaunchpadBranch.from_bzr(
                self.launchpad, target_branch)
        self.commit_message = message
        if reviews == []:
            target_reviewer = self.target_branch.lp.reviewer
            if target_reviewer is None:
                raise errors.BzrCommandError('No reviewer specified')
            self.reviews = [(target_reviewer, '')]
        else:
            self.reviews = [(self.launchpad.people[reviewer], review_type)
                            for reviewer, review_type in
                            reviews]

    def get_comment(self, prerequisite_branch):
        """Determine the initial comment for the merge proposal."""
        info = ["Source: %s\n" % self.source_branch.lp.bzr_identity]
        info.append("Target: %s\n" % self.target_branch.lp.bzr_identity)
        if prerequisite_branch is not None:
            info.append("Prereq: %s\n" % prerequisite_branch.lp.bzr_identity)
        for rdata in self.reviews:
            uniquename = "%s (%s)" % (rdata[0].display_name, rdata[0].name)
            info.append('Reviewer: %s, type "%s"\n' % (uniquename, rdata[1]))
        self.source_branch.bzr.lock_read()
        try:
            self.target_branch.bzr.lock_read()
            try:
                body = self.get_initial_body()
            finally:
                self.target_branch.bzr.unlock()
        finally:
            self.source_branch.bzr.unlock()
        initial_comment = msgeditor.edit_commit_message(''.join(info),
                                                        start_message=body)
        return initial_comment.strip().encode('utf-8')

    def get_initial_body(self):
        """Get a body for the proposal for the user to modify.

        :return: a str or None.
        """
        def list_modified_files():
            lca_tree = self.source_branch.find_lca_tree(
                self.target_branch)
            source_tree = self.source_branch.bzr.basis_tree()
            files = modified_files(lca_tree, source_tree)
            return list(files)
        target_loc = ('bzr+ssh://bazaar.launchpad.net/%s' %
                       self.target_branch.lp.unique_name)
        body = None
        for hook in self.hooks['merge_proposal_body']:
            body = hook({
                'tree': self.tree,
                'target_branch': target_loc,
                'modified_files_callback': list_modified_files,
                'old_body': body,
            })
        return body

    def check_proposal(self):
        """Check that the submission is sensible."""
        if self.source_branch.lp.self_link == self.target_branch.lp.self_link:
            raise errors.BzrCommandError(
                'Source and target branches must be different.')
        for mp in self.source_branch.lp.landing_targets:
            if mp.queue_status in ('Merged', 'Rejected'):
                continue
            if mp.target_branch.self_link == self.target_branch.lp.self_link:
                raise errors.BzrCommandError(
                    'There is already a branch merge proposal: %s' %
                    canonical_url(mp))

    def _get_prerequisite_branch(self):
        hooks = self.hooks['get_prerequisite']
        prerequisite_branch = None
        for hook in hooks:
            prerequisite_branch = hook(
                {'launchpad': self.launchpad,
                 'source_branch': self.source_branch,
                 'target_branch': self.target_branch,
                 'prerequisite_branch': prerequisite_branch})
        return prerequisite_branch

    def create_proposal(self):
        """Perform the submission."""
        prerequisite_branch = self._get_prerequisite_branch()
        if prerequisite_branch is None:
            prereq = None
        else:
            prereq = prerequisite_branch.lp
            prerequisite_branch.update_lp()
        self.source_branch.update_lp()
        reviewers = []
        review_types = []
        for reviewer, review_type in self.reviews:
            review_types.append(review_type)
            reviewers.append(reviewer.self_link)
        initial_comment = self.get_comment(prerequisite_branch)
        try:
            mp = self.source_branch.lp.createMergeProposal(
                target_branch=self.target_branch.lp,
                prerequisite_branch=prereq,
                initial_comment=initial_comment,
                commit_message=self.commit_message, reviewers=reviewers,
                review_types=review_types)
        except restful_errors.HTTPError, e:
            error_lines = []
            for line in e.content.splitlines():
                if line.startswith('Traceback (most recent call last):'):
                    break
                error_lines.append(line)
            raise Exception(''.join(error_lines))
        else:
            webbrowser.open(canonical_url(mp))


def modified_files(old_tree, new_tree):
    """Return a list of paths in the new tree with modified contents."""
    for f, (op, path), c, v, p, n, (ok, k), e in new_tree.iter_changes(
        old_tree):
        if c and k == 'file':
            yield str(path)


def canonical_url(object):
    """Return the canonical URL for a branch."""
    url = object.self_link.replace('https://api.', 'https://code.')
    return url.replace('/beta/', '/')