~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

(jameinel) Allow 'bzr serve' to interpret SIGHUP as a graceful shutdown.
 (bug #795025) (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
 
27
import errno
27
28
import os
28
29
import sys
 
30
import time
29
31
import urllib
30
32
 
 
33
import bzrlib
31
34
from bzrlib.lazy_import import lazy_import
32
35
lazy_import(globals(), """
33
 
import atexit
 
36
import select
34
37
import socket
35
38
import thread
36
39
import weakref
38
41
from bzrlib import (
39
42
    debug,
40
43
    errors,
41
 
    symbol_versioning,
42
44
    trace,
43
45
    ui,
44
46
    urlutils,
45
47
    )
46
 
from bzrlib.smart import client, protocol, request, vfs
 
48
from bzrlib.i18n import gettext
 
49
from bzrlib.smart import client, protocol, request, signals, vfs
47
50
from bzrlib.transport import ssh
48
51
""")
49
52
from bzrlib import osutils
176
179
        ui.ui_factory.report_transport_activity(self, bytes, direction)
177
180
 
178
181
 
 
182
_bad_file_descriptor = (errno.EBADF,)
 
183
if sys.platform == 'win32':
 
184
    # Given on Windows if you pass a closed socket to select.select. Probably
 
185
    # also given if you pass a file handle to select.
 
186
    WSAENOTSOCK = 10038
 
187
    _bad_file_descriptor += (WSAENOTSOCK,)
 
188
 
 
189
 
179
190
class SmartServerStreamMedium(SmartMedium):
180
191
    """Handles smart commands coming over a stream.
181
192
 
194
205
        the stream.  See also the _push_back method.
195
206
    """
196
207
 
197
 
    def __init__(self, backing_transport, root_client_path='/'):
 
208
    _timer = time.time
 
209
 
 
210
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
198
211
        """Construct new server.
199
212
 
200
213
        :param backing_transport: Transport for the directory served.
203
216
        self.backing_transport = backing_transport
204
217
        self.root_client_path = root_client_path
205
218
        self.finished = False
 
219
        if timeout is None:
 
220
            raise AssertionError('You must supply a timeout.')
 
221
        self._client_timeout = timeout
 
222
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
206
223
        SmartMedium.__init__(self)
207
224
 
208
225
    def serve(self):
213
230
        try:
214
231
            while not self.finished:
215
232
                server_protocol = self._build_protocol()
 
233
                # TODO: This seems inelegant:
 
234
                if server_protocol is None:
 
235
                    # We could 'continue' only to notice that self.finished is
 
236
                    # True...
 
237
                    break
216
238
                self._serve_one_request(server_protocol)
 
239
        except errors.ConnectionTimeout, e:
 
240
            trace.note('%s' % (e,))
 
241
            trace.log_exception_quietly()
 
242
            self._disconnect_client()
 
243
            # We reported it, no reason to make a big fuss.
 
244
            return
217
245
        except Exception, e:
218
246
            stderr.write("%s terminating on exception %s\n" % (self, e))
219
247
            raise
 
248
        self._disconnect_client()
 
249
 
 
250
    def _stop_gracefully(self):
 
251
        """When we finish this message, stop looking for more."""
 
252
        trace.mutter('Stopping %s' % (self,))
 
253
        self.finished = True
 
254
 
 
255
    def _disconnect_client(self):
 
256
        """Close the current connection. We stopped due to a timeout/etc."""
 
257
        # The default implementation is a no-op, because that is all we used to
 
258
        # do when disconnecting from a client. I suppose we never had the
 
259
        # *server* initiate a disconnect, before
 
260
 
 
261
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
262
        """Wait for more bytes to be read, but timeout if none available.
 
263
 
 
264
        This allows us to detect idle connections, and stop trying to read from
 
265
        them, without setting the socket itself to non-blocking. This also
 
266
        allows us to specify when we watch for idle timeouts.
 
267
 
 
268
        :return: Did we timeout? (True if we timed out, False if there is data
 
269
            to be read)
 
270
        """
 
271
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
220
272
 
221
273
    def _build_protocol(self):
222
274
        """Identifies the version of the incoming request, and returns an
227
279
 
228
280
        :returns: a SmartServerRequestProtocol.
229
281
        """
 
282
        self._wait_for_bytes_with_timeout(self._client_timeout)
 
283
        if self.finished:
 
284
            # We're stopping, so don't try to do any more work
 
285
            return None
230
286
        bytes = self._get_line()
231
287
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
232
288
        protocol = protocol_factory(
234
290
        protocol.accept_bytes(unused_bytes)
235
291
        return protocol
236
292
 
 
293
    def _wait_on_descriptor(self, fd, timeout_seconds):
 
294
        """select() on a file descriptor, waiting for nonblocking read()
 
295
 
 
296
        This will raise a ConnectionTimeout exception if we do not get a
 
297
        readable handle before timeout_seconds.
 
298
        :return: None
 
299
        """
 
300
        t_end = self._timer() + timeout_seconds
 
301
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
 
302
        rs = xs = None
 
303
        while not rs and not xs and self._timer() < t_end:
 
304
            if self.finished:
 
305
                return
 
306
            try:
 
307
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
 
308
            except (select.error, socket.error) as e:
 
309
                err = getattr(e, 'errno', None)
 
310
                if err is None and getattr(e, 'args', None) is not None:
 
311
                    # select.error doesn't have 'errno', it just has args[0]
 
312
                    err = e.args[0]
 
313
                if err in _bad_file_descriptor:
 
314
                    return # Not a socket indicates read() will fail
 
315
                elif err == errno.EINTR:
 
316
                    # Interrupted, keep looping.
 
317
                    continue
 
318
                raise
 
319
        if rs or xs:
 
320
            return
 
321
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
 
322
                                       % (timeout_seconds,))
 
323
 
237
324
    def _serve_one_request(self, protocol):
238
325
        """Read one request from input, process, send back a response.
239
326
 
260
347
 
261
348
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
262
349
 
263
 
    def __init__(self, sock, backing_transport, root_client_path='/'):
 
350
    def __init__(self, sock, backing_transport, root_client_path='/',
 
351
                 timeout=None):
264
352
        """Constructor.
265
353
 
266
354
        :param sock: the socket the server will read from.  It will be put
267
355
            into blocking mode.
268
356
        """
269
357
        SmartServerStreamMedium.__init__(
270
 
            self, backing_transport, root_client_path=root_client_path)
 
358
            self, backing_transport, root_client_path=root_client_path,
 
359
            timeout=timeout)
