~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/transport_util.py

  • Committer: Vincent Ladeuil
  • Date: 2012-03-13 17:25:29 UTC
  • mfrom: (6499 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6501.
  • Revision ID: v.ladeuil+lp@free.fr-20120313172529-i0suyjnepsor25i7
Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
import bzrlib.hooks
18
17
from bzrlib.tests import features
19
18
 
20
19
# SFTPTransport offers better performances but relies on paramiko, if paramiko
21
20
# is not available, we fallback to FtpTransport
22
21
if features.paramiko.available():
23
22
    from bzrlib.tests import test_sftp_transport
24
 
    from bzrlib.transport import sftp
 
23
    from bzrlib.transport import sftp, Transport
25
24
    _backing_scheme = 'sftp'
26
25
    _backing_transport_class = sftp.SFTPTransport
27
26
    _backing_test_class = test_sftp_transport.TestCaseWithSFTPServer
28
27
else:
29
 
    from bzrlib.transport import ftp
 
28
    from bzrlib.transport import ftp, Transport
30
29
    from bzrlib.tests import test_ftp_transport
31
30
    _backing_scheme = 'ftp'
32
31
    _backing_transport_class = ftp.FtpTransport
33
32
    _backing_test_class = test_ftp_transport.TestCaseWithFTPServer
34
33
 
35
 
from bzrlib.transport import (
36
 
    ConnectedTransport,
37
 
    get_transport,
38
 
    register_transport,
39
 
    register_urlparse_netloc_protocol,
40
 
    unregister_transport,
41
 
    _unregister_urlparse_netloc_protocol,
42
 
    )
43
 
 
44
 
 
45
 
 
46
 
class TransportHooks(bzrlib.hooks.Hooks):
47
 
    """Dict-mapping hook name to a list of callables for transport hooks"""
48
 
 
49
 
    def __init__(self):
50
 
        super(TransportHooks, self).__init__()
51
 
        # Invoked when the transport has just created a new connection.
52
 
        # The api signature is (transport, connection, credentials)
53
 
        self['_set_connection'] = []
54
 
 
55
 
_hooked_scheme = 'hooked'
56
 
 
57
 
def _change_scheme_in(url, actual, desired):
58
 
    if not url.startswith(actual + '://'):
59
 
        raise AssertionError('url "%r" does not start with "%r]"'
60
 
                             % (url, actual))
61
 
    return desired + url[len(actual):]
62
 
 
63
 
 
64
 
class InstrumentedTransport(_backing_transport_class):
65
 
    """Instrumented transport class to test commands behavior"""
66
 
 
67
 
    hooks = TransportHooks()
68
 
 
69
 
    def __init__(self, base, _from_transport=None):
70
 
        if not base.startswith(_hooked_scheme + '://'):
71
 
            raise ValueError(base)
72
 
        # We need to trick the backing transport class about the scheme used
73
 
        # We'll do the reverse when we need to talk to the backing server
74
 
        fake_base = _change_scheme_in(base, _hooked_scheme, _backing_scheme)
75
 
        super(InstrumentedTransport, self).__init__(
76
 
            fake_base, _from_transport=_from_transport)
77
 
        # The following is needed to minimize the effects of our trick above
78
 
        # while retaining the best compatibility.
79
 
        self._scheme = _hooked_scheme
80
 
        base = self._unsplit_url(self._scheme,
81
 
                                 self._user, self._password,
82
 
                                 self._host, self._port,
83
 
                                 self._path)
84
 
        super(ConnectedTransport, self).__init__(base)
85
 
 
86
 
 
87
 
class ConnectionHookedTransport(InstrumentedTransport):
88
 
    """Transport instrumented to inspect connections"""
89
 
 
90
 
    def _set_connection(self, connection, credentials):
91
 
        """Called when a new connection is created """
92
 
        super(ConnectionHookedTransport, self)._set_connection(connection,
93
 
                                                               credentials)
94
 
        for hook in self.hooks['_set_connection']:
95
 
            hook(self, connection, credentials)
96
 
 
97
34
 
98
35
class TestCaseWithConnectionHookedTransport(_backing_test_class):
99
36
 
100
37
    def setUp(self):
101
 
        register_urlparse_netloc_protocol(_hooked_scheme)
102
 
        register_transport(_hooked_scheme, ConnectionHookedTransport)
103
 
        self.addCleanup(unregister_transport, _hooked_scheme,
104
 
                        ConnectionHookedTransport)
105
 
        self.addCleanup(_unregister_urlparse_netloc_protocol, _hooked_scheme)
106
38
        super(TestCaseWithConnectionHookedTransport, self).setUp()
107
39
        self.reset_connections()
108
 
        # Add the 'hooked' url to the permitted url list.
109
 
        # XXX: See TestCase.start_server. This whole module shouldn't need to
110
 
        # exist - a bug has been filed on that. once its cleanedup/removed, the
111
 
        # standard test support code will work and permit the server url
112
 
        # correctly.
113
 
        url = self.get_url()
114
 
        t = get_transport(url)
115
 
        if t.base.endswith('work/'):
116
 
            t = t.clone('../..')
117
 
        self.permit_url(t.base)
118
 
 
119
 
    def get_url(self, relpath=None):
120
 
        super_self = super(TestCaseWithConnectionHookedTransport, self)
121
 
        url = super_self.get_url(relpath)
122
 
        # Replace the backing scheme by our own (see
123
 
        # InstrumentedTransport.__init__)
124
 
        url = _change_scheme_in(url, _backing_scheme, _hooked_scheme)
125
 
        return url
126
40
 
127
41
    def start_logging_connections(self):
128
 
        self.overrideAttr(InstrumentedTransport, 'hooks', TransportHooks())
129
 
        # We preserved the hooks class attribute. Now we install our hook.
130
 
        ConnectionHookedTransport.hooks.install_named_hook(
131
 
            '_set_connection', self._collect_connection, None)
 
42
        Transport.hooks.install_named_hook('post_connect',
 
43
            self.connections.append, None)
132
44
 
133
45
    def reset_connections(self):
134
46
        self.connections = []
135
47
 
136
 
    def _collect_connection(self, transport, connection, credentials):
137
 
        # Note: uncomment the following line and use 'bt' under pdb, that will
138
 
        # identify all the connections made including the extraneous ones.
139
 
        # import pdb; pdb.set_trace()
140
 
        self.connections.append(connection)
141