14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests from HTTP response parsing."""
17
"""Tests from HTTP response parsing.
19
The handle_response method read the response body of a GET request an returns
20
the corresponding RangeFile.
22
There are four different kinds of RangeFile:
23
- a whole file whose size is unknown, seen as a simple byte stream,
24
- a whole file whose size is known, we can't read past its end,
25
- a single range file, a part of a file with a start and a size,
26
- a multiple range file, several consecutive parts with known start offset
29
Some properties are common to all kinds:
30
- seek can only be forward (its really a socket underneath),
31
- read can't cross ranges,
32
- successive ranges are taken into account transparently,
34
- the expected pattern of use is either seek(offset)+read(size) or a single
35
read with no size specified. For multiple range files, multiple read() will
36
return the corresponding ranges, trying to read further will raise
19
40
from cStringIO import StringIO
22
from bzrlib import errors
23
from bzrlib.transport import http
24
47
from bzrlib.transport.http import response
25
from bzrlib.tests import TestCase
28
class TestResponseRange(TestCase):
29
"""Test the ResponseRange class."""
32
RR = response.ResponseRange
35
self.assertTrue(r1 < r2)
36
self.assertFalse(r1 > r2)
37
self.assertTrue(r1 < 5)
38
self.assertFalse(r2 < 5)
40
self.assertEqual(RR(0, 10, 5), RR(0, 10, 5))
41
self.assertNotEqual(RR(0, 10, 5), RR(0, 8, 5))
42
self.assertNotEqual(RR(0, 10, 5), RR(0, 10, 6))
44
def test_sort_list(self):
45
"""Ensure longer ranges are sorted after shorter ones"""
46
RR = response.ResponseRange
47
lst = [RR(3, 8, 0), 5, RR(3, 7, 0), 6]
49
self.assertEqual([RR(3,7,0), RR(3,8,0), 5, 6], lst)
52
class TestRangeFile(TestCase):
56
content = "abcdefghijklmnopqrstuvwxyz"
57
self.fp = response.RangeFile('foo', StringIO(content))
58
self.fp._add_range(0, 9, 0)
59
self.fp._add_range(20, 29, 10)
60
self.fp._add_range(30, 39, 15)
62
def test_valid_accesses(self):
63
"""Test so that valid accesses work to the file."""
65
self.assertEquals(self.fp.read(3), 'abc')
66
self.assertEquals(self.fp.read(3), 'def')
67
self.assertEquals(self.fp.tell(), 6)
69
self.assertEquals(self.fp.read(3), 'klm')
70
self.assertEquals(self.fp.read(2), 'no')
71
self.assertEquals(self.fp.tell(), 25)
72
# should wrap over to 30-39 entity
73
self.assertEquals(self.fp.read(3), 'pqr')
75
self.assertEquals(self.fp.read(3), 'def')
76
self.assertEquals(self.fp.tell(), 6)
78
def test_invalid_accesses(self):
79
"""Test so that invalid accesses trigger errors."""
81
self.assertRaises(errors.InvalidRange, self.fp.read, 2)
83
self.assertRaises(errors.InvalidRange, self.fp.read, 2)
85
self.assertRaises(errors.InvalidRange, self.fp.read, 2)
87
def test__finish_ranges(self):
88
"""Test that after RangeFile._finish_ranges the list is sorted."""
89
self.fp._add_range(1, 2, 3)
90
self.fp._add_range(8, 9, 10)
91
self.fp._add_range(3, 4, 5)
93
# TODO: jam 20060706 If we switch to inserting
94
# in sorted order, remove this test
95
self.assertNotEqual(self.fp._ranges, sorted(self.fp._ranges))
97
self.fp._finish_ranges()
98
self.assertEqual(self.fp._ranges, sorted(self.fp._ranges))
100
def test_seek_and_tell(self):
101
# Check for seeking before start
103
self.assertEqual(0, self.fp.tell())
106
self.assertEqual(5, self.fp.tell())
109
self.assertEqual(3, self.fp.tell())
111
# TODO: jam 20060706 following tests will fail if this
112
# is not true, and would be difficult to debug
113
# but it is a layering violation
114
self.assertEqual(39, self.fp._len)
117
self.assertEqual(39, self.fp.tell())
120
self.assertEqual(29, self.fp.tell())
122
self.assertRaises(ValueError, self.fp.seek, 0, 4)
123
self.assertRaises(ValueError, self.fp.seek, 0, -1)
126
class TestRegexes(TestCase):
128
def assertRegexMatches(self, groups, text):
129
"""Check that the regex matches and returns the right values"""
130
m = self.regex.match(text)
131
self.assertNotEqual(None, m, "text %s did not match regex" % (text,))
133
self.assertEqual(groups, m.groups())
135
def test_range_re(self):
136
"""Test that we match valid ranges."""
137
self.regex = response.HttpRangeResponse._CONTENT_RANGE_RE
138
self.assertRegexMatches(('bytes', '1', '10', '11'),
140
self.assertRegexMatches(('bytes', '1', '10', '11'),
142
self.assertRegexMatches(('bytes', '2123', '4242', '1231'),
143
'\tbytes 2123-4242/1231 ')
144
self.assertRegexMatches(('chars', '1', '2', '3'),
147
def test_content_type_re(self):
148
self.regex = response.HttpMultipartRangeResponse._CONTENT_TYPE_RE
149
self.assertRegexMatches(('', 'xxyyzz'),
150
'multipart/byteranges; boundary = xxyyzz')
151
self.assertRegexMatches(('', 'xxyyzz'),
152
'multipart/byteranges;boundary=xxyyzz')
153
self.assertRegexMatches(('', 'xx yy zz'),
154
' multipart/byteranges ; boundary= xx yy zz ')
155
self.assertRegexMatches(('"', 'xx yy zz'),
156
' multipart/byteranges ; boundary= "xx yy zz" ')
157
self.assertEqual(None,
159
' multipart/byteranges ; boundary= "xx yy zz '))
160
self.assertEqual(None,
162
' multipart/byteranges ; boundary= xx yy zz" '))
163
self.assertEqual(None,
164
self.regex.match('multipart byteranges;boundary=xx'))
170
Content-range: bytes 1-10/20\r
174
Content-Range: bytes 21-30/20\r
179
content-range: bytes 41-50/20\r
183
content-range: bytes 51-60/20\r
189
class TestHelpers(TestCase):
190
"""Test the helper functions"""
192
def test__parse_range(self):
193
"""Test that _parse_range acts reasonably."""
194
content = StringIO('')
195
parse_range = response.HttpRangeResponse._parse_range
196
self.assertEqual((1,2), parse_range('bytes 1-2/3'))
197
self.assertEqual((10,20), parse_range('bytes 10-20/2'))
199
self.assertRaises(errors.InvalidHttpRange, parse_range, 'char 1-3/2')
200
self.assertRaises(errors.InvalidHttpRange, parse_range, 'bytes a-3/2')
203
parse_range('bytes x-10/3', path='http://foo/bar')
204
except errors.InvalidHttpRange, e:
205
self.assertContainsRe(str(e), 'http://foo/bar')
206
self.assertContainsRe(str(e), 'bytes x-10/3')
208
self.fail('Did not raise InvalidHttpRange')
210
def test__parse_boundary_simple(self):
211
"""Test that _parse_boundary handles Content-type properly"""
212
parse_boundary = response.HttpMultipartRangeResponse._parse_boundary
213
m = parse_boundary(' multipart/byteranges; boundary=xxyyzz')
214
self.assertNotEqual(None, m)
215
# Check that the returned regex is capable of splitting simple_data
216
matches = list(m.finditer(simple_data))
217
self.assertEqual(4, len(matches))
219
# match.group() should be the content-range entry
220
# and match.end() should be the start of the content
221
self.assertEqual(' bytes 1-10/20', matches[0].group(1))
222
self.assertEqual(simple_data.find('1234567890'), matches[0].end())
223
self.assertEqual(' bytes 21-30/20', matches[1].group(1))
224
self.assertEqual(simple_data.find('abcdefghij'), matches[1].end())
225
self.assertEqual(' bytes 41-50/20', matches[2].group(1))
226
self.assertEqual(simple_data.find('zyxwvutsrq'), matches[2].end())
227
self.assertEqual(' bytes 51-60/20', matches[3].group(1))
228
self.assertEqual(simple_data.find('xxyyzz fbd'), matches[3].end())
230
def test__parse_boundary_invalid(self):
231
parse_boundary = response.HttpMultipartRangeResponse._parse_boundary
233
parse_boundary(' multipart/bytes;boundary=xxyyzz',
234
path='http://foo/bar')
235
except errors.InvalidHttpContentType, e:
236
self.assertContainsRe(str(e), 'http://foo/bar')
237
self.assertContainsRe(str(e), 'multipart/bytes;boundary=xxyyzz')
239
self.fail('Did not raise InvalidHttpContentType')
242
class TestHttpRangeResponse(TestCase):
244
def test_smoketest(self):
245
"""A basic test that HttpRangeResponse is reasonable."""
246
content = StringIO('0123456789')
247
f = response.HttpRangeResponse('http://foo', 'bytes 1-10/9', content)
248
self.assertEqual([response.ResponseRange(1,10,0)], f._ranges)
50
class TestRangeFileMixin(object):
51
"""Tests for accessing the first range in a RangeFile."""
53
# A simple string used to represent a file part (also called a range), in
54
# which offsets are easy to calculate for test writers. It's used as a
55
# building block with slight variations but basically 'a' is the first char
56
# of the range and 'z' is the last.
57
alpha = 'abcdefghijklmnopqrstuvwxyz'
59
def test_can_read_at_first_access(self):
60
"""Test that the just created file can be read."""
61
self.assertEquals(self.alpha, self._file.read())
63
def test_seek_read(self):
64
"""Test seek/read inside the range."""
66
start = self.first_range_start
67
# Before any use, tell() should be at the range start
68
self.assertEquals(start, f.tell())
69
cur = start # For an overall offset assertion
72
self.assertEquals('def', f.read(3))
76
self.assertEquals('klmn', f.read(4))
78
# read(0) in the middle of a range
79
self.assertEquals('', f.read(0))
83
self.assertEquals(here, f.tell())
84
self.assertEquals(cur, f.tell())
86
def test_read_zero(self):
88
start = self.first_range_start
89
self.assertEquals('', f.read(0))
91
self.assertEquals('', f.read(0))
93
def test_seek_at_range_end(self):
97
def test_read_at_range_end(self):
98
"""Test read behaviour at range end."""
100
self.assertEquals(self.alpha, f.read())
101
self.assertEquals('', f.read(0))
102
self.assertRaises(errors.InvalidRange, f.read, 1)
104
def test_unbounded_read_after_seek(self):
107
# Should not cross ranges
108
self.assertEquals('yz', f.read())
110
def test_seek_backwards(self):
112
start = self.first_range_start
115
self.assertRaises(errors.InvalidRange, f.seek, start + 5)
117
def test_seek_outside_single_range(self):
119
if f._size == -1 or f._boundary is not None:
120
raise tests.TestNotApplicable('Needs a fully defined range')
121
# Will seek past the range and then errors out
122
self.assertRaises(errors.InvalidRange,
123
f.seek, self.first_range_start + 27)
125
def test_read_past_end_of_range(self):
128
raise tests.TestNotApplicable("Can't check an unknown size")
129
start = self.first_range_start
131
self.assertRaises(errors.InvalidRange, f.read, 10)
133
def test_seek_from_end(self):
134
"""Test seeking from the end of the file.
136
The semantic is unclear in case of multiple ranges. Seeking from end
137
exists only for the http transports, cannot be used if the file size is
138
unknown and is not used in bzrlib itself. This test must be (and is)
139
overridden by daughter classes.
141
Reading from end makes sense only when a range has been requested from
142
the end of the file (see HttpTransportBase._get() when using the
143
'tail_amount' parameter). The HTTP response can only be a whole file or
148
self.assertEquals('yz', f.read())
151
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
152
"""Test a RangeFile for a whole file whose size is not known."""
155
super(TestRangeFileSizeUnknown, self).setUp()
156
self._file = response.RangeFile('Whole_file_size_known',
157
StringIO(self.alpha))
158
# We define no range, relying on RangeFile to provide default values
159
self.first_range_start = 0 # It's the whole file
161
def test_seek_from_end(self):
162
"""See TestRangeFileMixin.test_seek_from_end.
164
The end of the file can't be determined since the size is unknown.
166
self.assertRaises(errors.InvalidRange, self._file.seek, -1, 2)
168
def test_read_at_range_end(self):
169
"""Test read behaviour at range end."""
171
self.assertEquals(self.alpha, f.read())
172
self.assertEquals('', f.read(0))
173
self.assertEquals('', f.read(1))
175
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
176
"""Test a RangeFile for a whole file whose size is known."""
179
super(TestRangeFileSizeKnown, self).setUp()
180
self._file = response.RangeFile('Whole_file_size_known',
181
StringIO(self.alpha))
182
self._file.set_range(0, len(self.alpha))
183
self.first_range_start = 0 # It's the whole file
186
class TestRangeFileSingleRange(tests.TestCase, TestRangeFileMixin):
187
"""Test a RangeFile for a single range."""
190
super(TestRangeFileSingleRange, self).setUp()
191
self._file = response.RangeFile('Single_range_file',
192
StringIO(self.alpha))
193
self.first_range_start = 15
194
self._file.set_range(self.first_range_start, len(self.alpha))
197
def test_read_before_range(self):
198
# This can't occur under normal circumstances, we have to force it
200
f._pos = 0 # Force an invalid pos
201
self.assertRaises(errors.InvalidRange, f.read, 2)
203
class TestRangeFilMultipleRanges(tests.TestCase, TestRangeFileMixin):
204
"""Test a RangeFile for multiple ranges.
206
The RangeFile used for the tests contains three ranges:
208
- at offset 25: alpha
209
- at offset 100: alpha
210
- at offset 126: alpha.upper()
212
The two last ranges are contiguous. This only rarely occurs (should not in
213
fact) in real uses but may lead to hard to track bugs.
217
super(TestRangeFilMultipleRanges, self).setUp()
219
boundary = 'separation'
222
self.first_range_start = 25
223
file_size = 200 # big enough to encompass all ranges
224
for (start, part) in [(self.first_range_start, self.alpha),
225
# Two contiguous ranges
227
(126, self.alpha.upper())]:
228
content += self._multipart_byterange(part, start, boundary,
231
content += self._boundary_line(boundary)
233
self._file = response.RangeFile('Multiple_ranges_file',
235
# Ranges are set by decoding the range headers, the RangeFile user is
236
# supposed to call the following before using seek or read since it
237
# requires knowing the *response* headers (in that case the boundary
238
# which is part of the Content-Type header).
239
self._file.set_boundary(boundary)
241
def _boundary_line(self, boundary):
242
"""Helper to build the formatted boundary line."""
243
return '--' + boundary + '\r\n'
245
def _multipart_byterange(self, data, offset, boundary, file_size='*'):
246
"""Encode a part of a file as a multipart/byterange MIME type.
248
When a range request is issued, the HTTP response body can be
249
decomposed in parts, each one representing a range (start, size) in a
252
:param data: The payload.
253
:param offset: where data starts in the file
254
:param boundary: used to separate the parts
255
:param file_size: the size of the file containing the range (default to
258
:return: a string containing the data encoded as it will appear in the
261
bline = self._boundary_line(boundary)
262
# Each range begins with a boundary line
264
# A range is described by a set of headers, but only 'Content-Range' is
265
# required for our implementation (TestHandleResponse below will
266
# exercise ranges with multiple or missing headers')
267
range += 'Content-Range: bytes %d-%d/%d\r\n' % (offset,
271
# Finally the raw bytes
275
def test_read_all_ranges(self):
277
self.assertEquals(self.alpha, f.read()) # Read first range
278
f.seek(100) # Trigger the second range recognition
279
self.assertEquals(self.alpha, f.read()) # Read second range
280
self.assertEquals(126, f.tell())
281
f.seek(126) # Start of third range which is also the current pos !
282
self.assertEquals('A', f.read(1))
284
self.assertEquals('LMN', f.read(3))
286
def test_seek_from_end(self):
287
"""See TestRangeFileMixin.test_seek_from_end."""
288
# The actual implementation will seek from end for the first range only
289
# and then fail. Since seeking from end is intended to be used for a
290
# single range only anyway, this test just document the actual
294
self.assertEquals('yz', f.read())
295
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
297
def test_seek_into_void(self):
299
start = self.first_range_start
301
# Seeking to a point between two ranges is possible (only once) but
302
# reading there is forbidden
304
# We crossed a range boundary, so now the file is positioned at the
305
# start of the new range (i.e. trying to seek below 100 will error out)
309
def test_seek_across_ranges(self):
311
start = self.first_range_start
312
f.seek(126) # skip the two first ranges
313
self.assertEquals('AB', f.read(2))
315
def test_seek_twice_between_ranges(self):
317
start = self.first_range_start
318
f.seek(start + 40) # Past the first range but before the second
319
# Now the file is positioned at the second range start (100)
320
self.assertRaises(errors.InvalidRange, f.seek, start + 41)
322
def test_seek_at_range_end(self):
323
"""Test seek behavior at range end."""
329
def test_read_at_range_end(self):
331
self.assertEquals(self.alpha, f.read())
332
self.assertEquals(self.alpha, f.read())
333
self.assertEquals(self.alpha.upper(), f.read())
334
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
337
class TestRangeFileVarious(tests.TestCase):
338
"""Tests RangeFile aspects not covered elsewhere."""
340
def test_seek_whence(self):
341
"""Test the seek whence parameter values."""
342
f = response.RangeFile('foo', StringIO('abc'))
251
self.assertRaises(errors.InvalidRange, f.read, 2)
253
self.assertEqual('012345', f.read(6))
255
def test_invalid(self):
257
f = response.HttpRangeResponse('http://foo', 'bytes x-10/9',
258
StringIO('0123456789'))
259
except errors.InvalidHttpRange, e:
260
self.assertContainsRe(str(e), 'http://foo')
261
self.assertContainsRe(str(e), 'bytes x-10/9')
263
self.fail('Failed to raise InvalidHttpRange')
266
class TestHttpMultipartRangeResponse(TestCase):
267
"""Test the handling of multipart range responses"""
269
def test_simple(self):
270
content = StringIO(simple_data)
271
multi = response.HttpMultipartRangeResponse('http://foo',
272
'multipart/byteranges; boundary = xxyyzz', content)
274
self.assertEqual(4, len(multi._ranges))
277
self.assertEqual('1234567890', multi.read(10))
279
self.assertEqual('abcdefghij', multi.read(10))
281
self.assertEqual('zyxwvutsrq', multi.read(10))
283
self.assertEqual('xxyyzz fbd', multi.read(10))
284
# TODO: jam 20060706 Currently RangeFile does not support
285
# reading across ranges. Consider adding it.
287
# self.assertEqual('zyxwvutsrqxxyyzz fbd', multi.read(20))
288
self.assertRaises(errors.InvalidRange, multi.read, 20)
291
self.assertRaises(errors.InvalidRange, multi.read, 11)
293
self.assertRaises(errors.InvalidRange, multi.read, 10)
295
def test_invalid(self):
296
content = StringIO('')
298
response.HttpMultipartRangeResponse('http://foo',
299
'multipart/byte;boundary=invalid', content)
300
except errors.InvalidHttpContentType, e:
301
self.assertContainsRe(str(e), 'http://foo')
302
self.assertContainsRe(str(e), 'multipart/byte;')
347
self.assertRaises(ValueError, f.seek, 0, 14)
349
def test_range_syntax(self):
350
"""Test the Content-Range scanning."""
352
f = response.RangeFile('foo', StringIO())
354
def ok(expected, header_value):
355
f.set_range_from_header(header_value)
356
# Slightly peek under the covers to get the size
357
self.assertEquals(expected, (f.tell(), f._size))
359
ok((1, 10), 'bytes 1-10/11')
360
ok((1, 10), 'bytes 1-10/*')
361
ok((12, 2), '\tbytes 12-13/*')
362
ok((28, 1), ' bytes 28-28/*')
363
ok((2123, 2120), 'bytes 2123-4242/12310')
364
ok((1, 10), 'bytes 1-10/ttt') # We don't check total (ttt)
366
def nok(header_value):
367
self.assertRaises(errors.InvalidHttpRange,
368
f.set_range_from_header, header_value)
372
nok('bytes xx-yyy/zzz')
373
nok('bytes xx-12/zzz')
374
nok('bytes 11-yy/zzz')
305
378
# Taken from real request responses
493
# This should be in test_http.py, but the headers we
494
# want to parse are here
495
class TestExtractHeader(TestCase):
497
def use_response(self, response):
498
self.headers = http._extract_headers(response[1], 'http://foo')
500
def check_header(self, header, value):
501
self.assertEqual(value, self.headers[header])
503
def test_full_text(self):
504
self.use_response(_full_text_response)
506
self.check_header('Date', 'Tue, 11 Jul 2006 04:32:56 GMT')
507
self.check_header('date', 'Tue, 11 Jul 2006 04:32:56 GMT')
508
self.check_header('Content-Length', '35')
509
self.check_header('Content-Type', 'text/plain; charset=UTF-8')
510
self.check_header('content-type', 'text/plain; charset=UTF-8')
512
def test_missing_response(self):
513
self.use_response(_missing_response)
515
self.check_header('Content-Length', '336')
516
self.check_header('Content-Type', 'text/html; charset=iso-8859-1')
518
def test_single_range(self):
519
self.use_response(_single_range_response)
521
self.check_header('Content-Length', '100')
522
self.check_header('Content-Range', 'bytes 100-199/93890')
523
self.check_header('Content-Type', 'text/plain; charset=UTF-8')
525
def test_single_range_no_content(self):
526
self.use_response(_single_range_no_content_type)
528
self.check_header('Content-Length', '100')
529
self.check_header('Content-Range', 'bytes 100-199/93890')
531
def test_multi_range(self):
532
self.use_response(_multipart_range_response)
534
self.check_header('Content-Length', '1534')
535
self.check_header('Content-Type',
536
'multipart/byteranges; boundary=418470f848b63279b')
538
def test_multi_squid_range(self):
539
self.use_response(_multipart_squid_range_response)
541
self.check_header('Content-Length', '598')
542
self.check_header('Content-Type',
543
'multipart/byteranges; '\
544
'boundary="squid/2.5.STABLE12:C99323425AD4FE26F726261FA6C24196"')
546
def test_redirect(self):
547
"""We default to returning the last group of headers in the file."""
548
self.use_response(_redirect_response)
549
self.check_header('Content-Range', 'bytes 8623075-8623499/8623500')
550
self.check_header('Content-Type', 'text/plain; charset=UTF-8')
552
def test_empty(self):
553
self.assertRaises(errors.InvalidHttpResponse,
554
http._extract_headers, '', 'bad url')
556
def test_no_opening_http(self):
557
# Remove the HTTP line from the header
558
first, txt = _full_text_response[1].split('\r\n', 1)
559
self.assertRaises(errors.InvalidHttpResponse,
560
http._extract_headers, txt, 'missing HTTTP')
562
def test_trailing_whitespace(self):
563
# Test that we ignore bogus whitespace on the end
564
code, txt, body = _full_text_response
565
txt += '\r\n\n\n\n\n'
566
self.use_response((code, txt, body))
568
self.check_header('Date', 'Tue, 11 Jul 2006 04:32:56 GMT')
569
self.check_header('Content-Length', '35')
570
self.check_header('Content-Type', 'text/plain; charset=UTF-8')
572
def test_trailing_non_http(self):
573
# Test that we ignore bogus stuff on the end
574
code, txt, body = _full_text_response
575
txt = txt + 'Foo: Bar\r\nBaz: Bling\r\n\r\n'
576
self.use_response((code, txt, body))
578
self.check_header('Date', 'Tue, 11 Jul 2006 04:32:56 GMT')
579
self.check_header('Content-Length', '35')
580
self.check_header('Content-Type', 'text/plain; charset=UTF-8')
581
self.assertRaises(KeyError, self.headers.__getitem__, 'Foo')
583
def test_extra_whitespace(self):
584
# Test that we read an HTTP response, even with extra whitespace
585
code, txt, body = _redirect_response
586
# Find the second HTTP location
587
loc = txt.find('HTTP', 5)
588
txt = txt[:loc] + '\r\n\n' + txt[loc:]
589
self.use_response((code, txt, body))
590
self.check_header('Content-Range', 'bytes 8623075-8623499/8623500')
591
self.check_header('Content-Type', 'text/plain; charset=UTF-8')
594
class TestHandleResponse(TestCase):
574
_multipart_no_content_range = (206, """HTTP/1.0 206 Partial Content\r
575
Content-Type: multipart/byteranges; boundary=THIS_SEPARATES\r
576
Content-Length: 598\r
581
Content-Type: text/plain\r
588
_multipart_no_boundary = (206, """HTTP/1.0 206 Partial Content\r
589
Content-Type: multipart/byteranges; boundary=THIS_SEPARATES\r
590
Content-Length: 598\r
595
Content-Type: text/plain\r
596
Content-Range: bytes 0-18/18672\r
600
The range ended at the line above, this text is garbage instead of a boundary
605
class TestHandleResponse(tests.TestCase):
607
def _build_HTTPMessage(self, raw_headers):
608
status_and_headers = StringIO(raw_headers)
609
# Get rid of the status line
610
status_and_headers.readline()
611
msg = httplib.HTTPMessage(status_and_headers)
596
614
def get_response(self, a_response):
597
615
"""Process a supplied response, and return the result."""
598
headers = http._extract_headers(a_response[1], 'http://foo')
599
return response.handle_response('http://foo', a_response[0], headers,
616
code, raw_headers, body = a_response
617
msg = self._build_HTTPMessage(raw_headers)
618
return response.handle_response('http://foo', code, msg,
600
619
StringIO(a_response[2]))
602
621
def test_full_text(self):