~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Martin
  • Date: 2010-04-16 12:59:03 UTC
  • mto: (5177.1.1 integration2)
  • mto: This revision was merged to the branch mainline in revision 5179.
  • Revision ID: gzlist@googlemail.com-20100416125903-yjm9d5uc7l8ilxpf
Catch a couple of missed plugin module docstrings, note need for assignment to __doc__ in developer documentation and NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
bzrlib/transport/smart/__init__.py.
25
25
"""
26
26
 
27
 
import errno
28
27
import os
29
28
import sys
30
 
import time
31
29
import urllib
32
30
 
33
 
import bzrlib
34
31
from bzrlib.lazy_import import lazy_import
35
32
lazy_import(globals(), """
36
 
import select
 
33
import atexit
37
34
import socket
38
35
import thread
39
36
import weakref
41
38
from bzrlib import (
42
39
    debug,
43
40
    errors,
 
41
    symbol_versioning,
44
42
    trace,
45
43
    ui,
46
44
    urlutils,
47
45
    )
48
 
from bzrlib.i18n import gettext
49
 
from bzrlib.smart import client, protocol, request, signals, vfs
 
46
from bzrlib.smart import client, protocol, request, vfs
50
47
from bzrlib.transport import ssh
51
48
""")
52
49
from bzrlib import osutils
179
176
        ui.ui_factory.report_transport_activity(self, bytes, direction)
180
177
 
181
178
 
182
 
_bad_file_descriptor = (errno.EBADF,)
183
 
if sys.platform == 'win32':
184
 
    # Given on Windows if you pass a closed socket to select.select. Probably
185
 
    # also given if you pass a file handle to select.
186
 
    WSAENOTSOCK = 10038
187
 
    _bad_file_descriptor += (WSAENOTSOCK,)
188
 
 
189
 
 
190
179
class SmartServerStreamMedium(SmartMedium):
191
180
    """Handles smart commands coming over a stream.
192
181
 
205
194
        the stream.  See also the _push_back method.
206
195
    """
207
196
 
208
 
    _timer = time.time
209
 
 
210
 
    def __init__(self, backing_transport, root_client_path='/', timeout=None):
 
197
    def __init__(self, backing_transport, root_client_path='/'):
211
198
        """Construct new server.
212
199
 
213
200
        :param backing_transport: Transport for the directory served.
216
203
        self.backing_transport = backing_transport
217
204
        self.root_client_path = root_client_path
218
205
        self.finished = False
219
 
        if timeout is None:
220
 
            raise AssertionError('You must supply a timeout.')
221
 
        self._client_timeout = timeout
222
 
        self._client_poll_timeout = min(timeout / 10.0, 1.0)
223
206
        SmartMedium.__init__(self)
224
207
 
225
208
    def serve(self):
230
213
        try:
231
214
            while not self.finished:
232
215
                server_protocol = self._build_protocol()
233
 
                # TODO: This seems inelegant:
234
 
                if server_protocol is None:
235
 
                    # We could 'continue' only to notice that self.finished is
236
 
                    # True...
237
 
                    break
238
216
                self._serve_one_request(server_protocol)
239
 
        except errors.ConnectionTimeout, e:
240
 
            trace.note('%s' % (e,))
241
 
            trace.log_exception_quietly()
242
 
            self._disconnect_client()
243
 
            # We reported it, no reason to make a big fuss.
244
 
            return
245
217
        except Exception, e:
246
218
            stderr.write("%s terminating on exception %s\n" % (self, e))
247
219
            raise
248
 
        self._disconnect_client()
249
 
 
250
 
    def _stop_gracefully(self):
251
 
        """When we finish this message, stop looking for more."""
252
 
        trace.mutter('Stopping %s' % (self,))
253
 
        self.finished = True
254
 
 
255
 
    def _disconnect_client(self):
256
 
        """Close the current connection. We stopped due to a timeout/etc."""
257
 
        # The default implementation is a no-op, because that is all we used to
258
 
        # do when disconnecting from a client. I suppose we never had the
259
 
        # *server* initiate a disconnect, before
260
 
 
261
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
262
 
        """Wait for more bytes to be read, but timeout if none available.
263
 
 
264
 
        This allows us to detect idle connections, and stop trying to read from
265
 
        them, without setting the socket itself to non-blocking. This also
266
 
        allows us to specify when we watch for idle timeouts.
267
 
 
268
 
        :return: Did we timeout? (True if we timed out, False if there is data
269
 
            to be read)
270
 
        """
