1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# Copyright (C) 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import bzrlib.hooks
# SFTPTransport offers better performance but relies on paramiko; if paramiko
# is not available, we fall back to FtpTransport.
from bzrlib.tests import test_sftp_transport
if not test_sftp_transport.paramiko_loaded:
    # paramiko is missing: back the hooked scheme with the FTP transport and
    # its matching test case class.
    from bzrlib.transport import ftp
    from bzrlib.tests import test_ftp_transport
    _backing_scheme = 'ftp'
    _backing_transport_class = ftp.FtpTransport
    _backing_test_class = test_ftp_transport.TestCaseWithFTPServer
else:
    # paramiko is usable: back the hooked scheme with the SFTP transport and
    # its matching test case class.
    from bzrlib.transport import sftp
    _backing_scheme = 'sftp'
    _backing_transport_class = sftp.SFTPTransport
    _backing_test_class = test_sftp_transport.TestCaseWithSFTPServer
from bzrlib.transport import (
ConnectedTransport,
get_transport,
register_transport,
register_urlparse_netloc_protocol,
unregister_transport,
_unregister_urlparse_netloc_protocol,
)
class TransportHooks(bzrlib.hooks.Hooks):
    """Hook registry for the instrumented transports.

    Maps each hook name to the list of callables registered for it.
    """

    def __init__(self):
        """Create the registry with an empty '_set_connection' hook list."""
        super(TransportHooks, self).__init__()
        # '_set_connection' hooks are invoked right after the transport has
        # created a new connection; callables are called as
        # hook(transport, connection, credentials).
        hook_name = '_set_connection'
        self[hook_name] = []
# URL scheme under which the test case below registers
# ConnectionHookedTransport; URLs are rewritten between this scheme and the
# backing one by _change_scheme_in().
_hooked_scheme = 'hooked'
def _change_scheme_in(url, actual, desired):
if not url.startswith(actual + '://'):
raise AssertionError('url "%r" does not start with "%r]"'
% (url, actual))
return desired + url[len(actual):]
class InstrumentedTransport(_backing_transport_class):
    """Backing transport exposed under the hooked scheme.

    Used to instrument the transport so that tests can observe how commands
    behave.
    """

    # Shared registry of hooks; see TransportHooks for the available names.
    hooks = TransportHooks()

    def __init__(self, base, _from_transport=None):
        """Build the transport from a url using the hooked scheme.

        :param base: The url of the transport; it must start with the hooked
            scheme followed by '://'.
        :param _from_transport: Passed through unchanged to the backing
            transport class constructor.
        :raises ValueError: If `base` does not use the hooked scheme.
        """
        hooked_prefix = _hooked_scheme + '://'
        if not base.startswith(hooked_prefix):
            raise ValueError(base)
        # The backing transport class only knows its own scheme, so hand it a
        # url using that scheme. The reverse translation is done whenever we
        # need to talk to the backing server.
        backing_base = _change_scheme_in(base, _hooked_scheme,
                                         _backing_scheme)
        super(InstrumentedTransport, self).__init__(
            backing_base, _from_transport=_from_transport)
        # Undo the visible effects of the trick above while keeping the best
        # compatibility: restore the hooked scheme and rebuild the url from
        # the parsed components.
        self._scheme = _hooked_scheme
        hooked_base = self._unsplit_url(
            self._scheme, self._user, self._password,
            self._host, self._port, self._path)
        # Start the lookup *after* ConnectedTransport in the MRO, so only the
        # initializer that follows it is re-run with the rebuilt url.
        super(ConnectedTransport, self).__init__(hooked_base)
class ConnectionHookedTransport(InstrumentedTransport):
    """Instrumented transport that reports every connection it sets."""

    def _set_connection(self, connection, credentials):
        """Record the new connection, then fire the '_set_connection' hooks.

        :param connection: The connection that has just been created.
        :param credentials: The credentials associated with the connection.
        """
        parent = super(ConnectionHookedTransport, self)
        parent._set_connection(connection, credentials)
        # Notify the registered callables, in registration order.
        for callback in self.hooks['_set_connection']:
            callback(self, connection, credentials)
class TestCaseWithConnectionHookedTransport(_backing_test_class):
    """Test case serving its urls through ConnectionHookedTransport.

    The urls returned by get_url() use the hooked scheme, and connections
    can be collected into self.connections via start_logging_connections().
    """

    def setUp(self):
        """Register the hooked scheme, then set up the backing test case."""
        register_urlparse_netloc_protocol(_hooked_scheme)
        register_transport(_hooked_scheme, ConnectionHookedTransport)

        def undo_registration():
            # Mirror of the two registrations above.
            unregister_transport(_hooked_scheme, ConnectionHookedTransport)
            _unregister_urlparse_netloc_protocol(_hooked_scheme)

        self.addCleanup(undo_registration)
        super(TestCaseWithConnectionHookedTransport, self).setUp()
        self.reset_connections()
        # Add the 'hooked' url to the permitted url list.
        # XXX: See TestCase.start_server. This whole module shouldn't need to
        # exist - a bug has been filed on that. Once it is cleaned up or
        # removed, the standard test support code will work and permit the
        # server url correctly.
        hooked_transport = get_transport(self.get_url())
        if hooked_transport.base.endswith('work/'):
            # Permit the location two levels above the 'work/' directory.
            hooked_transport = hooked_transport.clone('../..')
        self.permit_url(hooked_transport.base)

    def get_url(self, relpath=None):
        """Return the backing test case url rewritten to the hooked scheme.

        :param relpath: Passed through unchanged to the parent get_url().
        """
        backing_url = super(
            TestCaseWithConnectionHookedTransport, self).get_url(relpath)
        # Replace the backing scheme by our own (see
        # InstrumentedTransport.__init__)
        return _change_scheme_in(backing_url, _backing_scheme,
                                 _hooked_scheme)

    def start_logging_connections(self):
        """Collect every connection set from now on in self.connections."""
        hooks = ConnectionHookedTransport.hooks
        hooks.install_named_hook('_set_connection',
                                 self._collect_connection, None)
        # uninstall our hooks when we are finished
        self.addCleanup(self.reset_hooks)

    def reset_hooks(self):
        """Replace the shared hook registry with a fresh, empty one."""
        InstrumentedTransport.hooks = TransportHooks()

    def reset_connections(self):
        """Forget all the connections collected so far."""
        self.connections = []

    def _collect_connection(self, transport, connection, credentials):
        """'_set_connection' hook: remember `connection`.

        `transport` and `credentials` are accepted to match the hook
        signature but are not used.
        """
        # Note: insert 'import pdb; pdb.set_trace()' here and use 'bt' under
        # pdb to identify all the connections made, including the extraneous
        # ones.
        self.connections.append(connection)
|