271
360
        sock.setblocking(True)
272
361
        self.socket = sock
 
362
        # Get the getpeername now, as we might be closed later when we care.
 
363
        try:
 
364
            self._client_info = sock.getpeername()
 
365
        except socket.error:
 
366
            self._client_info = '<unknown>'
 
367
 
 
368
    def __str__(self):
 
369
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
 
370
 
 
371
    def __repr__(self):
 
372
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
 
373
            self._client_info)
273
374
 
274
375
    def _serve_one_request_unguarded(self, protocol):
275
376
        while protocol.next_read_size():
284
385
 
285
386
        self._push_back(protocol.unused_data)
286
387
 
 
388
    def _disconnect_client(self):
 
389
        """Close the current connection. We stopped due to a timeout/etc."""
 
390
        self.socket.close()
 
391
 
 
392
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
393
        """Wait for more bytes to be read, but timeout if none available.
 
394
 
 
395
        This allows us to detect idle connections, and stop trying to read from
 
396
        them, without setting the socket itself to non-blocking. This also
 
397
        allows us to specify when we watch for idle timeouts.
 
398
 
 
399
        :return: None, this will raise ConnectionTimeout if we time out before
 
400
            data is available.
 
401
        """
 
402
        return self._wait_on_descriptor(self.socket, timeout_seconds)
 
