~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/url_policy_open.py

  • Committer: Patch Queue Manager
  • Date: 2014-05-08 02:56:29 UTC
  • mfrom: (6596.1.1 bzr)
  • Revision ID: pqm@pqm.ubuntu.com-20140508025629-de62pqrditrp349y
(richard-wilbur) Jelmer: Don't pass blob to file.writelines(),
 but rather to file.write(). (Dimitri John Ledkov)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Branch opening with URL-based restrictions."""
 
18
 
 
19
from __future__ import absolute_import
 
20
 
 
21
import threading
 
22
 
 
23
from bzrlib import (
 
24
    errors,
 
25
    urlutils,
 
26
    )
 
27
from bzrlib.branch import Branch
 
28
from bzrlib.controldir import (
 
29
    ControlDir,
 
30
    )
 
31
 
 
32
 
 
33
class BadUrl(errors.BzrError):
 
34
 
 
35
    _fmt = "Tried to access a branch from bad URL %(url)s."
 
36
 
 
37
 
 
38
class BranchReferenceForbidden(errors.BzrError):
 
39
 
 
40
    _fmt = ("Trying to mirror a branch reference and the branch type "
 
41
            "does not allow references.")
 
42
 
 
43
 
 
44
class BranchLoopError(errors.BzrError):
 
45
    """Encountered a branch cycle.
 
46
 
 
47
    A URL may point to a branch reference or it may point to a stacked branch.
 
48
    In either case, it's possible for there to be a cycle in these references,
 
49
    and this exception is raised when we detect such a cycle.
 
50
    """
 
51
 
 
52
    _fmt = "Encountered a branch cycle"""
 
53
 
 
54
 
 
55
class BranchOpenPolicy(object):
 
56
    """Policy on how to open branches.
 
57
 
 
58
    In particular, a policy determines which branches are okay to open by
 
59
    checking their URLs and deciding whether or not to follow branch
 
60
    references.
 
61
    """
 
62
 
 
63
    def should_follow_references(self):
 
64
        """Whether we traverse references when mirroring.
 
65
 
 
66
        Subclasses must override this method.
 
67
 
 
68
        If we encounter a branch reference and this returns false, an error is
 
69
        raised.
 
70
 
 
71
        :returns: A boolean to indicate whether to follow a branch reference.
 
72
        """
 
73
        raise NotImplementedError(self.should_follow_references)
 
74
 
 
75
    def transform_fallback_location(self, branch, url):
 
76
        """Validate, maybe modify, 'url' to be used as a stacked-on location.
 
77
 
 
78
        :param branch:  The branch that is being opened.
 
79
        :param url: The URL that the branch provides for its stacked-on
 
80
            location.
 
81
        :return: (new_url, check) where 'new_url' is the URL of the branch to
 
82
            actually open and 'check' is true if 'new_url' needs to be
 
83
            validated by check_and_follow_branch_reference.
 
84
        """
 
85
        raise NotImplementedError(self.transform_fallback_location)
 
86
 
 
87
    def check_one_url(self, url):
 
88
        """Check a URL.
 
89
 
 
90
        Subclasses must override this method.
 
91
 
 
92
        :param url: The source URL to check.
 
93
        :raise BadUrl: subclasses are expected to raise this or a subclass
 
94
            when it finds a URL it deems to be unacceptable.
 
95
        """
 
96
        raise NotImplementedError(self.check_one_url)
 
97
 
 
98
 
 
99
class _BlacklistPolicy(BranchOpenPolicy):
 
100
    """Branch policy that forbids certain URLs.
 
101
 
 
102
    This doesn't cope with various alternative spellings of URLs,
 
103
    with e.g. url encoding. It's mostly useful for tests.
 
104
    """
 
105
 
 
106
    def __init__(self, should_follow_references, bad_urls=None):
 
107
        if bad_urls is None:
 
108
            bad_urls = set()
 
109
        self._bad_urls = bad_urls
 
110
        self._should_follow_references = should_follow_references
 
111
 
 
112
    def should_follow_references(self):
 
