~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
import sys
27
27
 
28
28
from bzrlib import (
 
29
    errors,
29
30
    osutils,
30
31
    urlutils,
31
32
    )
32
33
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
33
 
                           LockError, PathError,
 
34
                           LockError, NoSmartServer, PathError,
34
35
                           TransportNotPossible, ConnectionError,
35
36
                           InvalidURL)
36
37
from bzrlib.osutils import getcwd
37
38
from bzrlib.symbol_versioning import zero_eleven
38
39
from bzrlib.tests import TestCaseInTempDir, TestSkipped
39
40
from bzrlib.tests.test_transport import TestTransportImplementation
40
 
from bzrlib.transport import memory
 
41
from bzrlib.transport import memory, smart
41
42
import bzrlib.transport
42
43
 
43
44
 
68
69
        except excClass:
69
70
            return
70
71
        else:
71
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
72
 
            else: excName = str(excClass)
 
72
            if getattr(excClass,'__name__', None) is not None:
 
73
                excName = excClass.__name__
 
74
            else:
 
75
                excName = str(excClass)
73
76
            raise self.failureException, "%s not raised" % excName
74
77
 
75
78
    def test_has(self):
89
92
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
90
93
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
91
94
 
 
95
    def test_has_root_works(self):
 
96
        current_transport = self.get_transport()
 
97
        # import pdb;pdb.set_trace()
 
98
        self.assertTrue(current_transport.has('/'))
 
99
        root = current_transport.clone('/')
 
100
        self.assertTrue(root.has(''))
 
101
 
92
102
    def test_get(self):
93
103
        t = self.get_transport()
94
104
 
142
152
                             t.put, 'b', StringIO('file-like\ncontents\n'))
143
153
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
144
154
 
 
155
        self.assertRaises(NoSuchFile,
 
156
            self.applyDeprecated,
 
157
            zero_eleven,
 
158
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
159
 
145
160
    def test_put_bytes(self):
146
161
        t = self.get_transport()
147
162
 
243
258
        umask = osutils.get_umask()
244
259
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
245
260
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
261
 
 
262
        # We should also be able to set the mode for a parent directory
 
263
        # when it is created
 
264
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
265
                               dir_mode=0700, create_parent_dir=True)
 
266
        self.assertTransportMode(t, 'dir700', 0700)
 
267
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
268
                               dir_mode=0770, create_parent_dir=True)
 
269
        self.assertTransportMode(t, 'dir770', 0770)
 
270
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
271
                               dir_mode=0777, create_parent_dir=True)
 
272
        self.assertTransportMode(t, 'dir777', 0777)
246
273
        
247
274
    def test_put_file(self):
248
275
        t = self.get_transport()
352
379
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
353
380
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
354
381
        
 
382
        # We should also be able to set the mode for a parent directory
 
383
        # when it is created
 
384
        sio = StringIO()
 
385
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
386
                              dir_mode=0700, create_parent_dir=True)
 
387
        self.assertTransportMode(t, 'dir700', 0700)
 
388
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
389
                              dir_mode=0770, create_parent_dir=True)
 
390
        self.assertTransportMode(t, 'dir770', 0770)
 
391
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
392
                              dir_mode=0777, create_parent_dir=True)
 
393
        self.assertTransportMode(t, 'dir777', 0777)
 
394
 
355
395
    def test_put_multi(self):
356
396
        t = self.get_transport()
357
397
 
373
413
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
374
414
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
375
415
 
 
416
    def test_put_permissions(self):
 
417
        t = self.get_transport()
 
418
 
 
419
        if t.is_readonly():
 
420
            return
 
421
        if not t._can_roundtrip_unix_modebits():
 
422
            # Can't roundtrip, so no need to run this test
 
423
            return
 
424
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
 
425
                             StringIO('test text\n'), mode=0644)
 
426
        self.assertTransportMode(t, 'mode644', 0644)
 
427
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
 
