~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Andrew Bennetts
  • Date: 2007-04-16 17:49:35 UTC
  • mto: This revision was merged to the branch mainline in revision 2435.
  • Revision ID: andrew.bennetts@canonical.com-20070416174935-oieoe0rv424m14ix
Rename SmartClient to _SmartClient.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""The 'medium' layer for the smart servers and clients.
 
18
 
 
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
 
20
for "the quality between big and small."
 
21
 
 
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
 
23
over SSH), and pass them to and from the protocol logic.  See the overview in
 
24
bzrlib/transport/smart/__init__.py.
 
25
"""
 
26
 
 
27
import os
 
28
import socket
 
29
 
 
30
from bzrlib import errors
 
31
from bzrlib.smart import protocol
 
32
try:
 
33
    from bzrlib.transport import ssh
 
34
except errors.ParamikoNotPresent:
 
35
    # no paramiko.  SmartSSHClientMedium will break.
 
36
    pass
 
37
 
 
38
 
 
39
class SmartServerStreamMedium(object):
 
40
    """Handles smart commands coming over a stream.
 
41
 
 
42
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
43
    in-process fifo for testing.
 
44
 
 
45
    One instance is created for each connected client; it can serve multiple
 
46
    requests in the lifetime of the connection.
 
47
 
 
48
    The server passes requests through to an underlying backing transport, 
 
49
    which will typically be a LocalTransport looking at the server's filesystem.
 
50
    """
 
51
 
 
52
    def __init__(self, backing_transport):
 
53
        """Construct new server.
 
54
 
 
55
        :param backing_transport: Transport for the directory served.
 
56
        """
 
57
        # backing_transport could be passed to serve instead of __init__
 
58
        self.backing_transport = backing_transport
 
59
        self.finished = False
 
60
 
 
61
    def serve(self):
 
62
        """Serve requests until the client disconnects."""
 
63
        # Keep a reference to stderr because the sys module's globals get set to
 
64
        # None during interpreter shutdown.
 
65
        from sys import stderr
 
66
        try:
 
67
            while not self.finished:
 
68
                server_protocol = protocol.SmartServerRequestProtocolOne(
 
69
                    self.backing_transport, self._write_out)
 
70
                self._serve_one_request(server_protocol)
 
71
        except Exception, e:
 
72
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
73
            raise
 
74
 
 
75
    def _serve_one_request(self, protocol):
 
76
        """Read one request from input, process, send back a response.
 
77
        
 
78
        :param protocol: a SmartServerRequestProtocol.
 
79
        """
 
80
        try:
 
81
            self._serve_one_request_unguarded(protocol)
 
82
        except KeyboardInterrupt:
 
83
            raise
 
84
        except Exception, e:
 
85
            self.terminate_due_to_error()
 
86
 
 
87
    def terminate_due_to_error(self):
 
88
        """Called when an unhandled exception from the protocol occurs."""
 
89
        raise NotImplementedError(self.terminate_due_to_error)
 
90
 
 
91
 
 
92
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
93
 
 
94
    def __init__(self, sock, backing_transport):
 
95
        """Constructor.
 
96
 
 
97
        :param sock: the socket the server will read from.  It will be put
 
98
            into blocking mode.
 
99
        """
 
100
        SmartServerStreamMedium.__init__(self, backing_transport)
 
101
        self.push_back = ''
 
102
        sock.setblocking(True)
 
103
        self.socket = sock
 
104
 
 
105
    def _serve_one_request_unguarded(self, protocol):
 
106
        while protocol.next_read_size():
 
107
            if self.push_back:
 
108
                protocol.accept_bytes(self.push_back)
 
109
                self.push_back = ''
 
110
            else:
 
111
                bytes = self.socket.recv(4096)
 
112
                if bytes == '':
 
113
                    self.finished = True
 
114
                    return
 
115
                protocol.accept_bytes(bytes)
 
116
        
 
117
        self.push_back = protocol.excess_buffer
 
118
    
 
119
    def terminate_due_to_error(self):
 
120
        """Called when an unhandled exception from the protocol occurs."""
 
121
        # TODO: This should log to a server log file, but no such thing
 
122
        # exists yet.  Andrew Bennetts 2006-09-29.
 
123
        self.socket.close()
 
124
        self.finished = True
 
125
 
 
126
    def _write_out(self, bytes):
 
127
        self.socket.sendall(bytes)
 
128
 
 
129
 
 
130
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
131
 
 
132
    def __init__(self, in_file, out_file, backing_transport):
 
133
        """Construct new server.
 
134
 
 
135
        :param in_file: Python file from which requests can be read.
 
136
        :param out_file: Python file to write responses.
 
137
        :param backing_transport: Transport for the directory served.
 
