~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-10-05 21:15:13 UTC
  • mfrom: (5448.3.5 374700-Add-gnu-lsh-support)
  • Revision ID: pqm@pqm.ubuntu.com-20101005211513-whouyj5t7oo92gmq
(gz) Add support for GNU lsh as a secure shell client (Matthew Gordon)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
 
29
import unittest
29
30
 
30
31
from bzrlib import (
31
32
    errors,
32
33
    osutils,
33
 
    pyutils,
34
34
    tests,
35
 
    transport as _mod_transport,
36
35
    urlutils,
37
36
    )
38
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
 
41
                           LockError,
41
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
42
44
                           PathError,
43
45
                           TransportNotPossible,
44
46
                           )
45
47
from bzrlib.osutils import getcwd
46
48
from bzrlib.smart import medium
47
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
48
51
    TestSkipped,
49
52
    TestNotApplicable,
50
53
    multiply_tests,
53
56
from bzrlib.tests.test_transport import TestTransportImplementation
54
57
from bzrlib.transport import (
55
58
    ConnectedTransport,
56
 
    Transport,
 
59
    get_transport,
57
60
    _get_transport_modules,
58
61
    )
59
62
from bzrlib.transport.memory import MemoryTransport
60
 
from bzrlib.transport.remote import RemoteTransport
61
63
 
62
64
 
63
65
def get_transport_test_permutations(module):
76
78
    for module in _get_transport_modules():
77
79
        try:
78
80
            permutations = get_transport_test_permutations(
79
 
                pyutils.get_named_object(module))
 
81
                reduce(getattr, (module).split('.')[1:], __import__(module)))
80
82
            for (klass, server_factory) in permutations:
81
83
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
84
                    {"transport_class":klass,
100
102
 
101
103
    def setUp(self):
102
104
        super(TransportTests, self).setUp()
103
 
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
105
        self._captureVar('BZR_NO_SMART_VFS', None)
104
106
 
105
107
    def check_transport_contents(self, content, transport, relpath):
106
 
        """Check that transport.get_bytes(relpath) == content."""
107
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
        """Check that transport.get(relpath).read() == content."""
 
109
        self.assertEqualDiff(content, transport.get(relpath).read())
108
110
 
109
111
    def test_ensure_base_missing(self):
110
112
        """.ensure_base() should create the directory if it doesn't exist"""
209
211
                    ]
210
212
        self.build_tree(files, transport=t, line_endings='binary')
211
213
        self.assertRaises(NoSuchFile, t.get, 'c')
212
 
        def iterate_and_close(func, *args):
213
 
            for f in func(*args):
214
 
                # We call f.read() here because things like paramiko actually
215
 
                # spawn a thread to prefetch the content, which we want to
216
 
                # consume before we close the handle.
217
 
                content = f.read()
218
 
                f.close()
219
 
        self.assertRaises(NoSuchFile, iterate_and_close,
220
 
                          t.get_multi, ['a', 'b', 'c'])
221
 
        self.assertRaises(NoSuchFile, iterate_and_close,
222
 
                          t.get_multi, iter(['a', 'b', 'c']))
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
223
216
 
224
217
    def test_get_directory_read_gives_ReadError(self):
225
218
        """consistent errors for read() on a file returned by get()."""
267
260
        handle = t.open_write_stream('foo')
268
261
        try:
269
262
            handle.write('b')
270
 
            self.assertEqual('b', t.get_bytes('foo'))
 
263
            self.assertEqual('b', t.get('foo').read())
271
264
        finally:
272
265
            handle.close()
273
266
 
279
272
        try:
280
273
            handle.write('b')
281
274
            self.assertEqual('b', t.get_bytes('foo'))
282
 
            f = t.get('foo')
283
 
            try:
284
 
                self.assertEqual('b', f.read())
285
 
            finally:
286
 
                f.close()
 
275
            self.assertEqual('b', t.get('foo').read())
287
276
        finally:
288
277
            handle.close()
289
278
 
296
285
            return
297
286
 
298
287
        t.put_bytes('a', 'some text for a\n')
299
 
        self.assertTrue(t.has('a'))
 
288
        self.failUnless(t.has('a'))
300
289
        self.check_transport_contents('some text for a\n', t, 'a')
301
290
 
302
291
        # The contents should be overwritten
314
303
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
315
304
            return
316
305
 
317
 
        self.assertFalse(t.has('a'))
 
