20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
30
from bzrlib import (
35
transport as _mod_transport,
33
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
LockError, NoSmartServer, PathError,
35
TransportNotPossible, ConnectionError,
38
from bzrlib.errors import (ConnectionError,
37
45
from bzrlib.osutils import getcwd
38
from bzrlib.symbol_versioning import zero_eleven
39
from bzrlib.tests import TestCaseInTempDir, TestSkipped
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
40
53
from bzrlib.tests.test_transport import TestTransportImplementation
41
from bzrlib.transport import memory, smart, chroot
42
import bzrlib.transport
46
"""Append the given text (file-like object) to the supplied filename."""
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
93
"""Multiply tests for tranport implementations."""
94
result = loader.suiteClass()
95
scenarios = transport_test_permutations()
96
return multiply_tests(standard_tests, scenarios, result)
54
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
56
105
def check_transport_contents(self, content, transport, relpath):
57
"""Check that transport.get(relpath).read() == content."""
58
self.assertEqualDiff(content, transport.get(relpath).read())
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
146
except errors.InProcessTransport:
60
149
def test_has(self):
61
150
t = self.get_transport()
65
154
self.assertEqual(True, t.has('a'))
66
155
self.assertEqual(False, t.has('c'))
67
156
self.assertEqual(True, t.has(urlutils.escape('%')))
68
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
69
[True, True, False, False, True, False, True, False])
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
70
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
71
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
72
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
73
[True, True, False, False, True, False, True, False])
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
74
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
75
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
77
171
def test_has_root_works(self):
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
raise TestNotApplicable(
174
"SmartTCPServer_for_testing intentionally does not allow "
78
176
current_transport = self.get_transport()
79
if isinstance(current_transport, chroot.ChrootTransportDecorator):
80
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
81
177
self.assertTrue(current_transport.has('/'))
82
178
root = current_transport.clone('/')
83
179
self.assertTrue(root.has(''))
94
190
self.build_tree(files, transport=t, line_endings='binary')
95
191
self.check_transport_contents('contents of a\n', t, 'a')
96
192
content_f = t.get_multi(files)
97
for content, f in zip(contents, content_f):
193
# Use itertools.izip() instead of use zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
98
197
self.assertEqual(content, f.read())
100
199
content_f = t.get_multi(iter(files))
101
for content, f in zip(contents, content_f):
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
102
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
104
211
self.assertRaises(NoSuchFile, t.get, 'c')
105
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
106
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
108
244
def test_get_bytes(self):
109
245
t = self.get_transport()
120
256
for content, fname in zip(contents, files):
121
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
260
t = self.get_transport()
123
261
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
126
t = self.get_transport()
131
self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
132
self.check_transport_contents('string\ncontents\n', t, 'a')
134
self.applyDeprecated(zero_eleven,
135
t.put, 'b', StringIO('file-like\ncontents\n'))
136
self.check_transport_contents('file-like\ncontents\n', t, 'b')
138
self.assertRaises(NoSuchFile,
139
self.applyDeprecated,
141
t.put, 'path/doesnt/exist/c', StringIO('contents'))
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
143
290
def test_put_bytes(self):
144
291
t = self.get_transport()
375
519
dir_mode=0777, create_parent_dir=True)
376
520
self.assertTransportMode(t, 'dir777', 0777)
378
def test_put_multi(self):
379
t = self.get_transport()
383
self.assertEqual(2, self.applyDeprecated(zero_eleven,
384
t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
385
('d', StringIO('contents\nfor d\n'))]
387
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
388
[True, False, False, True])
389
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
390
self.check_transport_contents('contents\nfor d\n', t, 'd')
392
self.assertEqual(2, self.applyDeprecated(zero_eleven,
393
t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
394
('d', StringIO('another contents\nfor d\n'))])
396
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
397
self.check_transport_contents('another contents\nfor d\n', t, 'd')
399
def test_put_permissions(self):
400
t = self.get_transport()
404
if not t._can_roundtrip_unix_modebits():
405
# Can't roundtrip, so no need to run this test
407
self.applyDeprecated(zero_eleven, t.put, 'mode644',
408
StringIO('test text\n'), mode=0644)
409
self.assertTransportMode(t, 'mode644', 0644)
410
self.applyDeprecated(zero_eleven, t.put, 'mode666',
411
StringIO('test text\n'), mode=0666)
412
self.assertTransportMode(t, 'mode666', 0666)
413
self.applyDeprecated(zero_eleven, t.put, 'mode600',
414
StringIO('test text\n'), mode=0600)
415
self.assertTransportMode(t, 'mode600', 0600)
416
# Yes, you can put a file such that it becomes readonly
417
self.applyDeprecated(zero_eleven, t.put, 'mode400',
418
StringIO('test text\n'), mode=0400)
419
self.assertTransportMode(t, 'mode400', 0400)
420
self.applyDeprecated(zero_eleven, t.put_multi,
421
[('mmode644', StringIO('text\n'))], mode=0644)
422
self.assertTransportMode(t, 'mmode644', 0644)
424
# The default permissions should be based on the current umask
425
umask = osutils.get_umask()
426
self.applyDeprecated(zero_eleven, t.put, 'nomode',
427
StringIO('test text\n'), mode=None)
428
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
522
def test_put_bytes_unicode(self):
523
t = self.get_transport()
526
unicode_string = u'\u1234'
527
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
430
529
def test_mkdir(self):
431
530
t = self.get_transport()
433
532
if t.is_readonly():
434
# cannot mkdir on readonly transports. We're not testing for
533
# cannot mkdir on readonly transports. We're not testing for
435
534
# cache coherency because cache behaviour is not currently
436
535
# defined for the transport interface.
437
536
self.assertRaises(TransportNotPossible, t.mkdir, '.')
497
596
t.mkdir('dnomode', mode=None)
498
597
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
599
def test_opening_a_file_stream_creates_file(self):
600
t = self.get_transport()
603
handle = t.open_write_stream('foo')
605
self.assertEqual('', t.get_bytes('foo'))
609
def test_opening_a_file_stream_can_set_mode(self):
610
t = self.get_transport()
612
self.assertRaises((TransportNotPossible, NotImplementedError),
613
t.open_write_stream, 'foo')
615
if not t._can_roundtrip_unix_modebits():
616
# Can't roundtrip, so no need to run this test
619
def check_mode(name, mode, expected):
620
handle = t.open_write_stream(name, mode=mode)
622
self.assertTransportMode(t, name, expected)
623
check_mode('mode644', 0644, 0644)
624
check_mode('mode666', 0666, 0666)
625
check_mode('mode600', 0600, 0600)
626
# The default permissions should be based on the current umask
627
check_mode('nomode', None, 0666 & ~osutils.get_umask())
500
629
def test_copy_to(self):
501
630
# FIXME: test: same server to same server (partly done)
502
631
# same protocol two servers
503
632
# and different protocols (done for now except for MemoryTransport.
505
from bzrlib.transport.memory import MemoryTransport
507
635
def simple_copy_files(transport_from, transport_to):
508
636
files = ['a', 'b', 'c', 'd']
509
637
self.build_tree(files, transport=transport_from)
510
638
self.assertEqual(4, transport_from.copy_to(files, transport_to))
512
self.check_transport_contents(transport_to.get(f).read(),
640
self.check_transport_contents(transport_to.get_bytes(f),
513
641
transport_from, f)
515
643
t = self.get_transport()
549
677
self.assertTransportMode(temp_transport, f, mode)
551
def test_append(self):
679
def test_create_prefix(self):
552
680
t = self.get_transport()
556
t.put_bytes('a', 'diff\ncontents for\na\n')
557
t.put_bytes('b', 'contents\nfor b\n')
559
self.assertEqual(20, self.applyDeprecated(zero_eleven,
560
t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
562
self.check_transport_contents(
563
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
566
# And we can create new files, too
567
self.assertEqual(0, self.applyDeprecated(zero_eleven,
568
t.append, 'c', StringIO('some text\nfor a missing file\n')))
569
self.check_transport_contents('some text\nfor a missing file\n',
681
sub = t.clone('foo').clone('bar')
684
except TransportNotPossible:
685
self.assertTrue(t.is_readonly())
687
self.assertTrue(t.has('foo/bar'))
571
689
def test_append_file(self):
572
690
t = self.get_transport()
829
988
# perhaps all of this could be done in a subdirectory
831
990
t.put_bytes('a', 'a first file\n')
832
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
991
self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
835
self.failUnless(t.has('b'))
836
self.failIf(t.has('a'))
994
self.assertTrue(t.has('b'))
995
self.assertFalse(t.has('a'))
838
997
self.check_transport_contents('a first file\n', t, 'b')
839
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
998
self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
841
1000
# Overwrite a file
842
1001
t.put_bytes('c', 'c this file\n')
843
1002
t.move('c', 'b')
844
self.failIf(t.has('c'))
1003
self.assertFalse(t.has('c'))
845
1004
self.check_transport_contents('c this file\n', t, 'b')
847
1006
# TODO: Try to write a test for atomicity
848
# TODO: Test moving into a non-existant subdirectory
1007
# TODO: Test moving into a non-existent subdirectory
849
1008
# TODO: Test Transport.move_multi
851
1010
def test_copy(self):
872
1031
def test_connection_error(self):
873
1032
"""ConnectionError is raised when connection is impossible.
875
The error may be raised from either the constructor or the first
876
operation on the transport.
1034
The error should be raised from the first operation on the transport.
879
1037
url = self._server.get_bogus_url()
880
1038
except NotImplementedError:
881
1039
raise TestSkipped("Transport %s has no bogus URL support." %
882
1040
self._server.__class__)
883
# This should be: but SSH still connects on construction. No COOKIE!
884
# self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
886
t = bzrlib.transport.get_transport(url)
888
except (ConnectionError, NoSuchFile), e:
890
except (Exception), e:
891
self.fail('Wrong exception thrown (%s.%s): %s'
892
% (e.__class__.__module__, e.__class__.__name__, e))
894
self.fail('Did not get the expected ConnectionError or NoSuchFile.')
1041
t = _mod_transport.get_transport_from_url(url)
1042
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
896
1044
def test_stat(self):
897
1045
# TODO: Test stat, just try once, and if it throws, stop testing
928
1076
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
929
1077
self.build_tree(['subdir/', 'subdir/file'], transport=t)
930
1078
subdir = t.clone('subdir')
931
subdir.stat('./file')
1079
st = subdir.stat('./file')
1080
st = subdir.stat('.')
1082
def test_hardlink(self):
1083
from stat import ST_NLINK
1085
t = self.get_transport()
1087
source_name = "original_target"
1088
link_name = "target_link"
1090
self.build_tree([source_name], transport=t)
1093
t.hardlink(source_name, link_name)
1095
self.assertTrue(t.has(source_name))
1096
self.assertTrue(t.has(link_name))
1098
st = t.stat(link_name)
1099
self.assertEqual(st[ST_NLINK], 2)
1100
except TransportNotPossible:
1101
raise TestSkipped("Transport %s does not support hardlinks." %
1102
self._server.__class__)
1104
def test_symlink(self):
1105
from stat import S_ISLNK
1107
t = self.get_transport()
1109
source_name = "original_target"
1110
link_name = "target_link"
1112
self.build_tree([source_name], transport=t)
1115
t.symlink(source_name, link_name)
1117
self.assertTrue(t.has(source_name))
1118
self.assertTrue(t.has(link_name))
1120
st = t.stat(link_name)
1121
self.assertTrue(S_ISLNK(st.st_mode),
1122
"expected symlink, got mode %o" % st.st_mode)
1123
except TransportNotPossible:
1124
raise TestSkipped("Transport %s does not support symlinks." %
1125
self._server.__class__)
1127
self.knownFailure("Paramiko fails to create symlinks during tests")
934
1129
def test_list_dir(self):
935
1130
# TODO: Test list_dir, just try once, and if it throws, stop testing
936
1131
t = self.get_transport()
938
1133
if not t.listable():
939
1134
self.assertRaises(TransportNotPossible, t.list_dir, '.')
943
l = list(t.list_dir(d))
1137
def sorted_list(d, transport):
1138
l = list(transport.list_dir(d))
947
self.assertEqual([], sorted_list('.'))
1142
self.assertEqual([], sorted_list('.', t))
948
1143
# c2 is precisely one letter longer than c here to test that
949
1144
# suffixing is not confused.
950
1145
# a%25b checks that quoting is done consistently across transports
982
1183
self.build_tree(['a/', 'a/%'], transport=t)
984
1185
self.build_tree(['a/', 'a/%'])
986
1187
names = list(t.list_dir('a'))
987
1188
self.assertEqual(['%25'], names)
988
1189
self.assertIsInstance(names[0], str)
1191
def test_clone_preserve_info(self):
1192
t1 = self.get_transport()
1193
if not isinstance(t1, ConnectedTransport):
1194
raise TestSkipped("not a connected transport")
1196
t2 = t1.clone('subdir')
1197
self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1198
self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1199
self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1200
self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1201
self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1203
def test__reuse_for(self):
1204
t = self.get_transport()
1205
if not isinstance(t, ConnectedTransport):
1206
raise TestSkipped("not a connected transport")
1208
def new_url(scheme=None, user=None, password=None,
1209
host=None, port=None, path=None):
1210
"""Build a new url from t.base changing only parts of it.
1212
Only the parameters different from None will be changed.
1214
if scheme is None: scheme = t._parsed_url.scheme
1215
if user is None: user = t._parsed_url.user
1216
if password is None: password = t._parsed_url.password
1217
if user is None: user = t._parsed_url.user
1218
if host is None: host = t._parsed_url.host
1219
if port is None: port = t._parsed_url.port
1220
if path is None: path = t._parsed_url.path
1221
return str(urlutils.URL(scheme, user, password, host, port, path))
1223
if t._parsed_url.scheme == 'ftp':
1227
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1228
if t._parsed_url.user == 'me':
1232
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1233
# passwords are not taken into account because:
1234
# - it makes no sense to have two different valid passwords for the
1236
# - _password in ConnectedTransport is intended to collect what the
1237
# user specified from the command-line and there are cases where the
1238
# new url can contain no password (if the url was built from an
1239
# existing transport.base for example)
1240
# - password are considered part of the credentials provided at
1241
# connection creation time and as such may not be present in the url
1242
# (they may be typed by the user when prompted for example)
1243
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1244
# We will not connect, we can use a invalid host
1245
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1246
if t._parsed_url.port == 1234:
1250
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1251
# No point in trying to reuse a transport for a local URL
1252
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1254
def test_connection_sharing(self):
1255
t = self.get_transport()
1256
if not isinstance(t, ConnectedTransport):
1257
raise TestSkipped("not a connected transport")
1259
c = t.clone('subdir')
1260
# Some transports will create the connection only when needed
1261
t.has('surely_not') # Force connection
1262
self.assertIs(t._get_connection(), c._get_connection())
1264
# Temporary failure, we need to create a new dummy connection
1265
new_connection = None
1266
t._set_connection(new_connection)
1267
# Check that both transports use the same connection
1268
self.assertIs(new_connection, t._get_connection())
1269
self.assertIs(new_connection, c._get_connection())
1271
def test_reuse_connection_for_various_paths(self):
1272
t = self.get_transport()
1273
if not isinstance(t, ConnectedTransport):
1274
raise TestSkipped("not a connected transport")
1276
t.has('surely_not') # Force connection
1277
self.assertIsNot(None, t._get_connection())
1279
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1280
self.assertIsNot(t, subdir)
1281
self.assertIs(t._get_connection(), subdir._get_connection())
1283
home = subdir._reuse_for(t.base + 'home')
1284
self.assertIs(t._get_connection(), home._get_connection())
1285
self.assertIs(subdir._get_connection(), home._get_connection())
990
1287
def test_clone(self):
991
1288
# TODO: Test that clone moves up and down the filesystem
992
1289
t1 = self.get_transport()
993
if isinstance(t1, chroot.ChrootTransportDecorator):
994
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
996
1291
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
998
self.failUnless(t1.has('a'))
999
self.failUnless(t1.has('b/c'))
1000
self.failIf(t1.has('c'))
1293
self.assertTrue(t1.has('a'))
1294
self.assertTrue(t1.has('b/c'))
1295
self.assertFalse(t1.has('c'))
1002
1297
t2 = t1.clone('b')
1003
1298
self.assertEqual(t1.base + 'b/', t2.base)
1005
self.failUnless(t2.has('c'))
1006
self.failIf(t2.has('a'))
1300
self.assertTrue(t2.has('c'))
1301
self.assertFalse(t2.has('a'))
1008
1303
t3 = t2.clone('..')
1009
self.failUnless(t3.has('a'))
1010
self.failIf(t3.has('c'))
1304
self.assertTrue(t3.has('a'))
1305
self.assertFalse(t3.has('c'))
1012
self.failIf(t1.has('b/d'))
1013
self.failIf(t2.has('d'))
1014
self.failIf(t3.has('b/d'))
1307
self.assertFalse(t1.has('b/d'))
1308
self.assertFalse(t2.has('d'))
1309
self.assertFalse(t3.has('b/d'))
1016
1311
if t1.is_readonly():
1017
open('b/d', 'wb').write('newfile\n')
1312
self.build_tree_contents([('b/d', 'newfile\n')])
1019
1314
t2.put_bytes('d', 'newfile\n')
1021
self.failUnless(t1.has('b/d'))
1022
self.failUnless(t2.has('d'))
1023
self.failUnless(t3.has('b/d'))
1316
self.assertTrue(t1.has('b/d'))
1317
self.assertTrue(t2.has('d'))
1318
self.assertTrue(t3.has('b/d'))
1025
1320
def test_clone_to_root(self):
1026
1321
orig_transport = self.get_transport()
1027
if isinstance(orig_transport, chroot.ChrootTransportDecorator):
1028
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1029
1322
# Repeatedly go up to a parent directory until we're at the root
1030
1323
# directory of this transport
1031
1324
root_transport = orig_transport
1108
1395
self.assertEqual(transport.clone("/").abspath('foo'),
1109
1396
transport.abspath("/foo"))
1398
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1399
def test_win32_abspath(self):
1400
# Note: we tried to set sys.platform='win32' so we could test on
1401
# other platforms too, but then osutils does platform specific
1402
# things at import time which defeated us...
1403
if sys.platform != 'win32':
1405
'Testing drive letters in abspath implemented only for win32')
1407
# smoke test for abspath on win32.
1408
# a transport based on 'file:///' never fully qualifies the drive.
1409
transport = _mod_transport.get_transport_from_url("file:///")
1410
self.assertEqual(transport.abspath("/"), "file:///")
1412
# but a transport that starts with a drive spec must keep it.
1413
transport = _mod_transport.get_transport_from_url("file:///C:/")
1414
self.assertEqual(transport.abspath("/"), "file:///C:/")
1111
1416
def test_local_abspath(self):
1112
1417
transport = self.get_transport()
1114
1419
p = transport.local_abspath('.')
1115
except TransportNotPossible:
1116
pass # This is not a local transport
1420
except (errors.NotLocalUrl, TransportNotPossible), e:
1421
# should be formattable
1118
1424
self.assertEqual(getcwd(), p)
1120
1426
def test_abspath_at_root(self):
1121
1427
t = self.get_transport()
1122
if isinstance(t, chroot.ChrootTransportDecorator):
1123
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1124
1428
# clone all the way to the top
1125
1429
new_transport = t.clone('..')
1126
1430
while new_transport.base != t.base:
1222
1560
self.check_transport_contents(contents, t, urlutils.escape(fname))
1224
1562
def test_connect_twice_is_same_content(self):
1225
# check that our server (whatever it is) is accessable reliably
1563
# check that our server (whatever it is) is accessible reliably
1226
1564
# via get_transport and multiple connections share content.
1227
1565
transport = self.get_transport()
1228
if isinstance(transport, chroot.ChrootTransportDecorator):
1229
raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1230
1566
if transport.is_readonly():
1232
1568
transport.put_bytes('foo', 'bar')
1233
transport2 = self.get_transport()
1234
self.check_transport_contents('bar', transport2, 'foo')
1235
# its base should be usable.
1236
transport2 = bzrlib.transport.get_transport(transport.base)
1237
self.check_transport_contents('bar', transport2, 'foo')
1569
transport3 = self.get_transport()
1570
self.check_transport_contents('bar', transport3, 'foo')
1239
1572
# now opening at a relative url should give use a sane result:
1240
1573
transport.mkdir('newdir')
1241
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1242
transport2 = transport2.clone('..')
1243
self.check_transport_contents('bar', transport2, 'foo')
1574
transport5 = self.get_transport('newdir')
1575
transport6 = transport5.clone('..')
1576
self.check_transport_contents('bar', transport6, 'foo')
1245
1578
def test_lock_write(self):
1246
1579
"""Test transport-level write locks.
1307
1640
self.assertEqual(d[2], (0, '0'))
1308
1641
self.assertEqual(d[3], (3, '34'))
1643
def test_readv_with_adjust_for_latency(self):
1644
transport = self.get_transport()
1645
# the adjust for latency flag expands the data region returned
1646
# according to a per-transport heuristic, so testing is a little
1647
# tricky as we need more data than the largest combining that our
1648
# transports do. To accomodate this we generate random data and cross
1649
# reference the returned data with the random data. To avoid doing
1650
# multiple large random byte look ups we do several tests on the same
1652
content = osutils.rand_bytes(200*1024)
1653
content_size = len(content)
1654
if transport.is_readonly():
1655
self.build_tree_contents([('a', content)])
1657
transport.put_bytes('a', content)
1658
def check_result_data(result_vector):
1659
for item in result_vector:
1660
data_len = len(item[1])
1661
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1664
result = list(transport.readv('a', ((0, 30),),
1665
adjust_for_latency=True, upper_limit=content_size))
1666
# we expect 1 result, from 0, to something > 30
1667
self.assertEqual(1, len(result))
1668
self.assertEqual(0, result[0][0])
1669
self.assertTrue(len(result[0][1]) >= 30)
1670
check_result_data(result)
1671
# end of file corner case
1672
result = list(transport.readv('a', ((204700, 100),),
1673
adjust_for_latency=True, upper_limit=content_size))
1674
# we expect 1 result, from 204800- its length, to the end
1675
self.assertEqual(1, len(result))
1676
data_len = len(result[0][1])
1677
self.assertEqual(204800-data_len, result[0][0])
1678
self.assertTrue(data_len >= 100)
1679
check_result_data(result)
1680
# out of order ranges are made in order
1681
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1682
adjust_for_latency=True, upper_limit=content_size))
1683
# we expect 2 results, in order, start and end.
1684
self.assertEqual(2, len(result))
1686
data_len = len(result[0][1])
1687
self.assertEqual(0, result[0][0])
1688
self.assertTrue(data_len >= 30)
1690
data_len = len(result[1][1])
1691
self.assertEqual(204800-data_len, result[1][0])
1692
self.assertTrue(data_len >= 100)
1693
check_result_data(result)
1694
# close ranges get combined (even if out of order)
1695
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1696
result = list(transport.readv('a', request_vector,
1697
adjust_for_latency=True, upper_limit=content_size))
1698
self.assertEqual(1, len(result))
1699
data_len = len(result[0][1])
1700
# minimum length is from 400 to 1034 - 634
1701
self.assertTrue(data_len >= 634)
1702
# must contain the region 400 to 1034
1703
self.assertTrue(result[0][0] <= 400)
1704
self.assertTrue(result[0][0] + data_len >= 1034)
1705
check_result_data(result)
1707
def test_readv_with_adjust_for_latency_with_big_file(self):
1708
transport = self.get_transport()
1709
# test from observed failure case.
1710
if transport.is_readonly():
1711
with file('a', 'w') as f: f.write('a'*1024*1024)
1713
transport.put_bytes('a', 'a'*1024*1024)
1714
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1715
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1716
(465373, 800), (947422, 800)]
1717
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1718
found_items = [False]*9
1719
for pos, (start, length) in enumerate(broken_vector):
1720
# check the range is covered by the result
1721
for offset, data in results:
1722
if offset <= start and start + length <= offset + len(data):
1723
found_items[pos] = True
1724
self.assertEqual([True]*9, found_items)
1726
def test_get_with_open_write_stream_sees_all_content(self):
1727
t = self.get_transport()
1730
handle = t.open_write_stream('foo')
1733
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1310
1737
def test_get_smart_medium(self):
1311
1738
"""All transports must either give a smart medium, or know they can't.
1313
1740
transport = self.get_transport()
1315
medium = transport.get_smart_medium()
1316
self.assertIsInstance(medium, smart.SmartClientMedium)
1742
client_medium = transport.get_smart_medium()
1743
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1317
1744
except errors.NoSmartMedium:
1318
1745
# as long as we got it we're fine
1336
1763
# also raise a special error
1337
1764
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1338
1765
transport.readv, 'a', [(12,2)])
1767
def test_no_segment_parameters(self):
1768
"""Segment parameters should be stripped and stored in
1769
transport.segment_parameters."""
1770
transport = self.get_transport("foo")
1771
self.assertEqual({}, transport.get_segment_parameters())
1773
def test_segment_parameters(self):
1774
"""Segment parameters should be stripped and stored in
1775
transport.get_segment_parameters()."""
1776
base_url = self._server.get_url()
1777
parameters = {"key1": "val1", "key2": "val2"}
1778
url = urlutils.join_segment_parameters(base_url, parameters)
1779
transport = _mod_transport.get_transport_from_url(url)
1780
self.assertEqual(parameters, transport.get_segment_parameters())
1782
def test_set_segment_parameters(self):
1783
"""Segment parameters can be set and show up in base."""
1784
transport = self.get_transport("foo")
1785
orig_base = transport.base
1786
transport.set_segment_parameter("arm", "board")
1787
self.assertEqual("%s,arm=board" % orig_base, transport.base)
1788
self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1789
transport.set_segment_parameter("arm", None)
1790
transport.set_segment_parameter("nonexistant", None)
1791
self.assertEqual({}, transport.get_segment_parameters())
1792
self.assertEqual(orig_base, transport.base)
1794
def test_stat_symlink(self):
1795
# if a transport points directly to a symlink (and supports symlinks
1796
# at all) you can tell this. helps with bug 32669.
1797
t = self.get_transport()
1799
t.symlink('target', 'link')
1800
except TransportNotPossible:
1801
raise TestSkipped("symlinks not supported")
1802
t2 = t.clone('link')
1804
self.assertTrue(stat.S_ISLNK(st.st_mode))
1806
def test_abspath_url_unquote_unreserved(self):
1807
"""URLs from abspath should have unreserved characters unquoted
1809
Need consistent quoting notably for tildes, see lp:842223 for more.
1811
t = self.get_transport()
1812
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1813
self.assertEqual(t.base + "-.09AZ_az~",
1814
t.abspath(needlessly_escaped_dir))
1816
def test_clone_url_unquote_unreserved(self):
1817
"""Base URL of a cloned branch needs unreserved characters unquoted
1819
Cloned transports should be prefix comparable for things like the
1820
isolation checking of tests, see lp:842223 for more.
1822
t1 = self.get_transport()
1823
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1824
self.build_tree([needlessly_escaped_dir], transport=t1)
1825
t2 = t1.clone(needlessly_escaped_dir)
1826
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1828
def test_hook_post_connection_one(self):
1829
"""Fire post_connect hook after a ConnectedTransport is first used"""
1831
Transport.hooks.install_named_hook("post_connect", log.append, None)
1832
t = self.get_transport()
1833
self.assertEqual([], log)
1834
t.has("non-existant")
1835
if isinstance(t, RemoteTransport):
1836
self.assertEqual([t.get_smart_medium()], log)
1837
elif isinstance(t, ConnectedTransport):
1838
self.assertEqual([t], log)
1840
self.assertEqual([], log)
1842
def test_hook_post_connection_multi(self):
1843
"""Fire post_connect hook once per unshared underlying connection"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t1 = self.get_transport()
1848
t3 = self.get_transport()
1849
self.assertEqual([], log)
1853
if isinstance(t1, RemoteTransport):
1854
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1855
elif isinstance(t1, ConnectedTransport):
1856
self.assertEqual([t1, t3], log)
1858
self.assertEqual([], log)