403
 
287
404
    def _read_bytes(self, desired_count):
288
405
        return osutils.read_bytes_from_socket(
289
406
            self.socket, self._report_activity)
306
423
 
307
424
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
308
425
 
309
 
    def __init__(self, in_file, out_file, backing_transport):
 
426
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
310
427
        """Construct new server.
311
428
 
312
429
        :param in_file: Python file from which requests can be read.
313
430
        :param out_file: Python file to write responses.
314
431
        :param backing_transport: Transport for the directory served.
315
432
        """
316
 
        SmartServerStreamMedium.__init__(self, backing_transport)
 
433
        SmartServerStreamMedium.__init__(self, backing_transport,
 
434
            timeout=timeout)
317
435
        if sys.platform == 'win32':
318
436
            # force binary mode for files
319
437
            import msvcrt
324
442
        self._in = in_file
325
443
        self._out = out_file
326
444
 
 
445
    def serve(self):
 
446
        """See SmartServerStreamMedium.serve"""
 
447
        # This is the regular serve, except it adds signal trapping for soft
 
448
        # shutdown.
 
449
        stop_gracefully = self._stop_gracefully
 
450
        signals.register_on_hangup(id(self), stop_gracefully)
 
451
        try:
 
452
            return super(SmartServerPipeStreamMedium, self).serve()
 
453
        finally:
 
454
            signals.unregister_on_hangup(id(self))
 
455
 
327
456
    def _serve_one_request_unguarded(self, protocol):
328
457
        while True:
329
458
            # We need to be careful not to read past the end of the current
342
471
                return
343
472
            protocol.accept_bytes(bytes)
344
473
 
 
474
    def _disconnect_client(self):
 
475
        self._in.close()
 
476
        self._out.flush()
 
477
        self._out.close()
 
478
 
 
479
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
 
480
        """Wait for more bytes to be read, but timeout if none available.
 
481
 
 
482
        This allows us to detect idle connections, and stop trying to read from
 
483
        them, without setting the socket itself to non-blocking. This also
 
484
        allows us to specify when we watch for idle timeouts.
 
485
 
 
486
        :return: None, this will raise ConnectionTimeout if we time out before
 
487
            data is available.
 
488
        """
 
489
        if (getattr(self._in, 'fileno', None) is None
 
490
            or sys.platform == 'win32'):
 
491
            # You can't select() file descriptors on Windows.
 
492
            return
 
493
        return self._wait_on_descriptor(self._in, timeout_seconds)
 
494
 
345
495
    def _read_bytes(self, desired_count):
346
496
        return self._in.read(desired_count)
347
497
 
491
641
        return self._medium._get_line()
492
642
 
493
643
 
 
644
class _VfsRefuser(object):
 
645
    """An object that refuses all VFS requests.
 
646
 
 
647
    """
 
648
 
 
649
    def __init__(self):
 
650
        client._SmartClient.hooks.install_named_hook(
 
651
            'call', self.check_vfs, 'vfs refuser')
 
652
 
 
653
    def check_vfs(self, params):
 
654
        try:
 
655
            request_method = request.request_handlers.get(params.method)
 
656
        except KeyError:
 
657
            # A method we don't know about doesn't count as a VFS method.
 
658
            return
 
659
        if issubclass(request_method, vfs.VfsRequest):
 
660
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
 
661
 
 
662
 
494
663
class _DebugCounter(object):
495
664
    """An object that counts the HPSS calls made to each client medium.
496
665
 
497
 
    When a medium is garbage-collected, or failing that when atexit functions
498
 
    are run, the total number of calls made on that medium are reported via
499
 
    trace.note.
 
666
    When a medium is garbage-collected, or failing that when
 
667
    bzrlib.global_state exits, the total number of calls made on that medium
 
668
    are reported via trace.note.
500
669
    """
501
670
 
502
671
    def __init__(self):
503
672
        self.counts = weakref.WeakKeyDictionary()
