~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-01 08:02:42 UTC
  • mfrom: (5390.3.3 faster-revert-593560)
  • Revision ID: pqm@pqm.ubuntu.com-20100901080242-esg62ody4frwmy66
(spiv) Avoid repeatedly calling self.target.all_file_ids() in
 InterTree.iter_changes. (Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
 
29
import unittest
29
30
 
30
31
from bzrlib import (
31
32
    errors,
32
33
    osutils,
33
 
    pyutils,
34
34
    tests,
35
 
    transport as _mod_transport,
36
35
    urlutils,
37
36
    )
38
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
 
41
                           LockError,
41
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
42
44
                           PathError,
43
45
                           TransportNotPossible,
44
46
                           )
45
47
from bzrlib.osutils import getcwd
46
48
from bzrlib.smart import medium
47
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
48
51
    TestSkipped,
49
52
    TestNotApplicable,
50
53
    multiply_tests,
53
56
from bzrlib.tests.test_transport import TestTransportImplementation
54
57
from bzrlib.transport import (
55
58
    ConnectedTransport,
56
 
    Transport,
 
59
    get_transport,
57
60
    _get_transport_modules,
58
61
    )
59
62
from bzrlib.transport.memory import MemoryTransport
60
 
from bzrlib.transport.remote import RemoteTransport
61
63
 
62
64
 
63
65
def get_transport_test_permutations(module):
76
78
    for module in _get_transport_modules():
77
79
        try:
78
80
            permutations = get_transport_test_permutations(
79
 
                pyutils.get_named_object(module))
 
81
                reduce(getattr, (module).split('.')[1:], __import__(module)))
80
82
            for (klass, server_factory) in permutations:
81
83
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
84
                    {"transport_class":klass,
100
102
 
101
103
    def setUp(self):
102
104
        super(TransportTests, self).setUp()
103
 
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
105
        self._captureVar('BZR_NO_SMART_VFS', None)
104
106
 
105
107
    def check_transport_contents(self, content, transport, relpath):
106
 
        """Check that transport.get_bytes(relpath) == content."""
107
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
        """Check that transport.get(relpath).read() == content."""
 
109
        self.assertEqualDiff(content, transport.get(relpath).read())
108
110
 
109
111
    def test_ensure_base_missing(self):
110
112
        """.ensure_base() should create the directory if it doesn't exist"""
209
211
                    ]
210
212
        self.build_tree(files, transport=t, line_endings='binary')
211
213
        self.assertRaises(NoSuchFile, t.get, 'c')
212
 
        def iterate_and_close(func, *args):
213
 
            for f in func(*args):
214
 
                # We call f.read() here because things like paramiko actually
215
 
                # spawn a thread to prefetch the content, which we want to
216
 
                # consume before we close the handle.
217
 
                content = f.read()
218
 
                f.close()
219
 
        self.assertRaises(NoSuchFile, iterate_and_close,
220
 
                          t.get_multi, ['a', 'b', 'c'])
221
 
        self.assertRaises(NoSuchFile, iterate_and_close,
222
 
                          t.get_multi, iter(['a', 'b', 'c']))
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
223
216
 
224
217
    def test_get_directory_read_gives_ReadError(self):
225
218
        """consistent errors for read() on a file returned by get()."""
267
260
        handle = t.open_write_stream('foo')
268
261
        try:
269
262
            handle.write('b')
270
 
            self.assertEqual('b', t.get_bytes('foo'))
 
263
            self.assertEqual('b', t.get('foo').read())
271
264
        finally:
272
265
            handle.close()
273
266
 
279
272
        try:
280
273
            handle.write('b')
281
274
            self.assertEqual('b', t.get_bytes('foo'))
282
 
            f = t.get('foo')
283
 
            try:
284
 
                self.assertEqual('b', f.read())
285
 
            finally:
286
 
                f.close()
 
275
            self.assertEqual('b', t.get('foo').read())
287
276
        finally:
288
277
            handle.close()
289
278
 
296
285
            return
297
286
 
298
287
        t.put_bytes('a', 'some text for a\n')
299
 
        self.assertTrue(t.has('a'))
 
288
        self.failUnless(t.has('a'))
