14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18
from bzrlib.tests import features
19
20
# SFTPTransport offers better performance but relies on paramiko; if paramiko
20
21
# is not available, we fall back to FtpTransport
21
22
# The three _backing_* names chosen here are read by the rest of this module:
# the scheme is swapped in and out of URLs, the transport class is the base of
# InstrumentedTransport and the test class is the base of the test case.
if features.paramiko.available():
    from bzrlib.tests import test_sftp_transport
    from bzrlib.transport import sftp, Transport
    _backing_scheme = 'sftp'
    _backing_transport_class = sftp.SFTPTransport
    _backing_test_class = test_sftp_transport.TestCaseWithSFTPServer
else:
    # Without this branch the FTP assignments below would always override
    # the SFTP ones made above.
    from bzrlib.transport import ftp, Transport
    from bzrlib.tests import test_ftp_transport
    _backing_scheme = 'ftp'
    _backing_transport_class = ftp.FtpTransport
    _backing_test_class = test_ftp_transport.TestCaseWithFTPServer
35
from bzrlib.transport import (
39
register_urlparse_netloc_protocol,
41
_unregister_urlparse_netloc_protocol,
46
class TransportHooks(bzrlib.hooks.Hooks):
    """Dict-mapping hook name to a list of callables for transport hooks"""

    def __init__(self):
        """Create the hook registry with an empty '_set_connection' hook."""
        super(TransportHooks, self).__init__()
        # Invoked when the transport has just created a new connection.
        # The api signature is (transport, connection, credentials)
        self['_set_connection'] = []
55
# URL scheme under which ConnectionHookedTransport is registered; URLs using
# it are rewritten to/from the backing scheme by _change_scheme_in().
_hooked_scheme = 'hooked'
57
def _change_scheme_in(url, actual, desired):
58
if not url.startswith(actual + '://'):
59
raise AssertionError('url "%r" does not start with "%r]"'
61
return desired + url[len(actual):]
64
class InstrumentedTransport(_backing_transport_class):
    """Instrumented transport class to test commands behavior"""

    # Class-level hook registry; replaced wholesale by the test case's
    # reset_hooks().
    hooks = TransportHooks()

    def __init__(self, base, _from_transport=None):
        """Build the transport from a URL that uses the hooked scheme.

        :param base: URL that must start with ``_hooked_scheme + '://'``.
        :param _from_transport: passed through unchanged to the backing
            transport class constructor.
        :raises ValueError: if ``base`` does not use the hooked scheme.
        """
        if not base.startswith(_hooked_scheme + '://'):
            raise ValueError(base)
        # We need to trick the backing transport class about the scheme used
        # We'll do the reverse when we need to talk to the backing server
        fake_base = _change_scheme_in(base, _hooked_scheme, _backing_scheme)
        super(InstrumentedTransport, self).__init__(
            fake_base, _from_transport=_from_transport)
        # The following is needed to minimize the effects of our trick above
        # while retaining the best compatibility.
        self._scheme = _hooked_scheme
        # NOTE(review): the last argument of this call was not visible in the
        # reviewed text; self._path is assumed -- confirm against upstream.
        base = self._unsplit_url(self._scheme,
                                 self._user, self._password,
                                 self._host, self._port,
                                 self._path)
        # Deliberately starts the super() lookup after ConnectedTransport so
        # the rebuilt base is set without re-running the backing class
        # constructor.
        super(ConnectedTransport, self).__init__(base)
87
class ConnectionHookedTransport(InstrumentedTransport):
    """Transport instrumented to inspect connections"""

    def _set_connection(self, connection, credentials):
        """Called when a new connection is created

        Forwards to the parent implementation, then calls every callable
        registered under the '_set_connection' hook as
        ``hook(transport, connection, credentials)``.
        """
        super(ConnectionHookedTransport, self)._set_connection(connection,
                                                               credentials)
        for hook in self.hooks['_set_connection']:
            hook(self, connection, credentials)
35
98
# Test case built on the backing test class: it registers the 'hooked' scheme
# for ConnectionHookedTransport and collects connections in self.connections.
class TestCaseWithConnectionHookedTransport(_backing_test_class):
101
register_urlparse_netloc_protocol(_hooked_scheme)
102
register_transport(_hooked_scheme, ConnectionHookedTransport)
105
# NOTE(review): the two calls below look like the body of the 'unregister'
# callable handed to addCleanup; its 'def' line is not visible -- confirm.
unregister_transport(_hooked_scheme, ConnectionHookedTransport)
106
_unregister_urlparse_netloc_protocol(_hooked_scheme)
108
self.addCleanup(unregister)
38
109
super(TestCaseWithConnectionHookedTransport, self).setUp()
39
110
self.reset_connections()
111
# Add the 'hooked' url to the permitted url list.
112
# XXX: See TestCase.start_server. This whole module shouldn't need to
113
# exist - a bug has been filed on that. Once it's cleaned up/removed, the
114
# standard test support code will work and permit the server url
117
t = get_transport(url)
118
if t.base.endswith('work/'):
120
self.permit_url(t.base)
122
def get_url(self, relpath=None):
123
super_self = super(TestCaseWithConnectionHookedTransport, self)
124
url = super_self.get_url(relpath)
125
# Replace the backing scheme by our own (see
126
# InstrumentedTransport.__init__)
127
url = _change_scheme_in(url, _backing_scheme, _hooked_scheme)
# NOTE(review): no 'return url' is visible after the scheme swap -- confirm
# that the rewritten URL is returned to the caller.
41
130
def start_logging_connections(self):
# NOTE(review): two different hook installations follow (Transport
# 'post_connect' and ConnectionHookedTransport '_set_connection'); both append
# to self.connections -- confirm that only one of them is meant to be active.
42
Transport.hooks.install_named_hook('post_connect',
43
self.connections.append, None)
131
ConnectionHookedTransport.hooks.install_named_hook(
132
'_set_connection', self._collect_connection, None)
133
# uninstall our hooks when we are finished
134
self.addCleanup(self.reset_hooks)
136
def reset_hooks(self):
137
# Drops every installed hook by replacing the class-level registry.
InstrumentedTransport.hooks = TransportHooks()
45
139
def reset_connections(self):
46
140
self.connections = []
142
def _collect_connection(self, transport, connection, credentials):
143
# Note: uncomment the following line and use 'bt' under pdb, that will
144
# identify all the connections made including the extraneous ones.
145
# import pdb; pdb.set_trace()
146
self.connections.append(connection)