1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Tests from HTTP response parsing.
78
class TestResponseFileIter(tests.TestCase):
80
def test_iter_empty(self):
81
f = response.ResponseFile('empty', StringIO())
82
self.assertEqual([], list(f))
84
def test_iter_many(self):
85
f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
86
self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
89
78
class TestHTTPConnection(tests.TestCase):
91
80
def test_cleanup_pipe(self):
103
92
# Now, get the response
104
93
resp = conn.getresponse()
105
94
# Read part of the response
106
self.assertEqual('0123456789\n', resp.read(11))
95
self.assertEquals('0123456789\n', resp.read(11))
107
96
# Override the thresold to force the warning emission
108
97
conn._range_warning_thresold = 6 # There are 7 bytes pending
109
98
conn.cleanup_pipe()
110
self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
99
self.assertContainsRe(self._get_log(keep_log_file=True),
100
'Got a 200 response when asking')
113
103
class TestRangeFileMixin(object):
122
112
def test_can_read_at_first_access(self):
123
113
"""Test that the just created file can be read."""
124
self.assertEqual(self.alpha, self._file.read())
114
self.assertEquals(self.alpha, self._file.read())
126
116
def test_seek_read(self):
127
117
"""Test seek/read inside the range."""
129
119
start = self.first_range_start
130
120
# Before any use, tell() should be at the range start
131
self.assertEqual(start, f.tell())
121
self.assertEquals(start, f.tell())
132
122
cur = start # For an overall offset assertion
133
123
f.seek(start + 3)
135
self.assertEqual('def', f.read(3))
125
self.assertEquals('def', f.read(3))
136
126
cur += len('def')
139
self.assertEqual('klmn', f.read(4))
129
self.assertEquals('klmn', f.read(4))
140
130
cur += len('klmn')
141
131
# read(0) in the middle of a range
142
self.assertEqual('', f.read(0))
132
self.assertEquals('', f.read(0))
146
self.assertEqual(here, f.tell())
147
self.assertEqual(cur, f.tell())
136
self.assertEquals(here, f.tell())
137
self.assertEquals(cur, f.tell())
149
139
def test_read_zero(self):
151
self.assertEqual('', f.read(0))
141
start = self.first_range_start
142
self.assertEquals('', f.read(0))
153
self.assertEqual('', f.read(0))
144
self.assertEquals('', f.read(0))
155
146
def test_seek_at_range_end(self):
159
150
def test_read_at_range_end(self):
160
151
"""Test read behaviour at range end."""
162
self.assertEqual(self.alpha, f.read())
163
self.assertEqual('', f.read(0))
153
self.assertEquals(self.alpha, f.read())
154
self.assertEquals('', f.read(0))
164
155
self.assertRaises(errors.InvalidRange, f.read, 1)
166
157
def test_unbounded_read_after_seek(self):
169
160
# Should not cross ranges
170
self.assertEqual('yz', f.read())
161
self.assertEquals('yz', f.read())
172
163
def test_seek_backwards(self):
210
self.assertEqual('yz', f.read())
201
self.assertEquals('yz', f.read())
213
204
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
230
221
def test_read_at_range_end(self):
231
222
"""Test read behaviour at range end."""
233
self.assertEqual(self.alpha, f.read())
234
self.assertEqual('', f.read(0))
235
self.assertEqual('', f.read(1))
224
self.assertEquals(self.alpha, f.read())
225
self.assertEquals('', f.read(0))
226
self.assertEquals('', f.read(1))
238
229
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
348
339
def test_read_all_ranges(self):
350
self.assertEqual(self.alpha, f.read()) # Read first range
341
self.assertEquals(self.alpha, f.read()) # Read first range
351
342
f.seek(100) # Trigger the second range recognition
352
self.assertEqual(self.alpha, f.read()) # Read second range
353
self.assertEqual(126, f.tell())
343
self.assertEquals(self.alpha, f.read()) # Read second range
344
self.assertEquals(126, f.tell())
354
345
f.seek(126) # Start of third range which is also the current pos !
355
self.assertEqual('A', f.read(1))
346
self.assertEquals('A', f.read(1))
357
self.assertEqual('LMN', f.read(3))
348
self.assertEquals('LMN', f.read(3))
359
350
def test_seek_from_end(self):
360
351
"""See TestRangeFileMixin.test_seek_from_end."""
367
self.assertEqual('yz', f.read())
358
self.assertEquals('yz', f.read())
368
359
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
370
361
def test_seek_into_void(self):
382
373
def test_seek_across_ranges(self):
375
start = self.first_range_start
384
376
f.seek(126) # skip the two first ranges
385
self.assertEqual('AB', f.read(2))
377
self.assertEquals('AB', f.read(2))
387
379
def test_checked_read_dont_overflow_buffers(self):
381
start = self.first_range_start
389
382
# We force a very low value to exercise all code paths in _checked_read
390
383
f._discarded_buf_size = 8
391
384
f.seek(126) # skip the two first ranges
392
self.assertEqual('AB', f.read(2))
385
self.assertEquals('AB', f.read(2))
394
387
def test_seek_twice_between_ranges(self):
408
401
def test_read_at_range_end(self):
410
self.assertEqual(self.alpha, f.read())
411
self.assertEqual(self.alpha, f.read())
412
self.assertEqual(self.alpha.upper(), f.read())
403
self.assertEquals(self.alpha, f.read())
404
self.assertEquals(self.alpha, f.read())
405
self.assertEquals(self.alpha.upper(), f.read())
413
406
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
416
409
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
417
"""Perform the same tests as TestRangeFileMultipleRanges, but uses
410
"""Perform the same tests as TestRangeFileMultipleRanges, but uses
418
411
an angle-bracket quoted boundary string like IIS 6.0 and 7.0
419
412
(but not IIS 5, which breaks the RFC in a different way
420
413
by using square brackets, not angle brackets)
422
This reveals a bug caused by
424
- The bad implementation of RFC 822 unquoting in Python (angles are not
425
quotes), coupled with
415
This reveals a bug caused by
417
- The bad implementation of RFC 822 unquoting in Python (angles are not
418
quotes), coupled with
427
420
- The bad implementation of RFC 2046 in IIS (angles are not permitted chars
428
421
in boundary lines).
431
424
# The boundary as it appears in boundary lines
432
425
# IIS 6 and 7 use this value
458
451
def ok(expected, header_value):
459
452
f.set_range_from_header(header_value)
460
453
# Slightly peek under the covers to get the size
461
self.assertEqual(expected, (f.tell(), f._size))
454
self.assertEquals(expected, (f.tell(), f._size))
463
456
ok((1, 10), 'bytes 1-10/11')
464
457
ok((1, 10), 'bytes 1-10/*')
811
super(TestRangeFileSizeReadLimited, self).setUp()
812
804
# create a test datablock larger than _max_read_size.
813
805
chunk_size = response.RangeFile._max_read_size
814
806
test_pattern = '0123456789ABCDEF'