138
        """
 
139
        SmartServerStreamMedium.__init__(self, backing_transport)
 
140
        self._in = in_file
 
141
        self._out = out_file
 
142
 
 
143
    def _serve_one_request_unguarded(self, protocol):
 
144
        while True:
 
145
            bytes_to_read = protocol.next_read_size()
 
146
            if bytes_to_read == 0:
 
147
                # Finished serving this request.
 
148
                self._out.flush()
 
149
                return
 
150
            bytes = self._in.read(bytes_to_read)
 
151
            if bytes == '':
 
152
                # Connection has been closed.
 
153
                self.finished = True
 
154
                self._out.flush()
 
155
                return
 
156
            protocol.accept_bytes(bytes)
 
157
 
 
158
    def terminate_due_to_error(self):
 
159
        # TODO: This should log to a server log file, but no such thing
 
160
        # exists yet.  Andrew Bennetts 2006-09-29.
 
161
        self._out.close()
 
162
        self.finished = True
 
163
 
 
164
    def _write_out(self, bytes):
 
165
        self._out.write(bytes)
 
166
 
 
167
 
 
168
class SmartClientMediumRequest(object):
 
169
    """A request on a SmartClientMedium.
 
170
 
 
171
    Each request allows bytes to be provided to it via accept_bytes, and then
 
172
    the response bytes to be read via read_bytes.
 
173
 
 
174
    For instance:
 
175
    request.accept_bytes('123')
 
176
    request.finished_writing()
 
177
    result = request.read_bytes(3)
 
178
    request.finished_reading()
 
179
 
 
180
    It is up to the individual SmartClientMedium whether multiple concurrent
 
181
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
182
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
183
    details on concurrency and pipelining.
 
184
    """
 
185
 
 
186
    def __init__(self, medium):
 
187
        """Construct a SmartClientMediumRequest for the medium medium."""
 
188
        self._medium = medium
 
189
        # we track state by constants - we may want to use the same
 
190
        # pattern as BodyReader if it gets more complex.
 
191
        # valid states are: "writing", "reading", "done"
 
192
        self._state = "writing"
 
193
 
 
194
    def accept_bytes(self, bytes):
 
195
        """Accept bytes for inclusion in this request.
 
196
 
 
197
        This method may not be be called after finished_writing() has been
 
198
        called.  It depends upon the Medium whether or not the bytes will be
 
199
        immediately transmitted. Message based Mediums will tend to buffer the
 
200
        bytes until finished_writing() is called.
 
201
 
 
202
        :param bytes: A bytestring.
 
203
        """
 
204
        if self._state != "writing":
 
205
            raise errors.WritingCompleted(self)
 
206
        self._accept_bytes(bytes)
 
207
 
 
208
    def _accept_bytes(self, bytes):
 
209
        """Helper for accept_bytes.
 
210
 
 
211
        Accept_bytes checks the state of the request to determing if bytes
 
212
        should be accepted. After that it hands off to _accept_bytes to do the
 
213
        actual acceptance.
 
214
        """
 
215
        raise NotImplementedError(self._accept_bytes)
 
216
 
 
217
    def finished_reading(self):
 
218
        """Inform the request that all desired data has been read.
 
219
 
 
220
        This will remove the request from the pipeline for its medium (if the
 
221
        medium supports pipelining) and any further calls to methods on the
 
222
        request will raise ReadingCompleted.
 
223
        """
 
224
        if self._state == "writing":
 
225
            raise errors.WritingNotComplete(self)
 
226
        if self._state != "reading":
 
227
            raise errors.ReadingCompleted(self)
 
228
        self._state = "done"
 
229
        self._finished_reading()
 
230
 
 
231
    def _finished_reading(self):
 
232
        """Helper for finished_reading.
 
233
 
 
234
        finished_reading checks the state of the request to determine if 
 
235
        finished_reading is allowed, and if it is hands off to _finished_reading
 
236
        to perform the action.
 
237
        """
 
238
        raise NotImplementedError(self._finished_reading)
 
239
 
 
240
    def finished_writing(self):
 
241
        """Finish the writing phase of this request.
 
242
 
 
243
        This will flush all pending data for this request along the medium.
 
244
        After calling finished_writing, you may not call accept_bytes anymore.
 
245
        """
 
246
        if self._state != "writing":
 
247
            raise errors.WritingCompleted(self)
 
248
        self._state = "reading"
 
249
        self._finished_writing()
 
250
 
 
251
    def _finished_writing(self):
 
252
        """Helper for finished_writing.
 
253
 
 
254
        finished_writing checks the state of the request to determine if 
 
255
        finished_writing is allowed, and if it is hands off to _finished_writing
 
256
        to perform the action.
 
257
        """
 
258
        raise NotImplementedError(self._finished_writing)
 
259
 
 
260
    def read_bytes(self, count):
 
261
        """Read bytes from this requests response.
 
