~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http_response.py

  • Committer: Alexander Belchenko
  • Date: 2008-02-07 07:53:19 UTC
  • mto: This revision was merged to the branch mainline in revision 3231.
  • Revision ID: bialix@ukr.net-20080207075319-d9jj7as8i5ztiff7
some cleanup in package_mf.py; mention about load_from_zip() deprecation in NEWS.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests from HTTP response parsing.
18
18
 
48
48
    response,
49
49
    _urllib2_wrappers,
50
50
    )
51
 
from bzrlib.tests.file_utils import (
52
 
    FakeReadFile,
53
 
    )
54
51
 
55
52
 
56
53
class ReadSocket(object):
62
59
    def makefile(self, mode='r', bufsize=None):
63
60
        return self.readfile
64
61
 
65
 
 
66
62
class FakeHTTPConnection(_urllib2_wrappers.HTTPConnection):
67
63
 
68
64
    def __init__(self, sock):
75
71
        pass
76
72
 
77
73
 
78
 
class TestResponseFileIter(tests.TestCase):
79
 
 
80
 
    def test_iter_empty(self):
81
 
        f = response.ResponseFile('empty', StringIO())
82
 
        self.assertEqual([], list(f))
83
 
 
84
 
    def test_iter_many(self):
85
 
        f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
86
 
        self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
87
 
 
88
 
 
89
74
class TestHTTPConnection(tests.TestCase):
90
75
 
91
76
    def test_cleanup_pipe(self):
103
88
        # Now, get the response
104
89
        resp = conn.getresponse()
105
90
        # Read part of the response
106
 
        self.assertEqual('0123456789\n', resp.read(11))
 
91
        self.assertEquals('0123456789\n', resp.read(11))
107
92
        # Override the thresold to force the warning emission
108
93
        conn._range_warning_thresold = 6 # There are 7 bytes pending
109
94
        conn.cleanup_pipe()
110
 
        self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
 
95
        self.assertContainsRe(self._get_log(keep_log_file=True),
 
96
                              'Got a 200 response when asking')
111
97
 
112
98
 
113
99
class TestRangeFileMixin(object):
121
107
 
122
108
    def test_can_read_at_first_access(self):
123
109
        """Test that the just created file can be read."""
124
 
        self.assertEqual(self.alpha, self._file.read())
 
110
        self.assertEquals(self.alpha, self._file.read())
125
111
 
126
112
    def test_seek_read(self):
127
113
        """Test seek/read inside the range."""
128
114
        f = self._file
129
115
        start = self.first_range_start
130
116
        # Before any use, tell() should be at the range start
131
 
        self.assertEqual(start, f.tell())
 
117
        self.assertEquals(start, f.tell())
132
118
        cur = start # For an overall offset assertion
133
119
        f.seek(start + 3)
134
120
        cur += 3
135
 
        self.assertEqual('def', f.read(3))
 
121
        self.assertEquals('def', f.read(3))
136
122
        cur += len('def')
137
123
        f.seek(4, 1)
138
124
        cur += 4
139
 
        self.assertEqual('klmn', f.read(4))
 
125
        self.assertEquals('klmn', f.read(4))
140
126
        cur += len('klmn')
141
127
        # read(0) in the middle of a range
142
 
        self.assertEqual('', f.read(0))
 
128
        self.assertEquals('', f.read(0))
143
129
        # seek in place
144
130
        here = f.tell()
145
131
        f.seek(0, 1)
146
 
        self.assertEqual(here, f.tell())
147
 
        self.assertEqual(cur, f.tell())
 
132
        self.assertEquals(here, f.tell())
 
133
        self.assertEquals(cur, f.tell())
148
134
 
149
135
    def test_read_zero(self):
150
136
        f = self._file
151
 
        self.assertEqual('', f.read(0))
 
137
        start = self.first_range_start
 
138
        self.assertEquals('', f.read(0))
152
139
        f.seek(10, 1)
153
 
        self.assertEqual('', f.read(0))
 
140
        self.assertEquals('', f.read(0))
154
141
 
155
142
    def test_seek_at_range_end(self):
156
143
        f = self._file
159
146
    def test_read_at_range_end(self):
160
147
        """Test read behaviour at range end."""
161
148
        f = self._file
162
 
        self.assertEqual(self.alpha, f.read())
163
 
        self.assertEqual('', f.read(0))
 
149
        self.assertEquals(self.alpha, f.read())
 
150
        self.assertEquals('', f.read(0))
164
151
        self.assertRaises(errors.InvalidRange, f.read, 1)
165
152
 
166
153
    def test_unbounded_read_after_seek(self):
167
154
        f = self._file
168
155
        f.seek(24, 1)
169
156
        # Should not cross ranges
170
 
        self.assertEqual('yz', f.read())
 
157
        self.assertEquals('yz', f.read())
171
158
 
172
159
    def test_seek_backwards(self):
173
160
        f = self._file
