~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

(jameinel) Allow 'bzr serve' to interpret SIGHUP as a graceful shutdown.
 (bug #795025) (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
29
 
import unittest
30
29
 
31
30
from bzrlib import (
32
31
    errors,
33
32
    osutils,
 
33
    pyutils,
34
34
    tests,
 
35
    transport as _mod_transport,
35
36
    urlutils,
36
37
    )
37
38
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
41
 
                           LockError,
42
41
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
42
                           PathError,
45
43
                           TransportNotPossible,
46
44
                           )
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
51
 
    TestScenarioApplier,
52
48
    TestSkipped,
53
49
    TestNotApplicable,
 
50
    multiply_tests,
54
51
    )
 
52
from bzrlib.tests import test_server
55
53
from bzrlib.tests.test_transport import TestTransportImplementation
56
54
from bzrlib.transport import (
57
55
    ConnectedTransport,
58
 
    get_transport,
59
56
    _get_transport_modules,
60
57
    )
61
58
from bzrlib.transport.memory import MemoryTransport
62
59
 
63
60
 
64
 
class TransportTestProviderAdapter(TestScenarioApplier):
65
 
    """A tool to generate a suite testing all transports for a single test.
66
 
 
67
 
    This is done by copying the test once for each transport and injecting
68
 
    the transport_class and transport_server classes into each copy. Each copy
69
 
    is also given a new id() to make it easy to identify.
70
 
    """
71
 
 
72
 
    def __init__(self):
73
 
        self.scenarios = self._test_permutations()
74
 
 
75
 
    def get_transport_test_permutations(self, module):
76
 
        """Get the permutations module wants to have tested."""
77
 
        if getattr(module, 'get_test_permutations', None) is None:
78
 
            raise AssertionError(
79
 
                "transport module %s doesn't provide get_test_permutations()"
80
 
                % module.__name__)
81
 
            return []
82
 
        return module.get_test_permutations()
83
 
 
84
 
    def _test_permutations(self):
85
 
        """Return a list of the klass, server_factory pairs to test."""
86
 
        result = []
87
 
        for module in _get_transport_modules():
88
 
            try:
89
 
                permutations = self.get_transport_test_permutations(
90
 
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
91
 
                for (klass, server_factory) in permutations:
92
 
                    scenario = (server_factory.__name__,
93
 
                        {"transport_class":klass,
94
 
                         "transport_server":server_factory})
95
 
                    result.append(scenario)
96
 
            except errors.DependencyNotPresent, e:
97
 
                # Continue even if a dependency prevents us 
98
 
                # from adding this test
99
 
                pass
100
 
        return result
 
61
def get_transport_test_permutations(module):
 
62
    """Get the permutations module wants to have tested."""
 
63
    if getattr(module, 'get_test_permutations', None) is None:
 
64
        raise AssertionError(
 
65
            "transport module %s doesn't provide get_test_permutations()"
 
66
            % module.__name__)
 
67
        return []
 
68
    return module.get_test_permutations()
 
69
 
 
70
 
 
71
def transport_test_permutations():
 
72
    """Return a list of the klass, server_factory pairs to test."""
 
73
    result = []
 
74
    for module in _get_transport_modules():
 
75
        try:
 
76
            permutations = get_transport_test_permutations(
 
77
                pyutils.get_named_object(module))
 
78
            for (klass, server_factory) in permutations:
 
79
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
80
                    {"transport_class":klass,
 
81
                     "transport_server":server_factory})
 
82
                result.append(scenario)
 
83
        except errors.DependencyNotPresent, e:
 
84
            # Continue even if a dependency prevents us
 
85
            # from adding this test
 
86
            pass
 
87
    return result
101
88
 
102
89
 
103
90
def load_tests(standard_tests, module, loader):
104
91
    """Multiply tests for tranport implementations."""
105
92
    result = loader.suiteClass()
106
 
    adapter = TransportTestProviderAdapter()
107
 
    for test in tests.iter_suite_tests(standard_tests):
108
 
        result.addTests(adapter.adapt(test))
109
 
    return result
 
93
    scenarios = transport_test_permutations()
 
94
    return multiply_tests(standard_tests, scenarios, result)
110
95
 
111
96
 
112
97
class TransportTests(TestTransportImplementation):
113
98
 
114
99
    def setUp(self):
