~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/transport_util.py

  • Committer: Andrew Bennetts
  • Date: 2009-07-27 08:02:52 UTC
  • mto: This revision was merged to the branch mainline in revision 4573.
  • Revision ID: andrew.bennetts@canonical.com-20090727080252-1r4s9oqwlkzgywx7
Fix trivial bug in _vfs_set_tags_bytes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007-2010 Canonical Ltd
 
1
# Copyright (C) 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
import bzrlib.hooks
18
 
from bzrlib import transport
19
 
from bzrlib.tests import features
20
18
 
21
19
# SFTPTransport offers better performances but relies on paramiko, if paramiko
22
20
# is not available, we fallback to FtpTransport
23
 
if features.paramiko.available():
24
 
    from bzrlib.tests import test_sftp_transport
 
21
from bzrlib.tests import test_sftp_transport
 
22
if test_sftp_transport.paramiko_loaded:
25
23
    from bzrlib.transport import sftp
26
24
    _backing_scheme = 'sftp'
27
25
    _backing_transport_class = sftp.SFTPTransport
47
45
    """Dict-mapping hook name to a list of callables for transport hooks"""
48
46
 
49
47
    def __init__(self):
50
 
        super(TransportHooks, self).__init__("bzrlib.tests.transport_util",
51
 
            "InstrumentedTransport.hooks")
 
48
        super(TransportHooks, self).__init__()
52
49
        # Invoked when the transport has just created a new connection.
53
50
        # The api signature is (transport, connection, credentials)
54
51
        self['_set_connection'] = []
77
74
            fake_base, _from_transport=_from_transport)
78
75
        # The following is needed to minimize the effects of our trick above
79
76
        # while retaining the best compatibility.
80
 
        self._parsed_url.scheme = _hooked_scheme
81
 
        super(ConnectedTransport, self).__init__(str(self._parsed_url))
 
77
        self._scheme = _hooked_scheme
 
78
        base = self._unsplit_url(self._scheme,
 
79
                                 self._user, self._password,
 
80
                                 self._host, self._port,
 
81
                                 self._path)
 
82
        super(ConnectedTransport, self).__init__(base)
82
83
 
83
84
 
84
85
class ConnectionHookedTransport(InstrumentedTransport):
97
98
    def setUp(self):
98
99
        register_urlparse_netloc_protocol(_hooked_scheme)
99
100
        register_transport(_hooked_scheme, ConnectionHookedTransport)
100
 
        self.addCleanup(unregister_transport, _hooked_scheme,
101
 
                        ConnectionHookedTransport)
102
 
        self.addCleanup(_unregister_urlparse_netloc_protocol, _hooked_scheme)
 
101
 
 
102
        def unregister():
 
103
            unregister_transport(_hooked_scheme, ConnectionHookedTransport)
 
104
            _unregister_urlparse_netloc_protocol(_hooked_scheme)
 
105
 
 
106
        self.addCleanup(unregister)
103
107
        super(TestCaseWithConnectionHookedTransport, self).setUp()
104
108
        self.reset_connections()
105
 
        # Add the 'hooked' url to the permitted url list.
106
 
        # XXX: See TestCase.start_server. This whole module shouldn't need to
107
 
        # exist - a bug has been filed on that. once its cleanedup/removed, the
108
 
        # standard test support code will work and permit the server url
109
 
        # correctly.
110
 
        url = self.get_url()
111
 
        t = transport.get_transport(url)
112
 
        if t.base.endswith('work/'):
113
 
            t = t.clone('../..')
114
 
        self.permit_url(t.base)
115
109
 
116
110
    def get_url(self, relpath=None):
117
111
        super_self = super(TestCaseWithConnectionHookedTransport, self)
122
116
        return url
123
117
 
124
118
    def start_logging_connections(self):
125
 
        self.overrideAttr(InstrumentedTransport, 'hooks', TransportHooks())
126
 
        # We preserved the hooks class attribute. Now we install our hook.
127
119
        ConnectionHookedTransport.hooks.install_named_hook(
128
120
            '_set_connection', self._collect_connection, None)
 
121
        # uninstall our hooks when we are finished
 
122
        self.addCleanup(self.reset_hooks)
 
123
 
 
124
    def reset_hooks(self):
 
125
        InstrumentedTransport.hooks = TransportHooks()
129
126
 
130
127
    def reset_connections(self):
131
128
        self.connections = []