300
289
        self.check_transport_contents('some text for a\n', t, 'a')
301
290
 
302
291
        # The contents should be overwritten
314
303
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
315
304
            return
316
305
 
317
 
        self.assertFalse(t.has('a'))
 
306
        self.failIf(t.has('a'))
318
307
        t.put_bytes_non_atomic('a', 'some text for a\n')
319
 
        self.assertTrue(t.has('a'))
 
308
        self.failUnless(t.has('a'))
320
309
        self.check_transport_contents('some text for a\n', t, 'a')
321
310
        # Put also replaces contents
322
311
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
334
323
        # Now test the create_parent flag
335
324
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
336
325
                                       'contents\n')
337
 
        self.assertFalse(t.has('dir/a'))
 
326
        self.failIf(t.has('dir/a'))
338
327
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
328
                               create_parent_dir=True)
340
329
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
412
401
        result = t.put_file('a', StringIO('some text for a\n'))
413
402
        # put_file returns the length of the data written
414
403
        self.assertEqual(16, result)
415
 
        self.assertTrue(t.has('a'))
 
404
        self.failUnless(t.has('a'))
416
405
        self.check_transport_contents('some text for a\n', t, 'a')
417
406
        # Put also replaces contents
418
407
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
430
419
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
431
420
            return
432
421
 
433
 
        self.assertFalse(t.has('a'))
 
422
        self.failIf(t.has('a'))
434
423
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
 
        self.assertTrue(t.has('a'))
 
424
        self.failUnless(t.has('a'))
436
425
        self.check_transport_contents('some text for a\n', t, 'a')
437
426
        # Put also replaces contents
438
427
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
450
439
        # Now test the create_parent flag
451
440
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
441
                                       StringIO('contents\n'))
453
 
        self.assertFalse(t.has('dir/a'))
 
442
        self.failIf(t.has('dir/a'))
454
443
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
444
                              create_parent_dir=True)
456
445
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
630
619
    def test_opening_a_file_stream_can_set_mode(self):
631
620
        t = self.get_transport()
632
621
        if t.is_readonly():
633
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
634
 
                              t.open_write_stream, 'foo')
635
622
            return
636
623
        if not t._can_roundtrip_unix_modebits():
637
624
            # Can't roundtrip, so no need to run this test
638
625
            return
639
 
 
640
626
        def check_mode(name, mode, expected):
641
627
            handle = t.open_write_stream(name, mode=mode)
642
628
            handle.close()
658
644
            self.build_tree(files, transport=transport_from)
659
645
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
660
646
            for f in files:
661
 
                self.check_transport_contents(transport_to.get_bytes(f),
 
647
                self.check_transport_contents(transport_to.get(f).read(),
662
648
                                              transport_from, f)
663
649
 
664
650
        t = self.get_transport()
687
673
        files = ['a', 'b', 'c', 'd']
688
674
        t.copy_to(iter(files), temp_transport)
689
675
        for f in files:
690
 
            self.check_transport_contents(temp_transport.get_bytes(f),
 
676
            self.check_transport_contents(temp_transport.get(f).read(),
691
677
                                          t, f)
692
678
        del temp_transport
693
679
 
836
822
            return
837
823
 
838
824
        t.put_bytes('a', 'a little bit of text\n')
839
 
        self.assertTrue(t.has('a'))
 
825
        self.failUnless(t.has('a'))
840
826
        t.delete('a')
841
 
        self.assertFalse(t.has('a'))
 
827
        self.failIf(t.has('a'))
842
828
 
843
829
        self.assertRaises(NoSuchFile, t.delete, 'a')
844
830
 
850
836
        t.delete_multi(['a', 'c'])
851
837
        self.assertEqual([False, True, False],
852
838
                list(t.has_multi(['a', 'b', 'c'])))
853
 
        self.assertFalse(t.has('a'))
854
 
        self.assertTrue(t.has('b'))
855
 
        self.assertFalse(t.has('c'))
 
839
        self.failIf(t.has('a'))
 
840
        self.failUnless(t.has('b'))
 
841
        self.failIf(t.has('c'))
856
842
 
857
843
        self.assertRaises(NoSuchFile,
858
844
                t.delete_multi, ['a', 'b', 'c'])
919
905
        t.mkdir('foo-baz')
920
906
        t.rmdir('foo')
921
907
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
922
 
        self.assertTrue(t.has('foo-bar'))
 
908
        self.failUnless(t.has('foo-bar'))
923
909
 
924
910
    def test_rename_dir_succeeds(self):
925
911
        t = self.get_transport()
926
912
        if t.is_readonly():
927
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
928
 
                              t.rename, 'foo', 'bar')
929
 
            return
 
913
            raise TestSkipped("transport is readonly")
930
914
        t.mkdir('adir')
931
915
        t.mkdir('adir/asubdir')
932
916
        t.rename('adir', 'bdir')
937
921
        """Attempting to replace a nonemtpy directory should fail"""
938
922
        t = self.get_transport()
939
923
        if t.is_readonly():
940
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
941
 
                              t.rename, 'foo', 'bar')
