~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/transport_util.py

  • Committer: Robert Collins
  • Date: 2009-12-16 22:29:31 UTC
  • mto: This revision was merged to the branch mainline in revision 4920.
  • Revision ID: robertc@robertcollins.net-20091216222931-wbbn5ey4mwmpatwd
Review feedback.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007-2010 Canonical Ltd
 
1
# Copyright (C) 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
import bzrlib.hooks
18
 
from bzrlib import transport
19
 
from bzrlib.tests import features
20
18
 
21
19
# SFTPTransport offers better performance but relies on paramiko; if paramiko
22
20
# is not available, we fall back to FtpTransport
23
 
if features.paramiko.available():
24
 
    from bzrlib.tests import test_sftp_transport
 
21
from bzrlib.tests import test_sftp_transport
 
22
if test_sftp_transport.paramiko_loaded:
25
23
    from bzrlib.transport import sftp
26
24
    _backing_scheme = 'sftp'
27
25
    _backing_transport_class = sftp.SFTPTransport
35
33
 
36
34
from bzrlib.transport import (
37
35
    ConnectedTransport,
 
36
    get_transport,
38
37
    register_transport,
39
38
    register_urlparse_netloc_protocol,
40
39
    unregister_transport,
47
46
    """Dict-mapping hook name to a list of callables for transport hooks"""
48
47
 
49
48
    def __init__(self):
50
 
        super(TransportHooks, self).__init__("bzrlib.tests.transport_util",
51
 
            "InstrumentedTransport.hooks")
 
49
        super(TransportHooks, self).__init__()
52
50
        # Invoked when the transport has just created a new connection.
53
51
        # The API signature is (transport, connection, credentials)
54
52
        self['_set_connection'] = []
77
75
            fake_base, _from_transport=_from_transport)
78
76
        # The following is needed to minimize the effects of our trick above
79
77
        # while retaining the best compatibility.
80
 
        self._parsed_url.scheme = _hooked_scheme
81
 
        super(ConnectedTransport, self).__init__(str(self._parsed_url))
 
78
        self._scheme = _hooked_scheme
 
79
        base = self._unsplit_url(self._scheme,
 
80
                                 self._user, self._password,
 
81
                                 self._host, self._port,
 
82
                                 self._path)
 
83
        super(ConnectedTransport, self).__init__(base)
82
84
 
83
85
 
84
86
class ConnectionHookedTransport(InstrumentedTransport):
97
99
    def setUp(self):
98
100
        register_urlparse_netloc_protocol(_hooked_scheme)
99
101
        register_transport(_hooked_scheme, ConnectionHookedTransport)
100
 
        self.addCleanup(unregister_transport, _hooked_scheme,
101
 
                        ConnectionHookedTransport)
102
 
        self.addCleanup(_unregister_urlparse_netloc_protocol, _hooked_scheme)
 
102
 
 
103
        def unregister():
 
104
            unregister_transport(_hooked_scheme, ConnectionHookedTransport)
 
105
            _unregister_urlparse_netloc_protocol(_hooked_scheme)
 
106
 
 
107
        self.addCleanup(unregister)
103
108
        super(TestCaseWithConnectionHookedTransport, self).setUp()
104
109
        self.reset_connections()
105
110
        # Add the 'hooked' url to the permitted url list.
108
113
        # standard test support code will work and permit the server url
109
114
        # correctly.
110
115
        url = self.get_url()
111
 
        t = transport.get_transport(url)
 
116
        t = get_transport(url)
112
117
        if t.base.endswith('work/'):
113
118
            t = t.clone('../..')
114
119
        self.permit_url(t.base)
122
127
        return url
123
128
 
124
129
    def start_logging_connections(self):
125
 
        self.overrideAttr(InstrumentedTransport, 'hooks', TransportHooks())
126
 
        # We preserved the hooks class attribute. Now we install our hook.
127
130
        ConnectionHookedTransport.hooks.install_named_hook(
128
131
            '_set_connection', self._collect_connection, None)
 
132
        # uninstall our hooks when we are finished
 
133
        self.addCleanup(self.reset_hooks)
 
134
 
 
135
    def reset_hooks(self):
 
136
        InstrumentedTransport.hooks = TransportHooks()
129
137
 
130
138
    def reset_connections(self):
131
139
        self.connections = []