~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_http.py

  • Committer: Jonathan Riddell
  • Date: 2011-05-16 11:27:37 UTC
  • mto: This revision was merged to the branch mainline in revision 5869.
  • Revision ID: jriddell@canonical.com-20110516112737-gep642p24rtzp3jt
user guide licence

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2012, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
 
32
32
import bzrlib
33
33
from bzrlib import (
 
34
    bzrdir,
 
35
    cethread,
34
36
    config,
35
 
    controldir,
36
 
    debug,
37
37
    errors,
38
38
    osutils,
39
39
    remote as _mod_remote,
40
40
    tests,
41
 
    trace,
42
41
    transport,
43
42
    ui,
44
43
    )
92
91
        ]
93
92
 
94
93
 
 
94
def vary_by_http_proxy_auth_scheme():
 
95
    return [
 
96
        ('basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
 
97
        ('digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
 
98
        ('basicdigest',
 
99
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
 
100
        ]
 
101
 
 
102
 
95
103
def vary_by_http_auth_scheme():
96
 
    scenarios = [
 
104
    return [
97
105
        ('basic', dict(_auth_server=http_utils.HTTPBasicAuthServer)),
98
106
        ('digest', dict(_auth_server=http_utils.HTTPDigestAuthServer)),
99
107
        ('basicdigest',
100
108
            dict(_auth_server=http_utils.HTTPBasicAndDigestAuthServer)),
101
109
        ]
102
 
    # Add some attributes common to all scenarios
103
 
    for scenario_id, scenario_dict in scenarios:
104
 
        scenario_dict.update(_auth_header='Authorization',
105
 
                             _username_prompt_prefix='',
106
 
                             _password_prompt_prefix='')
107
 
    return scenarios
108
 
 
109
 
 
110
 
def vary_by_http_proxy_auth_scheme():
111
 
    scenarios = [
112
 
        ('proxy-basic', dict(_auth_server=http_utils.ProxyBasicAuthServer)),
113
 
        ('proxy-digest', dict(_auth_server=http_utils.ProxyDigestAuthServer)),
114
 
        ('proxy-basicdigest',
115
 
            dict(_auth_server=http_utils.ProxyBasicAndDigestAuthServer)),
116
 
        ]
117
 
    # Add some attributes common to all scenarios
118
 
    for scenario_id, scenario_dict in scenarios:
119
 
        scenario_dict.update(_auth_header='Proxy-Authorization',
120
 
                             _username_prompt_prefix='Proxy ',
121
 
                             _password_prompt_prefix='Proxy ')
122
 
    return scenarios
123
110
 
124
111
 
125
112
def vary_by_http_activity():
127
114
        ('urllib,http', dict(_activity_server=ActivityHTTPServer,
128
115
                            _transport=_urllib.HttpTransport_urllib,)),
129
116
        ]
 
117
    if tests.HTTPSServerFeature.available():
 
118
        activity_scenarios.append(
 
119
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
 
120
                                _transport=_urllib.HttpTransport_urllib,)),)
130
121
    if features.pycurl.available():
131
122
        activity_scenarios.append(
132
123
            ('pycurl,http', dict(_activity_server=ActivityHTTPServer,
133
124
                                _transport=PyCurlTransport,)),)
134
 
    if features.HTTPSServerFeature.available():
135
 
        # FIXME: Until we have a better way to handle self-signed certificates
136
 
        # (like allowing them in a test specific authentication.conf for
137
 
        # example), we need some specialized pycurl/urllib transport for tests.
138
 
        # -- vila 2012-01-20
139
 
        from bzrlib.tests import (
140
 
            ssl_certs,
141
 
            )
142
 
        class HTTPS_urllib_transport(_urllib.HttpTransport_urllib):
143
 
 
144
 
            def __init__(self, base, _from_transport=None):
145
 
                super(HTTPS_urllib_transport, self).__init__(
146
 
                    base, _from_transport=_from_transport,
147
 
                    ca_certs=ssl_certs.build_path('ca.crt'))
148
 
 
149
 
        activity_scenarios.append(
150
 
            ('urllib,https', dict(_activity_server=ActivityHTTPSServer,
151
 
                                  _transport=HTTPS_urllib_transport,)),)
152
 
        if features.pycurl.available():
 
125
        if tests.HTTPSServerFeature.available():
 
126
            from bzrlib.tests import (
 
127
                ssl_certs,
 
128
                )
 
129
            # FIXME: Until we have a better way to handle self-signed
 
130
            # certificates (like allowing them in a test specific
 
131
            # authentication.conf for example), we need some specialized pycurl
 
132
            # transport for tests.
153
133
            class HTTPS_pycurl_transport(PyCurlTransport):
154
134
 
155
135
                def __init__(self, base, _from_transport=None):
288
268
        self.req_handler = RequestHandler(None, None, None)
289
269
 
290
270
    def assertRanges(self, ranges, header, file_size):
291
 
        self.assertEqual(ranges,
 
271
        self.assertEquals(ranges,
292
272
                          self.req_handler._parse_ranges(header, file_size))
293
273
 
294
274
    def test_simple_range(self):
384
364
    _transport = property(_get_pycurl_maybe)
385
365
 
386
366
 
 
367
class TestHttpUrls(tests.TestCase):
 
368
 
 
369
    # TODO: This should be moved to authorization tests once they
 
370
    # are written.
 
371
 
 
372
    def test_url_parsing(self):
 
373
        f = FakeManager()
 
374
        url = http.extract_auth('http://example.com', f)
 
375
        self.assertEqual('http://example.com', url)
 
376
        self.assertEqual(0, len(f.credentials))
 
377
        url = http.extract_auth(
 
378
            'http://user:pass@example.com/bzr/bzr.dev', f)
 
379
        self.assertEqual('http://example.com/bzr/bzr.dev', url)
 
380
        self.assertEqual(1, len(f.credentials))
 
381
        self.assertEqual([None, 'example.com', 'user', 'pass'],
 
382
                         f.credentials[0])
 
383
 
 
384
 
387
385
class TestHttpTransportUrls(tests.TestCase):
388
386
    """Test the http urls."""
389
387
 
469
467
        )
470
468
 
471
469
    def setUp(self):
472
 
        super(TestHTTPConnections, self).setUp()
 
470
        http_utils.TestCaseWithWebserver.setUp(self)
473
471
        self.build_tree(['foo/', 'foo/bar'], line_endings='binary',
474
472
                        transport=self.get_transport())
475
473
 
521
519
    scenarios = vary_by_http_client_implementation()
522
520
 
523
521
    def test_http_registered(self):
524
 
        t = transport.get_transport_from_url(
525
 
            '%s://foo.com/' % self._url_protocol)
 
522
        t = transport.get_transport('%s://foo.com/' % self._url_protocol)
526
523
        self.assertIsInstance(t, transport.Transport)
527
524
        self.assertIsInstance(t, self._transport)
528
525
 
540
537
        self.start_server(server)
541
538
        url = server.get_url()
542
539
        # FIXME: needs a cleanup -- vila 20100611
543
 
        http_transport = transport.get_transport_from_url(url)
 
540
        http_transport = transport.get_transport(url)
544
541
        code, response = http_transport._post('abc def end-of-body')
545
542
        self.assertTrue(
546
543
            server.received_bytes.startswith('POST /.bzr/smart HTTP/1.'))
656
653
 
657
654
    _req_handler_class = BadStatusRequestHandler
658
655
 
659
 
    def setUp(self):
660
 
        super(TestBadStatusServer, self).setUp()
661
 
        # See https://bugs.launchpad.net/bzr/+bug/1451448 for details.
662
 
        # TD;LR: Running both a TCP client and server in the same process and
663
 
        # thread uncovers a race in python. The fix is to run the server in a
664
 
        # different process. Trying to fix yet another race here is not worth
665
 
        # the effort. -- vila 2015-09-06
666
 
        if 'HTTP/1.0' in self.id():
667
 
            raise tests.TestSkipped(
668
 
                'Client/Server in the same process and thread can hang')
669
 
 
670
656
    def test_http_has(self):
671
657
        t = self.get_readonly_transport()
672
 
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
673
 
                           errors.InvalidHttpResponse),
