1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from bzrlib.tests import TestCase
18
from bzrlib.errors import SSHVendorNotFound, UnknownSSH
19
from bzrlib.transport.ssh import (
20
OpenSSHSubprocessVendor,
21
PLinkSubprocessVendor,
22
SSHCorpSubprocessVendor,
27
class TestSSHVendorManager(SSHVendorManager):
29
_ssh_version_string = ""
31
def set_ssh_version_string(self, version):
32
self._ssh_version_string = version
34
def _get_ssh_version_string(self, args):
35
return self._ssh_version_string
38
class SSHVendorManagerTests(TestCase):
40
def test_register_vendor(self):
41
manager = TestSSHVendorManager()
42
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
44
manager.register_vendor("vendor", vendor)
45
self.assertIs(manager.get_vendor({"BZR_SSH": "vendor"}), vendor)
47
def test_default_vendor(self):
48
manager = TestSSHVendorManager()
49
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
51
manager.register_default_vendor(vendor)
52
self.assertIs(manager.get_vendor({}), vendor)
54
def test_get_vendor_by_environment(self):
55
manager = TestSSHVendorManager()
56
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
57
self.assertRaises(UnknownSSH,
58
manager.get_vendor, {"BZR_SSH": "vendor"})
60
manager.register_vendor("vendor", vendor)
61
self.assertIs(manager.get_vendor({"BZR_SSH": "vendor"}), vendor)
63
def test_get_vendor_by_inspection_openssh(self):
64
manager = TestSSHVendorManager()
65
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
66
manager.set_ssh_version_string("OpenSSH")
67
self.assertIsInstance(manager.get_vendor({}), OpenSSHSubprocessVendor)
69
def test_get_vendor_by_inspection_sshcorp(self):
70
manager = TestSSHVendorManager()
71
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
72
manager.set_ssh_version_string("SSH Secure Shell")
73
self.assertIsInstance(manager.get_vendor({}), SSHCorpSubprocessVendor)
75
def test_get_vendor_by_inspection_plink(self):
76
manager = TestSSHVendorManager()
77
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
78
manager.set_ssh_version_string("plink")
79
self.assertIsInstance(manager.get_vendor({}), PLinkSubprocessVendor)
81
def test_cached_vendor(self):
82
manager = TestSSHVendorManager()
83
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
85
manager.register_vendor("vendor", vendor)
86
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
87
# Once the vendor is found the result is cached (mainly because of the
88
# 'get_vendor' sometimes can be an expensive operation) and later
89
# invocations of the 'get_vendor' just returns the cached value.
90
self.assertIs(manager.get_vendor({"BZR_SSH": "vendor"}), vendor)
91
self.assertIs(manager.get_vendor({}), vendor)
92
# The cache can be cleared by the 'clear_cache' method
94
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
96
def test_get_vendor_search_order(self):
97
# The 'get_vendor' method search for SSH vendors as following:
99
# 1. Check previously cached value
100
# 2. Check BZR_SSH environment variable
101
# 3. Check the system for known SSH vendors
102
# 4. Fall back to the default vendor if registered
104
# Let's now check the each check method in the reverse order
105
# clearing the cache between each invocation:
107
manager = TestSSHVendorManager()
108
# At first no vendors are found
109
self.assertRaises(SSHVendorNotFound, manager.get_vendor, {})
111
# If the default vendor is registered it will be returned
112
default_vendor = object()
113
manager.register_default_vendor(default_vendor)
114
self.assertIs(manager.get_vendor({}), default_vendor)
116
# If the known vendor is found in the system it will be returned
117
manager.clear_cache()
118
manager.set_ssh_version_string("OpenSSH")
119
self.assertIsInstance(manager.get_vendor({}), OpenSSHSubprocessVendor)
121
# If the BZR_SSH environment variable is found it will be treated as
123
manager.clear_cache()
125
manager.register_vendor("vendor", vendor)
126
self.assertIs(manager.get_vendor({"BZR_SSH": "vendor"}), vendor)
128
# Last cached value always checked first
129
self.assertIs(manager.get_vendor({}), vendor)
132
class SubprocessVendorsTests(TestCase):
134
def test_openssh_command_arguments(self):
135
vendor = OpenSSHSubprocessVendor()
137
vendor._get_vendor_specific_argv(
138
"user", "host", 100, command=["bzr"]),
139
["ssh", "-oForwardX11=no", "-oForwardAgent=no",
140
"-oClearAllForwardings=yes", "-oProtocol=2",
141
"-oNoHostAuthenticationForLocalhost=yes",
147
def test_openssh_subsystem_arguments(self):
148
vendor = OpenSSHSubprocessVendor()
150
vendor._get_vendor_specific_argv(
151
"user", "host", 100, subsystem="sftp"),
152
["ssh", "-oForwardX11=no", "-oForwardAgent=no",
153
"-oClearAllForwardings=yes", "-oProtocol=2",
154
"-oNoHostAuthenticationForLocalhost=yes",
157
"-s", "host", "sftp"]
160
def test_sshcorp_command_arguments(self):
161
vendor = SSHCorpSubprocessVendor()
163
vendor._get_vendor_specific_argv(
164
"user", "host", 100, command=["bzr"]),
171
def test_sshcorp_subsystem_arguments(self):
172
vendor = SSHCorpSubprocessVendor()
174
vendor._get_vendor_specific_argv(
175
"user", "host", 100, subsystem="sftp"),
179
"-s", "sftp", "host"]
182
def test_plink_command_arguments(self):
183
vendor = PLinkSubprocessVendor()
185
vendor._get_vendor_specific_argv(
186
"user", "host", 100, command=["bzr"]),
187
["plink", "-x", "-a", "-ssh", "-2",
193
def test_plink_subsystem_arguments(self):
194
vendor = PLinkSubprocessVendor()
196
vendor._get_vendor_specific_argv(
197
"user", "host", 100, subsystem="sftp"),
198
["plink", "-x", "-a", "-ssh", "-2",
201
"-s", "host", "sftp"]