26
26
from StringIO import StringIO as pyStringIO
31
30
from bzrlib import (
35
transport as _mod_transport,
37
38
from bzrlib.errors import (ConnectionError,
45
43
TransportNotPossible,
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
55
53
from bzrlib.tests.test_transport import TestTransportImplementation
56
54
from bzrlib.transport import (
57
55
ConnectedTransport,
59
56
_get_transport_modules,
61
58
from bzrlib.transport.memory import MemoryTransport
64
class TransportTestProviderAdapter(TestScenarioApplier):
65
"""A tool to generate a suite testing all transports for a single test.
67
This is done by copying the test once for each transport and injecting
68
the transport_class and transport_server classes into each copy. Each copy
69
is also given a new id() to make it easy to identify.
73
self.scenarios = self._test_permutations()
75
def get_transport_test_permutations(self, module):
76
"""Get the permutations module wants to have tested."""
77
if getattr(module, 'get_test_permutations', None) is None:
79
"transport module %s doesn't provide get_test_permutations()"
82
return module.get_test_permutations()
84
def _test_permutations(self):
85
"""Return a list of the klass, server_factory pairs to test."""
87
for module in _get_transport_modules():
89
permutations = self.get_transport_test_permutations(
90
reduce(getattr, (module).split('.')[1:], __import__(module)))
91
for (klass, server_factory) in permutations:
92
scenario = (server_factory.__name__,
93
{"transport_class":klass,
94
"transport_server":server_factory})
95
result.append(scenario)
96
except errors.DependencyNotPresent, e:
97
# Continue even if a dependency prevents us
98
# from adding this test
61
def get_transport_test_permutations(module):
62
"""Get the permutations module wants to have tested."""
63
if getattr(module, 'get_test_permutations', None) is None:
65
"transport module %s doesn't provide get_test_permutations()"
68
return module.get_test_permutations()
71
def transport_test_permutations():
72
"""Return a list of the klass, server_factory pairs to test."""
74
for module in _get_transport_modules():
76
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
78
for (klass, server_factory) in permutations:
79
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
{"transport_class":klass,
81
"transport_server":server_factory})
82
result.append(scenario)
83
except errors.DependencyNotPresent, e:
84
# Continue even if a dependency prevents us
85
# from adding this test
103
90
def load_tests(standard_tests, module, loader):
104
91
"""Multiply tests for transport implementations."""
105
92
result = loader.suiteClass()
106
adapter = TransportTestProviderAdapter()
107
for test in tests.iter_suite_tests(standard_tests):
108
result.addTests(adapter.adapt(test))
93
scenarios = transport_test_permutations()
94
return multiply_tests(standard_tests, scenarios, result)
112
97
class TransportTests(TestTransportImplementation):
115
100
super(TransportTests, self).setUp()
116
self._captureVar('BZR_NO_SMART_VFS', None)
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
118
103
def check_transport_contents(self, content, transport, relpath):
119
"""Check that transport.get(relpath).read() == content."""
120
self.assertEqualDiff(content, transport.get(relpath).read())
104
"""Check that transport.get_bytes(relpath) == content."""
105
self.assertEqualDiff(content, transport.get_bytes(relpath))
122
107
def test_ensure_base_missing(self):
123
108
""".ensure_base() should create the directory if it doesn't exist"""
167
152
self.assertEqual(True, t.has('a'))
168
153
self.assertEqual(False, t.has('c'))
169
154
self.assertEqual(True, t.has(urlutils.escape('%')))
170
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
[True, True, False, False, True, False, True, False])
155
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
156
'e', 'f', 'g', 'h'])),
157
[True, True, False, False,
158
True, False, True, False])
172
159
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
[True, True, False, False, True, False, True, False])
160
self.assertEqual(False, t.has_any(['c', 'd', 'f',
161
urlutils.escape('%%')]))
162
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
163
'e', 'f', 'g', 'h']))),
164
[True, True, False, False,
165
True, False, True, False])
176
166
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
167
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
179
169
def test_has_root_works(self):
180
from bzrlib.smart import server
181
if self.transport_server is server.SmartTCPServer_for_testing:
170
if self.transport_server is test_server.SmartTCPServer_for_testing:
182
171
raise TestNotApplicable(
183
172
"SmartTCPServer_for_testing intentionally does not allow "
210
199
for content, f in itertools.izip(contents, content_f):
211
200
self.assertEqual(content, f.read())
202
def test_get_unknown_file(self):
203
t = self.get_transport()
205
contents = ['contents of a\n',
208
self.build_tree(files, transport=t, line_endings='binary')
213
209
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
210
def iterate_and_close(func, *args):
211
for f in func(*args):
212
# We call f.read() here because things like paramiko actually
213
# spawn a thread to prefetch the content, which we want to
214
# consume before we close the handle.
217
self.assertRaises(NoSuchFile, iterate_and_close,
218
t.get_multi, ['a', 'b', 'c'])
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, iter(['a', 'b', 'c']))
217
222
def test_get_directory_read_gives_ReadError(self):
218
223
"""consistent errors for read() on a file returned by get()."""
1067
1088
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1068
1089
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1069
1090
subdir = t.clone('subdir')
1070
subdir.stat('./file')
1091
st = subdir.stat('./file')
1092
st = subdir.stat('.')
1094
def test_hardlink(self):
1095
from stat import ST_NLINK
1097
t = self.get_transport()
1099
source_name = "original_target"
1100
link_name = "target_link"
1102
self.build_tree([source_name], transport=t)
1105
t.hardlink(source_name, link_name)
1107
self.assertTrue(t.has(source_name))
1108
self.assertTrue(t.has(link_name))
1110
st = t.stat(link_name)
1111
self.assertEqual(st[ST_NLINK], 2)
1112
except TransportNotPossible:
1113
raise TestSkipped("Transport %s does not support hardlinks." %
1114
self._server.__class__)
1116
def test_symlink(self):
1117
from stat import S_ISLNK
1119
t = self.get_transport()
1121
source_name = "original_target"
1122
link_name = "target_link"
1124
self.build_tree([source_name], transport=t)
1127
t.symlink(source_name, link_name)
1129
self.assertTrue(t.has(source_name))
1130
self.assertTrue(t.has(link_name))
1132
st = t.stat(link_name)
1133
self.assertTrue(S_ISLNK(st.st_mode),
1134
"expected symlink, got mode %o" % st.st_mode)
1135
except TransportNotPossible:
1136
raise TestSkipped("Transport %s does not support symlinks." %
1137
self._server.__class__)
1139
self.knownFailure("Paramiko fails to create symlinks during tests")
1073
1141
def test_list_dir(self):
1074
1142
# TODO: Test list_dir, just try once, and if it throws, stop testing
1137
1206
raise TestSkipped("not a connected transport")
1139
1208
t2 = t1.clone('subdir')
1140
self.assertEquals(t1._scheme, t2._scheme)
1141
self.assertEquals(t1._user, t2._user)
1142
self.assertEquals(t1._password, t2._password)
1143
self.assertEquals(t1._host, t2._host)
1144
self.assertEquals(t1._port, t2._port)
1209
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1210
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1211
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1212
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1213
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1146
1215
def test__reuse_for(self):
1147
1216
t = self.get_transport()
1155
1224
Only the parameters different from None will be changed.
1157
if scheme is None: scheme = t._scheme
1158
if user is None: user = t._user
1159
if password is None: password = t._password
1160
if user is None: user = t._user
1161
if host is None: host = t._host
1162
if port is None: port = t._port
1163
if path is None: path = t._path
1164
return t._unsplit_url(scheme, user, password, host, port, path)
1226
if scheme is None: scheme = t._parsed_url.scheme
1227
if user is None: user = t._parsed_url.user
1228
if password is None: password = t._parsed_url.password
1229
if user is None: user = t._parsed_url.user
1230
if host is None: host = t._parsed_url.host
1231
if port is None: port = t._parsed_url.port
1232
if path is None: path = t._parsed_url.path
1233
return str(urlutils.URL(scheme, user, password, host, port, path))
1166
if t._scheme == 'ftp':
1235
if t._parsed_url.scheme == 'ftp':
1167
1236
scheme = 'sftp'
1170
1239
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1240
if t._parsed_url.user == 'me':
1234
1303
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1236
self.failUnless(t1.has('a'))
1237
self.failUnless(t1.has('b/c'))
1238
self.failIf(t1.has('c'))
1305
self.assertTrue(t1.has('a'))
1306
self.assertTrue(t1.has('b/c'))
1307
self.assertFalse(t1.has('c'))
1240
1309
t2 = t1.clone('b')
1241
1310
self.assertEqual(t1.base + 'b/', t2.base)
1243
self.failUnless(t2.has('c'))
1244
self.failIf(t2.has('a'))
1312
self.assertTrue(t2.has('c'))
1313
self.assertFalse(t2.has('a'))
1246
1315
t3 = t2.clone('..')
1247
self.failUnless(t3.has('a'))
1248
self.failIf(t3.has('c'))
1316
self.assertTrue(t3.has('a'))
1317
self.assertFalse(t3.has('c'))
1250
self.failIf(t1.has('b/d'))
1251
self.failIf(t2.has('d'))
1252
self.failIf(t3.has('b/d'))
1319
self.assertFalse(t1.has('b/d'))
1320
self.assertFalse(t2.has('d'))
1321
self.assertFalse(t3.has('b/d'))
1254
1323
if t1.is_readonly():
1255
1324
self.build_tree_contents([('b/d', 'newfile\n')])
1257
1326
t2.put_bytes('d', 'newfile\n')
1259
self.failUnless(t1.has('b/d'))
1260
self.failUnless(t2.has('d'))
1261
self.failUnless(t3.has('b/d'))
1328
self.assertTrue(t1.has('b/d'))
1329
self.assertTrue(t2.has('d'))
1330
self.assertTrue(t3.has('b/d'))
1263
1332
def test_clone_to_root(self):
1264
1333
orig_transport = self.get_transport()
1349
1419
# smoke test for abspath on win32.
1350
1420
# a transport based on 'file:///' never fully qualifies the drive.
1351
transport = get_transport("file:///")
1352
self.failUnlessEqual(transport.abspath("/"), "file:///")
1421
transport = _mod_transport.get_transport_from_url("file:///")
1422
self.assertEqual(transport.abspath("/"), "file:///")
1354
1424
# but a transport that starts with a drive spec must keep it.
1355
transport = get_transport("file:///C:/")
1356
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1425
transport = _mod_transport.get_transport_from_url("file:///C:/")
1426
self.assertEqual(transport.abspath("/"), "file:///C:/")
1358
1428
def test_local_abspath(self):
1359
1429
transport = self.get_transport()
1506
1580
transport.put_bytes('foo', 'bar')
1507
1581
transport3 = self.get_transport()
1508
1582
self.check_transport_contents('bar', transport3, 'foo')
1509
# its base should be usable.
1510
transport4 = get_transport(transport.base)
1511
self.check_transport_contents('bar', transport4, 'foo')
1513
1584
# now opening at a relative url should give us a sane result:
1514
1585
transport.mkdir('newdir')
1515
transport5 = get_transport(transport.base + "newdir")
1586
transport5 = self.get_transport('newdir')
1516
1587
transport6 = transport5.clone('..')
1517
1588
self.check_transport_contents('bar', transport6, 'foo')
1704
1775
# also raise a special error
1705
1776
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1706
1777
transport.readv, 'a', [(12,2)])
1779
def test_no_segment_parameters(self):
1780
"""Segment parameters should be stripped and stored in
1781
transport.segment_parameters."""
1782
transport = self.get_transport("foo")
1783
self.assertEquals({}, transport.get_segment_parameters())
1785
def test_segment_parameters(self):
1786
"""Segment parameters should be stripped and stored in
1787
transport.get_segment_parameters()."""
1788
base_url = self._server.get_url()
1789
parameters = {"key1": "val1", "key2": "val2"}
1790
url = urlutils.join_segment_parameters(base_url, parameters)
1791
transport = _mod_transport.get_transport_from_url(url)
1792
self.assertEquals(parameters, transport.get_segment_parameters())
1794
def test_stat_symlink(self):
1795
# if a transport points directly to a symlink (and supports symlinks
1796
# at all) you can tell this. helps with bug 32669.
1797
t = self.get_transport()
1799
t.symlink('target', 'link')
1800
except TransportNotPossible:
1801
raise TestSkipped("symlinks not supported")
1802
t2 = t.clone('link')
1804
self.assertTrue(stat.S_ISLNK(st.st_mode))