1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests from HTTP response parsing.
78
class TestResponseFileIter(tests.TestCase):
80
def test_iter_empty(self):
81
f = response.ResponseFile('empty', StringIO())
82
self.assertEqual([], list(f))
84
def test_iter_many(self):
85
f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
86
self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
89
78
class TestHTTPConnection(tests.TestCase):
91
80
def test_cleanup_pipe(self):
103
92
# Now, get the response
104
93
resp = conn.getresponse()
105
94
# Read part of the response
106
self.assertEqual('0123456789\n', resp.read(11))
95
self.assertEquals('0123456789\n', resp.read(11))
107
96
# Override the threshold to force the warning emission
108
97
conn._range_warning_thresold = 6 # There are 7 bytes pending
109
98
conn.cleanup_pipe()
110
self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
99
self.assertContainsRe(self._get_log(keep_log_file=True),
100
'Got a 200 response when asking')
113
103
class TestRangeFileMixin(object):
122
112
def test_can_read_at_first_access(self):
123
113
"""Test that the just created file can be read."""
124
self.assertEqual(self.alpha, self._file.read())
114
self.assertEquals(self.alpha, self._file.read())
126
116
def test_seek_read(self):
127
117
"""Test seek/read inside the range."""
129
119
start = self.first_range_start
130
120
# Before any use, tell() should be at the range start
131
self.assertEqual(start, f.tell())
121
self.assertEquals(start, f.tell())
132
122
cur = start # For an overall offset assertion
133
123
f.seek(start + 3)
135
self.assertEqual('def', f.read(3))
125
self.assertEquals('def', f.read(3))
136
126
cur += len('def')
139
self.assertEqual('klmn', f.read(4))
129
self.assertEquals('klmn', f.read(4))
140
130
cur += len('klmn')
141
131
# read(0) in the middle of a range
142
self.assertEqual('', f.read(0))
132
self.assertEquals('', f.read(0))
146
self.assertEqual(here, f.tell())
147
self.assertEqual(cur, f.tell())
136
self.assertEquals(here, f.tell())
137
self.assertEquals(cur, f.tell())
149
139
def test_read_zero(self):
151
self.assertEqual('', f.read(0))
141
start = self.first_range_start
142
self.assertEquals('', f.read(0))
153
self.assertEqual('', f.read(0))
144
self.assertEquals('', f.read(0))
155
146
def test_seek_at_range_end(self):
159
150
def test_read_at_range_end(self):
160
151
"""Test read behaviour at range end."""
162
self.assertEqual(self.alpha, f.read())
163
self.assertEqual('', f.read(0))
153
self.assertEquals(self.alpha, f.read())
154
self.assertEquals('', f.read(0))
164
155
self.assertRaises(errors.InvalidRange, f.read, 1)
166
157
def test_unbounded_read_after_seek(self):
169
160
# Should not cross ranges
170
self.assertEqual('yz', f.read())
161
self.assertEquals('yz', f.read())
172
163
def test_seek_backwards(self):
210
self.assertEqual('yz', f.read())
201
self.assertEquals('yz', f.read())
213
204
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
230
221
def test_read_at_range_end(self):
231
222
"""Test read behaviour at range end."""
233
self.assertEqual(self.alpha, f.read())
234
self.assertEqual('', f.read(0))
235
self.assertEqual('', f.read(1))
224
self.assertEquals(self.alpha, f.read())
225
self.assertEquals('', f.read(0))
226
self.assertEquals('', f.read(1))
238
228
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
239
229
"""Test a RangeFile for a whole file whose size is known."""
263
253
f._pos = 0 # Force an invalid pos
264
254
self.assertRaises(errors.InvalidRange, f.read, 2)
267
256
class TestRangeFileMultipleRanges(tests.TestCase, TestRangeFileMixin):
268
257
"""Test a RangeFile for multiple ranges.
277
266
fact) in real uses but may lead to hard to track bugs.
280
# The following is used to represent the boundary parameter defined
281
# in HTTP response headers and the boundary lines that separate
284
boundary = "separation"
287
270
super(TestRangeFileMultipleRanges, self).setUp()
289
boundary = self.boundary
272
boundary = 'separation'
292
275
self.first_range_start = 25
298
281
content += self._multipart_byterange(part, start, boundary,
301
content += self._boundary_line()
284
content += self._boundary_line(boundary)
303
286
self._file = response.RangeFile('Multiple_ranges_file',
304
287
StringIO(content))
305
self.set_file_boundary()
307
def _boundary_line(self):
308
"""Helper to build the formatted boundary line."""
309
return '--' + self.boundary + '\r\n'
311
def set_file_boundary(self):
312
288
# Ranges are set by decoding the range headers, the RangeFile user is
313
289
# supposed to call the following before using seek or read since it
314
290
# requires knowing the *response* headers (in that case the boundary
315
291
# which is part of the Content-Type header).
316
self._file.set_boundary(self.boundary)
292
self._file.set_boundary(boundary)
294
def _boundary_line(self, boundary):
295
"""Helper to build the formatted boundary line."""
296
return '--' + boundary + '\r\n'
318
298
def _multipart_byterange(self, data, offset, boundary, file_size='*'):
319
299
"""Encode a part of a file as a multipart/byterange MIME type.
331
311
:return: a string containing the data encoded as it will appear in the
332
312
HTTP response body.
334
bline = self._boundary_line()
314
bline = self._boundary_line(boundary)
335
315
# Each range begins with a boundary line
337
317
# A range is described by a set of headers, but only 'Content-Range' is
348
328
def test_read_all_ranges(self):
350
self.assertEqual(self.alpha, f.read()) # Read first range
330
self.assertEquals(self.alpha, f.read()) # Read first range
351
331
f.seek(100) # Trigger the second range recognition
352
self.assertEqual(self.alpha, f.read()) # Read second range
353
self.assertEqual(126, f.tell())
332
self.assertEquals(self.alpha, f.read()) # Read second range
333
self.assertEquals(126, f.tell())
354
334
f.seek(126) # Start of third range which is also the current pos !
355
self.assertEqual('A', f.read(1))
335
self.assertEquals('A', f.read(1))
357
self.assertEqual('LMN', f.read(3))
337
self.assertEquals('LMN', f.read(3))
359
339
def test_seek_from_end(self):
360
340
"""See TestRangeFileMixin.test_seek_from_end."""
367
self.assertEqual('yz', f.read())
347
self.assertEquals('yz', f.read())
368
348
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
370
350
def test_seek_into_void(self):
382
362
def test_seek_across_ranges(self):
364
start = self.first_range_start
384
365
f.seek(126) # skip the two first ranges
385
self.assertEqual('AB', f.read(2))
366
self.assertEquals('AB', f.read(2))
387
368
def test_checked_read_dont_overflow_buffers(self):
370
start = self.first_range_start
389
371
# We force a very low value to exercise all code paths in _checked_read
390
372
f._discarded_buf_size = 8
391
373
f.seek(126) # skip the two first ranges
392
self.assertEqual('AB', f.read(2))
374
self.assertEquals('AB', f.read(2))
394
376
def test_seek_twice_between_ranges(self):
408
390
def test_read_at_range_end(self):
410
self.assertEqual(self.alpha, f.read())
411
self.assertEqual(self.alpha, f.read())
412
self.assertEqual(self.alpha.upper(), f.read())
392
self.assertEquals(self.alpha, f.read())
393
self.assertEquals(self.alpha, f.read())
394
self.assertEquals(self.alpha.upper(), f.read())
413
395
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
416
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
417
"""Perform the same tests as TestRangeFileMultipleRanges, but uses
418
an angle-bracket quoted boundary string like IIS 6.0 and 7.0
419
(but not IIS 5, which breaks the RFC in a different way
420
by using square brackets, not angle brackets)
422
This reveals a bug caused by
424
- The bad implementation of RFC 822 unquoting in Python (angles are not
425
quotes), coupled with
427
- The bad implementation of RFC 2046 in IIS (angles are not permitted chars
431
# The boundary as it appears in boundary lines
432
# IIS 6 and 7 use this value
433
_boundary_trimmed = "q1w2e3r4t5y6u7i8o9p0zaxscdvfbgnhmjklkl"
434
boundary = '<' + _boundary_trimmed + '>'
436
def set_file_boundary(self):
437
# Emulate broken rfc822.unquote() here by removing angles
438
self._file.set_boundary(self._boundary_trimmed)
441
398
class TestRangeFileVarious(tests.TestCase):
442
399
"""Tests RangeFile aspects not covered elsewhere."""
458
415
def ok(expected, header_value):
459
416
f.set_range_from_header(header_value)
460
417
# Slightly peek under the covers to get the size
461
self.assertEqual(expected, (f.tell(), f._size))
418
self.assertEquals(expected, (f.tell(), f._size))
463
420
ok((1, 10), 'bytes 1-10/11')
464
421
ok((1, 10), 'bytes 1-10/*')
811
super(TestRangeFileSizeReadLimited, self).setUp()
812
768
# create a test datablock larger than _max_read_size.
813
769
chunk_size = response.RangeFile._max_read_size
814
770
test_pattern = '0123456789ABCDEF'