26
26
from StringIO import StringIO as pyStringIO
31
30
from bzrlib import (
35
transport as _mod_transport,
37
38
from bzrlib.errors import (ConnectionError,
45
43
TransportNotPossible,
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
78
76
for module in _get_transport_modules():
80
78
permutations = get_transport_test_permutations(
81
reduce(getattr, (module).split('.')[1:], __import__(module)))
79
pyutils.get_named_object(module))
82
80
for (klass, server_factory) in permutations:
83
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
{"transport_class":klass,
104
102
super(TransportTests, self).setUp()
105
self._captureVar('BZR_NO_SMART_VFS', None)
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
107
105
def check_transport_contents(self, content, transport, relpath):
108
"""Check that transport.get(relpath).read() == content."""
109
self.assertEqualDiff(content, transport.get(relpath).read())
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
111
109
def test_ensure_base_missing(self):
112
110
""".ensure_base() should create the directory if it doesn't exist"""
212
210
self.build_tree(files, transport=t, line_endings='binary')
213
211
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
217
224
def test_get_directory_read_gives_ReadError(self):
218
225
"""consistent errors for read() on a file returned by get()."""
288
298
t.put_bytes('a', 'some text for a\n')
289
self.failUnless(t.has('a'))
299
self.assertTrue(t.has('a'))
290
300
self.check_transport_contents('some text for a\n', t, 'a')
292
302
# The contents should be overwritten
304
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
307
self.failIf(t.has('a'))
317
self.assertFalse(t.has('a'))
308
318
t.put_bytes_non_atomic('a', 'some text for a\n')
309
self.failUnless(t.has('a'))
319
self.assertTrue(t.has('a'))
310
320
self.check_transport_contents('some text for a\n', t, 'a')
311
321
# Put also replaces contents
312
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
324
334
# Now test the create_parent flag
325
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
327
self.failIf(t.has('dir/a'))
337
self.assertFalse(t.has('dir/a'))
328
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
339
create_parent_dir=True)
330
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
402
412
result = t.put_file('a', StringIO('some text for a\n'))
403
413
# put_file returns the length of the data written
404
414
self.assertEqual(16, result)
405
self.failUnless(t.has('a'))
415
self.assertTrue(t.has('a'))
406
416
self.check_transport_contents('some text for a\n', t, 'a')
407
417
# Put also replaces contents
408
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
420
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
423
self.failIf(t.has('a'))
433
self.assertFalse(t.has('a'))
424
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
self.failUnless(t.has('a'))
435
self.assertTrue(t.has('a'))
426
436
self.check_transport_contents('some text for a\n', t, 'a')
427
437
# Put also replaces contents
428
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
440
450
# Now test the create_parent flag
441
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
452
StringIO('contents\n'))
443
self.failIf(t.has('dir/a'))
453
self.assertFalse(t.has('dir/a'))
444
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
455
create_parent_dir=True)
446
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
510
520
self.assertTransportMode(t, 'dir777', 0777)
512
522
def test_put_bytes_unicode(self):
513
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
515
# (we don't want to encode unicode here at all, callers should be
516
# strictly passing bytes to put_bytes), but we allow it for backwards
517
# compatibility. At some point we should use a specific exception.
518
# See https://bugs.launchpad.net/bzr/+bug/106898.
519
523
t = self.get_transport()
520
524
if t.is_readonly():
522
526
unicode_string = u'\u1234'
524
(AssertionError, UnicodeEncodeError),
525
t.put_bytes, 'foo', unicode_string)
527
def test_put_file_unicode(self):
528
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
# This situation can happen (and has) if code is careless about the type
530
# of "string" they initialise/write to a StringIO with. We cannot use
531
# cStringIO, because it never returns unicode from read.
532
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
# raise, but we raise it for hysterical raisins.
534
t = self.get_transport()
537
unicode_file = pyStringIO(u'\u1234')
538
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
527
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
540
529
def test_mkdir(self):
541
530
t = self.get_transport()
620
609
def test_opening_a_file_stream_can_set_mode(self):
621
610
t = self.get_transport()
622
611
if t.is_readonly():
612
self.assertRaises((TransportNotPossible, NotImplementedError),
613
t.open_write_stream, 'foo')
624
615
if not t._can_roundtrip_unix_modebits():
625
616
# Can't roundtrip, so no need to run this test
627
619
def check_mode(name, mode, expected):
628
620
handle = t.open_write_stream(name, mode=mode)
645
637
self.build_tree(files, transport=transport_from)
646
638
self.assertEqual(4, transport_from.copy_to(files, transport_to))
648
self.check_transport_contents(transport_to.get(f).read(),
640
self.check_transport_contents(transport_to.get_bytes(f),
649
641
transport_from, f)
651
643
t = self.get_transport()
674
666
files = ['a', 'b', 'c', 'd']
675
667
t.copy_to(iter(files), temp_transport)
677
self.check_transport_contents(temp_transport.get(f).read(),
669
self.check_transport_contents(temp_transport.get_bytes(f),
679
671
del temp_transport
825
817
t.put_bytes('a', 'a little bit of text\n')
826
self.failUnless(t.has('a'))
818
self.assertTrue(t.has('a'))
828
self.failIf(t.has('a'))
820
self.assertFalse(t.has('a'))
830
822
self.assertRaises(NoSuchFile, t.delete, 'a')
837
829
t.delete_multi(['a', 'c'])
838
830
self.assertEqual([False, True, False],
839
831
list(t.has_multi(['a', 'b', 'c'])))
840
self.failIf(t.has('a'))
841
self.failUnless(t.has('b'))
842
self.failIf(t.has('c'))
832
self.assertFalse(t.has('a'))
833
self.assertTrue(t.has('b'))
834
self.assertFalse(t.has('c'))
844
836
self.assertRaises(NoSuchFile,
845
837
t.delete_multi, ['a', 'b', 'c'])
906
898
t.mkdir('foo-baz')
908
900
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
self.failUnless(t.has('foo-bar'))
901
self.assertTrue(t.has('foo-bar'))
911
903
def test_rename_dir_succeeds(self):
912
904
t = self.get_transport()
913
905
if t.is_readonly():
914
raise TestSkipped("transport is readonly")
906
self.assertRaises((TransportNotPossible, NotImplementedError),
907
t.rename, 'foo', 'bar')
916
910
t.mkdir('adir/asubdir')
917
911
t.rename('adir', 'bdir')
992
988
# perhaps all of this could be done in a subdirectory
994
990
t.put_bytes('a', 'a first file\n')
995
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
991
self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
998
self.failUnless(t.has('b'))
999
self.failIf(t.has('a'))
994
self.assertTrue(t.has('b'))
995
self.assertFalse(t.has('a'))
1001
997
self.check_transport_contents('a first file\n', t, 'b')
1002
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
998
self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
1004
1000
# Overwrite a file
1005
1001
t.put_bytes('c', 'c this file\n')
1006
1002
t.move('c', 'b')
1007
self.failIf(t.has('c'))
1003
self.assertFalse(t.has('c'))
1008
1004
self.check_transport_contents('c this file\n', t, 'b')
1010
1006
# TODO: Try to write a test for atomicity
1064
1060
for path, size in zip(paths, sizes):
1065
1061
st = t.stat(path)
1066
1062
if path.endswith('/'):
1067
self.failUnless(S_ISDIR(st.st_mode))
1063
self.assertTrue(S_ISDIR(st.st_mode))
1068
1064
# directory sizes are meaningless
1070
self.failUnless(S_ISREG(st.st_mode))
1066
self.assertTrue(S_ISREG(st.st_mode))
1071
1067
self.assertEqual(size, st.st_size)
1073
1069
remote_stats = list(t.stat_multi(paths))
1080
1076
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
1077
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
1078
subdir = t.clone('subdir')
1083
subdir.stat('./file')
1079
st = subdir.stat('./file')
1080
st = subdir.stat('.')
1086
1082
def test_hardlink(self):
1087
1083
from stat import ST_NLINK
1097
1093
t.hardlink(source_name, link_name)
1099
self.failUnless(t.has(source_name))
1100
self.failUnless(t.has(link_name))
1095
self.assertTrue(t.has(source_name))
1096
self.assertTrue(t.has(link_name))
1102
1098
st = t.stat(link_name)
1103
self.failUnlessEqual(st[ST_NLINK], 2)
1099
self.assertEqual(st[ST_NLINK], 2)
1104
1100
except TransportNotPossible:
1105
1101
raise TestSkipped("Transport %s does not support hardlinks." %
1106
1102
self._server.__class__)
1119
1115
t.symlink(source_name, link_name)
1121
self.failUnless(t.has(source_name))
1122
self.failUnless(t.has(link_name))
1117
self.assertTrue(t.has(source_name))
1118
self.assertTrue(t.has(link_name))
1124
1120
st = t.stat(link_name)
1125
self.failUnless(S_ISLNK(st.st_mode),
1121
self.assertTrue(S_ISLNK(st.st_mode),
1126
1122
"expected symlink, got mode %o" % st.st_mode)
1127
1123
except TransportNotPossible:
1128
1124
raise TestSkipped("Transport %s does not support symlinks." %
1129
1125
self._server.__class__)
1130
1126
except IOError:
1131
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1127
self.knownFailure("Paramiko fails to create symlinks during tests")
1133
1129
def test_list_dir(self):
1134
1130
# TODO: Test list_dir, just try once, and if it throws, stop testing
1198
1194
raise TestSkipped("not a connected transport")
1200
1196
t2 = t1.clone('subdir')
1201
self.assertEquals(t1._scheme, t2._scheme)
1202
self.assertEquals(t1._user, t2._user)
1203
self.assertEquals(t1._password, t2._password)
1204
self.assertEquals(t1._host, t2._host)
1205
self.assertEquals(t1._port, t2._port)
1197
self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1198
self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1199
self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1200
self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1201
self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1207
1203
def test__reuse_for(self):
1208
1204
t = self.get_transport()
1216
1212
Only the parameters different from None will be changed.
1218
if scheme is None: scheme = t._scheme
1219
if user is None: user = t._user
1220
if password is None: password = t._password
1221
if user is None: user = t._user
1222
if host is None: host = t._host
1223
if port is None: port = t._port
1224
if path is None: path = t._path
1225
return t._unsplit_url(scheme, user, password, host, port, path)
1214
if scheme is None: scheme = t._parsed_url.scheme
1215
if user is None: user = t._parsed_url.user
1216
if password is None: password = t._parsed_url.password
1217
if user is None: user = t._parsed_url.user
1218
if host is None: host = t._parsed_url.host
1219
if port is None: port = t._parsed_url.port
1220
if path is None: path = t._parsed_url.path
1221
return str(urlutils.URL(scheme, user, password, host, port, path))
1227
if t._scheme == 'ftp':
1223
if t._parsed_url.scheme == 'ftp':
1228
1224
scheme = 'sftp'
1231
1227
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1228
if t._parsed_url.user == 'me':
1246
1242
# (they may be typed by the user when prompted for example)
1247
1243
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1248
1244
# We will not connect, so we can use an invalid host
1249
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1245
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1246
if t._parsed_url.port == 1234:
1266
1262
self.assertIs(t._get_connection(), c._get_connection())
1268
1264
# Temporary failure, we need to create a new dummy connection
1269
new_connection = object()
1265
new_connection = None
1270
1266
t._set_connection(new_connection)
1271
1267
# Check that both transports use the same connection
1272
1268
self.assertIs(new_connection, t._get_connection())
1295
1291
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1297
self.failUnless(t1.has('a'))
1298
self.failUnless(t1.has('b/c'))
1299
self.failIf(t1.has('c'))
1293
self.assertTrue(t1.has('a'))
1294
self.assertTrue(t1.has('b/c'))
1295
self.assertFalse(t1.has('c'))
1301
1297
t2 = t1.clone('b')
1302
1298
self.assertEqual(t1.base + 'b/', t2.base)
1304
self.failUnless(t2.has('c'))
1305
self.failIf(t2.has('a'))
1300
self.assertTrue(t2.has('c'))
1301
self.assertFalse(t2.has('a'))
1307
1303
t3 = t2.clone('..')
1308
self.failUnless(t3.has('a'))
1309
self.failIf(t3.has('c'))
1304
self.assertTrue(t3.has('a'))
1305
self.assertFalse(t3.has('c'))
1311
self.failIf(t1.has('b/d'))
1312
self.failIf(t2.has('d'))
1313
self.failIf(t3.has('b/d'))
1307
self.assertFalse(t1.has('b/d'))
1308
self.assertFalse(t2.has('d'))
1309
self.assertFalse(t3.has('b/d'))
1315
1311
if t1.is_readonly():
1316
1312
self.build_tree_contents([('b/d', 'newfile\n')])
1318
1314
t2.put_bytes('d', 'newfile\n')
1320
self.failUnless(t1.has('b/d'))
1321
self.failUnless(t2.has('d'))
1322
self.failUnless(t3.has('b/d'))
1316
self.assertTrue(t1.has('b/d'))
1317
self.assertTrue(t2.has('d'))
1318
self.assertTrue(t3.has('b/d'))
1324
1320
def test_clone_to_root(self):
1325
1321
orig_transport = self.get_transport()
1399
1395
self.assertEqual(transport.clone("/").abspath('foo'),
1400
1396
transport.abspath("/foo"))
1398
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1402
1399
def test_win32_abspath(self):
1403
1400
# Note: we tried to set sys.platform='win32' so we could test on
1404
1401
# other platforms too, but then osutils does platform specific
1410
1407
# smoke test for abspath on win32.
1411
1408
# a transport based on 'file:///' never fully qualifies the drive.
1412
transport = get_transport("file:///")
1413
self.failUnlessEqual(transport.abspath("/"), "file:///")
1409
transport = _mod_transport.get_transport_from_url("file:///")
1410
self.assertEqual(transport.abspath("/"), "file:///")
1415
1412
# but a transport that starts with a drive spec must keep it.
1416
transport = get_transport("file:///C:/")
1417
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1413
transport = _mod_transport.get_transport_from_url("file:///C:/")
1414
self.assertEqual(transport.abspath("/"), "file:///C:/")
1419
1416
def test_local_abspath(self):
1420
1417
transport = self.get_transport()
1547
1544
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1548
1545
if no_unicode_support:
1549
raise tests.KnownFailure("test server cannot handle unicode paths")
1546
self.knownFailure("test server cannot handle unicode paths")
1552
1549
self.build_tree(files, transport=t, line_endings='binary')
1617
1614
def test_readv(self):
1618
1615
transport = self.get_transport()
1619
1616
if transport.is_readonly():
1620
file('a', 'w').write('0123456789')
1617
with file('a', 'w') as f: f.write('0123456789')
1622
1619
transport.put_bytes('a', '0123456789')
1633
1630
def test_readv_out_of_order(self):
1634
1631
transport = self.get_transport()
1635
1632
if transport.is_readonly():
1636
file('a', 'w').write('0123456789')
1633
with file('a', 'w') as f: f.write('0123456789')
1638
1635
transport.put_bytes('a', '01234567890')
1711
1708
transport = self.get_transport()
1712
1709
# test from observed failure case.
1713
1710
if transport.is_readonly():
1714
file('a', 'w').write('a'*1024*1024)
1711
with file('a', 'w') as f: f.write('a'*1024*1024)
1716
1713
transport.put_bytes('a', 'a'*1024*1024)
1717
1714
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1751
1748
def test_readv_short_read(self):
1752
1749
transport = self.get_transport()
1753
1750
if transport.is_readonly():
1754
file('a', 'w').write('0123456789')
1751
with file('a', 'w') as f: f.write('0123456789')
1756
1753
transport.put_bytes('a', '01234567890')
1767
1764
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1768
1765
transport.readv, 'a', [(12,2)])
1767
def test_no_segment_parameters(self):
1768
"""Segment parameters should be stripped and stored in
1769
transport.segment_parameters."""
1770
transport = self.get_transport("foo")
1771
self.assertEqual({}, transport.get_segment_parameters())
1773
def test_segment_parameters(self):
1774
"""Segment parameters should be stripped and stored in
1775
transport.get_segment_parameters()."""
1776
base_url = self._server.get_url()
1777
parameters = {"key1": "val1", "key2": "val2"}
1778
url = urlutils.join_segment_parameters(base_url, parameters)
1779
transport = _mod_transport.get_transport_from_url(url)
1780
self.assertEqual(parameters, transport.get_segment_parameters())
1782
def test_set_segment_parameters(self):
1783
"""Segment parameters can be set and show up in base."""
1784
transport = self.get_transport("foo")
1785
orig_base = transport.base
1786
transport.set_segment_parameter("arm", "board")
1787
self.assertEqual("%s,arm=board" % orig_base, transport.base)
1788
self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1789
transport.set_segment_parameter("arm", None)
1790
transport.set_segment_parameter("nonexistant", None)
1791
self.assertEqual({}, transport.get_segment_parameters())
1792
self.assertEqual(orig_base, transport.base)
1770
1794
def test_stat_symlink(self):
1771
1795
# if a transport points directly to a symlink (and supports symlinks
1772
1796
# at all) you can tell this. helps with bug 32669.
1778
1802
t2 = t.clone('link')
1779
1803
st = t2.stat('')
1780
1804
self.assertTrue(stat.S_ISLNK(st.st_mode))
1806
def test_abspath_url_unquote_unreserved(self):
1807
"""URLs from abspath should have unreserved characters unquoted
1809
Need consistent quoting notably for tildes, see lp:842223 for more.
1811
t = self.get_transport()
1812
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1813
self.assertEqual(t.base + "-.09AZ_az~",
1814
t.abspath(needlessly_escaped_dir))
1816
def test_clone_url_unquote_unreserved(self):
1817
"""Base URL of a cloned branch needs unreserved characters unquoted
1819
Cloned transports should be prefix comparable for things like the
1820
isolation checking of tests, see lp:842223 for more.
1822
t1 = self.get_transport()
1823
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1824
self.build_tree([needlessly_escaped_dir], transport=t1)
1825
t2 = t1.clone(needlessly_escaped_dir)
1826
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1828
def test_hook_post_connection_one(self):
1829
"""Fire post_connect hook after a ConnectedTransport is first used"""
1831
Transport.hooks.install_named_hook("post_connect", log.append, None)
1832
t = self.get_transport()
1833
self.assertEqual([], log)
1834
t.has("non-existant")
1835
if isinstance(t, RemoteTransport):
1836
self.assertEqual([t.get_smart_medium()], log)
1837
elif isinstance(t, ConnectedTransport):
1838
self.assertEqual([t], log)
1840
self.assertEqual([], log)
1842
def test_hook_post_connection_multi(self):
1843
"""Fire post_connect hook once per unshared underlying connection"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t1 = self.get_transport()
1848
t3 = self.get_transport()
1849
self.assertEqual([], log)
1853
if isinstance(t1, RemoteTransport):
1854
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1855
elif isinstance(t1, ConnectedTransport):
1856
self.assertEqual([t1, t3], log)
1858
self.assertEqual([], log)