~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/medium.py

  • Committer: Robert Collins
  • Date: 2007-04-24 10:40:22 UTC
  • mto: (2432.3.5 hpss-vfs-fallback)
  • mto: This revision was merged to the branch mainline in revision 2463.
  • Revision ID: robertc@lifelesswks.robertcollins.net-20070424104022-308ab78a663c23f7
Add SuccessfulSmartServerResponse.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006,2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""The 'medium' layer for the smart servers and clients.
 
18
 
 
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
 
20
for "the quality between big and small."
 
21
 
 
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
 
23
over SSH), and pass them to and from the protocol logic.  See the overview in
 
24
bzrlib/transport/smart/__init__.py.
 
25
"""
 
26
 
 
27
import os
 
28
import socket
 
29
import sys
 
30
from bzrlib import errors
 
31
from bzrlib.smart.protocol import (
 
32
    SmartServerRequestProtocolOne,
 
33
    SmartServerRequestProtocolTwo,
 
34
    )
 
35
 
 
36
try:
 
37
    from bzrlib.transport import ssh
 
38
except errors.ParamikoNotPresent:
 
39
    # no paramiko.  SmartSSHClientMedium will break.
 
40
    pass
 
41
 
 
42
 
 
43
class SmartServerStreamMedium(object):
 
44
    """Handles smart commands coming over a stream.
 
45
 
 
46
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
47
    in-process fifo for testing.
 
48
 
 
49
    One instance is created for each connected client; it can serve multiple
 
50
    requests in the lifetime of the connection.
 
51
 
 
52
    The server passes requests through to an underlying backing transport, 
 
53
    which will typically be a LocalTransport looking at the server's filesystem.
 
54
    """
 
55
 
 
56
    def __init__(self, backing_transport):
 
57
        """Construct new server.
 
58
 
 
59
        :param backing_transport: Transport for the directory served.
 
60
        """
 
61
        # backing_transport could be passed to serve instead of __init__
 
62
        self.backing_transport = backing_transport
 
63
        self.finished = False
 
64
 
 
65
    def serve(self):
 
66
        """Serve requests until the client disconnects."""
 
67
        # Keep a reference to stderr because the sys module's globals get set to
 
68
        # None during interpreter shutdown.
 
69
        from sys import stderr
 
70
        try:
 
71
            while not self.finished:
 
72
                protocol = self._build_protocol()
 
73
                self._serve_one_request(protocol)
 
74
        except Exception, e:
 
75
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
76
            raise
 
77
 
 
78
    def _build_protocol(self):
 
79
        # Identify the protocol version.
 
80
        bytes = self._get_bytes(2)
 
81
        if bytes.startswith('2\x01'):
 
82
            protocol_class = SmartServerRequestProtocolTwo
 
83
            bytes = bytes[2:]
 
84
        else:
 
85
            protocol_class = SmartServerRequestProtocolOne
 
86
        protocol = protocol_class(self.backing_transport, self._write_out)
 
87
        protocol.accept_bytes(bytes)
 
88
        return protocol
 
89
 
 
90
    def _serve_one_request(self, protocol):
 
91
        """Read one request from input, process, send back a response.
 
92
        
 
93
        :param protocol: a SmartServerRequestProtocol.
 
94
        """
 
95
        try:
 
96
            self._serve_one_request_unguarded(protocol)
 
97
        except KeyboardInterrupt:
 
98
            raise
 
99
        except Exception, e:
 
100
            self.terminate_due_to_error()
 
101
 
 
102
    def terminate_due_to_error(self):
 
103
        """Called when an unhandled exception from the protocol occurs."""
 
104
        raise NotImplementedError(self.terminate_due_to_error)
 
105
 
 
106
    def _get_bytes(self, desired_count):
 
107
        """Get some bytes from the medium.
 
108
 
 
109
        :param desired_count: number of bytes we want to read.
 
110
        """
 
111
        raise NotImplementedError(self._get_bytes)
 
112
 
 
113
 
 
114
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
115
 
 
116
    def __init__(self, sock, backing_transport):
 
117
        """Constructor.
 
118
 
 
119
        :param sock: the socket the server will read from.  It will be put
 