115
100
        super(TransportTests, self).setUp()
116
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
101
        self.overrideEnv('BZR_NO_SMART_VFS', None)
117
102
 
118
103
    def check_transport_contents(self, content, transport, relpath):
119
 
        """Check that transport.get(relpath).read() == content."""
120
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
104
        """Check that transport.get_bytes(relpath) == content."""
 
105
        self.assertEqualDiff(content, transport.get_bytes(relpath))
121
106
 
122
107
    def test_ensure_base_missing(self):
123
108
        """.ensure_base() should create the directory if it doesn't exist"""
167
152
        self.assertEqual(True, t.has('a'))
168
153
        self.assertEqual(False, t.has('c'))
169
154
        self.assertEqual(True, t.has(urlutils.escape('%')))
170
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
 
                [True, True, False, False, True, False, True, False])
 
155
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
156
                                           'e', 'f', 'g', 'h'])),
 
157
                         [True, True, False, False,
 
158
                          True, False, True, False])
172
159
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
 
                [True, True, False, False, True, False, True, False])
 
160
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
161
                                           urlutils.escape('%%')]))
 
162
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
163
                                                'e', 'f', 'g', 'h']))),
 
164
                         [True, True, False, False,
 
165
                          True, False, True, False])
176
166
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
167
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
178
168
 
179
169
    def test_has_root_works(self):
180
 
        from bzrlib.smart import server
181
 
        if self.transport_server is server.SmartTCPServer_for_testing:
 
170
        if self.transport_server is test_server.SmartTCPServer_for_testing:
182
171
            raise TestNotApplicable(
183
172
                "SmartTCPServer_for_testing intentionally does not allow "
184
173
                "access to /.")
210
199
        for content, f in itertools.izip(contents, content_f):
211
200
            self.assertEqual(content, f.read())
212
201
 
 
202
    def test_get_unknown_file(self):
 
203
        t = self.get_transport()
 
204
        files = ['a', 'b']
 
205
        contents = ['contents of a\n',
 
206
                    'contents of b\n',
 
207
                    ]
 
208
        self.build_tree(files, transport=t, line_endings='binary')
213
209
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
210
        def iterate_and_close(func, *args):
 
211
            for f in func(*args):
 
212
                # We call f.read() here because things like paramiko actually
 
213
                # spawn a thread to prefetch the content, which we want to
 
214
                # consume before we close the handle.
 
215
                content = f.read()
 
216
                f.close()
 
217
        self.assertRaises(NoSuchFile, iterate_and_close,
 
218
                          t.get_multi, ['a', 'b', 'c'])
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, iter(['a', 'b', 'c']))
216
221
 
217
222
    def test_get_directory_read_gives_ReadError(self):
218
223
        """consistent errors for read() on a file returned by get()."""
249
254
        for content, fname in zip(contents, files):
250
255
            self.assertEqual(content, t.get_bytes(fname))
251
256
 
 
257
    def test_get_bytes_unknown_file(self):
 
258
        t = self.get_transport()
252
259
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
253
260
 
254
261
    def test_get_with_open_write_stream_sees_all_content(self):
258
265
        handle = t.open_write_stream('foo')
259
266
        try:
260
267
            handle.write('b')
261
 
            self.assertEqual('b', t.get('foo').read())
 
268
            self.assertEqual('b', t.get_bytes('foo'))
262
269
        finally:
263
270
            handle.close()
264
271
 
270
277
        try:
271
278
            handle.write('b')
272
279
            self.assertEqual('b', t.get_bytes('foo'))
273
 
            self.assertEqual('b', t.get('foo').read())
 
280
            f = t.get('foo')
 
281
            try:
 
282
                self.assertEqual('b', f.read())
 
283
            finally:
 
284
                f.close()
274
285
        finally:
275
286
            handle.close()
276
287
 
283
294
            return
284
295
 
285
296
        t.put_bytes('a', 'some text for a\n')
286
 
        self.failUnless(t.has('a'))
 
297
        self.assertTrue(t.has('a'))
287
298
        self.check_transport_contents('some text for a\n', t, 'a')
288
299
 
289
300
        # The contents should be overwritten
301
312
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
302
313
            return
303
314
 
304
 
        self.failIf(t.has('a'))
 
315
        self.assertFalse(t.has('a'))
305
316
        t.put_bytes_non_atomic('a', 'some text for a\n')
