~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-08-24 23:20:14 UTC
  • mfrom: (5365.5.29 2.3-btree-chk-leaf)
  • Revision ID: pqm@pqm.ubuntu.com-20100824232014-nu9owzel2zym2jk2
(jam) Use a custom C type for CHK index entries, saves memory

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
 
29
import unittest
29
30
 
30
31
from bzrlib import (
31
32
    errors,
32
33
    osutils,
33
 
    pyutils,
34
34
    tests,
35
 
    transport as _mod_transport,
36
35
    urlutils,
37
36
    )
38
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
 
41
                           LockError,
41
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
42
44
                           PathError,
43
45
                           TransportNotPossible,
44
46
                           )
45
47
from bzrlib.osutils import getcwd
46
48
from bzrlib.smart import medium
47
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
48
51
    TestSkipped,
49
52
    TestNotApplicable,
50
53
    multiply_tests,
53
56
from bzrlib.tests.test_transport import TestTransportImplementation
54
57
from bzrlib.transport import (
55
58
    ConnectedTransport,
56
 
    Transport,
 
59
    get_transport,
57
60
    _get_transport_modules,
58
61
    )
59
62
from bzrlib.transport.memory import MemoryTransport
60
 
from bzrlib.transport.remote import RemoteTransport
61
63
 
62
64
 
63
65
def get_transport_test_permutations(module):
76
78
    for module in _get_transport_modules():
77
79
        try:
78
80
            permutations = get_transport_test_permutations(
79
 
                pyutils.get_named_object(module))
 
81
                reduce(getattr, (module).split('.')[1:], __import__(module)))
80
82
            for (klass, server_factory) in permutations:
81
83
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
84
                    {"transport_class":klass,
100
102
 
101
103
    def setUp(self):
102
104
        super(TransportTests, self).setUp()
103
 
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
105
        self._captureVar('BZR_NO_SMART_VFS', None)
104
106
 
105
107
    def check_transport_contents(self, content, transport, relpath):
106
 
        """Check that transport.get_bytes(relpath) == content."""
107
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
        """Check that transport.get(relpath).read() == content."""
 
109
        self.assertEqualDiff(content, transport.get(relpath).read())
108
110
 
109
111
    def test_ensure_base_missing(self):
110
112
        """.ensure_base() should create the directory if it doesn't exist"""
209
211
                    ]
210
212
        self.build_tree(files, transport=t, line_endings='binary')
211
213
        self.assertRaises(NoSuchFile, t.get, 'c')
212
 
        def iterate_and_close(func, *args):
213
 
            for f in func(*args):
214
 
                # We call f.read() here because things like paramiko actually
215
 
                # spawn a thread to prefetch the content, which we want to
216
 
                # consume before we close the handle.
217
 
                content = f.read()
218
 
                f.close()
219
 
        self.assertRaises(NoSuchFile, iterate_and_close,
220
 
                          t.get_multi, ['a', 'b', 'c'])
221
 
        self.assertRaises(NoSuchFile, iterate_and_close,
222
 
                          t.get_multi, iter(['a', 'b', 'c']))
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
223
216
 
224
217
    def test_get_directory_read_gives_ReadError(self):
225
218
        """consistent errors for read() on a file returned by get()."""
258
251
 
259
252
    def test_get_bytes_unknown_file(self):
260
253
        t = self.get_transport()
 
254
 
261
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
262
256
 
263
257
    def test_get_with_open_write_stream_sees_all_content(self):
267
261
        handle = t.open_write_stream('foo')
268
262
        try:
269
263
            handle.write('b')
270
 
            self.assertEqual('b', t.get_bytes('foo'))
 
264
            self.assertEqual('b', t.get('foo').read())
271
265
        finally:
272
266
            handle.close()
273
267
 
279
273
        try:
280
274
            handle.write('b')
281
275
            self.assertEqual('b', t.get_bytes('foo'))
282
 
            f = t.get('foo')
283
 
            try:
284
 
                self.assertEqual('b', f.read())
285
 
            finally:
286
 
                f.close()
 
276
            self.assertEqual('b', t.get('foo').read())
287
277
        finally:
288
278
            handle.close()
289
279
 
296
286
            return
297
287
 
298
288
        t.put_bytes('a', 'some text for a\n')
299
 
        self.assertTrue(t.has('a'))
 
289
        self.failUnless(t.has('a'))
300
290
        self.check_transport_contents('some text for a\n', t, 'a')
301
291
 
302
292
        # The contents should be overwritten