262
 
 
263
        This method will block and wait for count bytes to be read. It may not
 
264
        be invoked until finished_writing() has been called - this is to ensure
 
265
        a message-based approach to requests, for compatability with message
 
266
        based mediums like HTTP.
 
267
        """
 
268
        if self._state == "writing":
 
269
            raise errors.WritingNotComplete(self)
 
270
        if self._state != "reading":
 
271
            raise errors.ReadingCompleted(self)
 
272
        return self._read_bytes(count)
 
273
 
 
274
    def _read_bytes(self, count):
 
275
        """Helper for read_bytes.
 
276
 
 
277
        read_bytes checks the state of the request to determing if bytes
 
278
        should be read. After that it hands off to _read_bytes to do the
 
279
        actual read.
 
280
        """
 
281
        raise NotImplementedError(self._read_bytes)
 
282
 
 
283
 
 
284
class SmartClientMedium(object):
 
285
    """Smart client is a medium for sending smart protocol requests over."""
 
286
 
 
287
    def disconnect(self):
 
288
        """If this medium maintains a persistent connection, close it.
 
289
        
 
290
        The default implementation does nothing.
 
291
        """
 
292
        
 
293
 
 
294
class SmartClientStreamMedium(SmartClientMedium):
 
295
    """Stream based medium common class.
 
296
 
 
297
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
298
    SmartClientStreamMediumRequest for their requests, and should implement
 
299
    _accept_bytes and _read_bytes to allow the request objects to send and
 
300
    receive bytes.
 
301
    """
 
302
 
 
303
    def __init__(self):
 
304
        self._current_request = None
 
305
 
 
306
    def accept_bytes(self, bytes):
 
307
        self._accept_bytes(bytes)
 
308
 
 
309
    def __del__(self):
 
310
        """The SmartClientStreamMedium knows how to close the stream when it is
 
311
        finished with it.
 
312
        """
 
313
        self.disconnect()
 
314
 
 
315
    def _flush(self):
 
316
        """Flush the output stream.
 
317
        
 
318
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
319
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
320
        """
 
321
        raise NotImplementedError(self._flush)
 
322
 
 
323
    def get_request(self):
 
324
        """See SmartClientMedium.get_request().
 
325
 
 
326
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
327
        for get_request.
 
328
        """
 
329
        return SmartClientStreamMediumRequest(self)
 
330
 
 
331
    def read_bytes(self, count):
 
332
        return self._read_bytes(count)
 
333
 
 
334
 
 
335
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
336
    """A client medium using simple pipes.
 
337
    
 
338
    This client does not manage the pipes: it assumes they will always be open.
 
339
    """
 
340
 
 
341
    def __init__(self, readable_pipe, writeable_pipe):
 
342
        SmartClientStreamMedium.__init__(self)
 
343
        self._readable_pipe = readable_pipe
 
344
        self._writeable_pipe = writeable_pipe
 
345
 
 
346
    def _accept_bytes(self, bytes):
 
347
        """See SmartClientStreamMedium.accept_bytes."""
 
348
        self._writeable_pipe.write(bytes)
 
349
 
 
350
    def _flush(self):
 
351
        """See SmartClientStreamMedium._flush()."""
 
352
        self._writeable_pipe.flush()
 
353
 
 
354
    def _read_bytes(self, count):
 
355
        """See SmartClientStreamMedium._read_bytes."""
 
356
        return self._readable_pipe.read(count)
 
357
 
 
358
 
 
359
class SmartSSHClientMedium(SmartClientStreamMedium):
 
360
    """A client medium using SSH."""
 
361
    
 
362
    def __init__(self, host, port=None, username=None, password=None,
 
363
            vendor=None):
 
364
        """Creates a client that will connect on the first use.
 
365
        
 
366
        :param vendor: An optional override for the ssh vendor to use. See
 
367
            bzrlib.transport.ssh for details on ssh vendors.
 
