~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Patch Queue Manager
  • Date: 2016-04-21 04:10:52 UTC
  • mfrom: (6616.1.1 fix-en-user-guide)
  • Revision ID: pqm@pqm.ubuntu.com-20160421041052-clcye7ns1qcl2n7w
(richard-wilbur) Ensure build of English use guide always uses English text
 even when user's locale specifies a different language. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
26
26
from StringIO import StringIO as pyStringIO
27
27
import stat
28
28
import sys
29
 
import unittest
30
29
 
31
30
from bzrlib import (
32
31
    errors,
33
32
    osutils,
 
33
    pyutils,
34
34
    tests,
 
35
    transport as _mod_transport,
35
36
    urlutils,
36
37
    )
37
38
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
39
                           FileExists,
40
40
                           InvalidURL,
41
 
                           LockError,
42
41
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
42
                           PathError,
45
43
                           TransportNotPossible,
46
44
                           )
47
45
from bzrlib.osutils import getcwd
48
46
from bzrlib.smart import medium
49
47
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
51
 
    TestScenarioApplier,
52
48
    TestSkipped,
53
49
    TestNotApplicable,
 
50
    multiply_tests,
54
51
    )
 
52
from bzrlib.tests import test_server
55
53
from bzrlib.tests.test_transport import TestTransportImplementation
56
54
from bzrlib.transport import (
57
55
    ConnectedTransport,
58
 
    get_transport,
 
56
    Transport,
59
57
    _get_transport_modules,
60
58
    )
61
59
from bzrlib.transport.memory import MemoryTransport
62
 
 
63
 
 
64
 
class TransportTestProviderAdapter(TestScenarioApplier):
65
 
    """A tool to generate a suite testing all transports for a single test.
66
 
 
67
 
    This is done by copying the test once for each transport and injecting
68
 
    the transport_class and transport_server classes into each copy. Each copy
69
 
    is also given a new id() to make it easy to identify.
70
 
    """
71
 
 
72
 
    def __init__(self):
73
 
        self.scenarios = self._test_permutations()
74
 
 
75
 
    def get_transport_test_permutations(self, module):
76
 
        """Get the permutations module wants to have tested."""
77
 
        if getattr(module, 'get_test_permutations', None) is None:
78
 
            raise AssertionError(
79
 
                "transport module %s doesn't provide get_test_permutations()"
80
 
                % module.__name__)
81
 
            return []
82
 
        return module.get_test_permutations()
83
 
 
84
 
    def _test_permutations(self):
85
 
        """Return a list of the klass, server_factory pairs to test."""
86
 
        result = []
87
 
        for module in _get_transport_modules():
88
 
            try:
89
 
                permutations = self.get_transport_test_permutations(
90
 
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
91
 
                for (klass, server_factory) in permutations:
92
 
                    scenario = (server_factory.__name__,
93
 
                        {"transport_class":klass,
94
 
                         "transport_server":server_factory})
95
 
                    result.append(scenario)
96
 
            except errors.DependencyNotPresent, e:
97
 
                # Continue even if a dependency prevents us 
98
 
                # from adding this test
99
 
                pass
100
 
        return result
 
60
from bzrlib.transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent, e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
101
90
 
102
91
 
103
92
def load_tests(standard_tests, module, loader):
104
93
    """Multiply tests for tranport implementations."""
105
94
    result = loader.suiteClass()
106
 
    adapter = TransportTestProviderAdapter()
107
 
    for test in tests.iter_suite_tests(standard_tests):
108
 
        result.addTests(adapter.adapt(test))
109
 
    return result
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
110
97
 
111
98
 
112
99
class TransportTests(TestTransportImplementation):
113
100
 
114
101
    def setUp(self):
115
102
        super(TransportTests, self).setUp()
116
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
117
104
 
118
105
    def check_transport_contents(self, content, transport, relpath):
119
 
        """Check that transport.get(relpath).read() == content."""
120
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
121
108
 
122
109
    def test_ensure_base_missing(self):
123
110
        """.ensure_base() should create the directory if it doesn't exist"""
167
154
        self.assertEqual(True, t.has('a'))
168
155
        self.assertEqual(False, t.has('c'))
169
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
170
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
 
                [True, True, False, False, True, False, True, False])
 
157
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
158
                                           'e', 'f', 'g', 'h'])),
 
159
                         [True, True, False, False,
 
160
                          True, False, True, False])
