~bzr-pqm/bzr/bzr.dev

2018.5.2 by Andrew Bennetts
Start splitting bzrlib/transport/smart.py into a package.
1
# Copyright (C) 2006 Canonical Ltd
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
2018.5.19 by Andrew Bennetts
Add docstrings to all the new modules, and a few other places.
17
"""The 'medium' layer for the smart servers and clients.
18
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
20
for "the quality between big and small."
21
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
over SSH), and pass them to and from the protocol logic.  See the overview in
24
bzrlib/transport/smart/__init__.py.
25
"""
26
2018.5.2 by Andrew Bennetts
Start splitting bzrlib/transport/smart.py into a package.
27
import os
28
import socket
29
30
from bzrlib import errors
2018.5.21 by Andrew Bennetts
Move bzrlib.transport.smart to bzrlib.smart
31
from bzrlib.smart import protocol
2018.5.2 by Andrew Bennetts
Start splitting bzrlib/transport/smart.py into a package.
32
try:
33
    from bzrlib.transport import ssh
34
except errors.ParamikoNotPresent:
35
    # no paramiko.  SmartSSHClientMedium will break.
36
    pass
37
2018.5.17 by Andrew Bennetts
Paramaterise the commands handled by SmartServerRequestHandler.
38
2018.5.2 by Andrew Bennetts
Start splitting bzrlib/transport/smart.py into a package.
39
class SmartServerStreamMedium(object):
40
    """Handles smart commands coming over a stream.
41
42
    The stream may be a pipe connected to sshd, or a tcp socket, or an
43
    in-process fifo for testing.
44
45
    One instance is created for each connected client; it can serve multiple
46
    requests in the lifetime of the connection.
47
48
    The server passes requests through to an underlying backing transport, 
49
    which will typically be a LocalTransport looking at the server's filesystem.
50
    """
51
52
    def __init__(self, backing_transport):
53
        """Construct new server.
54
55
        :param backing_transport: Transport for the directory served.
56
        """
57
        # backing_transport could be passed to serve instead of __init__
58
        self.backing_transport = backing_transport
59
        self.finished = False
60
61
    def serve(self):
62
        """Serve requests until the client disconnects."""
63
        # Keep a reference to stderr because the sys module's globals get set to
64
        # None during interpreter shutdown.
65
        from sys import stderr
66
        try:
67
            while not self.finished:
2018.5.14 by Andrew Bennetts
Move SmartTCPServer to smart/server.py, and SmartServerRequestHandler to smart/request.py.
68
                server_protocol = protocol.SmartServerRequestProtocolOne(
2018.5.2 by Andrew Bennetts
Start splitting bzrlib/transport/smart.py into a package.
69
                    self.backing_transport, self._write_out)
2018.5.14 by Andrew Bennetts
Move SmartTCPServer to smart/server.py, and SmartServerRequestHandler to smart/request.py.
70
                self._serve_one_request(server_protocol)
2018.5.2 by Andrew Bennetts
Start splitting bzrlib/transport/smart.py into a package.
71
        except Exception, e:
72
            stderr.write("%s terminating on exception %s\n" % (self, e))
73
            raise
74
75
    def _serve_one_request(self, protocol):
76
        """Read one request from input, process, send back a response.
77
        
78
        :param protocol: a SmartServerRequestProtocol.
79
        """
80
        try:
81
            self._serve_one_request_unguarded(protocol)
82
        except KeyboardInterrupt:
83
            raise
84
        except Exception, e:
85
            self.terminate_due_to_error()
86
87
    def terminate_due_to_error(self):
88
        """Called when an unhandled exception from the protocol occurs."""
89
        raise NotImplementedError(self.terminate_due_to_error)
90
91
92
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
93
94
    def __init__(self, sock, backing_transport):
95
        """Constructor.
96
97
        :param sock: the socket the server will read from.  It will be put
98
            into blocking mode.
99
        """
100
        SmartServerStreamMedium.__init__(self, backing_transport)
101
        self.push_back = ''
102
        sock.setblocking(True)
103
        self.socket = sock
104
105
    def _serve_one_request_unguarded(self, protocol):
106
        while protocol.next_read_size():
107
            if self.push_back:
108
                protocol.accept_bytes(self.push_back)
109
                self.push_back = ''
110
            else:
111
                bytes = self.socket.recv(4096)
112
                if bytes == '':
113
                    self.finished = True
114
                    return
115
                protocol.accept_bytes(bytes)
