~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Patch Queue Manager
  • Date: 2016-01-31 13:36:59 UTC
  • mfrom: (6613.1.5 1538480-match-hostname)
  • Revision ID: pqm@pqm.ubuntu.com-20160131133659-ouy92ee2wlv9xz8m
(vila) Use ssl.match_hostname instead of our own. (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
29
 
import unittest
30
29
 
31
30
from bzrlib import (
32
31
    errors,
33
32
    osutils,
 
33
    pyutils,
34
34
    tests,
 
35
    transport as _mod_transport,
35
36
    urlutils,
36
37
    )
37
38
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
41
 
                           LockError,
42
41
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
42
                           PathError,
45
43
                           TransportNotPossible,
46
44
                           )
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
51
48
    TestSkipped,
52
49
    TestNotApplicable,
53
50
    multiply_tests,
56
53
from bzrlib.tests.test_transport import TestTransportImplementation
57
54
from bzrlib.transport import (
58
55
    ConnectedTransport,
59
 
    get_transport,
 
56
    Transport,
60
57
    _get_transport_modules,
61
58
    )
62
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
63
61
 
64
62
 
65
63
def get_transport_test_permutations(module):
78
76
    for module in _get_transport_modules():
79
77
        try:
80
78
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
79
                pyutils.get_named_object(module))
82
80
            for (klass, server_factory) in permutations:
83
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
                    {"transport_class":klass,
102
100
 
103
101
    def setUp(self):
104
102
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
106
104
 
107
105
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
108
 
111
109
    def test_ensure_base_missing(self):
112
110
        """.ensure_base() should create the directory if it doesn't exist"""
211
209
                    ]
212
210
        self.build_tree(files, transport=t, line_endings='binary')
213
211
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
216
223
 
217
224
    def test_get_directory_read_gives_ReadError(self):
218
225
        """consistent errors for read() on a file returned by get()."""
260
267
        handle = t.open_write_stream('foo')
261
268
        try:
262
269
            handle.write('b')
263
 
            self.assertEqual('b', t.get('foo').read())
 
270
            self.assertEqual('b', t.get_bytes('foo'))
264
271
        finally:
265
272
            handle.close()
266
273
 
272
279
        try:
273
280
            handle.write('b')
274
281
            self.assertEqual('b', t.get_bytes('foo'))
275
 
            self.assertEqual('b', t.get('foo').read())
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
276
287
        finally:
277
288
            handle.close()
278
289
 
285
296
            return
286
297
 
287
298
        t.put_bytes('a', 'some text for a\n')
288
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
289
300
        self.check_transport_contents('some text for a\n', t, 'a')
290
301
 
291
302
        # The contents should be overwritten
303
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
304
315
            return
305
316
 
306
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
307
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
308
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
309
320
        self.check_transport_contents('some text for a\n', t, 'a')
310
321
        # Put also replaces contents
311
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
334
        # Now test the create_parent flag
324
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
325
336
                                       'contents\n')
326
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
327
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
339
                               create_parent_dir=True)
329
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
401
412
        result = t.put_file('a', StringIO('some text for a\n'))
402
413
        # put_file returns the length of the data written
403
414
        self.assertEqual(16, result)
404
 
        self.failUnless(t.has('a'))
 
415
        self.assertTrue(t.has('a'))
405
416
        self.check_transport_contents('some text for a\n', t, 'a')
406
417
        # Put also replaces contents
407
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
420
431
            return
421
432
 
422
 
        self.failIf(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
423
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
 
        self.failUnless(t.has('a'))
 
435
        self.assertTrue(t.has('a'))
425
436
        self.check_transport_contents('some text for a\n', t, 'a')
426
437
        # Put also replaces contents
427
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
450
        # Now test the create_parent flag
440
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
452
                                       StringIO('contents\n'))
442
 
        self.failIf(t.has('dir/a'))
 
453
        self.assertFalse(t.has('dir/a'))
443
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
455
                              create_parent_dir=True)
445
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
509
520
        self.assertTransportMode(t, 'dir777', 0777)
510
521
 
