~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

(vila) Forbid more operations on ReadonlyTransportDecorator (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
29
 
import unittest
30
29
 
31
30
from bzrlib import (
32
31
    errors,
33
32
    osutils,
 
33
    pyutils,
34
34
    tests,
 
35
    transport as _mod_transport,
35
36
    urlutils,
36
37
    )
37
38
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
41
 
                           LockError,
42
41
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
42
                           PathError,
45
43
                           TransportNotPossible,
46
44
                           )
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
51
48
    TestSkipped,
52
49
    TestNotApplicable,
53
50
    multiply_tests,
56
53
from bzrlib.tests.test_transport import TestTransportImplementation
57
54
from bzrlib.transport import (
58
55
    ConnectedTransport,
59
 
    get_transport,
 
56
    Transport,
60
57
    _get_transport_modules,
61
58
    )
62
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
63
61
 
64
62
 
65
63
def get_transport_test_permutations(module):
78
76
    for module in _get_transport_modules():
79
77
        try:
80
78
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
79
                pyutils.get_named_object(module))
82
80
            for (klass, server_factory) in permutations:
83
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
                    {"transport_class":klass,
102
100
 
103
101
    def setUp(self):
104
102
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
106
104
 
107
105
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
108
 
111
109
    def test_ensure_base_missing(self):
112
110
        """.ensure_base() should create the directory if it doesn't exist"""
211
209
                    ]
212
210
        self.build_tree(files, transport=t, line_endings='binary')
213
211
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
216
223
 
217
224
    def test_get_directory_read_gives_ReadError(self):
218
225
        """consistent errors for read() on a file returned by get()."""
260
267
        handle = t.open_write_stream('foo')
261
268
        try:
262
269
            handle.write('b')
263
 
            self.assertEqual('b', t.get('foo').read())
 
270
            self.assertEqual('b', t.get_bytes('foo'))
264
271
        finally:
265
272
            handle.close()
266
273
 
272
279
        try:
273
280
            handle.write('b')
274
281
            self.assertEqual('b', t.get_bytes('foo'))
275
 
            self.assertEqual('b', t.get('foo').read())
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
276
287
        finally:
277
288
            handle.close()
278
289
 
285
296
            return
286
297
 
287
298
        t.put_bytes('a', 'some text for a\n')
288
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
289
300
        self.check_transport_contents('some text for a\n', t, 'a')
290
301
 
291
302
        # The contents should be overwritten
303
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
304
315
            return
305
316
 
306
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
307
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
308
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
309
320
        self.check_transport_contents('some text for a\n', t, 'a')
310
321
        # Put also replaces contents
311
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
334
        # Now test the create_parent flag
324
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
325
336
                                       'contents\n')
326
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
327
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
339
                               create_parent_dir=True)
329
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
401
412
        result = t.put_file('a', StringIO('some text for a\n'))
402
413
        # put_file returns the length of the data written
403
414
        self.assertEqual(16, result)
404
 
        self.failUnless(t.has('a'))
 
415
        self.assertTrue(t.has('a'))
405
416
        self.check_transport_contents('some text for a\n', t, 'a')
406
417
        # Put also replaces contents
407
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
420
431
            return
421
432
 
422
 
        self.failIf(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
423
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
 
        self.failUnless(t.has('a'))
 
435
        self.assertTrue(t.has('a'))
425
436
        self.check_transport_contents('some text for a\n', t, 'a')
426
437
        # Put also replaces contents
427
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
450
        # Now test the create_parent flag
440
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
452
                                       StringIO('contents\n'))
442
 
        self.failIf(t.has('dir/a'))
 
453
        self.assertFalse(t.has('dir/a'))
443
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
455
                              create_parent_dir=True)
445
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
619
630
    def test_opening_a_file_stream_can_set_mode(self):
620
631
        t = self.get_transport()
621
632
        if t.is_readonly():
 
633
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
634
                              t.open_write_stream, 'foo')
622
635
            return
623
636
        if not t._can_roundtrip_unix_modebits():
624
637
            # Can't roundtrip, so no need to run this test
625
638
            return
 
639
 
626
640
        def check_mode(name, mode, expected):
627
641
            handle = t.open_write_stream(name, mode=mode)
628
642
            handle.close()
644
658
            self.build_tree(files, transport=transport_from)
645
659
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
646
660
            for f in files:
647
 
                self.check_transport_contents(transport_to.get(f).read(),
 
661
                self.check_transport_contents(transport_to.get_bytes(f),
648
662
                                              transport_from, f)
649
663
 
650
664
        t = self.get_transport()
673
687
        files = ['a', 'b', 'c', 'd']
674
688
        t.copy_to(iter(files), temp_transport)
675
689
        for f in files:
676
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
690
            self.check_transport_contents(temp_transport.get_bytes(f),
677
691
                                          t, f)
678
692
        del temp_transport
679
693
 
822
836
            return
823
837
 
824
838
        t.put_bytes('a', 'a little bit of text\n')
825
 
        self.failUnless(t.has('a'))
 
839
        self.assertTrue(t.has('a'))
826
840
        t.delete('a')
827
 
        self.failIf(t.has('a'))
 
841
        self.assertFalse(t.has('a'))
828
842
 
829
843
        self.assertRaises(NoSuchFile, t.delete, 'a')
830
844
 
836
850
        t.delete_multi(['a', 'c'])
837
851
        self.assertEqual([False, True, False],
838
852
                list(t.has_multi(['a', 'b', 'c'])))
839
 
        self.failIf(t.has('a'))
840
 
        self.failUnless(t.has('b'))
841
 
        self.failIf(t.has('c'))
 
853
        self.assertFalse(t.has('a'))
 
854
        self.assertTrue(t.has('b'))
 
855
        self.assertFalse(t.has('c'))
842
856
 
843
857
        self.assertRaises(NoSuchFile,
844
858
                t.delete_multi, ['a', 'b', 'c'])
905
919
        t.mkdir('foo-baz')
906
920
        t.rmdir('foo')
907
921
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
 
        self.failUnless(t.has('foo-bar'))
 
922
        self.assertTrue(t.has('foo-bar'))
909
923
 
910
924
    def test_rename_dir_succeeds(self):
911
925
        t = self.get_transport()
912
926
        if t.is_readonly():
913
 
            raise TestSkipped("transport is readonly")
 
927
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
928
                              t.rename, 'foo', 'bar')
 
929
            return
914
930
        t.mkdir('adir')
915
931
        t.mkdir('adir/asubdir')
916
932
        t.rename('adir', 'bdir')
921
937
        """Attempting to replace a nonemtpy directory should fail"""
922
938
        t = self.get_transport()
923
939
        if t.is_readonly():
924
 
            raise TestSkipped("transport is readonly")
 
940
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
941
                              t.rename, 'foo', 'bar')
 
942
            return
925
943
        t.mkdir('adir')
926
944
        t.mkdir('adir/asubdir')
927
945
        t.mkdir('bdir')
994
1012
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
995
1013
 
996
1014
        t.move('a', 'b')
997
 
        self.failUnless(t.has('b'))
998
 
        self.failIf(t.has('a'))
 
1015
        self.assertTrue(t.has('b'))
 
1016
        self.assertFalse(t.has('a'))
999
1017
 
1000
1018
        self.check_transport_contents('a first file\n', t, 'b')
1001
1019
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1003
1021
        # Overwrite a file
1004
1022
        t.put_bytes('c', 'c this file\n')
1005
1023
        t.move('c', 'b')
1006
 
        self.failIf(t.has('c'))
 
1024
        self.assertFalse(t.has('c'))
1007
1025
        self.check_transport_contents('c this file\n', t, 'b')
1008
1026
 
1009
1027
        # TODO: Try to write a test for atomicity
1041
1059
        except NotImplementedError:
1042
1060
            raise TestSkipped("Transport %s has no bogus URL support." %
1043
1061
                              self._server.__class__)
1044
 
        t = get_transport(url)
 
1062
        t = _mod_transport.get_transport_from_url(url)
1045
1063
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1046
1064
 
1047
1065
    def test_stat(self):
1063
1081
        for path, size in zip(paths, sizes):
1064
1082
            st = t.stat(path)
1065
1083
            if path.endswith('/'):
1066
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1084
                self.assertTrue(S_ISDIR(st.st_mode))
1067
1085
                # directory sizes are meaningless
1068
1086
            else:
1069
 
                self.failUnless(S_ISREG(st.st_mode))
 
1087
                self.assertTrue(S_ISREG(st.st_mode))
1070
1088
                self.assertEqual(size, st.st_size)
1071
1089
 
1072
1090
        remote_stats = list(t.stat_multi(paths))
1079
1097
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1080
1098
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1081
1099
        subdir = t.clone('subdir')
1082
 
        subdir.stat('./file')
1083
 
        subdir.stat('.')
 
1100
        st = subdir.stat('./file')
 
1101
        st = subdir.stat('.')
1084
1102
 
1085
1103
    def test_hardlink(self):
1086
1104
        from stat import ST_NLINK
1095
1113
        try:
1096
1114
            t.hardlink(source_name, link_name)
1097
1115
 
1098
 
            self.failUnless(t.has(source_name))
1099
 
            self.failUnless(t.has(link_name))
 
1116
            self.assertTrue(t.has(source_name))
 
1117
            self.assertTrue(t.has(link_name))
1100
1118
 
1101
1119
            st = t.stat(link_name)