120
            into blocking mode.
 
121
        """
 
122
        SmartServerStreamMedium.__init__(self, backing_transport)
 
123
        self.push_back = ''
 
124
        sock.setblocking(True)
 
125
        self.socket = sock
 
126
 
 
127
    def _serve_one_request_unguarded(self, protocol):
 
128
        while protocol.next_read_size():
 
129
            if self.push_back:
 
130
                protocol.accept_bytes(self.push_back)
 
131
                self.push_back = ''
 
132
            else:
 
133
                bytes = self._get_bytes(4096)
 
134
                if bytes == '':
 
135
                    self.finished = True
 
136
                    return
 
137
                protocol.accept_bytes(bytes)
 
138
        
 
139
        self.push_back = protocol.excess_buffer
 
140
 
 
141
    def _get_bytes(self, desired_count):
 
142
        # We ignore the desired_count because on sockets it's more efficient to
 
143
        # read 4k at a time.
 
144
        return self.socket.recv(4096)
 
145
    
 
146
    def terminate_due_to_error(self):
 
147
        """Called when an unhandled exception from the protocol occurs."""
 
148
        # TODO: This should log to a server log file, but no such thing
 
149
        # exists yet.  Andrew Bennetts 2006-09-29.
 
150
        self.socket.close()
 
151
        self.finished = True
 
152
 
 
153
    def _write_out(self, bytes):
 
154
        self.socket.sendall(bytes)
 
155
 
 
156
 
 
157
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
158
 
 
159
    def __init__(self, in_file, out_file, backing_transport):
 
160
        """Construct new server.
 
161
 
 
162
        :param in_file: Python file from which requests can be read.
 
163
        :param out_file: Python file to write responses.
 
164
        :param backing_transport: Transport for the directory served.
 
165
        """
 
166
        SmartServerStreamMedium.__init__(self, backing_transport)
 
167
        if sys.platform == 'win32':
 
168
            # force binary mode for files
 
169
            import msvcrt
 
170
            for f in (in_file, out_file):
 
171
                fileno = getattr(f, 'fileno', None)
 
172
                if fileno:
 
173
                    msvcrt.setmode(fileno(), os.O_BINARY)
 
174
        self._in = in_file
 
175
        self._out = out_file
 
176
 
 
177
    def _serve_one_request_unguarded(self, protocol):
 
178
        while True:
 
179
            bytes_to_read = protocol.next_read_size()
 
180
            if bytes_to_read == 0:
 
181
                # Finished serving this request.
 
182
                self._out.flush()
 
183
                return
 
184
            bytes = self._get_bytes(bytes_to_read)
 
185
            if bytes == '':
 
186
                # Connection has been closed.
 
187
                self.finished = True
 
188
                self._out.flush()
 
189
                return
 
190
            protocol.accept_bytes(bytes)
 
191
 
 
192
    def _get_bytes(self, desired_count):
 
193
        return self._in.read(desired_count)
 
194
 
 
195
    def terminate_due_to_error(self):
 
196
        # TODO: This should log to a server log file, but no such thing
 
197
        # exists yet.  Andrew Bennetts 2006-09-29.
 
198
        self._out.close()
 
199
        self.finished = True
 
200
 
 
201
    def _write_out(self, bytes):
 
202
        self._out.write(bytes)
 
203
 
 
204
 
 
205
class SmartClientMediumRequest(object):
 
206
    """A request on a SmartClientMedium.
 
207
 
 
208
    Each request allows bytes to be provided to it via accept_bytes, and then
 
209
    the response bytes to be read via read_bytes.
 
210
 
 
211
    For instance:
 
212
    request.accept_bytes('123')
 
213
    request.finished_writing()
 
214
    result = request.read_bytes(3)
 
215
    request.finished_reading()
 
216
 
 
217
    It is up to the individual SmartClientMedium whether multiple concurrent
 
218
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
219
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
220
    details on concurrency and pipelining.
 
