1
# Copyright (C) 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Branch opening with URL-based restrictions."""
19
from __future__ import absolute_import
27
from bzrlib.branch import Branch
28
from bzrlib.controldir import (
33
class BadUrl(errors.BzrError):
35
_fmt = "Tried to access a branch from bad URL %(url)s."
38
class BranchReferenceForbidden(errors.BzrError):
40
_fmt = ("Trying to mirror a branch reference and the branch type "
41
"does not allow references.")
44
class BranchLoopError(errors.BzrError):
45
"""Encountered a branch cycle.
47
A URL may point to a branch reference or it may point to a stacked branch.
48
In either case, it's possible for there to be a cycle in these references,
49
and this exception is raised when we detect such a cycle.
52
_fmt = "Encountered a branch cycle"""
55
class BranchOpenPolicy(object):
56
"""Policy on how to open branches.
58
In particular, a policy determines which branches are okay to open by
59
checking their URLs and deciding whether or not to follow branch
63
def should_follow_references(self):
64
"""Whether we traverse references when mirroring.
66
Subclasses must override this method.
68
If we encounter a branch reference and this returns false, an error is
71
:returns: A boolean to indicate whether to follow a branch reference.
73
raise NotImplementedError(self.should_follow_references)
75
def transform_fallback_location(self, branch, url):
76
"""Validate, maybe modify, 'url' to be used as a stacked-on location.
78
:param branch: The branch that is being opened.
79
:param url: The URL that the branch provides for its stacked-on
81
:return: (new_url, check) where 'new_url' is the URL of the branch to
82
actually open and 'check' is true if 'new_url' needs to be
83
validated by check_and_follow_branch_reference.
85
raise NotImplementedError(self.transform_fallback_location)
87
def check_one_url(self, url):
90
Subclasses must override this method.
92
:param url: The source URL to check.
93
:raise BadUrl: subclasses are expected to raise this or a subclass
94
when it finds a URL it deems to be unacceptable.
96
raise NotImplementedError(self.check_one_url)
99
class _BlacklistPolicy(BranchOpenPolicy):
100
"""Branch policy that forbids certain URLs.
102
This doesn't cope with various alternative spellings of URLs,
103
with e.g. url encoding. It's mostly useful for tests.
106
def __init__(self, should_follow_references, bad_urls=None):
109
self._bad_urls = bad_urls
110
self._should_follow_references = should_follow_references
112
def should_follow_references(self):
113
return self._should_follow_references
115
def check_one_url(self, url):
116
if url in self._bad_urls:
119
def transform_fallback_location(self, branch, url):
120
"""See `BranchOpenPolicy.transform_fallback_location`.
122
This class is not used for testing our smarter stacking features so we
123
just do the simplest thing: return the URL that would be used anyway
126
return urlutils.join(branch.base, url), False
129
class AcceptAnythingPolicy(_BlacklistPolicy):
130
"""Accept anything, to make testing easier."""
133
super(AcceptAnythingPolicy, self).__init__(True, set())
136
class WhitelistPolicy(BranchOpenPolicy):
137
"""Branch policy that only allows certain URLs."""
139
def __init__(self, should_follow_references, allowed_urls=None,
141
if allowed_urls is None:
143
self.allowed_urls = set(url.rstrip('/') for url in allowed_urls)
146
def should_follow_references(self):
147
return self._should_follow_references
149
def check_one_url(self, url):
150
if url.rstrip('/') not in self.allowed_urls:
153
def transform_fallback_location(self, branch, url):
154
"""See `BranchOpenPolicy.transform_fallback_location`.
156
Here we return the URL that would be used anyway and optionally check
159
return urlutils.join(branch.base, url), self.check
162
class SingleSchemePolicy(BranchOpenPolicy):
163
"""Branch open policy that rejects URLs not on the given scheme."""
165
def __init__(self, allowed_scheme):
166
self.allowed_scheme = allowed_scheme
168
def should_follow_references(self):
171
def transform_fallback_location(self, branch, url):
172
return urlutils.join(branch.base, url), True
174
def check_one_url(self, url):
175
"""Check that `url` is okay to open."""
176
if urlutils.URL.from_string(str(url)).scheme != self.allowed_scheme:
180
class BranchOpener(object):
181
"""Branch opener which uses a URL policy.
183
All locations that are opened (stacked-on branches, references) are
184
checked against a policy object.
186
The policy object is expected to have the following methods:
188
* should_follow_references
189
* transform_fallback_location
192
_threading_data = threading.local()
194
def __init__(self, policy, probers=None):
195
"""Create a new BranchOpener.
197
:param policy: The opener policy to use.
198
:param probers: Optional list of probers to allow.
199
Defaults to local and remote bzr probers.
202
self._seen_urls = set()
203
self.probers = probers
206
def install_hook(cls):
207
"""Install the ``transform_fallback_location`` hook.
209
This is done at module import time, but transform_fallback_locationHook
210
doesn't do anything unless the `_active_openers` threading.Local
211
object has a 'opener' attribute in this thread.
213
This is in a module-level function rather than performed at module
214
level so that it can be called in setUp for testing `BranchOpener`
215
as bzrlib.tests.TestCase.setUp clears hooks.
217
Branch.hooks.install_named_hook(
218
'transform_fallback_location',
219
cls.transform_fallback_locationHook,
220
'BranchOpener.transform_fallback_locationHook')
222
def check_and_follow_branch_reference(self, url):
223
"""Check URL (and possibly the referenced URL).
225
This method checks that `url` passes the policy's `check_one_url`
226
method, and if `url` refers to a branch reference, it checks whether
227
references are allowed and whether the reference's URL passes muster
228
also -- recursively, until a real branch is found.
230
:param url: URL to check
231
:raise BranchLoopError: If the branch references form a loop.
232
:raise BranchReferenceForbidden: If this opener forbids branch
236
if url in self._seen_urls:
237
raise BranchLoopError()
238
self._seen_urls.add(url)
239
self.policy.check_one_url(url)
240
next_url = self.follow_reference(url)
244
if not self.policy.should_follow_references():
245
raise BranchReferenceForbidden(url)
248
def transform_fallback_locationHook(cls, branch, url):
249
"""Installed as the 'transform_fallback_location' Branch hook.
251
This method calls `transform_fallback_location` on the policy object and
252
either returns the url it provides or passes it back to
253
check_and_follow_branch_reference.
256
opener = getattr(cls._threading_data, "opener")
257
except AttributeError:
259
new_url, check = opener.policy.transform_fallback_location(branch, url)
261
return opener.check_and_follow_branch_reference(new_url)
265
def run_with_transform_fallback_location_hook_installed(
266
self, callable, *args, **kw):
267
if (self.transform_fallback_locationHook not in
268
Branch.hooks['transform_fallback_location']):
269
raise AssertionError("hook not installed")
270
self._threading_data.opener = self
272
return callable(*args, **kw)
274
del self._threading_data.opener
275
# We reset _seen_urls here to avoid multiple calls to open giving
276
# spurious loop exceptions.
277
self._seen_urls = set()
279
def follow_reference(self, url):
280
"""Get the branch-reference value at the specified url.
282
This exists as a separate method only to be overriden in unit tests.
284
bzrdir = ControlDir.open(url, probers=self.probers)
285
return bzrdir.get_branch_reference()
288
"""Open the Bazaar branch at url, first checking it.
290
What is acceptable means is defined by the policy's `follow_reference` and
291
`check_one_url` methods.
296
url = self.check_and_follow_branch_reference(url)
298
def open_branch(url):
299
dir = ControlDir.open(url, probers=self.probers)
300
return dir.open_branch()
301
return self.run_with_transform_fallback_location_hook_installed(
305
def open_only_scheme(allowed_scheme, url):
306
"""Open the branch at `url`, only accessing URLs on `allowed_scheme`.
308
:raises BadUrl: An attempt was made to open a URL that was not on
311
return BranchOpener(SingleSchemePolicy(allowed_scheme)).open(url)
314
BranchOpener.install_hook()