172
161
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
 
                [True, True, False, False, True, False, True, False])
 
162
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
163
                                           urlutils.escape('%%')]))
 
164
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
165
                                                'e', 'f', 'g', 'h']))),
 
166
                         [True, True, False, False,
 
167
                          True, False, True, False])
176
168
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
169
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
178
170
 
179
171
    def test_has_root_works(self):
180
 
        from bzrlib.smart import server
181
 
        if self.transport_server is server.SmartTCPServer_for_testing:
 
172
        if self.transport_server is test_server.SmartTCPServer_for_testing:
182
173
            raise TestNotApplicable(
183
174
                "SmartTCPServer_for_testing intentionally does not allow "
184
175
                "access to /.")
210
201
        for content, f in itertools.izip(contents, content_f):
211
202
            self.assertEqual(content, f.read())
212
203
 
 
204
    def test_get_unknown_file(self):
 
205
        t = self.get_transport()
 
206
        files = ['a', 'b']
 
207
        contents = ['contents of a\n',
 
208
                    'contents of b\n',
 
209
                    ]
 
210
        self.build_tree(files, transport=t, line_endings='binary')
213
211
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
216
223
 
217
224
    def test_get_directory_read_gives_ReadError(self):
218
225
        """consistent errors for read() on a file returned by get()."""
249
256
        for content, fname in zip(contents, files):
250
257
            self.assertEqual(content, t.get_bytes(fname))
251
258
 
 
259
    def test_get_bytes_unknown_file(self):
 
260
        t = self.get_transport()
252
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
253
262
 
254
263
    def test_get_with_open_write_stream_sees_all_content(self):
258
267
        handle = t.open_write_stream('foo')
259
268
        try:
260
269
            handle.write('b')
261
 
            self.assertEqual('b', t.get('foo').read())
 
270
            self.assertEqual('b', t.get_bytes('foo'))
262
271
        finally:
263
272
            handle.close()
264
273
 
270
279
        try:
271
280
            handle.write('b')
272
281
            self.assertEqual('b', t.get_bytes('foo'))
273
 
            self.assertEqual('b', t.get('foo').read())
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
274
287
        finally:
275
288
            handle.close()
276
289
 
283
296
            return
284
297
 
285
298
        t.put_bytes('a', 'some text for a\n')
286
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
287
300
        self.check_transport_contents('some text for a\n', t, 'a')
288
301
 
289
302
        # The contents should be overwritten
301
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
302
315
            return
303
316
 
304
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
305
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
306
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
307
320
        self.check_transport_contents('some text for a\n', t, 'a')
308
321
        # Put also replaces contents
309
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
321
334
        # Now test the create_parent flag
322
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
323
336
                                       'contents\n')
324
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
325
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
326
339
                               create_parent_dir=True)
327
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
328
 
        
 
341
 
329
342
        # But we still get NoSuchFile if we can't make the parent dir
330
343
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
331
344
                                       'contents\n',
353
366
        umask = osutils.get_umask()
354
367
        t.put_bytes('nomode', 'test text\n', mode=None)
355
368
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
356
 
        
 
369
 
357
370
    def test_put_bytes_non_atomic_permissions(self):
358
371
        t = self.get_transport()
359
372
 
387
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
388
401
                               dir_mode=0777, create_parent_dir=True)
389
402
        self.assertTransportMode(t, 'dir777', 0777)
390
 
        
 
403
 
391
404
    def test_put_file(self):
392
405
        t = self.get_transport()
393
406
 
399
412
        result = t.put_file('a', StringIO('some text for a\n'))
400
413
        # put_file returns the length of the data written
401
414
        self.assertEqual(16, result)
402
 
        self.failUnless(t.has('a'))
 
415
        self.assertTrue(t.has('a'))
403
416
        self.check_transport_contents('some text for a\n', t, 'a')
404
417
        # Put also replaces contents
405
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
417
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
418
431
            return
419
432
 
420
 
        self.failIf(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
421
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
422
 
        self.failUnless(t.has('a'))
 
435
        self.assertTrue(t.has('a'))
423
436
        self.check_transport_contents('some text for a\n', t, 'a')
424
437
        # Put also replaces contents
425
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
437
450
        # Now test the create_parent flag
438
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
439
452
                                       StringIO('contents\n'))
440
 
        self.failIf(t.has('dir/a'))
 
453
        self.assertFalse(t.has('dir/a'))
