~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_wsgi.py

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for WSGI application"""
 
18
 
 
19
from cStringIO import StringIO
 
20
 
 
21
from bzrlib import tests
 
22
from bzrlib.smart import protocol
 
23
from bzrlib.transport.http import wsgi
 
24
from bzrlib.transport import chroot, memory
 
25
 
 
26
 
 
27
class TestWSGI(tests.TestCase):
 
28
 
 
29
    def setUp(self):
 
30
        tests.TestCase.setUp(self)
 
31
        self.status = None
 
32
        self.headers = None
 
33
 
 
34
    def build_environ(self, updates=None):
 
35
        """Builds an environ dict with all fields required by PEP 333.
 
36
 
 
37
        :param updates: a dict to that will be incorporated into the returned
 
38
            dict using dict.update(updates).
 
39
        """
 
40
        environ = {
 
41
            # Required CGI variables
 
42
            'REQUEST_METHOD': 'GET',
 
43
            'SCRIPT_NAME': '/script/name/',
 
44
            'PATH_INFO': 'path/info',
 
45
            'SERVER_NAME': 'test',
 
46
            'SERVER_PORT': '9999',
 
47
            'SERVER_PROTOCOL': 'HTTP/1.0',
 
48
 
 
49
            # Required WSGI variables
 
50
            'wsgi.version': (1,0),
 
51
            'wsgi.url_scheme': 'http',
 
52
            'wsgi.input': StringIO(''),
 
53
            'wsgi.errors': StringIO(),
 
54
            'wsgi.multithread': False,
 
55
            'wsgi.multiprocess': False,
 
56
            'wsgi.run_once': True,
 
57
        }
 
58
        if updates is not None:
 
59
            environ.update(updates)
 
60
        return environ
 
61
 
 
62
    def read_response(self, iterable):
 
63
        response = ''
 
64
        for string in iterable:
 
65
            response += string
 
66
        return response
 
67
 
 
68
    def start_response(self, status, headers):
 
69
        self.status = status
 
70
        self.headers = headers
 
71
 
 
72
    def test_construct(self):
 
73
        app = wsgi.SmartWSGIApp(FakeTransport())
 
74
        self.assertIsInstance(
 
75
            app.backing_transport, chroot.ChrootTransport)
 
76
 
 
77
    def test_http_get_rejected(self):
 
78
        # GET requests are rejected.
 
79
        app = wsgi.SmartWSGIApp(FakeTransport())
 
80
        environ = self.build_environ({'REQUEST_METHOD': 'GET'})
 
81
        iterable = app(environ, self.start_response)
 
82
        self.read_response(iterable)
 
83
        self.assertEqual('405 Method not allowed', self.status)
 
84
        self.assertTrue(('Allow', 'POST') in self.headers)
 
85
 
 
86
    def _fake_make_request(self, transport, write_func, bytes, rcp):
 
87
        request = FakeRequest(transport, write_func)
 
88
        request.accept_bytes(bytes)
 
89
        self.request = request
 
90
        return request
 
91
 
 
92
    def test_smart_wsgi_app_uses_given_relpath(self):
 
93
        # The SmartWSGIApp should use the "bzrlib.relpath" field from the
 
94
        # WSGI environ to clone from its backing transport to get a specific
 
95
        # transport for this request.
 
96
        transport = FakeTransport()
 
97
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
98
        wsgi_app.backing_transport = transport
 
99
        wsgi_app.make_request = self._fake_make_request
 
100
        fake_input = StringIO('fake request')
 
101
        environ = self.build_environ({
 
102
            'REQUEST_METHOD': 'POST',
 
103
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
104
            'wsgi.input': fake_input,
 
105
            'bzrlib.relpath': 'foo/bar',
 
106
        })
 
107
        iterable = wsgi_app(environ, self.start_response)
 
108
        response = self.read_response(iterable)
 
109
        self.assertEqual([('clone', 'foo/bar/')] , transport.calls)
 
110
 
 
111
    def test_smart_wsgi_app_request_and_response(self):
 
112
        # SmartWSGIApp reads the smart request from the 'wsgi.input' file-like
 
113
        # object in the environ dict, and returns the response via the iterable
 
114
        # returned to the WSGI handler.
 
115
        transport = memory.MemoryTransport()
 
116
        transport.put_bytes('foo', 'some bytes')
 
117
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
118
        wsgi_app.make_request = self._fake_make_request
 
119
        fake_input = StringIO('fake request')
 
120
        environ = self.build_environ({
 
121
            'REQUEST_METHOD': 'POST',
 
122
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
123
            'wsgi.input': fake_input,
 
124
            'bzrlib.relpath': 'foo',
 
125
        })
 
126
        iterable = wsgi_app(environ, self.start_response)
 
127
        response = self.read_response(iterable)
 
128
        self.assertEqual('200 OK', self.status)
 
129
        self.assertEqual('got bytes: fake request', response)
 
130
 
 
131
    def test_relpath_setter(self):
 
132
        # wsgi.RelpathSetter is WSGI "middleware" to set the 'bzrlib.relpath'
 
133
        # variable.
 
134
        calls = []
 
135
        def fake_app(environ, start_response):
 
136
            calls.append(environ['bzrlib.relpath'])
 
137
        wrapped_app = wsgi.RelpathSetter(
 
138
            fake_app, prefix='/abc/', path_var='FOO')
 
139
        wrapped_app({'FOO': '/abc/xyz/.bzr/smart'}, None)
 
140
        self.assertEqual(['xyz'], calls)
 
141
 
 
142
    def test_relpath_setter_bad_path_prefix(self):
 
143
        # wsgi.RelpathSetter will reject paths with that don't match the prefix
 
144
        # with a 404.  This is probably a sign of misconfiguration; a server
 
145
        # shouldn't ever be invoking our WSGI application with bad paths.
 
146
        def fake_app(environ, start_response):
 
147
            self.fail('The app should never be called when the path is wrong')
 
148
        wrapped_app = wsgi.RelpathSetter(
 
149
            fake_app, prefix='/abc/', path_var='FOO')
 
150
        iterable = wrapped_app(
 
151
            {'FOO': 'AAA/abc/xyz/.bzr/smart'}, self.start_response)
 
152
        self.read_response(iterable)
 
153
        self.assertTrue(self.status.startswith('404'))
 
154
 
 
155
    def test_relpath_setter_bad_path_suffix(self):
 
156
        # Similar to test_relpath_setter_bad_path_prefix: wsgi.RelpathSetter
 
157
        # will reject paths with that don't match the suffix '.bzr/smart' with a
 
158
        # 404 as well.  Again, this shouldn't be seen by our WSGI application if
 
159
        # the server is configured correctly.
 
160
        def fake_app(environ, start_response):
 
161
            self.fail('The app should never be called when the path is wrong')
 
162
        wrapped_app = wsgi.RelpathSetter(
 
163
            fake_app, prefix='/abc/', path_var='FOO')
 
164
        iterable = wrapped_app(
 
165
            {'FOO': '/abc/xyz/.bzr/AAA'}, self.start_response)
 
166
        self.read_response(iterable)
 
167
        self.assertTrue(self.status.startswith('404'))
 
168
 
 
169
    def test_make_app(self):
 
170
        # The make_app helper constructs a SmartWSGIApp wrapped in a
 
171
        # RelpathSetter.
 
172
        app = wsgi.make_app(
 
173
            root='a root',
 
174
            prefix='a prefix',
 
175
            path_var='a path_var')
 
176
        self.assertIsInstance(app, wsgi.RelpathSetter)
 
177
        self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
 
178
        self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
 
179
        backing_transport = app.app.backing_transport
 
180
        chroot_backing_transport = backing_transport.server.backing_transport
 
181
        self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
 
182
        self.assertEqual(app.app.root_client_path, 'a prefix')
 
183
        self.assertEqual(app.path_var, 'a path_var')
 
184
 
 
185
    def test_incomplete_request(self):
 
186
        transport = FakeTransport()
 
187
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
188
        def make_request(transport, write_func, bytes, root_client_path):
 
189
            request = IncompleteRequest(transport, write_func)
 
190
            request.accept_bytes(bytes)
 
191
            self.request = request
 
192
            return request
 
193
        wsgi_app.make_request = make_request
 
194
 
 
195
        fake_input = StringIO('incomplete request')
 
196
        environ = self.build_environ({
 
197
            'REQUEST_METHOD': 'POST',
 
198
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
199
            'wsgi.input': fake_input,
 
200
            'bzrlib.relpath': 'foo/bar',
 
201
        })
 
202
        iterable = wsgi_app(environ, self.start_response)
 
203
        response = self.read_response(iterable)
 
204
        self.assertEqual('200 OK', self.status)
 
205
        self.assertEqual('error\x01incomplete request\n', response)
 
206
 
 
207
    def test_protocol_version_detection_one(self):
 
208
        # SmartWSGIApp detects requests that don't start with
 
209
        # REQUEST_VERSION_TWO as version one.
 
210
        transport = memory.MemoryTransport()
 
211
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
212
        fake_input = StringIO('hello\n')
 
213
        environ = self.build_environ({
 
214
            'REQUEST_METHOD': 'POST',
 
215
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
216
            'wsgi.input': fake_input,
 
217
            'bzrlib.relpath': 'foo',
 
218
        })
 
219
        iterable = wsgi_app(environ, self.start_response)
 
220
        response = self.read_response(iterable)
 
221
        self.assertEqual('200 OK', self.status)
 
222
        # Expect a version 1-encoded response.
 
223
        self.assertEqual('ok\x012\n', response)
 
224
 
 
225
    def test_protocol_version_detection_two(self):
 
226
        # SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
 
227
        # as version two.
 
228
        transport = memory.MemoryTransport()
 
229
        wsgi_app = wsgi.SmartWSGIApp(transport)
 
230
        fake_input = StringIO(protocol.REQUEST_VERSION_TWO + 'hello\n')
 
231
        environ = self.build_environ({
 
232
            'REQUEST_METHOD': 'POST',
 
233
            'CONTENT_LENGTH': len(fake_input.getvalue()),
 
234
            'wsgi.input': fake_input,
 
235
            'bzrlib.relpath': 'foo',
 
236
        })
 
237
        iterable = wsgi_app(environ, self.start_response)
 
238
        response = self.read_response(iterable)
 
239
        self.assertEqual('200 OK', self.status)
 
240
        # Expect a version 2-encoded response.
 
241
        self.assertEqual(
 
242
            protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n', response)
 
243
 
 
244
 
 
245
class FakeRequest(object):
 
246
 
 
247
    def __init__(self, transport, write_func):
 
248
        self.transport = transport
 
249
        self.write_func = write_func
 
250
        self.accepted_bytes = ''
 
251
 
 
252
    def accept_bytes(self, bytes):
 
253
        self.accepted_bytes = bytes
 
254
        self.write_func('got bytes: ' + bytes)
 
255
 
 
256
    def next_read_size(self):
 
257
        return 0
 
258
 
 
259
 
 
260
class FakeTransport(object):
 
261
 
 
262
    def __init__(self):
 
263
        self.calls = []
 
264
        self.base = 'fake:///'
 
265
 
 
266
    def abspath(self, relpath):
 
267
        return 'fake:///' + relpath
 
268
 
 
269
    def clone(self, relpath):
 
270
        self.calls.append(('clone', relpath))
 
271
        return self
 
272
 
 
273
 
 
274
class IncompleteRequest(FakeRequest):
 
275
    """A request-like object that always expects to read more bytes."""
 
276
 
 
277
    def next_read_size(self):
 
278
        # this request always asks for more
 
279
        return 1
 
280