~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http_response.py

  • Committer: Jelmer Vernooij
  • Date: 2016-04-03 16:32:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6617.
  • Revision ID: jelmer@jelmer.uk-20160403163231-h72bo0uyek2gikw0
Don't put French text in doc/en/user-reference when LANGUAGE=fr_CH.UTF_8.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2010, 2012, 2013, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
75
75
        pass
76
76
 
77
77
 
 
78
class TestResponseFileIter(tests.TestCase):
 
79
 
 
80
    def test_iter_empty(self):
 
81
        f = response.ResponseFile('empty', StringIO())
 
82
        self.assertEqual([], list(f))
 
83
 
 
84
    def test_iter_many(self):
 
85
        f = response.ResponseFile('many', StringIO('0\n1\nboo!\n'))
 
86
        self.assertEqual(['0\n', '1\n', 'boo!\n'], list(f))
 
87
 
 
88
 
78
89
class TestHTTPConnection(tests.TestCase):
79
90
 
80
91
    def test_cleanup_pipe(self):
92
103
        # Now, get the response
93
104
        resp = conn.getresponse()
94
105
        # Read part of the response
95
 
        self.assertEquals('0123456789\n', resp.read(11))
 
106
        self.assertEqual('0123456789\n', resp.read(11))
96
107
        # Override the thresold to force the warning emission
97
108
        conn._range_warning_thresold = 6 # There are 7 bytes pending
98
109
        conn.cleanup_pipe()
110
121
 
111
122
    def test_can_read_at_first_access(self):
112
123
        """Test that the just created file can be read."""
113
 
        self.assertEquals(self.alpha, self._file.read())
 
124
        self.assertEqual(self.alpha, self._file.read())
114
125
 
115
126
    def test_seek_read(self):
116
127
        """Test seek/read inside the range."""
117
128
        f = self._file
118
129
        start = self.first_range_start
119
130
        # Before any use, tell() should be at the range start
120
 
        self.assertEquals(start, f.tell())
 
131
        self.assertEqual(start, f.tell())
121
132
        cur = start # For an overall offset assertion
122
133
        f.seek(start + 3)
123
134
        cur += 3
124
 
        self.assertEquals('def', f.read(3))
 
135
        self.assertEqual('def', f.read(3))
125
136
        cur += len('def')
126
137
        f.seek(4, 1)
127
138
        cur += 4
128
 
        self.assertEquals('klmn', f.read(4))
 
139
        self.assertEqual('klmn', f.read(4))
129
140
        cur += len('klmn')
130
141
        # read(0) in the middle of a range
131
 
        self.assertEquals('', f.read(0))
 
142
        self.assertEqual('', f.read(0))
132
143
        # seek in place
133
144
        here = f.tell()
134
145
        f.seek(0, 1)
135
 
        self.assertEquals(here, f.tell())
136
 
        self.assertEquals(cur, f.tell())
 
146
        self.assertEqual(here, f.tell())
 
147
        self.assertEqual(cur, f.tell())
137
148
 
138
149
    def test_read_zero(self):
139
150
        f = self._file
140
 
        start = self.first_range_start
141
 
        self.assertEquals('', f.read(0))
 
151
        self.assertEqual('', f.read(0))
142
152
        f.seek(10, 1)
143
 
        self.assertEquals('', f.read(0))
 
153
        self.assertEqual('', f.read(0))
144
154
 
145
155
    def test_seek_at_range_end(self):
146
156
        f = self._file
149
159
    def test_read_at_range_end(self):
150
160
        """Test read behaviour at range end."""
151
161
        f = self._file
152
 
        self.assertEquals(self.alpha, f.read())
153
 
        self.assertEquals('', f.read(0))
 
162
        self.assertEqual(self.alpha, f.read())
 
163
        self.assertEqual('', f.read(0))
154
164
        self.assertRaises(errors.InvalidRange, f.read, 1)
155
165
 
156
166
    def test_unbounded_read_after_seek(self):
157
167
        f = self._file
158
168
        f.seek(24, 1)
159
169
        # Should not cross ranges
160
 
        self.assertEquals('yz', f.read())
 
170
        self.assertEqual('yz', f.read())
161
171
 
162
172
    def test_seek_backwards(self):
163
173
        f = self._file