368
        """
 
369
        SmartClientStreamMedium.__init__(self)
 
370
        self._connected = False
 
371
        self._host = host
 
372
        self._password = password
 
373
        self._port = port
 
374
        self._username = username
 
375
        self._read_from = None
 
376
        self._ssh_connection = None
 
377
        self._vendor = vendor
 
378
        self._write_to = None
 
379
 
 
380
    def _accept_bytes(self, bytes):
 
381
        """See SmartClientStreamMedium.accept_bytes."""
 
382
        self._ensure_connection()
 
383
        self._write_to.write(bytes)
 
384
 
 
385
    def disconnect(self):
 
386
        """See SmartClientMedium.disconnect()."""
 
387
        if not self._connected:
 
388
            return
 
389
        self._read_from.close()
 
390
        self._write_to.close()
 
391
        self._ssh_connection.close()
 
392
        self._connected = False
 
393
 
 
394
    def _ensure_connection(self):
 
395
        """Connect this medium if not already connected."""
 
396
        if self._connected:
 
397
            return
 
398
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
399
        if self._vendor is None:
 
400
            vendor = ssh._get_ssh_vendor()
 
401
        else:
 
402
            vendor = self._vendor
 
403
        self._ssh_connection = vendor.connect_ssh(self._username,
 
404
                self._password, self._host, self._port,
 
405
                command=[executable, 'serve', '--inet', '--directory=/',
 
406
                         '--allow-writes'])
 
407
        self._read_from, self._write_to = \
 
408
            self._ssh_connection.get_filelike_channels()
 
409
        self._connected = True
 
410
 
 
411
    def _flush(self):
 
412
        """See SmartClientStreamMedium._flush()."""
 
413
        self._write_to.flush()
 
414
 
 
415
    def _read_bytes(self, count):
 
416
        """See SmartClientStreamMedium.read_bytes."""
 
417
        if not self._connected:
 
418
            raise errors.MediumNotConnected(self)
 
419
        return self._read_from.read(count)
 
420
 
 
421
 
 
422
class SmartTCPClientMedium(SmartClientStreamMedium):
 
423
    """A client medium using TCP."""
 
424
    
 
425
    def __init__(self, host, port):
 
426
        """Creates a client that will connect on the first use."""
 
427
        SmartClientStreamMedium.__init__(self)
 
428
        self._connected = False
 
429
        self._host = host
 
430
        self._port = port
 
431
        self._socket = None
 
432
 
 
433
    def _accept_bytes(self, bytes):
 
434
        """See SmartClientMedium.accept_bytes."""
 
435
        self._ensure_connection()
 
436
        self._socket.sendall(bytes)
 
437
 
 
438
    def disconnect(self):
 
439
        """See SmartClientMedium.disconnect()."""
 
440
        if not self._connected:
 
441
            return
 
442
        self._socket.close()
 
443
        self._socket = None
 
444
        self._connected = False
 
445
 
 
446
    def _ensure_connection(self):
 
447
        """Connect this medium if not already connected."""
 
448
        if self._connected:
 
449
            return
 
450
        self._socket = socket.socket()
 
451
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
452
        result = self._socket.connect_ex((self._host, int(self._port)))
 
453
        if result:
 
454
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
455
                    (self._host, self._port, os.strerror(result)))
 
456
        self._connected = True
 
457
 
 
458
    def _flush(self):
 
459
        """See SmartClientStreamMedium._flush().
 
460
        
 
461
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
462
        add a means to do a flush, but that can be done in the future.
 
463
        """
 
464
 
 
465
    def _read_bytes(self, count):
 
466
        """See SmartClientMedium.read_bytes."""
 
467
        if not self._connected:
 
468
            raise errors.MediumNotConnected(self)
 
469
        return self._socket.recv(count)
 
470
 
 
471
 
 
472
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
473
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
474
 
 
475
    def __init__(self, medium):
 
476
        SmartClientMediumRequest.__init__(self, medium)
 
477
        # check that we are safe concurrency wise. If some streams start
 
478
        # allowing concurrent requests - i.e. via multiplexing - then this
 
479
        # assert should be moved to SmartClientStreamMedium.get_request,
 
480
        # and the setting/unsetting of _current_request likewise moved into
 
481
        # that class : but its unneeded overhead for now. RBC 20060922
 
482
        if self._medium._current_request is not None:
 
483
            raise errors.TooManyConcurrentRequests(self._medium)
 
484
        self._medium._current_request = self
 
485
 
 
486
    def _accept_bytes(self, bytes):
 
487
        """See SmartClientMediumRequest._accept_bytes.
 
488
        
 
489
        This forwards to self._medium._accept_bytes because we are operating
 
490
        on the mediums stream.
 
491
        """
 
492
        self._medium._accept_bytes(bytes)
 
493
 
 
494
    def _finished_reading(self):
 
495
        """See SmartClientMediumRequest._finished_reading.
 
496
 
 
497
        This clears the _current_request on self._medium to allow a new 
 
498
        request to be created.
 
499
        """
 
500
        assert self._medium._current_request is self
 
501
        self._medium._current_request = None
 
502
        
 
503
    def _finished_writing(self):
 
504
        """See SmartClientMediumRequest._finished_writing.
 
505
 
 
506
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
507
        """
 
508
        self._medium._flush()
 
509
 
 
510
    def _read_bytes(self, count):
 
511
        """See SmartClientMediumRequest._read_bytes.
 
512
        
 
513
        This forwards to self._medium._read_bytes because we are operating
 
514
        on the mediums stream.
 
515
        """
 
516
        return self._medium._read_bytes(count)
 
517