674
 
                          t.has, 'foo/bar')
 
658
        self.assertRaises(errors.InvalidHttpResponse, t.has, 'foo/bar')
675
659
 
676
660
    def test_http_get(self):
677
661
        t = self.get_readonly_transport()
678
 
        self.assertRaises((errors.ConnectionError, errors.ConnectionReset,
679
 
                           errors.InvalidHttpResponse),
680
 
                          t.get, 'foo/bar')
 
662
        self.assertRaises(errors.InvalidHttpResponse, t.get, 'foo/bar')
681
663
 
682
664
 
683
665
class InvalidStatusRequestHandler(http_server.TestingHTTPRequestHandler):
1052
1034
        self.assertEqual('single', t._range_hint)
1053
1035
 
1054
1036
 
1055
 
class TruncatedBeforeBoundaryRequestHandler(
1056
 
    http_server.TestingHTTPRequestHandler):
1057
 
    """Truncation before a boundary, like in bug 198646"""
1058
 
 
1059
 
    _truncated_ranges = 1
1060
 
 
1061
 
    def get_multiple_ranges(self, file, file_size, ranges):
1062
 
        self.send_response(206)
1063
 
        self.send_header('Accept-Ranges', 'bytes')
1064
 
        boundary = 'tagada'
1065
 
        self.send_header('Content-Type',
1066
 
                         'multipart/byteranges; boundary=%s' % boundary)