306
        self.failIf(t.has('a'))
318
307
        t.put_bytes_non_atomic('a', 'some text for a\n')
319
 
        self.assertTrue(t.has('a'))
 
308
        self.failUnless(t.has('a'))
320
309
        self.check_transport_contents('some text for a\n', t, 'a')
321
310
        # Put also replaces contents
322
311
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
334
323
        # Now test the create_parent flag
335
324
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
336
325
                                       'contents\n')
337
 
        self.assertFalse(t.has('dir/a'))
 
326
        self.failIf(t.has('dir/a'))
338
327
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
328
                               create_parent_dir=True)
340
329
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
412
401
        result = t.put_file('a', StringIO('some text for a\n'))
413
402
        # put_file returns the length of the data written
414
403
        self.assertEqual(16, result)
415
 
        self.assertTrue(t.has('a'))
 
404
        self.failUnless(t.has('a'))
416
405
        self.check_transport_contents('some text for a\n', t, 'a')
417
406
        # Put also replaces contents
418
407
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
430
419
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
431
420
            return
432
421
 
433
 
        self.assertFalse(t.has('a'))
 
422
        self.failIf(t.has('a'))
434
423
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
 
        self.assertTrue(t.has('a'))
 
424
        self.failUnless(t.has('a'))
436
425
        self.check_transport_contents('some text for a\n', t, 'a')
437
426
        # Put also replaces contents
438
427
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
450
439
        # Now test the create_parent flag
451
440
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
441
                                       StringIO('contents\n'))
453
 
        self.assertFalse(t.has('dir/a'))
 
442
        self.failIf(t.has('dir/a'))
454
443
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
444
                              create_parent_dir=True)
456
445
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
520
509
        self.assertTransportMode(t, 'dir777', 0777)
521
510
 
522
511
    def test_put_bytes_unicode(self):
 
512
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
513
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
514
        # (we don't want to encode unicode here at all, callers should be
 
515
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
516
        # compatibility.  At some point we should use a specific exception.
 
517
        # See https://bugs.launchpad.net/bzr/+bug/106898.
523
518
        t = self.get_transport()
524
519
        if t.is_readonly():
525
520
            return
526
521
        unicode_string = u'\u1234'
527
 
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
522
        self.assertRaises(
 
523
            (AssertionError, UnicodeEncodeError),
 
524
            t.put_bytes, 'foo', unicode_string)
 
525
 
 
526
    def test_put_file_unicode(self):
 
527
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
528
        # This situation can happen (and has) if code is careless about the type
 
529
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
530
        # cStringIO, because it never returns unicode from read.
 
531
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
532
        # raise, but we raise it for hysterical raisins.
 
533
        t = self.get_transport()
 
534
        if t.is_readonly():
 
535
            return
 
536
        unicode_file = pyStringIO(u'\u1234')
 
537
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
528
538
 
529
539
    def test_mkdir(self):
530
540
        t = self.get_transport()
609
619
    def test_opening_a_file_stream_can_set_mode(self):
610
620
        t = self.get_transport()
611
621
        if t.is_readonly():
612
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
613
 
                              t.open_write_stream, 'foo')
614
622
            return
615
623
        if not t._can_roundtrip_unix_modebits():
616
624
            # Can't roundtrip, so no need to run this test
617
625
            return
618
 
 
619
626
        def check_mode(name, mode, expected):
620
627
            handle = t.open_write_stream(name, mode=mode)
621
628
            handle.close()
637
644
            self.build_tree(files, transport=transport_from)
638
645
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
639
646
            for f in files:
640
 
                self.check_transport_contents(transport_to.get_bytes(f),
 
647
                self.check_transport_contents(transport_to.get(f).read(),
641
648
                                              transport_from, f)
642
649
 
643
650
        t = self.get_transport()
666
673
        files = ['a', 'b', 'c', 'd']
667
674
        t.copy_to(iter(files), temp_transport)
668
675
        for f in files:
669
 
            self.check_transport_contents(temp_transport.get_bytes(f),
 
676
            self.check_transport_contents(temp_transport.get(f).read(),
670
677
                                          t, f)
671
678
        del temp_transport
672
679
 
815
822
            return
816
823
 
817
824
        t.put_bytes('a', 'a little bit of text\n')
818
 
        self.assertTrue(t.has('a'))
 
825
        self.failUnless(t.has('a'))
819
826
        t.delete('a')
820
 
        self.assertFalse(t.has('a'))
 
827
        self.failIf(t.has('a'))
821
828
 
822
829
        self.assertRaises(NoSuchFile, t.delete, 'a')
823
830
 
829
836
        t.delete_multi(['a', 'c'])
830
837
        self.assertEqual([False, True, False],
831
838
                list(t.has_multi(['a', 'b', 'c'])))
832
 
        self.assertFalse(t.has('a'))
833
 
        self.assertTrue(t.has('b'))
834
 
        self.assertFalse(t.has('c'))
 
839
        self.failIf(t.has('a'))
 
840
        self.failUnless(t.has('b'))
 
841
        self.failIf(t.has('c'))
835
842
 
836
843
        self.assertRaises(NoSuchFile,
837
844
                t.delete_multi, ['a', 'b', 'c'])
898
905
        t.mkdir('foo-baz')
899
906
        t.rmdir('foo')
900
907
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
901
 
        self.assertTrue(t.has('foo-bar'))
 
908
        self.failUnless(t.has('foo-bar'))
902
909
 
903
910
    def test_rename_dir_succeeds(self):
904
911
        t = self.get_transport()
905
912
        if t.is_readonly():
906
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
907
 
                              t.rename, 'foo', 'bar')
908
 
            return
 
913
            raise TestSkipped("transport is readonly")
909
914
        t.mkdir('adir')
910
915
        t.mkdir('adir/asubdir')
911
916
        t.rename('adir', 'bdir')
916
921
        """Attempting to replace a nonempty directory should fail"""
917
922
        t = self.get_transport()
918
923
        if t.is_readonly():
919
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
920
 
                              t.rename, 'foo', 'bar')
921
 
            return
 
924
            raise TestSkipped("transport is readonly")
922
925
        t.mkdir('adir')
923
926
        t.mkdir('adir/asubdir')
924
927
        t.mkdir('bdir')
988
991
        # perhaps all of this could be done in a subdirectory
989
992
 
990
993
        t.put_bytes('a', 'a first file\n')
991
 
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
 
994
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
992
995
 
993
996
        t.move('a', 'b')
994
 
        self.assertTrue(t.has('b'))
995
 
        self.assertFalse(t.has('a'))
 
997
        self.failUnless(t.has('b'))
 
998
        self.failIf(t.has('a'))
996
999
 
997
1000
        self.check_transport_contents('a first file\n', t, 'b')
998
 
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
 
1001
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
999
1002
 
1000
1003
        # Overwrite a file
1001
1004
        t.put_bytes('c', 'c this file\n')
1002
1005
        t.move('c', 'b')
1003
 
        self.assertFalse(t.has('c'))
 
1006
        self.failIf(t.has('c'))
1004
1007
        self.check_transport_contents('c this file\n', t, 'b')
1005
1008
 
1006
1009
        # TODO: Try to write a test for atomicity
1038
1041
        except NotImplementedError:
1039
1042
            raise TestSkipped("Transport %s has no bogus URL support." %
1040
1043
                              self._server.__class__)
1041
 
        t = _mod_transport.get_transport_from_url(url)
 
1044
        t = get_transport(url)
1042
1045
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1043
1046
 
1044
1047
    def test_stat(self):
1060
1063
        for path, size in zip(paths, sizes):
1061
1064
            st = t.stat(path)
1062
1065
            if path.endswith('/'):
1063
 
                self.assertTrue(S_ISDIR(st.st_mode))
 
1066
                self.failUnless(S_ISDIR(st.st_mode))
1064
1067
                # directory sizes are meaningless
1065
1068
            else:
1066
 
                self.assertTrue(S_ISREG(st.st_mode))
 
1069
                self.failUnless(S_ISREG(st.st_mode))
1067
1070
                self.assertEqual(size, st.st_size)
1068
1071
 
1069
1072
        remote_stats = list(t.stat_multi(paths))
1076
1079
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1077
1080
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1078
1081
        subdir = t.clone('subdir')
1079
 
        st = subdir.stat('./file')
1080
 
        st = subdir.stat('.')
 
1082
        subdir.stat('./file')
 
1083
        subdir.stat('.')
1081
1084
 
1082
1085
    def test_hardlink(self):
1083
1086
        from stat import ST_NLINK
1092
1095
        try:
1093
1096
            t.hardlink(source_name, link_name)
1094
1097
 
