~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

  • Committer: Robert Collins
  • Date: 2007-07-04 08:08:13 UTC
  • mfrom: (2572 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2587.
  • Revision ID: robertc@robertcollins.net-20070704080813-wzebx0r88fvwj5rq
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
22
22
 
23
23
import os
24
24
from cStringIO import StringIO
 
25
from StringIO import StringIO as pyStringIO
25
26
import stat
26
27
import sys
 
28
import unittest
27
29
 
28
30
from bzrlib import (
29
31
    errors,
30
32
    osutils,
31
33
    urlutils,
32
34
    )
33
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
 
                           LockError, NoSmartServer, PathError,
35
 
                           TransportNotPossible, ConnectionError,
36
 
                           InvalidURL)
 
35
from bzrlib.errors import (ConnectionError,
 
36
                           DirectoryNotEmpty,
 
37
                           FileExists,
 
38
                           InvalidURL,
 
39
                           LockError,
 
40
                           NoSmartServer,
 
41
                           NoSuchFile,
 
42
                           NotLocalUrl,
 
43
                           PathError,
 
44
                           TransportNotPossible,
 
45
                           )
37
46
from bzrlib.osutils import getcwd
 
47
from bzrlib.smart import medium
38
48
from bzrlib.symbol_versioning import zero_eleven
39
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
 
49
from bzrlib.tests import TestCaseInTempDir, TestScenarioApplier, TestSkipped
40
50
from bzrlib.tests.test_transport import TestTransportImplementation
41
 
from bzrlib.transport import memory, smart
 
51
from bzrlib.transport import memory, remote, _get_transport_modules
42
52
import bzrlib.transport
43
53
 
44
54
 
45
 
def _append(fn, txt):
46
 
    """Append the given text (file-like object) to the supplied filename."""
47
 
    f = open(fn, 'ab')
48
 
    try:
49
 
        f.write(txt.read())
50
 
    finally:
51
 
        f.close()
 
55
class TransportTestProviderAdapter(TestScenarioApplier):
 
56
    """A tool to generate a suite testing all transports for a single test.
 
57
 
 
58
    This is done by copying the test once for each transport and injecting
 
59
    the transport_class and transport_server classes into each copy. Each copy
 
60
    is also given a new id() to make it easy to identify.
 
61
    """
 
62
 
 
63
    def __init__(self):
 
64
        self.scenarios = self._test_permutations()
 
65
 
 
66
    def get_transport_test_permutations(self, module):
 
67
        """Get the permutations module wants to have tested."""
 
68
        if getattr(module, 'get_test_permutations', None) is None:
 
69
            raise AssertionError("transport module %s doesn't provide get_test_permutations()"
 
70
                    % module.__name__)
 
71
            ##warning("transport module %s doesn't provide get_test_permutations()"
 
72
            ##       % module.__name__)
 
73
            return []
 
74
        return module.get_test_permutations()
 
75
 
 
76
    def _test_permutations(self):
 
77
        """Return a list of the klass, server_factory pairs to test."""
 
78
        result = []
 
79
        for module in _get_transport_modules():
 
80
            try:
 
