28
28
from bzrlib import (
32
33
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
LockError, NoSmartServer, PathError,
34
35
TransportNotPossible, ConnectionError,
36
37
from bzrlib.osutils import getcwd
37
38
from bzrlib.symbol_versioning import zero_eleven
38
39
from bzrlib.tests import TestCaseInTempDir, TestSkipped
39
40
from bzrlib.tests.test_transport import TestTransportImplementation
40
from bzrlib.transport import memory
41
from bzrlib.transport import memory, smart
41
42
import bzrlib.transport
71
if hasattr(excClass,'__name__'): excName = excClass.__name__
72
else: excName = str(excClass)
72
if getattr(excClass,'__name__', None) is not None:
73
excName = excClass.__name__
75
excName = str(excClass)
73
76
raise self.failureException, "%s not raised" % excName
75
78
def test_has(self):
89
92
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
90
93
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
95
def test_has_root_works(self):
96
current_transport = self.get_transport()
97
# import pdb;pdb.set_trace()
98
self.assertTrue(current_transport.has('/'))
99
root = current_transport.clone('/')
100
self.assertTrue(root.has(''))
92
102
def test_get(self):
93
103
t = self.get_transport()
142
152
t.put, 'b', StringIO('file-like\ncontents\n'))
143
153
self.check_transport_contents('file-like\ncontents\n', t, 'b')
155
self.assertRaises(NoSuchFile,
156
self.applyDeprecated,
158
t.put, 'path/doesnt/exist/c', StringIO('contents'))
145
160
def test_put_bytes(self):
146
161
t = self.get_transport()
243
258
umask = osutils.get_umask()
244
259
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
245
260
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
262
# We should also be able to set the mode for a parent directory
264
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
265
dir_mode=0700, create_parent_dir=True)
266
self.assertTransportMode(t, 'dir700', 0700)
267
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
268
dir_mode=0770, create_parent_dir=True)
269
self.assertTransportMode(t, 'dir770', 0770)
270
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
271
dir_mode=0777, create_parent_dir=True)
272
self.assertTransportMode(t, 'dir777', 0777)
247
274
def test_put_file(self):
248
275
t = self.get_transport()
352
379
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
353
380
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
382
# We should also be able to set the mode for a parent directory
385
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
386
dir_mode=0700, create_parent_dir=True)
387
self.assertTransportMode(t, 'dir700', 0700)
388
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
389
dir_mode=0770, create_parent_dir=True)
390
self.assertTransportMode(t, 'dir770', 0770)
391
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
392
dir_mode=0777, create_parent_dir=True)
393
self.assertTransportMode(t, 'dir777', 0777)
355
395
def test_put_multi(self):
356
396
t = self.get_transport()
373
413
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
374
414
self.check_transport_contents('another contents\nfor d\n', t, 'd')
416
def test_put_permissions(self):
417
t = self.get_transport()
421
if not t._can_roundtrip_unix_modebits():
422
# Can't roundtrip, so no need to run this test
424
self.applyDeprecated(zero_eleven, t.put, 'mode644',
425
StringIO('test text\n'), mode=0644)
426
self.assertTransportMode(t, 'mode644', 0644)
427
self.applyDeprecated(zero_eleven, t.put, 'mode666',
428
StringIO('test text\n'), mode=0666)
429
self.assertTransportMode(t, 'mode666', 0666)
430
self.applyDeprecated(zero_eleven, t.put, 'mode600',
431
StringIO('test text\n'), mode=0600)
432
self.assertTransportMode(t, 'mode600', 0600)
433
# Yes, you can put a file such that it becomes readonly
434
self.applyDeprecated(zero_eleven, t.put, 'mode400',
435
StringIO('test text\n'), mode=0400)
436
self.assertTransportMode(t, 'mode400', 0400)
437
self.applyDeprecated(zero_eleven, t.put_multi,
438
[('mmode644', StringIO('text\n'))], mode=0644)
439
self.assertTransportMode(t, 'mmode644', 0644)
441
# The default permissions should be based on the current umask
442
umask = osutils.get_umask()
443
self.applyDeprecated(zero_eleven, t.put, 'nomode',
444
StringIO('test text\n'), mode=None)
445
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
376
447
def test_mkdir(self):
377
448
t = self.get_transport()
724
795
t.mkdir('adir/asubdir')
726
797
t.mkdir('bdir/bsubdir')
798
# any kind of PathError would be OK, though we normally expect
727
800
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
728
801
# nothing was changed so it should still be as before
729
802
self.assertTrue(t.has('bdir/bsubdir'))
814
887
# TODO: test copy_multi
816
889
def test_connection_error(self):
817
"""ConnectionError is raised when connection is impossible"""
890
"""ConnectionError is raised when connection is impossible.
892
The error may be raised from either the constructor or the first
893
operation on the transport.
819
896
url = self._server.get_bogus_url()
820
897
except NotImplementedError:
821
898
raise TestSkipped("Transport %s has no bogus URL support." %
822
899
self._server.__class__)
900
# This should be: but SSH still connects on construction. No COOKIE!
901
# self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
824
903
t = bzrlib.transport.get_transport(url)
825
904
t.get('.bzr/branch')
965
1044
self.failUnless(t2.has('d'))
966
1045
self.failUnless(t3.has('b/d'))
1047
def test_clone_to_root(self):
1048
orig_transport = self.get_transport()
1049
# Repeatedly go up to a parent directory until we're at the root
1050
# directory of this transport
1051
root_transport = orig_transport
1052
new_transport = root_transport.clone("..")
1053
# as we are walking up directories, the path must be must be
1054
# growing less, except at the top
1055
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1056
or new_transport.base == root_transport.base)
1057
while new_transport.base != root_transport.base:
1058
root_transport = new_transport
1059
new_transport = root_transport.clone("..")
1060
# as we are walking up directories, the path must be must be
1061
# growing less, except at the top
1062
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1063
or new_transport.base == root_transport.base)
1065
# Cloning to "/" should take us to exactly the same location.
1066
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1067
# the abspath of "/" from the original transport should be the same
1068
# as the base at the root:
1069
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1071
# At the root, the URL must still end with / as its a directory
1072
self.assertEqual(root_transport.base[-1], '/')
1074
def test_clone_from_root(self):
1075
"""At the root, cloning to a simple dir should just do string append."""
1076
orig_transport = self.get_transport()
1077
root_transport = orig_transport.clone('/')
1078
self.assertEqual(root_transport.base + '.bzr/',
1079
root_transport.clone('.bzr').base)
1081
def test_base_url(self):
1082
t = self.get_transport()
1083
self.assertEqual('/', t.base[-1])
968
1085
def test_relpath(self):
969
1086
t = self.get_transport()
970
1087
self.assertEqual('', t.relpath(t.base))
993
1110
# specific test cases.
994
1111
transport = self.get_transport()
996
# disabled because some transports might normalize urls in generating
997
# the abspath - eg http+pycurl-> just http -- mbp 20060308
998
1113
self.assertEqual(transport.base + 'relpath',
999
1114
transport.abspath('relpath'))
1116
# This should work without raising an error.
1117
transport.abspath("/")
1119
# the abspath of "/" and "/foo/.." should result in the same location
1120
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1001
1122
def test_local_abspath(self):
1002
1123
transport = self.get_transport()
1129
1250
self.check_transport_contents('bar', transport2, 'foo')
1131
1252
def test_lock_write(self):
1253
"""Test transport-level write locks.
1255
These are deprecated and transports may decline to support them.
1132
1257
transport = self.get_transport()
1133
1258
if transport.is_readonly():
1134
1259
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1136
1261
transport.put_bytes('lock', '')
1137
lock = transport.lock_write('lock')
1263
lock = transport.lock_write('lock')
1264
except TransportNotPossible:
1138
1266
# TODO make this consistent on all platforms:
1139
1267
# self.assertRaises(LockError, transport.lock_write, 'lock')
1142
1270
def test_lock_read(self):
1271
"""Test transport-level read locks.
1273
These are deprecated and transports may decline to support them.
1143
1275
transport = self.get_transport()
1144
1276
if transport.is_readonly():
1145
1277
file('lock', 'w').close()
1147
1279
transport.put_bytes('lock', '')
1148
lock = transport.lock_read('lock')
1281
lock = transport.lock_read('lock')
1282
except TransportNotPossible:
1149
1284
# TODO make this consistent on all platforms:
1150
1285
# self.assertRaises(LockError, transport.lock_read, 'lock')
1175
1310
self.assertEqual(d[1], (9, '9'))
1176
1311
self.assertEqual(d[2], (0, '0'))
1177
1312
self.assertEqual(d[3], (3, '34'))
1314
def test_get_smart_medium(self):
1315
"""All transports must either give a smart medium, or know they can't.
1317
transport = self.get_transport()
1319
medium = transport.get_smart_medium()
1320
self.assertIsInstance(medium, smart.SmartClientMedium)
1321
except errors.NoSmartMedium:
1322
# as long as we got it we're fine
1325
def test_readv_short_read(self):
1326
transport = self.get_transport()
1327
if transport.is_readonly():
1328
file('a', 'w').write('0123456789')
1330
transport.put_bytes('a', '01234567890')
1332
# This is intentionally reading off the end of the file
1333
# since we are sure that it cannot get there
1334
self.assertListRaises((errors.ShortReadvError, AssertionError),
1335
transport.readv, 'a', [(1,1), (8,10)])
1337
# This is trying to seek past the end of the file, it should
1338
# also raise a special error
1339
self.assertListRaises(errors.ShortReadvError,
1340
transport.readv, 'a', [(12,2)])