~bzr-pqm/bzr/bzr.dev

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright (C) 2007-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import bzrlib.hooks
from bzrlib import transport
from bzrlib.tests import features

# SFTPTransport offers better performances but relies on paramiko, if paramiko
# is not available, we fallback to FtpTransport
if features.paramiko.available():
    from bzrlib.tests import test_sftp_transport
    from bzrlib.transport import sftp
    _backing_scheme = 'sftp'
    _backing_transport_class = sftp.SFTPTransport
    _backing_test_class = test_sftp_transport.TestCaseWithSFTPServer
else:
    from bzrlib.transport import ftp
    from bzrlib.tests import test_ftp_transport
    _backing_scheme = 'ftp'
    _backing_transport_class = ftp.FtpTransport
    _backing_test_class = test_ftp_transport.TestCaseWithFTPServer

from bzrlib.transport import (
    ConnectedTransport,
    register_transport,
    register_urlparse_netloc_protocol,
    unregister_transport,
    _unregister_urlparse_netloc_protocol,
    )



class TransportHooks(bzrlib.hooks.Hooks):
    """Dict-mapping hook name to a list of callables for transport hooks"""

    def __init__(self):
        super(TransportHooks, self).__init__()
        # Invoked when the transport has just created a new connection.
        # The api signature is (transport, connection, credentials)
        self['_set_connection'] = []

_hooked_scheme = 'hooked'

def _change_scheme_in(url, actual, desired):
    if not url.startswith(actual + '://'):
        raise AssertionError('url "%r" does not start with "%r]"'
                             % (url, actual))
    return desired + url[len(actual):]


class InstrumentedTransport(_backing_transport_class):
    """Instrumented transport class to test commands behavior"""

    hooks = TransportHooks()

    def __init__(self, base, _from_transport=None):
        if not base.startswith(_hooked_scheme + '://'):
            raise ValueError(base)
        # We need to trick the backing transport class about the scheme used
        # We'll do the reverse when we need to talk to the backing server
        fake_base = _change_scheme_in(base, _hooked_scheme, _backing_scheme)
        super(InstrumentedTransport, self).__init__(
            fake_base, _from_transport=_from_transport)
        # The following is needed to minimize the effects of our trick above
        # while retaining the best compatibility.
        self._scheme = _hooked_scheme
        base = self._unsplit_url(self._scheme,
                                 self._user, self._password,
                                 self._host, self._port,
                                 self._path)
        super(ConnectedTransport, self).__init__(base)


class ConnectionHookedTransport(InstrumentedTransport):
    """Transport instrumented to inspect connections"""

    def _set_connection(self, connection, credentials):
        """Called when a new connection is created """
        super(ConnectionHookedTransport, self)._set_connection(connection,
                                                               credentials)
        for hook in self.hooks['_set_connection']:
            hook(self, connection, credentials)


class TestCaseWithConnectionHookedTransport(_backing_test_class):

    def setUp(self):
        register_urlparse_netloc_protocol(_hooked_scheme)
        register_transport(_hooked_scheme, ConnectionHookedTransport)
        self.addCleanup(unregister_transport, _hooked_scheme,
                        ConnectionHookedTransport)
        self.addCleanup(_unregister_urlparse_netloc_protocol, _hooked_scheme)
        super(TestCaseWithConnectionHookedTransport, self).setUp()
        self.reset_connections()
        # Add the 'hooked' url to the permitted url list.
        # XXX: See TestCase.start_server. This whole module shouldn't need to
        # exist - a bug has been filed on that. once its cleanedup/removed, the
        # standard test support code will work and permit the server url
        # correctly.
        url = self.get_url()
        t = transport.get_transport(url)
        if t.base.endswith('work/'):
            t = t.clone('../..')
        self.permit_url(t.base)

    def get_url(self, relpath=None):
        super_self = super(TestCaseWithConnectionHookedTransport, self)
        url = super_self.get_url(relpath)
        # Replace the backing scheme by our own (see
        # InstrumentedTransport.__init__)
        url = _change_scheme_in(url, _backing_scheme, _hooked_scheme)
        return url

    def start_logging_connections(self):
        self.overrideAttr(InstrumentedTransport, 'hooks', TransportHooks())
        # We preserved the hooks class attribute. Now we install our hook.
        ConnectionHookedTransport.hooks.install_named_hook(
            '_set_connection', self._collect_connection, None)

    def reset_connections(self):
        self.connections = []

    def _collect_connection(self, transport, connection, credentials):
        # Note: uncomment the following line and use 'bt' under pdb, that will
        # identify all the connections made including the extraneous ones.
        # import pdb; pdb.set_trace()
        self.connections.append(connection)