221
    """
 
222
 
 
223
    def __init__(self, medium):
 
224
        """Construct a SmartClientMediumRequest for the medium medium."""
 
225
        self._medium = medium
 
226
        # we track state by constants - we may want to use the same
 
227
        # pattern as BodyReader if it gets more complex.
 
228
        # valid states are: "writing", "reading", "done"
 
229
        self._state = "writing"
 
230
 
 
231
    def accept_bytes(self, bytes):
 
232
        """Accept bytes for inclusion in this request.
 
233
 
 
234
        This method may not be be called after finished_writing() has been
 
235
        called.  It depends upon the Medium whether or not the bytes will be
 
236
        immediately transmitted. Message based Mediums will tend to buffer the
 
237
        bytes until finished_writing() is called.
 
238
 
 
239
        :param bytes: A bytestring.
 
240
        """
 
241
        if self._state != "writing":
 
242
            raise errors.WritingCompleted(self)
 
243
        self._accept_bytes(bytes)
 
244
 
 
245
    def _accept_bytes(self, bytes):
 
246
        """Helper for accept_bytes.
 
247
 
 
248
        Accept_bytes checks the state of the request to determing if bytes
 
249
        should be accepted. After that it hands off to _accept_bytes to do the
 
250
        actual acceptance.
 
251
        """
 
252
        raise NotImplementedError(self._accept_bytes)
 
253
 
 
254
    def finished_reading(self):
 
255
        """Inform the request that all desired data has been read.
 
256
 
 
257
        This will remove the request from the pipeline for its medium (if the
 
258
        medium supports pipelining) and any further calls to methods on the
 
259
        request will raise ReadingCompleted.
 
260
        """
 
261
        if self._state == "writing":
 
262
            raise errors.WritingNotComplete(self)
 
263
        if self._state != "reading":
 
264
            raise errors.ReadingCompleted(self)
 
265
        self._state = "done"
 
266
        self._finished_reading()
 
267
 
 
268
    def _finished_reading(self):
 
269
        """Helper for finished_reading.
 
270
 
 
271
        finished_reading checks the state of the request to determine if 
 
272
        finished_reading is allowed, and if it is hands off to _finished_reading
 
273
        to perform the action.
 
274
        """
 
275
        raise NotImplementedError(self._finished_reading)
 
276
 
 
277
    def finished_writing(self):
 
278
        """Finish the writing phase of this request.
 
279
 
 
280
        This will flush all pending data for this request along the medium.
 
281
        After calling finished_writing, you may not call accept_bytes anymore.
 
282
        """
 
283
        if self._state != "writing":
 
284
            raise errors.WritingCompleted(self)
 
285
        self._state = "reading"
 
286
        self._finished_writing()
 
287
 
 
288
    def _finished_writing(self):
 
289
        """Helper for finished_writing.
 
290
 
 
291
        finished_writing checks the state of the request to determine if 
 
292
        finished_writing is allowed, and if it is hands off to _finished_writing
 
293
        to perform the action.
 
294
        """
 
295
        raise NotImplementedError(self._finished_writing)
 
296
 
 
297
    def read_bytes(self, count):
 
298
        """Read bytes from this requests response.
 
299
 
 
300
        This method will block and wait for count bytes to be read. It may not
 
301
        be invoked until finished_writing() has been called - this is to ensure
 
302
        a message-based approach to requests, for compatability with message
 
303
        based mediums like HTTP.
 
304
        """
 
305
        if self._state == "writing":
 
306
            raise errors.WritingNotComplete(self)
 
307
        if self._state != "reading":
 
308
            raise errors.ReadingCompleted(self)
 
309
        return self._read_bytes(count)
 
310
 
 
311
    def _read_bytes(self, count):
 
312
        """Helper for read_bytes.
 
313
 
 
314
        read_bytes checks the state of the request to determing if bytes
 
315
        should be read. After that it hands off to _read_bytes to do the
 
316
        actual read.
 
317
        """
 
318
        raise NotImplementedError(self._read_bytes)
 
319
 
 
320
 
 
321
class SmartClientMedium(object):
 
322
    """Smart client is a medium for sending smart protocol requests over."""
 
323
 
 
324
    def disconnect(self):
 
325
        """If this medium maintains a persistent connection, close it.
 
326
        
 
327
        The default implementation does nothing.
 