271
 
        raise NotImplementedError(self._wait_for_bytes_with_timeout)
272
220
 
273
221
    def _build_protocol(self):
274
222
        """Identifies the version of the incoming request, and returns an
279
227
 
280
228
        :returns: a SmartServerRequestProtocol.
281
229
        """
282
 
        self._wait_for_bytes_with_timeout(self._client_timeout)
283
 
        if self.finished:
284
 
            # We're stopping, so don't try to do any more work
285
 
            return None
286
230
        bytes = self._get_line()
287
231
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
288
232
        protocol = protocol_factory(
290
234
        protocol.accept_bytes(unused_bytes)
291
235
        return protocol
292
236
 
293
 
    def _wait_on_descriptor(self, fd, timeout_seconds):
294
 
        """select() on a file descriptor, waiting for nonblocking read()
295
 
 
296
 
        This will raise a ConnectionTimeout exception if we do not get a
297
 
        readable handle before timeout_seconds.
298
 
        :return: None
299
 
        """
300
 
        t_end = self._timer() + timeout_seconds
301
 
        poll_timeout = min(timeout_seconds, self._client_poll_timeout)
302
 
        rs = xs = None
303
 
        while not rs and not xs and self._timer() < t_end:
304
 
            if self.finished:
305
 
                return
306
 
            try:
307
 
                rs, _, xs = select.select([fd], [], [fd], poll_timeout)
308
 
            except (select.error, socket.error) as e:
309
 
                err = getattr(e, 'errno', None)
310
 
                if err is None and getattr(e, 'args', None) is not None:
311
 
                    # select.error doesn't have 'errno', it just has args[0]
312
 
                    err = e.args[0]
313
 
                if err in _bad_file_descriptor:
314
 
                    return # Not a socket indicates read() will fail
315
 
                elif err == errno.EINTR:
316
 
                    # Interrupted, keep looping.
317
 
                    continue
318
 
                raise
319
 
        if rs or xs:
320
 
            return
321
 
        raise errors.ConnectionTimeout('disconnecting client after %.1f seconds'
322
 
                                       % (timeout_seconds,))
323
 
 
324
237
    def _serve_one_request(self, protocol):
325
238
        """Read one request from input, process, send back a response.
326
239
 
347
260
 
348
261
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
349
262
 
350
 
    def __init__(self, sock, backing_transport, root_client_path='/',
351
 
                 timeout=None):
 
263
    def __init__(self, sock, backing_transport, root_client_path='/'):
352
264
        """Constructor.
353
265
 
354
266
        :param sock: the socket the server will read from.  It will be put
355
267
            into blocking mode.
356
268
        """
357
269
        SmartServerStreamMedium.__init__(
358
 
            self, backing_transport, root_client_path=root_client_path,
359
 
            timeout=timeout)
 
270
            self, backing_transport, root_client_path=root_client_path)
360
271
        sock.setblocking(True)
361
272
        self.socket = sock
362
 
        # Get the getpeername now, as we might be closed later when we care.
363
 
        try:
364
 
            self._client_info = sock.getpeername()
365
 
        except socket.error:
366
 
            self._client_info = '<unknown>'
367
 
 
368
 
    def __str__(self):
369
 
        return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
370
 
 
371
 
    def __repr__(self):
372
 
        return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
373
 
            self._client_info)
374
273
 
375
274
    def _serve_one_request_unguarded(self, protocol):
376
275
        while protocol.next_read_size():
385
284
 
386
285
        self._push_back(protocol.unused_data)
387
286
 
388
 
    def _disconnect_client(self):
389
 
        """Close the current connection. We stopped due to a timeout/etc."""
390
 
        self.socket.close()
391
 
 
392
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
393
 
        """Wait for more bytes to be read, but timeout if none available.
394
 
 
395
 
        This allows us to detect idle connections, and stop trying to read from
396
 
        them, without setting the socket itself to non-blocking. This also
397
 
        allows us to specify when we watch for idle timeouts.
398
 
 
399
 
        :return: None, this will raise ConnectionTimeout if we time out before
400
 
            data is available.
401
 
        """
402
 
        return self._wait_on_descriptor(self.socket, timeout_seconds)
403
 
 
404
287
    def _read_bytes(self, desired_count):
405
288
        return osutils.read_bytes_from_socket(
406
289
            self.socket, self._report_activity)
423
306
 
424
307
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
425
308
 
426
 
    def __init__(self, in_file, out_file, backing_transport, timeout=None):
 
309
    def __init__(self, in_file, out_file, backing_transport):
427
310
        """Construct new server.
428
311
 
429
312
        :param in_file: Python file from which requests can be read.
430
313
        :param out_file: Python file to write responses.
431
314
        :param backing_transport: Transport for the directory served.
432
315
        """
433
 
        SmartServerStreamMedium.__init__(self, backing_transport,
434
 
            timeout=timeout)
 
316
        SmartServerStreamMedium.__init__(self, backing_transport)
435
317
        if sys.platform == 'win32':
436
318
            # force binary mode for files
437
319
            import msvcrt
442
324
        self._in = in_file
443
325
        self._out = out_file
444
326
 
445
 
    def serve(self):
446
 
        """See SmartServerStreamMedium.serve"""
447
 
        # This is the regular serve, except it adds signal trapping for soft
448
 
        # shutdown.
449
 
        stop_gracefully = self._stop_gracefully
450
 
        signals.register_on_hangup(id(self), stop_gracefully)
451
 
        try:
452
 
            return super(SmartServerPipeStreamMedium, self).serve()
453
 
        finally:
454
 
            signals.unregister_on_hangup(id(self))
455
 
 
456
327
    def _serve_one_request_unguarded(self, protocol):
457
328
        while True:
458
329
            # We need to be careful not to read past the end of the current
471
342
                return
472
343
            protocol.accept_bytes(bytes)
473
344
 
474
 
    def _disconnect_client(self):
475
 
        self._in.close()
476
 
        self._out.flush()
477
 
        self._out.close()
478
 
 
479
 
    def _wait_for_bytes_with_timeout(self, timeout_seconds):
480
 
        """Wait for more bytes to be read, but timeout if none available.
481
 
 
482
 
        This allows us to detect idle connections, and stop trying to read from
483
 
        them, without setting the socket itself to non-blocking. This also
484
 
        allows us to specify when we watch for idle timeouts.
485
 
 
486
 
        :return: None, this will raise ConnectionTimeout if we time out before
487
 
            data is available.
488
 
        """
489
 
        if (getattr(self._in, 'fileno', None) is None
490
 
            or sys.platform == 'win32'):
491
 
            # You can't select() file descriptors on Windows.
492
 
            return
493
 
        return self._wait_on_descriptor(self._in, timeout_seconds)
494
 
 
495
345
    def _read_bytes(self, desired_count):
496
346
        return self._in.read(desired_count)
497
347
 
641
491
        return self._medium._get_line()
642
492
 
643
493
 
644
 
class _VfsRefuser(object):
645
 
    """An object that refuses all VFS requests.
646
 
 
647
 
    """
648
 
 
649
 
    def __init__(self):
650
 
        client._SmartClient.hooks.install_named_hook(
651
 
            'call', self.check_vfs, 'vfs refuser')
652
 
 
653
 
    def check_vfs(self, params):
654
 
        try:
655
 
            request_method = request.request_handlers.get(params.method)
656
 
        except KeyError:
657
 
            # A method we don't know about doesn't count as a VFS method.
658
 
            return
659
 
        if issubclass(request_method, vfs.VfsRequest):
660
 
            raise errors.HpssVfsRequestNotAllowed(params.method, params.args)
661
 
 
662
 
 
663
494
class _DebugCounter(object):
664
495
    """An object that counts the HPSS calls made to each client medium.
665
496
 
666
 
    When a medium is garbage-collected, or failing that when
667
 
    bzrlib.global_state exits, the total number of calls made on that medium
668
 
    are reported via trace.note.
 
497
    When a medium is garbage-collected, or failing that when atexit functions
 
498
    are run, the total number of calls made on that medium are reported via
 
499
    trace.note.
669
500
    """
670
501
 
671
502
    def __init__(self):
672
503
        self.counts = weakref.WeakKeyDictionary()
673
504
        client._SmartClient.hooks.install_named_hook(
674
505
            'call', self.increment_call_count, 'hpss call counter')
675
 
        bzrlib.global_state.cleanups.add_cleanup(self.flush_all)
 
506
        atexit.register(self.flush_all)
676
507
 
677
508
    def track(self, medium):
678
509
        """Start tracking calls made to a medium.
712
543
        value['count'] = 0
713
544
        value['vfs_count'] = 0
714
545
        if count != 0:
715
 
            trace.note(gettext('HPSS calls: {0} ({1} vfs) {2}').format(
716
 
                       count, vfs_count, medium_repr))
 
546
            trace.note('HPSS calls: %d (%d vfs) %s',
 
547
                       count, vfs_count, medium_repr)
717
548
 
718
549
    def flush_all(self):
719
550
        for ref in list(self.counts.keys()):
720
551
            self.done(ref)
721
552
 
722
553
_debug_counter = None
723
 
_vfs_refuser = None
724
554
 
725
555
 
726
556
class SmartClientMedium(SmartMedium):
743
573
            if _debug_counter is None:
744
574
                _debug_counter = _DebugCounter()
745
575
            _debug_counter.track(self)
746
 
        if 'hpss_client_no_vfs' in debug.debug_flags:
747
 
            global _vfs_refuser
748
 
            if _vfs_refuser is None:
749
 
                _vfs_refuser = _VfsRefuser()
750
576
 
751
577
    def _is_remote_before(self, version_tuple):
752
578
        """Is it possible the remote side supports RPCs for a given version?
781
607
            # which is newer than a previously supplied older-than version.
782
608
            # This indicates that some smart verb call is not guarded
783
609
            # appropriately (it should simply not have been tried).
784
 
            trace.mutter(
 
610
            raise AssertionError(
785
611
                "_remember_remote_is_before(%r) called, but "
786
612
                "_remember_remote_is_before(%r) was called previously."
787
 
                , version_tuple, self._remote_version_is_before)
788
 
            if 'hpss' in debug.debug_flags:
789
 
                ui.ui_factory.show_warning(
790
 
                    "_remember_remote_is_before(%r) called, but "
791
 
                    "_remember_remote_is_before(%r) was called previously."
792
 
                    % (version_tuple, self._remote_version_is_before))
793
 
            return
 
613
                % (version_tuple, self._remote_version_is_before))
794
614
        self._remote_version_is_before = version_tuple
795
615
 
796
616
    def protocol_version(self):
889
709
    """A client medium using simple pipes.
890
710
 
891
711
    This client does not manage the pipes: it assumes they will always be open.
 
712
 
 
713
    Note that if readable_pipe.read might raise IOError or OSError with errno
 
714
    of EINTR, it must be safe to retry the read.  Plain CPython fileobjects
 
715
    (such as used for sys.stdin) are safe.
892
716
    """
893
717
 
894
718
    def __init__(self, readable_pipe, writeable_pipe, base):
907
731
 
908
732
    def _read_bytes(self, count):
909
733
        """See SmartClientStreamMedium._read_bytes."""
910
 
        bytes_to_read = min(count, _MAX_READ_SIZE)
911
 
        bytes = self._readable_pipe.read(bytes_to_read)
 
734
        bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
912
735
        self._report_activity(len(bytes), 'read')
913
736
        return bytes
914
737
 
915
738
 
916
 
class SSHParams(object):
917
 
    """A set of parameters for starting a remote bzr via SSH."""
 
739
class SmartSSHClientMedium(SmartClientStreamMedium):
 
740
    """A client medium using SSH."""
918
741
 
919
742
    def __init__(self, host, port=None, username=None, password=None,
920
 
            bzr_remote_path='bzr'):
921
 
        self.host = host
922
 
        self.port = port
923
 
        self.username = username
924
 
        self.password = password
925
 
        self.bzr_remote_path = bzr_remote_path
926
 
 
927
 
 
928
 
class SmartSSHClientMedium(SmartClientStreamMedium):
929
 
    """A client medium using SSH.
930
 
    
931
 
    It delegates IO to a SmartClientSocketMedium or
932
 
    SmartClientAlreadyConnectedSocketMedium (depending on platform).
933
 
    """
934
 
 
935
 
    def __init__(self, base, ssh_params, vendor=None):
 
743
            base=None, vendor=None, bzr_remote_path=None):
936
744
        """Creates a client that will connect on the first use.
937
745
 
938
 
        :param ssh_params: A SSHParams instance.
939
746
        :param vendor: An optional override for the ssh vendor to use. See
940
747
            bzrlib.transport.ssh for details on ssh vendors.
941
748
        """
942
 
        self._real_medium = None
943
 
        self._ssh_params = ssh_params
 
749
        self._connected = False
 
750
        self._host = host
 
751
        self._password = password
 
752
        self._port = port
 
753
        self._username = username
944
754
        # for the benefit of progress making a short description of this
945
755
        # transport
946
756
        self._scheme = 'bzr+ssh'
948
758
        # _DebugCounter so we have to store all the values used in our repr
949
759
        # method before calling the super init.
950
760
        SmartClientStreamMedium.__init__(self, base)
 
761
        self._read_from = None
 
762
        self._ssh_connection = None
951
763
        self._vendor = vendor
952
 
        self._ssh_connection = None
 
764
        self._write_to = None
 
765
        self._bzr_remote_path = bzr_remote_path
953
766
 
954
767
    def __repr__(self):
955
 
        if self._ssh_params.port is None:
 
768
        if self._port is None:
956
769
            maybe_port = ''
957
770
        else:
958
 
            maybe_port = ':%s' % self._ssh_params.port
 
771
            maybe_port = ':%s' % self._port
959
772
        return "%s(%s://%s@%s%s/)" % (
960
773
            self.__class__.__name__,
961
774
            self._scheme,
962
 
            self._ssh_params.username,
963
 
            self._ssh_params.host,
 
775
            self._username,
 
776
            self._host,
964
777
            maybe_port)
965
778
 
966
779
    def _accept_bytes(self, bytes):
967
780
        """See SmartClientStreamMedium.accept_bytes."""
968
781
        self._ensure_connection()
969
 
        self._real_medium.accept_bytes(bytes)
 
782
        self._write_to.write(bytes)
 
783
        self._report_activity(len(bytes), 'write')
970
784
 
971
785
    def disconnect(self):
972
786
        """See SmartClientMedium.disconnect()."""
973
 
        if self._real_medium is not None:
974
 
            self._real_medium.disconnect()
975
 
            self._real_medium = None
976
 
        if self._ssh_connection is not None:
977
 
            self._ssh_connection.close()
978
 
            self._ssh_connection = None
 
787
        if not self._connected:
 
788
            return
 
789
        self._read_from.close()
 
790
        self._write_to.close()
 
791
        self._ssh_connection.close()
 
792
        self._connected = False
979
793
 
980
794
    def _ensure_connection(self):
981
795
        """Connect this medium if not already connected."""
982
 
        if self._real_medium is not None:
 
796
        if self._connected:
983
797
            return
984
798
        if self._vendor is None:
985
799
            vendor = ssh._get_ssh_vendor()
986
800
        else:
987
801
            vendor = self._vendor
988
 
        self._ssh_connection = vendor.connect_ssh(self._ssh_params.username,
989
 
                self._ssh_params.password, self._ssh_params.host,
990
 
                self._ssh_params.port,
991
 
                command=[self._ssh_params.bzr_remote_path, 'serve', '--inet',
 
802
        self._ssh_connection = vendor.connect_ssh(self._username,
 
803
                self._password, self._host, self._port,
 
804
                command=[self._bzr_remote_path, 'serve', '--inet',
992
805
                         '--directory=/', '--allow-writes'])
993
 
        io_kind, io_object = self._ssh_connection.get_sock_or_pipes()
994
 
        if io_kind == 'socket':
995
 
            self._real_medium = SmartClientAlreadyConnectedSocketMedium(
996
 
                self.base, io_object)
997
 
        elif io_kind == 'pipes':
998
 
            read_from, write_to = io_object
999
 
            self._real_medium = SmartSimplePipesClientMedium(
1000
 
                read_from, write_to, self.base)
1001
 
        else:
1002
 
            raise AssertionError(
1003
 
                "Unexpected io_kind %r from %r"
1004
 
                % (io_kind, self._ssh_connection))
 
806
        self._read_from, self._write_to = \
 
807
            self._ssh_connection.get_filelike_channels()
 
808
        self._connected = True
1005
809
 
1006
810
    def _flush(self):
1007
811
        """See SmartClientStreamMedium._flush()."""
1008
 
        self._real_medium._flush()
 
812
        self._write_to.flush()
1009
813
 
1010
814
    def _read_bytes(self, count):
1011
815
        """See SmartClientStreamMedium.read_bytes."""
1012
 
        if self._real_medium is None:
 
816
        if not self._connected:
1013
817
            raise errors.MediumNotConnected(self)
1014
 
        return self._real_medium.read_bytes(count)
 
818
        bytes_to_read = min(count, _MAX_READ_SIZE)
 
819
        bytes = self._read_from.read(bytes_to_read)
 
820
        self._report_activity(len(bytes), 'read')
 
821
        return bytes
1015
822
 
1016
823
 
1017
824
# Port 4155 is the default port for bzr://, registered with IANA.
1019
826
BZR_DEFAULT_PORT = 4155
1020
827
 
1021
828
 
1022
 
class SmartClientSocketMedium(SmartClientStreamMedium):
1023
 
    """A client medium using a socket.
1024
 
    
1025
 
    This class isn't usable directly.  Use one of its subclasses instead.
1026
 
    """
 
829
class SmartTCPClientMedium(SmartClientStreamMedium):
 
830
    """A client medium using TCP."""
1027
831
 
1028
 
    def __init__(self, base):
 
832
    def __init__(self, host, port, base):
 
833
        """Creates a client that will connect on the first use."""
1029
834
        SmartClientStreamMedium.__init__(self, base)
 
835
        self._connected = False
 
836
        self._host = host
 
837
        self._port = port
1030
838
        self._socket = None
1031
 
        self._connected = False
1032
839
 
1033
840
    def _accept_bytes(self, bytes):
1034
841
        """See SmartClientMedium.accept_bytes."""
1035
842
        self._ensure_connection()
1036
843
        osutils.send_all(self._socket, bytes, self._report_activity)
1037
844
 
1038
 
    def _ensure_connection(self):
1039
 
        """Connect this medium if not already connected."""
1040
 
        raise NotImplementedError(self._ensure_connection)
1041
 
 
1042
 
    def _flush(self):
1043
 
        """See SmartClientStreamMedium._flush().
1044
 
 
1045
 
        For sockets we do no flushing. For TCP sockets we may want to turn off
1046
 
        TCP_NODELAY and add a means to do a flush, but that can be done in the
1047
 
        future.
1048
 
        """
1049
 
 
1050
 
    def _read_bytes(self, count):
1051
 
        """See SmartClientMedium.read_bytes."""
1052
 
        if not self._connected:
1053
 
            raise errors.MediumNotConnected(self)
1054
 
        return osutils.read_bytes_from_socket(
1055
 
            self._socket, self._report_activity)
1056
 
 
1057
845
    def disconnect(self):
1058
846
        """See SmartClientMedium.disconnect()."""
1059
847
        if not self._connected:
1062
850
        self._socket = None
1063
851
        self._connected = False
1064
852
 
1065
 
 
1066
 
class SmartTCPClientMedium(SmartClientSocketMedium):
1067
 
    """A client medium that creates a TCP connection."""
1068
 
 
1069
 
    def __init__(self, host, port, base):
1070
 
        """Creates a client that will connect on the first use."""
1071
 
        SmartClientSocketMedium.__init__(self, base)
1072
 
        self._host = host
1073
 
        self._port = port
1074
 
 
1075
853
    def _ensure_connection(self):
1076
854
        """Connect this medium if not already connected."""
1077
855
        if self._connected:
1111
889
                    (self._host, port, err_msg))
1112
890
        self._connected = True
1113
891
 
1114
 
 
1115
 
class SmartClientAlreadyConnectedSocketMedium(SmartClientSocketMedium):
1116
 
    """A client medium for an already connected socket.
1117
 
    
1118
 
    Note that this class will assume it "owns" the socket, so it will close it
1119
 
    when its disconnect method is called.
1120
 
    """
1121
 
 
1122
 
    def __init__(self, base, sock):
1123
 
        SmartClientSocketMedium.__init__(self, base)
1124
 
        self._socket = sock
1125
 
        self._connected = True
1126
 
 
1127
 
    def _ensure_connection(self):
1128
 
        # Already connected, by definition!  So nothing to do.
1129
 
        pass
 
892
    def _flush(self):
 
893
        """See SmartClientStreamMedium._flush().
 
894
 
 
895
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
 
896
        add a means to do a flush, but that can be done in the future.
 
897
        """
 
898
 
 
899
    def _read_bytes(self, count):
 
900
        """See SmartClientMedium.read_bytes."""
 
901
        if not self._connected:
 
902
            raise errors.MediumNotConnected(self)
 
903
        return osutils.read_bytes_from_socket(
 
904
            self._socket, self._report_activity)
1130
905
 
1131
906
 
1132
907
class SmartClientStreamMediumRequest(SmartClientMediumRequest):