207
194
       """
208
195
       f = self._file
209
196
       f.seek(-2, 2)
210
 
       self.assertEqual('yz', f.read())
 
197
       self.assertEquals('yz', f.read())
211
198
 
212
199
 
213
200
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
230
217
    def test_read_at_range_end(self):
231
218
        """Test read behaviour at range end."""
232
219
        f = self._file
233
 
        self.assertEqual(self.alpha, f.read())
234
 
        self.assertEqual('', f.read(0))
235
 
        self.assertEqual('', f.read(1))
236
 
 
 
220
        self.assertEquals(self.alpha, f.read())
 
221
        self.assertEquals('', f.read(0))
 
222
        self.assertEquals('', f.read(1))
237
223
 
238
224
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
239
225
    """Test a RangeFile for a whole file whose size is known."""
263
249
        f._pos = 0 # Force an invalid pos
264
250
        self.assertRaises(errors.InvalidRange, f.read, 2)
265
251
 
266
 
 
267
252
class TestRangeFileMultipleRanges(tests.TestCase, TestRangeFileMixin):
268
253
    """Test a RangeFile for multiple ranges.
269
254
 
277
262
    fact) in real uses but may lead to hard to track bugs.
278
263
    """
279
264
 
280
 
    # The following is used to represent the boundary paramter defined
281
 
    # in HTTP response headers and the boundary lines that separate
282
 
    # multipart content.
283
 
 
284
 
    boundary = "separation"
285
 
 
286
265
    def setUp(self):
287
266
        super(TestRangeFileMultipleRanges, self).setUp()
288
267
 
289
 
        boundary = self.boundary
 
268
        boundary = 'separation'
290
269
 
291
270
        content = ''
292
271
        self.first_range_start = 25
298
277
            content += self._multipart_byterange(part, start, boundary,
299
278
                                                 file_size)
300
279
        # Final boundary
301
 
        content += self._boundary_line()
 
280
        content += self._boundary_line(boundary)
302
281
 
303
282
        self._file = response.RangeFile('Multiple_ranges_file',
304
283
                                        StringIO(content))
305
 
        self.set_file_boundary()
306
 
 
307
 
    def _boundary_line(self):
308
 
        """Helper to build the formatted boundary line."""
309
 
        return '--' + self.boundary + '\r\n'
310
 
 
311
 
    def set_file_boundary(self):
312
284
        # Ranges are set by decoding the range headers, the RangeFile user is
313
285
        # supposed to call the following before using seek or read since it
314
286
        # requires knowing the *response* headers (in that case the boundary
315
287
        # which is part of the Content-Type header).
316
 
        self._file.set_boundary(self.boundary)
 
288
        self._file.set_boundary(boundary)
 
289
 
 
290
    def _boundary_line(self, boundary):
 
291
        """Helper to build the formatted boundary line."""
 
292
        return '--' + boundary + '\r\n'
317
293
 
318
294
    def _multipart_byterange(self, data, offset, boundary, file_size='*'):
319
295
        """Encode a part of a file as a multipart/byterange MIME type.
331
307
        :return: a string containing the data encoded as it will appear in the
332
308
            HTTP response body.
333
309
        """
334
 
        bline = self._boundary_line()
 
310
        bline = self._boundary_line(boundary)
335
311
        # Each range begins with a boundary line
336
312
        range = bline
337
313
        # A range is described by a set of headers, but only 'Content-Range' is
347
323
 
348
324
    def test_read_all_ranges(self):
349
325
        f = self._file
350
 
        self.assertEqual(self.alpha, f.read()) # Read first range
 
326
        self.assertEquals(self.alpha, f.read()) # Read first range
351
327
        f.seek(100) # Trigger the second range recognition
352
 
        self.assertEqual(self.alpha, f.read()) # Read second range
353
 
        self.assertEqual(126, f.tell())
 
328
        self.assertEquals(self.alpha, f.read()) # Read second range
 
329
        self.assertEquals(126, f.tell())
354
330
        f.seek(126) # Start of third range which is also the current pos !
355
 
        self.assertEqual('A', f.read(1))
 
331
        self.assertEquals('A', f.read(1))
356
332
        f.seek(10, 1)
357
 
        self.assertEqual('LMN', f.read(3))
 
333
        self.assertEquals('LMN', f.read(3))
358
334
 
359
335
    def test_seek_from_end(self):
360
336
        """See TestRangeFileMixin.test_seek_from_end."""
364
340
        # behaviour.
365
341
        f = self._file
366
342
        f.seek(-2, 2)
367
 
        self.assertEqual('yz', f.read())
 
343
        self.assertEquals('yz', f.read())
368
344
        self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
369
345
 
370
346
    def test_seek_into_void(self):
381
357
 
382
358
    def test_seek_across_ranges(self):
383
359
        f = self._file
 
360
        start = self.first_range_start
384
361
        f.seek(126) # skip the two first ranges
385
 
        self.assertEqual('AB', f.read(2))
 
362
        self.assertEquals('AB', f.read(2))
386
363
 
387
364
    def test_checked_read_dont_overflow_buffers(self):
388
365
        f = self._file
 
366
        start = self.first_range_start
389
367
        # We force a very low value to exercise all code paths in _checked_read
390
368
        f._discarded_buf_size = 8
391
369
        f.seek(126) # skip the two first ranges
392
 
        self.assertEqual('AB', f.read(2))
 
370
        self.assertEquals('AB', f.read(2))
393
371
 
394
372
    def test_seek_twice_between_ranges(self):
395
373
        f = self._file
407
385
 
408
386
    def test_read_at_range_end(self):
409
387
        f = self._file
410
 
        self.assertEqual(self.alpha, f.read())
411
 
        self.assertEqual(self.alpha, f.read())
412
 
        self.assertEqual(self.alpha.upper(), f.read())
 
388
        self.assertEquals(self.alpha, f.read())
 
389
        self.assertEquals(self.alpha, f.read())
 
390
        self.assertEquals(self.alpha.upper(), f.read())
413
391
        self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
414
392
 
415
393
 
416
 
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
417
 
    """Perform the same tests as TestRangeFileMultipleRanges, but uses