306
 
        self.failUnless(t.has('a'))
 
317
        self.assertTrue(t.has('a'))
307
318
        self.check_transport_contents('some text for a\n', t, 'a')
308
319
        # Put also replaces contents
309
320
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
321
332
        # Now test the create_parent flag
322
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
323
334
                                       'contents\n')
324
 
        self.failIf(t.has('dir/a'))
 
335
        self.assertFalse(t.has('dir/a'))
325
336
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
326
337
                               create_parent_dir=True)
327
338
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
328
 
        
 
339
 
329
340
        # But we still get NoSuchFile if we can't make the parent dir
330
341
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
331
342
                                       'contents\n',
353
364
        umask = osutils.get_umask()
354
365
        t.put_bytes('nomode', 'test text\n', mode=None)
355
366
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
356
 
        
 
367
 
357
368
    def test_put_bytes_non_atomic_permissions(self):
358
369
        t = self.get_transport()
359
370
 
387
398
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
388
399
                               dir_mode=0777, create_parent_dir=True)
389
400
        self.assertTransportMode(t, 'dir777', 0777)
390
 
        
 
401
 
391
402
    def test_put_file(self):
392
403
        t = self.get_transport()
393
404
 
399
410
        result = t.put_file('a', StringIO('some text for a\n'))
400
411
        # put_file returns the length of the data written
401
412
        self.assertEqual(16, result)
402
 
        self.failUnless(t.has('a'))
 
413
        self.assertTrue(t.has('a'))
403
414
        self.check_transport_contents('some text for a\n', t, 'a')
404
415
        # Put also replaces contents
405
416
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
417
428
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
418
429
            return
419
430
 
420
 
        self.failIf(t.has('a'))
 
431
        self.assertFalse(t.has('a'))
421
432
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
422
 
        self.failUnless(t.has('a'))
 
433
        self.assertTrue(t.has('a'))
423
434
        self.check_transport_contents('some text for a\n', t, 'a')
424
435
        # Put also replaces contents
425
436
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
437
448
        # Now test the create_parent flag
438
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
439
450
                                       StringIO('contents\n'))
440
 
        self.failIf(t.has('dir/a'))
 
451
        self.assertFalse(t.has('dir/a'))
441
452
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
442
453
                              create_parent_dir=True)
443
454
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
444
 
        
 
455
 
445
456
        # But we still get NoSuchFile if we can't make the parent dir
446
457
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
447
458
                                       StringIO('contents\n'),
469
480
        umask = osutils.get_umask()
470
481
        t.put_file('nomode', StringIO('test text\n'), mode=None)
471
482
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
472
 
        
 
483
 
473
484
    def test_put_file_non_atomic_permissions(self):
474
485
        t = self.get_transport()
475
486
 
492
503
        umask = osutils.get_umask()
493
504
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
494
505
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
495
 
        
 
506
 
496
507
        # We should also be able to set the mode for a parent directory
497
508
        # when it is created
498
509
        sio = StringIO()
538
549
        t = self.get_transport()
539
550
 
540
551
        if t.is_readonly():
541
 
            # cannot mkdir on readonly transports. We're not testing for 
 
552
            # cannot mkdir on readonly transports. We're not testing for
542
553
            # cache coherency because cache behaviour is not currently
543
554
            # defined for the transport interface.
544
555
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
565
576
 
566
577
        # we were testing that a local mkdir followed by a transport
567
578
        # mkdir failed thusly, but given that we * in one process * do not
568
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
579
        # concurrently fiddle with disk dirs and then use transport to do
569
580
        # things, the win here seems marginal compared to the constraint on
570
581
        # the interface. RBC 20051227
571
582
        t.mkdir('dir_g')
642
653
            self.build_tree(files, transport=transport_from)
643
654
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
644
655
            for f in files:
645
 
                self.check_transport_contents(transport_to.get(f).read(),
 
656
                self.check_transport_contents(transport_to.get_bytes(f),
646
657
                                              transport_from, f)
647
658
 
648
659
        t = self.get_transport()
671
682
        files = ['a', 'b', 'c', 'd']
672
683
        t.copy_to(iter(files), temp_transport)
673
684
        for f in files:
674
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
685
            self.check_transport_contents(temp_transport.get_bytes(f),
675
686
                                          t, f)
676
687
        del temp_transport
677
688
 
681
692
            for f in files:
682
693
                self.assertTransportMode(temp_transport, f, mode)
683
694
 
 
695
    def test_create_prefix(self):
 
696
        t = self.get_transport()
 
697
        sub = t.clone('foo').clone('bar')
 
698
        try:
 
699
            sub.create_prefix()
 
700
        except TransportNotPossible:
 
701
            self.assertTrue(t.is_readonly())
 
702
        else:
 
703
            self.assertTrue(t.has('foo/bar'))
 
704
 
684
705
    def test_append_file(self):
685
706
        t = self.get_transport()
686
707
 
790
811
                t.append_file, 'f', StringIO('f'), mode=None)
791
812
            return
792
813
        t.append_file('f', StringIO('f'), mode=None)
793
 
        
 
814
 
794
815
    def test_append_bytes_mode(self):
795
816
        # check append_bytes accepts a mode
796
817
        t = self.get_transport()
799
820
                t.append_bytes, 'f', 'f', mode=None)
800
821
            return
801
822
        t.append_bytes('f', 'f', mode=None)
802
 
        
 
823
 
803
824
    def test_delete(self):
804
825
        # TODO: Test Transport.delete
805
826
        t = self.get_transport()
810
831
            return
811
832
 
812
833
        t.put_bytes('a', 'a little bit of text\n')
813
 
        self.failUnless(t.has('a'))
 
834
        self.assertTrue(t.has('a'))
814
835
        t.delete('a')
815
 
        self.failIf(t.has('a'))
 
836
        self.assertFalse(t.has('a'))
816
837
 
817
838
        self.assertRaises(NoSuchFile, t.delete, 'a')
818
839
 
824
845
        t.delete_multi(['a', 'c'])
825
846
        self.assertEqual([False, True, False],
826
847
                list(t.has_multi(['a', 'b', 'c'])))
827
 
        self.failIf(t.has('a'))
828
 
        self.failUnless(t.has('b'))
829
 
        self.failIf(t.has('c'))
 
848
        self.assertFalse(t.has('a'))
 
849
        self.assertTrue(t.has('b'))
 
850
        self.assertFalse(t.has('c'))
830
851
 
831
852
        self.assertRaises(NoSuchFile,
832
853
                t.delete_multi, ['a', 'b', 'c'])
866
887
 
867
888
    def test_rmdir_not_empty(self):
868
889
        """Deleting a non-empty directory raises an exception
869
 
        
 
890
 
870
891
        sftp (and possibly others) don't give us a specific "directory not
871
892
        empty" exception -- we can just see that the operation failed.
872
893
        """
879
900
 
880
901
    def test_rmdir_empty_but_similar_prefix(self):
881
902
        """rmdir does not get confused by sibling paths.
882
 
        
 
903
 
883
904
        A naive implementation of MemoryTransport would refuse to rmdir
884
905
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
885
906
        uses "path.startswith(dir)" on all file paths to determine if directory
893
914
        t.mkdir('foo-baz')
894
915
        t.rmdir('foo')
895
916
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
896
 
        self.failUnless(t.has('foo-bar'))
 
917
        self.assertTrue(t.has('foo-bar'))
897
918
 
898
919
    def test_rename_dir_succeeds(self):
899
920
        t = self.get_transport()
982
1003
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
983
1004
 
984
1005
        t.move('a', 'b')
985
 
        self.failUnless(t.has('b'))
986
 
        self.failIf(t.has('a'))
 
1006
        self.assertTrue(t.has('b'))
 
1007
        self.assertFalse(t.has('a'))
987
1008
 
988
1009
        self.check_transport_contents('a first file\n', t, 'b')
989
1010
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
991
1012
        # Overwrite a file
992
1013
        t.put_bytes('c', 'c this file\n')
993
1014
        t.move('c', 'b')
994
 
        self.failIf(t.has('c'))
 
1015
        self.assertFalse(t.has('c'))
995
1016
        self.check_transport_contents('c this file\n', t, 'b')
996
1017
 
997
1018
        # TODO: Try to write a test for atomicity
1021
1042
 
1022
1043
    def test_connection_error(self):
1023
1044
        """ConnectionError is raised when connection is impossible.
1024
 
        
 
1045
 
1025
1046
        The error should be raised from the first operation on the transport.