428
                             StringIO('test text\n'), mode=0666)
 
429
        self.assertTransportMode(t, 'mode666', 0666)
 
430
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
 
431
                             StringIO('test text\n'), mode=0600)
 
432
        self.assertTransportMode(t, 'mode600', 0600)
 
433
        # Yes, you can put a file such that it becomes readonly
 
434
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
 
435
                             StringIO('test text\n'), mode=0400)
 
436
        self.assertTransportMode(t, 'mode400', 0400)
 
437
        self.applyDeprecated(zero_eleven, t.put_multi,
 
438
                             [('mmode644', StringIO('text\n'))], mode=0644)
 
439
        self.assertTransportMode(t, 'mmode644', 0644)
 
440
 
 
441
        # The default permissions should be based on the current umask
 
442
        umask = osutils.get_umask()
 
443
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
 
444
                             StringIO('test text\n'), mode=None)
 
445
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
446
        
376
447
    def test_mkdir(self):
377
448
        t = self.get_transport()
378
449
 
724
795
        t.mkdir('adir/asubdir')
725
796
        t.mkdir('bdir')
726
797
        t.mkdir('bdir/bsubdir')
 
798
        # any kind of PathError would be OK, though we normally expect
 
799
        # DirectoryNotEmpty
727
800
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
728
801
        # nothing was changed so it should still be as before
729
802
        self.assertTrue(t.has('bdir/bsubdir'))
814
887
        # TODO: test copy_multi
815
888
 
816
889
    def test_connection_error(self):
817
 
        """ConnectionError is raised when connection is impossible"""
 
890
        """ConnectionError is raised when connection is impossible.
 
891
        
 
892
        The error may be raised from either the constructor or the first
 
893
        operation on the transport.
 
894
        """
818
895
        try:
819
896
            url = self._server.get_bogus_url()
820
897
        except NotImplementedError:
821
898
            raise TestSkipped("Transport %s has no bogus URL support." %
822
899
                              self._server.__class__)
 
900
        # This should be:  but SSH still connects on construction. No COOKIE!
 
901
        # self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
823
902
        try:
824
903
            t = bzrlib.transport.get_transport(url)
825
904
            t.get('.bzr/branch')
965
1044
        self.failUnless(t2.has('d'))
966
1045
        self.failUnless(t3.has('b/d'))
967
1046
 
 
1047
    def test_clone_to_root(self):
 
1048
        orig_transport = self.get_transport()
 
1049
        # Repeatedly go up to a parent directory until we're at the root
 
1050
        # directory of this transport
 
1051
        root_transport = orig_transport
 
1052
        new_transport = root_transport.clone("..")
 
1053
        # as we are walking up directories, the path must be must be 
 
1054
        # growing less, except at the top
 
1055
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1056
            or new_transport.base == root_transport.base)
 
1057
        while new_transport.base != root_transport.base:
 
1058
            root_transport = new_transport
 
1059
            new_transport = root_transport.clone("..")
 
1060
            # as we are walking up directories, the path must be must be 
 
1061
            # growing less, except at the top
 
1062
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1063
                or new_transport.base == root_transport.base)
 
1064
 
 
1065
        # Cloning to "/" should take us to exactly the same location.
 
1066
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1067
        # the abspath of "/" from the original transport should be the same
 
1068
        # as the base at the root:
 
1069
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1070
 
 
1071
        # At the root, the URL must still end with / as its a directory
 
1072
        self.assertEqual(root_transport.base[-1], '/')
 
1073
 
 
1074
    def test_clone_from_root(self):
 
1075
        """At the root, cloning to a simple dir should just do string append."""
 
1076
        orig_transport = self.get_transport()
 
1077
        root_transport = orig_transport.clone('/')
 
1078
        self.assertEqual(root_transport.base + '.bzr/',
 
1079
            root_transport.clone('.bzr').base)
 
1080
 
 
1081
    def test_base_url(self):
 