116
        
117
        self.push_back = protocol.excess_buffer
118
    
119
    def terminate_due_to_error(self):
120
        """Called when an unhandled exception from the protocol occurs."""
121
        # TODO: This should log to a server log file, but no such thing
122
        # exists yet.  Andrew Bennetts 2006-09-29.
123
        self.socket.close()
124
        self.finished = True
125
126
    def _write_out(self, bytes):
127
        self.socket.sendall(bytes)
128
129
130
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
131
132
    def __init__(self, in_file, out_file, backing_transport):
133
        """Construct new server.
134
135
        :param in_file: Python file from which requests can be read.
136
        :param out_file: Python file to write responses.
137
        :param backing_transport: Transport for the directory served.
138
        """
139
        SmartServerStreamMedium.__init__(self, backing_transport)
140
        self._in = in_file
141
        self._out = out_file
142
143
    def _serve_one_request_unguarded(self, protocol):
144
        while True:
145
            bytes_to_read = protocol.next_read_size()
146
            if bytes_to_read == 0:
147
                # Finished serving this request.
148
                self._out.flush()
149
                return
150
            bytes = self._in.read(bytes_to_read)
151
            if bytes == '':
152
                # Connection has been closed.
153
                self.finished = True
154
                self._out.flush()
155
                return
156
            protocol.accept_bytes(bytes)
157
158
    def terminate_due_to_error(self):
159
        # TODO: This should log to a server log file, but no such thing
160
        # exists yet.  Andrew Bennetts 2006-09-29.
161
        self._out.close()
162
        self.finished = True
163
164
    def _write_out(self, bytes):
165
        self._out.write(bytes)
166
167
168
class SmartClientMediumRequest(object):
169
    """A request on a SmartClientMedium.
170
171
    Each request allows bytes to be provided to it via accept_bytes, and then
172
    the response bytes to be read via read_bytes.
173
174
    For instance:
175
    request.accept_bytes('123')
176
    request.finished_writing()
177
    result = request.read_bytes(3)
178
    request.finished_reading()
179
180
    It is up to the individual SmartClientMedium whether multiple concurrent
181
    requests can exist. See SmartClientMedium.get_request to obtain instances 
182
    of SmartClientMediumRequest, and the concrete Medium you are using for 
183
    details on concurrency and pipelining.
184
    """
185
186
    def __init__(self, medium):
187
        """Construct a SmartClientMediumRequest for the medium medium."""
188
        self._medium = medium
189
        # we track state by constants - we may want to use the same
190
        # pattern as BodyReader if it gets more complex.
191
        # valid states are: "writing", "reading", "done"
192
        self._state = "writing"
193
194
    def accept_bytes(self, bytes):
195
        """Accept bytes for inclusion in this request.
196
197
        This method may not be be called after finished_writing() has been
198
        called.  It depends upon the Medium whether or not the bytes will be
199
        immediately transmitted. Message based Mediums will tend to buffer the
200
        bytes until finished_writing() is called.
201
202
        :param bytes: A bytestring.
203
        """
204
        if self._state != "writing":
205
            raise errors.WritingCompleted(self)
206
        self._accept_bytes(bytes)
207
208
    def _accept_bytes(self, bytes):
209
        """Helper for accept_bytes.
210
211
        Accept_bytes checks the state of the request to determing if bytes
212
        should be accepted. After that it hands off to _accept_bytes to do the
213
        actual acceptance.
214
        """
215
        raise NotImplementedError(self._accept_bytes)
216
217
    def finished_reading(self):
218
        """Inform the request that all desired data has been read.
219
220
        This will remove the request from the pipeline for its medium (if the
221
        medium supports pipelining) and any further calls to methods on the
222
        request will raise ReadingCompleted.
223
        """
224
        if self._state == "writing":
225
            raise errors.WritingNotComplete(self)
226
        if self._state != "reading":
227
            raise errors.ReadingCompleted(self)
228
        self._state = "done"
229
        self._finished_reading()
230
231
    def _finished_reading(self):
232
        """Helper for finished_reading.
233
234
        finished_reading checks the state of the request to determine if 
235
        finished_reading is allowed, and if it is hands off to _finished_reading
236
        to perform the action.
237
        """
238
        raise NotImplementedError(self._finished_reading)
239
240
    def finished_writing(self):
241
        """Finish the writing phase of this request.
242
243
        This will flush all pending data for this request along the medium.
244
        After calling finished_writing, you may not call accept_bytes anymore.
245
        """