441
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
442
455
                              create_parent_dir=True)
443
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
444
 
        
 
457
 
445
458
        # But we still get NoSuchFile if we can't make the parent dir
446
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
447
460
                                       StringIO('contents\n'),
469
482
        umask = osutils.get_umask()
470
483
        t.put_file('nomode', StringIO('test text\n'), mode=None)
471
484
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
472
 
        
 
485
 
473
486
    def test_put_file_non_atomic_permissions(self):
474
487
        t = self.get_transport()
475
488
 
492
505
        umask = osutils.get_umask()
493
506
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
494
507
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
495
 
        
 
508
 
496
509
        # We should also be able to set the mode for a parent directory
497
510
        # when it is created
498
511
        sio = StringIO()
507
520
        self.assertTransportMode(t, 'dir777', 0777)
508
521
 
509
522
    def test_put_bytes_unicode(self):
510
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
511
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
512
 
        # (we don't want to encode unicode here at all, callers should be
513
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
514
 
        # compatibility.  At some point we should use a specific exception.
515
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
516
523
        t = self.get_transport()
517
524
        if t.is_readonly():
518
525
            return
519
526
        unicode_string = u'\u1234'
520
 
        self.assertRaises(
521
 
            (AssertionError, UnicodeEncodeError),
522
 
            t.put_bytes, 'foo', unicode_string)
523
 
 
524
 
    def test_put_file_unicode(self):
525
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
526
 
        # This situation can happen (and has) if code is careless about the type
527
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
528
 
        # cStringIO, because it never returns unicode from read.
529
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
530
 
        # raise, but we raise it for hysterical raisins.
531
 
        t = self.get_transport()
532
 
        if t.is_readonly():
533
 
            return
534
 
        unicode_file = pyStringIO(u'\u1234')
535
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
527
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
536
528
 
537
529
    def test_mkdir(self):
538
530
        t = self.get_transport()
539
531
 
540
532
        if t.is_readonly():
541
 
            # cannot mkdir on readonly transports. We're not testing for 
 
533
            # cannot mkdir on readonly transports. We're not testing for
542
534
            # cache coherency because cache behaviour is not currently
543
535
            # defined for the transport interface.
544
536
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
565
557
 
566
558
        # we were testing that a local mkdir followed by a transport
567
559
        # mkdir failed thusly, but given that we * in one process * do not
568
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
560
        # concurrently fiddle with disk dirs and then use transport to do
569
561
        # things, the win here seems marginal compared to the constraint on
570
562
        # the interface. RBC 20051227
571
563
        t.mkdir('dir_g')
617
609
    def test_opening_a_file_stream_can_set_mode(self):
618
610
        t = self.get_transport()
619
611
        if t.is_readonly():
 
612
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
613
                              t.open_write_stream, 'foo')
620
614
            return
621
615
        if not t._can_roundtrip_unix_modebits():
622
616
            # Can't roundtrip, so no need to run this test
623
617
            return
 
618
 
624
619
        def check_mode(name, mode, expected):
625
620
            handle = t.open_write_stream(name, mode=mode)
626
621
            handle.close()
642
637
            self.build_tree(files, transport=transport_from)
643
638
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
644
639
            for f in files:
645
 
                self.check_transport_contents(transport_to.get(f).read(),
 
640
                self.check_transport_contents(transport_to.get_bytes(f),
646
641
                                              transport_from, f)
647
642
 
648
643
        t = self.get_transport()
671
666
        files = ['a', 'b', 'c', 'd']
672
667
        t.copy_to(iter(files), temp_transport)
673
668
        for f in files:
674
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
669
            self.check_transport_contents(temp_transport.get_bytes(f),
675
670
                                          t, f)
676
671
        del temp_transport
677
672
 
681
676
            for f in files:
682
677
                self.assertTransportMode(temp_transport, f, mode)
683
678
 
 
679
    def test_create_prefix(self):
 
680
        t = self.get_transport()
 
681
        sub = t.clone('foo').clone('bar')
 
682
        try:
 
683
            sub.create_prefix()
 
684
        except TransportNotPossible:
 
685
            self.assertTrue(t.is_readonly())
 
686
        else:
 
687
            self.assertTrue(t.has('foo/bar'))
 
688
 
684
689
    def test_append_file(self):
685
690
        t = self.get_transport()
686
691
 
790
795
                t.append_file, 'f', StringIO('f'), mode=None)
791
796
            return
