~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testsftp.py

  • Committer: John Arbash Meinel
  • Date: 2005-11-20 17:00:43 UTC
  • mto: (1185.50.1 jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1518.
  • Revision ID: john@arbash-meinel.com-20051120170043-52cdb7aefe86140e
Use a weakref dictionary to enable re-use of a connection (for sftp).

Show diffs side-by-side

added added

removed removed

Lines of Context:
68
68
    
69
69
    def stop(self):
70
70
        self.stop_event.set()
 
71
        # We should consider waiting for the other thread
 
72
        # to stop, because otherwise we get spurious
 
73
        #   bzr: ERROR: Socket exception: Connection reset by peer (54)
 
74
        # because the test suite finishes before the thread has a chance
 
75
        # to close. (Especially when only running a few tests)
71
76
        
72
77
        
73
78
class TestCaseWithSFTPServer (TestCaseInTempDir):
82
87
        file(key_file, 'w').write(STUB_SERVER_KEY)
83
88
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
84
89
        ssh_server.add_server_key(host_key)
85
 
        server = StubServer()
 
90
        server = StubServer(self)
86
91
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer, root=self._root)
87
92
        event = threading.Event()
88
93
        ssh_server.start_server(event, server)
114
119
    readonly = False
115
120
    setup = True
116
121
 
 
122
    def setUp(self):
 
123
        TestCaseWithSFTPServer.setUp(self)
 
124
        self.sftplogs = []
 
125
 
 
126
    def log(self, *args):
 
127
        """Override the default log to grab sftp server messages"""
 
128
        TestCaseWithSFTPServer.log(self, *args)
 
129
        if args and args[0].startswith('sftpserver'):
 
130
            self.sftplogs.append(args[0])
 
131
 
117
132
    def get_transport(self):
118
133
        if self.setup:
119
134
            self.delayed_setup()
147
162
        l.unlock()
148
163
        l2.unlock()
149
164
 
 
165
    def test_multiple_connections(self):
 
166
        t = self.get_transport()
 
167
        self.assertEquals(self.sftplogs, 
 
168
                ['sftpserver - authorizing: foo'
 
169
               , 'sftpserver - channel request: session, 1'])
 
170
        self.sftplogs = []
 
171
        # The second request should reuse the first connection
 
172
        # SingleListener only allows for a single connection,
 
173
        # so the next line fails unless the connection is reused
 
174
        t2 = self.get_transport()
 
175
        self.assertEquals(self.sftplogs, [])
 
176
 
150
177
 
151
178
class FakeSFTPTransport (object):
152
179
    _sftp = object()