328
        """
 
329
        
 
330
 
 
331
class SmartClientStreamMedium(SmartClientMedium):
 
332
    """Stream based medium common class.
 
333
 
 
334
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
335
    SmartClientStreamMediumRequest for their requests, and should implement
 
336
    _accept_bytes and _read_bytes to allow the request objects to send and
 
337
    receive bytes.
 
338
    """
 
339
 
 
340
    def __init__(self):
 
341
        self._current_request = None
 
342
 
 
343
    def accept_bytes(self, bytes):
 
344
        self._accept_bytes(bytes)
 
345
 
 
346
    def __del__(self):
 
347
        """The SmartClientStreamMedium knows how to close the stream when it is
 
348
        finished with it.
 
349
        """
 
350
        self.disconnect()
 
351
 
 
352
    def _flush(self):
 
353
        """Flush the output stream.
 
354
        
 
355
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
356
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
357
        """
 
358
        raise NotImplementedError(self._flush)
 
359
 
 
360
    def get_request(self):
 
361
        """See SmartClientMedium.get_request().
 
362
 
 
363
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
364
        for get_request.
 
365
        """
 
366
        return SmartClientStreamMediumRequest(self)
 
367
 
 
368
    def read_bytes(self, count):
 
369
        return self._read_bytes(count)
 
370
 
 
371
 
 
372
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
373
    """A client medium using simple pipes.
 
374
    
 
375
    This client does not manage the pipes: it assumes they will always be open.
 
376
    """
 
377
 
 
378
    def __init__(self, readable_pipe, writeable_pipe):
 
379
        SmartClientStreamMedium.__init__(self)
 
380
        self._readable_pipe = readable_pipe
 
381
        self._writeable_pipe = writeable_pipe
 
382
 
 
383
    def _accept_bytes(self, bytes):
 
384
        """See SmartClientStreamMedium.accept_bytes."""
 
385
        self._writeable_pipe.write(bytes)
 
386
 
 
387
    def _flush(self):
 
388
        """See SmartClientStreamMedium._flush()."""
 
389
        self._writeable_pipe.flush()
 
390
 
 
391
    def _read_bytes(self, count):
 
392
        """See SmartClientStreamMedium._read_bytes."""
 
393
        return self._readable_pipe.read(count)
 
394
 
 
395
 
 
396
class SmartSSHClientMedium(SmartClientStreamMedium):
 
397
    """A client medium using SSH."""
 
398
    
 
399
    def __init__(self, host, port=None, username=None, password=None,
 
400
            vendor=None):
 
401
        """Creates a client that will connect on the first use.
 
402
        
 
403
        :param vendor: An optional override for the ssh vendor to use. See
 
404
            bzrlib.transport.ssh for details on ssh vendors.
 