792
797
        t.append_file('f', StringIO('f'), mode=None)
793
 
        
 
798
 
794
799
    def test_append_bytes_mode(self):
795
800
        # check append_bytes accepts a mode
796
801
        t = self.get_transport()
799
804
                t.append_bytes, 'f', 'f', mode=None)
800
805
            return
801
806
        t.append_bytes('f', 'f', mode=None)
802
 
        
 
807
 
803
808
    def test_delete(self):
804
809
        # TODO: Test Transport.delete
805
810
        t = self.get_transport()
810
815
            return
811
816
 
812
817
        t.put_bytes('a', 'a little bit of text\n')
813
 
        self.failUnless(t.has('a'))
 
818
        self.assertTrue(t.has('a'))
814
819
        t.delete('a')
815
 
        self.failIf(t.has('a'))
 
820
        self.assertFalse(t.has('a'))
816
821
 
817
822
        self.assertRaises(NoSuchFile, t.delete, 'a')
818
823
 
824
829
        t.delete_multi(['a', 'c'])
825
830
        self.assertEqual([False, True, False],
826
831
                list(t.has_multi(['a', 'b', 'c'])))
827
 
        self.failIf(t.has('a'))
828
 
        self.failUnless(t.has('b'))
829
 
        self.failIf(t.has('c'))
 
832
        self.assertFalse(t.has('a'))
 
833
        self.assertTrue(t.has('b'))
 
834
        self.assertFalse(t.has('c'))
830
835
 
831
836
        self.assertRaises(NoSuchFile,
832
837
                t.delete_multi, ['a', 'b', 'c'])
866
871
 
867
872
    def test_rmdir_not_empty(self):
868
873
        """Deleting a non-empty directory raises an exception
869
 
        
 
874
 
870
875
        sftp (and possibly others) don't give us a specific "directory not
871
876
        empty" exception -- we can just see that the operation failed.
872
877
        """
879
884
 
880
885
    def test_rmdir_empty_but_similar_prefix(self):
881
886
        """rmdir does not get confused by sibling paths.
882
 
        
 
887
 
883
888
        A naive implementation of MemoryTransport would refuse to rmdir
884
889
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
885
890
        uses "path.startswith(dir)" on all file paths to determine if directory
893
898
        t.mkdir('foo-baz')
894
899
        t.rmdir('foo')
895
900
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
896
 
        self.failUnless(t.has('foo-bar'))
 
901
        self.assertTrue(t.has('foo-bar'))
897
902
 
898
903
    def test_rename_dir_succeeds(self):
899
904
        t = self.get_transport()
900
905
        if t.is_readonly():
901
 
            raise TestSkipped("transport is readonly")
 
906
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
907
                              t.rename, 'foo', 'bar')
 
908
            return
902
909
        t.mkdir('adir')
903
910
        t.mkdir('adir/asubdir')
904
911
        t.rename('adir', 'bdir')
909
916
        """Attempting to replace a nonemtpy directory should fail"""
910
917
        t = self.get_transport()
911
918
        if t.is_readonly():
912
 
            raise TestSkipped("transport is readonly")
 
919
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
920
                              t.rename, 'foo', 'bar')
 
921
            return
913
922
        t.mkdir('adir')
914
923
        t.mkdir('adir/asubdir')
915
924
        t.mkdir('bdir')
979
988
        # perhaps all of this could be done in a subdirectory
980
989
 
981
990
        t.put_bytes('a', 'a first file\n')
982
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
991
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
983
992
 
984
993
        t.move('a', 'b')
985
 
        self.failUnless(t.has('b'))
986
 
        self.failIf(t.has('a'))
 
994
        self.assertTrue(t.has('b'))
 
995
        self.assertFalse(t.has('a'))
987
996
 
988
997
        self.check_transport_contents('a first file\n', t, 'b')
989
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
998
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
990
999
 
991
1000
        # Overwrite a file
992
1001
        t.put_bytes('c', 'c this file\n')
993
1002
        t.move('c', 'b')
994
 
        self.failIf(t.has('c'))
 
1003
        self.assertFalse(t.has('c'))
995
1004
        self.check_transport_contents('c this file\n', t, 'b')
996
1005
 
997
1006
        # TODO: Try to write a test for atomicity
1021
1030
 
1022
1031
    def test_connection_error(self):
1023
1032
        """ConnectionError is raised when connection is impossible.
1024
 
        
 
