~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http_response.py

  • Committer: Neil Martinsen-Burrell
  • Date: 2008-06-19 06:57:22 UTC
  • mto: (3505.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 3506.
  • Revision ID: nmb@wartburg.edu-20080619065722-dboa3ko7ap8dcf2p
Fix bug 228058: user names with @ signs should work

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests from HTTP response parsing.
18
18
 
75
75
        pass
76
76
 
77
77
 
78
 
class TestResponseFileIter(tests.TestCase):
79
 
 
80
 
    def test_iter_empty(self):
81
 
        f = response.ResponseFile('empty', StringIO())
82
 
        self.assertEqual([], list(f))
83
 
 
84
 
    def test_iter_many(self):
85
 
        f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
86
 
        self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
87
 
 
88
 
 
89
78
class TestHTTPConnection(tests.TestCase):
90
79
 
91
80
    def test_cleanup_pipe(self):
103
92
        # Now, get the response
104
93
        resp = conn.getresponse()
105
94
        # Read part of the response
106
 
        self.assertEqual('0123456789\n', resp.read(11))
 
95
        self.assertEquals('0123456789\n', resp.read(11))
107
96
        # Override the thresold to force the warning emission
108
97
        conn._range_warning_thresold = 6 # There are 7 bytes pending
109
98
        conn.cleanup_pipe()
110
 
        self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
 
99
        self.assertContainsRe(self._get_log(keep_log_file=True),
 
100
                              'Got a 200 response when asking')
111
101
 
112
102
 
113
103
class TestRangeFileMixin(object):
121
111
 
122
112
    def test_can_read_at_first_access(self):
123
113
        """Test that the just created file can be read."""
124
 
        self.assertEqual(self.alpha, self._file.read())
 
114
        self.assertEquals(self.alpha, self._file.read())
125
115
 
126
116
    def test_seek_read(self):
127
117
        """Test seek/read inside the range."""
128
118
        f = self._file
129
119
        start = self.first_range_start
130
120
        # Before any use, tell() should be at the range start
131
 
        self.assertEqual(start, f.tell())
 
121
        self.assertEquals(start, f.tell())
132
122
        cur = start # For an overall offset assertion
133
123
        f.seek(start + 3)
134
124
        cur += 3
135
 
        self.assertEqual('def', f.read(3))
 
125
        self.assertEquals('def', f.read(3))
136
126
        cur += len('def')
137
127
        f.seek(4, 1)
138
128
        cur += 4
139
 
        self.assertEqual('klmn', f.read(4))
 
129
        self.assertEquals('klmn', f.read(4))
140
130
        cur += len('klmn')
141
131
        # read(0) in the middle of a range
142
 
        self.assertEqual('', f.read(0))
 
132
        self.assertEquals('', f.read(0))
143
133
        # seek in place
144
134
        here = f.tell()
145
135
        f.seek(0, 1)
146
 
        self.assertEqual(here, f.tell())
147
 
        self.assertEqual(cur, f.tell())
 
136
        self.assertEquals(here, f.tell())
 
137
        self.assertEquals(cur, f.tell())
148
138
 
149
139
    def test_read_zero(self):
150
140
        f = self._file
151
 
        self.assertEqual('', f.read(0))
 
141
        start = self.first_range_start
 
142
        self.assertEquals('', f.read(0))
152
143
        f.seek(10, 1)
153
 
        self.assertEqual('', f.read(0))
 
144
        self.assertEquals('', f.read(0))
154
145
 
155
146
    def test_seek_at_range_end(self):
156
147
        f = self._file
159
150
    def test_read_at_range_end(self):
160
151
        """Test read behaviour at range end."""
161
152
        f = self._file
162
 
        self.assertEqual(self.alpha, f.read())
163
 
        self.assertEqual('', f.read(0))
 
153
        self.assertEquals(self.alpha, f.read())
 
154
        self.assertEquals('', f.read(0))
164
155
        self.assertRaises(errors.InvalidRange, f.read, 1)
165
156
 
166
157
    def test_unbounded_read_after_seek(self):
167
158
        f = self._file
168
159
        f.seek(24, 1)
169
160
        # Should not cross ranges
170
 
        self.assertEqual('yz', f.read())
 
161
        self.assertEquals('yz', f.read())
171
162
 
172
163
    def test_seek_backwards(self):
173
164
        f = self._file
207
198
       """
208
199
       f = self._file
209
200
       f.seek(-2, 2)
210
 
       self.assertEqual('yz', f.read())
 
201
       self.assertEquals('yz', f.read())
211
202
 
212
203
 
213
204
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
230
221
    def test_read_at_range_end(self):
231
222
        """Test read behaviour at range end."""
232
223
        f = self._file
233
 
        self.assertEqual(self.alpha, f.read())
234
 
        self.assertEqual('', f.read(0))
235
 
        self.assertEqual('', f.read(1))
236
 
 
 
224
        self.assertEquals(self.alpha, f.read())
 
225
        self.assertEquals('', f.read(0))
 
226
        self.assertEquals('', f.read(1))
237
227
 
238
228
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
239
229
    """Test a RangeFile for a whole file whose size is known."""
263
253
        f._pos = 0 # Force an invalid pos
264
254
        self.assertRaises(errors.InvalidRange, f.read, 2)
265
255
 
266
 
 
267
256
class TestRangeFileMultipleRanges(tests.TestCase, TestRangeFileMixin):
268
257
    """Test a RangeFile for multiple ranges.
269
258
 
277
266
    fact) in real uses but may lead to hard to track bugs.