1082
        t = self.get_transport()
 
1083
        self.assertEqual('/', t.base[-1])
 
1084
 
968
1085
    def test_relpath(self):
969
1086
        t = self.get_transport()
970
1087
        self.assertEqual('', t.relpath(t.base))
993
1110
        # specific test cases.
994
1111
        transport = self.get_transport()
995
1112
        
996
 
        # disabled because some transports might normalize urls in generating
997
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
998
1113
        self.assertEqual(transport.base + 'relpath',
999
1114
                         transport.abspath('relpath'))
1000
1115
 
 
1116
        # This should work without raising an error.
 
1117
        transport.abspath("/")
 
1118
 
 
1119
        # the abspath of "/" and "/foo/.." should result in the same location
 
1120
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1121
 
1001
1122
    def test_local_abspath(self):
1002
1123
        transport = self.get_transport()
1003
1124
        try:
1129
1250
        self.check_transport_contents('bar', transport2, 'foo')
1130
1251
 
1131
1252
    def test_lock_write(self):
 
1253
        """Test transport-level write locks.
 
1254
 
 
1255
        These are deprecated and transports may decline to support them.
 
1256
        """
1132
1257
        transport = self.get_transport()
1133
1258
        if transport.is_readonly():
1134
1259
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1135
1260
            return
1136
1261
        transport.put_bytes('lock', '')
1137
 
        lock = transport.lock_write('lock')
 
1262
        try:
 
1263
            lock = transport.lock_write('lock')
 
1264
        except TransportNotPossible:
 
1265
            return
1138
1266
        # TODO make this consistent on all platforms:
1139
1267
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1140
1268
        lock.unlock()
1141
1269
 
1142
1270
    def test_lock_read(self):
 
1271
        """Test transport-level read locks.
 
1272
 
 
1273
        These are deprecated and transports may decline to support them.
 
1274
        """
1143
1275
        transport = self.get_transport()
1144
1276
        if transport.is_readonly():
1145
1277
            file('lock', 'w').close()
1146
1278
        else:
1147
1279
            transport.put_bytes('lock', '')
1148
 
        lock = transport.lock_read('lock')
 
1280
        try:
 
1281
            lock = transport.lock_read('lock')
 
1282
        except TransportNotPossible:
 
1283
            return
1149
1284
        # TODO make this consistent on all platforms:
1150
1285
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1151
1286
        lock.unlock()
1175
1310
        self.assertEqual(d[1], (9, '9'))
1176
1311
        self.assertEqual(d[2], (0, '0'))
1177
1312
        self.assertEqual(d[3], (3, '34'))
 
1313
 
 
1314
    def test_get_smart_medium(self):
 
1315
        """All transports must either give a smart medium, or know they can't.
 
1316
        """
 
1317
        transport = self.get_transport()
 
1318
        try:
 
1319
            medium = transport.get_smart_medium()
 
1320
            self.assertIsInstance(medium, smart.SmartClientMedium)
 
1321
        except errors.NoSmartMedium:
 
1322
            # as long as we got it we're fine
 
1323
            pass
 
1324
 
 
1325
    def test_readv_short_read(self):
 
1326
        transport = self.get_transport()
 
1327
        if transport.is_readonly():
 
1328
            file('a', 'w').write('0123456789')
 
1329
        else:
 
1330
            transport.put_bytes('a', '01234567890')
 
1331
 
 
1332
        # This is intentionally reading off the end of the file
 
1333
        # since we are sure that it cannot get there
 
1334
        self.assertListRaises((errors.ShortReadvError, AssertionError),
 
1335
                              transport.readv, 'a', [(1,1), (8,10)])
 
1336
 
 
1337
        # This is trying to seek past the end of the file, it should
 
1338
        # also raise a special error
 
1339
        self.assertListRaises(errors.ShortReadvError,
 
1340
                              transport.readv, 'a', [(12,2)])