1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
1
# Copyright (C) 2006-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
78
class TestResponseFileIter(tests.TestCase):
    """Check line iteration over a response.ResponseFile."""

    def test_iter_empty(self):
        """Iterating a ResponseFile built on an empty stream gives no lines."""
        empty_file = response.ResponseFile('empty', StringIO())
        lines = list(empty_file)
        self.assertEqual([], lines)

    def test_iter_many(self):
        """Iterating gives each line in order, newline terminators included."""
        many_file = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
        lines = list(many_file)
        self.assertEqual(['0\n', '1\n', 'boo!\n'], lines)
89
78
class TestHTTPConnection(tests.TestCase):
91
80
def test_cleanup_pipe(self):
103
92
# Now, get the response
104
93
resp = conn.getresponse()
105
94
# Read part of the response
106
self.assertEqual('0123456789\n', resp.read(11))
95
self.assertEquals('0123456789\n', resp.read(11))
107
96
# Override the threshold to force the warning emission
108
97
conn._range_warning_thresold = 6 # There are 7 bytes pending
109
98
conn.cleanup_pipe()
122
111
def test_can_read_at_first_access(self):
123
112
"""Test that the just created file can be read."""
124
self.assertEqual(self.alpha, self._file.read())
113
self.assertEquals(self.alpha, self._file.read())
126
115
def test_seek_read(self):
127
116
"""Test seek/read inside the range."""
129
118
start = self.first_range_start
130
119
# Before any use, tell() should be at the range start
131
self.assertEqual(start, f.tell())
120
self.assertEquals(start, f.tell())
132
121
cur = start # For an overall offset assertion
133
122
f.seek(start + 3)
135
self.assertEqual('def', f.read(3))
124
self.assertEquals('def', f.read(3))
136
125
cur += len('def')
139
self.assertEqual('klmn', f.read(4))
128
self.assertEquals('klmn', f.read(4))
140
129
cur += len('klmn')
141
130
# read(0) in the middle of a range
142
self.assertEqual('', f.read(0))
131
self.assertEquals('', f.read(0))
146
self.assertEqual(here, f.tell())
147
self.assertEqual(cur, f.tell())
135
self.assertEquals(here, f.tell())
136
self.assertEquals(cur, f.tell())
149
138
def test_read_zero(self):
151
self.assertEqual('', f.read(0))
140
start = self.first_range_start
141
self.assertEquals('', f.read(0))
153
self.assertEqual('', f.read(0))
143
self.assertEquals('', f.read(0))
155
145
def test_seek_at_range_end(self):
159
149
def test_read_at_range_end(self):
160
150
"""Test read behaviour at range end."""
162
self.assertEqual(self.alpha, f.read())
163
self.assertEqual('', f.read(0))
152
self.assertEquals(self.alpha, f.read())
153
self.assertEquals('', f.read(0))
164
154
self.assertRaises(errors.InvalidRange, f.read, 1)
166
156
def test_unbounded_read_after_seek(self):
169
159
# Should not cross ranges
170
self.assertEqual('yz', f.read())
160
self.assertEquals('yz', f.read())
172
162
def test_seek_backwards(self):
210
self.assertEqual('yz', f.read())
200
self.assertEquals('yz', f.read())
213
203
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
230
220
def test_read_at_range_end(self):
231
221
"""Test read behaviour at range end."""
233
self.assertEqual(self.alpha, f.read())
234
self.assertEqual('', f.read(0))
235
self.assertEqual('', f.read(1))
223
self.assertEquals(self.alpha, f.read())
224
self.assertEquals('', f.read(0))
225
self.assertEquals('', f.read(1))
238
228
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
348
338
def test_read_all_ranges(self):
350
self.assertEqual(self.alpha, f.read()) # Read first range
340
self.assertEquals(self.alpha, f.read()) # Read first range
351
341
f.seek(100) # Trigger the second range recognition
352
self.assertEqual(self.alpha, f.read()) # Read second range
353
self.assertEqual(126, f.tell())
342
self.assertEquals(self.alpha, f.read()) # Read second range
343
self.assertEquals(126, f.tell())
354
344
f.seek(126) # Start of third range which is also the current pos !
355
self.assertEqual('A', f.read(1))
345
self.assertEquals('A', f.read(1))
357
self.assertEqual('LMN', f.read(3))
347
self.assertEquals('LMN', f.read(3))
359
349
def test_seek_from_end(self):
360
350
"""See TestRangeFileMixin.test_seek_from_end."""
367
self.assertEqual('yz', f.read())
357
self.assertEquals('yz', f.read())
368
358
self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
370
360
def test_seek_into_void(self):
382
372
def test_seek_across_ranges(self):
374
start = self.first_range_start
384
375
f.seek(126) # skip the two first ranges
385
self.assertEqual('AB', f.read(2))
376
self.assertEquals('AB', f.read(2))
387
378
def test_checked_read_dont_overflow_buffers(self):
380
start = self.first_range_start
389
381
# We force a very low value to exercise all code paths in _checked_read
390
382
f._discarded_buf_size = 8
391
383
f.seek(126) # skip the two first ranges
392
self.assertEqual('AB', f.read(2))
384
self.assertEquals('AB', f.read(2))
394
386
def test_seek_twice_between_ranges(self):
408
400
def test_read_at_range_end(self):
410
self.assertEqual(self.alpha, f.read())
411
self.assertEqual(self.alpha, f.read())
412
self.assertEqual(self.alpha.upper(), f.read())
402
self.assertEquals(self.alpha, f.read())
403
self.assertEquals(self.alpha, f.read())
404
self.assertEquals(self.alpha.upper(), f.read())
413
405
self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
458
450
def ok(expected, header_value):
459
451
f.set_range_from_header(header_value)
460
452
# Slightly peek under the covers to get the size
461
self.assertEqual(expected, (f.tell(), f._size))
453
self.assertEquals(expected, (f.tell(), f._size))
463
455
ok((1, 10), 'bytes 1-10/11')
464
456
ok((1, 10), 'bytes 1-10/*')
811
super(TestRangeFileSizeReadLimited, self).setUp()
803
tests.TestCase.setUp(self)
812
804
# create a test datablock larger than _max_read_size.
813
805
chunk_size = response.RangeFile._max_read_size
814
806
test_pattern = '0123456789ABCDEF'