278
267
    """
279
268
 
280
 
    # The following is used to represent the boundary paramter defined
281
 
    # in HTTP response headers and the boundary lines that separate
282
 
    # multipart content.
283
 
 
284
 
    boundary = "separation"
285
 
 
286
269
    def setUp(self):
287
270
        super(TestRangeFileMultipleRanges, self).setUp()
288
271
 
289
 
        boundary = self.boundary
 
272
        boundary = 'separation'
290
273
 
291
274
        content = ''
292
275
        self.first_range_start = 25
298
281
            content += self._multipart_byterange(part, start, boundary,
299
282
                                                 file_size)
300
283
        # Final boundary
301
 
        content += self._boundary_line()
 
284
        content += self._boundary_line(boundary)
302
285
 
303
286
        self._file = response.RangeFile('Multiple_ranges_file',
304
287
                                        StringIO(content))
305
 
        self.set_file_boundary()
306
 
 
307
 
    def _boundary_line(self):
308
 
        """Helper to build the formatted boundary line."""
309
 
        return '--' + self.boundary + '\r\n'
310
 
 
311
 
    def set_file_boundary(self):
312
288
        # Ranges are set by decoding the range headers, the RangeFile user is
313
289
        # supposed to call the following before using seek or read since it
314
290
        # requires knowing the *response* headers (in that case the boundary
315
291
        # which is part of the Content-Type header).
316
 
        self._file.set_boundary(self.boundary)
 
292
        self._file.set_boundary(boundary)
 
293
 
 
294
    def _boundary_line(self, boundary):
 
295
        """Helper to build the formatted boundary line."""
 
296
        return '--' + boundary + '\r\n'
317
297
 
318
298
    def _multipart_byterange(self, data, offset, boundary, file_size='*'):
319
299
        """Encode a part of a file as a multipart/byterange MIME type.
331
311
        :return: a string containing the data encoded as it will appear in the
332
312
            HTTP response body.
