1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Implementation of Transport that prevents access to locations above a set
20
from urlparse import urlparse
22
from bzrlib import errors, urlutils
23
from bzrlib.transport import (
30
from bzrlib.transport.decorator import TransportDecorator, DecoratorServer
31
from bzrlib.transport.memory import MemoryTransport
34
class ChrootServer(Server):
35
"""User space 'chroot' facility.
37
The server's get_url returns the url for a chroot transport mapped to the
38
backing transport. The url is of the form chroot-xxx:/// so parent
39
directories of the backing transport are not visible. The chroot url will
40
not allow '..' sequences to result in requests to the chroot affecting
41
directories outside the backing transport.
44
def __init__(self, backing_transport):
45
self.backing_transport = backing_transport
47
def _factory(self, url):
48
assert url.startswith(self.scheme)
49
return ChrootTransport(self, url)
55
self.scheme = 'chroot-%d:///' % id(self)
56
register_transport(self.scheme, self._factory)
59
unregister_transport(self.scheme, self._factory)
62
class ChrootTransport(Transport):
65
Please see ChrootServer for details.
68
def __init__(self, server, base):
70
if not base.endswith('/'):
72
Transport.__init__(self, base)
73
self.base_path = self.base[len(self.server.scheme)-1:]
74
self.scheme = self.server.scheme
76
def _call(self, methodname, relpath, *args):
77
method = getattr(self.server.backing_transport, methodname)
78
return method(self._safe_relpath(relpath), *args)
80
def _safe_relpath(self, relpath):
81
safe_relpath = self._combine_paths(self.base_path, relpath)
82
assert safe_relpath.startswith('/')
83
return safe_relpath[1:]
86
def abspath(self, relpath):
87
return self.scheme + self._safe_relpath(relpath)
89
def append_file(self, relpath, f, mode=None):
90
return self._call('append_file', relpath, f, mode)
92
def _can_roundtrip_unix_modebits(self):
93
return self.server.backing_transport._can_roundtrip_unix_modebits()
95
def clone(self, relpath):
96
return ChrootTransport(self.server, self.abspath(relpath))
98
def delete(self, relpath):
99
return self._call('delete', relpath)
101
def delete_tree(self, relpath):
102
return self._call('delete_tree', relpath)
104
def external_url(self):
105
"""See bzrlib.transport.Transport.external_url."""
106
# Chroots, like MemoryTransport depend on in-process
107
# state and thus the base cannot simply be handed out.
108
# See the base class docstring for more details and
109
# possible directions. For now we return the chrooted
111
return self.server.backing_transport.external_url()
113
def get(self, relpath):
114
return self._call('get', relpath)
116
def has(self, relpath):
117
return self._call('has', relpath)
119
def iter_files_recursive(self):
120
backing_transport = self.server.backing_transport.clone(
121
self._safe_relpath('.'))
122
return backing_transport.iter_files_recursive()
125
return self.server.backing_transport.listable()
127
def list_dir(self, relpath):
128
return self._call('list_dir', relpath)
130
def lock_read(self, relpath):
131
return self._call('lock_read', relpath)
133
def lock_write(self, relpath):
134
return self._call('lock_write', relpath)
136
def mkdir(self, relpath, mode=None):
137
return self._call('mkdir', relpath, mode)
139
def open_write_stream(self, relpath, mode=None):
140
return self._call('open_write_stream', relpath, mode)
142
def put_file(self, relpath, f, mode=None):
143
return self._call('put_file', relpath, f, mode)
145
def rename(self, rel_from, rel_to):
146
return self._call('rename', rel_from, self._safe_relpath(rel_to))
148
def rmdir(self, relpath):
149
return self._call('rmdir', relpath)
151
def stat(self, relpath):
152
return self._call('stat', relpath)
155
class TestingChrootServer(ChrootServer):
158
"""TestingChrootServer is not usable until setUp is called."""
160
def setUp(self, backing_server=None):
161
"""Setup the Chroot on backing_server."""
162
if backing_server is not None:
163
self.backing_transport = get_transport(backing_server.get_url())
165
self.backing_transport = get_transport('.')
166
ChrootServer.setUp(self)
169
def get_test_permutations():
170
"""Return the permutations to be used in testing."""
171
return [(ChrootTransport, TestingChrootServer),