26
26
from StringIO import StringIO as pyStringIO
31
30
from bzrlib import (
35
transport as _mod_transport,
37
38
from bzrlib.errors import (ConnectionError,
45
43
TransportNotPossible,
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
78
76
for module in _get_transport_modules():
80
78
permutations = get_transport_test_permutations(
81
reduce(getattr, (module).split('.')[1:], __import__(module)))
79
pyutils.get_named_object(module))
82
80
for (klass, server_factory) in permutations:
83
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
{"transport_class":klass,
104
102
super(TransportTests, self).setUp()
105
self._captureVar('BZR_NO_SMART_VFS', None)
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
107
105
def check_transport_contents(self, content, transport, relpath):
108
"""Check that transport.get(relpath).read() == content."""
109
self.assertEqualDiff(content, transport.get(relpath).read())
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
111
109
def test_ensure_base_missing(self):
112
110
""".ensure_base() should create the directory if it doesn't exist"""
212
210
self.build_tree(files, transport=t, line_endings='binary')
213
211
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
217
224
def test_get_directory_read_gives_ReadError(self):
218
225
"""consistent errors for read() on a file returned by get()."""
287
298
t.put_bytes('a', 'some text for a\n')
288
self.failUnless(t.has('a'))
299
self.assertTrue(t.has('a'))
289
300
self.check_transport_contents('some text for a\n', t, 'a')
291
302
# The contents should be overwritten
303
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
306
self.failIf(t.has('a'))
317
self.assertFalse(t.has('a'))
307
318
t.put_bytes_non_atomic('a', 'some text for a\n')
308
self.failUnless(t.has('a'))
319
self.assertTrue(t.has('a'))
309
320
self.check_transport_contents('some text for a\n', t, 'a')
310
321
# Put also replaces contents
311
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
334
# Now test the create_parent flag
324
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
self.failIf(t.has('dir/a'))
337
self.assertFalse(t.has('dir/a'))
327
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
339
create_parent_dir=True)
329
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
401
412
result = t.put_file('a', StringIO('some text for a\n'))
402
413
# put_file returns the length of the data written
403
414
self.assertEqual(16, result)
404
self.failUnless(t.has('a'))
415
self.assertTrue(t.has('a'))
405
416
self.check_transport_contents('some text for a\n', t, 'a')
406
417
# Put also replaces contents
407
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
422
self.failIf(t.has('a'))
433
self.assertFalse(t.has('a'))
423
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
self.failUnless(t.has('a'))
435
self.assertTrue(t.has('a'))
425
436
self.check_transport_contents('some text for a\n', t, 'a')
426
437
# Put also replaces contents
427
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
450
# Now test the create_parent flag
440
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
452
StringIO('contents\n'))
442
self.failIf(t.has('dir/a'))
453
self.assertFalse(t.has('dir/a'))
443
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
455
create_parent_dir=True)
445
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
644
655
self.build_tree(files, transport=transport_from)
645
656
self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
self.check_transport_contents(transport_to.get(f).read(),
658
self.check_transport_contents(transport_to.get_bytes(f),
648
659
transport_from, f)
650
661
t = self.get_transport()
673
684
files = ['a', 'b', 'c', 'd']
674
685
t.copy_to(iter(files), temp_transport)
676
self.check_transport_contents(temp_transport.get(f).read(),
687
self.check_transport_contents(temp_transport.get_bytes(f),
678
689
del temp_transport
824
835
t.put_bytes('a', 'a little bit of text\n')
825
self.failUnless(t.has('a'))
836
self.assertTrue(t.has('a'))
827
self.failIf(t.has('a'))
838
self.assertFalse(t.has('a'))
829
840
self.assertRaises(NoSuchFile, t.delete, 'a')
836
847
t.delete_multi(['a', 'c'])
837
848
self.assertEqual([False, True, False],
838
849
list(t.has_multi(['a', 'b', 'c'])))
839
self.failIf(t.has('a'))
840
self.failUnless(t.has('b'))
841
self.failIf(t.has('c'))
850
self.assertFalse(t.has('a'))
851
self.assertTrue(t.has('b'))
852
self.assertFalse(t.has('c'))
843
854
self.assertRaises(NoSuchFile,
844
855
t.delete_multi, ['a', 'b', 'c'])
994
1005
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
996
1007
t.move('a', 'b')
997
self.failUnless(t.has('b'))
998
self.failIf(t.has('a'))
1008
self.assertTrue(t.has('b'))
1009
self.assertFalse(t.has('a'))
1000
1011
self.check_transport_contents('a first file\n', t, 'b')
1001
1012
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1063
1074
for path, size in zip(paths, sizes):
1064
1075
st = t.stat(path)
1065
1076
if path.endswith('/'):
1066
self.failUnless(S_ISDIR(st.st_mode))
1077
self.assertTrue(S_ISDIR(st.st_mode))
1067
1078
# directory sizes are meaningless
1069
self.failUnless(S_ISREG(st.st_mode))
1080
self.assertTrue(S_ISREG(st.st_mode))
1070
1081
self.assertEqual(size, st.st_size)
1072
1083
remote_stats = list(t.stat_multi(paths))
1079
1090
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1080
1091
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1081
1092
subdir = t.clone('subdir')
1082
subdir.stat('./file')
1093
st = subdir.stat('./file')
1094
st = subdir.stat('.')
1085
1096
def test_hardlink(self):
1086
1097
from stat import ST_NLINK
1096
1107
t.hardlink(source_name, link_name)
1098
self.failUnless(t.has(source_name))
1099
self.failUnless(t.has(link_name))
1109
self.assertTrue(t.has(source_name))
1110
self.assertTrue(t.has(link_name))
1101
1112
st = t.stat(link_name)
1102
self.failUnlessEqual(st[ST_NLINK], 2)
1113
self.assertEqual(st[ST_NLINK], 2)
1103
1114
except TransportNotPossible:
1104
1115
raise TestSkipped("Transport %s does not support hardlinks." %
1105
1116
self._server.__class__)
1118
1129
t.symlink(source_name, link_name)
1120
self.failUnless(t.has(source_name))
1121
self.failUnless(t.has(link_name))
1131
self.assertTrue(t.has(source_name))
1132
self.assertTrue(t.has(link_name))
1123
1134
st = t.stat(link_name)
1124
self.failUnless(S_ISLNK(st.st_mode),
1135
self.assertTrue(S_ISLNK(st.st_mode),
1125
1136
"expected symlink, got mode %o" % st.st_mode)
1126
1137
except TransportNotPossible:
1127
1138
raise TestSkipped("Transport %s does not support symlinks." %
1128
1139
self._server.__class__)
1129
1140
except IOError:
1130
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1141
self.knownFailure("Paramiko fails to create symlinks during tests")
1132
1143
def test_list_dir(self):
1133
1144
# TODO: Test list_dir, just try once, and if it throws, stop testing
1197
1208
raise TestSkipped("not a connected transport")
1199
1210
t2 = t1.clone('subdir')
1200
self.assertEquals(t1._scheme, t2._scheme)
1201
self.assertEquals(t1._user, t2._user)
1202
self.assertEquals(t1._password, t2._password)
1203
self.assertEquals(t1._host, t2._host)
1204
self.assertEquals(t1._port, t2._port)
1211
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1212
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1213
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1214
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1215
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1206
1217
def test__reuse_for(self):
1207
1218
t = self.get_transport()
1215
1226
Only the parameters different from None will be changed.
1217
if scheme is None: scheme = t._scheme
1218
if user is None: user = t._user
1219
if password is None: password = t._password
1220
if user is None: user = t._user
1221
if host is None: host = t._host
1222
if port is None: port = t._port
1223
if path is None: path = t._path
1224
return t._unsplit_url(scheme, user, password, host, port, path)
1228
if scheme is None: scheme = t._parsed_url.scheme
1229
if user is None: user = t._parsed_url.user
1230
if password is None: password = t._parsed_url.password
1231
if user is None: user = t._parsed_url.user
1232
if host is None: host = t._parsed_url.host
1233
if port is None: port = t._parsed_url.port
1234
if path is None: path = t._parsed_url.path
1235
return str(urlutils.URL(scheme, user, password, host, port, path))
1226
if t._scheme == 'ftp':
1237
if t._parsed_url.scheme == 'ftp':
1227
1238
scheme = 'sftp'
1230
1241
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
if t._parsed_url.user == 'me':
1245
1256
# (they may be typed by the user when prompted for example)
1246
1257
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1258
# We will not connect, so we can use an invalid host
1248
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1259
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
if t._parsed_url.port == 1234:
1294
1305
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1296
self.failUnless(t1.has('a'))
1297
self.failUnless(t1.has('b/c'))
1298
self.failIf(t1.has('c'))
1307
self.assertTrue(t1.has('a'))
1308
self.assertTrue(t1.has('b/c'))
1309
self.assertFalse(t1.has('c'))
1300
1311
t2 = t1.clone('b')
1301
1312
self.assertEqual(t1.base + 'b/', t2.base)
1303
self.failUnless(t2.has('c'))
1304
self.failIf(t2.has('a'))
1314
self.assertTrue(t2.has('c'))
1315
self.assertFalse(t2.has('a'))
1306
1317
t3 = t2.clone('..')
1307
self.failUnless(t3.has('a'))
1308
self.failIf(t3.has('c'))
1318
self.assertTrue(t3.has('a'))
1319
self.assertFalse(t3.has('c'))
1310
self.failIf(t1.has('b/d'))
1311
self.failIf(t2.has('d'))
1312
self.failIf(t3.has('b/d'))
1321
self.assertFalse(t1.has('b/d'))
1322
self.assertFalse(t2.has('d'))
1323
self.assertFalse(t3.has('b/d'))
1314
1325
if t1.is_readonly():
1315
1326
self.build_tree_contents([('b/d', 'newfile\n')])
1317
1328
t2.put_bytes('d', 'newfile\n')
1319
self.failUnless(t1.has('b/d'))
1320
self.failUnless(t2.has('d'))
1321
self.failUnless(t3.has('b/d'))
1330
self.assertTrue(t1.has('b/d'))
1331
self.assertTrue(t2.has('d'))
1332
self.assertTrue(t3.has('b/d'))
1323
1334
def test_clone_to_root(self):
1324
1335
orig_transport = self.get_transport()
1398
1409
self.assertEqual(transport.clone("/").abspath('foo'),
1399
1410
transport.abspath("/foo"))
1412
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1413
def test_win32_abspath(self):
1402
1414
# Note: we tried to set sys.platform='win32' so we could test on
1403
1415
# other platforms too, but then osutils does platform specific
1409
1421
# smoke test for abspath on win32.
1410
1422
# a transport based on 'file:///' never fully qualifies the drive.
1411
transport = get_transport("file:///")
1412
self.failUnlessEqual(transport.abspath("/"), "file:///")
1423
transport = _mod_transport.get_transport_from_url("file:///")
1424
self.assertEqual(transport.abspath("/"), "file:///")
1414
1426
# but a transport that starts with a drive spec must keep it.
1415
transport = get_transport("file:///C:/")
1416
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1427
transport = _mod_transport.get_transport_from_url("file:///C:/")
1428
self.assertEqual(transport.abspath("/"), "file:///C:/")
1418
1430
def test_local_abspath(self):
1419
1431
transport = self.get_transport()
1546
1558
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1559
if no_unicode_support:
1548
raise tests.KnownFailure("test server cannot handle unicode paths")
1560
self.knownFailure("test server cannot handle unicode paths")
1551
1563
self.build_tree(files, transport=t, line_endings='binary')
1616
1628
def test_readv(self):
1617
1629
transport = self.get_transport()
1618
1630
if transport.is_readonly():
1619
file('a', 'w').write('0123456789')
1631
with file('a', 'w') as f: f.write('0123456789')
1621
1633
transport.put_bytes('a', '0123456789')
1632
1644
def test_readv_out_of_order(self):
1633
1645
transport = self.get_transport()
1634
1646
if transport.is_readonly():
1635
file('a', 'w').write('0123456789')
1647
with file('a', 'w') as f: f.write('0123456789')
1637
1649
transport.put_bytes('a', '01234567890')
1710
1722
transport = self.get_transport()
1711
1723
# test from observed failure case.
1712
1724
if transport.is_readonly():
1713
file('a', 'w').write('a'*1024*1024)
1725
with file('a', 'w') as f: f.write('a'*1024*1024)
1715
1727
transport.put_bytes('a', 'a'*1024*1024)
1716
1728
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1762
def test_readv_short_read(self):
1751
1763
transport = self.get_transport()
1752
1764
if transport.is_readonly():
1753
file('a', 'w').write('0123456789')
1765
with file('a', 'w') as f: f.write('0123456789')
1755
1767
transport.put_bytes('a', '01234567890')
1766
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1779
transport.readv, 'a', [(12,2)])
1781
def test_no_segment_parameters(self):
    """A transport opened without segment parameters reports none."""
    transport = self.get_transport("foo")
    # Nothing was supplied for "foo", so the mapping must be empty.
    self.assertEqual({}, transport.get_segment_parameters())
1787
def test_segment_parameters(self):
    """Segment parameters joined onto a URL are reported by
    transport.get_segment_parameters()."""
    base_url = self._server.get_url()
    parameters = {"key1": "val1", "key2": "val2"}
    # Build the URL with the parameters attached, then open a transport
    # directly from that URL rather than through self.get_transport().
    url = urlutils.join_segment_parameters(base_url, parameters)
    transport = _mod_transport.get_transport_from_url(url)
    self.assertEqual(parameters, transport.get_segment_parameters())
1796
def test_set_segment_parameters(self):
    """Segment parameters can be set and show up in base."""
    transport = self.get_transport("foo")
    orig_base = transport.base
    transport.set_segment_parameter("arm", "board")
    # A set parameter is appended to the base URL and is reported back.
    self.assertEqual("%s,arm=board" % orig_base, transport.base)
    self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
    # Setting a parameter to None removes it again ...
    transport.set_segment_parameter("arm", None)
    # ... and doing so for a parameter that was never set is accepted.
    transport.set_segment_parameter("nonexistant", None)
    self.assertEqual({}, transport.get_segment_parameters())
    self.assertEqual(orig_base, transport.base)
1769
1808
def test_stat_symlink(self):
1770
1809
# if a transport points directly to a symlink (and supports symlinks
1771
1810
# at all) you can tell this. helps with bug 32669.
1777
1816
t2 = t.clone('link')
1778
1817
st = t2.stat('')
1779
1818
self.assertTrue(stat.S_ISLNK(st.st_mode))
1820
def test_abspath_url_unquote_unreserved(self):
1821
"""URLs from abspath should have unreserved characters unquoted
1823
Need consistent quoting notably for tildes, see lp:842223 for more.
1825
t = self.get_transport()
1826
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1827
self.assertEqual(t.base + "-.09AZ_az~",
1828
t.abspath(needlessly_escaped_dir))
1830
def test_clone_url_unquote_unreserved(self):
1831
"""Base URL of a cloned branch needs unreserved characters unquoted
1833
Cloned transports should be prefix comparable for things like the
1834
isolation checking of tests, see lp:842223 for more.
1836
t1 = self.get_transport()
1837
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1838
self.build_tree([needlessly_escaped_dir], transport=t1)
1839
t2 = t1.clone(needlessly_escaped_dir)
1840
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1842
def test_hook_post_connection_one(self):
1843
"""Fire post_connect hook after a ConnectedTransport is first used"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t = self.get_transport()
1847
self.assertEqual([], log)
1848
t.has("non-existant")
1849
if isinstance(t, RemoteTransport):
1850
self.assertEqual([t.get_smart_medium()], log)
1851
elif isinstance(t, ConnectedTransport):
1852
self.assertEqual([t], log)
1854
self.assertEqual([], log)
1856
def test_hook_post_connection_multi(self):
1857
"""Fire post_connect hook once per unshared underlying connection"""
1859
Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
t1 = self.get_transport()
1862
t3 = self.get_transport()
1863
self.assertEqual([], log)
1867
if isinstance(t1, RemoteTransport):
1868
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
elif isinstance(t1, ConnectedTransport):
1870
self.assertEqual([t1, t3], log)
1872
self.assertEqual([], log)