942
 
            return
 
924
            raise TestSkipped("transport is readonly")
943
925
        t.mkdir('adir')
944
926
        t.mkdir('adir/asubdir')
945
927
        t.mkdir('bdir')
1012
994
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
1013
995
 
1014
996
        t.move('a', 'b')
1015
 
        self.assertTrue(t.has('b'))
1016
 
        self.assertFalse(t.has('a'))
 
997
        self.failUnless(t.has('b'))
 
998
        self.failIf(t.has('a'))
1017
999
 
1018
1000
        self.check_transport_contents('a first file\n', t, 'b')
1019
1001
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1021
1003
        # Overwrite a file
1022
1004
        t.put_bytes('c', 'c this file\n')
1023
1005
        t.move('c', 'b')
1024
 
        self.assertFalse(t.has('c'))
 
1006
        self.failIf(t.has('c'))
1025
1007
        self.check_transport_contents('c this file\n', t, 'b')
1026
1008
 
1027
1009
        # TODO: Try to write a test for atomicity
1059
1041
        except NotImplementedError:
1060
1042
            raise TestSkipped("Transport %s has no bogus URL support." %
1061
1043
                              self._server.__class__)
1062
 
        t = _mod_transport.get_transport_from_url(url)
 
1044
        t = get_transport(url)
1063
1045
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1064
1046
 
1065
1047
    def test_stat(self):
1081
1063
        for path, size in zip(paths, sizes):
1082
1064
            st = t.stat(path)
1083
1065
            if path.endswith('/'):
1084
 
                self.assertTrue(S_ISDIR(st.st_mode))
 
1066
                self.failUnless(S_ISDIR(st.st_mode))
1085
1067
                # directory sizes are meaningless
1086
1068
            else:
1087
 
                self.assertTrue(S_ISREG(st.st_mode))
 
1069
                self.failUnless(S_ISREG(st.st_mode))
1088
1070
                self.assertEqual(size, st.st_size)
1089
1071
 
1090
1072
        remote_stats = list(t.stat_multi(paths))
1097
1079
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1098
1080
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1099
1081
        subdir = t.clone('subdir')
1100
 
        st = subdir.stat('./file')
1101
 
        st = subdir.stat('.')
 
1082
        subdir.stat('./file')
 
1083
        subdir.stat('.')
1102
1084
 
1103
1085
    def test_hardlink(self):
1104
1086
        from stat import ST_NLINK
1113
1095
        try:
1114
1096
            t.hardlink(source_name, link_name)
1115
1097
 
1116
 
            self.assertTrue(t.has(source_name))
1117
 
            self.assertTrue(t.has(link_name))
 
1098
            self.failUnless(t.has(source_name))
 
1099
            self.failUnless(t.has(link_name))
1118
1100
 
1119
1101
            st = t.stat(link_name)
1120
 
            self.assertEqual(st[ST_NLINK], 2)
 
1102
            self.failUnlessEqual(st[ST_NLINK], 2)
1121
1103
        except TransportNotPossible:
1122
1104
            raise TestSkipped("Transport %s does not support hardlinks." %
1123
1105
                              self._server.__class__)
1135
1117
        try:
1136
1118
            t.symlink(source_name, link_name)
1137
1119
 
