1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
31
30
from bzrlib import (
35
transport as _mod_transport,
37
38
from bzrlib.errors import (ConnectionError,
45
43
TransportNotPossible,
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
78
74
for module in _get_transport_modules():
80
76
permutations = get_transport_test_permutations(
81
reduce(getattr, (module).split('.')[1:], __import__(module)))
77
pyutils.get_named_object(module))
82
78
for (klass, server_factory) in permutations:
83
79
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
80
{"transport_class":klass,
104
100
super(TransportTests, self).setUp()
105
self._captureVar('BZR_NO_SMART_VFS', None)
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
107
103
def check_transport_contents(self, content, transport, relpath):
    """Check that transport.get_bytes(relpath) == content.

    :param content: The expected contents of the file.
    :param transport: The transport to read from; only its get_bytes()
        method is used.
    :param relpath: Path of the file, relative to the transport.

    The comparison is delegated to self.assertEqualDiff.
    """
    # Use get_bytes() rather than get(relpath).read(): the latter returns a
    # file handle that this helper never closed.
    self.assertEqualDiff(content, transport.get_bytes(relpath))
111
107
def test_ensure_base_missing(self):
112
108
""".ensure_base() should create the directory if it doesn't exist"""
212
208
self.build_tree(files, transport=t, line_endings='binary')
213
209
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
210
def iterate_and_close(func, *args):
211
for f in func(*args):
212
# We call f.read() here because things like paramiko actually
213
# spawn a thread to prefetch the content, which we want to
214
# consume before we close the handle.
217
self.assertRaises(NoSuchFile, iterate_and_close,
218
t.get_multi, ['a', 'b', 'c'])
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, iter(['a', 'b', 'c']))
217
222
def test_get_directory_read_gives_ReadError(self):
218
223
"""consistent errors for read() on a file returned by get()."""
273
278
handle.write('b')
274
279
self.assertEqual('b', t.get_bytes('foo'))
275
self.assertEqual('b', t.get('foo').read())
282
self.assertEqual('b', f.read())
287
296
t.put_bytes('a', 'some text for a\n')
288
self.failUnless(t.has('a'))
297
self.assertTrue(t.has('a'))
289
298
self.check_transport_contents('some text for a\n', t, 'a')
291
300
# The contents should be overwritten
303
312
t.put_bytes_non_atomic, 'a', 'some text for a\n')
306
self.failIf(t.has('a'))
315
self.assertFalse(t.has('a'))
307
316
t.put_bytes_non_atomic('a', 'some text for a\n')
308
self.failUnless(t.has('a'))
317
self.assertTrue(t.has('a'))
309
318
self.check_transport_contents('some text for a\n', t, 'a')
310
319
# Put also replaces contents
311
320
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
332
# Now test the create_parent flag
324
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
self.failIf(t.has('dir/a'))
335
self.assertFalse(t.has('dir/a'))
327
336
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
337
create_parent_dir=True)
329
338
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
401
410
result = t.put_file('a', StringIO('some text for a\n'))
402
411
# put_file returns the length of the data written
403
412
self.assertEqual(16, result)
404
self.failUnless(t.has('a'))
413
self.assertTrue(t.has('a'))
405
414
self.check_transport_contents('some text for a\n', t, 'a')
406
415
# Put also replaces contents
407
416
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
428
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
422
self.failIf(t.has('a'))
431
self.assertFalse(t.has('a'))
423
432
t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
self.failUnless(t.has('a'))
433
self.assertTrue(t.has('a'))
425
434
self.check_transport_contents('some text for a\n', t, 'a')
426
435
# Put also replaces contents
427
436
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
448
# Now test the create_parent flag
440
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
450
StringIO('contents\n'))
442
self.failIf(t.has('dir/a'))
451
self.assertFalse(t.has('dir/a'))
443
452
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
453
create_parent_dir=True)
445
454
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
644
653
self.build_tree(files, transport=transport_from)
645
654
self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
self.check_transport_contents(transport_to.get(f).read(),
656
self.check_transport_contents(transport_to.get_bytes(f),
648
657
transport_from, f)
650
659
t = self.get_transport()
673
682
files = ['a', 'b', 'c', 'd']
674
683
t.copy_to(iter(files), temp_transport)
676
self.check_transport_contents(temp_transport.get(f).read(),
685
self.check_transport_contents(temp_transport.get_bytes(f),
678
687
del temp_transport
824
833
t.put_bytes('a', 'a little bit of text\n')
825
self.failUnless(t.has('a'))
834
self.assertTrue(t.has('a'))
827
self.failIf(t.has('a'))
836
self.assertFalse(t.has('a'))
829
838
self.assertRaises(NoSuchFile, t.delete, 'a')
836
845
t.delete_multi(['a', 'c'])
837
846
self.assertEqual([False, True, False],
838
847
list(t.has_multi(['a', 'b', 'c'])))
839
self.failIf(t.has('a'))
840
self.failUnless(t.has('b'))
841
self.failIf(t.has('c'))
848
self.assertFalse(t.has('a'))
849
self.assertTrue(t.has('b'))
850
self.assertFalse(t.has('c'))
843
852
self.assertRaises(NoSuchFile,
844
853
t.delete_multi, ['a', 'b', 'c'])
905
914
t.mkdir('foo-baz')
907
916
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
self.failUnless(t.has('foo-bar'))
917
self.assertTrue(t.has('foo-bar'))
910
919
def test_rename_dir_succeeds(self):
911
920
t = self.get_transport()
994
1003
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
996
1005
t.move('a', 'b')
997
self.failUnless(t.has('b'))
998
self.failIf(t.has('a'))
1006
self.assertTrue(t.has('b'))
1007
self.assertFalse(t.has('a'))
1000
1009
self.check_transport_contents('a first file\n', t, 'b')
1001
1010
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1003
1012
# Overwrite a file
1004
1013
t.put_bytes('c', 'c this file\n')
1005
1014
t.move('c', 'b')
1006
self.failIf(t.has('c'))
1015
self.assertFalse(t.has('c'))
1007
1016
self.check_transport_contents('c this file\n', t, 'b')
1009
1018
# TODO: Try to write a test for atomicity
1041
1050
except NotImplementedError:
1042
1051
raise TestSkipped("Transport %s has no bogus URL support." %
1043
1052
self._server.__class__)
1044
t = get_transport(url)
1053
t = _mod_transport.get_transport(url)
1045
1054
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
1056
def test_stat(self):
1063
1072
for path, size in zip(paths, sizes):
1064
1073
st = t.stat(path)
1065
1074
if path.endswith('/'):
1066
self.failUnless(S_ISDIR(st.st_mode))
1075
self.assertTrue(S_ISDIR(st.st_mode))
1067
1076
# directory sizes are meaningless
1069
self.failUnless(S_ISREG(st.st_mode))
1078
self.assertTrue(S_ISREG(st.st_mode))
1070
1079
self.assertEqual(size, st.st_size)
1072
1081
remote_stats = list(t.stat_multi(paths))
1079
1088
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1080
1089
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1081
1090
subdir = t.clone('subdir')
1082
subdir.stat('./file')
1091
st = subdir.stat('./file')
1092
st = subdir.stat('.')
1085
1094
def test_hardlink(self):
1086
1095
from stat import ST_NLINK
1096
1105
t.hardlink(source_name, link_name)
1098
self.failUnless(t.has(source_name))
1099
self.failUnless(t.has(link_name))
1107
self.assertTrue(t.has(source_name))
1108
self.assertTrue(t.has(link_name))
1101
1110
st = t.stat(link_name)
1102
self.failUnlessEqual(st[ST_NLINK], 2)
1111
self.assertEqual(st[ST_NLINK], 2)
1103
1112
except TransportNotPossible:
1104
1113
raise TestSkipped("Transport %s does not support hardlinks." %
1105
1114
self._server.__class__)
1118
1127
t.symlink(source_name, link_name)
1120
self.failUnless(t.has(source_name))
1121
self.failUnless(t.has(link_name))
1129
self.assertTrue(t.has(source_name))
1130
self.assertTrue(t.has(link_name))
1123
1132
st = t.stat(link_name)
1124
self.failUnless(S_ISLNK(st.st_mode),
1133
self.assertTrue(S_ISLNK(st.st_mode),
1125
1134
"expected symlink, got mode %o" % st.st_mode)
1126
1135
except TransportNotPossible:
1127
1136
raise TestSkipped("Transport %s does not support symlinks." %
1294
1303
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1296
self.failUnless(t1.has('a'))
1297
self.failUnless(t1.has('b/c'))
1298
self.failIf(t1.has('c'))
1305
self.assertTrue(t1.has('a'))
1306
self.assertTrue(t1.has('b/c'))
1307
self.assertFalse(t1.has('c'))
1300
1309
t2 = t1.clone('b')
1301
1310
self.assertEqual(t1.base + 'b/', t2.base)
1303
self.failUnless(t2.has('c'))
1304
self.failIf(t2.has('a'))
1312
self.assertTrue(t2.has('c'))
1313
self.assertFalse(t2.has('a'))
1306
1315
t3 = t2.clone('..')
1307
self.failUnless(t3.has('a'))
1308
self.failIf(t3.has('c'))
1316
self.assertTrue(t3.has('a'))
1317
self.assertFalse(t3.has('c'))
1310
self.failIf(t1.has('b/d'))
1311
self.failIf(t2.has('d'))
1312
self.failIf(t3.has('b/d'))
1319
self.assertFalse(t1.has('b/d'))
1320
self.assertFalse(t2.has('d'))
1321
self.assertFalse(t3.has('b/d'))
1314
1323
if t1.is_readonly():
1315
1324
self.build_tree_contents([('b/d', 'newfile\n')])
1317
1326
t2.put_bytes('d', 'newfile\n')
1319
self.failUnless(t1.has('b/d'))
1320
self.failUnless(t2.has('d'))
1321
self.failUnless(t3.has('b/d'))
1328
self.assertTrue(t1.has('b/d'))
1329
self.assertTrue(t2.has('d'))
1330
self.assertTrue(t3.has('b/d'))
1323
1332
def test_clone_to_root(self):
1324
1333
orig_transport = self.get_transport()
1398
1407
self.assertEqual(transport.clone("/").abspath('foo'),
1399
1408
transport.abspath("/foo"))
1410
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1411
def test_win32_abspath(self):
1402
1412
# Note: we tried to set sys.platform='win32' so we could test on
1403
1413
# other platforms too, but then osutils does platform specific
1409
1419
# smoke test for abspath on win32.
1410
1420
# a transport based on 'file:///' never fully qualifies the drive.
1411
transport = get_transport("file:///")
1412
self.failUnlessEqual(transport.abspath("/"), "file:///")
1421
transport = _mod_transport.get_transport("file:///")
1422
self.assertEqual(transport.abspath("/"), "file:///")
1414
1424
# but a transport that starts with a drive spec must keep it.
1415
transport = get_transport("file:///C:/")
1416
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1425
transport = _mod_transport.get_transport("file:///C:/")
1426
self.assertEqual(transport.abspath("/"), "file:///C:/")
1418
1428
def test_local_abspath(self):
1419
1429
transport = self.get_transport()
1616
1626
def test_readv(self):
1617
1627
transport = self.get_transport()
1618
1628
if transport.is_readonly():
1619
file('a', 'w').write('0123456789')
1629
with file('a', 'w') as f: f.write('0123456789')
1621
1631
transport.put_bytes('a', '0123456789')
1632
1642
def test_readv_out_of_order(self):
1633
1643
transport = self.get_transport()
1634
1644
if transport.is_readonly():
1635
file('a', 'w').write('0123456789')
1645
with file('a', 'w') as f: f.write('0123456789')
1637
1647
transport.put_bytes('a', '01234567890')
1710
1720
transport = self.get_transport()
1711
1721
# test from observed failure case.
1712
1722
if transport.is_readonly():
1713
file('a', 'w').write('a'*1024*1024)
1723
with file('a', 'w') as f: f.write('a'*1024*1024)
1715
1725
transport.put_bytes('a', 'a'*1024*1024)
1716
1726
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1760
def test_readv_short_read(self):
1751
1761
transport = self.get_transport()
1752
1762
if transport.is_readonly():
1753
file('a', 'w').write('0123456789')
1763
with file('a', 'w') as f: f.write('0123456789')
1755
1765
transport.put_bytes('a', '01234567890')