314
304
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
315
305
            return
316
306
 
317
 
        self.assertFalse(t.has('a'))
 
307
        self.failIf(t.has('a'))
318
308
        t.put_bytes_non_atomic('a', 'some text for a\n')
319
 
        self.assertTrue(t.has('a'))
 
309
        self.failUnless(t.has('a'))
320
310
        self.check_transport_contents('some text for a\n', t, 'a')
321
311
        # Put also replaces contents
322
312
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
334
324
        # Now test the create_parent flag
335
325
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
336
326
                                       'contents\n')
337
 
        self.assertFalse(t.has('dir/a'))
 
327
        self.failIf(t.has('dir/a'))
338
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
329
                               create_parent_dir=True)
340
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
412
402
        result = t.put_file('a', StringIO('some text for a\n'))
413
403
        # put_file returns the length of the data written
414
404
        self.assertEqual(16, result)
415
 
        self.assertTrue(t.has('a'))
 
405
        self.failUnless(t.has('a'))
416
406
        self.check_transport_contents('some text for a\n', t, 'a')
417
407
        # Put also replaces contents
418
408
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
430
420
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
431
421
            return
432
422
 
433
 
        self.assertFalse(t.has('a'))
 
423
        self.failIf(t.has('a'))
434
424
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
 
        self.assertTrue(t.has('a'))
 
425
        self.failUnless(t.has('a'))
436
426
        self.check_transport_contents('some text for a\n', t, 'a')
437
427
        # Put also replaces contents
438
428
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
450
440
        # Now test the create_parent flag
451
441
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
442
                                       StringIO('contents\n'))
453
 
        self.assertFalse(t.has('dir/a'))
 
443
        self.failIf(t.has('dir/a'))
454
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
445
                              create_parent_dir=True)
456
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
520
510
        self.assertTransportMode(t, 'dir777', 0777)
521
511
 
522
512
    def test_put_bytes_unicode(self):
 
513
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
514
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
515
        # (we don't want to encode unicode here at all, callers should be
 
516
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
517
        # compatibility.  At some point we should use a specific exception.
 
518
        # See https://bugs.launchpad.net/bzr/+bug/106898.
523
519
        t = self.get_transport()
524
520
        if t.is_readonly():
525
521
            return
526
522
        unicode_string = u'\u1234'
527
 
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
523
        self.assertRaises(
 
524
            (AssertionError, UnicodeEncodeError),
 
525
            t.put_bytes, 'foo', unicode_string)
 
526
 
 
527
    def test_put_file_unicode(self):
 
528
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
529
        # This situation can happen (and has) if code is careless about the type
 
530
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
531
        # cStringIO, because it never returns unicode from read.
 
532
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
533
        # raise, but we raise it for hysterical raisins.
 
534
        t = self.get_transport()
 
535
        if t.is_readonly():
 
536
            return
 
537
        unicode_file = pyStringIO(u'\u1234')
 
538
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
528
539
 
529
540
    def test_mkdir(self):
530
541
        t = self.get_transport()
609
620
    def test_opening_a_file_stream_can_set_mode(self):
610
621
        t = self.get_transport()
611
622
        if t.is_readonly():
612
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
613
 
                              t.open_write_stream, 'foo')
614
623
            return
615
624
        if not t._can_roundtrip_unix_modebits():
616
625
            # Can't roundtrip, so no need to run this test
617
626
            return
618
 
 
619
627
        def check_mode(name, mode, expected):
620
628
            handle = t.open_write_stream(name, mode=mode)
621
629
            handle.close()
637
645
            self.build_tree(files, transport=transport_from)
638
646
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
639
647
            for f in files:
640
 
                self.check_transport_contents(transport_to.get_bytes(f),
 
648
                self.check_transport_contents(transport_to.get(f).read(),
641
649
                                              transport_from, f)
642
650
 
643
651
        t = self.get_transport()
666
674
        files = ['a', 'b', 'c', 'd']
667
675
        t.copy_to(iter(files), temp_transport)
668
676
        for f in files:
669
 
            self.check_transport_contents(temp_transport.get_bytes(f),
 
677
            self.check_transport_contents(temp_transport.get(f).read(),
670
678
                                          t, f)
671
679
        del temp_transport
672
680
 
815
823
            return
816
824
 
817
825
        t.put_bytes('a', 'a little bit of text\n')
818
 
        self.assertTrue(t.has('a'))
 
826
        self.failUnless(t.has('a'))
819
827
        t.delete('a')
820
 
        self.assertFalse(t.has('a'))
 
828
        self.failIf(t.has('a'))
821
829
 
822
830
        self.assertRaises(NoSuchFile, t.delete, 'a')
823
831
 
829
837
        t.delete_multi(['a', 'c'])
830
838
        self.assertEqual([False, True, False],
831
839
                list(t.has_multi(['a', 'b', 'c'])))
832
 
        self.assertFalse(t.has('a'))
833
 
        self.assertTrue(t.has('b'))
834
 
        self.assertFalse(t.has('c'))
 
840
        self.failIf(t.has('a'))
 
841
        self.failUnless(t.has('b'))
 
842
        self.failIf(t.has('c'))
835
843
 
836
844
        self.assertRaises(NoSuchFile,
837
845
                t.delete_multi, ['a', 'b', 'c'])
898
906
        t.mkdir('foo-baz')
899
907
        t.rmdir('foo')
900
908
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
901
 
        self.assertTrue(t.has('foo-bar'))
 
909
        self.failUnless(t.has('foo-bar'))
902
910
 
903
911
    def test_rename_dir_succeeds(self):
904
912
        t = self.get_transport()
905
913
        if t.is_readonly():
906
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
907
 
                              t.rename, 'foo', 'bar')
908
 
            return
 
914
            raise TestSkipped("transport is readonly")
909
915
        t.mkdir('adir')
910
916
        t.mkdir('adir/asubdir')
911
917
        t.rename('adir', 'bdir')
916
922
        """Attempting to replace a nonemtpy directory should fail"""
917
923
        t = self.get_transport()
918
924
        if t.is_readonly():
919
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
920
 
                              t.rename, 'foo', 'bar')
921
 
            return
 
925
            raise TestSkipped("transport is readonly")
922
926
        t.mkdir('adir')
923
927
        t.mkdir('adir/asubdir')
924
928
        t.mkdir('bdir')
988
992
        # perhaps all of this could be done in a subdirectory
989
993
 
990
994
        t.put_bytes('a', 'a first file\n')
991
 
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
 
995
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
992
996
 
993
997
        t.move('a', 'b')
994
 
        self.assertTrue(t.has('b'))
995
 
        self.assertFalse(t.has('a'))
 
998
        self.failUnless(t.has('b'))
 
999
        self.failIf(t.has('a'))
996
1000
 
997
1001
        self.check_transport_contents('a first file\n', t, 'b')
998
 
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
 
1002
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
999
1003
 
1000
1004
        # Overwrite a file
1001
1005
        t.put_bytes('c', 'c this file\n')
1002
1006
        t.move('c', 'b')
1003
 
        self.assertFalse(t.has('c'))
 
1007
        self.failIf(t.has('c'))
1004
1008
        self.check_transport_contents('c this file\n', t, 'b')
1005
1009
 
1006
1010
        # TODO: Try to write a test for atomicity
1038
1042
        except NotImplementedError:
1039
1043
            raise TestSkipped("Transport %s has no bogus URL support." %
1040
1044
                              self._server.__class__)
1041
 
        t = _mod_transport.get_transport_from_url(url)
 
1045
        t = get_transport(url)
1042
1046
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1043
1047
 
1044
1048
    def test_stat(self):
1060
1064
        for path, size in zip(paths, sizes):
1061
1065
            st = t.stat(path)
1062
1066
            if path.endswith('/'):
1063
 
                self.assertTrue(S_ISDIR(st.st_mode))
 
1067
                self.failUnless(S_ISDIR(st.st_mode))
1064
1068
                # directory sizes are meaningless
1065
1069
            else:
1066
 
                self.assertTrue(S_ISREG(st.st_mode))
 
1070
                self.failUnless(S_ISREG(st.st_mode))
1067
1071
                self.assertEqual(size, st.st_size)
1068
1072
 
1069
1073
        remote_stats = list(t.stat_multi(paths))
1076
1080
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1077
1081
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1078
1082
        subdir = t.clone('subdir')
1079
 
        st = subdir.stat('./file')
1080
 
        st = subdir.stat('.')
 
1083
        subdir.stat('./file')
 
1084
        subdir.stat('.')
1081
1085
 
1082
1086
    def test_hardlink(self):
1083
1087
        from stat import ST_NLINK
1092
1096
        try:
1093
1097
            t.hardlink(source_name, link_name)
1094
1098
 
1095
 
            self.assertTrue(t.has(source_name))
1096
 
            self.assertTrue(t.has(link_name))
 
1099
            self.failUnless(t.has(source_name))
 
