~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
import sys
27
27
 
28
28
from bzrlib import (
 
29
    errors,
29
30
    osutils,
30
31
    urlutils,
31
32
    )
32
33
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
33
 
                           LockError, PathError,
 
34
                           LockError, NoSmartServer, PathError,
34
35
                           TransportNotPossible, ConnectionError,
35
36
                           InvalidURL)
36
37
from bzrlib.osutils import getcwd
37
38
from bzrlib.symbol_versioning import zero_eleven
38
39
from bzrlib.tests import TestCaseInTempDir, TestSkipped
39
40
from bzrlib.tests.test_transport import TestTransportImplementation
40
 
from bzrlib.transport import memory
 
41
from bzrlib.transport import memory, smart
41
42
import bzrlib.transport
42
43
 
43
44
 
91
92
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
92
93
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
93
94
 
 
95
    def test_has_root_works(self):
 
96
        current_transport = self.get_transport()
 
97
        # import pdb;pdb.set_trace()
 
98
        self.assertTrue(current_transport.has('/'))
 
99
        root = current_transport.clone('/')
 
100
        self.assertTrue(root.has(''))
 
101
 
94
102
    def test_get(self):
95
103
        t = self.get_transport()
96
104
 
144
152
                             t.put, 'b', StringIO('file-like\ncontents\n'))
145
153
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
146
154
 
 
155
        self.assertRaises(NoSuchFile,
 
156
            self.applyDeprecated,
 
157
            zero_eleven,
 
158
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
159
 
147
160
    def test_put_bytes(self):
148
161
        t = self.get_transport()
149
162
 
400
413
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
401
414
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
402
415
 
 
416
    def test_put_permissions(self):
 
417
        t = self.get_transport()
 
418
 
 
419
        if t.is_readonly():
 
420
            return
 
421
        if not t._can_roundtrip_unix_modebits():
 
422
            # Can't roundtrip, so no need to run this test
 
423
            return
 
424
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
 
425
                             StringIO('test text\n'), mode=0644)
 
426
        self.assertTransportMode(t, 'mode644', 0644)
 
427
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
 
428
                             StringIO('test text\n'), mode=0666)
 
429
        self.assertTransportMode(t, 'mode666', 0666)
 
430
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
 
431
                             StringIO('test text\n'), mode=0600)
 
432
        self.assertTransportMode(t, 'mode600', 0600)
 
433
        # Yes, you can put a file such that it becomes readonly
 
434
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
 
435
                             StringIO('test text\n'), mode=0400)
 
436
        self.assertTransportMode(t, 'mode400', 0400)
 
437
        self.applyDeprecated(zero_eleven, t.put_multi,
 
438
                             [('mmode644', StringIO('text\n'))], mode=0644)
 
439
        self.assertTransportMode(t, 'mmode644', 0644)
 
440
 
 
441
        # The default permissions should be based on the current umask
 
442
        umask = osutils.get_umask()
 
443
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
 
444
                             StringIO('test text\n'), mode=None)
 
445
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
446
        
403
447
    def test_mkdir(self):
404
448
        t = self.get_transport()
405
449
 
751
795
        t.mkdir('adir/asubdir')
752
796
        t.mkdir('bdir')
753
797
        t.mkdir('bdir/bsubdir')
 
798
        # any kind of PathError would be OK, though we normally expect
 
799
        # DirectoryNotEmpty
754
800
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
755
801
        # nothing was changed so it should still be as before
756
802
        self.assertTrue(t.has('bdir/bsubdir'))
841
887
        # TODO: test copy_multi
842
888
 
843
889
    def test_connection_error(self):
844
 
        """ConnectionError is raised when connection is impossible"""
 
890
        """ConnectionError is raised when connection is impossible.
 
891
        
 
892
        The error may be raised from either the constructor or the first
 
893
        operation on the transport.
 
894
        """
845
895
        try:
846
896
            url = self._server.get_bogus_url()
847
897
        except NotImplementedError:
992
1042
        self.failUnless(t2.has('d'))
993
1043
        self.failUnless(t3.has('b/d'))
994
1044
 
 
1045
    def test_clone_to_root(self):
 
1046
        orig_transport = self.get_transport()
 
1047
        # Repeatedly go up to a parent directory until we're at the root
 
1048
        # directory of this transport
 
1049
        root_transport = orig_transport
 
1050
        new_transport = root_transport.clone("..")
 
1051
        # as we are walking up directories, the path must be must be 
 
1052
        # growing less, except at the top
 
1053
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1054
            or new_transport.base == root_transport.base)
 
1055
        while new_transport.base != root_transport.base:
 
1056
            root_transport = new_transport
 
1057
            new_transport = root_transport.clone("..")
 
1058
            # as we are walking up directories, the path must be must be 
 
1059
            # growing less, except at the top
 
1060
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1061
                or new_transport.base == root_transport.base)
 
