~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2010-02-10 15:46:03 UTC
  • mfrom: (4985.3.21 update)
  • mto: This revision was merged to the branch mainline in revision 5021.
  • Revision ID: v.ladeuil+lp@free.fr-20100210154603-k4no1gvfuqpzrw7p
Update performs two merges in a more logical order but stop on conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
48
48
from bzrlib.smart import medium
49
49
from bzrlib.tests import (
50
50
    TestCaseInTempDir,
51
 
    TestScenarioApplier,
52
51
    TestSkipped,
53
52
    TestNotApplicable,
 
53
    multiply_tests,
54
54
    )
55
55
from bzrlib.tests.test_transport import TestTransportImplementation
56
56
from bzrlib.transport import (
61
61
from bzrlib.transport.memory import MemoryTransport
62
62
 
63
63
 
64
 
class TransportTestProviderAdapter(TestScenarioApplier):
65
 
    """A tool to generate a suite testing all transports for a single test.
66
 
 
67
 
    This is done by copying the test once for each transport and injecting
68
 
    the transport_class and transport_server classes into each copy. Each copy
69
 
    is also given a new id() to make it easy to identify.
70
 
    """
71
 
 
72
 
    def __init__(self):
73
 
        self.scenarios = self._test_permutations()
74
 
 
75
 
    def get_transport_test_permutations(self, module):
76
 
        """Get the permutations module wants to have tested."""
77
 
        if getattr(module, 'get_test_permutations', None) is None:
78
 
            raise AssertionError(
79
 
                "transport module %s doesn't provide get_test_permutations()"
80
 
                % module.__name__)
81
 
            return []
82
 
        return module.get_test_permutations()
83
 
 
84
 
    def _test_permutations(self):
85
 
        """Return a list of the klass, server_factory pairs to test."""
86
 
        result = []
87
 
        for module in _get_transport_modules():
88
 
            try:
89
 
                permutations = self.get_transport_test_permutations(
90
 
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
91
 
                for (klass, server_factory) in permutations:
92
 
                    scenario = (server_factory.__name__,
93
 
                        {"transport_class":klass,
94
 
                         "transport_server":server_factory})
95
 
                    result.append(scenario)
96
 
            except errors.DependencyNotPresent, e:
97
 
                # Continue even if a dependency prevents us 
98
 
                # from adding this test
99
 
                pass
100
 
        return result
 
64
def get_transport_test_permutations(module):
 
65
    """Get the permutations module wants to have tested."""
 
66
    if getattr(module, 'get_test_permutations', None) is None:
 
67
        raise AssertionError(
 
68
            "transport module %s doesn't provide get_test_permutations()"
 
69
            % module.__name__)
 
70
        return []
 
71
    return module.get_test_permutations()
 
72
 
 
73
 
 
74
def transport_test_permutations():
 
75
    """Return a list of the klass, server_factory pairs to test."""
 
76
    result = []
 
77
    for module in _get_transport_modules():
 
78
        try:
 
79
            permutations = get_transport_test_permutations(
 
80
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
81
            for (klass, server_factory) in permutations:
 
82
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
83
                    {"transport_class":klass,
 
84
                     "transport_server":server_factory})
 
85
                result.append(scenario)
 
86
        except errors.DependencyNotPresent, e:
 
87
            # Continue even if a dependency prevents us
 
88
            # from adding this test
 
89
            pass
 
90
    return result
101
91
 
102
92
 
103
93
def load_tests(standard_tests, module, loader):
104
94
    """Multiply tests for tranport implementations."""
105
95
    result = loader.suiteClass()
106
 
    adapter = TransportTestProviderAdapter()
107
 
    for test in tests.iter_suite_tests(standard_tests):
108
 
        result.addTests(adapter.adapt(test))
109
 
    return result
 
96
    scenarios = transport_test_permutations()
 
97
    return multiply_tests(standard_tests, scenarios, result)
110
98
 
111
99
 
112
100
class TransportTests(TestTransportImplementation):
167
155
        self.assertEqual(True, t.has('a'))
168
156
        self.assertEqual(False, t.has('c'))
169
157
        self.assertEqual(True, t.has(urlutils.escape('%')))
170
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
 
                [True, True, False, False, True, False, True, False])
 
158
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
159
                                           'e', 'f', 'g', 'h'])),
 
160
                         [True, True, False, False,
 
161
                          True, False, True, False])
172
162
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
 
                [True, True, False, False, True, False, True, False])
 
163
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
164
                                           urlutils.escape('%%')]))
 
165
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
166
                                                'e', 'f', 'g', 'h']))),
 
167
                         [True, True, False, False,
 
168
                          True, False, True, False])
176
169
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
170
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
178
171
 
210
203
        for content, f in itertools.izip(contents, content_f):
211
204
            self.assertEqual(content, f.read())
212
205
 
 
206
    def test_get_unknown_file(self):
 
207
        t = self.get_transport()
 
208
        files = ['a', 'b']
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
 
211
                    ]
 
212
        self.build_tree(files, transport=t, line_endings='binary')
213
213
        self.assertRaises(NoSuchFile, t.get, 'c')
214
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
249
249
        for content, fname in zip(contents, files):
250
250
            self.assertEqual(content, t.get_bytes(fname))
251
251
 
 
252
    def test_get_bytes_unknown_file(self):
 
253
        t = self.get_transport()
 
254
 
252
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
253
256
 
254
257
    def test_get_with_open_write_stream_sees_all_content(self):
325
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
326
329
                               create_parent_dir=True)
327
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
328
 
        
 
331
 
