20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
30
from bzrlib import (
32
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
TransportNotPossible, ConnectionError,
37
from bzrlib.errors import (ConnectionError,
36
44
from bzrlib.osutils import getcwd
37
from bzrlib.symbol_versioning import zero_eleven
38
from bzrlib.tests import TestCaseInTempDir, TestSkipped
45
from bzrlib.smart import medium
46
from bzrlib.tests import (
51
from bzrlib.tests import test_server
39
52
from bzrlib.tests.test_transport import TestTransportImplementation
40
from bzrlib.transport import memory
41
import bzrlib.transport
45
"""Append the given text (file-like object) to the supplied filename."""
53
from bzrlib.transport import (
56
_get_transport_modules,
58
from bzrlib.transport.memory import MemoryTransport
61
def get_transport_test_permutations(module):
62
"""Get the permutations module wants to have tested."""
63
if getattr(module, 'get_test_permutations', None) is None:
65
"transport module %s doesn't provide get_test_permutations()"
68
return module.get_test_permutations()
71
def transport_test_permutations():
72
"""Return a list of the klass, server_factory pairs to test."""
74
for module in _get_transport_modules():
76
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
78
for (klass, server_factory) in permutations:
79
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
{"transport_class":klass,
81
"transport_server":server_factory})
82
result.append(scenario)
83
except errors.DependencyNotPresent, e:
84
# Continue even if a dependency prevents us
85
# from adding this test
90
def load_tests(standard_tests, module, loader):
91
"""Multiply tests for tranport implementations."""
92
result = loader.suiteClass()
93
scenarios = transport_test_permutations()
94
return multiply_tests(standard_tests, scenarios, result)
53
97
class TransportTests(TestTransportImplementation):
100
super(TransportTests, self).setUp()
101
self._captureVar('BZR_NO_SMART_VFS', None)
55
103
def check_transport_contents(self, content, transport, relpath):
56
104
"""Check that transport.get(relpath).read() == content."""
57
105
self.assertEqualDiff(content, transport.get(relpath).read())
59
def assertListRaises(self, excClass, func, *args, **kwargs):
60
"""Fail unless excClass is raised when the iterator from func is used.
62
Many transport functions can return generators this makes sure
63
to wrap them in a list() call to make sure the whole generator
64
is run, and that the proper exception is raised.
107
def test_ensure_base_missing(self):
108
""".ensure_base() should create the directory if it doesn't exist"""
109
t = self.get_transport()
111
if t_a.is_readonly():
112
self.assertRaises(TransportNotPossible,
115
self.assertTrue(t_a.ensure_base())
116
self.assertTrue(t.has('a'))
118
def test_ensure_base_exists(self):
119
""".ensure_base() should just be happy if it already exists"""
120
t = self.get_transport()
126
# ensure_base returns False if it didn't create the base
127
self.assertFalse(t_a.ensure_base())
129
def test_ensure_base_missing_parent(self):
130
""".ensure_base() will fail if the parent dir doesn't exist"""
131
t = self.get_transport()
137
self.assertRaises(NoSuchFile, t_b.ensure_base)
139
def test_external_url(self):
140
""".external_url either works or raises InProcessTransport."""
141
t = self.get_transport()
67
list(func(*args, **kwargs))
71
if getattr(excClass,'__name__', None) is not None:
72
excName = excClass.__name__
74
excName = str(excClass)
75
raise self.failureException, "%s not raised" % excName
144
except errors.InProcessTransport:
77
147
def test_has(self):
78
148
t = self.get_transport()
82
152
self.assertEqual(True, t.has('a'))
83
153
self.assertEqual(False, t.has('c'))
84
154
self.assertEqual(True, t.has(urlutils.escape('%')))
85
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
86
[True, True, False, False, True, False, True, False])
155
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
156
'e', 'f', 'g', 'h'])),
157
[True, True, False, False,
158
True, False, True, False])
87
159
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
88
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
89
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
90
[True, True, False, False, True, False, True, False])
160
self.assertEqual(False, t.has_any(['c', 'd', 'f',
161
urlutils.escape('%%')]))
162
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
163
'e', 'f', 'g', 'h']))),
164
[True, True, False, False,
165
True, False, True, False])
91
166
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
92
167
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
169
def test_has_root_works(self):
170
if self.transport_server is test_server.SmartTCPServer_for_testing:
171
raise TestNotApplicable(
172
"SmartTCPServer_for_testing intentionally does not allow "
174
current_transport = self.get_transport()
175
self.assertTrue(current_transport.has('/'))
176
root = current_transport.clone('/')
177
self.assertTrue(root.has(''))
94
179
def test_get(self):
95
180
t = self.get_transport()
103
188
self.build_tree(files, transport=t, line_endings='binary')
104
189
self.check_transport_contents('contents of a\n', t, 'a')
105
190
content_f = t.get_multi(files)
106
for content, f in zip(contents, content_f):
191
# Use itertools.izip() instead of use zip() or map(), since they fully
192
# evaluate their inputs, the transport requests should be issued and
193
# handled sequentially (we don't want to force transport to buffer).
194
for content, f in itertools.izip(contents, content_f):
107
195
self.assertEqual(content, f.read())
109
197
content_f = t.get_multi(iter(files))
110
for content, f in zip(contents, content_f):
198
# Use itertools.izip() for the same reason
199
for content, f in itertools.izip(contents, content_f):
111
200
self.assertEqual(content, f.read())
202
def test_get_unknown_file(self):
203
t = self.get_transport()
205
contents = ['contents of a\n',
208
self.build_tree(files, transport=t, line_endings='binary')
113
209
self.assertRaises(NoSuchFile, t.get, 'c')
114
210
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
115
211
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
213
def test_get_directory_read_gives_ReadError(self):
214
"""consistent errors for read() on a file returned by get()."""
215
t = self.get_transport()
217
self.build_tree(['a directory/'])
219
t.mkdir('a%20directory')
220
# getting the file must either work or fail with a PathError
222
a_file = t.get('a%20directory')
223
except (errors.PathError, errors.RedirectRequested):
224
# early failure return immediately.
226
# having got a file, read() must either work (i.e. http reading a dir
227
# listing) or fail with ReadError
230
except errors.ReadError:
117
233
def test_get_bytes(self):
118
234
t = self.get_transport()
379
504
dir_mode=0777, create_parent_dir=True)
380
505
self.assertTransportMode(t, 'dir777', 0777)
382
def test_put_multi(self):
383
t = self.get_transport()
387
self.assertEqual(2, self.applyDeprecated(zero_eleven,
388
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
389
('d', StringIO('contents\nfor d\n'))]
391
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
392
[True, False, False, True])
393
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
394
self.check_transport_contents('contents\nfor d\n', t, 'd')
396
self.assertEqual(2, self.applyDeprecated(zero_eleven,
397
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
398
('d', StringIO('another contents\nfor d\n'))])
400
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
401
self.check_transport_contents('another contents\nfor d\n', t, 'd')
507
def test_put_bytes_unicode(self):
508
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
509
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
510
# (we don't want to encode unicode here at all, callers should be
511
# strictly passing bytes to put_bytes), but we allow it for backwards
512
# compatibility. At some point we should use a specific exception.
513
# See https://bugs.launchpad.net/bzr/+bug/106898.
514
t = self.get_transport()
517
unicode_string = u'\u1234'
519
(AssertionError, UnicodeEncodeError),
520
t.put_bytes, 'foo', unicode_string)
522
def test_put_file_unicode(self):
523
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
524
# This situation can happen (and has) if code is careless about the type
525
# of "string" they initialise/write to a StringIO with. We cannot use
526
# cStringIO, because it never returns unicode from read.
527
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
528
# raise, but we raise it for hysterical raisins.
529
t = self.get_transport()
532
unicode_file = pyStringIO(u'\u1234')
533
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
403
535
def test_mkdir(self):
404
536
t = self.get_transport()
406
538
if t.is_readonly():
407
# cannot mkdir on readonly transports. We're not testing for
539
# cannot mkdir on readonly transports. We're not testing for
408
540
# cache coherency because cache behaviour is not currently
409
541
# defined for the transport interface.
410
542
self.assertRaises(TransportNotPossible, t.mkdir, '.')
470
602
t.mkdir('dnomode', mode=None)
471
603
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
605
def test_opening_a_file_stream_creates_file(self):
606
t = self.get_transport()
609
handle = t.open_write_stream('foo')
611
self.assertEqual('', t.get_bytes('foo'))
615
def test_opening_a_file_stream_can_set_mode(self):
616
t = self.get_transport()
619
if not t._can_roundtrip_unix_modebits():
620
# Can't roundtrip, so no need to run this test
622
def check_mode(name, mode, expected):
623
handle = t.open_write_stream(name, mode=mode)
625
self.assertTransportMode(t, name, expected)
626
check_mode('mode644', 0644, 0644)
627
check_mode('mode666', 0666, 0666)
628
check_mode('mode600', 0600, 0600)
629
# The default permissions should be based on the current umask
630
check_mode('nomode', None, 0666 & ~osutils.get_umask())
473
632
def test_copy_to(self):
474
633
# FIXME: test: same server to same server (partly done)
475
634
# same protocol two servers
476
635
# and different protocols (done for now except for MemoryTransport.
478
from bzrlib.transport.memory import MemoryTransport
480
638
def simple_copy_files(transport_from, transport_to):
481
639
files = ['a', 'b', 'c', 'd']
896
1078
subdir.stat('./file')
897
1079
subdir.stat('.')
1081
def test_hardlink(self):
1082
from stat import ST_NLINK
1084
t = self.get_transport()
1086
source_name = "original_target"
1087
link_name = "target_link"
1089
self.build_tree([source_name], transport=t)
1092
t.hardlink(source_name, link_name)
1094
self.failUnless(t.has(source_name))
1095
self.failUnless(t.has(link_name))
1097
st = t.stat(link_name)
1098
self.failUnlessEqual(st[ST_NLINK], 2)
1099
except TransportNotPossible:
1100
raise TestSkipped("Transport %s does not support hardlinks." %
1101
self._server.__class__)
1103
def test_symlink(self):
1104
from stat import S_ISLNK
1106
t = self.get_transport()
1108
source_name = "original_target"
1109
link_name = "target_link"
1111
self.build_tree([source_name], transport=t)
1114
t.symlink(source_name, link_name)
1116
self.failUnless(t.has(source_name))
1117
self.failUnless(t.has(link_name))
1119
st = t.stat(link_name)
1120
self.failUnless(S_ISLNK(st.st_mode),
1121
"expected symlink, got mode %o" % st.st_mode)
1122
except TransportNotPossible:
1123
raise TestSkipped("Transport %s does not support symlinks." %
1124
self._server.__class__)
1126
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
899
1128
def test_list_dir(self):
900
1129
# TODO: Test list_dir, just try once, and if it throws, stop testing
901
1130
t = self.get_transport()
903
1132
if not t.listable():
904
1133
self.assertRaises(TransportNotPossible, t.list_dir, '.')
908
l = list(t.list_dir(d))
1136
def sorted_list(d, transport):
1137
l = list(transport.list_dir(d))
912
# SftpServer creates control files in the working directory
913
# so lets move down a directory to avoid those.
914
if not t.is_readonly():
920
self.assertEqual([], sorted_list('.'))
1141
self.assertEqual([], sorted_list('.', t))
921
1142
# c2 is precisely one letter longer than c here to test that
922
1143
# suffixing is not confused.
923
1144
# a%25b checks that quoting is done consistently across transports
924
1145
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
925
1147
if not t.is_readonly():
926
1148
self.build_tree(tree_names, transport=t)
928
self.build_tree(['wd/' + name for name in tree_names])
931
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
932
self.assertEqual(['d', 'e'], sorted_list('c'))
1150
self.build_tree(tree_names)
1153
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1155
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1156
self.assertEqual(['d', 'e'], sorted_list('c', t))
1158
# Cloning the transport produces an equivalent listing
1159
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
934
1161
if not t.is_readonly():
941
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
942
self.assertEqual(['e'], sorted_list('c'))
1168
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1169
self.assertEqual(['e'], sorted_list('c', t))
944
1171
self.assertListRaises(PathError, t.list_dir, 'q')
945
1172
self.assertListRaises(PathError, t.list_dir, 'c/f')
1173
# 'a' is a file, list_dir should raise an error
946
1174
self.assertListRaises(PathError, t.list_dir, 'a')
948
1176
def test_list_dir_result_is_url_escaped(self):
954
1182
self.build_tree(['a/', 'a/%'], transport=t)
956
1184
self.build_tree(['a/', 'a/%'])
958
1186
names = list(t.list_dir('a'))
959
1187
self.assertEqual(['%25'], names)
960
1188
self.assertIsInstance(names[0], str)
1190
def test_clone_preserve_info(self):
1191
t1 = self.get_transport()
1192
if not isinstance(t1, ConnectedTransport):
1193
raise TestSkipped("not a connected transport")
1195
t2 = t1.clone('subdir')
1196
self.assertEquals(t1._scheme, t2._scheme)
1197
self.assertEquals(t1._user, t2._user)
1198
self.assertEquals(t1._password, t2._password)
1199
self.assertEquals(t1._host, t2._host)
1200
self.assertEquals(t1._port, t2._port)
1202
def test__reuse_for(self):
1203
t = self.get_transport()
1204
if not isinstance(t, ConnectedTransport):
1205
raise TestSkipped("not a connected transport")
1207
def new_url(scheme=None, user=None, password=None,
1208
host=None, port=None, path=None):
1209
"""Build a new url from t.base changing only parts of it.
1211
Only the parameters different from None will be changed.
1213
if scheme is None: scheme = t._scheme
1214
if user is None: user = t._user
1215
if password is None: password = t._password
1216
if user is None: user = t._user
1217
if host is None: host = t._host
1218
if port is None: port = t._port
1219
if path is None: path = t._path
1220
return t._unsplit_url(scheme, user, password, host, port, path)
1222
if t._scheme == 'ftp':
1226
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1232
# passwords are not taken into account because:
1233
# - it makes no sense to have two different valid passwords for the
1235
# - _password in ConnectedTransport is intended to collect what the
1236
# user specified from the command-line and there are cases where the
1237
# new url can contain no password (if the url was built from an
1238
# existing transport.base for example)
1239
# - password are considered part of the credentials provided at
1240
# connection creation time and as such may not be present in the url
1241
# (they may be typed by the user when prompted for example)
1242
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1243
# We will not connect, we can use a invalid host
1244
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1250
# No point in trying to reuse a transport for a local URL
1251
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1253
def test_connection_sharing(self):
1254
t = self.get_transport()
1255
if not isinstance(t, ConnectedTransport):
1256
raise TestSkipped("not a connected transport")
1258
c = t.clone('subdir')
1259
# Some transports will create the connection only when needed
1260
t.has('surely_not') # Force connection
1261
self.assertIs(t._get_connection(), c._get_connection())
1263
# Temporary failure, we need to create a new dummy connection
1264
new_connection = None
1265
t._set_connection(new_connection)
1266
# Check that both transports use the same connection
1267
self.assertIs(new_connection, t._get_connection())
1268
self.assertIs(new_connection, c._get_connection())
1270
def test_reuse_connection_for_various_paths(self):
1271
t = self.get_transport()
1272
if not isinstance(t, ConnectedTransport):
1273
raise TestSkipped("not a connected transport")
1275
t.has('surely_not') # Force connection
1276
self.assertIsNot(None, t._get_connection())
1278
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1279
self.assertIsNot(t, subdir)
1280
self.assertIs(t._get_connection(), subdir._get_connection())
1282
home = subdir._reuse_for(t.base + 'home')
1283
self.assertIs(t._get_connection(), home._get_connection())
1284
self.assertIs(subdir._get_connection(), home._get_connection())
962
1286
def test_clone(self):
963
1287
# TODO: Test that clone moves up and down the filesystem
964
1288
t1 = self.get_transport()
992
1316
self.failUnless(t2.has('d'))
993
1317
self.failUnless(t3.has('b/d'))
1319
def test_clone_to_root(self):
1320
orig_transport = self.get_transport()
1321
# Repeatedly go up to a parent directory until we're at the root
1322
# directory of this transport
1323
root_transport = orig_transport
1324
new_transport = root_transport.clone("..")
1325
# as we are walking up directories, the path must be
1326
# growing less, except at the top
1327
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1328
or new_transport.base == root_transport.base)
1329
while new_transport.base != root_transport.base:
1330
root_transport = new_transport
1331
new_transport = root_transport.clone("..")
1332
# as we are walking up directories, the path must be
1333
# growing less, except at the top
1334
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1335
or new_transport.base == root_transport.base)
1337
# Cloning to "/" should take us to exactly the same location.
1338
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1339
# the abspath of "/" from the original transport should be the same
1340
# as the base at the root:
1341
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1343
# At the root, the URL must still end with / as its a directory
1344
self.assertEqual(root_transport.base[-1], '/')
1346
def test_clone_from_root(self):
1347
"""At the root, cloning to a simple dir should just do string append."""
1348
orig_transport = self.get_transport()
1349
root_transport = orig_transport.clone('/')
1350
self.assertEqual(root_transport.base + '.bzr/',
1351
root_transport.clone('.bzr').base)
1353
def test_base_url(self):
1354
t = self.get_transport()
1355
self.assertEqual('/', t.base[-1])
995
1357
def test_relpath(self):
996
1358
t = self.get_transport()
997
1359
self.assertEqual('', t.relpath(t.base))
998
1360
# base ends with /
999
1361
self.assertEqual('', t.relpath(t.base[:-1]))
1000
# subdirs which dont exist should still give relpaths.
1362
# subdirs which don't exist should still give relpaths.
1001
1363
self.assertEqual('foo', t.relpath(t.base + 'foo'))
1002
1364
# trailing slash should be the same.
1003
1365
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1019
1381
# that have aliasing problems like symlinks should go in backend
1020
1382
# specific test cases.
1021
1383
transport = self.get_transport()
1023
# disabled because some transports might normalize urls in generating
1024
# the abspath - eg http+pycurl-> just http -- mbp 20060308
1025
1385
self.assertEqual(transport.base + 'relpath',
1026
1386
transport.abspath('relpath'))
1388
# This should work without raising an error.
1389
transport.abspath("/")
1391
# the abspath of "/" and "/foo/.." should result in the same location
1392
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1394
self.assertEqual(transport.clone("/").abspath('foo'),
1395
transport.abspath("/foo"))
1397
def test_win32_abspath(self):
1398
# Note: we tried to set sys.platform='win32' so we could test on
1399
# other platforms too, but then osutils does platform specific
1400
# things at import time which defeated us...
1401
if sys.platform != 'win32':
1403
'Testing drive letters in abspath implemented only for win32')
1405
# smoke test for abspath on win32.
1406
# a transport based on 'file:///' never fully qualifies the drive.
1407
transport = get_transport("file:///")
1408
self.failUnlessEqual(transport.abspath("/"), "file:///")
1410
# but a transport that starts with a drive spec must keep it.
1411
transport = get_transport("file:///C:/")
1412
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1028
1414
def test_local_abspath(self):
1029
1415
transport = self.get_transport()
1031
1417
p = transport.local_abspath('.')
1032
except TransportNotPossible:
1033
pass # This is not a local transport
1418
except (errors.NotLocalUrl, TransportNotPossible), e:
1419
# should be formattable
1035
1422
self.assertEqual(getcwd(), p)
1137
1558
self.check_transport_contents(contents, t, urlutils.escape(fname))
1139
1560
def test_connect_twice_is_same_content(self):
1140
# check that our server (whatever it is) is accessable reliably
1561
# check that our server (whatever it is) is accessible reliably
1141
1562
# via get_transport and multiple connections share content.
1142
1563
transport = self.get_transport()
1143
1564
if transport.is_readonly():
1145
1566
transport.put_bytes('foo', 'bar')
1146
transport2 = self.get_transport()
1147
self.check_transport_contents('bar', transport2, 'foo')
1148
# its base should be usable.
1149
transport2 = bzrlib.transport.get_transport(transport.base)
1150
self.check_transport_contents('bar', transport2, 'foo')
1567
transport3 = self.get_transport()
1568
self.check_transport_contents('bar', transport3, 'foo')
1152
1570
# now opening at a relative url should give use a sane result:
1153
1571
transport.mkdir('newdir')
1154
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1155
transport2 = transport2.clone('..')
1156
self.check_transport_contents('bar', transport2, 'foo')
1572
transport5 = self.get_transport('newdir')
1573
transport6 = transport5.clone('..')
1574
self.check_transport_contents('bar', transport6, 'foo')
1158
1576
def test_lock_write(self):
1577
"""Test transport-level write locks.
1579
These are deprecated and transports may decline to support them.
1159
1581
transport = self.get_transport()
1160
1582
if transport.is_readonly():
1161
1583
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1163
1585
transport.put_bytes('lock', '')
1164
lock = transport.lock_write('lock')
1587
lock = transport.lock_write('lock')
1588
except TransportNotPossible:
1165
1590
# TODO make this consistent on all platforms:
1166
1591
# self.assertRaises(LockError, transport.lock_write, 'lock')
1169
1594
def test_lock_read(self):
1595
"""Test transport-level read locks.
1597
These are deprecated and transports may decline to support them.
1170
1599
transport = self.get_transport()
1171
1600
if transport.is_readonly():
1172
1601
file('lock', 'w').close()
1174
1603
transport.put_bytes('lock', '')
1175
lock = transport.lock_read('lock')
1605
lock = transport.lock_read('lock')
1606
except TransportNotPossible:
1176
1608
# TODO make this consistent on all platforms:
1177
1609
# self.assertRaises(LockError, transport.lock_read, 'lock')
1202
1637
self.assertEqual(d[1], (9, '9'))
1203
1638
self.assertEqual(d[2], (0, '0'))
1204
1639
self.assertEqual(d[3], (3, '34'))
1641
def test_readv_with_adjust_for_latency(self):
1642
transport = self.get_transport()
1643
# the adjust for latency flag expands the data region returned
1644
# according to a per-transport heuristic, so testing is a little
1645
# tricky as we need more data than the largest combining that our
1646
# transports do. To accomodate this we generate random data and cross
1647
# reference the returned data with the random data. To avoid doing
1648
# multiple large random byte look ups we do several tests on the same
1650
content = osutils.rand_bytes(200*1024)
1651
content_size = len(content)
1652
if transport.is_readonly():
1653
self.build_tree_contents([('a', content)])
1655
transport.put_bytes('a', content)
1656
def check_result_data(result_vector):
1657
for item in result_vector:
1658
data_len = len(item[1])
1659
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1662
result = list(transport.readv('a', ((0, 30),),
1663
adjust_for_latency=True, upper_limit=content_size))
1664
# we expect 1 result, from 0, to something > 30
1665
self.assertEqual(1, len(result))
1666
self.assertEqual(0, result[0][0])
1667
self.assertTrue(len(result[0][1]) >= 30)
1668
check_result_data(result)
1669
# end of file corner case
1670
result = list(transport.readv('a', ((204700, 100),),
1671
adjust_for_latency=True, upper_limit=content_size))
1672
# we expect 1 result, from 204800- its length, to the end
1673
self.assertEqual(1, len(result))
1674
data_len = len(result[0][1])
1675
self.assertEqual(204800-data_len, result[0][0])
1676
self.assertTrue(data_len >= 100)
1677
check_result_data(result)
1678
# out of order ranges are made in order
1679
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1680
adjust_for_latency=True, upper_limit=content_size))
1681
# we expect 2 results, in order, start and end.
1682
self.assertEqual(2, len(result))
1684
data_len = len(result[0][1])
1685
self.assertEqual(0, result[0][0])
1686
self.assertTrue(data_len >= 30)
1688
data_len = len(result[1][1])
1689
self.assertEqual(204800-data_len, result[1][0])
1690
self.assertTrue(data_len >= 100)
1691
check_result_data(result)
1692
# close ranges get combined (even if out of order)
1693
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1694
result = list(transport.readv('a', request_vector,
1695
adjust_for_latency=True, upper_limit=content_size))
1696
self.assertEqual(1, len(result))
1697
data_len = len(result[0][1])
1698
# minimum length is from 400 to 1034 - 634
1699
self.assertTrue(data_len >= 634)
1700
# must contain the region 400 to 1034
1701
self.assertTrue(result[0][0] <= 400)
1702
self.assertTrue(result[0][0] + data_len >= 1034)
1703
check_result_data(result)
1705
def test_readv_with_adjust_for_latency_with_big_file(self):
1706
transport = self.get_transport()
1707
# test from observed failure case.
1708
if transport.is_readonly():
1709
file('a', 'w').write('a'*1024*1024)
1711
transport.put_bytes('a', 'a'*1024*1024)
1712
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1713
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1714
(465373, 800), (947422, 800)]
1715
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1716
found_items = [False]*9
1717
for pos, (start, length) in enumerate(broken_vector):
1718
# check the range is covered by the result
1719
for offset, data in results:
1720
if offset <= start and start + length <= offset + len(data):
1721
found_items[pos] = True
1722
self.assertEqual([True]*9, found_items)
1724
def test_get_with_open_write_stream_sees_all_content(self):
1725
t = self.get_transport()
1728
handle = t.open_write_stream('foo')
1731
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1735
def test_get_smart_medium(self):
1736
"""All transports must either give a smart medium, or know they can't.
1738
transport = self.get_transport()
1740
client_medium = transport.get_smart_medium()
1741
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1742
except errors.NoSmartMedium:
1743
# as long as we got it we're fine
1746
def test_readv_short_read(self):
1747
transport = self.get_transport()
1748
if transport.is_readonly():
1749
file('a', 'w').write('0123456789')
1751
transport.put_bytes('a', '01234567890')
1753
# This is intentionally reading off the end of the file
1754
# since we are sure that it cannot get there
1755
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1756
# Can be raised by paramiko
1758
transport.readv, 'a', [(1,1), (8,10)])
1760
# This is trying to seek past the end of the file, it should
1761
# also raise a special error
1762
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1763
transport.readv, 'a', [(12,2)])
1765
def test_stat_symlink(self):
1766
# if a transport points directly to a symlink (and supports symlinks
1767
# at all) you can tell this. helps with bug 32669.
1768
t = self.get_transport()
1770
t.symlink('target', 'link')
1771
except TransportNotPossible:
1772
raise TestSkipped("symlinks not supported")
1773
t2 = t.clone('link')
1775
self.assertTrue(stat.S_ISLNK(st.st_mode))