1
# Copyright (C) 2006-2010 Canonical Ltd
1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
78
class TestResponseFileIter(tests.TestCase):
    """Check iteration over a ``response.ResponseFile``."""

    def test_iter_empty(self):
        """Iterating a file wrapping an empty stream yields no items."""
        f = response.ResponseFile('empty', StringIO())
        self.assertEqual([], list(f))

    def test_iter_many(self):
        """Iterating yields each line in order, newline included."""
        f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
        self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
78
89
class TestHTTPConnection(tests.TestCase):
80
91
def test_cleanup_pipe(self):
92
103
# Now, get the response
93
104
resp = conn.getresponse()
94
105
# Read part of the response
95
self.assertEquals('0123456789\n', resp.read(11))
106
self.assertEqual('0123456789\n', resp.read(11))
96
107
# Override the threshold to force the warning emission
97
108
conn._range_warning_thresold = 6 # There are 7 bytes pending
98
109
conn.cleanup_pipe()
111
122
def test_can_read_at_first_access(self):
112
123
"""Test that the just created file can be read."""
113
self.assertEquals(self.alpha, self._file.read())
124
self.assertEqual(self.alpha, self._file.read())
115
126
def test_seek_read(self):
116
127
"""Test seek/read inside the range."""
118
129
start = self.first_range_start
119
130
# Before any use, tell() should be at the range start
120
self.assertEquals(start, f.tell())
131
self.assertEqual(start, f.tell())
121
132
cur = start # For an overall offset assertion
122
133
f.seek(start + 3)
124
self.assertEquals('def', f.read(3))
135
self.assertEqual('def', f.read(3))
125
136
cur += len('def')
128
self.assertEquals('klmn', f.read(4))
139
self.assertEqual('klmn', f.read(4))
129
140
cur += len('klmn')
130
141
# read(0) in the middle of a range
131
self.assertEquals('', f.read(0))
142
self.assertEqual('', f.read(0))
135
self.assertEquals(here, f.tell())
136
self.assertEquals(cur, f.tell())
146
self.assertEqual(here, f.tell())
147
self.assertEqual(cur, f.tell())
138
149
def test_read_zero(self):
140
start = self.first_range_start
141
self.assertEquals('', f.read(0))
151
self.assertEqual('', f.read(0))
143
self.assertEquals('', f.read(0))
153
self.assertEqual('', f.read(0))
145
155
def test_seek_at_range_end(self):
149
159
def test_read_at_range_end(self):
150
160
"""Test read behaviour at range end."""
152
self.assertEquals(self.alpha, f.read())
153
self.assertEquals('', f.read(0))
162
self.assertEqual(self.alpha, f.read())
163
self.assertEqual('', f.read(0))
154
164
self.assertRaises(errors.InvalidRange, f.read, 1)
156
166
def test_unbounded_read_after_seek(self):
159
169
# Should not cross ranges
160
self.assertEquals('yz', f.read())
170
self.assertEqual('yz', f.read())
162
172
def test_seek_backwards(self):
200
self.assertEquals('yz', f.read())
210
self.assertEqual('yz', f.read())
203
213
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
220
230
def test_read_at_range_end(self):
221
231
"""Test read behaviour at range end."""
223
self.assertEquals(self.alpha, f.read())
224
self.assertEquals('', f.read(0))
225
self.assertEquals('', f.read(1))
233
self.assertEqual(self.alpha, f.read())
234
self.assertEqual('', f.read(0))
235
self.assertEqual('', f.read(1))
228
238
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
338
348
def test_read_all_ranges(self):
340
self.assertEquals(self.alpha, f.read()) # Read first range
350
self.assertEqual(self.alpha, f.read()) # Read first range
341
351
f.seek(100) # Trigger the second range recognition
342
self.assertEquals(self.alpha, f.read()) # Read second range
343
self.assertEquals(126, f.tell())
352
self.assertEqual(self.alpha, f.read()) # Read second range
353
self.assertEqual(126, f.tell())
344
354
f.seek(126) # Start of third range which is also the current pos !
345
self.assertEquals('A', f.read(1))
355
self.assertEqual('A', f.read(1))
347
self.assertEquals('LMN', f.read(3))
357
self.assertEqual('LMN', f.read(3))
349
359
def test_seek_from_end(self):
350
360
"""See TestRangeFileMixin.test_seek_from_end."""
357
self.assertEquals('yz', f.read())
367
self.assertEqual('yz', f.read())
358
368
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
360
370
def test_seek_into_void(self):
372
382
def test_seek_across_ranges(self):
374
start = self.first_range_start
375
384
f.seek(126) # skip the two first ranges
376
self.assertEquals('AB', f.read(2))
385
self.assertEqual('AB', f.read(2))
378
387
def test_checked_read_dont_overflow_buffers(self):
380
start = self.first_range_start
381
389
# We force a very low value to exercise all code paths in _checked_read
382
390
f._discarded_buf_size = 8
383
391
f.seek(126) # skip the two first ranges
384
self.assertEquals('AB', f.read(2))
392
self.assertEqual('AB', f.read(2))
386
394
def test_seek_twice_between_ranges(self):
400
408
def test_read_at_range_end(self):
402
self.assertEquals(self.alpha, f.read())
403
self.assertEquals(self.alpha, f.read())
404
self.assertEquals(self.alpha.upper(), f.read())
410
self.assertEqual(self.alpha, f.read())
411
self.assertEqual(self.alpha, f.read())
412
self.assertEqual(self.alpha.upper(), f.read())
405
413
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
450
458
def ok(expected, header_value):
    """Set the range from a header value and check the resulting state.

    :param expected: the ``(f.tell(), f._size)`` pair the file should
        report once the header has been processed.
    :param header_value: the string passed to ``set_range_from_header``.
    """
    f.set_range_from_header(header_value)
    # Slightly peek under the covers to get the size
    self.assertEqual(expected, (f.tell(), f._size))
455
463
ok((1, 10), 'bytes 1-10/11')
456
464
ok((1, 10), 'bytes 1-10/*')
803
tests.TestCase.setUp(self)
811
super(TestRangeFileSizeReadLimited, self).setUp()
804
812
# create a test datablock larger than _max_read_size.
805
813
chunk_size = response.RangeFile._max_read_size
806
814
test_pattern = '0123456789ABCDEF'