1026
1047
        """
1027
1048
        try:
1029
1050
        except NotImplementedError:
1030
1051
            raise TestSkipped("Transport %s has no bogus URL support." %
1031
1052
                              self._server.__class__)
1032
 
        t = get_transport(url)
 
1053
        t = _mod_transport.get_transport_from_url(url)
1033
1054
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1034
1055
 
1035
1056
    def test_stat(self):
1051
1072
        for path, size in zip(paths, sizes):
1052
1073
            st = t.stat(path)
1053
1074
            if path.endswith('/'):
1054
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1075
                self.assertTrue(S_ISDIR(st.st_mode))
1055
1076
                # directory sizes are meaningless
1056
1077
            else:
1057
 
                self.failUnless(S_ISREG(st.st_mode))
 
1078
                self.assertTrue(S_ISREG(st.st_mode))
1058
1079
                self.assertEqual(size, st.st_size)
1059
1080
 
1060
1081
        remote_stats = list(t.stat_multi(paths))
1067
1088
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1068
1089
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1069
1090
        subdir = t.clone('subdir')
1070
 
        subdir.stat('./file')
1071
 
        subdir.stat('.')
 
1091
        st = subdir.stat('./file')
 
1092
        st = subdir.stat('.')
 
1093
 
 
1094
    def test_hardlink(self):
 
1095
        from stat import ST_NLINK
 
1096
 
 
1097
        t = self.get_transport()
 
1098
 
 
1099
        source_name = "original_target"
 
1100
        link_name = "target_link"
 
1101
 
 
1102
        self.build_tree([source_name], transport=t)
 
1103
 
 
1104
        try:
 
1105
            t.hardlink(source_name, link_name)
 
1106
 
 
1107
            self.assertTrue(t.has(source_name))
 
1108
            self.assertTrue(t.has(link_name))
 
1109
 
 
1110
            st = t.stat(link_name)
 
1111
            self.assertEqual(st[ST_NLINK], 2)
 
1112
        except TransportNotPossible:
 
1113
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1114
                              self._server.__class__)
 
1115
 
 
1116
    def test_symlink(self):
 
1117
        from stat import S_ISLNK
 
1118
 
 
1119
        t = self.get_transport()
 
1120
 
 
1121
        source_name = "original_target"
 
1122
        link_name = "target_link"
 
1123
 
 
1124
        self.build_tree([source_name], transport=t)
 
1125
 
 
1126
        try:
 
1127
            t.symlink(source_name, link_name)
 
1128
 
 
1129
            self.assertTrue(t.has(source_name))
 
1130
            self.assertTrue(t.has(link_name))
 
1131
 
 
1132
            st = t.stat(link_name)
 
1133
            self.assertTrue(S_ISLNK(st.st_mode),
 
1134
                "expected symlink, got mode %o" % st.st_mode)
 
1135
        except TransportNotPossible:
 
1136
            raise TestSkipped("Transport %s does not support symlinks." %
 
1137
                              self._server.__class__)
 
1138
        except IOError:
 
1139
            self.knownFailure("Paramiko fails to create symlinks during tests")
1072
1140
 
1073
1141
    def test_list_dir(self):
1074
1142
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1115
1183
 
1116
1184
        self.assertListRaises(PathError, t.list_dir, 'q')
1117
1185
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1186
        # 'a' is a file, list_dir should raise an error
1118
1187
        self.assertListRaises(PathError, t.list_dir, 'a')
1119
1188
 
1120
1189
    def test_list_dir_result_is_url_escaped(self):
1137
1206
            raise TestSkipped("not a connected transport")
1138
1207
 
1139
1208
        t2 = t1.clone('subdir')
1140
 
        self.assertEquals(t1._scheme, t2._scheme)
1141
 
        self.assertEquals(t1._user, t2._user)
1142
 
        self.assertEquals(t1._password, t2._password)
1143
 
        self.assertEquals(t1._host, t2._host)
1144
 
        self.assertEquals(t1._port, t2._port)
 
1209
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1210
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1211
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1212
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1213
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1145
1214
 
1146
1215
    def test__reuse_for(self):
1147
1216
        t = self.get_transport()
1154
1223
 
1155
1224
            Only the parameters different from None will be changed.
1156
1225
            """
1157
 
            if scheme   is None: scheme   = t._scheme
