20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
30
from bzrlib import (
35
transport as _mod_transport,
33
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
LockError, NoSmartServer, PathError,
35
TransportNotPossible, ConnectionError,
38
from bzrlib.errors import (ConnectionError,
37
45
from bzrlib.osutils import getcwd
38
from bzrlib.symbol_versioning import zero_eleven
39
from bzrlib.tests import TestCaseInTempDir, TestSkipped
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
40
53
from bzrlib.tests.test_transport import TestTransportImplementation
41
from bzrlib.transport import memory, smart, chroot
42
import bzrlib.transport
46
"""Append the given text (file-like object) to the supplied filename."""
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
    """Multiply the standard tests across the transport implementations.

    A fresh suite is created from ``loader.suiteClass`` and handed to
    ``multiply_tests`` together with ``standard_tests`` and the scenarios
    returned by ``transport_test_permutations()``. ``module`` is accepted
    but not used here.
    """
    suite = loader.suiteClass()
    return multiply_tests(
        standard_tests, transport_test_permutations(), suite)
54
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
56
105
def check_transport_contents(self, content, transport, relpath):
57
"""Check that transport.get(relpath).read() == content."""
58
self.assertEqualDiff(content, transport.get(relpath).read())
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
146
except errors.InProcessTransport:
60
149
def test_has(self):
61
150
t = self.get_transport()
65
154
self.assertEqual(True, t.has('a'))
66
155
self.assertEqual(False, t.has('c'))
67
156
self.assertEqual(True, t.has(urlutils.escape('%')))
68
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
69
[True, True, False, False, True, False, True, False])
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
70
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
71
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
72
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
73
[True, True, False, False, True, False, True, False])
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
74
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
75
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
77
171
def test_has_root_works(self):
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
raise TestNotApplicable(
174
"SmartTCPServer_for_testing intentionally does not allow "
78
176
current_transport = self.get_transport()
79
if isinstance(current_transport, chroot.ChrootTransportDecorator):
80
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
81
177
self.assertTrue(current_transport.has('/'))
82
178
root = current_transport.clone('/')
83
179
self.assertTrue(root.has(''))
94
190
self.build_tree(files, transport=t, line_endings='binary')
95
191
self.check_transport_contents('contents of a\n', t, 'a')
96
192
content_f = t.get_multi(files)
97
for content, f in zip(contents, content_f):
193
# Use itertools.izip() instead of use zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
98
197
self.assertEqual(content, f.read())
100
199
content_f = t.get_multi(iter(files))
101
for content, f in zip(contents, content_f):
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
102
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
104
211
self.assertRaises(NoSuchFile, t.get, 'c')
105
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
106
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
108
244
def test_get_bytes(self):
109
245
t = self.get_transport()
120
256
for content, fname in zip(contents, files):
121
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
    """get_bytes() raises NoSuchFile for 'c', which this test never creates."""
    transport = self.get_transport()
    self.assertRaises(NoSuchFile, transport.get_bytes, 'c')
126
t = self.get_transport()
131
self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
132
self.check_transport_contents('string\ncontents\n', t, 'a')
134
self.applyDeprecated(zero_eleven,
135
t.put, 'b', StringIO('file-like\ncontents\n'))
136
self.check_transport_contents('file-like\ncontents\n', t, 'b')
138
self.assertRaises(NoSuchFile,
139
self.applyDeprecated,
141
t.put, 'path/doesnt/exist/c', StringIO('contents'))
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
143
290
def test_put_bytes(self):
144
291
t = self.get_transport()
375
519
dir_mode=0777, create_parent_dir=True)
376
520
self.assertTransportMode(t, 'dir777', 0777)
378
def test_put_multi(self):
379
t = self.get_transport()
383
self.assertEqual(2, self.applyDeprecated(zero_eleven,
384
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
385
('d', StringIO('contents\nfor d\n'))]
387
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
388
[True, False, False, True])
389
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
390
self.check_transport_contents('contents\nfor d\n', t, 'd')
392
self.assertEqual(2, self.applyDeprecated(zero_eleven,
393
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
394
('d', StringIO('another contents\nfor d\n'))])
396
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
397
self.check_transport_contents('another contents\nfor d\n', t, 'd')
399
def test_put_permissions(self):
400
t = self.get_transport()
404
if not t._can_roundtrip_unix_modebits():
405
# Can't roundtrip, so no need to run this test
407
self.applyDeprecated(zero_eleven, t.put, 'mode644',
408
StringIO('test text\n'), mode=0644)
409
self.assertTransportMode(t, 'mode644', 0644)
410
self.applyDeprecated(zero_eleven, t.put, 'mode666',
411
StringIO('test text\n'), mode=0666)
412
self.assertTransportMode(t, 'mode666', 0666)
413
self.applyDeprecated(zero_eleven, t.put, 'mode600',
414
StringIO('test text\n'), mode=0600)
415
self.assertTransportMode(t, 'mode600', 0600)
416
# Yes, you can put a file such that it becomes readonly
417
self.applyDeprecated(zero_eleven, t.put, 'mode400',
418
StringIO('test text\n'), mode=0400)
419
self.assertTransportMode(t, 'mode400', 0400)
420
self.applyDeprecated(zero_eleven, t.put_multi,
421
[('mmode644', StringIO('text\n'))], mode=0644)
422
self.assertTransportMode(t, 'mmode644', 0644)
424
# The default permissions should be based on the current umask
425
umask = osutils.get_umask()
426
self.applyDeprecated(zero_eleven, t.put, 'nomode',
427
StringIO('test text\n'), mode=None)
428
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
522
def test_put_bytes_unicode(self):
523
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
524
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
525
# (we don't want to encode unicode here at all, callers should be
526
# strictly passing bytes to put_bytes), but we allow it for backwards
527
# compatibility. At some point we should use a specific exception.
528
# See https://bugs.launchpad.net/bzr/+bug/106898.
529
t = self.get_transport()
532
unicode_string = u'\u1234'
534
(AssertionError, UnicodeEncodeError),
535
t.put_bytes, 'foo', unicode_string)
537
def test_put_file_unicode(self):
538
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
539
# This situation can happen (and has) if code is careless about the type
540
# of "string" they initialise/write to a StringIO with. We cannot use
541
# cStringIO, because it never returns unicode from read.
542
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
543
# raise, but we raise it for hysterical raisins.
544
t = self.get_transport()
547
unicode_file = pyStringIO(u'\u1234')
548
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
430
550
def test_mkdir(self):
431
551
t = self.get_transport()
433
553
if t.is_readonly():
434
# cannot mkdir on readonly transports. We're not testing for
554
# cannot mkdir on readonly transports. We're not testing for
435
555
# cache coherency because cache behaviour is not currently
436
556
# defined for the transport interface.
437
557
self.assertRaises(TransportNotPossible, t.mkdir, '.')
497
617
t.mkdir('dnomode', mode=None)
498
618
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
620
def test_opening_a_file_stream_creates_file(self):
621
t = self.get_transport()
624
handle = t.open_write_stream('foo')
626
self.assertEqual('', t.get_bytes('foo'))
630
def test_opening_a_file_stream_can_set_mode(self):
631
t = self.get_transport()
634
if not t._can_roundtrip_unix_modebits():
635
# Can't roundtrip, so no need to run this test
637
def check_mode(name, mode, expected):
638
handle = t.open_write_stream(name, mode=mode)
640
self.assertTransportMode(t, name, expected)
641
check_mode('mode644', 0644, 0644)
642
check_mode('mode666', 0666, 0666)
643
check_mode('mode600', 0600, 0600)
644
# The default permissions should be based on the current umask
645
check_mode('nomode', None, 0666 & ~osutils.get_umask())
500
647
def test_copy_to(self):
501
648
# FIXME: test: same server to same server (partly done)
502
649
# same protocol two servers
503
650
# and different protocols (done for now except for MemoryTransport.
505
from bzrlib.transport.memory import MemoryTransport
507
653
def simple_copy_files(transport_from, transport_to):
508
654
files = ['a', 'b', 'c', 'd']
509
655
self.build_tree(files, transport=transport_from)
510
656
self.assertEqual(4, transport_from.copy_to(files, transport_to))
512
self.check_transport_contents(transport_to.get(f).read(),
658
self.check_transport_contents(transport_to.get_bytes(f),
513
659
transport_from, f)
515
661
t = self.get_transport()
549
695
self.assertTransportMode(temp_transport, f, mode)
551
def test_append(self):
697
def test_create_prefix(self):
552
698
t = self.get_transport()
556
t.put_bytes('a', 'diff\ncontents for\na\n')
557
t.put_bytes('b', 'contents\nfor b\n')
559
self.assertEqual(20, self.applyDeprecated(zero_eleven,
560
t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
562
self.check_transport_contents(
563
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
566
# And we can create new files, too
567
self.assertEqual(0, self.applyDeprecated(zero_eleven,
568
t.append, 'c', StringIO('some text\nfor a missing file\n')))
569
self.check_transport_contents('some text\nfor a missing file\n',
699
sub = t.clone('foo').clone('bar')
702
except TransportNotPossible:
703
self.assertTrue(t.is_readonly())
705
self.assertTrue(t.has('foo/bar'))
571
707
def test_append_file(self):
572
708
t = self.get_transport()
872
1045
def test_connection_error(self):
873
1046
"""ConnectionError is raised when connection is impossible.
875
The error may be raised from either the constructor or the first
876
operation on the transport.
1048
The error should be raised from the first operation on the transport.
879
1051
url = self._server.get_bogus_url()
880
1052
except NotImplementedError:
881
1053
raise TestSkipped("Transport %s has no bogus URL support." %
882
1054
self._server.__class__)
883
# This should be: but SSH still connects on construction. No COOKIE!
884
# self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
886
t = bzrlib.transport.get_transport(url)
888
except (ConnectionError, NoSuchFile), e:
890
except (Exception), e:
891
self.fail('Wrong exception thrown (%s.%s): %s'
892
% (e.__class__.__module__, e.__class__.__name__, e))
894
self.fail('Did not get the expected ConnectionError or NoSuchFile.')
1055
t = _mod_transport.get_transport_from_url(url)
1056
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
896
1058
def test_stat(self):
897
1059
# TODO: Test stat, just try once, and if it throws, stop testing
928
1090
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
929
1091
self.build_tree(['subdir/', 'subdir/file'], transport=t)
930
1092
subdir = t.clone('subdir')
931
subdir.stat('./file')
1093
st = subdir.stat('./file')
1094
st = subdir.stat('.')
1096
def test_hardlink(self):
1097
from stat import ST_NLINK
1099
t = self.get_transport()
1101
source_name = "original_target"
1102
link_name = "target_link"
1104
self.build_tree([source_name], transport=t)
1107
t.hardlink(source_name, link_name)
1109
self.assertTrue(t.has(source_name))
1110
self.assertTrue(t.has(link_name))
1112
st = t.stat(link_name)
1113
self.assertEqual(st[ST_NLINK], 2)
1114
except TransportNotPossible:
1115
raise TestSkipped("Transport %s does not support hardlinks." %
1116
self._server.__class__)
1118
def test_symlink(self):
1119
from stat import S_ISLNK
1121
t = self.get_transport()
1123
source_name = "original_target"
1124
link_name = "target_link"
1126
self.build_tree([source_name], transport=t)
1129
t.symlink(source_name, link_name)
1131
self.assertTrue(t.has(source_name))
1132
self.assertTrue(t.has(link_name))
1134
st = t.stat(link_name)
1135
self.assertTrue(S_ISLNK(st.st_mode),
1136
"expected symlink, got mode %o" % st.st_mode)
1137
except TransportNotPossible:
1138
raise TestSkipped("Transport %s does not support symlinks." %
1139
self._server.__class__)
1141
self.knownFailure("Paramiko fails to create symlinks during tests")
934
1143
def test_list_dir(self):
935
1144
# TODO: Test list_dir, just try once, and if it throws, stop testing
936
1145
t = self.get_transport()
938
1147
if not t.listable():
939
1148
self.assertRaises(TransportNotPossible, t.list_dir, '.')
943
l = list(t.list_dir(d))
1151
def sorted_list(d, transport):
1152
l = list(transport.list_dir(d))
947
self.assertEqual([], sorted_list('.'))
1156
self.assertEqual([], sorted_list('.', t))
948
1157
# c2 is precisely one letter longer than c here to test that
949
1158
# suffixing is not confused.
950
1159
# a%25b checks that quoting is done consistently across transports
982
1197
self.build_tree(['a/', 'a/%'], transport=t)
984
1199
self.build_tree(['a/', 'a/%'])
986
1201
names = list(t.list_dir('a'))
987
1202
self.assertEqual(['%25'], names)
988
1203
self.assertIsInstance(names[0], str)
1205
def test_clone_preserve_info(self):
    """A clone keeps the scheme, user, password, host and port of its source.

    Only meaningful for ConnectedTransport instances; any other transport
    causes the test to be skipped.
    """
    t1 = self.get_transport()
    if not isinstance(t1, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    t2 = t1.clone('subdir')
    # assertEqual is used instead of the deprecated assertEquals alias.
    self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
    self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
    self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
    self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
    self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1217
def test__reuse_for(self):
1218
t = self.get_transport()
1219
if not isinstance(t, ConnectedTransport):
1220
raise TestSkipped("not a connected transport")
1222
def new_url(scheme=None, user=None, password=None,
1223
host=None, port=None, path=None):
1224
"""Build a new url from t.base changing only parts of it.
1226
Only the parameters different from None will be changed.
1228
if scheme is None: scheme = t._parsed_url.scheme
1229
if user is None: user = t._parsed_url.user
1230
if password is None: password = t._parsed_url.password
1231
if user is None: user = t._parsed_url.user
1232
if host is None: host = t._parsed_url.host
1233
if port is None: port = t._parsed_url.port
1234
if path is None: path = t._parsed_url.path
1235
return str(urlutils.URL(scheme, user, password, host, port, path))
1237
if t._parsed_url.scheme == 'ftp':
1241
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
if t._parsed_url.user == 'me':
1246
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1247
# passwords are not taken into account because:
1248
# - it makes no sense to have two different valid passwords for the
1250
# - _password in ConnectedTransport is intended to collect what the
1251
# user specified from the command-line and there are cases where the
1252
# new url can contain no password (if the url was built from an
1253
# existing transport.base for example)
1254
# - password are considered part of the credentials provided at
1255
# connection creation time and as such may not be present in the url
1256
# (they may be typed by the user when prompted for example)
1257
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1258
# We will not connect, so we can use an invalid host
1259
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
if t._parsed_url.port == 1234:
1264
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1265
# No point in trying to reuse a transport for a local URL
1266
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1268
def test_connection_sharing(self):
    """A transport and its clone report the same connection object.

    This is checked both after the first request and after a replacement
    connection has been set on the original transport. Skipped for
    transports that are not ConnectedTransport instances.
    """
    parent = self.get_transport()
    if not isinstance(parent, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    child = parent.clone('subdir')
    # Some transports only connect on demand, so issue a request to force
    # the connection into existence before comparing.
    parent.has('surely_not')
    self.assertIs(parent._get_connection(), child._get_connection())

    # Simulate a temporary failure by installing a new dummy connection
    # on the original transport only.
    replacement = None
    parent._set_connection(replacement)
    # Both the original and the clone must now report the replacement.
    for transport in (parent, child):
        self.assertIs(replacement, transport._get_connection())
1285
def test_reuse_connection_for_various_paths(self):
    """_reuse_for() on other paths under the same base shares the connection.

    Skipped for transports that are not ConnectedTransport instances.
    """
    base_transport = self.get_transport()
    if not isinstance(base_transport, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Issue a request so that a connection is actually established.
    base_transport.has('surely_not')
    self.assertIsNot(None, base_transport._get_connection())

    deep_url = base_transport.base + 'whatever/but/deep/down/the/path'
    deep = base_transport._reuse_for(deep_url)
    # A distinct transport object, but backed by the same connection.
    self.assertIsNot(base_transport, deep)
    self.assertIs(base_transport._get_connection(), deep._get_connection())

    home_url = base_transport.base + 'home'
    home = deep._reuse_for(home_url)
    self.assertIs(base_transport._get_connection(), home._get_connection())
    self.assertIs(deep._get_connection(), home._get_connection())
990
1301
def test_clone(self):
991
1302
# TODO: Test that clone moves up and down the filesystem
992
1303
t1 = self.get_transport()
993
if isinstance(t1, chroot.ChrootTransportDecorator):
994
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
996
1305
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
998
self.failUnless(t1.has('a'))
999
self.failUnless(t1.has('b/c'))
1000
self.failIf(t1.has('c'))
1307
self.assertTrue(t1.has('a'))
1308
self.assertTrue(t1.has('b/c'))
1309
self.assertFalse(t1.has('c'))
1002
1311
t2 = t1.clone('b')
1003
1312
self.assertEqual(t1.base + 'b/', t2.base)
1005
self.failUnless(t2.has('c'))
1006
self.failIf(t2.has('a'))
1314
self.assertTrue(t2.has('c'))
1315
self.assertFalse(t2.has('a'))
1008
1317
t3 = t2.clone('..')
1009
self.failUnless(t3.has('a'))
1010
self.failIf(t3.has('c'))
1318
self.assertTrue(t3.has('a'))
1319
self.assertFalse(t3.has('c'))
1012
self.failIf(t1.has('b/d'))
1013
self.failIf(t2.has('d'))
1014
self.failIf(t3.has('b/d'))
1321
self.assertFalse(t1.has('b/d'))
1322
self.assertFalse(t2.has('d'))
1323
self.assertFalse(t3.has('b/d'))
1016
1325
if t1.is_readonly():
1017
open('b/d', 'wb').write('newfile\n')
1326
self.build_tree_contents([('b/d', 'newfile\n')])
1019
1328
t2.put_bytes('d', 'newfile\n')
1021
self.failUnless(t1.has('b/d'))
1022
self.failUnless(t2.has('d'))
1023
self.failUnless(t3.has('b/d'))
1330
self.assertTrue(t1.has('b/d'))
1331
self.assertTrue(t2.has('d'))
1332
self.assertTrue(t3.has('b/d'))
1025
1334
def test_clone_to_root(self):
1026
1335
orig_transport = self.get_transport()
1027
if isinstance(orig_transport, chroot.ChrootTransportDecorator):
1028
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1029
1336
# Repeatedly go up to a parent directory until we're at the root
1030
1337
# directory of this transport
1031
1338
root_transport = orig_transport
1108
1409
self.assertEqual(transport.clone("/").abspath('foo'),
1109
1410
transport.abspath("/foo"))
1412
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1413
def test_win32_abspath(self):
1414
# Note: we tried to set sys.platform='win32' so we could test on
1415
# other platforms too, but then osutils does platform specific
1416
# things at import time which defeated us...
1417
if sys.platform != 'win32':
1419
'Testing drive letters in abspath implemented only for win32')
1421
# smoke test for abspath on win32.
1422
# a transport based on 'file:///' never fully qualifies the drive.
1423
transport = _mod_transport.get_transport_from_url("file:///")
1424
self.assertEqual(transport.abspath("/"), "file:///")
1426
# but a transport that starts with a drive spec must keep it.
1427
transport = _mod_transport.get_transport_from_url("file:///C:/")
1428
self.assertEqual(transport.abspath("/"), "file:///C:/")
1111
1430
def test_local_abspath(self):
1112
1431
transport = self.get_transport()
1114
1433
p = transport.local_abspath('.')
1115
except TransportNotPossible:
1116
pass # This is not a local transport
1434
except (errors.NotLocalUrl, TransportNotPossible), e:
1435
# should be formattable
1118
1438
self.assertEqual(getcwd(), p)
1120
1440
def test_abspath_at_root(self):
1121
1441
t = self.get_transport()
1122
if isinstance(t, chroot.ChrootTransportDecorator):
1123
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1124
1442
# clone all the way to the top
1125
1443
new_transport = t.clone('..')
1126
1444
while new_transport.base != t.base:
1222
1574
self.check_transport_contents(contents, t, urlutils.escape(fname))
1224
1576
def test_connect_twice_is_same_content(self):
1225
# check that our server (whatever it is) is accessable reliably
1577
# check that our server (whatever it is) is accessible reliably
1226
1578
# via get_transport and multiple connections share content.
1227
1579
transport = self.get_transport()
1228
if isinstance(transport, chroot.ChrootTransportDecorator):
1229
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1230
1580
if transport.is_readonly():
1232
1582
transport.put_bytes('foo', 'bar')
1233
transport2 = self.get_transport()
1234
self.check_transport_contents('bar', transport2, 'foo')
1235
# its base should be usable.
1236
transport2 = bzrlib.transport.get_transport(transport.base)
1237
self.check_transport_contents('bar', transport2, 'foo')
1583
transport3 = self.get_transport()
1584
self.check_transport_contents('bar', transport3, 'foo')
1239
1586
# now opening at a relative url should give us a sane result:
1240
1587
transport.mkdir('newdir')
1241
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1242
transport2 = transport2.clone('..')
1243
self.check_transport_contents('bar', transport2, 'foo')
1588
transport5 = self.get_transport('newdir')
1589
transport6 = transport5.clone('..')
1590
self.check_transport_contents('bar', transport6, 'foo')
1245
1592
def test_lock_write(self):
1246
1593
"""Test transport-level write locks.
1307
1654
self.assertEqual(d[2], (0, '0'))
1308
1655
self.assertEqual(d[3], (3, '34'))
1657
def test_readv_with_adjust_for_latency(self):
1658
transport = self.get_transport()
1659
# the adjust for latency flag expands the data region returned
1660
# according to a per-transport heuristic, so testing is a little
1661
# tricky as we need more data than the largest combining that our
1662
# transports do. To accommodate this we generate random data and cross
1663
# reference the returned data with the random data. To avoid doing
1664
# multiple large random byte look ups we do several tests on the same
1666
content = osutils.rand_bytes(200*1024)
1667
content_size = len(content)
1668
if transport.is_readonly():
1669
self.build_tree_contents([('a', content)])
1671
transport.put_bytes('a', content)
1672
def check_result_data(result_vector):
1673
for item in result_vector:
1674
data_len = len(item[1])
1675
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1678
result = list(transport.readv('a', ((0, 30),),
1679
adjust_for_latency=True, upper_limit=content_size))
1680
# we expect 1 result, from 0, to something > 30
1681
self.assertEqual(1, len(result))
1682
self.assertEqual(0, result[0][0])
1683
self.assertTrue(len(result[0][1]) >= 30)
1684
check_result_data(result)
1685
# end of file corner case
1686
result = list(transport.readv('a', ((204700, 100),),
1687
adjust_for_latency=True, upper_limit=content_size))
1688
# we expect 1 result, from 204800- its length, to the end
1689
self.assertEqual(1, len(result))
1690
data_len = len(result[0][1])
1691
self.assertEqual(204800-data_len, result[0][0])
1692
self.assertTrue(data_len >= 100)
1693
check_result_data(result)
1694
# out of order ranges are made in order
1695
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1696
adjust_for_latency=True, upper_limit=content_size))
1697
# we expect 2 results, in order, start and end.
1698
self.assertEqual(2, len(result))
1700
data_len = len(result[0][1])
1701
self.assertEqual(0, result[0][0])
1702
self.assertTrue(data_len >= 30)
1704
data_len = len(result[1][1])
1705
self.assertEqual(204800-data_len, result[1][0])
1706
self.assertTrue(data_len >= 100)
1707
check_result_data(result)
1708
# close ranges get combined (even if out of order)
1709
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1710
result = list(transport.readv('a', request_vector,
1711
adjust_for_latency=True, upper_limit=content_size))
1712
self.assertEqual(1, len(result))
1713
data_len = len(result[0][1])
1714
# minimum length is from 400 to 1034 - 634
1715
self.assertTrue(data_len >= 634)
1716
# must contain the region 400 to 1034
1717
self.assertTrue(result[0][0] <= 400)
1718
self.assertTrue(result[0][0] + data_len >= 1034)
1719
check_result_data(result)
1721
def test_readv_with_adjust_for_latency_with_big_file(self):
1722
transport = self.get_transport()
1723
# test from observed failure case.
1724
if transport.is_readonly():
1725
with file('a', 'w') as f: f.write('a'*1024*1024)
1727
transport.put_bytes('a', 'a'*1024*1024)
1728
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1729
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1730
(465373, 800), (947422, 800)]
1731
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1732
found_items = [False]*9
1733
for pos, (start, length) in enumerate(broken_vector):
1734
# check the range is covered by the result
1735
for offset, data in results:
1736
if offset <= start and start + length <= offset + len(data):
1737
found_items[pos] = True
1738
self.assertEqual([True]*9, found_items)
1740
def test_get_with_open_write_stream_sees_all_content(self):
1741
t = self.get_transport()
1744
handle = t.open_write_stream('foo')
1747
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1310
1751
def test_get_smart_medium(self):
1311
1752
"""All transports must either give a smart medium, or know they can't.
1313
1754
transport = self.get_transport()
1315
medium = transport.get_smart_medium()
1316
self.assertIsInstance(medium, smart.SmartClientMedium)
1756
client_medium = transport.get_smart_medium()
1757
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1317
1758
except errors.NoSmartMedium:
1318
1759
# as long as we got it we're fine
1336
1777
# also raise a special error
1337
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1338
1779
transport.readv, 'a', [(12,2)])
1781
def test_no_segment_parameters(self):
    """A transport opened for a plain path reports no segment parameters.

    get_segment_parameters() is expected to return an empty dict for the
    transport obtained with get_transport("foo").
    """
    transport = self.get_transport("foo")
    # assertEqual is used instead of the deprecated assertEquals alias.
    self.assertEqual({}, transport.get_segment_parameters())
1787
def test_segment_parameters(self):
    """Segment parameters joined onto the server URL are reported back.

    The parameters are added to the server URL with
    urlutils.join_segment_parameters() and must be returned unchanged by
    transport.get_segment_parameters().
    """
    base_url = self._server.get_url()
    parameters = {"key1": "val1", "key2": "val2"}
    url = urlutils.join_segment_parameters(base_url, parameters)
    transport = _mod_transport.get_transport_from_url(url)
    # assertEqual is used instead of the deprecated assertEquals alias.
    self.assertEqual(parameters, transport.get_segment_parameters())
1796
def test_set_segment_parameters(self):
    """Segment parameters can be set and show up in base.

    Setting "arm" to "board" must append ",arm=board" to the base URL.
    Setting it back to None, and setting a parameter that was never
    present to None, must leave the transport with no segment parameters
    and its original base.
    """
    transport = self.get_transport("foo")
    orig_base = transport.base
    transport.set_segment_parameter("arm", "board")
    # assertEqual is used instead of the deprecated assertEquals alias.
    self.assertEqual("%s,arm=board" % orig_base, transport.base)
    self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
    transport.set_segment_parameter("arm", None)
    # Clearing a parameter that was never set must not raise.
    transport.set_segment_parameter("nonexistant", None)
    self.assertEqual({}, transport.get_segment_parameters())
    self.assertEqual(orig_base, transport.base)
1808
def test_stat_symlink(self):
1809
# if a transport points directly to a symlink (and supports symlinks
1810
# at all) you can tell this. helps with bug 32669.
1811
t = self.get_transport()
1813
t.symlink('target', 'link')
1814
except TransportNotPossible:
1815
raise TestSkipped("symlinks not supported")
1816
t2 = t.clone('link')
1818
self.assertTrue(stat.S_ISLNK(st.st_mode))
1820
def test_abspath_url_unquote_unreserved(self):
1821
"""URLs from abspath should have unreserved characters unquoted
1823
Need consistent quoting notably for tildes, see lp:842223 for more.
1825
t = self.get_transport()
1826
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1827
self.assertEqual(t.base + "-.09AZ_az~",
1828
t.abspath(needlessly_escaped_dir))
1830
def test_clone_url_unquote_unreserved(self):
1831
"""Base URL of a cloned branch needs unreserved characters unquoted
1833
Cloned transports should be prefix comparable for things like the
1834
isolation checking of tests, see lp:842223 for more.
1836
t1 = self.get_transport()
1837
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1838
self.build_tree([needlessly_escaped_dir], transport=t1)
1839
t2 = t1.clone(needlessly_escaped_dir)
1840
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1842
def test_hook_post_connection_one(self):
1843
"""Fire post_connect hook after a ConnectedTransport is first used"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t = self.get_transport()
1847
self.assertEqual([], log)
1848
t.has("non-existant")
1849
if isinstance(t, RemoteTransport):
1850
self.assertEqual([t.get_smart_medium()], log)
1851
elif isinstance(t, ConnectedTransport):
1852
self.assertEqual([t], log)
1854
self.assertEqual([], log)
1856
def test_hook_post_connection_multi(self):
1857
"""Fire post_connect hook once per unshared underlying connection"""
1859
Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
t1 = self.get_transport()
1862
t3 = self.get_transport()
1863
self.assertEqual([], log)
1867
if isinstance(t1, RemoteTransport):
1868
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
elif isinstance(t1, ConnectedTransport):
1870
self.assertEqual([t1, t3], log)
1872
self.assertEqual([], log)