1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""The 'medium' layer for the smart servers and clients.

"Medium" here is the noun meaning "a means of transmission", not the adjective
for "the quality between big and small."

Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
over SSH), and pass them to and from the protocol logic. See the overview in
bzrlib/transport/smart/__init__.py.
"""
35
from bzrlib.smart.protocol import (
37
SmartServerRequestProtocolOne,
38
SmartServerRequestProtocolTwo,
42
from bzrlib.transport import ssh
43
except errors.ParamikoNotPresent:
44
# no paramiko. SmartSSHClientMedium will break.
48
class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    This base class provides the request loop only.  Concrete media supply
    the I/O by implementing _get_bytes, _write_out,
    _serve_one_request_unguarded and terminate_due_to_error.
    """

    def __init__(self, backing_transport):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        # serve() keeps looping until something sets this to True.
        self.finished = False

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep a reference to stderr because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import stderr
        try:
            while not self.finished:
                server_protocol = self._build_protocol()
                self._serve_one_request(server_protocol)
        except Exception as e:
            stderr.write("%s terminating on exception %s\n" % (self, e))
            # NOTE(review): the statement after the write was missing from
            # the reviewed text; re-raising is assumed -- confirm.
            raise

    def _build_protocol(self):
        """Identify the version of the incoming request, and return a
        protocol object that can interpret it.

        If more bytes than the version prefix of the request are read, they will
        be fed into the protocol before it is returned.

        :returns: a SmartServerRequestProtocol.
        """
        # Identify the protocol version.
        first_line = self._get_line()
        if first_line.startswith(REQUEST_VERSION_TWO):
            protocol_class = SmartServerRequestProtocolTwo
            # Strip the version marker; the remainder belongs to the request.
            first_line = first_line[len(REQUEST_VERSION_TWO):]
        else:
            protocol_class = SmartServerRequestProtocolOne
        protocol = protocol_class(self.backing_transport, self._write_out)
        protocol.accept_bytes(first_line)
        return protocol

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        KeyboardInterrupt propagates to the caller; any other exception is
        handed to terminate_due_to_error().

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)

    def _get_bytes(self, desired_count):
        """Get some bytes from the medium.

        :param desired_count: number of bytes we want to read.
        """
        raise NotImplementedError(self._get_bytes)

    def _get_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A), or
            whatever was read (possibly an empty string) if the medium ran
            out of bytes first.
        """
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
        line = ''
        while not line or line[-1] != '\n':
            new_char = self._get_bytes(1)
            line += new_char
            if not new_char:
                # Ran out of bytes before receiving a complete line.
                break
        return line
145
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """Server medium that talks to its client over a socket."""

    def __init__(self, sock, backing_transport):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        # Bytes received beyond the end of one request, kept for the next.
        self.push_back = ''
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to protocol until it asks for no more.

        Pushed-back bytes are delivered before anything new is read from
        the socket.  An empty read marks the medium as finished.

        :param protocol: a SmartServerRequestProtocol.
        """
        while protocol.next_read_size():
            if self.push_back:
                protocol.accept_bytes(self.push_back)
                self.push_back = ''
            else:
                received = self._get_bytes(4096)
                if not received:
                    # The peer closed the connection.
                    self.finished = True
                    return
                protocol.accept_bytes(received)

        # Whatever the protocol did not consume belongs to the next request.
        self.push_back = protocol.excess_buffer

    def _get_bytes(self, desired_count):
        """Read from the socket.

        :param desired_count: ignored, see below.
        """
        # We ignore the desired_count because on sockets it's more efficient to
        # read 4k at a time.
        # NOTE(review): callers that go straight to _get_bytes (such as the
        # inherited _get_line) never see push_back -- confirm leftover bytes
        # cannot end up behind newer data.
        return self.socket.recv(4096)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet. Andrew Bennetts 2006-09-29.
        # NOTE(review): both statements below were missing from the reviewed
        # text and are reconstructed -- confirm.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send bytes to the client, blocking until all are written."""
        self.socket.sendall(bytes)
