20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
25
26
from StringIO import StringIO as pyStringIO
30
30
from bzrlib import (
35
transport as _mod_transport,
35
38
from bzrlib.errors import (ConnectionError,
44
43
TransportNotPossible,
46
45
from bzrlib.osutils import getcwd
47
46
from bzrlib.smart import medium
48
from bzrlib.symbol_versioning import zero_eleven
49
from bzrlib.tests import TestCaseInTempDir, TestScenarioApplier, TestSkipped
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
50
53
from bzrlib.tests.test_transport import TestTransportImplementation
51
from bzrlib.transport import memory, remote, _get_transport_modules
52
import bzrlib.transport
55
class TransportTestProviderAdapter(TestScenarioApplier):
56
"""A tool to generate a suite testing all transports for a single test.
58
This is done by copying the test once for each transport and injecting
59
the transport_class and transport_server classes into each copy. Each copy
60
is also given a new id() to make it easy to identify.
64
self.scenarios = self._test_permutations()
66
def get_transport_test_permutations(self, module):
67
"""Get the permutations module wants to have tested."""
68
if getattr(module, 'get_test_permutations', None) is None:
69
raise AssertionError("transport module %s doesn't provide get_test_permutations()"
71
##warning("transport module %s doesn't provide get_test_permutations()"
74
return module.get_test_permutations()
76
def _test_permutations(self):
77
"""Return a list of the klass, server_factory pairs to test."""
79
for module in _get_transport_modules():
81
permutations = self.get_transport_test_permutations(
82
reduce(getattr, (module).split('.')[1:], __import__(module)))
83
for (klass, server_factory) in permutations:
84
scenario = (server_factory.__name__,
85
{"transport_class":klass,
86
"transport_server":server_factory})
87
result.append(scenario)
88
except errors.DependencyNotPresent, e:
89
# Continue even if a dependency prevents us
90
# from running this test
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
93
"""Multiply tests for tranport implementations."""
94
result = loader.suiteClass()
95
scenarios = transport_test_permutations()
96
return multiply_tests(standard_tests, scenarios, result)
96
99
class TransportTests(TestTransportImplementation):
99
102
super(TransportTests, self).setUp()
100
self._captureVar('BZR_NO_SMART_VFS', None)
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
102
105
def check_transport_contents(self, content, transport, relpath):
103
"""Check that transport.get(relpath).read() == content."""
104
self.assertEqualDiff(content, transport.get(relpath).read())
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
106
109
def test_ensure_base_missing(self):
107
110
""".ensure_base() should create the directory if it doesn't exist"""
143
154
self.assertEqual(True, t.has('a'))
144
155
self.assertEqual(False, t.has('c'))
145
156
self.assertEqual(True, t.has(urlutils.escape('%')))
146
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
147
[True, True, False, False, True, False, True, False])
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
148
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
149
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
150
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
151
[True, True, False, False, True, False, True, False])
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
152
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
153
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
155
171
def test_has_root_works(self):
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
raise TestNotApplicable(
174
"SmartTCPServer_for_testing intentionally does not allow "
156
176
current_transport = self.get_transport()
157
177
self.assertTrue(current_transport.has('/'))
158
178
root = current_transport.clone('/')
170
190
self.build_tree(files, transport=t, line_endings='binary')
171
191
self.check_transport_contents('contents of a\n', t, 'a')
172
192
content_f = t.get_multi(files)
173
for content, f in zip(contents, content_f):
193
# Use itertools.izip() instead of use zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
174
197
self.assertEqual(content, f.read())
176
199
content_f = t.get_multi(iter(files))
177
for content, f in zip(contents, content_f):
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
178
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
180
211
self.assertRaises(NoSuchFile, t.get, 'c')
181
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
182
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
184
244
def test_get_bytes(self):
185
245
t = self.get_transport()
196
256
for content, fname in zip(contents, files):
197
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
260
t = self.get_transport()
199
261
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
202
t = self.get_transport()
207
self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
208
self.check_transport_contents('string\ncontents\n', t, 'a')
210
self.applyDeprecated(zero_eleven,
211
t.put, 'b', StringIO('file-like\ncontents\n'))
212
self.check_transport_contents('file-like\ncontents\n', t, 'b')
214
self.assertRaises(NoSuchFile,
215
self.applyDeprecated,
217
t.put, 'path/doesnt/exist/c', StringIO('contents'))
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
219
290
def test_put_bytes(self):
220
291
t = self.get_transport()
338
409
t.put_file, 'a', StringIO('some text for a\n'))
341
t.put_file('a', StringIO('some text for a\n'))
342
self.failUnless(t.has('a'))
412
result = t.put_file('a', StringIO('some text for a\n'))
413
# put_file returns the length of the data written
414
self.assertEqual(16, result)
415
self.assertTrue(t.has('a'))
343
416
self.check_transport_contents('some text for a\n', t, 'a')
344
417
# Put also replaces contents
345
t.put_file('a', StringIO('new\ncontents for\na\n'))
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
self.assertEqual(19, result)
346
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
347
421
self.assertRaises(NoSuchFile,
348
422
t.put_file, 'path/doesnt/exist/c',
479
547
unicode_file = pyStringIO(u'\u1234')
480
548
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
482
def test_put_multi(self):
483
t = self.get_transport()
487
self.assertEqual(2, self.applyDeprecated(zero_eleven,
488
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
489
('d', StringIO('contents\nfor d\n'))]
491
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
492
[True, False, False, True])
493
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
494
self.check_transport_contents('contents\nfor d\n', t, 'd')
496
self.assertEqual(2, self.applyDeprecated(zero_eleven,
497
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
498
('d', StringIO('another contents\nfor d\n'))])
500
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
501
self.check_transport_contents('another contents\nfor d\n', t, 'd')
503
def test_put_permissions(self):
504
t = self.get_transport()
508
if not t._can_roundtrip_unix_modebits():
509
# Can't roundtrip, so no need to run this test
511
self.applyDeprecated(zero_eleven, t.put, 'mode644',
512
StringIO('test text\n'), mode=0644)
513
self.assertTransportMode(t, 'mode644', 0644)
514
self.applyDeprecated(zero_eleven, t.put, 'mode666',
515
StringIO('test text\n'), mode=0666)
516
self.assertTransportMode(t, 'mode666', 0666)
517
self.applyDeprecated(zero_eleven, t.put, 'mode600',
518
StringIO('test text\n'), mode=0600)
519
self.assertTransportMode(t, 'mode600', 0600)
520
# Yes, you can put a file such that it becomes readonly
521
self.applyDeprecated(zero_eleven, t.put, 'mode400',
522
StringIO('test text\n'), mode=0400)
523
self.assertTransportMode(t, 'mode400', 0400)
524
self.applyDeprecated(zero_eleven, t.put_multi,
525
[('mmode644', StringIO('text\n'))], mode=0644)
526
self.assertTransportMode(t, 'mmode644', 0644)
528
# The default permissions should be based on the current umask
529
umask = osutils.get_umask()
530
self.applyDeprecated(zero_eleven, t.put, 'nomode',
531
StringIO('test text\n'), mode=None)
532
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
534
550
def test_mkdir(self):
535
551
t = self.get_transport()
537
553
if t.is_readonly():
538
# cannot mkdir on readonly transports. We're not testing for
554
# cannot mkdir on readonly transports. We're not testing for
539
555
# cache coherency because cache behaviour is not currently
540
556
# defined for the transport interface.
541
557
self.assertRaises(TransportNotPossible, t.mkdir, '.')
601
617
t.mkdir('dnomode', mode=None)
602
618
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
620
def test_opening_a_file_stream_creates_file(self):
621
t = self.get_transport()
624
handle = t.open_write_stream('foo')
626
self.assertEqual('', t.get_bytes('foo'))
630
def test_opening_a_file_stream_can_set_mode(self):
631
t = self.get_transport()
634
if not t._can_roundtrip_unix_modebits():
635
# Can't roundtrip, so no need to run this test
637
def check_mode(name, mode, expected):
638
handle = t.open_write_stream(name, mode=mode)
640
self.assertTransportMode(t, name, expected)
641
check_mode('mode644', 0644, 0644)
642
check_mode('mode666', 0666, 0666)
643
check_mode('mode600', 0600, 0600)
644
# The default permissions should be based on the current umask
645
check_mode('nomode', None, 0666 & ~osutils.get_umask())
604
647
def test_copy_to(self):
605
648
# FIXME: test: same server to same server (partly done)
606
649
# same protocol two servers
607
650
# and different protocols (done for now except for MemoryTransport.
609
from bzrlib.transport.memory import MemoryTransport
611
653
def simple_copy_files(transport_from, transport_to):
612
654
files = ['a', 'b', 'c', 'd']
613
655
self.build_tree(files, transport=transport_from)
614
656
self.assertEqual(4, transport_from.copy_to(files, transport_to))
616
self.check_transport_contents(transport_to.get(f).read(),
658
self.check_transport_contents(transport_to.get_bytes(f),
617
659
transport_from, f)
619
661
t = self.get_transport()
653
695
self.assertTransportMode(temp_transport, f, mode)
655
def test_append(self):
697
def test_create_prefix(self):
656
698
t = self.get_transport()
660
t.put_bytes('a', 'diff\ncontents for\na\n')
661
t.put_bytes('b', 'contents\nfor b\n')
663
self.assertEqual(20, self.applyDeprecated(zero_eleven,
664
t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
666
self.check_transport_contents(
667
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
670
# And we can create new files, too
671
self.assertEqual(0, self.applyDeprecated(zero_eleven,
672
t.append, 'c', StringIO('some text\nfor a missing file\n')))
673
self.check_transport_contents('some text\nfor a missing file\n',
699
sub = t.clone('foo').clone('bar')
702
except TransportNotPossible:
703
self.assertTrue(t.is_readonly())
705
self.assertTrue(t.has('foo/bar'))
675
707
def test_append_file(self):
676
708
t = self.get_transport()
994
1045
def test_connection_error(self):
995
1046
"""ConnectionError is raised when connection is impossible.
997
The error may be raised from either the constructor or the first
998
operation on the transport.
1048
The error should be raised from the first operation on the transport.
1001
1051
url = self._server.get_bogus_url()
1002
1052
except NotImplementedError:
1003
1053
raise TestSkipped("Transport %s has no bogus URL support." %
1004
1054
self._server.__class__)
1005
# This should be: but SSH still connects on construction. No COOKIE!
1006
# self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1008
t = bzrlib.transport.get_transport(url)
1009
t.get('.bzr/branch')
1010
except (ConnectionError, NoSuchFile), e:
1012
except (Exception), e:
1013
self.fail('Wrong exception thrown (%s.%s): %s'
1014
% (e.__class__.__module__, e.__class__.__name__, e))
1016
self.fail('Did not get the expected ConnectionError or NoSuchFile.')
1055
t = _mod_transport.get_transport_from_url(url)
1056
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1018
1058
def test_stat(self):
1019
1059
# TODO: Test stat, just try once, and if it throws, stop testing
1050
1090
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1051
1091
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1052
1092
subdir = t.clone('subdir')
1053
subdir.stat('./file')
1093
st = subdir.stat('./file')
1094
st = subdir.stat('.')
1096
def test_hardlink(self):
1097
from stat import ST_NLINK
1099
t = self.get_transport()
1101
source_name = "original_target"
1102
link_name = "target_link"
1104
self.build_tree([source_name], transport=t)
1107
t.hardlink(source_name, link_name)
1109
self.assertTrue(t.has(source_name))
1110
self.assertTrue(t.has(link_name))
1112
st = t.stat(link_name)
1113
self.assertEqual(st[ST_NLINK], 2)
1114
except TransportNotPossible:
1115
raise TestSkipped("Transport %s does not support hardlinks." %
1116
self._server.__class__)
1118
def test_symlink(self):
1119
from stat import S_ISLNK
1121
t = self.get_transport()
1123
source_name = "original_target"
1124
link_name = "target_link"
1126
self.build_tree([source_name], transport=t)
1129
t.symlink(source_name, link_name)
1131
self.assertTrue(t.has(source_name))
1132
self.assertTrue(t.has(link_name))
1134
st = t.stat(link_name)
1135
self.assertTrue(S_ISLNK(st.st_mode),
1136
"expected symlink, got mode %o" % st.st_mode)
1137
except TransportNotPossible:
1138
raise TestSkipped("Transport %s does not support symlinks." %
1139
self._server.__class__)
1141
self.knownFailure("Paramiko fails to create symlinks during tests")
1056
1143
def test_list_dir(self):
1057
1144
# TODO: Test list_dir, just try once, and if it throws, stop testing
1058
1145
t = self.get_transport()
1060
1147
if not t.listable():
1061
1148
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1065
l = list(t.list_dir(d))
1151
def sorted_list(d, transport):
1152
l = list(transport.list_dir(d))
1069
self.assertEqual([], sorted_list('.'))
1156
self.assertEqual([], sorted_list('.', t))
1070
1157
# c2 is precisely one letter longer than c here to test that
1071
1158
# suffixing is not confused.
1072
1159
# a%25b checks that quoting is done consistently across transports
1104
1197
self.build_tree(['a/', 'a/%'], transport=t)
1106
1199
self.build_tree(['a/', 'a/%'])
1108
1201
names = list(t.list_dir('a'))
1109
1202
self.assertEqual(['%25'], names)
1110
1203
self.assertIsInstance(names[0], str)
1205
def test_clone_preserve_info(self):
1206
t1 = self.get_transport()
1207
if not isinstance(t1, ConnectedTransport):
1208
raise TestSkipped("not a connected transport")
1210
t2 = t1.clone('subdir')
1211
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1212
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1213
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1214
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1215
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1217
def test__reuse_for(self):
1218
t = self.get_transport()
1219
if not isinstance(t, ConnectedTransport):
1220
raise TestSkipped("not a connected transport")
1222
def new_url(scheme=None, user=None, password=None,
1223
host=None, port=None, path=None):
1224
"""Build a new url from t.base changing only parts of it.
1226
Only the parameters different from None will be changed.
1228
if scheme is None: scheme = t._parsed_url.scheme
1229
if user is None: user = t._parsed_url.user
1230
if password is None: password = t._parsed_url.password
1231
if user is None: user = t._parsed_url.user
1232
if host is None: host = t._parsed_url.host
1233
if port is None: port = t._parsed_url.port
1234
if path is None: path = t._parsed_url.path
1235
return str(urlutils.URL(scheme, user, password, host, port, path))
1237
if t._parsed_url.scheme == 'ftp':
1241
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
if t._parsed_url.user == 'me':
1246
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1247
# passwords are not taken into account because:
1248
# - it makes no sense to have two different valid passwords for the
1250
# - _password in ConnectedTransport is intended to collect what the
1251
# user specified from the command-line and there are cases where the
1252
# new url can contain no password (if the url was built from an
1253
# existing transport.base for example)
1254
# - password are considered part of the credentials provided at
1255
# connection creation time and as such may not be present in the url
1256
# (they may be typed by the user when prompted for example)
1257
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1258
# We will not connect, we can use a invalid host
1259
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
if t._parsed_url.port == 1234:
1264
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1265
# No point in trying to reuse a transport for a local URL
1266
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1268
def test_connection_sharing(self):
1269
t = self.get_transport()
1270
if not isinstance(t, ConnectedTransport):
1271
raise TestSkipped("not a connected transport")
1273
c = t.clone('subdir')
1274
# Some transports will create the connection only when needed
1275
t.has('surely_not') # Force connection
1276
self.assertIs(t._get_connection(), c._get_connection())
1278
# Temporary failure, we need to create a new dummy connection
1279
new_connection = None
1280
t._set_connection(new_connection)
1281
# Check that both transports use the same connection
1282
self.assertIs(new_connection, t._get_connection())
1283
self.assertIs(new_connection, c._get_connection())
1285
def test_reuse_connection_for_various_paths(self):
1286
t = self.get_transport()
1287
if not isinstance(t, ConnectedTransport):
1288
raise TestSkipped("not a connected transport")
1290
t.has('surely_not') # Force connection
1291
self.assertIsNot(None, t._get_connection())
1293
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1294
self.assertIsNot(t, subdir)
1295
self.assertIs(t._get_connection(), subdir._get_connection())
1297
home = subdir._reuse_for(t.base + 'home')
1298
self.assertIs(t._get_connection(), home._get_connection())
1299
self.assertIs(subdir._get_connection(), home._get_connection())
1112
1301
def test_clone(self):
1113
1302
# TODO: Test that clone moves up and down the filesystem
1114
1303
t1 = self.get_transport()
1116
1305
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1118
self.failUnless(t1.has('a'))
1119
self.failUnless(t1.has('b/c'))
1120
self.failIf(t1.has('c'))
1307
self.assertTrue(t1.has('a'))
1308
self.assertTrue(t1.has('b/c'))
1309
self.assertFalse(t1.has('c'))
1122
1311
t2 = t1.clone('b')
1123
1312
self.assertEqual(t1.base + 'b/', t2.base)
1125
self.failUnless(t2.has('c'))
1126
self.failIf(t2.has('a'))
1314
self.assertTrue(t2.has('c'))
1315
self.assertFalse(t2.has('a'))
1128
1317
t3 = t2.clone('..')
1129
self.failUnless(t3.has('a'))
1130
self.failIf(t3.has('c'))
1318
self.assertTrue(t3.has('a'))
1319
self.assertFalse(t3.has('c'))
1132
self.failIf(t1.has('b/d'))
1133
self.failIf(t2.has('d'))
1134
self.failIf(t3.has('b/d'))
1321
self.assertFalse(t1.has('b/d'))
1322
self.assertFalse(t2.has('d'))
1323
self.assertFalse(t3.has('b/d'))
1136
1325
if t1.is_readonly():
1137
open('b/d', 'wb').write('newfile\n')
1326
self.build_tree_contents([('b/d', 'newfile\n')])
1139
1328
t2.put_bytes('d', 'newfile\n')
1141
self.failUnless(t1.has('b/d'))
1142
self.failUnless(t2.has('d'))
1143
self.failUnless(t3.has('b/d'))
1330
self.assertTrue(t1.has('b/d'))
1331
self.assertTrue(t2.has('d'))
1332
self.assertTrue(t3.has('b/d'))
1145
1334
def test_clone_to_root(self):
1146
1335
orig_transport = self.get_transport()
1333
1574
self.check_transport_contents(contents, t, urlutils.escape(fname))
1335
1576
def test_connect_twice_is_same_content(self):
1336
# check that our server (whatever it is) is accessable reliably
1577
# check that our server (whatever it is) is accessible reliably
1337
1578
# via get_transport and multiple connections share content.
1338
1579
transport = self.get_transport()
1339
1580
if transport.is_readonly():
1341
1582
transport.put_bytes('foo', 'bar')
1342
transport2 = self.get_transport()
1343
self.check_transport_contents('bar', transport2, 'foo')
1344
# its base should be usable.
1345
transport2 = bzrlib.transport.get_transport(transport.base)
1346
self.check_transport_contents('bar', transport2, 'foo')
1583
transport3 = self.get_transport()
1584
self.check_transport_contents('bar', transport3, 'foo')
1348
1586
# now opening at a relative url should give use a sane result:
1349
1587
transport.mkdir('newdir')
1350
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1351
transport2 = transport2.clone('..')
1352
self.check_transport_contents('bar', transport2, 'foo')
1588
transport5 = self.get_transport('newdir')
1589
transport6 = transport5.clone('..')
1590
self.check_transport_contents('bar', transport6, 'foo')
1354
1592
def test_lock_write(self):
1355
1593
"""Test transport-level write locks.
1416
1654
self.assertEqual(d[2], (0, '0'))
1417
1655
self.assertEqual(d[3], (3, '34'))
1657
def test_readv_with_adjust_for_latency(self):
1658
transport = self.get_transport()
1659
# the adjust for latency flag expands the data region returned
1660
# according to a per-transport heuristic, so testing is a little
1661
# tricky as we need more data than the largest combining that our
1662
# transports do. To accomodate this we generate random data and cross
1663
# reference the returned data with the random data. To avoid doing
1664
# multiple large random byte look ups we do several tests on the same
1666
content = osutils.rand_bytes(200*1024)
1667
content_size = len(content)
1668
if transport.is_readonly():
1669
self.build_tree_contents([('a', content)])
1671
transport.put_bytes('a', content)
1672
def check_result_data(result_vector):
1673
for item in result_vector:
1674
data_len = len(item[1])
1675
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1678
result = list(transport.readv('a', ((0, 30),),
1679
adjust_for_latency=True, upper_limit=content_size))
1680
# we expect 1 result, from 0, to something > 30
1681
self.assertEqual(1, len(result))
1682
self.assertEqual(0, result[0][0])
1683
self.assertTrue(len(result[0][1]) >= 30)
1684
check_result_data(result)
1685
# end of file corner case
1686
result = list(transport.readv('a', ((204700, 100),),
1687
adjust_for_latency=True, upper_limit=content_size))
1688
# we expect 1 result, from 204800- its length, to the end
1689
self.assertEqual(1, len(result))
1690
data_len = len(result[0][1])
1691
self.assertEqual(204800-data_len, result[0][0])
1692
self.assertTrue(data_len >= 100)
1693
check_result_data(result)
1694
# out of order ranges are made in order
1695
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1696
adjust_for_latency=True, upper_limit=content_size))
1697
# we expect 2 results, in order, start and end.
1698
self.assertEqual(2, len(result))
1700
data_len = len(result[0][1])
1701
self.assertEqual(0, result[0][0])
1702
self.assertTrue(data_len >= 30)
1704
data_len = len(result[1][1])
1705
self.assertEqual(204800-data_len, result[1][0])
1706
self.assertTrue(data_len >= 100)
1707
check_result_data(result)
1708
# close ranges get combined (even if out of order)
1709
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1710
result = list(transport.readv('a', request_vector,
1711
adjust_for_latency=True, upper_limit=content_size))
1712
self.assertEqual(1, len(result))
1713
data_len = len(result[0][1])
1714
# minimum length is from 400 to 1034 - 634
1715
self.assertTrue(data_len >= 634)
1716
# must contain the region 400 to 1034
1717
self.assertTrue(result[0][0] <= 400)
1718
self.assertTrue(result[0][0] + data_len >= 1034)
1719
check_result_data(result)
1721
def test_readv_with_adjust_for_latency_with_big_file(self):
1722
transport = self.get_transport()
1723
# test from observed failure case.
1724
if transport.is_readonly():
1725
with file('a', 'w') as f: f.write('a'*1024*1024)
1727
transport.put_bytes('a', 'a'*1024*1024)
1728
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1729
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1730
(465373, 800), (947422, 800)]
1731
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1732
found_items = [False]*9
1733
for pos, (start, length) in enumerate(broken_vector):
1734
# check the range is covered by the result
1735
for offset, data in results:
1736
if offset <= start and start + length <= offset + len(data):
1737
found_items[pos] = True
1738
self.assertEqual([True]*9, found_items)
1740
def test_get_with_open_write_stream_sees_all_content(self):
1741
t = self.get_transport()
1744
handle = t.open_write_stream('foo')
1747
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1419
1751
def test_get_smart_medium(self):
1420
1752
"""All transports must either give a smart medium, or know they can't.
1445
1777
# also raise a special error
1446
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1447
1779
transport.readv, 'a', [(12,2)])
1781
def test_no_segment_parameters(self):
1782
"""Segment parameters should be stripped and stored in
1783
transport.segment_parameters."""
1784
transport = self.get_transport("foo")
1785
self.assertEquals({}, transport.get_segment_parameters())
1787
def test_segment_parameters(self):
1788
"""Segment parameters should be stripped and stored in
1789
transport.get_segment_parameters()."""
1790
base_url = self._server.get_url()
1791
parameters = {"key1": "val1", "key2": "val2"}
1792
url = urlutils.join_segment_parameters(base_url, parameters)
1793
transport = _mod_transport.get_transport_from_url(url)
1794
self.assertEquals(parameters, transport.get_segment_parameters())
1796
def test_set_segment_parameters(self):
1797
"""Segment parameters can be set and show up in base."""
1798
transport = self.get_transport("foo")
1799
orig_base = transport.base
1800
transport.set_segment_parameter("arm", "board")
1801
self.assertEquals("%s,arm=board" % orig_base, transport.base)
1802
self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1803
transport.set_segment_parameter("arm", None)
1804
transport.set_segment_parameter("nonexistant", None)
1805
self.assertEquals({}, transport.get_segment_parameters())
1806
self.assertEquals(orig_base, transport.base)
1808
def test_stat_symlink(self):
1809
# if a transport points directly to a symlink (and supports symlinks
1810
# at all) you can tell this. helps with bug 32669.
1811
t = self.get_transport()
1813
t.symlink('target', 'link')
1814
except TransportNotPossible:
1815
raise TestSkipped("symlinks not supported")
1816
t2 = t.clone('link')
1818
self.assertTrue(stat.S_ISLNK(st.st_mode))
1820
def test_abspath_url_unquote_unreserved(self):
1821
"""URLs from abspath should have unreserved characters unquoted
1823
Need consistent quoting notably for tildes, see lp:842223 for more.
1825
t = self.get_transport()
1826
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1827
self.assertEqual(t.base + "-.09AZ_az~",
1828
t.abspath(needlessly_escaped_dir))
1830
def test_clone_url_unquote_unreserved(self):
1831
"""Base URL of a cloned branch needs unreserved characters unquoted
1833
Cloned transports should be prefix comparable for things like the
1834
isolation checking of tests, see lp:842223 for more.
1836
t1 = self.get_transport()
1837
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1838
self.build_tree([needlessly_escaped_dir], transport=t1)
1839
t2 = t1.clone(needlessly_escaped_dir)
1840
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1842
def test_hook_post_connection_one(self):
1843
"""Fire post_connect hook after a ConnectedTransport is first used"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t = self.get_transport()
1847
self.assertEqual([], log)
1848
t.has("non-existant")
1849
if isinstance(t, RemoteTransport):
1850
self.assertEqual([t.get_smart_medium()], log)
1851
elif isinstance(t, ConnectedTransport):
1852
self.assertEqual([t], log)
1854
self.assertEqual([], log)
1856
def test_hook_post_connection_multi(self):
1857
"""Fire post_connect hook once per unshared underlying connection"""
1859
Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
t1 = self.get_transport()
1862
t3 = self.get_transport()
1863
self.assertEqual([], log)
1867
if isinstance(t1, RemoteTransport):
1868
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
elif isinstance(t1, ConnectedTransport):
1870
self.assertEqual([t1, t3], log)
1872
self.assertEqual([], log)