1067
 
        boundary_line = '--%s\r\n' % boundary
1068
 
        # Calculate the Content-Length
1069
 
        content_length = 0
1070
 
        for (start, end) in ranges:
1071
 
            content_length += len(boundary_line)
1072
 
            content_length += self._header_line_length(
1073
 
                'Content-type', 'application/octet-stream')
1074
 
            content_length += self._header_line_length(
1075
 
                'Content-Range', 'bytes %d-%d/%d' % (start, end, file_size))
1076
 
            content_length += len('\r\n') # end headers
1077
 
            content_length += end - start # + 1
1078
 
        content_length += len(boundary_line)
1079
 
        self.send_header('Content-length', content_length)
1080
 
        self.end_headers()
1081
 
 
1082
 
        # Send the multipart body
1083
 
        cur = 0
1084
 
        for (start, end) in ranges:
1085
 
            if cur + self._truncated_ranges >= len(ranges):
1086
 
                # Abruptly ends the response and close the connection
1087
 
                self.close_connection = 1
1088
 
                return
1089
 
            self.wfile.write(boundary_line)
1090
 
            self.send_header('Content-type', 'application/octet-stream')
1091
 
            self.send_header('Content-Range', 'bytes %d-%d/%d'
1092
 
                             % (start, end, file_size))
1093
 
            self.end_headers()
1094
 
            self.send_range_content(file, start, end - start + 1)
1095
 
            cur += 1
1096
 
        # Final boundary
1097
 
        self.wfile.write(boundary_line)
1098
 
 
1099
 
 
1100
 
class TestTruncatedBeforeBoundary(TestSpecificRequestHandler):
1101
 
    """Tests the case of bug 198646, disconnecting before a boundary."""
1102
 
 
1103
 
    _req_handler_class = TruncatedBeforeBoundaryRequestHandler
1104
 
 
1105
 
    def setUp(self):
1106
 
        super(TestTruncatedBeforeBoundary, self).setUp()
1107
 
        self.build_tree_contents([('a', '0123456789')],)
1108
 
 
1109
 
    def test_readv_with_short_reads(self):
1110
 
        server = self.get_readonly_server()
1111
 
        t = self.get_readonly_transport()
1112
 
        # Force separate ranges for each offset
1113
 
        t._bytes_to_read_before_seek = 0
1114
 
        ireadv = iter(t.readv('a', ((0, 1), (2, 1), (4, 2), (9, 1))))
1115
 
        self.assertEqual((0, '0'), ireadv.next())
1116
 
        self.assertEqual((2, '2'), ireadv.next())
1117
 
        self.assertEqual((4, '45'), ireadv.next())
1118
 
        self.assertEqual((9, '9'), ireadv.next())