188
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """Server medium that reads requests from one file and writes responses
    to another.
    """

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                # Objects without a fileno attribute are left untouched.
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to protocol until it asks for no more.

        :param protocol: a SmartServerRequestProtocol.
        """
        # NOTE(review): the flush calls and the loop header were missing
        # from the reviewed text and are reconstructed -- confirm.
        while True:
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            received = self._get_bytes(bytes_to_read)
            if not received:
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(received)

    def _get_bytes(self, desired_count):
        """Read up to desired_count bytes from the input file."""
        return self._in.read(desired_count)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet. Andrew Bennetts 2006-09-29.
        # NOTE(review): both statements below were missing from the reviewed
        # text and are reconstructed -- confirm.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write bytes to the output file (not flushed here)."""
        self._out.write(bytes)
236
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance:
    request.accept_bytes('123')
    request.finished_writing()
    result = request.read_bytes(3)
    request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called.  It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises WritingCompleted: if the request is no longer being written.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual work.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises WritingNotComplete: if finished_writing() was never called.
        :raises ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises WritingCompleted: if writing was already finished.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :raises WritingNotComplete: if finished_writing() was never called.
        :raises ReadingCompleted: if finished_reading() was already called.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual work.
        """
        raise NotImplementedError(self._read_bytes)

    # NOTE(review): the def line of this method was missing from the reviewed
    # text; the name read_line is inferred from read_bytes -- confirm.
    def read_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A).
        :raises SmartProtocolError: if the response ends before a newline.
        """
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
        line = ''
        while not line or line[-1] != '\n':
            new_char = self.read_bytes(1)
            line += new_char
            if not new_char:
                raise errors.SmartProtocolError(
                    'unexpected end of file reading from server')
        return line
370
class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over."""

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """
380
class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self):
        # The request currently using the stream, or None when it is free.
        self._current_request = None

    def accept_bytes(self, bytes):
        """Send bytes on the stream via the subclass's _accept_bytes."""
        self._accept_bytes(bytes)

    # NOTE(review): only the first docstring line of this method was present
    # in the reviewed text; the __del__ header and the disconnect() call are
    # inferred from that sentence -- confirm.
    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        instance.
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        """Read from the stream via the subclass's _read_bytes."""
        return self._read_bytes(count)
421
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        """Create a medium over an existing pair of pipes.

        :param readable_pipe: file-like object responses are read from.
        :param writeable_pipe: file-like object requests are written to.
        """
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)
445
class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            vendor=None, bzr_remote_path=None):
        """Creates a client that will connect on the first use.

        :param host: host handed to the ssh vendor when connecting.
        :param port: port handed to the ssh vendor when connecting.
        :param username: username handed to the ssh vendor when connecting.
        :param password: password handed to the ssh vendor when connecting.
        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        :param bzr_remote_path: the command to run on the remote side.
            Omitting it is deprecated; the BZR_REMOTE_PATH environment
            variable, or 'bzr', is used in that case.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None
        self._bzr_remote_path = bzr_remote_path
        if self._bzr_remote_path is None:
            symbol_versioning.warn(
                'bzr_remote_path is required as of bzr 0.92',
                DeprecationWarning, stacklevel=2)
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[self._bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes.

        :raises MediumNotConnected: if nothing has been sent yet, so no
            connection exists.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)
513
class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; converted with int() when the
            connection is made.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        :raises ConnectionError: if the connection attempt fails.
        """
        if self._connected:
            return
        # Convert once so the error message below formats the same integer
        # that was used to connect; '%d' fails on a string port.
        port = int(self._port)
        sock = socket.socket()
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        result = sock.connect_ex((self._host, port))
        if result:
            # Do not keep (or leak) a socket that never connected.
            sock.close()
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, os.strerror(result)))
        self._socket = sock
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes.

        :raises MediumNotConnected: if no connection has been made yet.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)
563
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        """Claim medium's stream for this request.

        :param medium: the SmartClientStreamMedium to operate on.
        :raises TooManyConcurrentRequests: if medium already has a current
            request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)