13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from bzrlib.tests import features
20
# SFTPTransport offers better performances but relies on paramiko, if paramiko
21
# is not available, we fallback to FtpTransport
22
if features.paramiko.available():
23
from bzrlib.tests import test_sftp_transport
24
from bzrlib.transport import sftp
25
_backing_scheme = 'sftp'
26
_backing_transport_class = sftp.SFTPTransport
27
_backing_test_class = test_sftp_transport.TestCaseWithSFTPServer
29
from bzrlib.transport import ftp
30
from bzrlib.tests import test_ftp_transport
31
_backing_scheme = 'ftp'
32
_backing_transport_class = ftp.FtpTransport
33
_backing_test_class = test_ftp_transport.TestCaseWithFTPServer
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from bzrlib.hooks import Hooks
18
from bzrlib.tests.test_sftp_transport import TestCaseWithSFTPServer
35
19
from bzrlib.transport import (
38
20
register_transport,
39
21
register_urlparse_netloc_protocol,
40
22
unregister_transport,
41
23
_unregister_urlparse_netloc_protocol,
46
class TransportHooks(bzrlib.hooks.Hooks):
25
from bzrlib.transport.sftp import SFTPTransport
28
class TransportHooks(Hooks):
47
29
"""Dict-mapping hook name to a list of callables for transport hooks"""
49
31
def __init__(self):
50
super(TransportHooks, self).__init__()
51
33
# Invoked when the transport has just created a new connection.
52
34
# The api signature is (transport, connection, credentials)
53
35
self['_set_connection'] = []
55
37
_hooked_scheme = 'hooked'
57
def _change_scheme_in(url, actual, desired):
58
if not url.startswith(actual + '://'):
59
raise AssertionError('url "%r" does not start with "%r]"'
61
return desired + url[len(actual):]
64
class InstrumentedTransport(_backing_transport_class):
39
class InstrumentedTransport(SFTPTransport):
65
40
"""Instrumented transport class to test commands behavior"""
67
42
hooks = TransportHooks()
69
44
def __init__(self, base, _from_transport=None):
70
if not base.startswith(_hooked_scheme + '://'):
71
raise ValueError(base)
72
# We need to trick the backing transport class about the scheme used
73
# We'll do the reverse when we need to talk to the backing server
74
fake_base = _change_scheme_in(base, _hooked_scheme, _backing_scheme)
75
super(InstrumentedTransport, self).__init__(
76
fake_base, _from_transport=_from_transport)
77
# The following is needed to minimize the effects of our trick above
78
# while retaining the best compatibility.
79
self._scheme = _hooked_scheme
80
base = self._unsplit_url(self._scheme,
81
self._user, self._password,
82
self._host, self._port,
84
super(ConnectedTransport, self).__init__(base)
45
assert base.startswith(_hooked_scheme + '://')
46
# Avoid SFTPTransport assertion since we use a dedicated scheme
47
super(SFTPTransport, self).__init__(base,
48
_from_transport=_from_transport)
87
51
class ConnectionHookedTransport(InstrumentedTransport):
95
59
hook(self, connection, credentials)
98
class TestCaseWithConnectionHookedTransport(_backing_test_class):
62
class TestCaseWithConnectionHookedTransport(TestCaseWithSFTPServer):
101
65
register_urlparse_netloc_protocol(_hooked_scheme)
102
66
register_transport(_hooked_scheme, ConnectionHookedTransport)
103
self.addCleanup(unregister_transport, _hooked_scheme,
104
ConnectionHookedTransport)
105
self.addCleanup(_unregister_urlparse_netloc_protocol, _hooked_scheme)
69
unregister_transport(_hooked_scheme, ConnectionHookedTransport)
70
_unregister_urlparse_netloc_protocol(_hooked_scheme)
72
self.addCleanup(unregister)
106
73
super(TestCaseWithConnectionHookedTransport, self).setUp()
107
74
self.reset_connections()
108
# Add the 'hooked' url to the permitted url list.
109
# XXX: See TestCase.start_server. This whole module shouldn't need to
110
# exist - a bug has been filed on that. once its cleanedup/removed, the
111
# standard test support code will work and permit the server url
114
t = get_transport(url)
115
if t.base.endswith('work/'):
117
self.permit_url(t.base)
119
76
def get_url(self, relpath=None):
120
77
super_self = super(TestCaseWithConnectionHookedTransport, self)
121
78
url = super_self.get_url(relpath)
122
# Replace the backing scheme by our own (see
123
# InstrumentedTransport.__init__)
124
url = _change_scheme_in(url, _backing_scheme, _hooked_scheme)
79
# Replace the sftp scheme by our own
80
url = _hooked_scheme + url[len('sftp'):]
127
def start_logging_connections(self):
128
self.overrideAttr(InstrumentedTransport, 'hooks', TransportHooks())
129
# We preserved the hooks class attribute. Now we install our hook.
130
ConnectionHookedTransport.hooks.install_named_hook(
131
'_set_connection', self._collect_connection, None)
83
def install_hooks(self):
84
ConnectionHookedTransport.hooks.install_hook('_set_connection',
85
self.set_connection_hook)
86
# uninstall our hooks when we are finished
87
self.addCleanup(self.reset_hooks)
89
def reset_hooks(self):
90
InstrumentedTransport.hooks = TransportHooks()
133
92
def reset_connections(self):
134
93
self.connections = []
136
def _collect_connection(self, transport, connection, credentials):
95
def set_connection_hook(self, transport, connection, credentials):
137
96
# Note: uncomment the following line and use 'bt' under pdb, that will
138
97
# identify all the connections made including the extraneous ones.
139
98
# import pdb; pdb.set_trace()