1158
 
            if user     is None: user     = t._user
1159
 
            if password is None: password = t._password
1160
 
            if user     is None: user     = t._user
1161
 
            if host     is None: host     = t._host
1162
 
            if port     is None: port     = t._port
1163
 
            if path     is None: path     = t._path
1164
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1226
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1227
            if user     is None: user     = t._parsed_url.user
 
1228
            if password is None: password = t._parsed_url.password
 
1229
            if user     is None: user     = t._parsed_url.user
 
1230
            if host     is None: host     = t._parsed_url.host
 
1231
            if port     is None: port     = t._parsed_url.port
 
1232
            if path     is None: path     = t._parsed_url.path
 
1233
            return str(urlutils.URL(scheme, user, password, host, port, path))
1165
1234
 
1166
 
        if t._scheme == 'ftp':
 
1235
        if t._parsed_url.scheme == 'ftp':
1167
1236
            scheme = 'sftp'
1168
1237
        else:
1169
1238
            scheme = 'ftp'
1170
1239
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1171
 
        if t._user == 'me':
 
1240
        if t._parsed_url.user == 'me':
1172
1241
            user = 'you'
1173
1242
        else:
1174
1243
            user = 'me'
1185
1254
        #   (they may be typed by the user when prompted for example)
1186
1255
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1187
1256
        # We will not connect, we can use a invalid host
1188
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1189
 
        if t._port == 1234:
 
1257
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1258
        if t._parsed_url.port == 1234:
1190
1259
            port = 4321
1191
1260
        else:
1192
1261
            port = 1234
1205
1274
        self.assertIs(t._get_connection(), c._get_connection())
1206
1275
 
1207
1276
        # Temporary failure, we need to create a new dummy connection
1208
 
        new_connection = object()
 
1277
        new_connection = None
1209
1278
        t._set_connection(new_connection)
1210
1279
        # Check that both transports use the same connection
1211
1280
        self.assertIs(new_connection, t._get_connection())
1233
1302
 
1234
1303
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1235
1304
 
1236
 
        self.failUnless(t1.has('a'))
1237
 
        self.failUnless(t1.has('b/c'))
1238
 
        self.failIf(t1.has('c'))
 
1305
        self.assertTrue(t1.has('a'))
 
1306
        self.assertTrue(t1.has('b/c'))
 
1307
        self.assertFalse(t1.has('c'))
1239
1308
 
1240
1309
        t2 = t1.clone('b')
1241
1310
        self.assertEqual(t1.base + 'b/', t2.base)
1242
1311
 
1243
 
        self.failUnless(t2.has('c'))
1244
 
        self.failIf(t2.has('a'))
 
1312
        self.assertTrue(t2.has('c'))
 
1313
        self.assertFalse(t2.has('a'))
1245
1314
 
1246
1315
        t3 = t2.clone('..')
1247
 
        self.failUnless(t3.has('a'))
1248
 
        self.failIf(t3.has('c'))
 
1316
        self.assertTrue(t3.has('a'))
 
1317
        self.assertFalse(t3.has('c'))
1249
1318
 
1250
 
        self.failIf(t1.has('b/d'))
1251
 
        self.failIf(t2.has('d'))
1252
 
        self.failIf(t3.has('b/d'))
 
1319
        self.assertFalse(t1.has('b/d'))
 
1320
        self.assertFalse(t2.has('d'))
 
1321
        self.assertFalse(t3.has('b/d'))
1253
1322
 
1254
1323
        if t1.is_readonly():
1255
1324
            self.build_tree_contents([('b/d', 'newfile\n')])
1256
1325
        else:
1257
1326
            t2.put_bytes('d', 'newfile\n')
1258
1327
 
1259
 
        self.failUnless(t1.has('b/d'))
1260
 
        self.failUnless(t2.has('d'))
1261
 
        self.failUnless(t3.has('b/d'))
 
1328
        self.assertTrue(t1.has('b/d'))
 
1329
        self.assertTrue(t2.has('d'))
 
1330
        self.assertTrue(t3.has('b/d'))
1262
1331
 
1263
1332
    def test_clone_to_root(self):
1264
1333
        orig_transport = self.get_transport()
1338
1407
        self.assertEqual(transport.clone("/").abspath('foo'),
1339
1408
                         transport.abspath("/foo"))