1102
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1120
            self.assertEqual(st[ST_NLINK], 2)
1103
1121
        except TransportNotPossible:
1104
1122
            raise TestSkipped("Transport %s does not support hardlinks." %
1105
1123
                              self._server.__class__)
1117
1135
        try:
1118
1136
            t.symlink(source_name, link_name)
1119
1137
 
1120
 
            self.failUnless(t.has(source_name))
1121
 
            self.failUnless(t.has(link_name))
 
1138
            self.assertTrue(t.has(source_name))
 
1139
            self.assertTrue(t.has(link_name))
1122
1140
 
1123
1141
            st = t.stat(link_name)
1124
 
            self.failUnless(S_ISLNK(st.st_mode),
 
1142
            self.assertTrue(S_ISLNK(st.st_mode),
1125
1143
                "expected symlink, got mode %o" % st.st_mode)
1126
1144
        except TransportNotPossible:
1127
1145
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1146
                              self._server.__class__)
1129
1147
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1148
            self.knownFailure("Paramiko fails to create symlinks during tests")
1131
1149
 
1132
1150
    def test_list_dir(self):
1133
1151
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1197
1215
            raise TestSkipped("not a connected transport")
1198
1216
 
1199
1217
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1218
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1219
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1220
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1221
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1222
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1205
1223
 
1206
1224
    def test__reuse_for(self):
1207
1225
        t = self.get_transport()
1214
1232
 
1215
1233
            Only the parameters different from None will be changed.
1216
1234
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1235
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1236
            if user     is None: user     = t._parsed_url.user
 
1237
            if password is None: password = t._parsed_url.password
 
1238
            if user     is None: user     = t._parsed_url.user
 
1239
            if host     is None: host     = t._parsed_url.host
 
1240
            if port     is None: port     = t._parsed_url.port
 
1241
            if path     is None: path     = t._parsed_url.path
 
1242
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1243
 
1226
 
        if t._scheme == 'ftp':
 
1244
        if t._parsed_url.scheme == 'ftp':
1227
1245
            scheme = 'sftp'
1228
1246
        else:
1229
1247
            scheme = 'ftp'
1230
1248
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1249
        if t._parsed_url.user == 'me':
1232
1250
            user = 'you'
1233
1251
        else:
1234
1252
            user = 'me'
1245
1263
        #   (they may be typed by the user when prompted for example)
1246
1264
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1265
        # We will not connect, so we can use an invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1266
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1267
        if t._parsed_url.port == 1234:
1250
1268
            port = 4321
1251
1269
        else:
1252
1270
            port = 1234
1293
1311
 
1294
1312
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1313
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1314
        self.assertTrue(t1.has('a'))
 
1315
        self.assertTrue(t1.has('b/c'))
 
1316
        self.assertFalse(t1.has('c'))
1299
1317
 
1300
1318
        t2 = t1.clone('b')
1301
1319
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1320
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1321
        self.assertTrue(t2.has('c'))
 
1322
        self.assertFalse(t2.has('a'))
1305
1323
 
1306
1324
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1325
        self.assertTrue(t3.has('a'))
 
1326
        self.assertFalse(t3.has('c'))
1309
1327
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1328
        self.assertFalse(t1.has('b/d'))
 
1329
        self.assertFalse(t2.has('d'))
 
1330
        self.assertFalse(t3.has('b/d'))
1313
1331
 
1314
1332
        if t1.is_readonly():
1315
1333
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1334
        else:
1317
1335
            t2.put_bytes('d', 'newfile\n')
1318
1336
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1337
        self.assertTrue(t1.has('b/d'))
 
1338
        self.assertTrue(t2.has('d'))
 
1339
        self.assertTrue(t3.has('b/d'))
1322
1340
 
1323
1341
    def test_clone_to_root(self):
1324
1342
        orig_transport = self.get_transport()
1398
1416
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1417
                         transport.abspath("/foo"))
1400
1418
 
 
1419
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1420
    def test_win32_abspath(self):
1402
1421
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1422
        # other platforms too, but then osutils does platform specific
1408
1427
 
1409
1428
        # smoke test for abspath on win32.
1410
1429
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1430
        transport = _mod_transport.get_transport_from_url("file:///")
 
1431
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1432
 
1414
1433
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1434
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1435
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1436
 
1418
1437
    def test_local_abspath(self):
1419
1438
        transport = self.get_transport()
1545
1564
 
1546
1565
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1566
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1567
            self.knownFailure("test server cannot handle unicode paths")
1549
1568
 
1550
1569
        try:
1551
1570
            self.build_tree(files, transport=t, line_endings='binary')
1616
1635
    def test_readv(self):
1617
1636
        transport = self.get_transport()
1618
1637
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1638
            with file('a', 'w') as f: f.write('0123456789')