246
        if self._state != "writing":
247
            raise errors.WritingCompleted(self)
248
        self._state = "reading"
249
        self._finished_writing()
250
251
    def _finished_writing(self):
252
        """Helper for finished_writing.
253
254
        finished_writing checks the state of the request to determine if 
255
        finished_writing is allowed, and if it is hands off to _finished_writing
256
        to perform the action.
257
        """
258
        raise NotImplementedError(self._finished_writing)
259
260
    def read_bytes(self, count):
261
        """Read bytes from this requests response.
262
263
        This method will block and wait for count bytes to be read. It may not
264
        be invoked until finished_writing() has been called - this is to ensure
265
        a message-based approach to requests, for compatability with message
266
        based mediums like HTTP.
267
        """
268
        if self._state == "writing":
269
            raise errors.WritingNotComplete(self)
270
        if self._state != "reading":
271
            raise errors.ReadingCompleted(self)
272
        return self._read_bytes(count)
273
274
    def _read_bytes(self, count):
275
        """Helper for read_bytes.
276
277
        read_bytes checks the state of the request to determing if bytes
278
        should be read. After that it hands off to _read_bytes to do the
279
        actual read.
280
        """
281
        raise NotImplementedError(self._read_bytes)
282
283
284
class SmartClientMedium(object):
285
    """Smart client is a medium for sending smart protocol requests over."""
286
287
    def disconnect(self):
288
        """If this medium maintains a persistent connection, close it.
289
        
290
        The default implementation does nothing.
291
        """
292
        
293
294
class SmartClientStreamMedium(SmartClientMedium):
295
    """Stream based medium common class.
296
297
    SmartClientStreamMediums operate on a stream. All subclasses use a common
298
    SmartClientStreamMediumRequest for their requests, and should implement
299
    _accept_bytes and _read_bytes to allow the request objects to send and
300
    receive bytes.
301
    """
302
303
    def __init__(self):
304
        self._current_request = None
305
306
    def accept_bytes(self, bytes):
307
        self._accept_bytes(bytes)
308
309
    def __del__(self):
310
        """The SmartClientStreamMedium knows how to close the stream when it is
311
        finished with it.
312
        """
313
        self.disconnect()
314
315
    def _flush(self):
316
        """Flush the output stream.
317
        
318
        This method is used by the SmartClientStreamMediumRequest to ensure that
319
        all data for a request is sent, to avoid long timeouts or deadlocks.
320
        """
321
        raise NotImplementedError(self._flush)
322
323
    def get_request(self):
324
        """See SmartClientMedium.get_request().
325
326
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
327
        for get_request.
328
        """
329
        return SmartClientStreamMediumRequest(self)
330
331
    def read_bytes(self, count):
332
        return self._read_bytes(count)
333
334
335
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
336
    """A client medium using simple pipes.
337
    
338
    This client does not manage the pipes: it assumes they will always be open.
339
    """
340
341
    def __init__(self, readable_pipe, writeable_pipe):
342
        SmartClientStreamMedium.__init__(self)
343
        self._readable_pipe = readable_pipe
344
        self._writeable_pipe = writeable_pipe
345
346
    def _accept_bytes(self, bytes):
347
        """See SmartClientStreamMedium.accept_bytes."""
348
        self._writeable_pipe.write(bytes)
349
350
    def _flush(self):
351
        """See SmartClientStreamMedium._flush()."""
352
        self._writeable_pipe.flush()
353
354
    def _read_bytes(self, count):
355
        """See SmartClientStreamMedium._read_bytes."""
356
        return self._readable_pipe.read(count)
357
358
359
class SmartSSHClientMedium(SmartClientStreamMedium):
360
    """A client medium using SSH."""
361
    
362
    def __init__(self, host, port=None, username=None, password=None,
363
            vendor=None):
364
        """Creates a client that will connect on the first use.
365
        
366
        :param vendor: An optional override for the ssh vendor to use. See
367
            bzrlib.transport.ssh for details on ssh vendors.
368
        """
369
        SmartClientStreamMedium.__init__(self)
370
        self._connected = False
371
        self._host = host
372
        self._password = password
373
        self._port = port
374
        self._username = username
375
        self._read_from = None
376
        self._ssh_connection = None
377
        self._vendor = vendor
378
        self._write_to = None
379
380
    def _accept_bytes(self, bytes):
381
        """See SmartClientStreamMedium.accept_bytes."""