333
313
        """
334
 
        bline = self._boundary_line()
 
314
        bline = self._boundary_line(boundary)
335
315
        # Each range begins with a boundary line
336
316
        range = bline
337
317
        # A range is described by a set of headers, but only 'Content-Range' is
347
327
 
348
328
    def test_read_all_ranges(self):
349
329
        f = self._file
350
 
        self.assertEqual(self.alpha, f.read()) # Read first range
 
330
        self.assertEquals(self.alpha, f.read()) # Read first range
351
331
        f.seek(100) # Trigger the second range recognition
352
 
        self.assertEqual(self.alpha, f.read()) # Read second range
353
 
        self.assertEqual(126, f.tell())
 
332
        self.assertEquals(self.alpha, f.read()) # Read second range
 
333
        self.assertEquals(126, f.tell())
354
334
        f.seek(126) # Start of third range which is also the current pos !
355
 
        self.assertEqual('A', f.read(1))
 
335
        self.assertEquals('A', f.read(1))
356
336
        f.seek(10, 1)
357
 
        self.assertEqual('LMN', f.read(3))
 
337
        self.assertEquals('LMN', f.read(3))
358
338
 
359
339
    def test_seek_from_end(self):
360
340
        """See TestRangeFileMixin.test_seek_from_end."""
364
344
        # behaviour.
365
345
        f = self._file
366
346
        f.seek(-2, 2)
367
 
        self.assertEqual('yz', f.read())
 
347
        self.assertEquals('yz', f.read())
368
348
        self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
369
349
 
370
350
    def test_seek_into_void(self):
381
361
 
382
362
    def test_seek_across_ranges(self):
383
363
        f = self._file
 
364
        start = self.first_range_start
384
365
        f.seek(126) # skip the two first ranges
385
 
        self.assertEqual('AB', f.read(2))
 
366
        self.assertEquals('AB', f.read(2))
386
367
 
387
368
    def test_checked_read_dont_overflow_buffers(self):
388
369
        f = self._file
 
370
        start = self.first_range_start
389
371
        # We force a very low value to exercise all code paths in _checked_read
390
372
        f._discarded_buf_size = 8
391
373
        f.seek(126) # skip the two first ranges
392
 
        self.assertEqual('AB', f.read(2))
 
374
        self.assertEquals('AB', f.read(2))
393
375
 
394
376
    def test_seek_twice_between_ranges(self):
395
377
        f = self._file
407
389
 
408
390
    def test_read_at_range_end(self):
409
391
        f = self._file
410
 
        self.assertEqual(self.alpha, f.read())
411
 
        self.assertEqual(self.alpha, f.read())
412
 
        self.assertEqual(self.alpha.upper(), f.read())
 
392
        self.assertEquals(self.alpha, f.read())
 
393
        self.assertEquals(self.alpha, f.read())
 
394
        self.assertEquals(self.alpha.upper(), f.read())
413
395
        self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
414
396
 
415
397
 
416
 
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
417
 
    """Perform the same tests as TestRangeFileMultipleRanges, but uses
418
 
    an angle-bracket quoted boundary string like IIS 6.0 and 7.0
419
 
    (but not IIS 5, which breaks the RFC in a different way
420
 
    by using square brackets, not angle brackets)
421
 
 
422
 
    This reveals a bug caused by
423
 
 
424
 
    - The bad implementation of RFC 822 unquoting in Python (angles are not
425
 
      quotes), coupled with
426
 
 
427
 
    - The bad implementation of RFC 2046 in IIS (angles are not permitted chars
428
 
      in boundary lines).
429
 
 
430
 
    """
431
 
    # The boundary as it appears in boundary lines
432
 
    # IIS 6 and 7 use this value
433
 
    _boundary_trimmed = "q1w2e3r4t5y6u7i8o9p0zaxscdvfbgnhmjklkl"
434
 
    boundary = '<' + _boundary_trimmed + '>'
435
 
 
436
 
    def set_file_boundary(self):
437
 
        # Emulate broken rfc822.unquote() here by removing angles
438
 
        self._file.set_boundary(self._boundary_trimmed)
439
 
 
440
 
 
441
398
class TestRangeFileVarious(tests.TestCase):
442
399
    """Tests RangeFile aspects not covered elsewhere."""
443
400
 
458
415
        def ok(expected, header_value):
459
416
            f.set_range_from_header(header_value)
460
417
            # Slightly peek under the covers to get the size
461
 
            self.assertEqual(expected, (f.tell(), f._size))
 
418
            self.assertEquals(expected, (f.tell(), f._size))
462
419
 
463
420
        ok((1, 10), 'bytes 1-10/11')
464
421
        ok((1, 10), 'bytes 1-10/*')
808
765
    """
809
766
 
810
767
    def setUp(self):
811
 
        super(TestRangeFileSizeReadLimited, self).setUp()
812
768
        # create a test datablock larger than _max_read_size.
813
769
        chunk_size = response.RangeFile._max_read_size
814
770
        test_pattern = '0123456789ABCDEF'