113
        return self._should_follow_references
 
114
 
 
115
    def check_one_url(self, url):
 
116
        if url in self._bad_urls:
 
117
            raise BadUrl(url)
 
118
 
 
119
    def transform_fallback_location(self, branch, url):
 
120
        """See `BranchOpenPolicy.transform_fallback_location`.
 
121
 
 
122
        This class is not used for testing our smarter stacking features so we
 
123
        just do the simplest thing: return the URL that would be used anyway
 
124
        and don't check it.
 
125
        """
 
126
        return urlutils.join(branch.base, url), False
 
127
 
 
128
 
 
129
class AcceptAnythingPolicy(_BlacklistPolicy):
 
130
    """Accept anything, to make testing easier."""
 
131
 
 
132
    def __init__(self):
 
133
        super(AcceptAnythingPolicy, self).__init__(True, set())
 
134
 
 
135
 
 
136
class WhitelistPolicy(BranchOpenPolicy):
 
137
    """Branch policy that only allows certain URLs."""
 
138
 
 
139
    def __init__(self, should_follow_references, allowed_urls=None,
 
140
                 check=False):
 
141
        if allowed_urls is None:
 
142
            allowed_urls = []
 
143
        self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
 
144
        self.check = check
 
145
 
 
146
    def should_follow_references(self):
 
147
        return self._should_follow_references
 
148
 
 
149
    def check_one_url(self, url):
 
150
        if url.rstrip('/') not in self.allowed_urls:
 
151
            raise BadUrl(url)
 
152
 
 
153
    def transform_fallback_location(self, branch, url):
 
154
        """See `BranchOpenPolicy.transform_fallback_location`.
 
155
 
 
156
        Here we return the URL that would be used anyway and optionally check
 
157
        it.
 
158
        """
 
159
        return urlutils.join(branch.base, url), self.check
 
160
 
 
161
 
 
162
class SingleSchemePolicy(BranchOpenPolicy):
 
163
    """Branch open policy that rejects URLs not on the given scheme."""
 
164
 
 
165
    def __init__(self, allowed_scheme):
 
166
        self.allowed_scheme = allowed_scheme
 
167
 
 
168
    def should_follow_references(self):
 
169
        return True
 
170
 
 
171
    def transform_fallback_location(self, branch, url):
 
172
        return urlutils.join(branch.base, url), True
 
173
 
 
174
    def check_one_url(self, url):
 
175
        """Check that `url` is okay to open."""
 
176
        if urlutils.URL.from_string(str(url)).scheme != self.allowed_scheme:
 
177
            raise BadUrl(url)
 
178
 
 
179
 
 
180
class BranchOpener(object):
 
181
    """Branch opener which uses a URL policy.
 
182
 
 
183
    All locations that are opened (stacked-on branches, references) are
 
184
    checked against a policy object.
 
185
 
 
186
    The policy object is expected to have the following methods:
 
187
    * check_one_url 
 
188
    * should_follow_references
 
189
    * transform_fallback_location
 
190
    """
 
191
 
 
192
    _threading_data = threading.local()
 
193
 
 
194
    def __init__(self, policy, probers=None):
 
195
        """Create a new BranchOpener.
 
196
 
 
197
        :param policy: The opener policy to use.
 
198
        :param probers: Optional list of probers to allow.
 
199
            Defaults to local and remote bzr probers.
 
200
        """
 
201
        self.policy = policy
 
202
        self._seen_urls = set()
 
203
        self.probers = probers
 
204
 
 
205
    @classmethod
 
206
    def install_hook(cls):
 
207
        """Install the ``transform_fallback_location`` hook.
 
208
 
 
209
        This is done at module import time, but transform_fallback_locationHook
 
210
        doesn't do anything unless the `_active_openers` threading.Local
 
211
        object has a 'opener' attribute in this thread.
 
212
 
 
213
        This is in a module-level function rather than performed at module
 
214
        level so that it can be called in setUp for testing `BranchOpener`
 
215
        as bzrlib.tests.TestCase.setUp clears hooks.
 
216
        """
 