1119
 
 
1120
 
 
1121
1037
class LimitedRangeRequestHandler(http_server.TestingHTTPRequestHandler):
1122
1038
    """Errors out when range specifiers exceed the limit"""
1123
1039
 
1160
1076
                                      protocol_version=self._protocol_version)
1161
1077
 
1162
1078
    def setUp(self):
1163
 
        super(TestLimitedRangeRequestServer, self).setUp()
 
1079
        http_utils.TestCaseWithWebserver.setUp(self)
1164
1080
        # We need to manipulate ranges that correspond to real chunks in the
1165
1081
        # response, so we build a content appropriately.
1166
1082
        filler = ''.join(['abcdefghij' for x in range(102)])
1200
1116
 
1201
1117
    def assertEvaluateProxyBypass(self, expected, host, no_proxy):
1202
1118
        handler = _urllib2_wrappers.ProxyHandler()
1203
 
        self.assertEqual(expected,
 
1119
        self.assertEquals(expected,
1204
1120
                          handler.evaluate_proxy_bypass(host, no_proxy))
1205
1121
 
1206
1122
    def test_empty_user(self):
1352
1268
        )
1353
1269
 
1354
1270
    def setUp(self):
1355
 
        super(TestRanges, self).setUp()
 
1271
        http_utils.TestCaseWithWebserver.setUp(self)
1356
1272
        self.build_tree_contents([('a', '0123456789')],)
1357
1273
 
1358
1274
    def create_transport_readonly_server(self):
1573
1489
                          self.get_a, self.old_transport, redirected)
1574
1490
 
1575
1491
 
1576
 
def _setup_authentication_config(**kwargs):
1577
 
    conf = config.AuthenticationConfig()
1578
 
    conf._get_config().update({'httptest': kwargs})
1579
 
    conf._save()
1580
 
 
1581
 
 
1582
 
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
1583
 
    """Unit tests for glue by which urllib2 asks us for authentication"""
1584
 
 
1585
 
    def test_get_user_password_without_port(self):
1586
 
        """We cope if urllib2 doesn't tell us the port.
1587
 
 
1588
 
        See https://bugs.launchpad.net/bzr/+bug/654684
1589
 
        """
1590
 
        user = 'joe'
1591
 
        password = 'foo'
1592
 
        _setup_authentication_config(scheme='http', host='localhost',
1593
 
                                     user=user, password=password)
1594
 
        handler = _urllib2_wrappers.HTTPAuthHandler()
1595
 
        got_pass = handler.get_user_password(dict(
1596
 
            user='joe',
1597
 
            protocol='http',
1598
 
            host='localhost',
1599
 
            path='/',
1600
 
            realm='Realm',
1601
 
            ))
1602
 
        self.assertEqual((user, password), got_pass)
1603
 
 
1604
 
 
1605
1492
class TestAuth(http_utils.TestCaseWithWebserver):
1606
1493
    """Test authentication scheme"""
1607
1494
 
1611
1498
        vary_by_http_auth_scheme(),
1612
1499
        )
1613
1500
 
 
1501
    _auth_header = 'Authorization'
 
1502
    _password_prompt_prefix = ''
 
1503
    _username_prompt_prefix = ''
 
1504
    # Set by load_tests
 
1505
    _auth_server = None
 
1506
 
1614
1507
    def setUp(self):
1615
1508
        super(TestAuth, self).setUp()
1616
1509
        self.server = self.get_readonly_server()
1639
1532
        return url
1640
1533
 
1641
1534
    def get_user_transport(self, user, password):
1642
 
        t = transport.get_transport_from_url(
1643
 
            self.get_user_url(user, password))
 
1535
        t = transport.get_transport(self.get_user_url(user, password))
1644
1536
        return t
1645
1537
 
1646
1538
    def test_no_user(self):
1758
1650
        ui.ui_factory = tests.TestUIFactory(stdin=stdin_content,
1759
1651
                                            stderr=tests.StringIOWrapper())
1760
1652
        # Create a minimal config file with the right password
1761
 
        _setup_authentication_config(scheme='http', port=self.server.port,
1762
 
                                     user=user, password=password)
 
