~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http_response.py

  • Committer: Jelmer Vernooij
  • Date: 2008-06-11 18:58:19 UTC
  • mto: (3649.3.2 bzr.dev)
  • mto: This revision was merged to the branch mainline in revision 3658.
  • Revision ID: jelmer@samba.org-20080611185819-o4shi1ranh9zh01e
Move ftp transport into separate directory.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests from HTTP response parsing.
18
18
 
48
48
    response,
49
49
    _urllib2_wrappers,
50
50
    )
51
 
from bzrlib.tests.file_utils import (
52
 
    FakeReadFile,
53
 
    )
54
51
 
55
52
 
56
53
class ReadSocket(object):
62
59
    def makefile(self, mode='r', bufsize=None):
63
60
        return self.readfile
64
61
 
65
 
 
66
62
class FakeHTTPConnection(_urllib2_wrappers.HTTPConnection):
67
63
 
68
64
    def __init__(self, sock):
96
92
        # Override the thresold to force the warning emission
97
93
        conn._range_warning_thresold = 6 # There are 7 bytes pending
98
94
        conn.cleanup_pipe()
99
 
        self.assertContainsRe(self.get_log(), 'Got a 200 response when asking')
 
95
        self.assertContainsRe(self._get_log(keep_log_file=True),
 
96
                              'Got a 200 response when asking')
100
97
 
101
98
 
102
99
class TestRangeFileMixin(object):
224
221
        self.assertEquals('', f.read(0))
225
222
        self.assertEquals('', f.read(1))
226
223
 
227
 
 
228
224
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
229
225
    """Test a RangeFile for a whole file whose size is known."""
230
226
 
253
249
        f._pos = 0 # Force an invalid pos
254
250
        self.assertRaises(errors.InvalidRange, f.read, 2)
255
251
 
256
 
 
257
252
class TestRangeFileMultipleRanges(tests.TestCase, TestRangeFileMixin):
258
253
    """Test a RangeFile for multiple ranges.
259
254
 
267
262
    fact) in real uses but may lead to hard to track bugs.
268
263
    """
269
264
 
270
 
    # The following is used to represent the boundary paramter defined
271
 
    # in HTTP response headers and the boundary lines that separate
272
 
    # multipart content.
273
 
 
274
 
    boundary = "separation"
275
 
 
276
265
    def setUp(self):
277
266
        super(TestRangeFileMultipleRanges, self).setUp()
278
267
 
279
 
        boundary = self.boundary
 
268
        boundary = 'separation'
280
269
 
281
270
        content = ''
282
271
        self.first_range_start = 25
288
277
            content += self._multipart_byterange(part, start, boundary,
289
278
                                                 file_size)
290
279
        # Final boundary
291
 
        content += self._boundary_line()
 
280
        content += self._boundary_line(boundary)
292
281
 
293
282
        self._file = response.RangeFile('Multiple_ranges_file',
294
283
                                        StringIO(content))
295
 
        self.set_file_boundary()
296
 
 
297
 
    def _boundary_line(self):
298
 
        """Helper to build the formatted boundary line."""
299
 
        return '--' + self.boundary + '\r\n'
300
 
 
301
 
    def set_file_boundary(self):
302
284
        # Ranges are set by decoding the range headers, the RangeFile user is
303
285
        # supposed to call the following before using seek or read since it
304
286
        # requires knowing the *response* headers (in that case the boundary
305
287
        # which is part of the Content-Type header).
306
 
        self._file.set_boundary(self.boundary)
 
288
        self._file.set_boundary(boundary)
 
289
 
 
290
    def _boundary_line(self, boundary):
 
291
        """Helper to build the formatted boundary line."""
 
292
        return '--' + boundary + '\r\n'
307
293
 
308
294
    def _multipart_byterange(self, data, offset, boundary, file_size='*'):
309
295
        """Encode a part of a file as a multipart/byterange MIME type.
321
307
        :return: a string containing the data encoded as it will appear in the
322
308
            HTTP response body.
323
309
        """
324
 
        bline = self._boundary_line()
 
310
        bline = self._boundary_line(boundary)
325
311
        # Each range begins with a boundary line
326
312
        range = bline
327
313
        # A range is described by a set of headers, but only 'Content-Range' is
405
391
        self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
406
392
 
407
393
 
408
 
class TestRangeFileMultipleRangesQuotedBoundaries(TestRangeFileMultipleRanges):
409
 
    """Perform the same tests as TestRangeFileMultipleRanges, but uses
410
 
    an angle-bracket quoted boundary string like IIS 6.0 and 7.0
411
 
    (but not IIS 5, which breaks the RFC in a different way
412
 
    by using square brackets, not angle brackets)
413
 
 
414
 
    This reveals a bug caused by
415
 
 
416
 
    - The bad implementation of RFC 822 unquoting in Python (angles are not
417
 
      quotes), coupled with
418
 
 
419
 
    - The bad implementation of RFC 2046 in IIS (angles are not permitted chars
420
 
      in boundary lines).
421
 
 
422
 
    """
423
 
    # The boundary as it appears in boundary lines
424
 
    # IIS 6 and 7 use this value
425
 
    _boundary_trimmed = "q1w2e3r4t5y6u7i8o9p0zaxscdvfbgnhmjklkl"
426
 
    boundary = '<' + _boundary_trimmed + '>'
427
 
 
428
 
    def set_file_boundary(self):
429
 
        # Emulate broken rfc822.unquote() here by removing angles
430
 
        self._file.set_boundary(self._boundary_trimmed)
431
 
 
432
 
 
433
394
class TestRangeFileVarious(tests.TestCase):
434
395
    """Tests RangeFile aspects not covered elsewhere."""
435
396
 
792
753
        out.read()  # Read the whole range
793
754
        # Fail to find the boundary line
794
755
        self.assertRaises(errors.InvalidHttpResponse, out.seek, 1, 1)
795
 
 
796
 
 
797
 
class TestRangeFileSizeReadLimited(tests.TestCase):
798
 
    """Test RangeFile _max_read_size functionality which limits the size of
799
 
    read blocks to prevent MemoryError messages in socket.recv.
800
 
    """
801
 
 
802
 
    def setUp(self):
803
 
        tests.TestCase.setUp(self)
804
 
        # create a test datablock larger than _max_read_size.
805
 
        chunk_size = response.RangeFile._max_read_size
806
 
        test_pattern = '0123456789ABCDEF'
807
 
        self.test_data =  test_pattern * (3 * chunk_size / len(test_pattern))
808
 
        self.test_data_len = len(self.test_data)
809
 
 
810
 
    def test_max_read_size(self):
811
 
        """Read data in blocks and verify that the reads are not larger than
812
 
           the maximum read size.
813
 
        """
814
 
        # retrieve data in large blocks from response.RangeFile object
815
 
        mock_read_file = FakeReadFile(self.test_data)
816
 
        range_file = response.RangeFile('test_max_read_size', mock_read_file)
817
 
        response_data = range_file.read(self.test_data_len)
818
 
 
819
 
        # verify read size was equal to the maximum read size
820
 
        self.assertTrue(mock_read_file.get_max_read_size() > 0)
821
 
        self.assertEqual(mock_read_file.get_max_read_size(),
822
 
                         response.RangeFile._max_read_size)
823
 
        self.assertEqual(mock_read_file.get_read_count(), 3)
824
 
 
825
 
        # report error if the data wasn't equal (we only report the size due
826
 
        # to the length of the data)
827
 
        if response_data != self.test_data:
828
 
            message = "Data not equal.  Expected %d bytes, received %d."
829
 
            self.fail(message % (len(response_data), self.test_data_len))
830