382
        self._ensure_connection()
383
        self._write_to.write(bytes)
384
385
    def disconnect(self):
386
        """See SmartClientMedium.disconnect()."""
387
        if not self._connected:
388
            return
389
        self._read_from.close()
390
        self._write_to.close()
391
        self._ssh_connection.close()
392
        self._connected = False
393
394
    def _ensure_connection(self):
395
        """Connect this medium if not already connected."""
396
        if self._connected:
397
            return
398
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
399
        if self._vendor is None:
400
            vendor = ssh._get_ssh_vendor()
401
        else:
402
            vendor = self._vendor
403
        self._ssh_connection = vendor.connect_ssh(self._username,
404
                self._password, self._host, self._port,
405
                command=[executable, 'serve', '--inet', '--directory=/',
406
                         '--allow-writes'])
407
        self._read_from, self._write_to = \
408
            self._ssh_connection.get_filelike_channels()
409
        self._connected = True
410
411
    def _flush(self):
412
        """See SmartClientStreamMedium._flush()."""
413
        self._write_to.flush()
414
415
    def _read_bytes(self, count):
416
        """See SmartClientStreamMedium.read_bytes."""
417
        if not self._connected:
418
            raise errors.MediumNotConnected(self)
419
        return self._read_from.read(count)
420
421
422
class SmartTCPClientMedium(SmartClientStreamMedium):
423
    """A client medium using TCP."""
424
    
425
    def __init__(self, host, port):
426
        """Creates a client that will connect on the first use."""
427
        SmartClientStreamMedium.__init__(self)
428
        self._connected = False
429
        self._host = host
430
        self._port = port
431
        self._socket = None
432
433
    def _accept_bytes(self, bytes):
434
        """See SmartClientMedium.accept_bytes."""
435
        self._ensure_connection()
436
        self._socket.sendall(bytes)
437
438
    def disconnect(self):
439
        """See SmartClientMedium.disconnect()."""
440
        if not self._connected:
441
            return
442
        self._socket.close()
443
        self._socket = None
444
        self._connected = False
445
446
    def _ensure_connection(self):
447
        """Connect this medium if not already connected."""
448
        if self._connected:
449
            return
450
        self._socket = socket.socket()
451
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
452
        result = self._socket.connect_ex((self._host, int(self._port)))
453
        if result:
454
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
455
                    (self._host, self._port, os.strerror(result)))
456
        self._connected = True
457
458
    def _flush(self):
459
        """See SmartClientStreamMedium._flush().
460
        
461
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
462
        add a means to do a flush, but that can be done in the future.
463
        """
464
465
    def _read_bytes(self, count):
466
        """See SmartClientMedium.read_bytes."""
467
        if not self._connected:
468
            raise errors.MediumNotConnected(self)
469
        return self._socket.recv(count)
470
471
472
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
473
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
474
475
    def __init__(self, medium):
476
        SmartClientMediumRequest.__init__(self, medium)
477
        # check that we are safe concurrency wise. If some streams start
478
        # allowing concurrent requests - i.e. via multiplexing - then this
479
        # assert should be moved to SmartClientStreamMedium.get_request,
480
        # and the setting/unsetting of _current_request likewise moved into
481
        # that class : but its unneeded overhead for now. RBC 20060922
482
        if self._medium._current_request is not None:
483
            raise errors.TooManyConcurrentRequests(self._medium)
484
        self._medium._current_request = self
485
486
    def _accept_bytes(self, bytes):
487
        """See SmartClientMediumRequest._accept_bytes.
488
        
489
        This forwards to self._medium._accept_bytes because we are operating
490
        on the mediums stream.
491
        """
492
        self._medium._accept_bytes(bytes)
493
494
    def _finished_reading(self):
495
        """See SmartClientMediumRequest._finished_reading.
496
497
        This clears the _current_request on self._medium to allow a new 
498
        request to be created.
499
        """
500
        assert self._medium._current_request is self
501
        self._medium._current_request = None
502
        
503
    def _finished_writing(self):
504
        """See SmartClientMediumRequest._finished_writing.
505
506
        This invokes self._medium._flush to ensure all bytes are transmitted.
507
        """
508
        self._medium._flush()
509
510
    def _read_bytes(self, count):
511
        """See SmartClientMediumRequest._read_bytes.
512
        
513
        This forwards to self._medium._read_bytes because we are operating
514
        on the mediums stream.
515
        """
516
        return self._medium._read_bytes(count)
517