1653
        _setup_authentication_config(
 
1654
            scheme='http', 
 
1655
            port=self.server.port,
 
1656
            user=user,
 
1657
            password=password)
1763
1658
        # Issue a request to the server to connect
1764
1659
        self.assertEqual('contents of a\n',t.get('a').read())
1765
1660
        # stdin should have  been left untouched
1772
1667
                                     http_utils.ProxyDigestAuthServer):
1773
1668
            raise tests.TestNotApplicable('HTTP/proxy auth digest only test')
1774
1669
        if self._testing_pycurl():
1775
 
            self.knownFailure(
 
1670
            raise tests.KnownFailure(
1776
1671
                'pycurl does not handle a nonce change')
1777
1672
        self.server.add_user('joe', 'foo')
1778
1673
        t = self.get_user_transport('joe', 'foo')
1795
1690
        user = 'joe'
1796
1691
        password = 'foo'
1797
1692
        self.server.add_user(user, password)
1798
 
        _setup_authentication_config(scheme='http', port=self.server.port,
1799
 
                                     user=user, password=password)
 
1693
        _setup_authentication_config(
 
1694
            scheme='http', 
 
1695
            port=self.server.port,
 
1696
            user=user,
 
1697
            password=password)
1800
1698
        t = self.get_user_transport(None, None)
1801
1699
        # Issue a request to the server to connect
1802
1700
        self.assertEqual('contents of a\n', t.get('a').read())
1803
1701
        # Only one 'Authentication Required' error should occur
1804
1702
        self.assertEqual(1, self.server.auth_required_errors)
1805
1703
 
1806
 
    def test_no_credential_leaks_in_log(self):
1807
 
        self.overrideAttr(debug, 'debug_flags', set(['http']))
 
1704
 
 
1705
def _setup_authentication_config(**kwargs):
 
1706
    conf = config.AuthenticationConfig()
 
1707
    conf._get_config().update({'httptest': kwargs})
 
1708
    conf._save()
 
1709
 
 
1710
 
 
1711
 
 
1712
class TestUrllib2AuthHandler(tests.TestCaseWithTransport):
 
1713
    """Unit tests for glue by which urllib2 asks us for authentication"""
 
1714
 
 
1715
    def test_get_user_password_without_port(self):
 
1716
        """We cope if urllib2 doesn't tell us the port.
 
1717
 
 
1718
        See https://bugs.launchpad.net/bzr/+bug/654684
 
1719
        """
1808
1720
        user = 'joe'
1809
 
        password = 'very-sensitive-password'
1810
 
        self.server.add_user(user, password)
1811
 
        t = self.get_user_transport(user, password)
1812
 
        # Capture the debug calls to mutter
1813
 
        self.mutters = []
1814
 
        def mutter(*args):
1815
 
            lines = args[0] % args[1:]
1816
 
            # Some calls output multiple lines, just split them now since we
1817
 
            # care about a single one later.
1818
 
            self.mutters.extend(lines.splitlines())
1819
 
        self.overrideAttr(trace, 'mutter', mutter)
1820
 
        # Issue a request to the server to connect
1821
 
        self.assertEqual(True, t.has('a'))
1822
 
        # Only one 'Authentication Required' error should occur
1823
 
        self.assertEqual(1, self.server.auth_required_errors)
1824
 
        # Since the authentification succeeded, there should be a corresponding
1825
 
        # debug line
1826
 
        sent_auth_headers = [line for line in self.mutters
1827
 
                             if line.startswith('> %s' % (self._auth_header,))]
1828
 
        self.assertLength(1, sent_auth_headers)
1829
 
        self.assertStartsWith(sent_auth_headers[0],
1830
 
                              '> %s: <masked>' % (self._auth_header,))
 
1721
        password = 'foo'
 
1722
        _setup_authentication_config(
 
1723
            scheme='http', 
 
1724
            host='localhost',
 
1725
            user=user,
 
1726
            password=password)
 
1727
        handler = _urllib2_wrappers.HTTPAuthHandler()
 
1728
        got_pass = handler.get_user_password(dict(
 
1729
            user='joe',
 
1730
            protocol='http',
 
1731
            host='localhost',
 
1732
            path='/',
 
1733
            realm='Realm',
 
1734
            ))
 
1735
        self.assertEquals((user, password), got_pass)
1831
1736
 
1832
1737
 
1833
1738
class TestProxyAuth(TestAuth):
1834
 
    """Test proxy authentication schemes.
1835
 
 
1836
 
    This inherits from TestAuth to tweak the setUp and filter some failing
1837
 
    tests.
1838
 
    """
 
1739
    """Test proxy authentication schemes."""
1839
1740
 
1840
1741
    scenarios = multiply_scenarios(
1841
1742
        vary_by_http_client_implementation(),
1843
1744
        vary_by_http_proxy_auth_scheme(),
1844
1745
        )
1845
1746
 
 
1747
    _auth_header = 'Proxy-authorization'
 
1748
    _password_prompt_prefix = 'Proxy '
 
1749
    _username_prompt_prefix = 'Proxy '
 
1750
 
1846
1751
    def setUp(self):
1847
1752
        super(TestProxyAuth, self).setUp()
1848
1753
        # Override the contents to avoid false positives
1860
1765
        if self._testing_pycurl():
1861
1766
            import pycurl
1862
1767
            if pycurl.version_info()[1] < '7.16.0':
1863
 
                self.knownFailure(
 
1768
                raise tests.KnownFailure(
1864
1769
                    'pycurl < 7.16.0 does not handle empty proxy passwords')
1865
1770
        super(TestProxyAuth, self).test_empty_pass()
1866
1771
 
1908
1813
        server._url_protocol = self._url_protocol
1909
1814
        return server
1910
1815
 
1911
 
    def test_open_controldir(self):
 
1816
    def test_open_bzrdir(self):
1912
1817
        branch = self.make_branch('relpath')
1913
1818
        url = self.http_server.get_url() + 'relpath'
1914
 
        bd = controldir.ControlDir.open(url)
 
1819
        bd = bzrdir.BzrDir.open(url)
1915
1820
        self.addCleanup(bd.transport.disconnect)
1916
1821
        self.assertIsInstance(bd, _mod_remote.RemoteBzrDir)
1917
1822
 
1920
1825
        # The 'readv' command in the smart protocol both sends and receives
1921
1826
        # bulk data, so we use that.
1922
1827
        self.build_tree(['data-file'])
1923
 
        http_transport = transport.get_transport_from_url(
1924
 
            self.http_server.get_url())
 
1828
        http_transport = transport.get_transport(self.http_server.get_url())
1925
1829
        medium = http_transport.get_smart_medium()
1926
1830
        # Since we provide the medium, the url below will be mostly ignored
1927
1831
        # during the test, as long as the path is '/'.
1935
1839
        post_body = 'hello\n'
1936
1840
        expected_reply_body = 'ok\x012\n'
1937
1841
 
1938
 
        http_transport = transport.get_transport_from_url(
1939
 
            self.http_server.get_url())
 
1842
        http_transport = transport.get_transport(self.http_server.get_url())
1940
1843
        medium = http_transport.get_smart_medium()
1941
1844
        response = medium.send_http_smart_request(post_body)
1942
1845
        reply_body = response.read()
2000
1903
        self.assertIsInstance(r, type(t))
2001
1904
        # Both transports share the some connection
2002
1905
        self.assertEqual(t._get_connection(), r._get_connection())
2003
 
        self.assertEqual('http://www.example.com/foo/subdir/', r.base)
2004
1906
 
2005
1907
    def test_redirected_to_self_with_slash(self):
2006
1908
        t = self._transport('http://www.example.com/foo')
2017
1919
        r = t._redirected_to('http://www.example.com/foo',
2018
1920
                             'http://foo.example.com/foo/subdir')
2019
1921
        self.assertIsInstance(r, type(t))
2020
 
        self.assertEqual('http://foo.example.com/foo/subdir/',
2021
 
            r.external_url())
2022
1922
 
2023
1923
    def test_redirected_to_same_host_sibling_protocol(self):
2024
1924
        t = self._transport('http://www.example.com/foo')
2025
1925
        r = t._redirected_to('http://www.example.com/foo',
2026
1926
                             'https://www.example.com/foo')
2027
1927
        self.assertIsInstance(r, type(t))
2028
 
        self.assertEqual('https://www.example.com/foo/',
2029
 
            r.external_url())
2030
1928
 
2031
1929
    def test_redirected_to_same_host_different_protocol(self):
2032
1930
        t = self._transport('http://www.example.com/foo')
2033
1931
        r = t._redirected_to('http://www.example.com/foo',
2034
1932
                             'ftp://www.example.com/foo')
2035
 
        self.assertNotEqual(type(r), type(t))
2036
 
        self.assertEqual('ftp://www.example.com/foo/', r.external_url())
2037
 
 
2038
 
    def test_redirected_to_same_host_specific_implementation(self):
2039
 
        t = self._transport('http://www.example.com/foo')
2040
 
        r = t._redirected_to('http://www.example.com/foo',
2041
 
                             'https+urllib://www.example.com/foo')
2042
 
        self.assertEqual('https://www.example.com/foo/', r.external_url())
 
1933
        self.assertNotEquals(type(r), type(t))
2043
1934
 
2044
1935
    def test_redirected_to_different_host_same_user(self):
2045
1936
        t = self._transport('http://joe@www.example.com/foo')
2046
1937
        r = t._redirected_to('http://www.example.com/foo',
2047
1938
                             'https://foo.example.com/foo')
2048
1939
        self.assertIsInstance(r, type(t))
2049
 
        self.assertEqual(t._parsed_url.user, r._parsed_url.user)
2050
 
        self.assertEqual('https://joe@foo.example.com/foo/', r.external_url())
 
1940
        self.assertEqual(t._user, r._user)
2051
1941
 
2052
1942
 
2053
1943
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
2106
1996
    pass
2107
1997
 
2108
1998
 
2109
 
if features.HTTPSServerFeature.available():
 
1999
if tests.HTTPSServerFeature.available():
2110
2000
    from bzrlib.tests import https_server
2111
2001
    class ActivityHTTPSServer(ActivityServerMixin, https_server.HTTPSServer):
2112
2002
        pass
2120
2010
    """
2121
2011
 
2122
2012
    def setUp(self):
 
2013
        tests.TestCase.setUp(self)
2123
2014
        self.server = self._activity_server(self._protocol_version)
2124
2015
        self.server.start_server()
2125
 
        self.addCleanup(self.server.stop_server)
2126
 
        _activities = {} # Don't close over self and create a cycle
 
2016
        self.activities = {}
2127
2017
        def report_activity(t, bytes, direction):
2128
 
            count = _activities.get(direction, 0)
 
2018
            count = self.activities.get(direction, 0)
2129
2019
            count += bytes
2130
 
            _activities[direction] = count
2131
 
        self.activities = _activities
 
2020
            self.activities[direction] = count
 
2021
 
2132
2022
        # We override at class level because constructors may propagate the
2133
2023
        # bound method and render instance overriding ineffective (an
2134
2024
        # alternative would be to define a specific ui factory instead...)
2135
2025
        self.overrideAttr(self._transport, '_report_activity', report_activity)
 
2026
        self.addCleanup(self.server.stop_server)
2136
2027
 
2137
2028
    def get_transport(self):
2138
2029
        t = self._transport(self.server.get_url())
2262
2153
        )
2263
2154
 
2264
2155
    def setUp(self):
2265
 
        super(TestActivity, self).setUp()
2266
2156
        TestActivityMixin.setUp(self)
2267
2157
 
2268
2158
 
2277
2167
    _protocol_version = 'HTTP/1.1'
2278
2168
 
2279
2169
    def setUp(self):
2280
 
        super(TestNoReportActivity, self).setUp()
2281
2170
        self._transport =_urllib.HttpTransport_urllib
2282
2171
        TestActivityMixin.setUp(self)
2283
2172
 
2359
2248
        # stdout should be empty, stderr will contains the prompts
2360
2249
        self.assertEqual('', stdout.getvalue())
2361
2250
 
 
2251