1033
 
1025
1034
        The error should be raised from the first operation on the transport.
1026
1035
        """
1027
1036
        try:
1029
1038
        except NotImplementedError:
1030
1039
            raise TestSkipped("Transport %s has no bogus URL support." %
1031
1040
                              self._server.__class__)
1032
 
        t = get_transport(url)
 
1041
        t = _mod_transport.get_transport_from_url(url)
1033
1042
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1034
1043
 
1035
1044
    def test_stat(self):
1051
1060
        for path, size in zip(paths, sizes):
1052
1061
            st = t.stat(path)
1053
1062
            if path.endswith('/'):
1054
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1063
                self.assertTrue(S_ISDIR(st.st_mode))
1055
1064
                # directory sizes are meaningless
1056
1065
            else:
1057
 
                self.failUnless(S_ISREG(st.st_mode))
 
1066
                self.assertTrue(S_ISREG(st.st_mode))
1058
1067
                self.assertEqual(size, st.st_size)
1059
1068
 
1060
1069
        remote_stats = list(t.stat_multi(paths))
1067
1076
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1068
1077
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1069
1078
        subdir = t.clone('subdir')
1070
 
        subdir.stat('./file')
1071
 
        subdir.stat('.')
 
1079
        st = subdir.stat('./file')
 
1080
        st = subdir.stat('.')
 
1081
 
 
1082
    def test_hardlink(self):
 
1083
        from stat import ST_NLINK
 
1084
 
 
1085
        t = self.get_transport()
 
1086
 
 
1087
        source_name = "original_target"
 
1088
        link_name = "target_link"
 
1089
 
 
1090
        self.build_tree([source_name], transport=t)
 
1091
 
 
1092
        try:
 
1093
            t.hardlink(source_name, link_name)
 
1094
 
 
1095
            self.assertTrue(t.has(source_name))
 
1096
            self.assertTrue(t.has(link_name))
 
1097
 
 
1098
            st = t.stat(link_name)
 
1099
            self.assertEqual(st[ST_NLINK], 2)
 
1100
        except TransportNotPossible:
 
1101
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1102
                              self._server.__class__)
 
1103
 
 
1104
    def test_symlink(self):
 
1105
        from stat import S_ISLNK
 
1106
 
 
1107
        t = self.get_transport()
 
1108
 
 
1109
        source_name = "original_target"
 
1110
        link_name = "target_link"
 
1111
 
 
1112
        self.build_tree([source_name], transport=t)
 
1113
 
 
1114
        try:
 
1115
            t.symlink(source_name, link_name)
 
1116
 
 
1117
            self.assertTrue(t.has(source_name))
 
1118
            self.assertTrue(t.has(link_name))
 
1119
 
 
1120
            st = t.stat(link_name)
 
1121
            self.assertTrue(S_ISLNK(st.st_mode),
 
1122
                "expected symlink, got mode %o" % st.st_mode)
 
1123
        except TransportNotPossible:
 
1124
            raise TestSkipped("Transport %s does not support symlinks." %
 
1125
                              self._server.__class__)
 
1126
        except IOError:
 
1127
            self.knownFailure("Paramiko fails to create symlinks during tests")
1072
1128
 
1073
1129
    def test_list_dir(self):
1074
1130
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1115
1171
 
1116
1172
        self.assertListRaises(PathError, t.list_dir, 'q')
1117
1173
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1174
        # 'a' is a file, list_dir should raise an error
1118
1175
        self.assertListRaises(PathError, t.list_dir, 'a')
1119
1176
 
1120
1177
    def test_list_dir_result_is_url_escaped(self):
1137
1194
            raise TestSkipped("not a connected transport")
1138
1195
 
1139
1196
        t2 = t1.clone('subdir')
1140
 
        self.assertEquals(t1._scheme, t2._scheme)
1141
 
        self.assertEquals(t1._user, t2._user)
1142
 
        self.assertEquals(t1._password, t2._password)
1143
 
        self.assertEquals(t1._host, t2._host)
1144
 
        self.assertEquals(t1._port, t2._port)
 
1197
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1198
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1199
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1200
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1201
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1145
1202
 
1146
1203
    def test__reuse_for(self):
1147
1204
        t = self.get_transport()
1154
1211
 
1155
1212
            Only the parameters different from None will be changed.
1156
1213
            """
1157
 
            if scheme   is None: scheme   = t._scheme