217
        Branch.hooks.install_named_hook(
 
218
            'transform_fallback_location',
 
219
            cls.transform_fallback_locationHook,
 
220
            'BranchOpener.transform_fallback_locationHook')
 
221
 
 
222
    def check_and_follow_branch_reference(self, url):
 
223
        """Check URL (and possibly the referenced URL).
 
224
 
 
225
        This method checks that `url` passes the policy's `check_one_url`
 
226
        method, and if `url` refers to a branch reference, it checks whether
 
227
        references are allowed and whether the reference's URL passes muster
 
228
        also -- recursively, until a real branch is found.
 
229
 
 
230
        :param url: URL to check
 
231
        :raise BranchLoopError: If the branch references form a loop.
 
232
        :raise BranchReferenceForbidden: If this opener forbids branch
 
233
            references.
 
234
        """
 
235
        while True:
 
236
            if url in self._seen_urls:
 
237
                raise BranchLoopError()
 
238
            self._seen_urls.add(url)
 
239
            self.policy.check_one_url(url)
 
240
            next_url = self.follow_reference(url)
 
241
            if next_url is None:
 
242
                return url
 
243
            url = next_url
 
244
            if not self.policy.should_follow_references():
 
245
                raise BranchReferenceForbidden(url)
 
246
 
 
247
    @classmethod
 
248
    def transform_fallback_locationHook(cls, branch, url):
 
249
        """Installed as the 'transform_fallback_location' Branch hook.
 
250
 
 
251
        This method calls `transform_fallback_location` on the policy object and
 
252
        either returns the url it provides or passes it back to
 
253
        check_and_follow_branch_reference.
 
254
        """
 
255
        try:
 
256
            opener = getattr(cls._threading_data, "opener")
 
257
        except AttributeError:
 
258
            return url
 
259
        new_url, check = opener.policy.transform_fallback_location(branch, url)
 
260
        if check:
 
261
            return opener.check_and_follow_branch_reference(new_url)
 
262
        else:
 
263
            return new_url
 
264
 
 
265
    def run_with_transform_fallback_location_hook_installed(
 
266
            self, callable, *args, **kw):
 
267
        if (self.transform_fallback_locationHook not in
 
268
                Branch.hooks['transform_fallback_location']):
 
269
            raise AssertionError("hook not installed")
 
270
        self._threading_data.opener = self
 
271
        try:
 
272
            return callable(*args, **kw)
 
273
        finally:
 
274
            del self._threading_data.opener
 
275
            # We reset _seen_urls here to avoid multiple calls to open giving
 
276
            # spurious loop exceptions.
 
277
            self._seen_urls = set()
 
278
 
 
279
    def follow_reference(self, url):
 
280
        """Get the branch-reference value at the specified url.
 
281
 
 
282
        This exists as a separate method only to be overriden in unit tests.
 
283
        """
 
284
        bzrdir = ControlDir.open(url, probers=self.probers)
 
285
        return bzrdir.get_branch_reference()
 
286
 
 
287
    def open(self, url):
 
288
        """Open the Bazaar branch at url, first checking it.
 
289
 
 
290
        What is acceptable means is defined by the policy's `follow_reference` and
 
291
        `check_one_url` methods.
 
292
        """
 
293
        if type(url) != str:
 
294
            raise TypeError
 
295
 
 
296
        url = self.check_and_follow_branch_reference(url)
 
297
 
 
298
        def open_branch(url):
 
299
            dir = ControlDir.open(url, probers=self.probers)
 
300
            return dir.open_branch()
 
301
        return self.run_with_transform_fallback_location_hook_installed(
 
302
            open_branch, url)
 
303
 
 
304
 
 
305
def open_only_scheme(allowed_scheme, url):
 
306
    """Open the branch at `url`, only accessing URLs on `allowed_scheme`.
 
307
 
 
308
    :raises BadUrl: An attempt was made to open a URL that was not on
 
309
        `allowed_scheme`.
 
310
    """
 
311
    return BranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
 
312
 
 
313
 
 
314
BranchOpener.install_hook()