1100
            self.failUnless(t.has(link_name))
1097
1101
 
1098
1102
            st = t.stat(link_name)
1099
 
            self.assertEqual(st[ST_NLINK], 2)
 
1103
            self.failUnlessEqual(st[ST_NLINK], 2)
1100
1104
        except TransportNotPossible:
1101
1105
            raise TestSkipped("Transport %s does not support hardlinks." %
1102
1106
                              self._server.__class__)
1114
1118
        try:
1115
1119
            t.symlink(source_name, link_name)
1116
1120
 
1117
 
            self.assertTrue(t.has(source_name))
1118
 
            self.assertTrue(t.has(link_name))
 
1121
            self.failUnless(t.has(source_name))
 
1122
            self.failUnless(t.has(link_name))
1119
1123
 
1120
1124
            st = t.stat(link_name)
1121
 
            self.assertTrue(S_ISLNK(st.st_mode),
 
1125
            self.failUnless(S_ISLNK(st.st_mode),
1122
1126
                "expected symlink, got mode %o" % st.st_mode)
1123
1127
        except TransportNotPossible:
1124
1128
            raise TestSkipped("Transport %s does not support symlinks." %
1125
1129
                              self._server.__class__)
1126
1130
        except IOError:
1127
 
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1131
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1128
1132
 
1129
1133
    def test_list_dir(self):
1130
1134
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1194
1198
            raise TestSkipped("not a connected transport")
1195
1199
 
1196
1200
        t2 = t1.clone('subdir')
1197
 
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1198
 
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1199
 
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1200
 
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1201
 
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1201
        self.assertEquals(t1._scheme, t2._scheme)
 
1202
        self.assertEquals(t1._user, t2._user)
 
1203
        self.assertEquals(t1._password, t2._password)
 
1204
        self.assertEquals(t1._host, t2._host)
 
1205
        self.assertEquals(t1._port, t2._port)
1202
1206
 
1203
1207
    def test__reuse_for(self):
1204
1208
        t = self.get_transport()
1211
1215
 
1212
1216
            Only the parameters different from None will be changed.
1213
1217
            """
1214
 
            if scheme   is None: scheme   = t._parsed_url.scheme
1215
 
            if user     is None: user     = t._parsed_url.user
1216
 
            if password is None: password = t._parsed_url.password
1217
 
            if user     is None: user     = t._parsed_url.user
1218
 
            if host     is None: host     = t._parsed_url.host
1219
 
            if port     is None: port     = t._parsed_url.port
1220
 
            if path     is None: path     = t._parsed_url.path
1221
 
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1218
            if scheme   is None: scheme   = t._scheme
 
1219
            if user     is None: user     = t._user
 
1220
            if password is None: password = t._password
 
1221
            if user     is None: user     = t._user
 
1222
            if host     is None: host     = t._host
 
1223
            if port     is None: port     = t._port
 
1224
            if path     is None: path     = t._path
 
1225
            return t._unsplit_url(scheme, user, password, host, port, path)
1222
1226
 
1223
 
        if t._parsed_url.scheme == 'ftp':
 
1227
        if t._scheme == 'ftp':
1224
1228
            scheme = 'sftp'
1225
1229
        else:
1226
1230
            scheme = 'ftp'
1227
1231
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1228
 
        if t._parsed_url.user == 'me':
 
1232
        if t._user == 'me':
1229
1233
            user = 'you'
1230
1234
        else:
1231
1235
            user = 'me'
1242
1246
        #   (they may be typed by the user when prompted for example)
1243
1247
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1244
1248
        # We will not connect, we can use a invalid host
1245
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1246
 
        if t._parsed_url.port == 1234:
 
1249
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1250
        if t._port == 1234:
1247
1251
            port = 4321
1248
1252
        else:
1249
1253
            port = 1234
1262
1266
        self.assertIs(t._get_connection(), c._get_connection())
1263
1267
 
1264
1268
        # Temporary failure, we need to create a new dummy connection
1265
 
        new_connection = None
 
1269
        new_connection = object()
1266
1270
        t._set_connection(new_connection)
1267
1271
        # Check that both transports use the same connection
1268
1272
        self.assertIs(new_connection, t._get_connection())
1290
1294
 
1291
1295
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1292
1296
 
1293
 
        self.assertTrue(t1.has('a'))
1294
 
        self.assertTrue(t1.has('b/c'))
1295
 
        self.assertFalse(t1.has('c'))
 
1297
        self.failUnless(t1.has('a'))
 
1298
        self.failUnless(t1.has('b/c'))
 
1299
        self.failIf(t1.has('c'))
1296
1300
 
1297
1301
        t2 = t1.clone('b')
1298
1302
        self.assertEqual(t1.base + 'b/', t2.base)
1299
1303
 
1300
 
        self.assertTrue(t2.has('c'))
1301
 
        self.assertFalse(t2.has('a'))
 
1304
        self.failUnless(t2.has('c'))
 
1305
        self.failIf(t2.has('a'))
1302
1306
 
1303
1307
        t3 = t2.clone('..')
1304
 
        self.assertTrue(t3.has('a'))
1305
 
        self.assertFalse(t3.has('c'))
 
1308
        self.failUnless(t3.has('a'))
 
1309
        self.failIf(t3.has('c'))
1306
1310
 
1307
 
        self.assertFalse(t1.has('b/d'))
1308
 
        self.assertFalse(t2.has('d'))
1309
 
        self.assertFalse(t3.has('b/d'))
 
1311
        self.failIf(t1.has('b/d'))
 
1312
        self.failIf(t2.has('d'))
 
1313
        self.failIf(t3.has('b/d'))
1310
1314
 
1311
1315
        if t1.is_readonly():
1312
1316
            self.build_tree_contents([('b/d', 'newfile\n')])
1313
1317
        else:
1314
1318
            t2.put_bytes('d', 'newfile\n')
1315
1319
 
1316
 
        self.assertTrue(t1.has('b/d'))
1317
 
        self.assertTrue(t2.has('d'))
1318
 
        self.assertTrue(t3.has('b/d'))
 
1320
        self.failUnless(t1.has('b/d'))
 
1321
        self.failUnless(t2.has('d'))
 
1322
        self.failUnless(t3.has('b/d'))
1319
1323
 
1320
1324
    def test_clone_to_root(self):
1321
1325
        orig_transport = self.get_transport()
1395
1399
        self.assertEqual(transport.clone("/").abspath('foo'),
1396
1400
                         transport.abspath("/foo"))
1397
1401
 
1398
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1399
1402
    def test_win32_abspath(self):
1400
1403
        # Note: we tried to set sys.platform='win32' so we could test on
1401
1404
        # other platforms too, but then osutils does platform specific
1406
1409
 
1407
1410
        # smoke test for abspath on win32.
1408
1411
        # a transport based on 'file:///' never fully qualifies the drive.
1409
 
        transport = _mod_transport.get_transport_from_url("file:///")
1410
 
        self.assertEqual(transport.abspath("/"), "file:///")
 
1412
        transport = get_transport("file:///")
 
1413
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1411
1414
 
1412
1415
        # but a transport that starts with a drive spec must keep it.
1413
 
        transport = _mod_transport.get_transport_from_url("file:///C:/")
1414
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1416
        transport = get_transport("file:///C:/")
 
1417
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1415
1418
 
1416
1419
    def test_local_abspath(self):
1417
1420
        transport = self.get_transport()
1543
1546
 
1544
1547
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1545
1548
        if no_unicode_support:
1546
 
            self.knownFailure("test server cannot handle unicode paths")
 
1549
            raise tests.KnownFailure("test server cannot handle unicode paths")
1547
1550
 
1548
1551
        try:
1549
1552
            self.build_tree(files, transport=t, line_endings='binary')
1614
1617
    def test_readv(self):
1615
1618
        transport = self.get_transport()
1616
1619
        if transport.is_readonly():
1617
 
            with file('a', 'w') as f: f.write('0123456789')
 
1620
            file('a', 'w').write('0123456789')
1618
1621
        else:
1619
1622
            transport.put_bytes('a', '0123456789')
1620
1623
 
1630
1633
    def test_readv_out_of_order(self):
1631
1634
        transport = self.get_transport()
1632
1635
        if transport.is_readonly():
1633
 
            with file('a', 'w') as f: f.write('0123456789')
 
1636
            file('a', 'w').write('0123456789')
1634
1637
        else:
1635
1638
            transport.put_bytes('a', '01234567890')
1636
1639
 
1708
1711
        transport = self.get_transport()
1709
1712
        # test from observed failure case.
1710
1713
        if transport.is_readonly():
1711
 
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1714
            file('a', 'w').write('a'*1024*1024)
1712
1715
        else:
1713
1716
            transport.put_bytes('a', 'a'*1024*1024)
1714
1717
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1748
1751
    def test_readv_short_read(self):
1749
1752
        transport = self.get_transport()
1750
1753
        if transport.is_readonly():
1751
 
            with file('a', 'w') as f: f.write('0123456789')
 
1754
            file('a', 'w').write('0123456789')
1752
1755
        else:
1753
1756
            transport.put_bytes('a', '01234567890')
1754
1757
 
1764
1767
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1765
1768
                              transport.readv, 'a', [(12,2)])
1766
1769
 
1767
 
    def test_no_segment_parameters(self):
1768
 
        """Segment parameters should be stripped and stored in
1769
 
        transport.segment_parameters."""
1770
 
        transport = self.get_transport("foo")
1771
 
        self.assertEqual({}, transport.get_segment_parameters())
1772
 
 
1773
 
    def test_segment_parameters(self):
1774
 
        """Segment parameters should be stripped and stored in
1775
 
        transport.get_segment_parameters()."""
1776
 
        base_url = self._server.get_url()
1777
 
        parameters = {"key1": "val1", "key2": "val2"}
1778
 
        url = urlutils.join_segment_parameters(base_url, parameters)
1779
 
        transport = _mod_transport.get_transport_from_url(url)
1780
 
        self.assertEqual(parameters, transport.get_segment_parameters())
1781
 
 
1782
 
    def test_set_segment_parameters(self):
1783
 
        """Segment parameters can be set and show up in base."""
1784
 
        transport = self.get_transport("foo")
1785
 
        orig_base = transport.base
1786
 
        transport.set_segment_parameter("arm", "board")
1787
 
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
1788
 
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1789
 
        transport.set_segment_parameter("arm", None)
1790
 
        transport.set_segment_parameter("nonexistant", None)
1791
 
        self.assertEqual({}, transport.get_segment_parameters())
1792
 
        self.assertEqual(orig_base, transport.base)
1793
 
 
1794
1770
    def test_stat_symlink(self):
1795
1771
        # if a transport points directly to a symlink (and supports symlinks
1796
1772
        # at all) you can tell this.  helps with bug 32669.
1802
1778
        t2 = t.clone('link')
1803
1779
        st = t2.stat('')
1804
1780
        self.assertTrue(stat.S_ISLNK(st.st_mode))
1805
 
 
1806
 
    def test_abspath_url_unquote_unreserved(self):
1807
 
        """URLs from abspath should have unreserved characters unquoted
1808
 
        
1809
 
        Need consistent quoting notably for tildes, see lp:842223 for more.
1810
 
        """
1811
 
        t = self.get_transport()
1812
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1813
 
        self.assertEqual(t.base + "-.09AZ_az~",
1814
 
            t.abspath(needlessly_escaped_dir))
1815
 
 
1816
 
    def test_clone_url_unquote_unreserved(self):
1817
 
        """Base URL of a cloned branch needs unreserved characters unquoted
1818
 
        
1819
 
        Cloned transports should be prefix comparable for things like the
1820
 
        isolation checking of tests, see lp:842223 for more.
1821
 
        """
1822
 
        t1 = self.get_transport()
1823
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1824
 
        self.build_tree([needlessly_escaped_dir], transport=t1)
1825
 
        t2 = t1.clone(needlessly_escaped_dir)
1826
 
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1827
 
 
1828
 
    def test_hook_post_connection_one(self):
1829
 
        """Fire post_connect hook after a ConnectedTransport is first used"""
1830
 
        log = []
1831
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1832
 
        t = self.get_transport()
1833
 
        self.assertEqual([], log)
1834
 
        t.has("non-existant")
1835
 
        if isinstance(t, RemoteTransport):
1836
 
            self.assertEqual([t.get_smart_medium()], log)
1837
 
        elif isinstance(t, ConnectedTransport):
1838
 
            self.assertEqual([t], log)
1839
 
        else:
1840
 
            self.assertEqual([], log)
1841
 
 
1842
 
    def test_hook_post_connection_multi(self):
1843
 
        """Fire post_connect hook once per unshared underlying connection"""
1844
 
        log = []
1845
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
 
        t1 = self.get_transport()
1847
 
        t2 = t1.clone(".")
1848
 
        t3 = self.get_transport()
1849
 
        self.assertEqual([], log)
1850
 
        t1.has("x")
1851
 
        t2.has("x")
1852
 
        t3.has("x")
1853
 
        if isinstance(t1, RemoteTransport):
1854
 
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1855
 
        elif isinstance(t1, ConnectedTransport):
1856
 
            self.assertEqual([t1, t3], log)
1857
 
        else:
1858
 
            self.assertEqual([], log)