1158
 
            if user     is None: user     = t._user
1159
 
            if password is None: password = t._password
1160
 
            if user     is None: user     = t._user
1161
 
            if host     is None: host     = t._host
1162
 
            if port     is None: port     = t._port
1163
 
            if path     is None: path     = t._path
1164
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1214
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1215
            if user     is None: user     = t._parsed_url.user
 
1216
            if password is None: password = t._parsed_url.password
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if host     is None: host     = t._parsed_url.host
 
1219
            if port     is None: port     = t._parsed_url.port
 
1220
            if path     is None: path     = t._parsed_url.path
 
1221
            return str(urlutils.URL(scheme, user, password, host, port, path))
1165
1222
 
1166
 
        if t._scheme == 'ftp':
 
1223
        if t._parsed_url.scheme == 'ftp':
1167
1224
            scheme = 'sftp'
1168
1225
        else:
1169
1226
            scheme = 'ftp'
1170
1227
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1171
 
        if t._user == 'me':
 
1228
        if t._parsed_url.user == 'me':
1172
1229
            user = 'you'
1173
1230
        else:
1174
1231
            user = 'me'
1185
1242
        #   (they may be typed by the user when prompted for example)
1186
1243
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1187
1244
        # We will not connect, we can use a invalid host
1188
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1189
 
        if t._port == 1234:
 
1245
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1246
        if t._parsed_url.port == 1234:
1190
1247
            port = 4321
1191
1248
        else:
1192
1249
            port = 1234
1205
1262
        self.assertIs(t._get_connection(), c._get_connection())
1206
1263
 
1207
1264
        # Temporary failure, we need to create a new dummy connection
1208
 
        new_connection = object()
 
1265
        new_connection = None
1209
1266
        t._set_connection(new_connection)
1210
1267
        # Check that both transports use the same connection
1211
1268
        self.assertIs(new_connection, t._get_connection())
1233
1290
 
1234
1291
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1235
1292
 
1236
 
        self.failUnless(t1.has('a'))
1237
 
        self.failUnless(t1.has('b/c'))
1238
 
        self.failIf(t1.has('c'))
 
1293
        self.assertTrue(t1.has('a'))
 
1294
        self.assertTrue(t1.has('b/c'))
 
1295
        self.assertFalse(t1.has('c'))
1239
1296
 
1240
1297
        t2 = t1.clone('b')
1241
1298
        self.assertEqual(t1.base + 'b/', t2.base)
1242
1299
 
1243
 
        self.failUnless(t2.has('c'))
1244
 
        self.failIf(t2.has('a'))
 
1300
        self.assertTrue(t2.has('c'))
 
1301
        self.assertFalse(t2.has('a'))
1245
1302
 
1246
1303
        t3 = t2.clone('..')
1247
 
        self.failUnless(t3.has('a'))
1248
 
        self.failIf(t3.has('c'))
 
1304
        self.assertTrue(t3.has('a'))
 
1305
        self.assertFalse(t3.has('c'))
1249
1306
 
1250
 
        self.failIf(t1.has('b/d'))
1251
 
        self.failIf(t2.has('d'))
1252
 
        self.failIf(t3.has('b/d'))
 
1307
        self.assertFalse(t1.has('b/d'))
 
1308
        self.assertFalse(t2.has('d'))
 
1309
        self.assertFalse(t3.has('b/d'))
1253
1310
 
1254
1311
        if t1.is_readonly():
1255
 
            open('b/d', 'wb').write('newfile\n')
 
1312
            self.build_tree_contents([('b/d', 'newfile\n')])
1256
1313
        else:
1257
1314
            t2.put_bytes('d', 'newfile\n')
1258
1315
 
1259
 
        self.failUnless(t1.has('b/d'))
1260
 
        self.failUnless(t2.has('d'))
1261
 
        self.failUnless(t3.has('b/d'))
 
1316
        self.assertTrue(t1.has('b/d'))
 
1317
        self.assertTrue(t2.has('d'))
 
1318
        self.assertTrue(t3.has('b/d'))
1262
1319
 
1263
1320
    def test_clone_to_root(self):
1264
1321
        orig_transport = self.get_transport()
1338
1395
        self.assertEqual(transport.clone("/").abspath('foo'),
1339
1396
                         transport.abspath("/foo"))
1340
1397
 
 
1398
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1399
    def test_win32_abspath(self):
 
1400
        # Note: we tried to set sys.platform='win32' so we could test on
 
