~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/stub_sftp.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-23 09:00:18 UTC
  • mfrom: (5439.1.1 merge-2.2-into-devel)
  • Revision ID: pqm@pqm.ubuntu.com-20100923090018-01aj4ifyzaps50bo
(spiv) Merge lp:bzr/2.2. (Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
import paramiko
24
24
import select
25
25
import socket
 
26
import SocketServer
26
27
import sys
27
28
import threading
28
29
import time
38
39
from bzrlib.tests import test_server
39
40
 
40
41
 
41
 
class StubServer (paramiko.ServerInterface):
 
42
class StubServer(paramiko.ServerInterface):
42
43
 
43
 
    def __init__(self, test_case):
 
44
    def __init__(self, test_case_server):
44
45
        paramiko.ServerInterface.__init__(self)
45
 
        self._test_case = test_case
 
46
        self.log = test_case_server.log
46
47
 
47
48
    def check_auth_password(self, username, password):
48
49
        # all are allowed
49
 
        self._test_case.log('sftpserver - authorizing: %s' % (username,))
 
50
        self.log('sftpserver - authorizing: %s' % (username,))
50
51
        return paramiko.AUTH_SUCCESSFUL
51
52
 
52
53
    def check_channel_request(self, kind, chanid):
53
 
        self._test_case.log(
54
 
            'sftpserver - channel request: %s, %s' % (kind, chanid))
 
54
        self.log('sftpserver - channel request: %s, %s' % (kind, chanid))
55
55
        return paramiko.OPEN_SUCCEEDED
56
56
 
57
57
 
58
 
class StubSFTPHandle (paramiko.SFTPHandle):
 
58
class StubSFTPHandle(paramiko.SFTPHandle):
 
59
 
59
60
    def stat(self):
60
61
        try:
61
62
            return paramiko.SFTPAttributes.from_stat(
73
74
            return paramiko.SFTPServer.convert_errno(e.errno)
74
75
 
75
76
 
76
 
class StubSFTPServer (paramiko.SFTPServerInterface):
 
77
class StubSFTPServer(paramiko.SFTPServerInterface):
77
78
 
78
79
    def __init__(self, server, root, home=None):
79
80
        paramiko.SFTPServerInterface.__init__(self, server)
90
91
            self.home = home[len(self.root):]
91
92
        if self.home.startswith('/'):
92
93
            self.home = self.home[1:]
93
 
        server._test_case.log('sftpserver - new connection')
 
94
        server.log('sftpserver - new connection')
94
95
 
95
96
    def _realpath(self, path):
96
97
        # paths returned from self.canonicalize() always start with
241
242
    # removed: chattr, symlink, readlink
242
243
    # (nothing in bzr's sftp transport uses those)
243
244
 
 
245
 
244
246
# ------------- server test implementation --------------
245
247
 
246
248
STUB_SERVER_KEY = """
262
264
"""
263
265
 
264
266
 
265
 
class SocketListener(threading.Thread):
266
 
 
267
 
    def __init__(self, callback):
268
 
        threading.Thread.__init__(self)
269
 
        self._callback = callback
270
 
        self._socket = socket.socket()
271
 
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
272
 
        self._socket.bind(('localhost', 0))
273
 
        self._socket.listen(1)
274
 
        self.host, self.port = self._socket.getsockname()[:2]
275
 
        self._stop_event = threading.Event()
276
 
 
277
 
    def stop(self):
278
 
        # called from outside this thread
279
 
        self._stop_event.set()
280
 
        # use a timeout here, because if the test fails, the server thread may
281
 
        # never notice the stop_event.
282
 
        self.join(5.0)
283
 
        self._socket.close()
284
 
 
285
 
    def run(self):
286
 
        trace.mutter('SocketListener %r has started', self)
287
 
        while True:
288
 
            readable, writable_unused, exception_unused = \
289
 
                select.select([self._socket], [], [], 0.1)
290
 
            if self._stop_event.isSet():
291
 
                trace.mutter('SocketListener %r has stopped', self)
292
 
                return
293
 
            if len(readable) == 0:
294
 
                continue
295
 
            try:
296
 
                s, addr_unused = self._socket.accept()
297
 
                trace.mutter('SocketListener %r has accepted connection %r',
298
 
                    self, s)
299
 
                # because the loopback socket is inline, and transports are
300
 
                # never explicitly closed, best to launch a new thread.
301
 
                threading.Thread(target=self._callback, args=(s,)).start()
302
 
            except socket.error, x:
303
 
                sys.excepthook(*sys.exc_info())
304
 
                trace.warning('Socket error during accept() '
305
 
                              'within unit test server thread: %r' % x)
306
 
            except Exception, x:
307
 
                # probably a failed test; unit test thread will log the
308
 
                # failure/error
309
 
                sys.excepthook(*sys.exc_info())
310
 
                trace.warning(
311
 
                    'Exception from within unit test server thread: %r' % x)
312
 
 
313
 
 
314
267
class SocketDelay(object):
315
268
    """A socket decorator to make TCP appear slower.
316
269
 
386
339
        return bytes_sent
387
340
 
388
341
 
389
 
class SFTPServer(test_server.TestServer):
 
342
class TestingSFTPConnectionHandler(SocketServer.BaseRequestHandler):
 
343
 
 
344
    def setup(self):
 
345
        self.wrap_for_latency()
 
346
        tcs = self.server.test_case_server
 
347
        ptrans = paramiko.Transport(self.request)
 
348
        self.paramiko_transport = ptrans
 
349
        # Set it to a channel under 'bzr' so that we get debug info
 
350
        ptrans.set_log_channel('bzr.paramiko.transport')
 
351
        ptrans.add_server_key(tcs.get_host_key())
 
352
        ptrans.set_subsystem_handler('sftp', paramiko.SFTPServer,
 
353
                                     StubSFTPServer, root=tcs._root,
 
354
                                     home=tcs._server_homedir)
 
355
        server = tcs._server_interface(tcs)
 
356
        # This blocks until the key exchange has been done
 
357
        ptrans.start_server(None, server)
 
358
 
 
359
    def finish(self):
 
360
        # Wait for the conversation to finish, when the paramiko.Transport
 
361
        # thread finishes
 
362
        # TODO: Consider timing out after XX seconds rather than hanging.
 
363
        #       Also we could check paramiko_transport.active and possibly
 
364
        #       paramiko_transport.getException().
 
365
        self.paramiko_transport.join()
 
366
 
 
367
    def wrap_for_latency(self):
 
368
        tcs = self.server.test_case_server
 
369
        if tcs.add_latency:
 
370
            # Give the socket (which the request really is) a latency adding
 
371
            # decorator.
 
372
            self.request = SocketDelay(self.request, tcs.add_latency)
 
373
 
 
374
 
 
375
class TestingSFTPWithoutSSHConnectionHandler(TestingSFTPConnectionHandler):
 
376
 
 
377
    def setup(self):
 
378
        self.wrap_for_latency()
 
379
        # Re-import these as locals, so that they're still accessible during
 
380
        # interpreter shutdown (when all module globals get set to None, leading
 
381
        # to confusing errors like "'NoneType' object has no attribute 'error'".
 
382
        class FakeChannel(object):
 
383
            def get_transport(self):
 
384
                return self
 
385
            def get_log_channel(self):
 
386
                return 'bzr.paramiko'
 
387
            def get_name(self):
 
388
                return '1'
 
389
            def get_hexdump(self):
 
390
                return False
 
391
            def close(self):
 
392
                pass
 
393
 
 
394
        tcs = self.server.test_case_server
 
395
        sftp_server = paramiko.SFTPServer(
 
396
            FakeChannel(), 'sftp', StubServer(tcs), StubSFTPServer,
 
397
            root=tcs._root, home=tcs._server_homedir)
 
398
        self.sftp_server = sftp_server
 
399
        sys_stderr = sys.stderr # Used in error reporting during shutdown
 
400
        try:
 
401
            sftp_server.start_subsystem(
 
402
                'sftp', None, ssh.SocketAsChannelAdapter(self.request))
 
403
        except socket.error, e:
 
404
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
 
405
                # it's okay for the client to disconnect abruptly
 
406
                # (bug in paramiko 1.6: it should absorb this exception)
 
407
                pass
 
408
            else:
 
409
                raise
 
410
        except Exception, e:
 
411
            # This typically seems to happen during interpreter shutdown, so
 
412
            # most of the useful ways to report this error won't work.
 
413
            # Writing the exception type, and then the text of the exception,
 
414
            # seems to be the best we can do.
 
415
            # FIXME: All interpreter shutdown errors should have been related
 
416
            # to daemon threads, cleanup needed -- vila 20100623
 
417
            sys_stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
 
418
            sys_stderr.write('%s\n\n' % (e,))
 
419
 
 
420
    def finish(self):
 
421
        self.sftp_server.finish_subsystem()
 
422
 
 
423
 
 
424
class TestingSFTPServer(test_server.TestingThreadingTCPServer):
 
425
 
 
426
    def __init__(self, server_address, request_handler_class, test_case_server):
 
427
        test_server.TestingThreadingTCPServer.__init__(
 
428
            self, server_address, request_handler_class)
 
429
        self.test_case_server = test_case_server
 
430
 
 
431
 
 
432
class SFTPServer(test_server.TestingTCPServerInAThread):
390
433
    """Common code for SFTP server facilities."""
391
434
 
392
435
    def __init__(self, server_interface=StubServer):
 
436
        self.host = '127.0.0.1'
 
437
        self.port = 0
 
438
        super(SFTPServer, self).__init__((self.host, self.port),
 
439
                                         TestingSFTPServer,
 
440
                                         TestingSFTPConnectionHandler)
393
441
        self._original_vendor = None
394
 
        self._homedir = None
395
 
        self._server_homedir = None
396
 
        self._listener = None
397
 
        self._root = None
398
442
        self._vendor = ssh.ParamikoVendor()
399
443
        self._server_interface = server_interface
400
 
        # sftp server logs
 
444
        self._host_key = None
401
445
        self.logs = []
402
446
        self.add_latency = 0
 
447
        self._homedir = None
 
448
        self._server_homedir = None
 
449
        self._root = None
403
450
 
404
451
    def _get_sftp_url(self, path):
405
452
        """Calculate an sftp url to this server for path."""
406
 
        return 'sftp://foo:bar@%s:%d/%s' % (self._listener.host,
407
 
                                            self._listener.port, path)
 
453
        return "sftp://foo:bar@%s:%s/%s" % (self.host, self.port, path)
408
454
 
409
455
    def log(self, message):
410
456
        """StubServer uses this to log when a new server is created."""
411
457
        self.logs.append(message)
412
458
 
413
 
    def _run_server_entry(self, sock):
414
 
        """Entry point for all implementations of _run_server.
415
 
 
416
 
        If self.add_latency is > 0.000001 then sock is given a latency adding
417
 
        decorator.
418
 
        """
419
 
        if self.add_latency > 0.000001:
420
 
            sock = SocketDelay(sock, self.add_latency)
421
 
        return self._run_server(sock)
422
 
 
423
 
    def _run_server(self, s):
424
 
        ssh_server = paramiko.Transport(s)
425
 
        key_file = osutils.pathjoin(self._homedir, 'test_rsa.key')
426
 
        f = open(key_file, 'w')
427
 
        f.write(STUB_SERVER_KEY)
428
 
        f.close()
429
 
        host_key = paramiko.RSAKey.from_private_key_file(key_file)
430
 
        ssh_server.add_server_key(host_key)
431
 
        server = self._server_interface(self)
432
 
        ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
433
 
                                         StubSFTPServer, root=self._root,
434
 
                                         home=self._server_homedir)
435
 
        event = threading.Event()
436
 
        ssh_server.start_server(event, server)
437
 
        event.wait(5.0)
 
459
    def create_server(self):
 
460
        server = self.server_class((self.host, self.port),
 
461
                                   self.request_handler_class,
 
462
                                   self)
 
463
        return server
 
464
 
 
465
    def get_host_key(self):
 
466
        if self._host_key is None:
 
467
            key_file = osutils.pathjoin(self._homedir, 'test_rsa.key')
 
468
            f = open(key_file, 'w')
 
469
            try:
 
470
                f.write(STUB_SERVER_KEY)
 
471
            finally:
 
472
                f.close()
 
473
            self._host_key = paramiko.RSAKey.from_private_key_file(key_file)
 
474
        return self._host_key
438
475
 
439
476
    def start_server(self, backing_server=None):
440
477
        # XXX: TODO: make sftpserver back onto backing_server rather than local
459
496
        self._root = '/'
460
497
        if sys.platform == 'win32':
461
498
            self._root = ''
462
 
        self._listener = SocketListener(self._run_server_entry)
463
 
        self._listener.setDaemon(True)
464
 
        self._listener.start()
 
499
        super(SFTPServer, self).start_server()
465
500
 
466
501
    def stop_server(self):
467
 
        self._listener.stop()
468
 
        ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
 
502
        try:
 
503
            super(SFTPServer, self).stop_server()
 
504
        finally:
 
505
            ssh._ssh_vendor_manager._cached_ssh_vendor = self._original_vendor
469
506
 
470
507
    def get_bogus_url(self):
471
508
        """See bzrlib.transport.Server.get_bogus_url."""
472
 
        # this is chosen to try to prevent trouble with proxies, wierd dns, etc
 
509
        # this is chosen to try to prevent trouble with proxies, weird dns, etc
473
510
        # we bind a random socket, so that we get a guaranteed unused port
474
511
        # we just never listen on that port
475
512
        s = socket.socket()
495
532
    def __init__(self):
496
533
        super(SFTPServerWithoutSSH, self).__init__()
497
534
        self._vendor = ssh.LoopbackVendor()
498
 
 
499
 
    def _run_server(self, sock):
500
 
        # Re-import these as locals, so that they're still accessible during
501
 
        # interpreter shutdown (when all module globals get set to None, leading
502
 
        # to confusing errors like "'NoneType' object has no attribute 'error'".
503
 
        class FakeChannel(object):
504
 
            def get_transport(self):
505
 
                return self
506
 
            def get_log_channel(self):
507
 
                return 'paramiko'
508
 
            def get_name(self):
509
 
                return '1'
510
 
            def get_hexdump(self):
511
 
                return False
512
 
            def close(self):
513
 
                pass
514
 
 
515
 
        server = paramiko.SFTPServer(
516
 
            FakeChannel(), 'sftp', StubServer(self), StubSFTPServer,
517
 
            root=self._root, home=self._server_homedir)
518
 
        try:
519
 
            server.start_subsystem(
520
 
                'sftp', None, ssh.SocketAsChannelAdapter(sock))
521
 
        except socket.error, e:
522
 
            if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
523
 
                # it's okay for the client to disconnect abruptly
524
 
                # (bug in paramiko 1.6: it should absorb this exception)
525
 
                pass
526
 
            else:
527
 
                raise
528
 
        except Exception, e:
529
 
            # This typically seems to happen during interpreter shutdown, so
530
 
            # most of the useful ways to report this error are won't work.
531
 
            # Writing the exception type, and then the text of the exception,
532
 
            # seems to be the best we can do.
533
 
            import sys
534
 
            sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
535
 
            sys.stderr.write('%s\n\n' % (e,))
536
 
        server.finish_subsystem()
 
535
        self.request_handler_class = TestingSFTPWithoutSSHConnectionHandler
 
536
 
 
537
    def get_host_key():
 
538
        return None
537
539
 
538
540
 
539
541
class SFTPAbsoluteServer(SFTPServerWithoutSSH):
562
564
    It does this by serving from a deeply-nested directory that doesn't exist.
563
565
    """
564
566
 
565
 
    def start_server(self, backing_server=None):
566
 
        self._server_homedir = '/dev/noone/runs/tests/here'
567
 
        super(SFTPSiblingAbsoluteServer, self).start_server(backing_server)
 
567
    def create_server(self):
 
568
        # FIXME: Can't we do that in a cleaner way ? -- vila 20100623
 
569
        server = super(SFTPSiblingAbsoluteServer, self).create_server()
 
570
        server._server_homedir = '/dev/noone/runs/tests/here'
 
571
        return server
568
572