~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_wsgi.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-05-04 12:10:51 UTC
  • mfrom: (5819.1.4 777007-developer-doc)
  • Revision ID: pqm@pqm.ubuntu.com-20110504121051-aovlsmqiivjmc4fc
(jelmer) Small fixes to developer documentation. (Jonathan Riddell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2009, 2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for WSGI application"""
18
18
 
19
19
from cStringIO import StringIO
20
20
 
21
21
from bzrlib import tests
 
22
from bzrlib.smart import medium, protocol
22
23
from bzrlib.transport.http import wsgi
23
24
from bzrlib.transport import chroot, memory
24
25
 
25
26
 
26
 
class TestWSGI(tests.TestCase):
27
 
 
28
 
    def setUp(self):
29
 
        tests.TestCase.setUp(self)
30
 
        self.status = None
31
 
        self.headers = None
 
27
class WSGITestMixin(object):
32
28
 
33
29
    def build_environ(self, updates=None):
34
30
        """Builds an environ dict with all fields required by PEP 333.
35
 
        
 
31
 
36
32
        :param updates: a dict to that will be incorporated into the returned
37
33
            dict using dict.update(updates).
38
34
        """
57
53
        if updates is not None:
58
54
            environ.update(updates)
59
55
        return environ
60
 
        
 
56
 
61
57
    def read_response(self, iterable):
62
58
        response = ''
63
59
        for string in iterable:
68
64
        self.status = status
69
65
        self.headers = headers
70
66
 
 
67
 
 
68
class TestWSGI(tests.TestCase, WSGITestMixin):
 
69
 
 
70
    def setUp(self):
 
71
        tests.TestCase.setUp(self)
 
72
        self.status = None
 
73
        self.headers = None
 
74
 
71
75
    def test_construct(self):
72
76
        app = wsgi.SmartWSGIApp(FakeTransport())
73
77
        self.assertIsInstance(
74
 
            app.backing_transport, chroot.ChrootTransportDecorator)
 
78
            app.backing_transport, chroot.ChrootTransport)
75
79
 
76
80
    def test_http_get_rejected(self):
77
81
        # GET requests are rejected.
81
85
        self.read_response(iterable)
82
86
        self.assertEqual('405 Method not allowed', self.status)
83
87
        self.assertTrue(('Allow', 'POST') in self.headers)
84
 
        
 
88
 
 
89
    def _fake_make_request(self, transport, write_func, bytes, rcp):
 
90
        request = FakeRequest(transport, write_func)
 
91
        request.accept_bytes(bytes)
 
92
        self.request = request
 
93
        return request
 
94
 
85
95
    def test_smart_wsgi_app_uses_given_relpath(self):
86
96
        # The SmartWSGIApp should use the "bzrlib.relpath" field from the
87
 
        # WSGI environ to construct the transport for this request, by cloning
88
 
        # its base transport with the given relpath.
 
97
        # WSGI environ to clone from its backing transport to get a specific
 
98
        # transport for this request.
89
99
        transport = FakeTransport()
90
100
        wsgi_app = wsgi.SmartWSGIApp(transport)
91
 
        def make_request(transport, write_func):
92
 
            request = FakeRequest(transport, write_func)
93
 
            self.request = request
94
 
            return request
95
 
        wsgi_app.make_request = make_request
 
101
        wsgi_app.backing_transport = transport
 
102
        wsgi_app.make_request = self._fake_make_request
96
103
        fake_input = StringIO('fake request')
97
104
        environ = self.build_environ({
98
105
            'REQUEST_METHOD': 'POST',
102
109
        })
103
110
        iterable = wsgi_app(environ, self.start_response)
104
111
        response = self.read_response(iterable)
105
 
        self.assertEqual([('clone', 'foo/bar')] , transport.calls)
 
112
        self.assertEqual([('clone', 'foo/bar/')] , transport.calls)
106
113
 
107
114
    def test_smart_wsgi_app_request_and_response(self):
108
115
        # SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
111
118
        transport = memory.MemoryTransport()
112
119
        transport.put_bytes('foo', 'some bytes')
113
120
        wsgi_app = wsgi.SmartWSGIApp(transport)
114
 
        def make_request(transport, write_func):
115
 
            request = FakeRequest(transport, write_func)
116
 
            self.request = request
117
 
            return request
118
 
        wsgi_app.make_request = make_request
 
121
        wsgi_app.make_request = self._fake_make_request
119
122
        fake_input = StringIO('fake request')
120
123
        environ = self.build_environ({
121
124
            'REQUEST_METHOD': 'POST',
138
141
            fake_app, prefix='/abc/', path_var='FOO')
139
142
        wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
140
143
        self.assertEqual(['xyz'], calls)
141
 
       
 
144
 
142
145
    def test_relpath_setter_bad_path_prefix(self):
143
146
        # wsgi.RelpathSetter will reject paths with that don't match the prefix
144
147
        # with a 404.  This is probably a sign of misconfiguration; a server
151
154
            {'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
152
155
        self.read_response(iterable)
153
156
        self.assertTrue(self.status.startswith('404'))
154
 
        
 
157
 
155
158
    def test_relpath_setter_bad_path_suffix(self):
156
159
        # Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
157
160
        # will reject paths with that don't match the suffix '.bzr/smart' with a
165
168
            {'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
166
169
        self.read_response(iterable)
167
170
        self.assertTrue(self.status.startswith('404'))
168
 
        
 
171
 
169
172
    def test_make_app(self):
170
173
        # The make_app helper constructs a SmartWSGIApp wrapped in a
171
174
        # RelpathSetter.
175
178
            path_var='a path_var')
176
179
        self.assertIsInstance(app, wsgi.RelpathSetter)
177
180
        self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
178
 
        self.assertEndsWith(app.app.backing_transport.base, 'a%20root/')
179
 
        self.assertEqual(app.prefix, 'a prefix')
 
181
        self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
 
182
        backing_transport = app.app.backing_transport
 
183
        chroot_backing_transport = backing_transport.server.backing_transport
 
184
        self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
 
185
        self.assertEqual(app.app.root_client_path, 'a prefix')
180
186
        self.assertEqual(app.path_var, 'a path_var')
181
187
 
182
188
    def test_incomplete_request(self):
183
189
        transport = FakeTransport()
184
190
        wsgi_app = wsgi.SmartWSGIApp(transport)
185
 
        def make_request(transport, write_func):
 
191
        def make_request(transport, write_func, bytes, root_client_path):
186
192
            request = IncompleteRequest(transport, write_func)
 
193
            request.accept_bytes(bytes)
187
194
            self.request = request
188
195
            return request
189
196
        wsgi_app.make_request = make_request
200
207
        self.assertEqual('200 OK', self.status)
201
208
        self.assertEqual('error\x01incomplete request\n', response)
202
209
 
203
 
    def test_chrooting(self):
204
 
        # Show that requests that try to access things outside of the base
205
 
        # really will get intercepted by the ChrootTransportDecorator.
 
210
    def test_protocol_version_detection_one(self):
 
211
        # SmartWSGIApp detects requests that don't start with
 
212
        # REQUEST_VERSION_TWO as version one.
206
213
        transport = memory.MemoryTransport()
207
 
        transport.mkdir('foo')
208
 
        transport.put_bytes('foo/bar', 'this is foo/bar')
209
 
        wsgi_app = wsgi.SmartWSGIApp(transport.clone('foo'))
 
214
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
215
        fake_input = StringIO('hello\n')
 
216
        environ = self.build_environ({
 
217
            'REQUEST_METHOD': 'POST',
 
218
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
219
            'wsgi.input': fake_input,
 
220
            'bzrlib.relpath': 'foo',
 
221
        })
 
222
        iterable = wsgi_app(environ, self.start_response)
 
223
        response = self.read_response(iterable)
 
224
        self.assertEqual('200 OK', self.status)
 
225
        # Expect a version 1-encoded response.
 
226
        self.assertEqual('ok\x012\n', response)
210
227
 
211
 
        smart_request = StringIO('mkdir\x01/bad file\x01\n0\ndone\n')
 
228
    def test_protocol_version_detection_two(self):
 
229
        # SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
 
230
        # as version two.
 
231
        transport = memory.MemoryTransport()
 
232
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
233
        fake_input = StringIO(protocol.REQUEST_VERSION_TWO + 'hello\n')
212
234
        environ = self.build_environ({
213
235
            'REQUEST_METHOD': 'POST',
214
 
            'CONTENT_LENGTH': len(smart_request.getvalue()),
215
 
            'wsgi.input': smart_request,
216
 
            'bzrlib.relpath': '.',
 
236
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
237
            'wsgi.input': fake_input,
 
238
            'bzrlib.relpath': 'foo',
217
239
        })
218
240
        iterable = wsgi_app(environ, self.start_response)
219
241
        response = self.read_response(iterable)
220
242
        self.assertEqual('200 OK', self.status)
 
243
        # Expect a version 2-encoded response.
221
244
        self.assertEqual(
222
 
            "error\x01Path '/bad file' is not a child of "
223
 
            "path 'memory:///foo/'\n",
224
 
            response)
 
245
            protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n', response)
 
246
 
 
247
 
 
248
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
 
249
 
 
250
    def make_hpss_wsgi_request(self, wsgi_relpath, *args):
 
251
        write_buf = StringIO()
 
252
        request_medium = medium.SmartSimplePipesClientMedium(
 
253
            None, write_buf, 'fake:' + wsgi_relpath)
 
254
        request_encoder = protocol.ProtocolThreeRequester(
 
255
            request_medium.get_request())
 
256
        request_encoder.call(*args)
 
257
        write_buf.seek(0)
 
258
        environ = self.build_environ({
 
259
            'REQUEST_METHOD': 'POST',
 
260
            'CONTENT_LENGTH': len(write_buf.getvalue()),
 
261
            'wsgi.input': write_buf,
 
262
            'bzrlib.relpath': wsgi_relpath,
 
263
        })
 
264
        return environ
 
265
 
 
266
    def test_jail_root(self):
 
267
        """The WSGI HPSS glue allows access to the whole WSGI backing
 
268
        transport, regardless of which HTTP path the request was delivered
 
269
        to.
 
270
        """
 
271
        # make a branch in a shared repo
 
272
        self.make_repository('repo', shared=True)
 
273
        branch = self.make_bzrdir('repo/branch').create_branch()
 
274
        # serve the repo via bzr+http WSGI
 
275
        wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
 
276
        # send a request to /repo/branch that will have to access /repo.
 
277
        environ = self.make_hpss_wsgi_request(
 
278
            '/repo/branch', 'BzrDir.open_branchV2', '.')
 
279
        iterable = wsgi_app(environ, self.start_response)
 
280
        response_bytes = self.read_response(iterable)
 
281
        self.assertEqual('200 OK', self.status)
 
282
        # expect a successful response, rather than a jail break error
 
283
        from bzrlib.tests.test_smart_transport import LoggingMessageHandler
 
284
        message_handler = LoggingMessageHandler()
 
285
        decoder = protocol.ProtocolThreeDecoder(
 
286
            message_handler, expect_version_marker=True)
 
287
        decoder.accept_bytes(response_bytes)
 
288
        self.assertTrue(
 
289
            ('structure', ('branch', branch._format.network_name()))
 
290
            in message_handler.event_log)
225
291
 
226
292
 
227
293
class FakeRequest(object):
228
 
    
 
294
 
229
295
    def __init__(self, transport, write_func):
230
296
        self.transport = transport
231
297
        self.write_func = write_func