1401
        # other platforms too, but then osutils does platform specific
 
1402
        # things at import time which defeated us...
 
1403
        if sys.platform != 'win32':
 
1404
            raise TestSkipped(
 
1405
                'Testing drive letters in abspath implemented only for win32')
 
1406
 
 
1407
        # smoke test for abspath on win32.
 
1408
        # a transport based on 'file:///' never fully qualifies the drive.
 
1409
        transport = _mod_transport.get_transport_from_url("file:///")
 
1410
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
 
 
1412
        # but a transport that starts with a drive spec must keep it.
 
1413
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1414
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
 
1341
1416
    def test_local_abspath(self):
1342
1417
        transport = self.get_transport()
1343
1418
        try:
1420
1495
                         'to/dir/b%2525z',
1421
1496
                         'to/bar',]))
1422
1497
 
 
1498
    def test_copy_tree_to_transport(self):
 
1499
        transport = self.get_transport()
 
1500
        if not transport.listable():
 
1501
            self.assertRaises(TransportNotPossible,
 
1502
                              transport.iter_files_recursive)
 
1503
            return
 
1504
        if transport.is_readonly():
 
1505
            return
 
1506
        self.build_tree(['from/',
 
1507
                         'from/dir/',
 
1508
                         'from/dir/foo',
 
1509
                         'from/dir/bar',
 
1510
                         'from/dir/b%25z', # make sure quoting is correct
 
1511
                         'from/bar'],
 
1512
                        transport=transport)
 
1513
        from_transport = transport.clone('from')
 
1514
        to_transport = transport.clone('to')
 
1515
        to_transport.ensure_base()
 
1516
        from_transport.copy_tree_to_transport(to_transport)
 
1517
        paths = set(transport.iter_files_recursive())
 
1518
        self.assertEqual(paths,
 
1519
                    set(['from/dir/foo',
 
1520
                         'from/dir/bar',
 
1521
                         'from/dir/b%2525z',
 
1522
                         'from/bar',
 
1523
                         'to/dir/foo',
 
1524
                         'to/dir/bar',
 
1525
                         'to/dir/b%2525z',
 
1526
                         'to/bar',]))
 
1527
 
1423
1528
    def test_unicode_paths(self):
1424
1529
        """Test that we can read/write files with Unicode names."""
1425
1530
        t = self.get_transport()
1436
1541
                 u'\u65e5', # Kanji person
1437
1542
                ]
1438
1543
 
 
1544
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1545
        if no_unicode_support:
 
1546
            self.knownFailure("test server cannot handle unicode paths")
 
1547
 
1439
1548
        try:
1440
1549
            self.build_tree(files, transport=t, line_endings='binary')
1441
1550
        except UnicodeError:
1459
1568
        transport.put_bytes('foo', 'bar')
1460
1569
        transport3 = self.get_transport()
1461
1570
        self.check_transport_contents('bar', transport3, 'foo')
1462
 
        # its base should be usable.
1463
 
        transport4 = get_transport(transport.base)
1464
 
        self.check_transport_contents('bar', transport4, 'foo')
1465
1571
 
1466
1572
        # now opening at a relative url should give use a sane result:
1467
1573
        transport.mkdir('newdir')
1468
 
        transport5 = get_transport(transport.base + "newdir")
 
1574
        transport5 = self.get_transport('newdir')
1469
1575
        transport6 = transport5.clone('..')
1470
1576
        self.check_transport_contents('bar', transport6, 'foo')
1471
1577
 
1508
1614
    def test_readv(self):
1509
1615
        transport = self.get_transport()
1510
1616
        if transport.is_readonly():
1511
 
            file('a', 'w').write('0123456789')
 
1617
            with file('a', 'w') as f: f.write('0123456789')
1512
1618
        else:
1513
1619
            transport.put_bytes('a', '0123456789')
1514
1620
 
1524
1630
    def test_readv_out_of_order(self):
1525
1631
        transport = self.get_transport()
1526
1632
        if transport.is_readonly():
1527
 
            file('a', 'w').write('0123456789')
 
1633
            with file('a', 'w') as f: f.write('0123456789')
1528
1634
        else:
1529
1635
            transport.put_bytes('a', '01234567890')
1530
1636
 
1546
1652
        content = osutils.rand_bytes(200*1024)
1547
1653
        content_size = len(content)
1548
1654
        if transport.is_readonly():