504
673
        client._SmartClient.hooks.install_named_hook(
505
674
            'call', self.increment_call_count, 'hpss call counter')
506
 
        atexit.register(self.flush_all)
 
675
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
507
676
 
508
677
    def track(self, medium):
509
678
        """Start tracking calls made to a medium.
543
712
        value['count'] = 0
544
713
        value['vfs_count'] = 0
545
714
        if count != 0:
546
 
            trace.note('HPSS calls: %d (%d vfs) %s',
547
 
                       count, vfs_count, medium_repr)
 
715
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
 
716
                       count, vfs_count, medium_repr))
548
717
 
549
718
    def flush_all(self):
550
719
        for ref in list(self.counts.keys()):
551
720
            self.done(ref)
552
721
 
553
722
_debug_counter = None
 
723
_vfs_refuser = None
554
724
 
555
725
 
556
726
class SmartClientMedium(SmartMedium):
573
743
            if _debug_counter is None:
574
744
                _debug_counter = _DebugCounter()
575
745
            _debug_counter.track(self)
 
746
        if 'hpss_client_no_vfs' in debug.debug_flags:
 
747
            global _vfs_refuser
 
748
            if _vfs_refuser is None:
 
749
                _vfs_refuser = _VfsRefuser()
576
750
 
577
751
    def _is_remote_before(self, version_tuple):
578
752
        """Is it possible the remote side supports RPCs for a given version?
715
889
    """A client medium using simple pipes.
716
890
 
717
891
    This client does not manage the pipes: it assumes they will always be open.
718
 
 
719
 
    Note that if readable_pipe.read might raise IOError or OSError with errno
720
 
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
721
 
    (such as used for sys.stdin) are safe.
722
892
    """
723
893
 
724
894
    def __init__(self, readable_pipe, writeable_pipe, base):
737
907
 
738
908
    def _read_bytes(self, count):
739
909
        """See SmartClientStreamMedium._read_bytes."""
740
 
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
 
910
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
911
        bytes = self._readable_pipe.read(bytes_to_read)
741
912
        self._report_activity(len(bytes), 'read')
742
913
        return bytes
743
914
 
744
915
 
 
916
class SSHParams(object):
 
917
    """A set of parameters for starting a remote bzr via SSH."""
 
918
 
 
919
    def __init__(self, host, port=None, username=None, password=None,
 
920
            bzr_remote_path='bzr'):
 
921
        self.host = host
 
922
        self.port = port
 
923
        self.username = username
 
924
        self.password = password
 
925
        self.bzr_remote_path = bzr_remote_path
 
926
 
 
927
 
745
928
class SmartSSHClientMedium(SmartClientStreamMedium):
746
 
    """A client medium using SSH."""
 
929
    """A client medium using SSH.
 
930
    
 
931
    It delegates IO to a SmartClientSocketMedium or
 
932
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
 
933
    """
747
934
 
748
 
    def __init__(self, host, port=None, username=None, password=None,
749
 
            base=None, vendor=None, bzr_remote_path=None):
 
935
    def __init__(self, base, ssh_params, vendor=None):
750
936
        """Creates a client that will connect on the first use.
751
937
 
 
938
        :param ssh_params: A SSHParams instance.
752
939
        :param vendor: An optional override for the ssh vendor to use. See
753
940
            bzrlib.transport.ssh for details on ssh vendors.
754
941
        """
755
 
        self._connected = False
756
 
        self._host = host
757
 
        self._password = password
758
 
        self._port = port
759
 
        self._username = username
 
942
        self._real_medium = None
 
943
        self._ssh_params = ssh_params
760
944
        # for the benefit of progress making a short description of this
761
945
        # transport
762
946
        self._scheme = 'bzr+ssh'
764
948
        # _DebugCounter so we have to store all the values used in our repr
765
949
        # method before calling the super init.
766
950
        SmartClientStreamMedium.__init__(self, base)
767
 
        self._read_from = None
 
951
        self._vendor = vendor