329
332
        # But we still get NoSuchFile if we can't make the parent dir
330
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
331
334
                                       'contents\n',
353
356
        umask = osutils.get_umask()
354
357
        t.put_bytes('nomode', 'test text\n', mode=None)
355
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
356
 
        
 
359
 
357
360
    def test_put_bytes_non_atomic_permissions(self):
358
361
        t = self.get_transport()
359
362
 
387
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
388
391
                               dir_mode=0777, create_parent_dir=True)
389
392
        self.assertTransportMode(t, 'dir777', 0777)
390
 
        
 
393
 
391
394
    def test_put_file(self):
392
395
        t = self.get_transport()
393
396
 
441
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
442
445
                              create_parent_dir=True)
443
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
444
 
        
 
447
 
445
448
        # But we still get NoSuchFile if we can't make the parent dir
446
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
447
450
                                       StringIO('contents\n'),
469
472
        umask = osutils.get_umask()
470
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
471
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
472
 
        
 
475
 
473
476
    def test_put_file_non_atomic_permissions(self):
474
477
        t = self.get_transport()
475
478
 
492
495
        umask = osutils.get_umask()
493
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
494
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
495
 
        
 
498
 
496
499
        # We should also be able to set the mode for a parent directory
497
500
        # when it is created
498
501
        sio = StringIO()
538
541
        t = self.get_transport()
539
542
 
540
543
        if t.is_readonly():
541
 
            # cannot mkdir on readonly transports. We're not testing for 
 
544
            # cannot mkdir on readonly transports. We're not testing for
542
545
            # cache coherency because cache behaviour is not currently
543
546
            # defined for the transport interface.
544
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
565
568
 
566
569
        # we were testing that a local mkdir followed by a transport
567
570
        # mkdir failed thusly, but given that we * in one process * do not
568
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
571
        # concurrently fiddle with disk dirs and then use transport to do
569
572
        # things, the win here seems marginal compared to the constraint on
570
573
        # the interface. RBC 20051227
571
574
        t.mkdir('dir_g')
681
684
            for f in files:
682
685
                self.assertTransportMode(temp_transport, f, mode)
683
686
 
 
687
    def test_create_prefix(self):
 
688
        t = self.get_transport()
 
689
        sub = t.clone('foo').clone('bar')
 
690
        try:
 
691
            sub.create_prefix()
 
692
        except TransportNotPossible:
 
693
            self.assertTrue(t.is_readonly())
 
694
        else:
 
695
            self.assertTrue(t.has('foo/bar'))
 
696
 
684
697
    def test_append_file(self):
685
698
        t = self.get_transport()
686
699
 
790
803
                t.append_file, 'f', StringIO('f'), mode=None)
791
804
            return
792
805
        t.append_file('f', StringIO('f'), mode=None)
793
 
        
 
806
 
794
807
    def test_append_bytes_mode(self):
795
808
        # check append_bytes accepts a mode
796
809
        t = self.get_transport()
799
812
                t.append_bytes, 'f', 'f', mode=None)
800
813
            return
801
814
        t.append_bytes('f', 'f', mode=None)
802
 
        
 
815
 
803
816
    def test_delete(self):
804
817
        # TODO: Test Transport.delete
805
818
        t = self.get_transport()
866
879
 
867
880
    def test_rmdir_not_empty(self):
868
881
        """Deleting a non-empty directory raises an exception
869
 
        
 
882
 
870
883
        sftp (and possibly others) don't give us a specific "directory not
871
884
        empty" exception -- we can just see that the operation failed.
872
885
        """
879
892
 
880
893
    def test_rmdir_empty_but_similar_prefix(self):
881
894
        """rmdir does not get confused by sibling paths.
882
 
        
 
895
 
883
896
        A naive implementation of MemoryTransport would refuse to rmdir
884
897
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
885
898
        uses "path.startswith(dir)" on all file paths to determine if directory
1021
1034
 
1022
1035
    def test_connection_error(self):
1023
1036
        """ConnectionError is raised when connection is impossible.
1024
 
        
 
1037
 
1025
1038
        The error should be raised from the first operation on the transport.
1026
1039
        """
1027
1040
        try:
1115
1128
 
1116
1129
        self.assertListRaises(PathError, t.list_dir, 'q')
1117
1130
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1131
        # 'a' is a file, list_dir should raise an error
1118
1132
        self.assertListRaises(PathError, t.list_dir, 'a')
1119
1133
 
1120
1134
    def test_list_dir_result_is_url_escaped(self):
1483
1497
                 u'\u65e5', # Kanji person
1484
1498
                ]
1485
1499
 
 
1500
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1501
        if no_unicode_support:
 
1502
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1503
 
1486
1504
        try:
1487
1505
            self.build_tree(files, transport=t, line_endings='binary')
1488
1506
        except UnicodeError:
1506
1524
        transport.put_bytes('foo', 'bar')
1507
1525
        transport3 = self.get_transport()
1508
1526
        self.check_transport_contents('bar', transport3, 'foo')
1509
 
        # its base should be usable.
1510
 
        transport4 = get_transport(transport.base)
1511
 
        self.check_transport_contents('bar', transport4, 'foo')
1512
1527
 
1513
1528
        # now opening at a relative url should give use a sane result:
1514
1529
        transport.mkdir('newdir')
1515
 
        transport5 = get_transport(transport.base + "newdir")
 
1530
        transport5 = self.get_transport('newdir')
1516
1531
        transport6 = transport5.clone('..')
1517
1532
        self.check_transport_contents('bar', transport6, 'foo')
1518
1533