405
        """
 
406
        SmartClientStreamMedium.__init__(self)
 
407
        self._connected = False
 
408
        self._host = host
 
409
        self._password = password
 
410
        self._port = port
 
411
        self._username = username
 
412
        self._read_from = None
 
413
        self._ssh_connection = None
 
414
        self._vendor = vendor
 
415
        self._write_to = None
 
416
 
 
417
    def _accept_bytes(self, bytes):
 
418
        """See SmartClientStreamMedium.accept_bytes."""
 
419
        self._ensure_connection()
 
420
        self._write_to.write(bytes)
 
421
 
 
422
    def disconnect(self):
 
423
        """See SmartClientMedium.disconnect()."""
 
424
        if not self._connected:
 
425
            return
 
426
        self._read_from.close()
 
427
        self._write_to.close()
 
428
        self._ssh_connection.close()
 
429
        self._connected = False
 
430
 
 
431
    def _ensure_connection(self):
 
432
        """Connect this medium if not already connected."""
 
433
        if self._connected:
 
434
            return
 
435
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
436
        if self._vendor is None:
 
437
            vendor = ssh._get_ssh_vendor()
 
438
        else:
 
439
            vendor = self._vendor
 
440
        self._ssh_connection = vendor.connect_ssh(self._username,
 
441
                self._password, self._host, self._port,
 
442
                command=[executable, 'serve', '--inet', '--directory=/',
 
443
                         '--allow-writes'])
 
444
        self._read_from, self._write_to = \
 
445
            self._ssh_connection.get_filelike_channels()
 
446
        self._connected = True
 
447
 
 
448
    def _flush(self):
 
449
        """See SmartClientStreamMedium._flush()."""
 
450
        self._write_to.flush()
 
451
 
 
452
    def _read_bytes(self, count):
 
453
        """See SmartClientStreamMedium.read_bytes."""
 
454
        if not self._connected:
 
455
            raise errors.MediumNotConnected(self)
 
456
        return self._read_from.read(count)
 
457
 
 
458
 
 
459
class SmartTCPClientMedium(SmartClientStreamMedium):
 
460
    """A client medium using TCP."""
 
461
    
 
462
    def __init__(self, host, port):
 
463
        """Creates a client that will connect on the first use."""
 
464
        SmartClientStreamMedium.__init__(self)
 
465
        self._connected = False
 
466
        self._host = host
 
467
        self._port = port
 
468
        self._socket = None
 
469
 
 
470
    def _accept_bytes(self, bytes):
 
471
        """See SmartClientMedium.accept_bytes."""
 
472
        self._ensure_connection()
 
473
        self._socket.sendall(bytes)
 
474
 
 
475
    def disconnect(self):
 
476
        """See SmartClientMedium.disconnect()."""
 
477
        if not self._connected:
 
478
            return
 
479
        self._socket.close()
 
480
        self._socket = None
 
481
        self._connected = False
 
482
 
 
483
    def _ensure_connection(self):
 
484
        """Connect this medium if not already connected."""
 
485
        if self._connected:
 
486
            return
 
487
        self._socket = socket.socket()
 
488
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
489
        result = self._socket.connect_ex((self._host, int(self._port)))
 
490
        if result:
 
491
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
492
                    (self._host, self._port, os.strerror(result)))
 
493
        self._connected = True
 
494
 
 
495
    def _flush(self):
 
496
        """See SmartClientStreamMedium._flush().
 
497
        
 
498
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
499
        add a means to do a flush, but that can be done in the future.
 
500
        """
 
501
 
 
502
    def _read_bytes(self, count):
 
503
        """See SmartClientMedium.read_bytes."""
 
504
        if not self._connected:
 
505
            raise errors.MediumNotConnected(self)
 
506
        return self._socket.recv(count)
 
507
 
 
508
 
 
509
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
510
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
511
 
 
512
    def __init__(self, medium):
 
513
        SmartClientMediumRequest.__init__(self, medium)
 
514
        # check that we are safe concurrency wise. If some streams start
 
515
        # allowing concurrent requests - i.e. via multiplexing - then this
 
516
        # assert should be moved to SmartClientStreamMedium.get_request,
 
517
        # and the setting/unsetting of _current_request likewise moved into
 
518
        # that class : but its unneeded overhead for now. RBC 20060922
 
519
        if self._medium._current_request is not None:
 
520
            raise errors.TooManyConcurrentRequests(self._medium)
 
521
        self._medium._current_request = self
 
522
 
 
523
    def _accept_bytes(self, bytes):
 
524
        """See SmartClientMediumRequest._accept_bytes.
 
525
        
 
526
        This forwards to self._medium._accept_bytes because we are operating
 
527
        on the mediums stream.
 
528
        """
 
529
        self._medium._accept_bytes(bytes)
 
530
 
 
531
    def _finished_reading(self):
 
532
        """See SmartClientMediumRequest._finished_reading.
 
533
 
 
534
        This clears the _current_request on self._medium to allow a new 
 
535
        request to be created.
 
536
        """
 
537
        assert self._medium._current_request is self
 
538
        self._medium._current_request = None
 
539
        
 
540
    def _finished_writing(self):
 
541
        """See SmartClientMediumRequest._finished_writing.
 
542
 
 
543
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
544
        """
 
545
        self._medium._flush()
 
546
 
 
547
    def _read_bytes(self, count):
 
548
        """See SmartClientMediumRequest._read_bytes.
 
549
        
 
550
        This forwards to self._medium._read_bytes because we are operating
 
551
        on the mediums stream.
 
552
        """
 
553
        return self._medium._read_bytes(count)
 
554
 
 
555