81
                permutations = self.get_transport_test_permutations(
 
82
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
 
83
                for (klass, server_factory) in permutations:
 
84
                    scenario = (server_factory.__name__,
 
85
                        {"transport_class":klass,
 
86
                         "transport_server":server_factory})
 
87
                    result.append(scenario)
 
88
            except errors.DependencyNotPresent, e:
 
89
                # Continue even if a dependency prevents us 
 
90
                # from running this test
 
91
                pass
 
92
        return result
 
93
 
52
94
 
53
95
 
54
96
class TransportTests(TestTransportImplementation):
55
97
 
 
98
    def setUp(self):
 
99
        super(TransportTests, self).setUp()
 
100
        self._captureVar('BZR_NO_SMART_VFS', None)
 
101
 
56
102
    def check_transport_contents(self, content, transport, relpath):
57
103
        """Check that transport.get(relpath).read() == content."""
58
104
        self.assertEqualDiff(content, transport.get(relpath).read())
59
105
 
60
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
61
 
        """Fail unless excClass is raised when the iterator from func is used.
62
 
        
63
 
        Many transport functions can return generators this makes sure
64
 
        to wrap them in a list() call to make sure the whole generator
65
 
        is run, and that the proper exception is raised.
66
 
        """
67
 
        try:
68
 
            list(func(*args, **kwargs))
69
 
        except excClass:
70
 
            return
71
 
        else:
72
 
            if getattr(excClass,'__name__', None) is not None:
73
 
                excName = excClass.__name__
74
 
            else:
75
 
                excName = str(excClass)
76
 
            raise self.failureException, "%s not raised" % excName
 
106
    def test_ensure_base_missing(self):
 
107
        """.ensure_base() should create the directory if it doesn't exist"""
 
108
        t = self.get_transport()
 
109
        t_a = t.clone('a')
 
110
        if t_a.is_readonly():
 
111
            self.assertRaises(TransportNotPossible,
 
112
                              t_a.ensure_base)
 
113
            return
 
114
        self.assertTrue(t_a.ensure_base())
 
115
        self.assertTrue(t.has('a'))
 
116
 
 
117
    def test_ensure_base_exists(self):
 
118
        """.ensure_base() should just be happy if it already exists"""
 
119
        t = self.get_transport()
 
120
        if t.is_readonly():
 
121
            return
 
122
 
 
123
        t.mkdir('a')
 
124
        t_a = t.clone('a')
 
125
        # ensure_base returns False if it didn't create the base
 
126
        self.assertFalse(t_a.ensure_base())
 
127
 
 
128
    def test_ensure_base_missing_parent(self):
 
129
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
130
        t = self.get_transport()
 
131
        if t.is_readonly():
 
132
            return
 
133
 
 
134
        t_a = t.clone('a')
 
135
        t_b = t_a.clone('b')
 
136
        self.assertRaises(NoSuchFile, t_b.ensure_base)
77
137
 
78
138
    def test_has(self):
79
139
        t = self.get_transport()
94
154
 
95
155
    def test_has_root_works(self):
96
156
        current_transport = self.get_transport()
97
 
        # import pdb;pdb.set_trace()
98
157
        self.assertTrue(current_transport.has('/'))
99
158
        root = current_transport.clone('/')
100
159
        self.assertTrue(root.has(''))
122
181
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
123
182
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
124
183
 
125
 
    def test_get_directory(self):
126
 
        """get() of a directory should only error when read() is called."""
 
184
    def test_get_directory_read_gives_ReadError(self):
 
185
        """consistent errors for read() on a file returned by get()."""
127
186
        t = self.get_transport()
128
187
        if t.is_readonly():
129
188
            self.build_tree(['a directory/'])
132
191
        # getting the file must either work or fail with a PathError
133
192
        try:
134
193
            a_file = t.get('a%20directory')
135
 
        except errors.PathError:
 
194
        except (errors.PathError, errors.RedirectRequested):
136
195
            # early failure return immediately.
137
196
            return
138
197
        # having got a file, read() must either work (i.e. http reading a dir listing) or
412
471
                              dir_mode=0777, create_parent_dir=True)
413
472
        self.assertTransportMode(t, 'dir777', 0777)
414
473
 
 
474
    def test_put_bytes_unicode(self):
 
475
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
476
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
477
        # (we don't want to encode unicode here at all, callers should be
 
478
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
479
        # compatibility.  At some point we should use a specific exception.
 
480
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
481
        t = self.get_transport()
 
482
        if t.is_readonly():
 
483
            return
 
484
        unicode_string = u'\u1234'
 
485
        self.assertRaises(
 
486
            (AssertionError, UnicodeEncodeError),
 
487
            t.put_bytes, 'foo', unicode_string)
 
488
 
 
489
    def test_put_file_unicode(self):
 
490
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
491
        # This situation can happen (and has) if code is careless about the type
 
492
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
493
        # cStringIO, because it never returns unicode from read.
 
494
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
495
        # raise, but we raise it for hysterical raisins.
 
496
        t = self.get_transport()
 
497
        if t.is_readonly():
 
498
            return
 
499
        unicode_file = pyStringIO(u'\u1234')
 
500
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
501
 
415
502
    def test_put_multi(self):
416
503
        t = self.get_transport()
417
504
 
796
883
        t.mkdir('adir/bdir')
797
884
        self.assertRaises(PathError, t.rmdir, 'adir')
798
885
 
 
886
    def test_rmdir_empty_but_similar_prefix(self):
 
887
        """rmdir does not get confused by sibling paths.
 
888
        
 
889
        A naive implementation of MemoryTransport would refuse to rmdir
 
890
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
891
        uses "path.startswith(dir)" on all file paths to determine if directory
 
892
        is empty.
 
893
        """
 
894
        t = self.get_transport()
 
895
        if t.is_readonly():
 
896
            return
 
897
        t.mkdir('foo')
 
898
        t.put_bytes('foo-bar', '')
 
899
        t.mkdir('foo-baz')
 
900
        t.rmdir('foo')
 
901
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
902
        self.failUnless(t.has('foo-bar'))
 
903
 
799
904
    def test_rename_dir_succeeds(self):
800
905
        t = self.get_transport()
801
906
        if t.is_readonly():
917
1022
        except NotImplementedError:
918
1023
            raise TestSkipped("Transport %s has no bogus URL support." %
919
1024
                              self._server.__class__)
 
1025
        # This should be:  but SSH still connects on construction. No COOKIE!
 
1026
        # self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
920
1027
        try:
921
1028
            t = bzrlib.transport.get_transport(url)
922
1029
            t.get('.bzr/branch')