511
522
    def test_put_bytes_unicode(self):
512
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
513
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
514
 
        # (we don't want to encode unicode here at all, callers should be
515
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
516
 
        # compatibility.  At some point we should use a specific exception.
517
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
518
523
        t = self.get_transport()
519
524
        if t.is_readonly():
520
525
            return
521
526
        unicode_string = u'\u1234'
522
 
        self.assertRaises(
523
 
            (AssertionError, UnicodeEncodeError),
524
 
            t.put_bytes, 'foo', unicode_string)
525
 
 
526
 
    def test_put_file_unicode(self):
527
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
528
 
        # This situation can happen (and has) if code is careless about the type
529
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
530
 
        # cStringIO, because it never returns unicode from read.
531
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
532
 
        # raise, but we raise it for hysterical raisins.
533
 
        t = self.get_transport()
534
 
        if t.is_readonly():
535
 
            return
536
 
        unicode_file = pyStringIO(u'\u1234')
537
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
527
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
538
528
 
539
529
    def test_mkdir(self):
540
530
        t = self.get_transport()
619
609
    def test_opening_a_file_stream_can_set_mode(self):
620
610
        t = self.get_transport()
621
611
        if t.is_readonly():
 
612
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
613
                              t.open_write_stream, 'foo')
622
614
            return
623
615
        if not t._can_roundtrip_unix_modebits():
624
616
            # Can't roundtrip, so no need to run this test
625
617
            return
 
618
 
626
619
        def check_mode(name, mode, expected):
627
620
            handle = t.open_write_stream(name, mode=mode)
628
621
            handle.close()
644
637
            self.build_tree(files, transport=transport_from)
645
638
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
646
639
            for f in files:
647
 
                self.check_transport_contents(transport_to.get(f).read(),
 
640
                self.check_transport_contents(transport_to.get_bytes(f),
648
641
                                              transport_from, f)
649
642
 
650
643
        t = self.get_transport()
673
666
        files = ['a', 'b', 'c', 'd']
674
667
        t.copy_to(iter(files), temp_transport)
675
668
        for f in files:
676
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
669
            self.check_transport_contents(temp_transport.get_bytes(f),
677
670
                                          t, f)
678
671
        del temp_transport
679
672
 
822
815
            return
823
816
 
824
817
        t.put_bytes('a', 'a little bit of text\n')
825
 
        self.failUnless(t.has('a'))
 
818
        self.assertTrue(t.has('a'))
826
819
        t.delete('a')
827
 
        self.failIf(t.has('a'))
 
820
        self.assertFalse(t.has('a'))
828
821
 
829
822
        self.assertRaises(NoSuchFile, t.delete, 'a')
830
823
 
836
829
        t.delete_multi(['a', 'c'])
837
830
        self.assertEqual([False, True, False],
838
831
                list(t.has_multi(['a', 'b', 'c'])))
839
 
        self.failIf(t.has('a'))
840
 
        self.failUnless(t.has('b'))
841
 
        self.failIf(t.has('c'))
 
832
        self.assertFalse(t.has('a'))
 
833
        self.assertTrue(t.has('b'))
 
834
        self.assertFalse(t.has('c'))
842
835
 
843
836
        self.assertRaises(NoSuchFile,
844
837
                t.delete_multi, ['a', 'b', 'c'])
905
898
        t.mkdir('foo-baz')
906
899
        t.rmdir('foo')
907
900
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
 
        self.failUnless(t.has('foo-bar'))
 
901
        self.assertTrue(t.has('foo-bar'))
909
902
 
910
903
    def test_rename_dir_succeeds(self):
911
904
        t = self.get_transport()
912
905
        if t.is_readonly():
913
 
            raise TestSkipped("transport is readonly")
 
906
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
907
                              t.rename, 'foo', 'bar')
 
908
            return
914
909
        t.mkdir('adir')
915
910
        t.mkdir('adir/asubdir')
916
911
        t.rename('adir', 'bdir')
921
916
        """Attempting to replace a nonemtpy directory should fail"""
922
917
        t = self.get_transport()
