26
26
from StringIO import StringIO as pyStringIO
31
30
from bzrlib import (
35
transport as _mod_transport,
37
38
from bzrlib.errors import (ConnectionError,
45
43
TransportNotPossible,
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
55
53
from bzrlib.tests.test_transport import TestTransportImplementation
56
54
from bzrlib.transport import (
57
55
ConnectedTransport,
59
57
_get_transport_modules,
61
59
from bzrlib.transport.memory import MemoryTransport
64
class TransportTestProviderAdapter(TestScenarioApplier):
65
"""A tool to generate a suite testing all transports for a single test.
67
This is done by copying the test once for each transport and injecting
68
the transport_class and transport_server classes into each copy. Each copy
69
is also given a new id() to make it easy to identify.
73
self.scenarios = self._test_permutations()
75
def get_transport_test_permutations(self, module):
76
"""Get the permutations module wants to have tested."""
77
if getattr(module, 'get_test_permutations', None) is None:
79
"transport module %s doesn't provide get_test_permutations()"
82
return module.get_test_permutations()
84
def _test_permutations(self):
85
"""Return a list of the klass, server_factory pairs to test."""
87
for module in _get_transport_modules():
89
permutations = self.get_transport_test_permutations(
90
reduce(getattr, (module).split('.')[1:], __import__(module)))
91
for (klass, server_factory) in permutations:
92
scenario = (server_factory.__name__,
93
{"transport_class":klass,
94
"transport_server":server_factory})
95
result.append(scenario)
96
except errors.DependencyNotPresent, e:
97
# Continue even if a dependency prevents us
98
# from adding this test
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
103
92
def load_tests(standard_tests, module, loader):
104
93
"""Multiply tests for tranport implementations."""
105
94
result = loader.suiteClass()
106
adapter = TransportTestProviderAdapter()
107
for test in tests.iter_suite_tests(standard_tests):
108
result.addTests(adapter.adapt(test))
95
scenarios = transport_test_permutations()
96
return multiply_tests(standard_tests, scenarios, result)
112
99
class TransportTests(TestTransportImplementation):
115
102
super(TransportTests, self).setUp()
116
self._captureVar('BZR_NO_SMART_VFS', None)
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
118
105
def check_transport_contents(self, content, transport, relpath):
119
"""Check that transport.get(relpath).read() == content."""
120
self.assertEqualDiff(content, transport.get(relpath).read())
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
122
109
def test_ensure_base_missing(self):
123
110
""".ensure_base() should create the directory if it doesn't exist"""
167
154
self.assertEqual(True, t.has('a'))
168
155
self.assertEqual(False, t.has('c'))
169
156
self.assertEqual(True, t.has(urlutils.escape('%')))
170
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
[True, True, False, False, True, False, True, False])
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
172
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
[True, True, False, False, True, False, True, False])
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
176
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
179
171
def test_has_root_works(self):
180
from bzrlib.smart import server
181
if self.transport_server is server.SmartTCPServer_for_testing:
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
182
173
raise TestNotApplicable(
183
174
"SmartTCPServer_for_testing intentionally does not allow "
210
201
for content, f in itertools.izip(contents, content_f):
211
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
213
211
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
217
224
def test_get_directory_read_gives_ReadError(self):
218
225
"""consistent errors for read() on a file returned by get()."""
507
520
self.assertTransportMode(t, 'dir777', 0777)
509
522
def test_put_bytes_unicode(self):
510
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
511
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
512
# (we don't want to encode unicode here at all, callers should be
513
# strictly passing bytes to put_bytes), but we allow it for backwards
514
# compatibility. At some point we should use a specific exception.
515
# See https://bugs.launchpad.net/bzr/+bug/106898.
516
523
t = self.get_transport()
517
524
if t.is_readonly():
519
526
unicode_string = u'\u1234'
521
(AssertionError, UnicodeEncodeError),
522
t.put_bytes, 'foo', unicode_string)
524
def test_put_file_unicode(self):
525
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
526
# This situation can happen (and has) if code is careless about the type
527
# of "string" they initialise/write to a StringIO with. We cannot use
528
# cStringIO, because it never returns unicode from read.
529
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
530
# raise, but we raise it for hysterical raisins.
531
t = self.get_transport()
534
unicode_file = pyStringIO(u'\u1234')
535
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
527
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
537
529
def test_mkdir(self):
538
530
t = self.get_transport()
540
532
if t.is_readonly():
541
# cannot mkdir on readonly transports. We're not testing for
533
# cannot mkdir on readonly transports. We're not testing for
542
534
# cache coherency because cache behaviour is not currently
543
535
# defined for the transport interface.
544
536
self.assertRaises(TransportNotPossible, t.mkdir, '.')
979
988
# perhaps all of this could be done in a subdirectory
981
990
t.put_bytes('a', 'a first file\n')
982
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
991
self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
985
self.failUnless(t.has('b'))
986
self.failIf(t.has('a'))
994
self.assertTrue(t.has('b'))
995
self.assertFalse(t.has('a'))
988
997
self.check_transport_contents('a first file\n', t, 'b')
989
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
998
self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
991
1000
# Overwrite a file
992
1001
t.put_bytes('c', 'c this file\n')
993
1002
t.move('c', 'b')
994
self.failIf(t.has('c'))
1003
self.assertFalse(t.has('c'))
995
1004
self.check_transport_contents('c this file\n', t, 'b')
997
1006
# TODO: Try to write a test for atomicity
1067
1076
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1068
1077
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1069
1078
subdir = t.clone('subdir')
1070
subdir.stat('./file')
1079
st = subdir.stat('./file')
1080
st = subdir.stat('.')
1082
def test_hardlink(self):
1083
from stat import ST_NLINK
1085
t = self.get_transport()
1087
source_name = "original_target"
1088
link_name = "target_link"
1090
self.build_tree([source_name], transport=t)
1093
t.hardlink(source_name, link_name)
1095
self.assertTrue(t.has(source_name))
1096
self.assertTrue(t.has(link_name))
1098
st = t.stat(link_name)
1099
self.assertEqual(st[ST_NLINK], 2)
1100
except TransportNotPossible:
1101
raise TestSkipped("Transport %s does not support hardlinks." %
1102
self._server.__class__)
1104
def test_symlink(self):
1105
from stat import S_ISLNK
1107
t = self.get_transport()
1109
source_name = "original_target"
1110
link_name = "target_link"
1112
self.build_tree([source_name], transport=t)
1115
t.symlink(source_name, link_name)
1117
self.assertTrue(t.has(source_name))
1118
self.assertTrue(t.has(link_name))
1120
st = t.stat(link_name)
1121
self.assertTrue(S_ISLNK(st.st_mode),
1122
"expected symlink, got mode %o" % st.st_mode)
1123
except TransportNotPossible:
1124
raise TestSkipped("Transport %s does not support symlinks." %
1125
self._server.__class__)
1127
self.knownFailure("Paramiko fails to create symlinks during tests")
1073
1129
def test_list_dir(self):
1074
1130
# TODO: Test list_dir, just try once, and if it throws, stop testing
1137
1194
raise TestSkipped("not a connected transport")
1139
1196
t2 = t1.clone('subdir')
1140
self.assertEquals(t1._scheme, t2._scheme)
1141
self.assertEquals(t1._user, t2._user)
1142
self.assertEquals(t1._password, t2._password)
1143
self.assertEquals(t1._host, t2._host)
1144
self.assertEquals(t1._port, t2._port)
1197
self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1198
self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1199
self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1200
self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1201
self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1146
1203
def test__reuse_for(self):
1147
1204
t = self.get_transport()
1155
1212
Only the parameters different from None will be changed.
1157
if scheme is None: scheme = t._scheme
1158
if user is None: user = t._user
1159
if password is None: password = t._password
1160
if user is None: user = t._user
1161
if host is None: host = t._host
1162
if port is None: port = t._port
1163
if path is None: path = t._path
1164
return t._unsplit_url(scheme, user, password, host, port, path)
1214
if scheme is None: scheme = t._parsed_url.scheme
1215
if user is None: user = t._parsed_url.user
1216
if password is None: password = t._parsed_url.password
1217
if user is None: user = t._parsed_url.user
1218
if host is None: host = t._parsed_url.host
1219
if port is None: port = t._parsed_url.port
1220
if path is None: path = t._parsed_url.path
1221
return str(urlutils.URL(scheme, user, password, host, port, path))
1166
if t._scheme == 'ftp':
1223
if t._parsed_url.scheme == 'ftp':
1167
1224
scheme = 'sftp'
1170
1227
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1228
if t._parsed_url.user == 'me':
1234
1291
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1236
self.failUnless(t1.has('a'))
1237
self.failUnless(t1.has('b/c'))
1238
self.failIf(t1.has('c'))
1293
self.assertTrue(t1.has('a'))
1294
self.assertTrue(t1.has('b/c'))
1295
self.assertFalse(t1.has('c'))
1240
1297
t2 = t1.clone('b')
1241
1298
self.assertEqual(t1.base + 'b/', t2.base)
1243
self.failUnless(t2.has('c'))
1244
self.failIf(t2.has('a'))
1300
self.assertTrue(t2.has('c'))
1301
self.assertFalse(t2.has('a'))
1246
1303
t3 = t2.clone('..')
1247
self.failUnless(t3.has('a'))
1248
self.failIf(t3.has('c'))
1304
self.assertTrue(t3.has('a'))
1305
self.assertFalse(t3.has('c'))
1250
self.failIf(t1.has('b/d'))
1251
self.failIf(t2.has('d'))
1252
self.failIf(t3.has('b/d'))
1307
self.assertFalse(t1.has('b/d'))
1308
self.assertFalse(t2.has('d'))
1309
self.assertFalse(t3.has('b/d'))
1254
1311
if t1.is_readonly():
1255
open('b/d', 'wb').write('newfile\n')
1312
self.build_tree_contents([('b/d', 'newfile\n')])
1257
1314
t2.put_bytes('d', 'newfile\n')
1259
self.failUnless(t1.has('b/d'))
1260
self.failUnless(t2.has('d'))
1261
self.failUnless(t3.has('b/d'))
1316
self.assertTrue(t1.has('b/d'))
1317
self.assertTrue(t2.has('d'))
1318
self.assertTrue(t3.has('b/d'))
1263
1320
def test_clone_to_root(self):
1264
1321
orig_transport = self.get_transport()
1338
1395
self.assertEqual(transport.clone("/").abspath('foo'),
1339
1396
transport.abspath("/foo"))
1398
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1399
def test_win32_abspath(self):
1400
# Note: we tried to set sys.platform='win32' so we could test on
1401
# other platforms too, but then osutils does platform specific
1402
# things at import time which defeated us...
1403
if sys.platform != 'win32':
1405
'Testing drive letters in abspath implemented only for win32')
1407
# smoke test for abspath on win32.
1408
# a transport based on 'file:///' never fully qualifies the drive.
1409
transport = _mod_transport.get_transport_from_url("file:///")
1410
self.assertEqual(transport.abspath("/"), "file:///")
1412
# but a transport that starts with a drive spec must keep it.
1413
transport = _mod_transport.get_transport_from_url("file:///C:/")
1414
self.assertEqual(transport.abspath("/"), "file:///C:/")
1341
1416
def test_local_abspath(self):
1342
1417
transport = self.get_transport()
1420
1495
'to/dir/b%2525z',
1498
def test_copy_tree_to_transport(self):
1499
transport = self.get_transport()
1500
if not transport.listable():
1501
self.assertRaises(TransportNotPossible,
1502
transport.iter_files_recursive)
1504
if transport.is_readonly():
1506
self.build_tree(['from/',
1510
'from/dir/b%25z', # make sure quoting is correct
1512
transport=transport)
1513
from_transport = transport.clone('from')
1514
to_transport = transport.clone('to')
1515
to_transport.ensure_base()
1516
from_transport.copy_tree_to_transport(to_transport)
1517
paths = set(transport.iter_files_recursive())
1518
self.assertEqual(paths,
1519
set(['from/dir/foo',
1423
1528
def test_unicode_paths(self):
1424
1529
"""Test that we can read/write files with Unicode names."""
1425
1530
t = self.get_transport()
1459
1568
transport.put_bytes('foo', 'bar')
1460
1569
transport3 = self.get_transport()
1461
1570
self.check_transport_contents('bar', transport3, 'foo')
1462
# its base should be usable.
1463
transport4 = get_transport(transport.base)
1464
self.check_transport_contents('bar', transport4, 'foo')
1466
1572
# now opening at a relative url should give us a sane result:
1467
1573
transport.mkdir('newdir')
1468
transport5 = get_transport(transport.base + "newdir")
1574
transport5 = self.get_transport('newdir')
1469
1575
transport6 = transport5.clone('..')
1470
1576
self.check_transport_contents('bar', transport6, 'foo')
1657
1763
# also raise a special error
1658
1764
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1659
1765
transport.readv, 'a', [(12,2)])
1767
def test_no_segment_parameters(self):
1768
"""Segment parameters should be stripped and stored in
1769
transport.segment_parameters."""
1770
transport = self.get_transport("foo")
1771
self.assertEqual({}, transport.get_segment_parameters())
1773
def test_segment_parameters(self):
1774
"""Segment parameters should be stripped and stored in
1775
transport.get_segment_parameters()."""
1776
base_url = self._server.get_url()
1777
parameters = {"key1": "val1", "key2": "val2"}
1778
url = urlutils.join_segment_parameters(base_url, parameters)
1779
transport = _mod_transport.get_transport_from_url(url)
1780
self.assertEqual(parameters, transport.get_segment_parameters())
1782
def test_set_segment_parameters(self):
1783
"""Segment parameters can be set and show up in base."""
1784
transport = self.get_transport("foo")
1785
orig_base = transport.base
1786
transport.set_segment_parameter("arm", "board")
1787
self.assertEqual("%s,arm=board" % orig_base, transport.base)
1788
self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1789
transport.set_segment_parameter("arm", None)
1790
transport.set_segment_parameter("nonexistant", None)
1791
self.assertEqual({}, transport.get_segment_parameters())
1792
self.assertEqual(orig_base, transport.base)
1794
def test_stat_symlink(self):
1795
# if a transport points directly to a symlink (and supports symlinks
1796
# at all) you can tell this. helps with bug 32669.
1797
t = self.get_transport()
1799
t.symlink('target', 'link')
1800
except TransportNotPossible:
1801
raise TestSkipped("symlinks not supported")
1802
t2 = t.clone('link')
1804
self.assertTrue(stat.S_ISLNK(st.st_mode))
1806
def test_abspath_url_unquote_unreserved(self):
1807
"""URLs from abspath should have unreserved characters unquoted
1809
Need consistent quoting notably for tildes, see lp:842223 for more.
1811
t = self.get_transport()
1812
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1813
self.assertEqual(t.base + "-.09AZ_az~",
1814
t.abspath(needlessly_escaped_dir))
1816
def test_clone_url_unquote_unreserved(self):
1817
"""Base URL of a cloned branch needs unreserved characters unquoted
1819
Cloned transports should be prefix comparable for things like the
1820
isolation checking of tests, see lp:842223 for more.
1822
t1 = self.get_transport()
1823
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1824
self.build_tree([needlessly_escaped_dir], transport=t1)
1825
t2 = t1.clone(needlessly_escaped_dir)
1826
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1828
def test_hook_post_connection_one(self):
1829
"""Fire post_connect hook after a ConnectedTransport is first used"""
1831
Transport.hooks.install_named_hook("post_connect", log.append, None)
1832
t = self.get_transport()
1833
self.assertEqual([], log)
1834
t.has("non-existant")
1835
if isinstance(t, RemoteTransport):
1836
self.assertEqual([t.get_smart_medium()], log)
1837
elif isinstance(t, ConnectedTransport):
1838
self.assertEqual([t], log)
1840
self.assertEqual([], log)
1842
def test_hook_post_connection_multi(self):
1843
"""Fire post_connect hook once per unshared underlying connection"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t1 = self.get_transport()
1848
t3 = self.get_transport()
1849
self.assertEqual([], log)
1853
if isinstance(t1, RemoteTransport):
1854
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1855
elif isinstance(t1, ConnectedTransport):
1856
self.assertEqual([t1, t3], log)
1858
self.assertEqual([], log)