418
 
    an angle-bracket quoted boundary string like IIS 6.0 and 7.0
419
 
    (but not IIS 5, which breaks the RFC in a different way
420
 
    by using square brackets, not angle brackets)
421
 
 
422
 
    This reveals a bug caused by
423
 
 
424
 
    - The bad implementation of RFC 822 unquoting in Python (angles are not
425
 
      quotes), coupled with
426
 
 
427
 
    - The bad implementation of RFC 2046 in IIS (angles are not permitted chars
428
 
      in boundary lines).
429
 
 
430
 
    """
431
 
    # The boundary as it appears in boundary lines
432
 
    # IIS 6 and 7 use this value
433
 
    _boundary_trimmed = "q1w2e3r4t5y6u7i8o9p0zaxscdvfbgnhmjklkl"
434
 
    boundary = '<' + _boundary_trimmed + '>'
435
 
 
436
 
    def set_file_boundary(self):
437
 
        # Emulate broken rfc822.unquote() here by removing angles
438
 
        self._file.set_boundary(self._boundary_trimmed)
439
 
 
440
 
 
441
394
class TestRangeFileVarious(tests.TestCase):
442
395
    """Tests RangeFile aspects not covered elsewhere."""
443
396
 
458
411
        def ok(expected, header_value):
459
412
            f.set_range_from_header(header_value)
460
413
            # Slightly peek under the covers to get the size
461
 
            self.assertEqual(expected, (f.tell(), f._size))
 
414
            self.assertEquals(expected, (f.tell(), f._size))
462
415
 
463
416
        ok((1, 10), 'bytes 1-10/11')
464
417
        ok((1, 10), 'bytes 1-10/*')
800
753
        out.read()  # Read the whole range
801
754
        # Fail to find the boundary line
802
755
        self.assertRaises(errors.InvalidHttpResponse, out.seek, 1, 1)
803
 
 
804
 
 
805
 
class TestRangeFileSizeReadLimited(tests.TestCase):
806
 
    """Test RangeFile _max_read_size functionality which limits the size of
807
 
    read blocks to prevent MemoryError messages in socket.recv.
808
 
    """
809
 
 
810
 
    def setUp(self):
811
 
        super(TestRangeFileSizeReadLimited, self).setUp()
812
 
        # create a test datablock larger than _max_read_size.
813
 
        chunk_size = response.RangeFile._max_read_size
814
 
        test_pattern = '0123456789ABCDEF'
815
 
        self.test_data =  test_pattern * (3 * chunk_size / len(test_pattern))
816
 
        self.test_data_len = len(self.test_data)
817
 
 
818
 
    def test_max_read_size(self):
819
 
        """Read data in blocks and verify that the reads are not larger than
820
 
           the maximum read size.
821
 
        """
822
 
        # retrieve data in large blocks from response.RangeFile object
823
 
        mock_read_file = FakeReadFile(self.test_data)
824
 
        range_file = response.RangeFile('test_max_read_size', mock_read_file)
825
 
        response_data = range_file.read(self.test_data_len)
826
 
 
827
 
        # verify read size was equal to the maximum read size
828
 
        self.assertTrue(mock_read_file.get_max_read_size() > 0)
829
 
        self.assertEqual(mock_read_file.get_max_read_size(),
830
 
                         response.RangeFile._max_read_size)
831
 
        self.assertEqual(mock_read_file.get_read_count(), 3)
832
 
 
833
 
        # report error if the data wasn't equal (we only report the size due
834
 
        # to the length of the data)
835
 
        if response_data != self.test_data:
836
 
            message = "Data not equal.  Expected %d bytes, received %d."
837
 
            self.fail(message % (len(response_data), self.test_data_len))
838