13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests for WSGI application"""
19
19
from cStringIO import StringIO
21
21
from bzrlib import tests
22
from bzrlib.smart import medium, protocol
23
22
from bzrlib.transport.http import wsgi
24
23
from bzrlib.transport import chroot, memory
27
class WSGITestMixin(object):
26
class TestWSGI(tests.TestCase):
29
tests.TestCase.setUp(self)
29
33
def build_environ(self, updates=None):
30
34
"""Builds an environ dict with all fields required by PEP 333.
32
36
:param updates: a dict to that will be incorporated into the returned
33
37
dict using dict.update(updates).
85
81
self.read_response(iterable)
86
82
self.assertEqual('405 Method not allowed', self.status)
87
83
self.assertTrue(('Allow', 'POST') in self.headers)
89
def _fake_make_request(self, transport, write_func, bytes, rcp):
90
request = FakeRequest(transport, write_func)
91
request.accept_bytes(bytes)
92
self.request = request
95
85
def test_smart_wsgi_app_uses_given_relpath(self):
96
86
# The SmartWSGIApp should use the "bzrlib.relpath" field from the
97
# WSGI environ to clone from its backing transport to get a specific
98
# transport for this request.
87
# WSGI environ to construct the transport for this request, by cloning
88
# its base transport with the given relpath.
99
89
transport = FakeTransport()
100
90
wsgi_app = wsgi.SmartWSGIApp(transport)
101
wsgi_app.backing_transport = transport
102
wsgi_app.make_request = self._fake_make_request
91
def make_request(transport, write_func):
92
request = FakeRequest(transport, write_func)
93
self.request = request
95
wsgi_app.make_request = make_request
103
96
fake_input = StringIO('fake request')
104
97
environ = self.build_environ({
105
98
'REQUEST_METHOD': 'POST',
178
175
path_var='a path_var')
179
176
self.assertIsInstance(app, wsgi.RelpathSetter)
180
177
self.assertIsInstance(app.app, wsgi.SmartWSGIApp)
181
self.assertStartsWith(app.app.backing_transport.base, 'chroot-')
182
backing_transport = app.app.backing_transport
183
chroot_backing_transport = backing_transport.server.backing_transport
184
self.assertEndsWith(chroot_backing_transport.base, 'a%20root/')
185
self.assertEqual(app.app.root_client_path, 'a prefix')
178
self.assertEndsWith(app.app.backing_transport.base, 'a%20root/')
179
self.assertEqual(app.prefix, 'a prefix')
186
180
self.assertEqual(app.path_var, 'a path_var')
188
182
def test_incomplete_request(self):
189
183
transport = FakeTransport()
190
184
wsgi_app = wsgi.SmartWSGIApp(transport)
191
def make_request(transport, write_func, bytes, root_client_path):
185
def make_request(transport, write_func):
192
186
request = IncompleteRequest(transport, write_func)
193
request.accept_bytes(bytes)
194
187
self.request = request
196
189
wsgi_app.make_request = make_request
207
200
self.assertEqual('200 OK', self.status)
208
201
self.assertEqual('error\x01incomplete request\n', response)
210
def test_protocol_version_detection_one(self):
211
# SmartWSGIApp detects requests that don't start with
212
# REQUEST_VERSION_TWO as version one.
203
def test_chrooting(self):
204
# Show that requests that try to access things outside of the base
205
# really will get intercepted by the ChrootTransportDecorator.
213
206
transport = memory.MemoryTransport()
214
wsgi_app = wsgi.SmartWSGIApp(transport)
215
fake_input = StringIO('hello\n')
216
environ = self.build_environ({
217
'REQUEST_METHOD': 'POST',
218
'CONTENT_LENGTH': len(fake_input.getvalue()),
219
'wsgi.input': fake_input,
220
'bzrlib.relpath': 'foo',
222
iterable = wsgi_app(environ, self.start_response)
223
response = self.read_response(iterable)
224
self.assertEqual('200 OK', self.status)
225
# Expect a version 1-encoded response.
226
self.assertEqual('ok\x012\n', response)
207
transport.mkdir('foo')
208
transport.put_bytes('foo/bar', 'this is foo/bar')
209
wsgi_app = wsgi.SmartWSGIApp(transport.clone('foo'))
228
def test_protocol_version_detection_two(self):
229
# SmartWSGIApp detects requests that start with REQUEST_VERSION_TWO
231
transport = memory.MemoryTransport()
232
wsgi_app = wsgi.SmartWSGIApp(transport)
233
fake_input = StringIO(protocol.REQUEST_VERSION_TWO + 'hello\n')
211
smart_request = StringIO('mkdir\x01/bad file\x01\n0\ndone\n')
234
212
environ = self.build_environ({
235
213
'REQUEST_METHOD': 'POST',
236
'CONTENT_LENGTH': len(fake_input.getvalue()),
237
'wsgi.input': fake_input,
238
'bzrlib.relpath': 'foo',
214
'CONTENT_LENGTH': len(smart_request.getvalue()),
215
'wsgi.input': smart_request,
216
'bzrlib.relpath': '.',
240
218
iterable = wsgi_app(environ, self.start_response)
241
219
response = self.read_response(iterable)
242
220
self.assertEqual('200 OK', self.status)
243
# Expect a version 2-encoded response.
244
221
self.assertEqual(
245
protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n', response)
248
class TestWSGIJail(tests.TestCaseWithMemoryTransport, WSGITestMixin):
250
def make_hpss_wsgi_request(self, wsgi_relpath, *args):
251
write_buf = StringIO()
252
request_medium = medium.SmartSimplePipesClientMedium(
253
None, write_buf, 'fake:' + wsgi_relpath)
254
request_encoder = protocol.ProtocolThreeRequester(
255
request_medium.get_request())
256
request_encoder.call(*args)
258
environ = self.build_environ({
259
'REQUEST_METHOD': 'POST',
260
'CONTENT_LENGTH': len(write_buf.getvalue()),
261
'wsgi.input': write_buf,
262
'bzrlib.relpath': wsgi_relpath,
266
def test_jail_root(self):
267
"""The WSGI HPSS glue allows access to the whole WSGI backing
268
transport, regardless of which HTTP path the request was delivered
271
# make a branch in a shared repo
272
self.make_repository('repo', shared=True)
273
branch = self.make_bzrdir('repo/branch').create_branch()
274
# serve the repo via bzr+http WSGI
275
wsgi_app = wsgi.SmartWSGIApp(self.get_transport())
276
# send a request to /repo/branch that will have to access /repo.
277
environ = self.make_hpss_wsgi_request(
278
'/repo/branch', 'BzrDir.open_branchV2', '.')
279
iterable = wsgi_app(environ, self.start_response)
280
response_bytes = self.read_response(iterable)
281
self.assertEqual('200 OK', self.status)
282
# expect a successful response, rather than a jail break error
283
from bzrlib.tests.test_smart_transport import LoggingMessageHandler
284
message_handler = LoggingMessageHandler()
285
decoder = protocol.ProtocolThreeDecoder(
286
message_handler, expect_version_marker=True)
287
decoder.accept_bytes(response_bytes)
289
('structure', ('branch', branch._format.network_name()))
290
in message_handler.event_log)
222
"error\x01Path '/bad file' is not a child of "
223
"path 'memory:///foo/'\n",
293
227
class FakeRequest(object):
295
229
def __init__(self, transport, write_func):
296
230
self.transport = transport
297
231
self.write_func = write_func