1549
 
            file('a', 'w').write(content)
 
1655
            self.build_tree_contents([('a', content)])
1550
1656
        else:
1551
1657
            transport.put_bytes('a', content)
1552
1658
        def check_result_data(result_vector):
1602
1708
        transport = self.get_transport()
1603
1709
        # test from observed failure case.
1604
1710
        if transport.is_readonly():
1605
 
            file('a', 'w').write('a'*1024*1024)
 
1711
            with file('a', 'w') as f: f.write('a'*1024*1024)
1606
1712
        else:
1607
1713
            transport.put_bytes('a', 'a'*1024*1024)
1608
1714
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1642
1748
    def test_readv_short_read(self):
1643
1749
        transport = self.get_transport()
1644
1750
        if transport.is_readonly():
1645
 
            file('a', 'w').write('0123456789')
 
1751
            with file('a', 'w') as f: f.write('0123456789')
1646
1752
        else:
1647
1753
            transport.put_bytes('a', '01234567890')
1648
1754
 
1657
1763
        # also raise a special error
1658
1764
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1659
1765
                              transport.readv, 'a', [(12,2)])
 
1766
 
 
1767
    def test_no_segment_parameters(self):
 
1768
        """Segment parameters should be stripped and stored in
 
1769
        transport.segment_parameters."""
 
1770
        transport = self.get_transport("foo")
 
1771
        self.assertEqual({}, transport.get_segment_parameters())
 
1772
 
 
1773
    def test_segment_parameters(self):
 
1774
        """Segment parameters should be stripped and stored in
 
1775
        transport.get_segment_parameters()."""
 
1776
        base_url = self._server.get_url()
 
1777
        parameters = {"key1": "val1", "key2": "val2"}
 
1778
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1779
        transport = _mod_transport.get_transport_from_url(url)
 
1780
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1781
 
 
1782
    def test_set_segment_parameters(self):
 
1783
        """Segment parameters can be set and show up in base."""
 
1784
        transport = self.get_transport("foo")
 
1785
        orig_base = transport.base
 
1786
        transport.set_segment_parameter("arm", "board")
 
1787
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1788
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1789
        transport.set_segment_parameter("arm", None)
 
1790
        transport.set_segment_parameter("nonexistant", None)
 
1791
        self.assertEqual({}, transport.get_segment_parameters())
 
1792
        self.assertEqual(orig_base, transport.base)
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1805
 
 
1806
    def test_abspath_url_unquote_unreserved(self):
 
1807
        """URLs from abspath should have unreserved characters unquoted
 
1808
        
 
1809
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1810
        """
 
1811
        t = self.get_transport()
 
1812
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1813
        self.assertEqual(t.base + "-.09AZ_az~",
 
1814
            t.abspath(needlessly_escaped_dir))
 
1815
 
 
1816
    def test_clone_url_unquote_unreserved(self):
 
1817
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1818
        
 
1819
        Cloned transports should be prefix comparable for things like the
 
1820
        isolation checking of tests, see lp:842223 for more.
 
1821
        """
 
1822
        t1 = self.get_transport()
 
1823
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1824
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1825
        t2 = t1.clone(needlessly_escaped_dir)
 
1826
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1827
 
 
1828
    def test_hook_post_connection_one(self):
 
1829
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1830
        log = []
 
1831
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1832
        t = self.get_transport()
 
1833
        self.assertEqual([], log)
 
1834
        t.has("non-existant")
 
1835
        if isinstance(t, RemoteTransport):
 
1836
            self.assertEqual([t.get_smart_medium()], log)
 
1837
        elif isinstance(t, ConnectedTransport):
 
1838
            self.assertEqual([t], log)
 
1839
        else:
 
1840
            self.assertEqual([], log)
 
1841
 
 
1842
    def test_hook_post_connection_multi(self):
 
1843
        """Fire post_connect hook once per unshared underlying connection"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t1 = self.get_transport()
 
1847
        t2 = t1.clone(".")
 
1848
        t3 = self.get_transport()
 
1849
        self.assertEqual([], log)
 
1850
        t1.has("x")
 
1851
        t2.has("x")
 
1852
        t3.has("x")
 
1853
        if isinstance(t1, RemoteTransport):
 
1854
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1855
        elif isinstance(t1, ConnectedTransport):
 
1856
            self.assertEqual([t1, t3], log)
 
1857
        else:
 
1858
            self.assertEqual([], log)