1340
1409
 
 
1410
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1341
1411
    def test_win32_abspath(self):
1342
1412
        # Note: we tried to set sys.platform='win32' so we could test on
1343
1413
        # other platforms too, but then osutils does platform specific
1348
1418
 
1349
1419
        # smoke test for abspath on win32.
1350
1420
        # a transport based on 'file:///' never fully qualifies the drive.
1351
 
        transport = get_transport("file:///")
1352
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1421
        transport = _mod_transport.get_transport_from_url("file:///")
 
1422
        self.assertEqual(transport.abspath("/"), "file:///")
1353
1423
 
1354
1424
        # but a transport that starts with a drive spec must keep it.
1355
 
        transport = get_transport("file:///C:/")
1356
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1425
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1426
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1357
1427
 
1358
1428
    def test_local_abspath(self):
1359
1429
        transport = self.get_transport()
1483
1553
                 u'\u65e5', # Kanji person
1484
1554
                ]
1485
1555
 
 
1556
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1557
        if no_unicode_support:
 
1558
            self.knownFailure("test server cannot handle unicode paths")
 
1559
 
1486
1560
        try:
1487
1561
            self.build_tree(files, transport=t, line_endings='binary')
1488
1562
        except UnicodeError:
1506
1580
        transport.put_bytes('foo', 'bar')
1507
1581
        transport3 = self.get_transport()
1508
1582
        self.check_transport_contents('bar', transport3, 'foo')
1509
 
        # its base should be usable.
1510
 
        transport4 = get_transport(transport.base)
1511
 
        self.check_transport_contents('bar', transport4, 'foo')
1512
1583
 
1513
1584
        # now opening at a relative url should give use a sane result:
1514
1585
        transport.mkdir('newdir')
1515
 
        transport5 = get_transport(transport.base + "newdir")
 
1586
        transport5 = self.get_transport('newdir')
1516
1587
        transport6 = transport5.clone('..')
1517
1588
        self.check_transport_contents('bar', transport6, 'foo')
1518
1589
 
1555
1626
    def test_readv(self):
1556
1627
        transport = self.get_transport()
1557
1628
        if transport.is_readonly():
1558
 
            file('a', 'w').write('0123456789')
 
1629
            with file('a', 'w') as f: f.write('0123456789')
1559
1630
        else:
1560
1631
            transport.put_bytes('a', '0123456789')
1561
1632
 
1571
1642
    def test_readv_out_of_order(self):
1572
1643
        transport = self.get_transport()
1573
1644
        if transport.is_readonly():
1574
 
            file('a', 'w').write('0123456789')
 
1645
            with file('a', 'w') as f: f.write('0123456789')
1575
1646
        else:
1576
1647
            transport.put_bytes('a', '01234567890')
1577
1648
 
1649
1720
        transport = self.get_transport()
1650
1721
        # test from observed failure case.
1651
1722
        if transport.is_readonly():
1652
 
            file('a', 'w').write('a'*1024*1024)
 
1723
            with file('a', 'w') as f: f.write('a'*1024*1024)
1653
1724
        else:
1654
1725
            transport.put_bytes('a', 'a'*1024*1024)
1655
1726
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1689
1760
    def test_readv_short_read(self):
1690
1761
        transport = self.get_transport()
1691
1762
        if transport.is_readonly():
1692
 
            file('a', 'w').write('0123456789')
 
1763
            with file('a', 'w') as f: f.write('0123456789')
1693
1764
        else:
1694
1765
            transport.put_bytes('a', '01234567890')
1695
1766
 
1704
1775
        # also raise a special error
1705
1776
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1706
1777
                              transport.readv, 'a', [(12,2)])
 
1778
 
 
1779
    def test_no_segment_parameters(self):
 
1780
        """Segment parameters should be stripped and stored in
 
1781
        transport.segment_parameters."""
 
1782
        transport = self.get_transport("foo")
 
1783
        self.assertEquals({}, transport.get_segment_parameters())
 
1784
 
 
1785
    def test_segment_parameters(self):
 
1786
        """Segment parameters should be stripped and stored in
 
1787
        transport.get_segment_parameters()."""
 
1788
        base_url = self._server.get_url()
 
1789
        parameters = {"key1": "val1", "key2": "val2"}
 
1790
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1791
        transport = _mod_transport.get_transport_from_url(url)
 
1792
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))