1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests from HTTP response parsing.
62
59
def makefile(self, mode='r', bufsize=None):
63
60
return self.readfile
66
62
class FakeHTTPConnection(_urllib2_wrappers.HTTPConnection):
68
64
def __init__(self, sock):
78
class TestResponseFileIter(tests.TestCase):
80
def test_iter_empty(self):
81
f = response.ResponseFile('empty', StringIO())
82
self.assertEqual([], list(f))
84
def test_iter_many(self):
85
f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
86
self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
89
74
class TestHTTPConnection(tests.TestCase):
91
76
def test_cleanup_pipe(self):
103
88
# Now, get the response
104
89
resp = conn.getresponse()
105
90
# Read part of the response
106
self.assertEqual('0123456789\n', resp.read(11))
91
self.assertEquals('0123456789\n', resp.read(11))
107
92
# Override the thresold to force the warning emission
108
93
conn._range_warning_thresold = 6 # There are 7 bytes pending
109
94
conn.cleanup_pipe()
110
self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
95
self.assertContainsRe(self._get_log(keep_log_file=True),
96
'Got a 200 response when asking')
113
99
class TestRangeFileMixin(object):
122
108
def test_can_read_at_first_access(self):
123
109
"""Test that the just created file can be read."""
124
self.assertEqual(self.alpha, self._file.read())
110
self.assertEquals(self.alpha, self._file.read())
126
112
def test_seek_read(self):
127
113
"""Test seek/read inside the range."""
129
115
start = self.first_range_start
130
116
# Before any use, tell() should be at the range start
131
self.assertEqual(start, f.tell())
117
self.assertEquals(start, f.tell())
132
118
cur = start # For an overall offset assertion
133
119
f.seek(start + 3)
135
self.assertEqual('def', f.read(3))
121
self.assertEquals('def', f.read(3))
136
122
cur += len('def')
139
self.assertEqual('klmn', f.read(4))
125
self.assertEquals('klmn', f.read(4))
140
126
cur += len('klmn')
141
127
# read(0) in the middle of a range
142
self.assertEqual('', f.read(0))
128
self.assertEquals('', f.read(0))
146
self.assertEqual(here, f.tell())
147
self.assertEqual(cur, f.tell())
132
self.assertEquals(here, f.tell())
133
self.assertEquals(cur, f.tell())
149
135
def test_read_zero(self):
151
self.assertEqual('', f.read(0))
137
start = self.first_range_start
138
self.assertEquals('', f.read(0))
153
self.assertEqual('', f.read(0))
140
self.assertEquals('', f.read(0))
155
142
def test_seek_at_range_end(self):
159
146
def test_read_at_range_end(self):
160
147
"""Test read behaviour at range end."""
162
self.assertEqual(self.alpha, f.read())
163
self.assertEqual('', f.read(0))
149
self.assertEquals(self.alpha, f.read())
150
self.assertEquals('', f.read(0))
164
151
self.assertRaises(errors.InvalidRange, f.read, 1)
166
153
def test_unbounded_read_after_seek(self):
169
156
# Should not cross ranges
170
self.assertEqual('yz', f.read())
157
self.assertEquals('yz', f.read())
172
159
def test_seek_backwards(self):
210
self.assertEqual('yz', f.read())
197
self.assertEquals('yz', f.read())
213
200
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
230
217
def test_read_at_range_end(self):
231
218
"""Test read behaviour at range end."""
233
self.assertEqual(self.alpha, f.read())
234
self.assertEqual('', f.read(0))
235
self.assertEqual('', f.read(1))
220
self.assertEquals(self.alpha, f.read())
221
self.assertEquals('', f.read(0))
222
self.assertEquals('', f.read(1))
238
224
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
239
225
"""Test a RangeFile for a whole file whose size is known."""
263
249
f._pos = 0 # Force an invalid pos
264
250
self.assertRaises(errors.InvalidRange, f.read, 2)
267
class TestRangeFileMultipleRanges(tests.TestCase, TestRangeFileMixin):
252
class TestRangeFilMultipleRanges(tests.TestCase, TestRangeFileMixin):
268
253
"""Test a RangeFile for multiple ranges.
270
255
The RangeFile used for the tests contains three ranges:
277
262
fact) in real uses but may lead to hard to track bugs.
280
# The following is used to represent the boundary paramter defined
281
# in HTTP response headers and the boundary lines that separate
284
boundary = "separation"
287
super(TestRangeFileMultipleRanges, self).setUp()
266
super(TestRangeFilMultipleRanges, self).setUp()
289
boundary = self.boundary
268
boundary = 'separation'
292
271
self.first_range_start = 25
298
277
content += self._multipart_byterange(part, start, boundary,
301
content += self._boundary_line()
280
content += self._boundary_line(boundary)
303
282
self._file = response.RangeFile('Multiple_ranges_file',
304
283
StringIO(content))
305
self.set_file_boundary()
307
def _boundary_line(self):
308
"""Helper to build the formatted boundary line."""
309
return '--' + self.boundary + '\r\n'
311
def set_file_boundary(self):
312
284
# Ranges are set by decoding the range headers, the RangeFile user is
313
285
# supposed to call the following before using seek or read since it
314
286
# requires knowing the *response* headers (in that case the boundary
315
287
# which is part of the Content-Type header).
316
self._file.set_boundary(self.boundary)
288
self._file.set_boundary(boundary)
290
def _boundary_line(self, boundary):
291
"""Helper to build the formatted boundary line."""
292
return '--' + boundary + '\r\n'
318
294
def _multipart_byterange(self, data, offset, boundary, file_size='*'):
319
295
"""Encode a part of a file as a multipart/byterange MIME type.
331
307
:return: a string containing the data encoded as it will appear in the
332
308
HTTP response body.
334
bline = self._boundary_line()
310
bline = self._boundary_line(boundary)
335
311
# Each range begins with a boundary line
337
313
# A range is described by a set of headers, but only 'Content-Range' is
348
324
def test_read_all_ranges(self):
350
self.assertEqual(self.alpha, f.read()) # Read first range
326
self.assertEquals(self.alpha, f.read()) # Read first range
351
327
f.seek(100) # Trigger the second range recognition
352
self.assertEqual(self.alpha, f.read()) # Read second range
353
self.assertEqual(126, f.tell())
328
self.assertEquals(self.alpha, f.read()) # Read second range
329
self.assertEquals(126, f.tell())
354
330
f.seek(126) # Start of third range which is also the current pos !
355
self.assertEqual('A', f.read(1))
331
self.assertEquals('A', f.read(1))
357
self.assertEqual('LMN', f.read(3))
333
self.assertEquals('LMN', f.read(3))
359
335
def test_seek_from_end(self):
360
336
"""See TestRangeFileMixin.test_seek_from_end."""
367
self.assertEqual('yz', f.read())
343
self.assertEquals('yz', f.read())
368
344
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
370
346
def test_seek_into_void(self):
382
358
def test_seek_across_ranges(self):
384
f.seek(126) # skip the two first ranges
385
self.assertEqual('AB', f.read(2))
387
def test_checked_read_dont_overflow_buffers(self):
389
# We force a very low value to exercise all code paths in _checked_read
390
f._discarded_buf_size = 8
391
f.seek(126) # skip the two first ranges
392
self.assertEqual('AB', f.read(2))
360
start = self.first_range_start
361
f.seek(126) # skip the two first ranges
362
self.assertEquals('AB', f.read(2))
394
364
def test_seek_twice_between_ranges(self):
408
378
def test_read_at_range_end(self):
410
self.assertEqual(self.alpha, f.read())
411
self.assertEqual(self.alpha, f.read())
412
self.assertEqual(self.alpha.upper(), f.read())
380
self.assertEquals(self.alpha, f.read())
381
self.assertEquals(self.alpha, f.read())
382
self.assertEquals(self.alpha.upper(), f.read())
413
383
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
416
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
417
"""Perform the same tests as TestRangeFileMultipleRanges, but uses
418
an angle-bracket quoted boundary string like IIS 6.0 and 7.0
419
(but not IIS 5, which breaks the RFC in a different way
420
by using square brackets, not angle brackets)
422
This reveals a bug caused by
424
- The bad implementation of RFC 822 unquoting in Python (angles are not
425
quotes), coupled with
427
- The bad implementation of RFC 2046 in IIS (angles are not permitted chars
431
# The boundary as it appears in boundary lines
432
# IIS 6 and 7 use this value
433
_boundary_trimmed = "q1w2e3r4t5y6u7i8o9p0zaxscdvfbgnhmjklkl"
434
boundary = '<' + _boundary_trimmed + '>'
436
def set_file_boundary(self):
437
# Emulate broken rfc822.unquote() here by removing angles
438
self._file.set_boundary(self._boundary_trimmed)
441
386
class TestRangeFileVarious(tests.TestCase):
442
387
"""Tests RangeFile aspects not covered elsewhere."""
458
403
def ok(expected, header_value):
459
404
f.set_range_from_header(header_value)
460
405
# Slightly peek under the covers to get the size
461
self.assertEqual(expected, (f.tell(), f._size))
406
self.assertEquals(expected, (f.tell(), f._size))
463
408
ok((1, 10), 'bytes 1-10/11')
464
409
ok((1, 10), 'bytes 1-10/*')
800
745
out.read() # Read the whole range
801
746
# Fail to find the boundary line
802
747
self.assertRaises(errors.InvalidHttpResponse, out.seek, 1, 1)
805
class TestRangeFileSizeReadLimited(tests.TestCase):
806
"""Test RangeFile _max_read_size functionality which limits the size of
807
read blocks to prevent MemoryError messages in socket.recv.
811
super(TestRangeFileSizeReadLimited, self).setUp()
812
# create a test datablock larger than _max_read_size.
813
chunk_size = response.RangeFile._max_read_size
814
test_pattern = '0123456789ABCDEF'
815
self.test_data = test_pattern * (3 * chunk_size / len(test_pattern))
816
self.test_data_len = len(self.test_data)
818
def test_max_read_size(self):
819
"""Read data in blocks and verify that the reads are not larger than
820
the maximum read size.
822
# retrieve data in large blocks from response.RangeFile object
823
mock_read_file = FakeReadFile(self.test_data)
824
range_file = response.RangeFile('test_max_read_size', mock_read_file)
825
response_data = range_file.read(self.test_data_len)
827
# verify read size was equal to the maximum read size
828
self.assertTrue(mock_read_file.get_max_read_size() > 0)
829
self.assertEqual(mock_read_file.get_max_read_size(),
830
response.RangeFile._max_read_size)
831
self.assertEqual(mock_read_file.get_read_count(), 3)
833
# report error if the data wasn't equal (we only report the size due
834
# to the length of the data)
835
if response_data != self.test_data:
836
message = "Data not equal. Expected %d bytes, received %d."
837
self.fail(message % (len(response_data), self.test_data_len))