1095
 
            self.assertTrue(t.has(source_name))
1096
 
            self.assertTrue(t.has(link_name))
 
1098
            self.failUnless(t.has(source_name))
 
1099
            self.failUnless(t.has(link_name))
1097
1100
 
1098
1101
            st = t.stat(link_name)
1099
 
            self.assertEqual(st[ST_NLINK], 2)
 
1102
            self.failUnlessEqual(st[ST_NLINK], 2)
1100
1103
        except TransportNotPossible:
1101
1104
            raise TestSkipped("Transport %s does not support hardlinks." %
1102
1105
                              self._server.__class__)
1114
1117
        try:
1115
1118
            t.symlink(source_name, link_name)
1116
1119
 
1117
 
            self.assertTrue(t.has(source_name))
1118
 
            self.assertTrue(t.has(link_name))
 
1120
            self.failUnless(t.has(source_name))
 
1121
            self.failUnless(t.has(link_name))
1119
1122
 
1120
1123
            st = t.stat(link_name)
1121
 
            self.assertTrue(S_ISLNK(st.st_mode),
 
1124
            self.failUnless(S_ISLNK(st.st_mode),
1122
1125
                "expected symlink, got mode %o" % st.st_mode)
1123
1126
        except TransportNotPossible:
1124
1127
            raise TestSkipped("Transport %s does not support symlinks." %
1125
1128
                              self._server.__class__)
1126
1129
        except IOError:
1127
 
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1130
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1128
1131
 
1129
1132
    def test_list_dir(self):
1130
1133
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1194
1197
            raise TestSkipped("not a connected transport")
1195
1198
 
1196
1199
        t2 = t1.clone('subdir')
1197
 
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1198
 
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1199
 
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1200
 
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1201
 
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1200
        self.assertEquals(t1._scheme, t2._scheme)
 
1201
        self.assertEquals(t1._user, t2._user)
 
1202
        self.assertEquals(t1._password, t2._password)
 
1203
        self.assertEquals(t1._host, t2._host)
 
1204
        self.assertEquals(t1._port, t2._port)
1202
1205
 
1203
1206
    def test__reuse_for(self):
1204
1207
        t = self.get_transport()
1211
1214
 
1212
1215
            Only the parameters different from None will be changed.
1213
1216
            """
1214
 
            if scheme   is None: scheme   = t._parsed_url.scheme
1215
 
            if user     is None: user     = t._parsed_url.user
1216
 
            if password is None: password = t._parsed_url.password
1217
 
            if user     is None: user     = t._parsed_url.user
1218
 
            if host     is None: host     = t._parsed_url.host
1219
 
            if port     is None: port     = t._parsed_url.port
1220
 
            if path     is None: path     = t._parsed_url.path
1221
 
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1217
            if scheme   is None: scheme   = t._scheme
 
1218
            if user     is None: user     = t._user
 
1219
            if password is None: password = t._password
 
1220
            if user     is None: user     = t._user
 
1221
            if host     is None: host     = t._host
 
1222
            if port     is None: port     = t._port
 
1223
            if path     is None: path     = t._path
 
1224
            return t._unsplit_url(scheme, user, password, host, port, path)
1222
1225
 
1223
 
        if t._parsed_url.scheme == 'ftp':
 
1226
        if t._scheme == 'ftp':
1224
1227
            scheme = 'sftp'
1225
1228
        else:
1226
1229
            scheme = 'ftp'
1227
1230
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1228
 
        if t._parsed_url.user == 'me':
 
1231
        if t._user == 'me':
1229
1232
            user = 'you'
1230
1233
        else:
1231
1234
            user = 'me'
1242
1245
        #   (they may be typed by the user when prompted for example)
1243
1246
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1244
1247
        # We will not connect, so we can use an invalid host
1245
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1246
 
        if t._parsed_url.port == 1234:
 
1248
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1249
        if t._port == 1234:
1247
1250
            port = 4321
1248
1251
        else:
1249
1252
            port = 1234
1290
1293
 
1291
1294
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1292
1295
 
1293
 
        self.assertTrue(t1.has('a'))
1294
 
        self.assertTrue(t1.has('b/c'))
1295
 
        self.assertFalse(t1.has('c'))
 
1296
        self.failUnless(t1.has('a'))
 
1297
        self.failUnless(t1.has('b/c'))
 
1298
        self.failIf(t1.has('c'))
1296
1299
 
1297
1300
        t2 = t1.clone('b')
1298
1301
        self.assertEqual(t1.base + 'b/', t2.base)
1299
1302
 
1300
 
        self.assertTrue(t2.has('c'))
1301
 
        self.assertFalse(t2.has('a'))
 
1303
        self.failUnless(t2.has('c'))
 
1304
        self.failIf(t2.has('a'))
1302
1305
 
1303
1306
        t3 = t2.clone('..')
1304
 
        self.assertTrue(t3.has('a'))
1305
 
        self.assertFalse(t3.has('c'))
 
1307
        self.failUnless(t3.has('a'))
 
1308
        self.failIf(t3.has('c'))
1306
1309
 
1307
 
        self.assertFalse(t1.has('b/d'))
1308
 
        self.assertFalse(t2.has('d'))
1309
 
        self.assertFalse(t3.has('b/d'))
 
1310
        self.failIf(t1.has('b/d'))
 
1311
        self.failIf(t2.has('d'))
 
1312
        self.failIf(t3.has('b/d'))
1310
1313
 
1311
1314
        if t1.is_readonly():
1312
1315
            self.build_tree_contents([('b/d', 'newfile\n')])
1313
1316
        else:
1314
1317
            t2.put_bytes('d', 'newfile\n')
1315
1318
 
1316
 
        self.assertTrue(t1.has('b/d'))
1317
 
        self.assertTrue(t2.has('d'))
1318
 
        self.assertTrue(t3.has('b/d'))
 
1319
        self.failUnless(t1.has('b/d'))
 
1320
        self.failUnless(t2.has('d'))
 
1321
        self.failUnless(t3.has('b/d'))
1319
1322
 
1320
1323
    def test_clone_to_root(self):
1321
1324
        orig_transport = self.get_transport()
1395
1398
        self.assertEqual(transport.clone("/").abspath('foo'),
1396
1399
                         transport.abspath("/foo"))
1397
1400
 
1398
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1399
1401
    def test_win32_abspath(self):
1400
1402
        # Note: we tried to set sys.platform='win32' so we could test on
1401
1403
        # other platforms too, but then osutils does platform specific
1406
1408
 
1407
1409
        # smoke test for abspath on win32.
1408
1410
        # a transport based on 'file:///' never fully qualifies the drive.
1409
 
        transport = _mod_transport.get_transport_from_url("file:///")
1410
 
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
        transport = get_transport("file:///")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1411
1413
 
1412
1414
        # but a transport that starts with a drive spec must keep it.
1413
 
        transport = _mod_transport.get_transport_from_url("file:///C:/")
1414
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
        transport = get_transport("file:///C:/")
 
1416
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1415
1417
 
1416
1418
    def test_local_abspath(self):
1417
1419
        transport = self.get_transport()
1543
1545
 
1544
1546
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1545
1547
        if no_unicode_support:
1546
 
            self.knownFailure("test server cannot handle unicode paths")
 
1548
            raise tests.KnownFailure("test server cannot handle unicode paths")
1547
1549
 
1548
1550
        try:
1549
1551
            self.build_tree(files, transport=t, line_endings='binary')
1614
1616
    def test_readv(self):
1615
1617
        transport = self.get_transport()
1616
1618
        if transport.is_readonly():
1617
 
            with file('a', 'w') as f: f.write('0123456789')
 
1619
            file('a', 'w').write('0123456789')
1618
1620
        else:
1619
1621
            transport.put_bytes('a', '0123456789')
1620
1622
 
1630
1632
    def test_readv_out_of_order(self):
1631
1633
        transport = self.get_transport()
1632
1634
        if transport.is_readonly():
1633
 
            with file('a', 'w') as f: f.write('0123456789')
 
1635
            file('a', 'w').write('0123456789')
1634
1636
        else:
1635
1637
            transport.put_bytes('a', '01234567890')
1636
1638
 
1708
1710
        transport = self.get_transport()
1709
1711
        # test from observed failure case.
1710
1712
        if transport.is_readonly():
1711
 
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1713
            file('a', 'w').write('a'*1024*1024)
1712
1714
        else:
1713
1715
            transport.put_bytes('a', 'a'*1024*1024)
1714
1716
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1748
1750
    def test_readv_short_read(self):
1749
1751
        transport = self.get_transport()
1750
1752
        if transport.is_readonly():
1751
 
            with file('a', 'w') as f: f.write('0123456789')
 
1753
            file('a', 'w').write('0123456789')
1752
1754
        else:
1753
1755
            transport.put_bytes('a', '01234567890')
1754
1756
 
1764
1766
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1765
1767
                              transport.readv, 'a', [(12,2)])
1766
1768
 
1767
 
    def test_no_segment_parameters(self):
1768
 
        """Segment parameters should be stripped and stored in
1769
 
        transport.segment_parameters."""
1770
 
        transport = self.get_transport("foo")
1771
 
        self.assertEqual({}, transport.get_segment_parameters())
1772
 
 
1773
 
    def test_segment_parameters(self):
1774
 
        """Segment parameters should be stripped and stored in
1775
 
        transport.get_segment_parameters()."""
1776
 
        base_url = self._server.get_url()
1777
 
        parameters = {"key1": "val1", "key2": "val2"}
1778
 
        url = urlutils.join_segment_parameters(base_url, parameters)
1779
 
        transport = _mod_transport.get_transport_from_url(url)
1780
 
        self.assertEqual(parameters, transport.get_segment_parameters())
1781
 
 
1782
 
    def test_set_segment_parameters(self):
1783
 
        """Segment parameters can be set and show up in base."""
1784
 
        transport = self.get_transport("foo")
1785
 
        orig_base = transport.base
1786
 
        transport.set_segment_parameter("arm", "board")
1787
 
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
1788
 
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1789
 
        transport.set_segment_parameter("arm", None)
1790
 
        transport.set_segment_parameter("nonexistant", None)
1791
 
        self.assertEqual({}, transport.get_segment_parameters())
1792
 
        self.assertEqual(orig_base, transport.base)
1793
 
 
1794
1769
    def test_stat_symlink(self):
1795
1770
        # if a transport points directly to a symlink (and supports symlinks
1796
1771
        # at all) you can tell this.  helps with bug 32669.
1802
1777
        t2 = t.clone('link')
1803
1778
        st = t2.stat('')
1804
1779
        self.assertTrue(stat.S_ISLNK(st.st_mode))
1805
 
 
1806
 
    def test_abspath_url_unquote_unreserved(self):
1807
 
        """URLs from abspath should have unreserved characters unquoted
1808
 
        
1809
 
        Need consistent quoting notably for tildes, see lp:842223 for more.
1810
 
        """
1811
 
        t = self.get_transport()
1812
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1813
 
        self.assertEqual(t.base + "-.09AZ_az~",
1814
 
            t.abspath(needlessly_escaped_dir))
1815
 
 
1816
 
    def test_clone_url_unquote_unreserved(self):
1817
 
        """Base URL of a cloned branch needs unreserved characters unquoted
1818
 
        
1819
 
        Cloned transports should be prefix comparable for things like the
1820
 
        isolation checking of tests, see lp:842223 for more.
1821
 
        """
1822
 
        t1 = self.get_transport()
1823
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1824
 
        self.build_tree([needlessly_escaped_dir], transport=t1)
1825
 
        t2 = t1.clone(needlessly_escaped_dir)
1826
 
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1827
 
 
1828
 
    def test_hook_post_connection_one(self):
1829
 
        """Fire post_connect hook after a ConnectedTransport is first used"""
1830
 
        log = []
1831
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1832
 
        t = self.get_transport()
1833
 
        self.assertEqual([], log)
1834
 
        t.has("non-existant")
1835
 
        if isinstance(t, RemoteTransport):
1836
 
            self.assertEqual([t.get_smart_medium()], log)
1837
 
        elif isinstance(t, ConnectedTransport):
1838
 
            self.assertEqual([t], log)
1839
 
        else:
1840
 
            self.assertEqual([], log)
1841
 
 
1842
 
    def test_hook_post_connection_multi(self):
1843
 
        """Fire post_connect hook once per unshared underlying connection"""
1844
 
        log = []
1845
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
 
        t1 = self.get_transport()
1847
 
        t2 = t1.clone(".")
1848
 
        t3 = self.get_transport()
1849
 
        self.assertEqual([], log)
1850
 
        t1.has("x")
1851
 
        t2.has("x")
1852
 
        t3.has("x")
1853
 
        if isinstance(t1, RemoteTransport):
1854
 
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1855
 
        elif isinstance(t1, ConnectedTransport):
1856
 
            self.assertEqual([t1, t3], log)
1857
 
        else:
1858
 
            self.assertEqual([], log)