~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/chroot.py

  • Committer: John Arbash Meinel
  • Date: 2007-04-26 22:56:01 UTC
  • mto: This revision was merged to the branch mainline in revision 2471.
  • Revision ID: john@arbash-meinel.com-20070426225601-ae4qfcb8bzcfomny
Clean up the (failing) test so that the last thing
to fail is what I'm fixing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Implementation of Transport that prevents access to locations above a set
 
18
root.
 
19
"""
 
20
from urlparse import urlparse
 
21
 
 
22
from bzrlib import errors, urlutils
 
23
from bzrlib.transport import (
 
24
    get_transport,
 
25
    register_transport,
 
26
    Server,
 
27
    Transport,
 
28
    unregister_transport,
 
29
    )
 
30
from bzrlib.transport.decorator import TransportDecorator, DecoratorServer
 
31
from bzrlib.transport.memory import MemoryTransport
 
32
 
 
33
 
 
34
class ChrootServer(Server):
 
35
    """User space 'chroot' facility.
 
36
    
 
37
    The server's get_url returns the url for a chroot transport mapped to the
 
38
    backing transport. The url is of the form chroot-xxx:/// so parent
 
39
    directories of the backing transport are not visible. The chroot url will
 
40
    not allow '..' sequences to result in requests to the chroot affecting
 
41
    directories outside the backing transport.
 
42
    """
 
43
 
 
44
    def __init__(self, backing_transport):
 
45
        self.backing_transport = backing_transport
 
46
 
 
47
    def _factory(self, url):
 
48
        assert url.startswith(self.scheme)
 
49
        return ChrootTransport(self, url)
 
50
 
 
51
    def get_url(self):
 
52
        return self.scheme
 
53
 
 
54
    def setUp(self):
 
55
        self.scheme = 'chroot-%d:///' % id(self)
 
56
        register_transport(self.scheme, self._factory)
 
57
 
 
58
    def tearDown(self):
 
59
        unregister_transport(self.scheme, self._factory)
 
60
 
 
61
 
 
62
class ChrootTransport(Transport):
 
63
    """A ChrootTransport.
 
64
 
 
65
    Please see ChrootServer for details.
 
66
    """
 
67
 
 
68
    def __init__(self, server, base):
 
69
        self.server = server
 
70
        if not base.endswith('/'):
 
71
            base += '/'
 
72
        Transport.__init__(self, base)
 
73
        self.base_path = self.base[len(self.server.scheme)-1:]
 
74
        self.scheme = self.server.scheme
 
75
 
 
76
    def _call(self, methodname, relpath, *args):
 
77
        method = getattr(self.server.backing_transport, methodname)
 
78
        return method(self._safe_relpath(relpath), *args)
 
79
 
 
80
    def _safe_relpath(self, relpath):
 
81
        safe_relpath = self._combine_paths(self.base_path, relpath)
 
82
        assert safe_relpath.startswith('/')
 
83
        return safe_relpath[1:]
 
84
 
 
85
    # Transport methods
 
86
    def abspath(self, relpath):
 
87
        return self.scheme + self._safe_relpath(relpath)
 
88
 
 
89
    def append_file(self, relpath, f, mode=None):
 
90
        return self._call('append_file', relpath, f, mode)
 
91
 
 
92
    def clone(self, relpath):
 
93
        return ChrootTransport(self.server, self.abspath(relpath))
 
94
 
 
95
    def delete(self, relpath):
 
96
        return self._call('delete', relpath)
 
97
 
 
98
    def delete_tree(self, relpath):
 
99
        return self._call('delete_tree', relpath)
 
100
 
 
101
    def get(self, relpath):
 
102
        return self._call('get', relpath)
 
103
 
 
104
    def has(self, relpath):
 
105
        return self._call('has', relpath)
 
106
 
 
107
    def iter_files_recursive(self):
 
108
        backing_transport = self.server.backing_transport.clone(
 
109
            self._safe_relpath('.'))
 
110
        return backing_transport.iter_files_recursive()
 
111
 
 
112
    def listable(self):
 
113
        return self.server.backing_transport.listable()
 
114
 
 
115
    def list_dir(self, relpath):
 
116
        return self._call('list_dir', relpath)
 
117
 
 
118
    def lock_read(self, relpath):
 
119
        return self._call('lock_read', relpath)
 
120
 
 
121
    def lock_write(self, relpath):
 
122
        return self._call('lock_write', relpath)
 
123
 
 
124
    def mkdir(self, relpath, mode=None):
 
125
        return self._call('mkdir', relpath, mode)
 
126
 
 
127
    def put_file(self, relpath, f, mode=None):
 
128
        return self._call('put_file', relpath, f, mode)
 
129
 
 
130
    def rename(self, rel_from, rel_to):
 
131
        return self._call('rename', rel_from, self._safe_relpath(rel_to))
 
132
 
 
133
    def rmdir(self, relpath):
 
134
        return self._call('rmdir', relpath)
 
135
 
 
136
    def stat(self, relpath):
 
137
        return self._call('stat', relpath)
 
138
 
 
139
 
 
140
class TestingChrootServer(ChrootServer):
 
141
 
 
142
    def __init__(self):
 
143
        """TestingChrootServer is not usable until setUp is called."""
 
144
 
 
145
    def setUp(self, backing_server=None):
 
146
        """Setup the Chroot on backing_server."""
 
147
        if backing_server is not None:
 
148
            self.backing_transport = get_transport(backing_server.get_url())
 
149
        else:
 
150
            self.backing_transport = get_transport('.')
 
151
        ChrootServer.setUp(self)
 
152
 
 
153
 
 
154
def get_test_permutations():
 
155
    """Return the permutations to be used in testing."""
 
156
    return [(ChrootTransport, TestingChrootServer),
 
157
            ]