979
1086
            l.sort()
980
1087
            return l
981
1088
 
982
 
        # SftpServer creates control files in the working directory
983
 
        # so lets move down a directory to avoid those.
984
 
        if not t.is_readonly():
985
 
            t.mkdir('wd')
986
 
        else:
987
 
            os.mkdir('wd')
988
 
        t = t.clone('wd')
989
 
 
990
1089
        self.assertEqual([], sorted_list('.'))
991
1090
        # c2 is precisely one letter longer than c here to test that
992
1091
        # suffixing is not confused.
993
1092
        # a%25b checks that quoting is done consistently across transports
994
1093
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1094
 
995
1095
        if not t.is_readonly():
996
1096
            self.build_tree(tree_names, transport=t)
997
1097
        else:
998
 
            self.build_tree(['wd/' + name for name in tree_names])
 
1098
            self.build_tree(tree_names)
999
1099
 
1000
1100
        self.assertEqual(
1001
1101
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
1005
1105
            t.delete('c/d')
1006
1106
            t.delete('b')
1007
1107
        else:
1008
 
            os.unlink('wd/c/d')
1009
 
            os.unlink('wd/b')
 
1108
            os.unlink('c/d')
 
1109
            os.unlink('b')
1010
1110
            
1011
1111
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
1012
1112
        self.assertEqual(['e'], sorted_list('c'))
1068
1168
        # directory of this transport
1069
1169
        root_transport = orig_transport
1070
1170
        new_transport = root_transport.clone("..")
1071
 
        # as we are walking up directories, the path must be must be 
 
1171
        # as we are walking up directories, the path must be
1072
1172
        # growing less, except at the top
1073
1173
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1074
1174
            or new_transport.base == root_transport.base)
1075
1175
        while new_transport.base != root_transport.base:
1076
1176
            root_transport = new_transport
1077
1177
            new_transport = root_transport.clone("..")
1078
 
            # as we are walking up directories, the path must be must be 
 
1178
            # as we are walking up directories, the path must be
1079
1179
            # growing less, except at the top
1080
1180
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1081
1181
                or new_transport.base == root_transport.base)
1137
1237
        # the abspath of "/" and "/foo/.." should result in the same location
1138
1238
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1139
1239
 
 
1240
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1241
                         transport.abspath("/foo"))
 
1242
 
1140
1243
    def test_local_abspath(self):
1141
1244
        transport = self.get_transport()
1142
1245
        try:
1143
1246
            p = transport.local_abspath('.')
1144
 
        except TransportNotPossible:
1145
 
            pass # This is not a local transport
 
1247
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1248
            # should be formattable
 
1249
            s = str(e)
1146
1250
        else:
1147
1251
            self.assertEqual(getcwd(), p)
1148
1252
 
1310
1414
        else:
1311
1415
            transport.put_bytes('a', '0123456789')
1312
1416
 
 
1417
        d = list(transport.readv('a', ((0, 1),)))
 
1418
        self.assertEqual(d[0], (0, '0'))
 
1419
 
1313
1420
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1314
1421
        self.assertEqual(d[0], (0, '0'))
1315
1422
        self.assertEqual(d[1], (1, '1'))
1329
1436
        self.assertEqual(d[2], (0, '0'))
1330
1437
        self.assertEqual(d[3], (3, '34'))
1331
1438
 
1332
 
    def test_get_smart_client(self):
1333
 
        """All transports must either give a smart client, or know they can't.
1334
 
 
1335
 
        For some transports such as http this might depend on probing to see 
1336
 
        what's actually present on the other end.  (But we can adjust for that 
1337
 
        in the future.)
 
1439
    def test_get_smart_medium(self):
 
1440
        """All transports must either give a smart medium, or know they can't.
1338
1441
        """
1339
1442
        transport = self.get_transport()
1340
1443
        try:
1341
 
            client = transport.get_smart_client()
1342
 
            # XXX: should be a more general class
1343
 
            self.assertIsInstance(client, smart.SmartStreamClient)
1344
 
        except NoSmartServer:
 
1444
            client_medium = transport.get_smart_medium()
 
1445
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1446
        except errors.NoSmartMedium:
1345
1447
            # as long as we got it we're fine
1346
1448
            pass
1347
1449
 
1354
1456
 
1355
1457
        # This is intentionally reading off the end of the file
1356
1458
        # since we are sure that it cannot get there
1357
 
        self.assertListRaises((errors.ShortReadvError, AssertionError),
 
1459
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1460
                               # Can be raised by paramiko
 
1461
                               AssertionError),
1358
1462
                              transport.readv, 'a', [(1,1), (8,10)])
1359
1463
 
1360
1464
        # This is trying to seek past the end of the file, it should
1361
1465
        # also raise a special error
1362
 
        self.assertListRaises(errors.ShortReadvError,
 
1466
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1363
1467
                              transport.readv, 'a', [(12,2)])