768
952
        self._ssh_connection = None
769
 
        self._vendor = vendor
770
 
        self._write_to = None
771
 
        self._bzr_remote_path = bzr_remote_path
772
953
 
773
954
    def __repr__(self):
774
 
        if self._port is None:
 
955
        if self._ssh_params.port is None:
775
956
            maybe_port = ''
776
957
        else:
777
 
            maybe_port = ':%s' % self._port
 
958
            maybe_port = ':%s' % self._ssh_params.port
778
959
        return "%s(%s://%s@%s%s/)" % (
779
960
            self.__class__.__name__,
780
961
            self._scheme,
781
 
            self._username,
782
 
            self._host,
 
962
            self._ssh_params.username,
 
963
            self._ssh_params.host,
783
964
            maybe_port)
784
965
 
785
966
    def _accept_bytes(self, bytes):
786
967
        """See SmartClientStreamMedium.accept_bytes."""
787
968
        self._ensure_connection()
788
 
        self._write_to.write(bytes)
789
 
        self._report_activity(len(bytes), 'write')
 
969
        self._real_medium.accept_bytes(bytes)
790
970
 
791
971
    def disconnect(self):
792
972
        """See SmartClientMedium.disconnect()."""
793
 
        if not self._connected:
794
 
            return
795
 
        self._read_from.close()
796
 
        self._write_to.close()
797
 
        self._ssh_connection.close()
798
 
        self._connected = False
 
973
        if self._real_medium is not None:
 
974
            self._real_medium.disconnect()
 
975
            self._real_medium = None
 
976
        if self._ssh_connection is not None:
 
977
            self._ssh_connection.close()
 
978
            self._ssh_connection = None
799
979
 
800
980
    def _ensure_connection(self):
801
981
        """Connect this medium if not already connected."""
802
 
        if self._connected:
 
982
        if self._real_medium is not None:
803
983
            return
804
984
        if self._vendor is None:
805
985
            vendor = ssh._get_ssh_vendor()
806
986
        else:
807
987
            vendor = self._vendor
808
 
        self._ssh_connection = vendor.connect_ssh(self._username,
809
 
                self._password, self._host, self._port,
810
 
                command=[self._bzr_remote_path, 'serve', '--inet',
 
988
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
 
989
                self._ssh_params.password, self._ssh_params.host,
 
990
                self._ssh_params.port,
 
991
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
811
992
                         '--directory=/', '--allow-writes'])
812
 
        self._read_from, self._write_to = \
813
 
            self._ssh_connection.get_filelike_channels()
814
 
        self._connected = True
 
993
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
 
994
        if io_kind == 'socket':
 
995
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
 
996
                self.base, io_object)
 
997
        elif io_kind == 'pipes':
 
998
            read_from, write_to = io_object
 
999
            self._real_medium = SmartSimplePipesClientMedium(
 
1000
                read_from, write_to, self.base)
 
1001
        else:
 
1002
            raise AssertionError(
 
1003
                "Unexpected io_kind %r from %r"
 
1004
                % (io_kind, self._ssh_connection))
815
1005
 
816
1006
    def _flush(self):
817
1007
        """See SmartClientStreamMedium._flush()."""
818
 
        self._write_to.flush()
 
1008
        self._real_medium._flush()
819
1009
 
820
1010
    def _read_bytes(self, count):
821
1011
        """See SmartClientStreamMedium.read_bytes."""
822
 
        if not self._connected:
 
1012
        if self._real_medium is None:
823
1013
            raise errors.MediumNotConnected(self)
824
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
825
 
        bytes = self._read_from.read(bytes_to_read)
826
 
        self._report_activity(len(bytes), 'read')
827
 
        return bytes
 
1014
        return self._real_medium.read_bytes(count)
828
1015
 
829
1016
 
830
1017
# Port 4155 is the default port for bzr://, registered with IANA.
832
1019
BZR_DEFAULT_PORT = 4155
833
1020
 
834
1021
 
