1
# Copyright (C) 2005-2011 Canonical Ltd
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
30
31
from bzrlib import (
35
transport as _mod_transport,
38
37
from bzrlib.errors import (ConnectionError,
43
45
TransportNotPossible,
45
47
from bzrlib.osutils import getcwd
46
48
from bzrlib.smart import medium
47
49
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
55
from bzrlib.tests.test_transport import TestTransportImplementation
54
56
from bzrlib.transport import (
55
57
ConnectedTransport,
56
59
_get_transport_modules,
58
61
from bzrlib.transport.memory import MemoryTransport
74
77
for module in _get_transport_modules():
76
79
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
80
reduce(getattr, (module).split('.')[1:], __import__(module)))
78
81
for (klass, server_factory) in permutations:
79
82
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
83
{"transport_class":klass,
100
103
super(TransportTests, self).setUp()
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
104
self._captureVar('BZR_NO_SMART_VFS', None)
103
106
def check_transport_contents(self, content, transport, relpath):
104
"""Check that transport.get_bytes(relpath) == content."""
105
self.assertEqualDiff(content, transport.get_bytes(relpath))
107
"""Check that transport.get(relpath).read() == content."""
108
self.assertEqualDiff(content, transport.get(relpath).read())
107
110
def test_ensure_base_missing(self):
108
111
""".ensure_base() should create the directory if it doesn't exist"""
167
170
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
169
172
def test_has_root_works(self):
170
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
from bzrlib.smart import server
174
if self.transport_server is server.SmartTCPServer_for_testing:
171
175
raise TestNotApplicable(
172
176
"SmartTCPServer_for_testing intentionally does not allow "
208
212
self.build_tree(files, transport=t, line_endings='binary')
209
213
self.assertRaises(NoSuchFile, t.get, 'c')
210
def iterate_and_close(func, *args):
211
for f in func(*args):
212
# We call f.read() here because things like paramiko actually
213
# spawn a thread to prefetch the content, which we want to
214
# consume before we close the handle.
217
self.assertRaises(NoSuchFile, iterate_and_close,
218
t.get_multi, ['a', 'b', 'c'])
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, iter(['a', 'b', 'c']))
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
222
217
def test_get_directory_read_gives_ReadError(self):
223
218
"""consistent errors for read() on a file returned by get()."""
278
274
handle.write('b')
279
275
self.assertEqual('b', t.get_bytes('foo'))
282
self.assertEqual('b', f.read())
276
self.assertEqual('b', t.get('foo').read())
296
288
t.put_bytes('a', 'some text for a\n')
297
self.assertTrue(t.has('a'))
289
self.failUnless(t.has('a'))
298
290
self.check_transport_contents('some text for a\n', t, 'a')
300
292
# The contents should be overwritten
312
304
t.put_bytes_non_atomic, 'a', 'some text for a\n')
315
self.assertFalse(t.has('a'))
307
self.failIf(t.has('a'))
316
308
t.put_bytes_non_atomic('a', 'some text for a\n')
317
self.assertTrue(t.has('a'))
309
self.failUnless(t.has('a'))
318
310
self.check_transport_contents('some text for a\n', t, 'a')
319
311
# Put also replaces contents
320
312
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
332
324
# Now test the create_parent flag
333
325
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
335
self.assertFalse(t.has('dir/a'))
327
self.failIf(t.has('dir/a'))
336
328
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
337
329
create_parent_dir=True)
338
330
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
410
402
result = t.put_file('a', StringIO('some text for a\n'))
411
403
# put_file returns the length of the data written
412
404
self.assertEqual(16, result)
413
self.assertTrue(t.has('a'))
405
self.failUnless(t.has('a'))
414
406
self.check_transport_contents('some text for a\n', t, 'a')
415
407
# Put also replaces contents
416
408
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
428
420
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
431
self.assertFalse(t.has('a'))
423
self.failIf(t.has('a'))
432
424
t.put_file_non_atomic('a', StringIO('some text for a\n'))
433
self.assertTrue(t.has('a'))
425
self.failUnless(t.has('a'))
434
426
self.check_transport_contents('some text for a\n', t, 'a')
435
427
# Put also replaces contents
436
428
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
448
440
# Now test the create_parent flag
449
441
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
450
442
StringIO('contents\n'))
451
self.assertFalse(t.has('dir/a'))
443
self.failIf(t.has('dir/a'))
452
444
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
453
445
create_parent_dir=True)
454
446
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
653
645
self.build_tree(files, transport=transport_from)
654
646
self.assertEqual(4, transport_from.copy_to(files, transport_to))
656
self.check_transport_contents(transport_to.get_bytes(f),
648
self.check_transport_contents(transport_to.get(f).read(),
657
649
transport_from, f)
659
651
t = self.get_transport()
682
674
files = ['a', 'b', 'c', 'd']
683
675
t.copy_to(iter(files), temp_transport)
685
self.check_transport_contents(temp_transport.get_bytes(f),
677
self.check_transport_contents(temp_transport.get(f).read(),
687
679
del temp_transport
833
825
t.put_bytes('a', 'a little bit of text\n')
834
self.assertTrue(t.has('a'))
826
self.failUnless(t.has('a'))
836
self.assertFalse(t.has('a'))
828
self.failIf(t.has('a'))
838
830
self.assertRaises(NoSuchFile, t.delete, 'a')
845
837
t.delete_multi(['a', 'c'])
846
838
self.assertEqual([False, True, False],
847
839
list(t.has_multi(['a', 'b', 'c'])))
848
self.assertFalse(t.has('a'))
849
self.assertTrue(t.has('b'))
850
self.assertFalse(t.has('c'))
840
self.failIf(t.has('a'))
841
self.failUnless(t.has('b'))
842
self.failIf(t.has('c'))
852
844
self.assertRaises(NoSuchFile,
853
845
t.delete_multi, ['a', 'b', 'c'])
914
906
t.mkdir('foo-baz')
916
908
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
917
self.assertTrue(t.has('foo-bar'))
909
self.failUnless(t.has('foo-bar'))
919
911
def test_rename_dir_succeeds(self):
920
912
t = self.get_transport()
1003
995
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
1005
997
t.move('a', 'b')
1006
self.assertTrue(t.has('b'))
1007
self.assertFalse(t.has('a'))
998
self.failUnless(t.has('b'))
999
self.failIf(t.has('a'))
1009
1001
self.check_transport_contents('a first file\n', t, 'b')
1010
1002
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1012
1004
# Overwrite a file
1013
1005
t.put_bytes('c', 'c this file\n')
1014
1006
t.move('c', 'b')
1015
self.assertFalse(t.has('c'))
1007
self.failIf(t.has('c'))
1016
1008
self.check_transport_contents('c this file\n', t, 'b')
1018
1010
# TODO: Try to write a test for atomicity
1050
1042
except NotImplementedError:
1051
1043
raise TestSkipped("Transport %s has no bogus URL support." %
1052
1044
self._server.__class__)
1053
t = _mod_transport.get_transport(url)
1045
t = get_transport(url)
1054
1046
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1056
1048
def test_stat(self):
1072
1064
for path, size in zip(paths, sizes):
1073
1065
st = t.stat(path)
1074
1066
if path.endswith('/'):
1075
self.assertTrue(S_ISDIR(st.st_mode))
1067
self.failUnless(S_ISDIR(st.st_mode))
1076
1068
# directory sizes are meaningless
1078
self.assertTrue(S_ISREG(st.st_mode))
1070
self.failUnless(S_ISREG(st.st_mode))
1079
1071
self.assertEqual(size, st.st_size)
1081
1073
remote_stats = list(t.stat_multi(paths))
1088
1080
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1089
1081
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1090
1082
subdir = t.clone('subdir')
1091
st = subdir.stat('./file')
1092
st = subdir.stat('.')
1094
def test_hardlink(self):
1095
from stat import ST_NLINK
1097
t = self.get_transport()
1099
source_name = "original_target"
1100
link_name = "target_link"
1102
self.build_tree([source_name], transport=t)
1105
t.hardlink(source_name, link_name)
1107
self.assertTrue(t.has(source_name))
1108
self.assertTrue(t.has(link_name))
1110
st = t.stat(link_name)
1111
self.assertEqual(st[ST_NLINK], 2)
1112
except TransportNotPossible:
1113
raise TestSkipped("Transport %s does not support hardlinks." %
1114
self._server.__class__)
1116
def test_symlink(self):
1117
from stat import S_ISLNK
1119
t = self.get_transport()
1121
source_name = "original_target"
1122
link_name = "target_link"
1124
self.build_tree([source_name], transport=t)
1127
t.symlink(source_name, link_name)
1129
self.assertTrue(t.has(source_name))
1130
self.assertTrue(t.has(link_name))
1132
st = t.stat(link_name)
1133
self.assertTrue(S_ISLNK(st.st_mode),
1134
"expected symlink, got mode %o" % st.st_mode)
1135
except TransportNotPossible:
1136
raise TestSkipped("Transport %s does not support symlinks." %
1137
self._server.__class__)
1139
self.knownFailure("Paramiko fails to create symlinks during tests")
1083
subdir.stat('./file')
1141
1086
def test_list_dir(self):
1142
1087
# TODO: Test list_dir, just try once, and if it throws, stop testing
1206
1151
raise TestSkipped("not a connected transport")
1208
1153
t2 = t1.clone('subdir')
1209
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1210
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1211
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1212
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1213
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1154
self.assertEquals(t1._scheme, t2._scheme)
1155
self.assertEquals(t1._user, t2._user)
1156
self.assertEquals(t1._password, t2._password)
1157
self.assertEquals(t1._host, t2._host)
1158
self.assertEquals(t1._port, t2._port)
1215
1160
def test__reuse_for(self):
1216
1161
t = self.get_transport()
1224
1169
Only the parameters different from None will be changed.
1226
if scheme is None: scheme = t._parsed_url.scheme
1227
if user is None: user = t._parsed_url.user
1228
if password is None: password = t._parsed_url.password
1229
if user is None: user = t._parsed_url.user
1230
if host is None: host = t._parsed_url.host
1231
if port is None: port = t._parsed_url.port
1232
if path is None: path = t._parsed_url.path
1233
return str(urlutils.URL(scheme, user, password, host, port, path))
1171
if scheme is None: scheme = t._scheme
1172
if user is None: user = t._user
1173
if password is None: password = t._password
1174
if user is None: user = t._user
1175
if host is None: host = t._host
1176
if port is None: port = t._port
1177
if path is None: path = t._path
1178
return t._unsplit_url(scheme, user, password, host, port, path)
1235
if t._parsed_url.scheme == 'ftp':
1180
if t._scheme == 'ftp':
1236
1181
scheme = 'sftp'
1239
1184
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1240
if t._parsed_url.user == 'me':
1254
1199
# (they may be typed by the user when prompted for example)
1255
1200
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1256
1201
# We will not connect, we can use a invalid host
1257
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1258
if t._parsed_url.port == 1234:
1202
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1274
1219
self.assertIs(t._get_connection(), c._get_connection())
1276
1221
# Temporary failure, we need to create a new dummy connection
1277
new_connection = None
1222
new_connection = object()
1278
1223
t._set_connection(new_connection)
1279
1224
# Check that both transports use the same connection
1280
1225
self.assertIs(new_connection, t._get_connection())
1303
1248
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1305
self.assertTrue(t1.has('a'))
1306
self.assertTrue(t1.has('b/c'))
1307
self.assertFalse(t1.has('c'))
1250
self.failUnless(t1.has('a'))
1251
self.failUnless(t1.has('b/c'))
1252
self.failIf(t1.has('c'))
1309
1254
t2 = t1.clone('b')
1310
1255
self.assertEqual(t1.base + 'b/', t2.base)
1312
self.assertTrue(t2.has('c'))
1313
self.assertFalse(t2.has('a'))
1257
self.failUnless(t2.has('c'))
1258
self.failIf(t2.has('a'))
1315
1260
t3 = t2.clone('..')
1316
self.assertTrue(t3.has('a'))
1317
self.assertFalse(t3.has('c'))
1261
self.failUnless(t3.has('a'))
1262
self.failIf(t3.has('c'))
1319
self.assertFalse(t1.has('b/d'))
1320
self.assertFalse(t2.has('d'))
1321
self.assertFalse(t3.has('b/d'))
1264
self.failIf(t1.has('b/d'))
1265
self.failIf(t2.has('d'))
1266
self.failIf(t3.has('b/d'))
1323
1268
if t1.is_readonly():
1324
1269
self.build_tree_contents([('b/d', 'newfile\n')])
1326
1271
t2.put_bytes('d', 'newfile\n')
1328
self.assertTrue(t1.has('b/d'))
1329
self.assertTrue(t2.has('d'))
1330
self.assertTrue(t3.has('b/d'))
1273
self.failUnless(t1.has('b/d'))
1274
self.failUnless(t2.has('d'))
1275
self.failUnless(t3.has('b/d'))
1332
1277
def test_clone_to_root(self):
1333
1278
orig_transport = self.get_transport()
1407
1352
self.assertEqual(transport.clone("/").abspath('foo'),
1408
1353
transport.abspath("/foo"))
1410
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1411
1355
def test_win32_abspath(self):
1412
1356
# Note: we tried to set sys.platform='win32' so we could test on
1413
1357
# other platforms too, but then osutils does platform specific
1419
1363
# smoke test for abspath on win32.
1420
1364
# a transport based on 'file:///' never fully qualifies the drive.
1421
transport = _mod_transport.get_transport("file:///")
1422
self.assertEqual(transport.abspath("/"), "file:///")
1365
transport = get_transport("file:///")
1366
self.failUnlessEqual(transport.abspath("/"), "file:///")
1424
1368
# but a transport that starts with a drive spec must keep it.
1425
transport = _mod_transport.get_transport("file:///C:/")
1426
self.assertEqual(transport.abspath("/"), "file:///C:/")
1369
transport = get_transport("file:///C:/")
1370
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1428
1372
def test_local_abspath(self):
1429
1373
transport = self.get_transport()
1553
1497
u'\u65e5', # Kanji person
1556
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1557
if no_unicode_support:
1558
self.knownFailure("test server cannot handle unicode paths")
1561
1501
self.build_tree(files, transport=t, line_endings='binary')
1562
1502
except UnicodeError:
1626
1566
def test_readv(self):
1627
1567
transport = self.get_transport()
1628
1568
if transport.is_readonly():
1629
with file('a', 'w') as f: f.write('0123456789')
1569
file('a', 'w').write('0123456789')
1631
1571
transport.put_bytes('a', '0123456789')
1642
1582
def test_readv_out_of_order(self):
1643
1583
transport = self.get_transport()
1644
1584
if transport.is_readonly():
1645
with file('a', 'w') as f: f.write('0123456789')
1585
file('a', 'w').write('0123456789')
1647
1587
transport.put_bytes('a', '01234567890')
1720
1660
transport = self.get_transport()
1721
1661
# test from observed failure case.
1722
1662
if transport.is_readonly():
1723
with file('a', 'w') as f: f.write('a'*1024*1024)
1663
file('a', 'w').write('a'*1024*1024)
1725
1665
transport.put_bytes('a', 'a'*1024*1024)
1726
1666
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1760
1700
def test_readv_short_read(self):
1761
1701
transport = self.get_transport()
1762
1702
if transport.is_readonly():
1763
with file('a', 'w') as f: f.write('0123456789')
1703
file('a', 'w').write('0123456789')
1765
1705
transport.put_bytes('a', '01234567890')
1775
1715
# also raise a special error
1776
1716
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1777
1717
transport.readv, 'a', [(12,2)])
1779
def test_no_segment_parameters(self):
1780
"""Segment parameters should be stripped and stored in
1781
transport.segment_parameters."""
1782
transport = self.get_transport("foo")
1783
self.assertEquals({}, transport.get_segment_parameters())
1785
def test_segment_parameters(self):
1786
"""Segment parameters should be stripped and stored in
1787
transport.get_segment_parameters()."""
1788
base_url = self._server.get_url()
1789
parameters = {"key1": "val1", "key2": "val2"}
1790
url = urlutils.join_segment_parameters(base_url, parameters)
1791
transport = _mod_transport.get_transport(url)
1792
self.assertEquals(parameters, transport.get_segment_parameters())
1794
def test_stat_symlink(self):
1795
# if a transport points directly to a symlink (and supports symlinks
1796
# at all) you can tell this. helps with bug 32669.
1797
t = self.get_transport()
1799
t.symlink('target', 'link')
1800
except TransportNotPossible:
1801
raise TestSkipped("symlinks not supported")
1802
t2 = t.clone('link')
1804
self.assertTrue(stat.S_ISLNK(st.st_mode))