1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
78
class TestResponseFileIter(tests.TestCase):
80
def test_iter_empty(self):
81
f = response.ResponseFile('empty', StringIO())
82
self.assertEqual([], list(f))
84
def test_iter_many(self):
85
f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
86
self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
78
89
class TestHTTPConnection(tests.TestCase):
80
91
def test_cleanup_pipe(self):
92
103
# Now, get the response
93
104
resp = conn.getresponse()
94
105
# Read part of the response
95
self.assertEquals('0123456789\n', resp.read(11))
106
self.assertEqual('0123456789\n', resp.read(11))
96
107
# Override the thresold to force the warning emission
97
108
conn._range_warning_thresold = 6 # There are 7 bytes pending
98
109
conn.cleanup_pipe()
99
self.assertContainsRe(self._get_log(keep_log_file=True),
100
'Got a 200 response when asking')
110
self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
103
113
class TestRangeFileMixin(object):
112
122
def test_can_read_at_first_access(self):
113
123
"""Test that the just created file can be read."""
114
self.assertEquals(self.alpha, self._file.read())
124
self.assertEqual(self.alpha, self._file.read())
116
126
def test_seek_read(self):
117
127
"""Test seek/read inside the range."""
119
129
start = self.first_range_start
120
130
# Before any use, tell() should be at the range start
121
self.assertEquals(start, f.tell())
131
self.assertEqual(start, f.tell())
122
132
cur = start # For an overall offset assertion
123
133
f.seek(start + 3)
125
self.assertEquals('def', f.read(3))
135
self.assertEqual('def', f.read(3))
126
136
cur += len('def')
129
self.assertEquals('klmn', f.read(4))
139
self.assertEqual('klmn', f.read(4))
130
140
cur += len('klmn')
131
141
# read(0) in the middle of a range
132
self.assertEquals('', f.read(0))
142
self.assertEqual('', f.read(0))
136
self.assertEquals(here, f.tell())
137
self.assertEquals(cur, f.tell())
146
self.assertEqual(here, f.tell())
147
self.assertEqual(cur, f.tell())
139
149
def test_read_zero(self):
141
start = self.first_range_start
142
self.assertEquals('', f.read(0))
151
self.assertEqual('', f.read(0))
144
self.assertEquals('', f.read(0))
153
self.assertEqual('', f.read(0))
146
155
def test_seek_at_range_end(self):
150
159
def test_read_at_range_end(self):
151
160
"""Test read behaviour at range end."""
153
self.assertEquals(self.alpha, f.read())
154
self.assertEquals('', f.read(0))
162
self.assertEqual(self.alpha, f.read())
163
self.assertEqual('', f.read(0))
155
164
self.assertRaises(errors.InvalidRange, f.read, 1)
157
166
def test_unbounded_read_after_seek(self):
160
169
# Should not cross ranges
161
self.assertEquals('yz', f.read())
170
self.assertEqual('yz', f.read())
163
172
def test_seek_backwards(self):
201
self.assertEquals('yz', f.read())
210
self.assertEqual('yz', f.read())
204
213
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
221
230
def test_read_at_range_end(self):
222
231
"""Test read behaviour at range end."""
224
self.assertEquals(self.alpha, f.read())
225
self.assertEquals('', f.read(0))
226
self.assertEquals('', f.read(1))
233
self.assertEqual(self.alpha, f.read())
234
self.assertEqual('', f.read(0))
235
self.assertEqual('', f.read(1))
229
238
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
339
348
def test_read_all_ranges(self):
341
self.assertEquals(self.alpha, f.read()) # Read first range
350
self.assertEqual(self.alpha, f.read()) # Read first range
342
351
f.seek(100) # Trigger the second range recognition
343
self.assertEquals(self.alpha, f.read()) # Read second range
344
self.assertEquals(126, f.tell())
352
self.assertEqual(self.alpha, f.read()) # Read second range
353
self.assertEqual(126, f.tell())
345
354
f.seek(126) # Start of third range which is also the current pos !
346
self.assertEquals('A', f.read(1))
355
self.assertEqual('A', f.read(1))
348
self.assertEquals('LMN', f.read(3))
357
self.assertEqual('LMN', f.read(3))
350
359
def test_seek_from_end(self):
351
360
"""See TestRangeFileMixin.test_seek_from_end."""
358
self.assertEquals('yz', f.read())
367
self.assertEqual('yz', f.read())
359
368
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
361
370
def test_seek_into_void(self):
373
382
def test_seek_across_ranges(self):
375
start = self.first_range_start
376
384
f.seek(126) # skip the two first ranges
377
self.assertEquals('AB', f.read(2))
385
self.assertEqual('AB', f.read(2))
379
387
def test_checked_read_dont_overflow_buffers(self):
381
start = self.first_range_start
382
389
# We force a very low value to exercise all code paths in _checked_read
383
390
f._discarded_buf_size = 8
384
391
f.seek(126) # skip the two first ranges
385
self.assertEquals('AB', f.read(2))
392
self.assertEqual('AB', f.read(2))
387
394
def test_seek_twice_between_ranges(self):
401
408
def test_read_at_range_end(self):
403
self.assertEquals(self.alpha, f.read())
404
self.assertEquals(self.alpha, f.read())
405
self.assertEquals(self.alpha.upper(), f.read())
410
self.assertEqual(self.alpha, f.read())
411
self.assertEqual(self.alpha, f.read())
412
self.assertEqual(self.alpha.upper(), f.read())
406
413
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
451
458
def ok(expected, header_value):
452
459
f.set_range_from_header(header_value)
453
460
# Slightly peek under the covers to get the size
454
self.assertEquals(expected, (f.tell(), f._size))
461
self.assertEqual(expected, (f.tell(), f._size))
456
463
ok((1, 10), 'bytes 1-10/11')
457
464
ok((1, 10), 'bytes 1-10/*')
804
tests.TestCase.setUp(self)
811
super(TestRangeFileSizeReadLimited, self).setUp()
805
812
# create a test datablock larger than _max_read_size.
806
813
chunk_size = response.RangeFile._max_read_size
807
814
test_pattern = '0123456789ABCDEF'