923
918
        if t.is_readonly():
924
 
            raise TestSkipped("transport is readonly")
 
919
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
920
                              t.rename, 'foo', 'bar')
 
921
            return
925
922
        t.mkdir('adir')
926
923
        t.mkdir('adir/asubdir')
927
924
        t.mkdir('bdir')
994
991
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
995
992
 
996
993
        t.move('a', 'b')
997
 
        self.failUnless(t.has('b'))
998
 
        self.failIf(t.has('a'))
 
994
        self.assertTrue(t.has('b'))
 
995
        self.assertFalse(t.has('a'))
999
996
 
1000
997
        self.check_transport_contents('a first file\n', t, 'b')
1001
998
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1003
1000
        # Overwrite a file
1004
1001
        t.put_bytes('c', 'c this file\n')
1005
1002
        t.move('c', 'b')
1006
 
        self.failIf(t.has('c'))
 
1003
        self.assertFalse(t.has('c'))
1007
1004
        self.check_transport_contents('c this file\n', t, 'b')
1008
1005
 
1009
1006
        # TODO: Try to write a test for atomicity
1041
1038
        except NotImplementedError:
1042
1039
            raise TestSkipped("Transport %s has no bogus URL support." %
1043
1040
                              self._server.__class__)
1044
 
        t = get_transport(url)
 
1041
        t = _mod_transport.get_transport_from_url(url)
1045
1042
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1046
1043
 
1047
1044
    def test_stat(self):
1063
1060
        for path, size in zip(paths, sizes):
1064
1061
            st = t.stat(path)
1065
1062
            if path.endswith('/'):
1066
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1063
                self.assertTrue(S_ISDIR(st.st_mode))
1067
1064
                # directory sizes are meaningless
1068
1065
            else:
1069
 
                self.failUnless(S_ISREG(st.st_mode))
 
1066
                self.assertTrue(S_ISREG(st.st_mode))
1070
1067
                self.assertEqual(size, st.st_size)
1071
1068
 
1072
1069
        remote_stats = list(t.stat_multi(paths))
1079
1076
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1080
1077
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1081
1078
        subdir = t.clone('subdir')
1082
 
        subdir.stat('./file')
1083
 
        subdir.stat('.')
 
1079
        st = subdir.stat('./file')
 
1080
        st = subdir.stat('.')
1084
1081
 
1085
1082
    def test_hardlink(self):
1086
1083
        from stat import ST_NLINK
1095
1092
        try:
1096
1093
            t.hardlink(source_name, link_name)
1097
1094
 
1098
 
            self.failUnless(t.has(source_name))
1099
 
            self.failUnless(t.has(link_name))
 
1095
            self.assertTrue(t.has(source_name))
 
1096
            self.assertTrue(t.has(link_name))
1100
1097
 
1101
1098
            st = t.stat(link_name)
1102
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1099
            self.assertEqual(st[ST_NLINK], 2)
1103
1100
        except TransportNotPossible:
1104
1101
            raise TestSkipped("Transport %s does not support hardlinks." %
1105
1102
                              self._server.__class__)
1117
1114
        try:
1118
1115
            t.symlink(source_name, link_name)
1119
1116
 
1120
 
            self.failUnless(t.has(source_name))
1121
 
            self.failUnless(t.has(link_name))
 
1117
            self.assertTrue(t.has(source_name))
 
1118
            self.assertTrue(t.has(link_name))
1122
1119
 
1123
1120
            st = t.stat(link_name)
1124
 
            self.failUnless(S_ISLNK(st.st_mode),
 
1121
            self.assertTrue(S_ISLNK(st.st_mode),
1125
1122
                "expected symlink, got mode %o" % st.st_mode)
1126
1123
        except TransportNotPossible:
1127
1124
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1125
                              self._server.__class__)
1129
1126
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1127
            self.knownFailure("Paramiko fails to create symlinks during tests")
1131
1128
 
1132
1129
    def test_list_dir(self):
1133
1130
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1197
1194
            raise TestSkipped("not a connected transport")
1198
1195
 