197
207
       """
198
208
       f = self._file
199
209
       f.seek(-2, 2)
200
 
       self.assertEquals('yz', f.read())
 
210
       self.assertEqual('yz', f.read())
201
211
 
202
212
 
203
213
class TestRangeFileSizeUnknown(tests.TestCase, TestRangeFileMixin):
220
230
    def test_read_at_range_end(self):
221
231
        """Test read behaviour at range end."""
222
232
        f = self._file
223
 
        self.assertEquals(self.alpha, f.read())
224
 
        self.assertEquals('', f.read(0))
225
 
        self.assertEquals('', f.read(1))
 
233
        self.assertEqual(self.alpha, f.read())
 
234
        self.assertEqual('', f.read(0))
 
235
        self.assertEqual('', f.read(1))
226
236
 
227
237
 
228
238
class TestRangeFileSizeKnown(tests.TestCase, TestRangeFileMixin):
337
347
 
338
348
    def test_read_all_ranges(self):
339
349
        f = self._file
340
 
        self.assertEquals(self.alpha, f.read()) # Read first range
 
350
        self.assertEqual(self.alpha, f.read()) # Read first range
341
351
        f.seek(100) # Trigger the second range recognition
342
 
        self.assertEquals(self.alpha, f.read()) # Read second range
343
 
        self.assertEquals(126, f.tell())
 
352
        self.assertEqual(self.alpha, f.read()) # Read second range
 
353
        self.assertEqual(126, f.tell())
344
354
        f.seek(126) # Start of third range which is also the current pos !
345
 
        self.assertEquals('A', f.read(1))
 
355
        self.assertEqual('A', f.read(1))
346
356
        f.seek(10, 1)
347
 
        self.assertEquals('LMN', f.read(3))
 
357
        self.assertEqual('LMN', f.read(3))
348
358
 
349
359
    def test_seek_from_end(self):
350
360
        """See TestRangeFileMixin.test_seek_from_end."""
354
364
        # behaviour.
355
365
        f = self._file
356
366
        f.seek(-2, 2)
357
 
        self.assertEquals('yz', f.read())
 
367
        self.assertEqual('yz', f.read())
358
368
        self.assertRaises(errors.InvalidRange, f.seek, -2, 2)
359
369
 
360
370
    def test_seek_into_void(self):
371
381
 
372
382
    def test_seek_across_ranges(self):
373
383
        f = self._file
374
 
        start = self.first_range_start
375
384
        f.seek(126) # skip the two first ranges
376
 
        self.assertEquals('AB', f.read(2))
 
385
        self.assertEqual('AB', f.read(2))
377
386
 
378
387
    def test_checked_read_dont_overflow_buffers(self):
379
388
        f = self._file
380
 
        start = self.first_range_start
381
389
        # We force a very low value to exercise all code paths in _checked_read
382
390
        f._discarded_buf_size = 8
383
391
        f.seek(126) # skip the two first ranges
384
 
        self.assertEquals('AB', f.read(2))
 
392
        self.assertEqual('AB', f.read(2))
385
393
 
386
394
    def test_seek_twice_between_ranges(self):
387
395
        f = self._file
399
407
 
400
408
    def test_read_at_range_end(self):
401
409
        f = self._file
402
 
        self.assertEquals(self.alpha, f.read())
403
 
        self.assertEquals(self.alpha, f.read())
404
 
        self.assertEquals(self.alpha.upper(), f.read())
 
410
        self.assertEqual(self.alpha, f.read())
 
411
        self.assertEqual(self.alpha, f.read())
 
412
        self.assertEqual(self.alpha.upper(), f.read())
405
413
        self.assertRaises(errors.InvalidHttpResponse, f.read, 1)
406
414
 
407
415
 
450
458
        def ok(expected, header_value):
451
459
            f.set_range_from_header(header_value)
452
460
            # Slightly peek under the covers to get the size
453
 
            self.assertEquals(expected, (f.tell(), f._size))
 
461
            self.assertEqual(expected, (f.tell(), f._size))
454
462
 
455
463
        ok((1, 10), 'bytes 1-10/11')
456
464
        ok((1, 10), 'bytes 1-10/*')
800
808
    """
801
809
 
802
810
    def setUp(self):
803
 
        tests.TestCase.setUp(self)
 
811
        super(TestRangeFileSizeReadLimited, self).setUp()
804
812
        # create a test datablock larger than _max_read_size.
805
813
        chunk_size = response.RangeFile._max_read_size
806
814
        test_pattern = '0123456789ABCDEF'