13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Implementation of Transport that prevents access to locations above a set
20
from urlparse import urlparse
22
from bzrlib import errors, urlutils
21
from __future__ import absolute_import
23
23
from bzrlib.transport import (
25
25
register_transport,
30
from bzrlib.transport.decorator import TransportDecorator, DecoratorServer
31
from bzrlib.transport.memory import MemoryTransport
34
class ChrootServer(Server):
29
class ChrootServer(pathfilter.PathFilteringServer):
35
30
"""User space 'chroot' facility.
37
32
The server's get_url returns the url for a chroot transport mapped to the
38
33
backing transport. The url is of the form chroot-xxx:/// so parent
39
34
directories of the backing transport are not visible. The chroot url will
40
35
not allow '..' sequences to result in requests to the chroot affecting
41
36
directories outside the backing transport.
38
PathFilteringServer does all the path sanitation needed to enforce a
39
chroot, so this is a simple subclass of PathFilteringServer that ignores
44
43
def __init__(self, backing_transport):
45
self.backing_transport = backing_transport
44
pathfilter.PathFilteringServer.__init__(self, backing_transport, None)
47
46
def _factory(self, url):
48
assert url.startswith(self.scheme)
49
47
return ChrootTransport(self, url)
49
def start_server(self):
55
50
self.scheme = 'chroot-%d:///' % id(self)
56
51
register_transport(self.scheme, self._factory)
59
unregister_transport(self.scheme, self._factory)
62
class ChrootTransport(Transport):
54
class ChrootTransport(pathfilter.PathFilteringTransport):
63
55
"""A ChrootTransport.
65
57
Please see ChrootServer for details.
68
def __init__(self, server, base):
70
if not base.endswith('/'):
72
Transport.__init__(self, base)
73
self.base_path = self.base[len(self.server.scheme)-1:]
74
self.scheme = self.server.scheme
76
def _call(self, methodname, relpath, *args):
77
method = getattr(self.server.backing_transport, methodname)
78
return method(self._safe_relpath(relpath), *args)
80
def _safe_relpath(self, relpath):
81
safe_relpath = self._combine_paths(self.base_path, relpath)
82
assert safe_relpath.startswith('/')
83
return safe_relpath[1:]
86
def abspath(self, relpath):
87
return self.scheme + self._safe_relpath(relpath)
89
def append_file(self, relpath, f, mode=None):
90
return self._call('append_file', relpath, f, mode)
92
def clone(self, relpath):
93
return ChrootTransport(self.server, self.abspath(relpath))
95
def delete(self, relpath):
96
return self._call('delete', relpath)
98
def delete_tree(self, relpath):
99
return self._call('delete_tree', relpath)
101
def get(self, relpath):
102
return self._call('get', relpath)
104
def has(self, relpath):
105
return self._call('has', relpath)
107
def iter_files_recursive(self):
108
backing_transport = self.server.backing_transport.clone(
109
self._safe_relpath('.'))
110
return backing_transport.iter_files_recursive()
113
return self.server.backing_transport.listable()
115
def list_dir(self, relpath):
116
return self._call('list_dir', relpath)
118
def lock_read(self, relpath):
119
return self._call('lock_read', relpath)
121
def lock_write(self, relpath):
122
return self._call('lock_write', relpath)
124
def mkdir(self, relpath, mode=None):
125
return self._call('mkdir', relpath, mode)
127
def put_file(self, relpath, f, mode=None):
128
return self._call('put_file', relpath, f, mode)
130
def rename(self, rel_from, rel_to):
131
return self._call('rename', rel_from, self._safe_relpath(rel_to))
133
def rmdir(self, relpath):
134
return self._call('rmdir', relpath)
136
def stat(self, relpath):
137
return self._call('stat', relpath)
140
class TestingChrootServer(ChrootServer):
143
"""TestingChrootServer is not usable until setUp is called."""
145
def setUp(self, backing_server=None):
146
"""Setup the Chroot on backing_server."""
147
if backing_server is not None:
148
self.backing_transport = get_transport(backing_server.get_url())
150
self.backing_transport = get_transport('.')
151
ChrootServer.setUp(self)
60
def _filter(self, relpath):
61
# A simplified version of PathFilteringTransport's _filter that omits
62
# the call to self.server.filter_func.
63
return self._relpath_from_server_root(relpath)
154
66
def get_test_permutations():
155
67
"""Return the permutations to be used in testing."""
156
return [(ChrootTransport, TestingChrootServer),
68
from bzrlib.tests import test_server
69
return [(ChrootTransport, test_server.TestingChrootServer)]