1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
100
100
super(TransportTests, self).setUp()
101
self._captureVar('BZR_NO_SMART_VFS', None)
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
103
103
def check_transport_contents(self, content, transport, relpath):
104
"""Check that transport.get(relpath).read() == content."""
105
self.assertEqualDiff(content, transport.get(relpath).read())
104
"""Check that transport.get_bytes(relpath) == content."""
105
self.assertEqualDiff(content, transport.get_bytes(relpath))
107
107
def test_ensure_base_missing(self):
108
108
""".ensure_base() should create the directory if it doesn't exist"""
208
208
self.build_tree(files, transport=t, line_endings='binary')
209
209
self.assertRaises(NoSuchFile, t.get, 'c')
210
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
211
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
210
def iterate_and_close(func, *args):
211
for f in func(*args):
212
# We call f.read() here because things like paramiko actually
213
# spawn a thread to prefetch the content, which we want to
214
# consume before we close the handle.
217
self.assertRaises(NoSuchFile, iterate_and_close,
218
t.get_multi, ['a', 'b', 'c'])
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, iter(['a', 'b', 'c']))
213
222
def test_get_directory_read_gives_ReadError(self):
214
223
"""consistent errors for read() on a file returned by get()."""
269
278
handle.write('b')
270
279
self.assertEqual('b', t.get_bytes('foo'))
271
self.assertEqual('b', t.get('foo').read())
282
self.assertEqual('b', f.read())
283
296
t.put_bytes('a', 'some text for a\n')
284
self.failUnless(t.has('a'))
297
self.assertTrue(t.has('a'))
285
298
self.check_transport_contents('some text for a\n', t, 'a')
287
300
# The contents should be overwritten
299
312
t.put_bytes_non_atomic, 'a', 'some text for a\n')
302
self.failIf(t.has('a'))
315
self.assertFalse(t.has('a'))
303
316
t.put_bytes_non_atomic('a', 'some text for a\n')
304
self.failUnless(t.has('a'))
317
self.assertTrue(t.has('a'))
305
318
self.check_transport_contents('some text for a\n', t, 'a')
306
319
# Put also replaces contents
307
320
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
319
332
# Now test the create_parent flag
320
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
322
self.failIf(t.has('dir/a'))
335
self.assertFalse(t.has('dir/a'))
323
336
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
324
337
create_parent_dir=True)
325
338
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
397
410
result = t.put_file('a', StringIO('some text for a\n'))
398
411
# put_file returns the length of the data written
399
412
self.assertEqual(16, result)
400
self.failUnless(t.has('a'))
413
self.assertTrue(t.has('a'))
401
414
self.check_transport_contents('some text for a\n', t, 'a')
402
415
# Put also replaces contents
403
416
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
415
428
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
418
self.failIf(t.has('a'))
431
self.assertFalse(t.has('a'))
419
432
t.put_file_non_atomic('a', StringIO('some text for a\n'))
420
self.failUnless(t.has('a'))
433
self.assertTrue(t.has('a'))
421
434
self.check_transport_contents('some text for a\n', t, 'a')
422
435
# Put also replaces contents
423
436
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
435
448
# Now test the create_parent flag
436
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
437
450
StringIO('contents\n'))
438
self.failIf(t.has('dir/a'))
451
self.assertFalse(t.has('dir/a'))
439
452
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
440
453
create_parent_dir=True)
441
454
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
640
653
self.build_tree(files, transport=transport_from)
641
654
self.assertEqual(4, transport_from.copy_to(files, transport_to))
643
self.check_transport_contents(transport_to.get(f).read(),
656
self.check_transport_contents(transport_to.get_bytes(f),
644
657
transport_from, f)
646
659
t = self.get_transport()
669
682
files = ['a', 'b', 'c', 'd']
670
683
t.copy_to(iter(files), temp_transport)
672
self.check_transport_contents(temp_transport.get(f).read(),
685
self.check_transport_contents(temp_transport.get_bytes(f),
674
687
del temp_transport
820
833
t.put_bytes('a', 'a little bit of text\n')
821
self.failUnless(t.has('a'))
834
self.assertTrue(t.has('a'))
823
self.failIf(t.has('a'))
836
self.assertFalse(t.has('a'))
825
838
self.assertRaises(NoSuchFile, t.delete, 'a')
832
845
t.delete_multi(['a', 'c'])
833
846
self.assertEqual([False, True, False],
834
847
list(t.has_multi(['a', 'b', 'c'])))
835
self.failIf(t.has('a'))
836
self.failUnless(t.has('b'))
837
self.failIf(t.has('c'))
848
self.assertFalse(t.has('a'))
849
self.assertTrue(t.has('b'))
850
self.assertFalse(t.has('c'))
839
852
self.assertRaises(NoSuchFile,
840
853
t.delete_multi, ['a', 'b', 'c'])
901
914
t.mkdir('foo-baz')
903
916
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
904
self.failUnless(t.has('foo-bar'))
917
self.assertTrue(t.has('foo-bar'))
906
919
def test_rename_dir_succeeds(self):
907
920
t = self.get_transport()
990
1003
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
992
1005
t.move('a', 'b')
993
self.failUnless(t.has('b'))
994
self.failIf(t.has('a'))
1006
self.assertTrue(t.has('b'))
1007
self.assertFalse(t.has('a'))
996
1009
self.check_transport_contents('a first file\n', t, 'b')
997
1010
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
999
1012
# Overwrite a file
1000
1013
t.put_bytes('c', 'c this file\n')
1001
1014
t.move('c', 'b')
1002
self.failIf(t.has('c'))
1015
self.assertFalse(t.has('c'))
1003
1016
self.check_transport_contents('c this file\n', t, 'b')
1005
1018
# TODO: Try to write a test for atomicity
1037
1050
except NotImplementedError:
1038
1051
raise TestSkipped("Transport %s has no bogus URL support." %
1039
1052
self._server.__class__)
1040
t = get_transport(url)
1053
t = _mod_transport.get_transport_from_url(url)
1041
1054
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1043
1056
def test_stat(self):
1059
1072
for path, size in zip(paths, sizes):
1060
1073
st = t.stat(path)
1061
1074
if path.endswith('/'):
1062
self.failUnless(S_ISDIR(st.st_mode))
1075
self.assertTrue(S_ISDIR(st.st_mode))
1063
1076
# directory sizes are meaningless
1065
self.failUnless(S_ISREG(st.st_mode))
1078
self.assertTrue(S_ISREG(st.st_mode))
1066
1079
self.assertEqual(size, st.st_size)
1068
1081
remote_stats = list(t.stat_multi(paths))
1075
1088
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1076
1089
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1077
1090
subdir = t.clone('subdir')
1078
subdir.stat('./file')
1091
st = subdir.stat('./file')
1092
st = subdir.stat('.')
1081
1094
def test_hardlink(self):
1082
1095
from stat import ST_NLINK
1092
1105
t.hardlink(source_name, link_name)
1094
self.failUnless(t.has(source_name))
1095
self.failUnless(t.has(link_name))
1107
self.assertTrue(t.has(source_name))
1108
self.assertTrue(t.has(link_name))
1097
1110
st = t.stat(link_name)
1098
self.failUnlessEqual(st[ST_NLINK], 2)
1111
self.assertEqual(st[ST_NLINK], 2)
1099
1112
except TransportNotPossible:
1100
1113
raise TestSkipped("Transport %s does not support hardlinks." %
1101
1114
self._server.__class__)
1114
1127
t.symlink(source_name, link_name)
1116
self.failUnless(t.has(source_name))
1117
self.failUnless(t.has(link_name))
1129
self.assertTrue(t.has(source_name))
1130
self.assertTrue(t.has(link_name))
1119
1132
st = t.stat(link_name)
1120
self.failUnless(S_ISLNK(st.st_mode),
1133
self.assertTrue(S_ISLNK(st.st_mode),
1121
1134
"expected symlink, got mode %o" % st.st_mode)
1122
1135
except TransportNotPossible:
1123
1136
raise TestSkipped("Transport %s does not support symlinks." %
1124
1137
self._server.__class__)
1125
1138
except IOError:
1126
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1139
self.knownFailure("Paramiko fails to create symlinks during tests")
1128
1141
def test_list_dir(self):
1129
1142
# TODO: Test list_dir, just try once, and if it throws, stop testing
1193
1206
raise TestSkipped("not a connected transport")
1195
1208
t2 = t1.clone('subdir')
1196
self.assertEquals(t1._scheme, t2._scheme)
1197
self.assertEquals(t1._user, t2._user)
1198
self.assertEquals(t1._password, t2._password)
1199
self.assertEquals(t1._host, t2._host)
1200
self.assertEquals(t1._port, t2._port)
1209
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1210
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1211
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1212
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1213
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1202
1215
def test__reuse_for(self):
1203
1216
t = self.get_transport()
1211
1224
Only the parameters different from None will be changed.
1213
if scheme is None: scheme = t._scheme
1214
if user is None: user = t._user
1215
if password is None: password = t._password
1216
if user is None: user = t._user
1217
if host is None: host = t._host
1218
if port is None: port = t._port
1219
if path is None: path = t._path
1220
return t._unsplit_url(scheme, user, password, host, port, path)
1226
if scheme is None: scheme = t._parsed_url.scheme
1227
if user is None: user = t._parsed_url.user
1228
if password is None: password = t._parsed_url.password
1229
if user is None: user = t._parsed_url.user
1230
if host is None: host = t._parsed_url.host
1231
if port is None: port = t._parsed_url.port
1232
if path is None: path = t._parsed_url.path
1233
return str(urlutils.URL(scheme, user, password, host, port, path))
1222
if t._scheme == 'ftp':
1235
if t._parsed_url.scheme == 'ftp':
1223
1236
scheme = 'sftp'
1226
1239
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1240
if t._parsed_url.user == 'me':
1241
1254
# (they may be typed by the user when prompted for example)
1242
1255
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1243
1256
# We will not connect, we can use a invalid host
1244
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1257
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1258
if t._parsed_url.port == 1234:
1290
1303
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1292
self.failUnless(t1.has('a'))
1293
self.failUnless(t1.has('b/c'))
1294
self.failIf(t1.has('c'))
1305
self.assertTrue(t1.has('a'))
1306
self.assertTrue(t1.has('b/c'))
1307
self.assertFalse(t1.has('c'))
1296
1309
t2 = t1.clone('b')
1297
1310
self.assertEqual(t1.base + 'b/', t2.base)
1299
self.failUnless(t2.has('c'))
1300
self.failIf(t2.has('a'))
1312
self.assertTrue(t2.has('c'))
1313
self.assertFalse(t2.has('a'))
1302
1315
t3 = t2.clone('..')
1303
self.failUnless(t3.has('a'))
1304
self.failIf(t3.has('c'))
1316
self.assertTrue(t3.has('a'))
1317
self.assertFalse(t3.has('c'))
1306
self.failIf(t1.has('b/d'))
1307
self.failIf(t2.has('d'))
1308
self.failIf(t3.has('b/d'))
1319
self.assertFalse(t1.has('b/d'))
1320
self.assertFalse(t2.has('d'))
1321
self.assertFalse(t3.has('b/d'))
1310
1323
if t1.is_readonly():
1311
1324
self.build_tree_contents([('b/d', 'newfile\n')])
1313
1326
t2.put_bytes('d', 'newfile\n')
1315
self.failUnless(t1.has('b/d'))
1316
self.failUnless(t2.has('d'))
1317
self.failUnless(t3.has('b/d'))
1328
self.assertTrue(t1.has('b/d'))
1329
self.assertTrue(t2.has('d'))
1330
self.assertTrue(t3.has('b/d'))
1319
1332
def test_clone_to_root(self):
1320
1333
orig_transport = self.get_transport()
1394
1407
self.assertEqual(transport.clone("/").abspath('foo'),
1395
1408
transport.abspath("/foo"))
1410
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1397
1411
def test_win32_abspath(self):
1398
1412
# Note: we tried to set sys.platform='win32' so we could test on
1399
1413
# other platforms too, but then osutils does platform specific
1405
1419
# smoke test for abspath on win32.
1406
1420
# a transport based on 'file:///' never fully qualifies the drive.
1407
transport = get_transport("file:///")
1408
self.failUnlessEqual(transport.abspath("/"), "file:///")
1421
transport = _mod_transport.get_transport_from_url("file:///")
1422
self.assertEqual(transport.abspath("/"), "file:///")
1410
1424
# but a transport that starts with a drive spec must keep it.
1411
transport = get_transport("file:///C:/")
1412
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1425
transport = _mod_transport.get_transport_from_url("file:///C:/")
1426
self.assertEqual(transport.abspath("/"), "file:///C:/")
1414
1428
def test_local_abspath(self):
1415
1429
transport = self.get_transport()
1542
1556
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1543
1557
if no_unicode_support:
1544
raise tests.KnownFailure("test server cannot handle unicode paths")
1558
self.knownFailure("test server cannot handle unicode paths")
1547
1561
self.build_tree(files, transport=t, line_endings='binary')
1612
1626
def test_readv(self):
1613
1627
transport = self.get_transport()
1614
1628
if transport.is_readonly():
1615
file('a', 'w').write('0123456789')
1629
with file('a', 'w') as f: f.write('0123456789')
1617
1631
transport.put_bytes('a', '0123456789')
1628
1642
def test_readv_out_of_order(self):
1629
1643
transport = self.get_transport()
1630
1644
if transport.is_readonly():
1631
file('a', 'w').write('0123456789')
1645
with file('a', 'w') as f: f.write('0123456789')
1633
1647
transport.put_bytes('a', '01234567890')
1706
1720
transport = self.get_transport()
1707
1721
# test from observed failure case.
1708
1722
if transport.is_readonly():
1709
file('a', 'w').write('a'*1024*1024)
1723
with file('a', 'w') as f: f.write('a'*1024*1024)
1711
1725
transport.put_bytes('a', 'a'*1024*1024)
1712
1726
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1746
1760
def test_readv_short_read(self):
1747
1761
transport = self.get_transport()
1748
1762
if transport.is_readonly():
1749
file('a', 'w').write('0123456789')
1763
with file('a', 'w') as f: f.write('0123456789')
1751
1765
transport.put_bytes('a', '01234567890')
1762
1776
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1763
1777
transport.readv, 'a', [(12,2)])
1779
def test_no_segment_parameters(self):
1780
"""Segment parameters should be stripped and stored in
1781
transport.segment_parameters."""
1782
transport = self.get_transport("foo")
1783
self.assertEquals({}, transport.get_segment_parameters())
1785
def test_segment_parameters(self):
1786
"""Segment parameters should be stripped and stored in
1787
transport.get_segment_parameters()."""
1788
base_url = self._server.get_url()
1789
parameters = {"key1": "val1", "key2": "val2"}
1790
url = urlutils.join_segment_parameters(base_url, parameters)
1791
transport = _mod_transport.get_transport_from_url(url)
1792
self.assertEquals(parameters, transport.get_segment_parameters())
1794
def test_set_segment_parameters(self):
1795
"""Segment parameters can be set and show up in base."""
1796
transport = self.get_transport("foo")
1797
orig_base = transport.base
1798
transport.set_segment_parameter("arm", "board")
1799
self.assertEquals("%s,arm=board" % orig_base, transport.base)
1800
self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1801
transport.set_segment_parameter("arm", None)
1802
transport.set_segment_parameter("nonexistant", None)
1803
self.assertEquals({}, transport.get_segment_parameters())
1804
self.assertEquals(orig_base, transport.base)
1765
1806
def test_stat_symlink(self):
1766
1807
# if a transport points directly to a symlink (and supports symlinks
1767
1808
# at all) you can tell this. helps with bug 32669.
1773
1814
t2 = t.clone('link')
1774
1815
st = t2.stat('')
1775
1816
self.assertTrue(stat.S_ISLNK(st.st_mode))
1818
def test_abspath_url_unquote_unreserved(self):
1819
"""URLs from abspath should have unreserved characters unquoted
1821
Need consistent quoting notably for tildes, see lp:842223 for more.
1823
t = self.get_transport()
1824
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1825
self.assertEqual(t.base + "-.09AZ_az~",
1826
t.abspath(needlessly_escaped_dir))
1828
def test_clone_url_unquote_unreserved(self):
1829
"""Base URL of a cloned branch needs unreserved characters unquoted
1831
Cloned transports should be prefix comparable for things like the
1832
isolation checking of tests, see lp:842223 for more.
1834
t1 = self.get_transport()
1835
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1836
self.build_tree([needlessly_escaped_dir], transport=t1)
1837
t2 = t1.clone(needlessly_escaped_dir)
1838
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)