835
 
class SmartTCPClientMedium(SmartClientStreamMedium):
836
 
    """A client medium using TCP."""
 
1022
class SmartClientSocketMedium(SmartClientStreamMedium):
 
1023
    """A client medium using a socket.
 
1024
    
 
1025
    This class isn't usable directly.  Use one of its subclasses instead.
 
1026
    """
837
1027
 
838
 
    def __init__(self, host, port, base):
839
 
        """Creates a client that will connect on the first use."""
 
1028
    def __init__(self, base):
840
1029
        SmartClientStreamMedium.__init__(self, base)
 
1030
        self._socket = None
841
1031
        self._connected = False
842
 
        self._host = host
843
 
        self._port = port
844
 
        self._socket = None
845
1032
 
846
1033
    def _accept_bytes(self, bytes):
847
1034
        """See SmartClientMedium.accept_bytes."""
848
1035
        self._ensure_connection()
849
1036
        osutils.send_all(self._socket, bytes, self._report_activity)
850
1037
 
 
1038
    def _ensure_connection(self):
 
1039
        """Connect this medium if not already connected."""
 
1040
        raise NotImplementedError(self._ensure_connection)
 
1041
 
 
1042
    def _flush(self):
 
1043
        """See SmartClientStreamMedium._flush().
 
1044
 
 
1045
        For sockets we do no flushing. For TCP sockets we may want to turn off
 
1046
        TCP_NODELAY and add a means to do a flush, but that can be done in the
 
1047
        future.
 
1048
        """
 
1049
 
 
1050
    def _read_bytes(self, count):
 
1051
        """See SmartClientMedium.read_bytes."""
 
1052
        if not self._connected:
 
1053
            raise errors.MediumNotConnected(self)
 
1054
        return osutils.read_bytes_from_socket(
 
1055
            self._socket, self._report_activity)
 
1056
 
851
1057
    def disconnect(self):
852
1058
        """See SmartClientMedium.disconnect()."""
853
1059
        if not self._connected:
856
1062
        self._socket = None
857
1063
        self._connected = False
858
1064
 
 
1065
 
 
1066
class SmartTCPClientMedium(SmartClientSocketMedium):
 
1067
    """A client medium that creates a TCP connection."""
 
1068
 
 
1069
    def __init__(self, host, port, base):
 
1070
        """Creates a client that will connect on the first use."""
 
1071
        SmartClientSocketMedium.__init__(self, base)
 
1072
        self._host = host
 
1073
        self._port = port
 
1074
 
859
1075
    def _ensure_connection(self):
860
1076
        """Connect this medium if not already connected."""
861
1077
        if self._connected:
895
1111
                    (self._host, port, err_msg))
896
1112
        self._connected = True
897
1113
 
898
 
    def _flush(self):
899
 
        """See SmartClientStreamMedium._flush().
900
 
 
901
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
902
 
        add a means to do a flush, but that can be done in the future.
903
 
        """
904
 
 
905
 
    def _read_bytes(self, count):
906
 
        """See SmartClientMedium.read_bytes."""
907
 
        if not self._connected:
908
 
            raise errors.MediumNotConnected(self)
909
 
        return osutils.read_bytes_from_socket(
910
 
            self._socket, self._report_activity)
 
1114
 
 
1115
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
 
1116
    """A client medium for an already connected socket.
 
1117
    
 
1118
    Note that this class will assume it "owns" the socket, so it will close it
 
1119
    when its disconnect method is called.
 
1120
    """
 
1121
 
 
1122
    def __init__(self, base, sock):
 
1123
        SmartClientSocketMedium.__init__(self, base)
 
1124
        self._socket = sock
 
1125
        self._connected = True
 
1126
 
 
1127
    def _ensure_connection(self):
 
1128
        # Already connected, by definition!  So nothing to do.
 
1129
        pass
911
1130
 
912
1131
 
913
1132
class SmartClientStreamMediumRequest(SmartClientMediumRequest):