1062
 
 
1063
        # Cloning to "/" should take us to exactly the same location.
 
1064
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1065
        # the abspath of "/" from the original transport should be the same
 
1066
        # as the base at the root:
 
1067
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1068
 
 
1069
        # At the root, the URL must still end with / as its a directory
 
1070
        self.assertEqual(root_transport.base[-1], '/')
 
1071
 
 
1072
    def test_clone_from_root(self):
 
1073
        """At the root, cloning to a simple dir should just do string append."""
 
1074
        orig_transport = self.get_transport()
 
1075
        root_transport = orig_transport.clone('/')
 
1076
        self.assertEqual(root_transport.base + '.bzr/',
 
1077
            root_transport.clone('.bzr').base)
 
1078
 
 
1079
    def test_base_url(self):
 
1080
        t = self.get_transport()
 
1081
        self.assertEqual('/', t.base[-1])
 
1082
 
995
1083
    def test_relpath(self):
996
1084
        t = self.get_transport()
997
1085
        self.assertEqual('', t.relpath(t.base))
1020
1108
        # specific test cases.
1021
1109
        transport = self.get_transport()
1022
1110
        
1023
 
        # disabled because some transports might normalize urls in generating
1024
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
1025
1111
        self.assertEqual(transport.base + 'relpath',
1026
1112
                         transport.abspath('relpath'))
1027
1113
 
 
1114
        # This should work without raising an error.
 
1115
        transport.abspath("/")
 
1116
 
 
1117
        # the abspath of "/" and "/foo/.." should result in the same location
 
1118
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1119
 
1028
1120
    def test_local_abspath(self):
1029
1121
        transport = self.get_transport()
1030
1122
        try:
1156
1248
        self.check_transport_contents('bar', transport2, 'foo')
1157
1249
 
1158
1250
    def test_lock_write(self):
 
1251
        """Test transport-level write locks.
 
1252
 
 
1253
        These are deprecated and transports may decline to support them.
 
1254
        """
1159
1255
        transport = self.get_transport()
1160
1256
        if transport.is_readonly():
1161
1257
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1162
1258
            return
1163
1259
        transport.put_bytes('lock', '')
1164
 
        lock = transport.lock_write('lock')
 
1260
        try:
 
1261
            lock = transport.lock_write('lock')
 
1262
        except TransportNotPossible:
 
1263
            return
1165
1264
        # TODO make this consistent on all platforms:
1166
1265
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1167
1266
        lock.unlock()
1168
1267
 
1169
1268
    def test_lock_read(self):
 
1269
        """Test transport-level read locks.
 
1270
 
 
1271
        These are deprecated and transports may decline to support them.
 
1272
        """
1170
1273
        transport = self.get_transport()
1171
1274
        if transport.is_readonly():
1172
1275
            file('lock', 'w').close()
1173
1276
        else:
1174
1277
            transport.put_bytes('lock', '')
1175
 
        lock = transport.lock_read('lock')
 
1278
        try:
 
1279
            lock = transport.lock_read('lock')
 
1280
        except TransportNotPossible:
 
1281
            return
1176
1282
        # TODO make this consistent on all platforms:
1177
1283
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1178
1284
        lock.unlock()
1202
1308
        self.assertEqual(d[1], (9, '9'))
1203
1309
        self.assertEqual(d[2], (0, '0'))
1204
1310
        self.assertEqual(d[3], (3, '34'))
 
1311
 
 
1312
    def test_get_smart_client(self):
 
1313
        """All transports must either give a smart client, or know they can't.
 
1314
 
 
1315
        For some transports such as http this might depend on probing to see 
 
1316
        what's actually present on the other end.  (But we can adjust for that 
 
1317
        in the future.)
 
1318
        """
 
1319
        transport = self.get_transport()
 
1320
        try:
 
1321
            client = transport.get_smart_client()
 
1322
            # XXX: should be a more general class
 
1323
            self.assertIsInstance(client, smart.SmartStreamClient)
 
1324
        except NoSmartServer:
 
1325
            # as long as we got it we're fine
 
1326
            pass
 
1327
 
 
1328
    def test_readv_short_read(self):
 
1329
        transport = self.get_transport()
 
1330
        if transport.is_readonly():
 
1331
            file('a', 'w').write('0123456789')
 
1332
        else:
 
1333
            transport.put_bytes('a', '01234567890')
 
1334
 
 
1335
        # This is intentionally reading off the end of the file
 
1336
        # since we are sure that it cannot get there
 
1337
        self.assertListRaises((errors.ShortReadvError, AssertionError),
 
1338
                              transport.readv, 'a', [(1,1), (8,10)])
 
1339
 
 
1340
        # This is trying to seek past the end of the file, it should
 
1341
        # also raise a special error
 
1342
        self.assertListRaises(errors.ShortReadvError,
 
1343
                              transport.readv, 'a', [(12,2)])