1199
1196
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1197
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1198
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1199
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1200
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1201
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1205
1202
 
1206
1203
    def test__reuse_for(self):
1207
1204
        t = self.get_transport()
1214
1211
 
1215
1212
            Only the parameters different from None will be changed.
1216
1213
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1214
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1215
            if user     is None: user     = t._parsed_url.user
 
1216
            if password is None: password = t._parsed_url.password
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if host     is None: host     = t._parsed_url.host
 
1219
            if port     is None: port     = t._parsed_url.port
 
1220
            if path     is None: path     = t._parsed_url.path
 
1221
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1222
 
1226
 
        if t._scheme == 'ftp':
 
1223
        if t._parsed_url.scheme == 'ftp':
1227
1224
            scheme = 'sftp'
1228
1225
        else:
1229
1226
            scheme = 'ftp'
1230
1227
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1228
        if t._parsed_url.user == 'me':
1232
1229
            user = 'you'
1233
1230
        else:
1234
1231
            user = 'me'
1245
1242
        #   (they may be typed by the user when prompted for example)
1246
1243
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1244
        # We will not connect, we can use a invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1245
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1246
        if t._parsed_url.port == 1234:
1250
1247
            port = 4321
1251
1248
        else:
1252
1249
            port = 1234
1293
1290
 
1294
1291
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1292
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1293
        self.assertTrue(t1.has('a'))
 
1294
        self.assertTrue(t1.has('b/c'))
 
1295
        self.assertFalse(t1.has('c'))
1299
1296
 
1300
1297
        t2 = t1.clone('b')
1301
1298
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1299
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1300
        self.assertTrue(t2.has('c'))
 
1301
        self.assertFalse(t2.has('a'))
1305
1302
 
1306
1303
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1304
        self.assertTrue(t3.has('a'))
 
1305
        self.assertFalse(t3.has('c'))
1309
1306
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1307
        self.assertFalse(t1.has('b/d'))
 
1308
        self.assertFalse(t2.has('d'))
 
1309
        self.assertFalse(t3.has('b/d'))
1313
1310
 
1314
1311
        if t1.is_readonly():
1315
1312
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1313
        else:
1317
1314
            t2.put_bytes('d', 'newfile\n')
1318
1315
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1316
        self.assertTrue(t1.has('b/d'))
 
1317
        self.assertTrue(t2.has('d'))
 
1318
        self.assertTrue(t3.has('b/d'))
1322
1319
 
1323
1320
    def test_clone_to_root(self):
1324
1321
        orig_transport = self.get_transport()
1398
1395
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1396
                         transport.abspath("/foo"))
1400
1397
 
 
1398
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1399
    def test_win32_abspath(self):
1402
1400
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1401
        # other platforms too, but then osutils does platform specific
1408
1406
 
1409
1407
        # smoke test for abspath on win32.
1410
1408
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1409
        transport = _mod_transport.get_transport_from_url("file:///")
 
1410
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1411
 
1414
1412
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1413
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1414
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1415
 
1418
1416
    def test_local_abspath(self):
1419
1417
        transport = self.get_transport()
1545
1543
 
1546
1544
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1545
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1546
            self.knownFailure("test server cannot handle unicode paths")
1549
1547
 
1550
1548
        try:
1551
1549
            self.build_tree(files, transport=t, line_endings='binary')
1616
1614
    def test_readv(self):
1617
1615
        transport = self.get_transport()
1618
1616
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1617
            with file('a', 'w') as f: f.write('0123456789')
1620
1618
        else:
1621
1619
            transport.put_bytes('a', '0123456789')
1622
1620
 
1632
1630
    def test_readv_out_of_order(self):
1633
1631
        transport = self.get_transport()
1634
1632
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1633
            with file('a', 'w') as f: f.write('0123456789')
1636
1634
        else:
1637
1635
            transport.put_bytes('a', '01234567890')
1638
1636
 
1710
1708
        transport = self.get_transport()
1711
1709
        # test from observed failure case.
1712
1710
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1711
            with file('a', 'w') as f: f.write('a'*1024*1024)
1714
1712
        else:
1715
1713
            transport.put_bytes('a', 'a'*1024*1024)
1716
1714
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1748
    def test_readv_short_read(self):
1751
1749
        transport = self.get_transport()
1752
1750
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1751
            with file('a', 'w') as f: f.write('0123456789')
1754
1752
        else:
1755
1753
            transport.put_bytes('a', '01234567890')
1756
1754
 
1766
1764
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1765
                              transport.readv, 'a', [(12,2)])
1768
1766
 
 
1767
    def test_no_segment_parameters(self):
 
1768
        """Segment parameters should be stripped and stored in
 
1769
        transport.segment_parameters."""
 
1770
        transport = self.get_transport("foo")
 
1771
        self.assertEquals({}, transport.get_segment_parameters())
 
1772
 
 
1773
    def test_segment_parameters(self):
 
1774
        """Segment parameters should be stripped and stored in
 
1775
        transport.get_segment_parameters()."""
 
1776
        base_url = self._server.get_url()
 
1777
        parameters = {"key1": "val1", "key2": "val2"}
 
1778
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1779
        transport = _mod_transport.get_transport_from_url(url)
 
1780
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1781
 
 
1782
    def test_set_segment_parameters(self):
 
1783
        """Segment parameters can be set and show up in base."""
 
1784
        transport = self.get_transport("foo")
 
1785
        orig_base = transport.base
 
1786
        transport.set_segment_parameter("arm", "board")
 
1787
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
 
1788
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
 
1789
        transport.set_segment_parameter("arm", None)
 
1790
        transport.set_segment_parameter("nonexistant", None)
 
1791
        self.assertEquals({}, transport.get_segment_parameters())
 
1792
        self.assertEquals(orig_base, transport.base)
 
1793
 
1769
1794
    def test_stat_symlink(self):
1770
1795
        # if a transport points directly to a symlink (and supports symlinks
1771
1796
        # at all) you can tell this.  helps with bug 32669.
1777
1802
        t2 = t.clone('link')
1778
1803
        st = t2.stat('')
1779
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1805
 
 
1806
    def test_abspath_url_unquote_unreserved(self):
 
1807
        """URLs from abspath should have unreserved characters unquoted
 
1808
        
 
1809
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1810
        """
 
1811
        t = self.get_transport()
 
1812
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1813
        self.assertEqual(t.base + "-.09AZ_az~",
 
1814
            t.abspath(needlessly_escaped_dir))
 
1815
 
 
1816
    def test_clone_url_unquote_unreserved(self):
 
1817
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1818
        
 
1819
        Cloned transports should be prefix comparable for things like the
 
1820
        isolation checking of tests, see lp:842223 for more.
 
1821
        """
 
1822
        t1 = self.get_transport()
 
1823
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1824
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1825
        t2 = t1.clone(needlessly_escaped_dir)
 
1826
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1827
 
 
1828
    def test_hook_post_connection_one(self):
 
1829
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1830
        log = []
 
1831
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1832
        t = self.get_transport()
 
1833
        self.assertEqual([], log)
 
1834
        t.has("non-existant")
 
1835
        if isinstance(t, RemoteTransport):
 
1836
            self.assertEqual([t.get_smart_medium()], log)
 
1837
        elif isinstance(t, ConnectedTransport):
 
1838
            self.assertEqual([t], log)
 
1839
        else:
 
1840
            self.assertEqual([], log)
 
1841
 
 
1842
    def test_hook_post_connection_multi(self):
 
1843
        """Fire post_connect hook once per unshared underlying connection"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t1 = self.get_transport()
 
1847
        t2 = t1.clone(".")
 
1848
        t3 = self.get_transport()
 
1849
        self.assertEqual([], log)
 
1850
        t1.has("x")
 
1851
        t2.has("x")
 
1852
        t3.has("x")
 
1853
        if isinstance(t1, RemoteTransport):
 
1854
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1855
        elif isinstance(t1, ConnectedTransport):
 
1856
            self.assertEqual([t1, t3], log)
 
1857
        else:
 
1858
            self.assertEqual([], log)