1138
 
            self.assertTrue(t.has(source_name))
1139
 
            self.assertTrue(t.has(link_name))
 
1120
            self.failUnless(t.has(source_name))
 
1121
            self.failUnless(t.has(link_name))
1140
1122
 
1141
1123
            st = t.stat(link_name)
1142
 
            self.assertTrue(S_ISLNK(st.st_mode),
 
1124
            self.failUnless(S_ISLNK(st.st_mode),
1143
1125
                "expected symlink, got mode %o" % st.st_mode)
1144
1126
        except TransportNotPossible:
1145
1127
            raise TestSkipped("Transport %s does not support symlinks." %
1146
1128
                              self._server.__class__)
1147
1129
        except IOError:
1148
 
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1130
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1149
1131
 
1150
1132
    def test_list_dir(self):
1151
1133
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1215
1197
            raise TestSkipped("not a connected transport")
1216
1198
 
1217
1199
        t2 = t1.clone('subdir')
1218
 
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1219
 
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1220
 
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1221
 
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1222
 
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
 
1200
        self.assertEquals(t1._scheme, t2._scheme)
 
1201
        self.assertEquals(t1._user, t2._user)
 
1202
        self.assertEquals(t1._password, t2._password)
 
1203
        self.assertEquals(t1._host, t2._host)
 
1204
        self.assertEquals(t1._port, t2._port)
1223
1205
 
1224
1206
    def test__reuse_for(self):
1225
1207
        t = self.get_transport()
1232
1214
 
1233
1215
            Only the parameters different from None will be changed.
1234
1216
            """
1235
 
            if scheme   is None: scheme   = t._parsed_url.scheme
1236
 
            if user     is None: user     = t._parsed_url.user
1237
 
            if password is None: password = t._parsed_url.password
1238
 
            if user     is None: user     = t._parsed_url.user
1239
 
            if host     is None: host     = t._parsed_url.host
1240
 
            if port     is None: port     = t._parsed_url.port
1241
 
            if path     is None: path     = t._parsed_url.path
1242
 
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1217
            if scheme   is None: scheme   = t._scheme
 
1218
            if user     is None: user     = t._user
 
1219
            if password is None: password = t._password
 
1220
            if user     is None: user     = t._user
 
1221
            if host     is None: host     = t._host
 
1222
            if port     is None: port     = t._port
 
1223
            if path     is None: path     = t._path
 
1224
            return t._unsplit_url(scheme, user, password, host, port, path)
1243
1225
 
1244
 
        if t._parsed_url.scheme == 'ftp':
 
1226
        if t._scheme == 'ftp':
1245
1227
            scheme = 'sftp'
1246
1228
        else:
1247
1229
            scheme = 'ftp'
1248
1230
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1249
 
        if t._parsed_url.user == 'me':
 
1231
        if t._user == 'me':
1250
1232
            user = 'you'
1251
1233
        else:
1252
1234
            user = 'me'
1263
1245
        #   (they may be typed by the user when prompted for example)
1264
1246
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1265
1247
        # We will not connect, we can use a invalid host
1266
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1267
 
        if t._parsed_url.port == 1234:
 
1248
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1249
        if t._port == 1234:
1268
1250
            port = 4321
1269
1251
        else:
1270
1252
            port = 1234
1311
1293
 
1312
1294
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1313
1295
 
1314
 
        self.assertTrue(t1.has('a'))
1315
 
        self.assertTrue(t1.has('b/c'))
1316
 
        self.assertFalse(t1.has('c'))
 
1296
        self.failUnless(t1.has('a'))
 
1297
        self.failUnless(t1.has('b/c'))
 
1298
        self.failIf(t1.has('c'))
1317
1299
 
1318
1300
        t2 = t1.clone('b')
1319
1301
        self.assertEqual(t1.base + 'b/', t2.base)
1320
1302
 
1321
 
        self.assertTrue(t2.has('c'))
1322
 
        self.assertFalse(t2.has('a'))
 
1303
        self.failUnless(t2.has('c'))
 
1304
        self.failIf(t2.has('a'))
1323
1305
 
1324
1306
        t3 = t2.clone('..')
1325
 
        self.assertTrue(t3.has('a'))
1326
 
        self.assertFalse(t3.has('c'))
 
1307
        self.failUnless(t3.has('a'))
 
1308
        self.failIf(t3.has('c'))
1327
1309
 
1328
 
        self.assertFalse(t1.has('b/d'))
1329
 
        self.assertFalse(t2.has('d'))
1330
 
        self.assertFalse(t3.has('b/d'))
 
1310
        self.failIf(t1.has('b/d'))
 
1311
        self.failIf(t2.has('d'))
 
1312
        self.failIf(t3.has('b/d'))
1331
1313
 
1332
1314
        if t1.is_readonly():
1333
1315
            self.build_tree_contents([('b/d', 'newfile\n')])
1334
1316
        else:
1335
1317
            t2.put_bytes('d', 'newfile\n')
1336
1318
 
1337
 
        self.assertTrue(t1.has('b/d'))
1338
 
        self.assertTrue(t2.has('d'))
1339
 
        self.assertTrue(t3.has('b/d'))
 
1319
        self.failUnless(t1.has('b/d'))
 
1320
        self.failUnless(t2.has('d'))
 
1321
        self.failUnless(t3.has('b/d'))
1340
1322
 
1341
1323
    def test_clone_to_root(self):
1342
1324
        orig_transport = self.get_transport()
1416
1398
        self.assertEqual(transport.clone("/").abspath('foo'),
1417
1399
                         transport.abspath("/foo"))
1418
1400
 
1419
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1420
1401
    def test_win32_abspath(self):
1421
1402
        # Note: we tried to set sys.platform='win32' so we could test on
1422
1403
        # other platforms too, but then osutils does platform specific
1427
1408
 
1428
1409
        # smoke test for abspath on win32.
1429
1410
        # a transport based on 'file:///' never fully qualifies the drive.
1430
 
        transport = _mod_transport.get_transport_from_url("file:///")
1431
 
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
        transport = get_transport("file:///")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1432
1413
 
1433
1414
        # but a transport that starts with a drive spec must keep it.
1434
 
        transport = _mod_transport.get_transport_from_url("file:///C:/")
1435
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
        transport = get_transport("file:///C:/")
 
1416
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1436
1417
 
1437
1418
    def test_local_abspath(self):
1438
1419
        transport = self.get_transport()
1564
1545
 
1565
1546
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1566
1547
        if no_unicode_support:
1567
 
            self.knownFailure("test server cannot handle unicode paths")
 
1548
            raise tests.KnownFailure("test server cannot handle unicode paths")
1568
1549
 
1569
1550
        try:
1570
1551
            self.build_tree(files, transport=t, line_endings='binary')
1635
1616
    def test_readv(self):
1636
1617
        transport = self.get_transport()
1637
1618
        if transport.is_readonly():
1638
 
            with file('a', 'w') as f: f.write('0123456789')
 
1619
            file('a', 'w').write('0123456789')
1639
1620
        else:
1640
1621
            transport.put_bytes('a', '0123456789')
1641
1622
 
1651
1632
    def test_readv_out_of_order(self):
1652
1633
        transport = self.get_transport()
1653
1634
        if transport.is_readonly():
1654
 
            with file('a', 'w') as f: f.write('0123456789')
 
1635
            file('a', 'w').write('0123456789')
1655
1636
        else:
1656
1637
            transport.put_bytes('a', '01234567890')
1657
1638
 
1729
1710
        transport = self.get_transport()
1730
1711
        # test from observed failure case.
1731
1712
        if transport.is_readonly():
1732
 
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1713
            file('a', 'w').write('a'*1024*1024)
1733
1714
        else:
1734
1715
            transport.put_bytes('a', 'a'*1024*1024)
1735
1716
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1769
1750
    def test_readv_short_read(self):
1770
1751
        transport = self.get_transport()
1771
1752
        if transport.is_readonly():
1772
 
            with file('a', 'w') as f: f.write('0123456789')
 
1753
            file('a', 'w').write('0123456789')
1773
1754
        else:
1774
1755
            transport.put_bytes('a', '01234567890')
1775
1756
 
1785
1766
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1786
1767
                              transport.readv, 'a', [(12,2)])
1787
1768
 
1788
 
    def test_no_segment_parameters(self):
1789
 
        """Segment parameters should be stripped and stored in
1790
 
        transport.segment_parameters."""
1791
 
        transport = self.get_transport("foo")
1792
 
        self.assertEquals({}, transport.get_segment_parameters())
1793
 
 
1794
 
    def test_segment_parameters(self):
1795
 
        """Segment parameters should be stripped and stored in
1796
 
        transport.get_segment_parameters()."""
1797
 
        base_url = self._server.get_url()
1798
 
        parameters = {"key1": "val1", "key2": "val2"}
1799
 
        url = urlutils.join_segment_parameters(base_url, parameters)
1800
 
        transport = _mod_transport.get_transport_from_url(url)
1801
 
        self.assertEquals(parameters, transport.get_segment_parameters())
1802
 
 
1803
 
    def test_set_segment_parameters(self):
1804
 
        """Segment parameters can be set and show up in base."""
1805
 
        transport = self.get_transport("foo")
1806
 
        orig_base = transport.base
1807
 
        transport.set_segment_parameter("arm", "board")
1808
 
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
1809
 
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1810
 
        transport.set_segment_parameter("arm", None)
1811
 
        transport.set_segment_parameter("nonexistant", None)
1812
 
        self.assertEquals({}, transport.get_segment_parameters())
1813
 
        self.assertEquals(orig_base, transport.base)
1814
 
 
1815
1769
    def test_stat_symlink(self):
1816
1770
        # if a transport points directly to a symlink (and supports symlinks
1817
1771
        # at all) you can tell this.  helps with bug 32669.
1823
1777
        t2 = t.clone('link')
1824
1778
        st = t2.stat('')
1825
1779
        self.assertTrue(stat.S_ISLNK(st.st_mode))
1826
 
 
1827
 
    def test_abspath_url_unquote_unreserved(self):
1828
 
        """URLs from abspath should have unreserved characters unquoted
1829
 
        
1830
 
        Need consistent quoting notably for tildes, see lp:842223 for more.
1831
 
        """
1832
 
        t = self.get_transport()
1833
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1834
 
        self.assertEqual(t.base + "-.09AZ_az~",
1835
 
            t.abspath(needlessly_escaped_dir))
1836
 
 
1837
 
    def test_clone_url_unquote_unreserved(self):
1838
 
        """Base URL of a cloned branch needs unreserved characters unquoted
1839
 
        
1840
 
        Cloned transports should be prefix comparable for things like the
1841
 
        isolation checking of tests, see lp:842223 for more.
1842
 
        """
1843
 
        t1 = self.get_transport()
1844
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1845
 
        self.build_tree([needlessly_escaped_dir], transport=t1)
1846
 
        t2 = t1.clone(needlessly_escaped_dir)
1847
 
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1848
 
 
1849
 
    def test_hook_post_connection_one(self):
1850
 
        """Fire post_connect hook after a ConnectedTransport is first used"""
1851
 
        log = []
1852
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1853
 
        t = self.get_transport()
1854
 
        self.assertEqual([], log)
1855
 
        t.has("non-existant")
1856
 
        if isinstance(t, RemoteTransport):
1857
 
            self.assertEqual([t.get_smart_medium()], log)
1858
 
        elif isinstance(t, ConnectedTransport):
1859
 
            self.assertEqual([t], log)
1860
 
        else:
1861
 
            self.assertEqual([], log)
1862
 
 
1863
 
    def test_hook_post_connection_multi(self):
1864
 
        """Fire post_connect hook once per unshared underlying connection"""
1865
 
        log = []
1866
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1867
 
        t1 = self.get_transport()
1868
 
        t2 = t1.clone(".")
1869
 
        t3 = self.get_transport()
1870
 
        self.assertEqual([], log)
1871
 
        t1.has("x")
1872
 
        t2.has("x")
1873
 
        t3.has("x")
1874
 
        if isinstance(t1, RemoteTransport):
1875
 
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1876
 
        elif isinstance(t1, ConnectedTransport):
1877
 
            self.assertEqual([t1, t3], log)
1878
 
        else:
1879
 
            self.assertEqual([], log)