1
# Copyright (C) 2005-2011 Canonical Ltd
1
# Copyright (C) 2005-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
30
31
from bzrlib import (
35
transport as _mod_transport,
38
37
from bzrlib.errors import (ConnectionError,
43
45
TransportNotPossible,
45
47
from bzrlib.osutils import getcwd
46
48
from bzrlib.smart import medium
47
49
from bzrlib.tests import (
74
78
for module in _get_transport_modules():
76
80
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
81
reduce(getattr, (module).split('.')[1:], __import__(module)))
78
82
for (klass, server_factory) in permutations:
79
83
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
84
{"transport_class":klass,
100
104
super(TransportTests, self).setUp()
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
105
self._captureVar('BZR_NO_SMART_VFS', None)
103
107
def check_transport_contents(self, content, transport, relpath):
104
"""Check that transport.get_bytes(relpath) == content."""
105
self.assertEqualDiff(content, transport.get_bytes(relpath))
108
"""Check that transport.get(relpath).read() == content."""
109
self.assertEqualDiff(content, transport.get(relpath).read())
107
111
def test_ensure_base_missing(self):
108
112
""".ensure_base() should create the directory if it doesn't exist"""
269
273
handle.write('b')
270
274
self.assertEqual('b', t.get_bytes('foo'))
273
self.assertEqual('b', f.read())
275
self.assertEqual('b', t.get('foo').read())
287
287
t.put_bytes('a', 'some text for a\n')
288
self.assertTrue(t.has('a'))
288
self.failUnless(t.has('a'))
289
289
self.check_transport_contents('some text for a\n', t, 'a')
291
291
# The contents should be overwritten
303
303
t.put_bytes_non_atomic, 'a', 'some text for a\n')
306
self.assertFalse(t.has('a'))
306
self.failIf(t.has('a'))
307
307
t.put_bytes_non_atomic('a', 'some text for a\n')
308
self.assertTrue(t.has('a'))
308
self.failUnless(t.has('a'))
309
309
self.check_transport_contents('some text for a\n', t, 'a')
310
310
# Put also replaces contents
311
311
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
323
# Now test the create_parent flag
324
324
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
self.assertFalse(t.has('dir/a'))
326
self.failIf(t.has('dir/a'))
327
327
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
328
create_parent_dir=True)
329
329
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
401
401
result = t.put_file('a', StringIO('some text for a\n'))
402
402
# put_file returns the length of the data written
403
403
self.assertEqual(16, result)
404
self.assertTrue(t.has('a'))
404
self.failUnless(t.has('a'))
405
405
self.check_transport_contents('some text for a\n', t, 'a')
406
406
# Put also replaces contents
407
407
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
419
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
422
self.assertFalse(t.has('a'))
422
self.failIf(t.has('a'))
423
423
t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
self.assertTrue(t.has('a'))
424
self.failUnless(t.has('a'))
425
425
self.check_transport_contents('some text for a\n', t, 'a')
426
426
# Put also replaces contents
427
427
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
439
# Now test the create_parent flag
440
440
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
441
StringIO('contents\n'))
442
self.assertFalse(t.has('dir/a'))
442
self.failIf(t.has('dir/a'))
443
443
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
444
create_parent_dir=True)
445
445
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
644
644
self.build_tree(files, transport=transport_from)
645
645
self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
self.check_transport_contents(transport_to.get_bytes(f),
647
self.check_transport_contents(transport_to.get(f).read(),
648
648
transport_from, f)
650
650
t = self.get_transport()
673
673
files = ['a', 'b', 'c', 'd']
674
674
t.copy_to(iter(files), temp_transport)
676
self.check_transport_contents(temp_transport.get_bytes(f),
676
self.check_transport_contents(temp_transport.get(f).read(),
678
678
del temp_transport
824
824
t.put_bytes('a', 'a little bit of text\n')
825
self.assertTrue(t.has('a'))
825
self.failUnless(t.has('a'))
827
self.assertFalse(t.has('a'))
827
self.failIf(t.has('a'))
829
829
self.assertRaises(NoSuchFile, t.delete, 'a')
836
836
t.delete_multi(['a', 'c'])
837
837
self.assertEqual([False, True, False],
838
838
list(t.has_multi(['a', 'b', 'c'])))
839
self.assertFalse(t.has('a'))
840
self.assertTrue(t.has('b'))
841
self.assertFalse(t.has('c'))
839
self.failIf(t.has('a'))
840
self.failUnless(t.has('b'))
841
self.failIf(t.has('c'))
843
843
self.assertRaises(NoSuchFile,
844
844
t.delete_multi, ['a', 'b', 'c'])
905
905
t.mkdir('foo-baz')
907
907
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
self.assertTrue(t.has('foo-bar'))
908
self.failUnless(t.has('foo-bar'))
910
910
def test_rename_dir_succeeds(self):
911
911
t = self.get_transport()
994
994
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
997
self.assertTrue(t.has('b'))
998
self.assertFalse(t.has('a'))
997
self.failUnless(t.has('b'))
998
self.failIf(t.has('a'))
1000
1000
self.check_transport_contents('a first file\n', t, 'b')
1001
1001
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1003
1003
# Overwrite a file
1004
1004
t.put_bytes('c', 'c this file\n')
1005
1005
t.move('c', 'b')
1006
self.assertFalse(t.has('c'))
1006
self.failIf(t.has('c'))
1007
1007
self.check_transport_contents('c this file\n', t, 'b')
1009
1009
# TODO: Try to write a test for atomicity
1041
1041
except NotImplementedError:
1042
1042
raise TestSkipped("Transport %s has no bogus URL support." %
1043
1043
self._server.__class__)
1044
t = _mod_transport.get_transport(url)
1044
t = get_transport(url)
1045
1045
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
1047
def test_stat(self):
1063
1063
for path, size in zip(paths, sizes):
1064
1064
st = t.stat(path)
1065
1065
if path.endswith('/'):
1066
self.assertTrue(S_ISDIR(st.st_mode))
1066
self.failUnless(S_ISDIR(st.st_mode))
1067
1067
# directory sizes are meaningless
1069
self.assertTrue(S_ISREG(st.st_mode))
1069
self.failUnless(S_ISREG(st.st_mode))
1070
1070
self.assertEqual(size, st.st_size)
1072
1072
remote_stats = list(t.stat_multi(paths))
1096
1096
t.hardlink(source_name, link_name)
1098
self.assertTrue(t.has(source_name))
1099
self.assertTrue(t.has(link_name))
1098
self.failUnless(t.has(source_name))
1099
self.failUnless(t.has(link_name))
1101
1101
st = t.stat(link_name)
1102
self.assertEqual(st[ST_NLINK], 2)
1102
self.failUnlessEqual(st[ST_NLINK], 2)
1103
1103
except TransportNotPossible:
1104
1104
raise TestSkipped("Transport %s does not support hardlinks." %
1105
1105
self._server.__class__)
1118
1118
t.symlink(source_name, link_name)
1120
self.assertTrue(t.has(source_name))
1121
self.assertTrue(t.has(link_name))
1120
self.failUnless(t.has(source_name))
1121
self.failUnless(t.has(link_name))
1123
1123
st = t.stat(link_name)
1124
self.assertTrue(S_ISLNK(st.st_mode),
1124
self.failUnless(S_ISLNK(st.st_mode),
1125
1125
"expected symlink, got mode %o" % st.st_mode)
1126
1126
except TransportNotPossible:
1127
1127
raise TestSkipped("Transport %s does not support symlinks." %
1294
1294
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1296
self.assertTrue(t1.has('a'))
1297
self.assertTrue(t1.has('b/c'))
1298
self.assertFalse(t1.has('c'))
1296
self.failUnless(t1.has('a'))
1297
self.failUnless(t1.has('b/c'))
1298
self.failIf(t1.has('c'))
1300
1300
t2 = t1.clone('b')
1301
1301
self.assertEqual(t1.base + 'b/', t2.base)
1303
self.assertTrue(t2.has('c'))
1304
self.assertFalse(t2.has('a'))
1303
self.failUnless(t2.has('c'))
1304
self.failIf(t2.has('a'))
1306
1306
t3 = t2.clone('..')
1307
self.assertTrue(t3.has('a'))
1308
self.assertFalse(t3.has('c'))
1307
self.failUnless(t3.has('a'))
1308
self.failIf(t3.has('c'))
1310
self.assertFalse(t1.has('b/d'))
1311
self.assertFalse(t2.has('d'))
1312
self.assertFalse(t3.has('b/d'))
1310
self.failIf(t1.has('b/d'))
1311
self.failIf(t2.has('d'))
1312
self.failIf(t3.has('b/d'))
1314
1314
if t1.is_readonly():
1315
1315
self.build_tree_contents([('b/d', 'newfile\n')])
1317
1317
t2.put_bytes('d', 'newfile\n')
1319
self.assertTrue(t1.has('b/d'))
1320
self.assertTrue(t2.has('d'))
1321
self.assertTrue(t3.has('b/d'))
1319
self.failUnless(t1.has('b/d'))
1320
self.failUnless(t2.has('d'))
1321
self.failUnless(t3.has('b/d'))
1323
1323
def test_clone_to_root(self):
1324
1324
orig_transport = self.get_transport()
1398
1398
self.assertEqual(transport.clone("/").abspath('foo'),
1399
1399
transport.abspath("/foo"))
1401
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1402
1401
def test_win32_abspath(self):
1403
1402
# Note: we tried to set sys.platform='win32' so we could test on
1404
1403
# other platforms too, but then osutils does platform specific
1410
1409
# smoke test for abspath on win32.
1411
1410
# a transport based on 'file:///' never fully qualifies the drive.
1412
transport = _mod_transport.get_transport("file:///")
1413
self.assertEqual(transport.abspath("/"), "file:///")
1411
transport = get_transport("file:///")
1412
self.failUnlessEqual(transport.abspath("/"), "file:///")
1415
1414
# but a transport that starts with a drive spec must keep it.
1416
transport = _mod_transport.get_transport("file:///C:/")
1417
self.assertEqual(transport.abspath("/"), "file:///C:/")
1415
transport = get_transport("file:///C:/")
1416
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1419
1418
def test_local_abspath(self):
1420
1419
transport = self.get_transport()