~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2010-02-09 17:27:46 UTC
  • mto: (5029.1.1 integration)
  • This revision was merged to the branch mainline in revision 5030.
  • Revision ID: v.ladeuil+lp@free.fr-20100209172746-6f4mvlnr2mac807j
Move NoSmartTransportServer to bzrlib.tests.test_server

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
52
52
    TestNotApplicable,
53
53
    multiply_tests,
54
54
    )
55
 
from bzrlib.tests import test_server
56
55
from bzrlib.tests.test_transport import TestTransportImplementation
57
56
from bzrlib.transport import (
58
57
    ConnectedTransport,
171
170
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
171
 
173
172
    def test_has_root_works(self):
174
 
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
173
        from bzrlib.smart import server
 
174
        if self.transport_server is server.SmartTCPServer_for_testing:
175
175
            raise TestNotApplicable(
176
176
                "SmartTCPServer_for_testing intentionally does not allow "
177
177
                "access to /.")
251
251
 
252
252
    def test_get_bytes_unknown_file(self):
253
253
        t = self.get_transport()
 
254
 
254
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
255
256
 
256
257
    def test_get_with_open_write_stream_sees_all_content(self):
1082
1083
        subdir.stat('./file')
1083
1084
        subdir.stat('.')
1084
1085
 
1085
 
    def test_hardlink(self):
1086
 
        from stat import ST_NLINK
1087
 
 
1088
 
        t = self.get_transport()
1089
 
 
1090
 
        source_name = "original_target"
1091
 
        link_name = "target_link"
1092
 
 
1093
 
        self.build_tree([source_name], transport=t)
1094
 
 
1095
 
        try:
1096
 
            t.hardlink(source_name, link_name)
1097
 
 
1098
 
            self.failUnless(t.has(source_name))
1099
 
            self.failUnless(t.has(link_name))
1100
 
 
1101
 
            st = t.stat(link_name)
1102
 
            self.failUnlessEqual(st[ST_NLINK], 2)
1103
 
        except TransportNotPossible:
1104
 
            raise TestSkipped("Transport %s does not support hardlinks." %
1105
 
                              self._server.__class__)
1106
 
 
1107
 
    def test_symlink(self):
1108
 
        from stat import S_ISLNK
1109
 
 
1110
 
        t = self.get_transport()
1111
 
 
1112
 
        source_name = "original_target"
1113
 
        link_name = "target_link"
1114
 
 
1115
 
        self.build_tree([source_name], transport=t)
1116
 
 
1117
 
        try:
1118
 
            t.symlink(source_name, link_name)
1119
 
 
1120
 
            self.failUnless(t.has(source_name))
1121
 
            self.failUnless(t.has(link_name))
1122
 
 
1123
 
            st = t.stat(link_name)
1124
 
            self.failUnless(S_ISLNK(st.st_mode),
1125
 
                "expected symlink, got mode %o" % st.st_mode)
1126
 
        except TransportNotPossible:
1127
 
            raise TestSkipped("Transport %s does not support symlinks." %
1128
 
                              self._server.__class__)
1129
 
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1131
 
 
1132
1086
    def test_list_dir(self):
1133
1087
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1134
1088
        t = self.get_transport()
1265
1219
        self.assertIs(t._get_connection(), c._get_connection())
1266
1220
 
1267
1221
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = None
 
1222
        new_connection = object()
1269
1223
        t._set_connection(new_connection)
1270
1224
        # Check that both transports use the same connection
1271
1225
        self.assertIs(new_connection, t._get_connection())
1765
1719
        # also raise a special error
1766
1720
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1721
                              transport.readv, 'a', [(12,2)])
1768
 
 
1769
 
    def test_stat_symlink(self):
1770
 
        # if a transport points directly to a symlink (and supports symlinks
1771
 
        # at all) you can tell this.  helps with bug 32669.
1772
 
        t = self.get_transport()
1773
 
        try:
1774
 
            t.symlink('target', 'link')
1775
 
        except TransportNotPossible:
1776
 
            raise TestSkipped("symlinks not supported")
1777
 
        t2 = t.clone('link')
1778
 
        st = t2.stat('')
1779
 
        self.assertTrue(stat.S_ISLNK(st.st_mode))