1620
1639
        else:
1621
1640
            transport.put_bytes('a', '0123456789')
1622
1641
 
1632
1651
    def test_readv_out_of_order(self):
1633
1652
        transport = self.get_transport()
1634
1653
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1654
            with file('a', 'w') as f: f.write('0123456789')
1636
1655
        else:
1637
1656
            transport.put_bytes('a', '01234567890')
1638
1657
 
1710
1729
        transport = self.get_transport()
1711
1730
        # test from observed failure case.
1712
1731
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1732
            with file('a', 'w') as f: f.write('a'*1024*1024)
1714
1733
        else:
1715
1734
            transport.put_bytes('a', 'a'*1024*1024)
1716
1735
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1769
    def test_readv_short_read(self):
1751
1770
        transport = self.get_transport()
1752
1771
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1772
            with file('a', 'w') as f: f.write('0123456789')
1754
1773
        else:
1755
1774
            transport.put_bytes('a', '01234567890')
1756
1775
 
1766
1785
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1786
                              transport.readv, 'a', [(12,2)])
1768
1787
 
 
1788
    def test_no_segment_parameters(self):
 
1789
        """Segment parameters should be stripped and stored in
 
1790
        transport.segment_parameters."""
 
1791
        transport = self.get_transport("foo")
 
1792
        self.assertEquals({}, transport.get_segment_parameters())
 
1793
 
 
1794
    def test_segment_parameters(self):
 
1795
        """Segment parameters should be stripped and stored in
 
1796
        transport.get_segment_parameters()."""
 
1797
        base_url = self._server.get_url()
 
1798
        parameters = {"key1": "val1", "key2": "val2"}
 
1799
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1800
        transport = _mod_transport.get_transport_from_url(url)
 
1801
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1802
 
 
1803
    def test_set_segment_parameters(self):
 
1804
        """Segment parameters can be set and show up in base."""
 
1805
        transport = self.get_transport("foo")
 
1806
        orig_base = transport.base
 
1807
        transport.set_segment_parameter("arm", "board")
 
1808
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
 
1809
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
 
1810
        transport.set_segment_parameter("arm", None)
 
1811
        transport.set_segment_parameter("nonexistant", None)
 
1812
        self.assertEquals({}, transport.get_segment_parameters())
 
1813
        self.assertEquals(orig_base, transport.base)
 
1814
 
1769
1815
    def test_stat_symlink(self):
1770
1816
        # if a transport points directly to a symlink (and supports symlinks
1771
1817
        # at all) you can tell this.  helps with bug 32669.
1777
1823
        t2 = t.clone('link')
1778
1824
        st = t2.stat('')
1779
1825
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1826
 
 
1827
    def test_abspath_url_unquote_unreserved(self):
 
1828
        """URLs from abspath should have unreserved characters unquoted
 
1829
        
 
1830
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1831
        """
 
1832
        t = self.get_transport()
 
1833
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1834
        self.assertEqual(t.base + "-.09AZ_az~",
 
1835
            t.abspath(needlessly_escaped_dir))
 
1836
 
 
1837
    def test_clone_url_unquote_unreserved(self):
 
1838
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1839
        
 
1840
        Cloned transports should be prefix comparable for things like the
 
1841
        isolation checking of tests, see lp:842223 for more.
 
1842
        """
 
1843
        t1 = self.get_transport()
 
1844
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1845
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1846
        t2 = t1.clone(needlessly_escaped_dir)
 
1847
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1848
 
 
1849
    def test_hook_post_connection_one(self):
 
1850
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1851
        log = []
 
1852
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1853
        t = self.get_transport()
 
1854
        self.assertEqual([], log)
 
1855
        t.has("non-existant")
 
1856
        if isinstance(t, RemoteTransport):
 
1857
            self.assertEqual([t.get_smart_medium()], log)
 
1858
        elif isinstance(t, ConnectedTransport):
 
1859
            self.assertEqual([t], log)
 
1860
        else:
 
1861
            self.assertEqual([], log)
 
1862
 
 
1863
    def test_hook_post_connection_multi(self):
 
1864
        """Fire post_connect hook once per unshared underlying connection"""
 
1865
        log = []
 
1866
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1867
        t1 = self.get_transport()
 
1868
        t2 = t1.clone(".")
 
1869
        t3 = self.get_transport()
 
1870
        self.assertEqual([], log)
 
1871
        t1.has("x")
 
1872
        t2.has("x")
 
1873
        t3.has("x")
 
1874
        if isinstance(t1, RemoteTransport):
 
1875
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1876
        elif isinstance(t1, ConnectedTransport):
 
1877
            self.assertEqual([t1, t3], log)
 
1878
        else:
 
1879
            self.assertEqual([], log)