1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""The 'medium' layer for the smart servers and clients.
19
"Medium" here is the noun meaning "a means of transmission", not the adjective
20
for "the quality between big and small."
22
Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
23
over SSH), and pass them to and from the protocol logic. See the overview in
24
bzrlib/transport/smart/__init__.py.
35
from bzrlib.smart.protocol import (
37
SmartServerRequestProtocolOne,
38
SmartServerRequestProtocolTwo,
40
from bzrlib.transport import ssh
43
class SmartServerStreamMedium(object):
44
"""Handles smart commands coming over a stream.
46
The stream may be a pipe connected to sshd, or a tcp socket, or an
47
in-process fifo for testing.
49
One instance is created for each connected client; it can serve multiple
50
requests in the lifetime of the connection.
52
The server passes requests through to an underlying backing transport,
53
which will typically be a LocalTransport looking at the server's filesystem.
56
def __init__(self, backing_transport):
57
"""Construct new server.
59
:param backing_transport: Transport for the directory served.
61
# backing_transport could be passed to serve instead of __init__
62
self.backing_transport = backing_transport
66
"""Serve requests until the client disconnects."""
67
# Keep a reference to stderr because the sys module's globals get set to
68
# None during interpreter shutdown.
69
from sys import stderr
71
while not self.finished:
72
server_protocol = self._build_protocol()
73
self._serve_one_request(server_protocol)
75
stderr.write("%s terminating on exception %s\n" % (self, e))
78
def _build_protocol(self):
79
"""Identifies the version of the incoming request, and returns an
80
a protocol object that can interpret it.
82
If more bytes than the version prefix of the request are read, they will
83
be fed into the protocol before it is returned.
85
:returns: a SmartServerRequestProtocol.
87
# Identify the protocol version.
88
bytes = self._get_line()
89
if bytes.startswith(REQUEST_VERSION_TWO):
90
protocol_class = SmartServerRequestProtocolTwo
91
bytes = bytes[len(REQUEST_VERSION_TWO):]
93
protocol_class = SmartServerRequestProtocolOne
94
protocol = protocol_class(self.backing_transport, self._write_out)
95
protocol.accept_bytes(bytes)
98
def _serve_one_request(self, protocol):
99
"""Read one request from input, process, send back a response.
101
:param protocol: a SmartServerRequestProtocol.
104
self._serve_one_request_unguarded(protocol)
105
except KeyboardInterrupt:
108
self.terminate_due_to_error()
110
def terminate_due_to_error(self):
111
"""Called when an unhandled exception from the protocol occurs."""
112
raise NotImplementedError(self.terminate_due_to_error)
114
def _get_bytes(self, desired_count):
115
"""Get some bytes from the medium.
117
:param desired_count: number of bytes we want to read.
119
raise NotImplementedError(self._get_bytes)
122
"""Read bytes from this request's response until a newline byte.
124
This isn't particularly efficient, so should only be used when the
125
expected size of the line is quite short.
127
:returns: a string of bytes ending in a newline (byte 0x0A).
129
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
131
while not line or line[-1] != '\n':
132
new_char = self._get_bytes(1)
135
# Ran out of bytes before receiving a complete line.
140
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
142
def __init__(self, sock, backing_transport):
145
:param sock: the socket the server will read from. It will be put
148
SmartServerStreamMedium.__init__(self, backing_transport)
150
sock.setblocking(True)
153
def _serve_one_request_unguarded(self, protocol):
154
while protocol.next_read_size():
156
protocol.accept_bytes(self.push_back)
159
bytes = self._get_bytes(4096)
163
protocol.accept_bytes(bytes)
165
self.push_back = protocol.excess_buffer
167
def _get_bytes(self, desired_count):
168
# We ignore the desired_count because on sockets it's more efficient to
170
return self.socket.recv(4096)
172
def terminate_due_to_error(self):
173
"""Called when an unhandled exception from the protocol occurs."""
174
# TODO: This should log to a server log file, but no such thing
175
# exists yet. Andrew Bennetts 2006-09-29.
179
def _write_out(self, bytes):
180
self.socket.sendall(bytes)
183
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
185
def __init__(self, in_file, out_file, backing_transport):
186
"""Construct new server.
188
:param in_file: Python file from which requests can be read.
189
:param out_file: Python file to write responses.
190
:param backing_transport: Transport for the directory served.
192
SmartServerStreamMedium.__init__(self, backing_transport)
193
if sys.platform == 'win32':
194
# force binary mode for files
196
for f in (in_file, out_file):
197
fileno = getattr(f, 'fileno', None)
199
msvcrt.setmode(fileno(), os.O_BINARY)
203
def _serve_one_request_unguarded(self, protocol):
205
bytes_to_read = protocol.next_read_size()
206
if bytes_to_read == 0:
207
# Finished serving this request.
210
bytes = self._get_bytes(bytes_to_read)
212
# Connection has been closed.
216
protocol.accept_bytes(bytes)
218
def _get_bytes(self, desired_count):
219
return self._in.read(desired_count)
221
def terminate_due_to_error(self):
222
# TODO: This should log to a server log file, but no such thing
223
# exists yet. Andrew Bennetts 2006-09-29.
227
def _write_out(self, bytes):
228
self._out.write(bytes)
231
class SmartClientMediumRequest(object):
232
"""A request on a SmartClientMedium.
234
Each request allows bytes to be provided to it via accept_bytes, and then
235
the response bytes to be read via read_bytes.
238
request.accept_bytes('123')
239
request.finished_writing()
240
result = request.read_bytes(3)
241
request.finished_reading()
243
It is up to the individual SmartClientMedium whether multiple concurrent
244
requests can exist. See SmartClientMedium.get_request to obtain instances
245
of SmartClientMediumRequest, and the concrete Medium you are using for
246
details on concurrency and pipelining.
249
def __init__(self, medium):
250
"""Construct a SmartClientMediumRequest for the medium medium."""
251
self._medium = medium
252
# we track state by constants - we may want to use the same
253
# pattern as BodyReader if it gets more complex.
254
# valid states are: "writing", "reading", "done"
255
self._state = "writing"
257
def accept_bytes(self, bytes):
258
"""Accept bytes for inclusion in this request.
260
This method may not be be called after finished_writing() has been
261
called. It depends upon the Medium whether or not the bytes will be
262
immediately transmitted. Message based Mediums will tend to buffer the
263
bytes until finished_writing() is called.
265
:param bytes: A bytestring.
267
if self._state != "writing":
268
raise errors.WritingCompleted(self)
269
self._accept_bytes(bytes)
271
def _accept_bytes(self, bytes):
272
"""Helper for accept_bytes.
274
Accept_bytes checks the state of the request to determing if bytes
275
should be accepted. After that it hands off to _accept_bytes to do the
278
raise NotImplementedError(self._accept_bytes)
280
def finished_reading(self):
281
"""Inform the request that all desired data has been read.
283
This will remove the request from the pipeline for its medium (if the
284
medium supports pipelining) and any further calls to methods on the
285
request will raise ReadingCompleted.
287
if self._state == "writing":
288
raise errors.WritingNotComplete(self)
289
if self._state != "reading":
290
raise errors.ReadingCompleted(self)
292
self._finished_reading()
294
def _finished_reading(self):
295
"""Helper for finished_reading.
297
finished_reading checks the state of the request to determine if
298
finished_reading is allowed, and if it is hands off to _finished_reading
299
to perform the action.
301
raise NotImplementedError(self._finished_reading)
303
def finished_writing(self):
304
"""Finish the writing phase of this request.
306
This will flush all pending data for this request along the medium.
307
After calling finished_writing, you may not call accept_bytes anymore.
309
if self._state != "writing":
310
raise errors.WritingCompleted(self)
311
self._state = "reading"
312
self._finished_writing()
314
def _finished_writing(self):
315
"""Helper for finished_writing.
317
finished_writing checks the state of the request to determine if
318
finished_writing is allowed, and if it is hands off to _finished_writing
319
to perform the action.
321
raise NotImplementedError(self._finished_writing)
323
def read_bytes(self, count):
324
"""Read bytes from this requests response.
326
This method will block and wait for count bytes to be read. It may not
327
be invoked until finished_writing() has been called - this is to ensure
328
a message-based approach to requests, for compatibility with message
329
based mediums like HTTP.
331
if self._state == "writing":
332
raise errors.WritingNotComplete(self)
333
if self._state != "reading":
334
raise errors.ReadingCompleted(self)
335
return self._read_bytes(count)
337
def _read_bytes(self, count):
338
"""Helper for read_bytes.
340
read_bytes checks the state of the request to determing if bytes
341
should be read. After that it hands off to _read_bytes to do the
344
raise NotImplementedError(self._read_bytes)
347
"""Read bytes from this request's response until a newline byte.
349
This isn't particularly efficient, so should only be used when the
350
expected size of the line is quite short.
352
:returns: a string of bytes ending in a newline (byte 0x0A).
354
# XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
356
while not line or line[-1] != '\n':
357
new_char = self.read_bytes(1)
360
raise errors.SmartProtocolError(
361
'unexpected end of file reading from server')
365
class SmartClientMedium(object):
366
"""Smart client is a medium for sending smart protocol requests over."""
368
def disconnect(self):
369
"""If this medium maintains a persistent connection, close it.
371
The default implementation does nothing.
375
class SmartClientStreamMedium(SmartClientMedium):
376
"""Stream based medium common class.
378
SmartClientStreamMediums operate on a stream. All subclasses use a common
379
SmartClientStreamMediumRequest for their requests, and should implement
380
_accept_bytes and _read_bytes to allow the request objects to send and
385
self._current_request = None
387
def accept_bytes(self, bytes):
388
self._accept_bytes(bytes)
391
"""The SmartClientStreamMedium knows how to close the stream when it is
397
"""Flush the output stream.
399
This method is used by the SmartClientStreamMediumRequest to ensure that
400
all data for a request is sent, to avoid long timeouts or deadlocks.
402
raise NotImplementedError(self._flush)
404
def get_request(self):
405
"""See SmartClientMedium.get_request().
407
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
410
return SmartClientStreamMediumRequest(self)
412
def read_bytes(self, count):
413
return self._read_bytes(count)
416
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
417
"""A client medium using simple pipes.
419
This client does not manage the pipes: it assumes they will always be open.
422
def __init__(self, readable_pipe, writeable_pipe):
423
SmartClientStreamMedium.__init__(self)
424
self._readable_pipe = readable_pipe
425
self._writeable_pipe = writeable_pipe
427
def _accept_bytes(self, bytes):
428
"""See SmartClientStreamMedium.accept_bytes."""
429
self._writeable_pipe.write(bytes)
432
"""See SmartClientStreamMedium._flush()."""
433
self._writeable_pipe.flush()
435
def _read_bytes(self, count):
436
"""See SmartClientStreamMedium._read_bytes."""
437
return self._readable_pipe.read(count)
440
class SmartSSHClientMedium(SmartClientStreamMedium):
441
"""A client medium using SSH."""
443
def __init__(self, host, port=None, username=None, password=None,
444
vendor=None, bzr_remote_path=None):
445
"""Creates a client that will connect on the first use.
447
:param vendor: An optional override for the ssh vendor to use. See
448
bzrlib.transport.ssh for details on ssh vendors.
450
SmartClientStreamMedium.__init__(self)
451
self._connected = False
453
self._password = password
455
self._username = username
456
self._read_from = None
457
self._ssh_connection = None
458
self._vendor = vendor
459
self._write_to = None
460
self._bzr_remote_path = bzr_remote_path
461
if self._bzr_remote_path is None:
462
symbol_versioning.warn(
463
'bzr_remote_path is required as of bzr 0.92',
464
DeprecationWarning, stacklevel=2)
465
self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')
467
def _accept_bytes(self, bytes):
468
"""See SmartClientStreamMedium.accept_bytes."""
469
self._ensure_connection()
470
self._write_to.write(bytes)
472
def disconnect(self):
473
"""See SmartClientMedium.disconnect()."""
474
if not self._connected:
476
self._read_from.close()
477
self._write_to.close()
478
self._ssh_connection.close()
479
self._connected = False
481
def _ensure_connection(self):
482
"""Connect this medium if not already connected."""
485
if self._vendor is None:
486
vendor = ssh._get_ssh_vendor()
488
vendor = self._vendor
489
self._ssh_connection = vendor.connect_ssh(self._username,
490
self._password, self._host, self._port,
491
command=[self._bzr_remote_path, 'serve', '--inet',
492
'--directory=/', '--allow-writes'])
493
self._read_from, self._write_to = \
494
self._ssh_connection.get_filelike_channels()
495
self._connected = True
498
"""See SmartClientStreamMedium._flush()."""
499
self._write_to.flush()
501
def _read_bytes(self, count):
502
"""See SmartClientStreamMedium.read_bytes."""
503
if not self._connected:
504
raise errors.MediumNotConnected(self)
505
return self._read_from.read(count)
508
# Port 4155 is the default port for bzr://, registered with IANA.
509
BZR_DEFAULT_INTERFACE = '0.0.0.0'
510
BZR_DEFAULT_PORT = 4155
513
class SmartTCPClientMedium(SmartClientStreamMedium):
514
"""A client medium using TCP."""
516
def __init__(self, host, port):
517
"""Creates a client that will connect on the first use."""
518
SmartClientStreamMedium.__init__(self)
519
self._connected = False
524
def _accept_bytes(self, bytes):
525
"""See SmartClientMedium.accept_bytes."""
526
self._ensure_connection()
527
self._socket.sendall(bytes)
529
def disconnect(self):
530
"""See SmartClientMedium.disconnect()."""
531
if not self._connected:
535
self._connected = False
537
def _ensure_connection(self):
538
"""Connect this medium if not already connected."""
541
self._socket = socket.socket()
542
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
543
if self._port is None:
544
port = BZR_DEFAULT_PORT
546
port = int(self._port)
547
result = self._socket.connect_ex((self._host, port))
549
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
550
(self._host, port, os.strerror(result)))
551
self._connected = True
554
"""See SmartClientStreamMedium._flush().
556
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
557
add a means to do a flush, but that can be done in the future.
560
def _read_bytes(self, count):
561
"""See SmartClientMedium.read_bytes."""
562
if not self._connected:
563
raise errors.MediumNotConnected(self)
564
return self._socket.recv(count)
567
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
568
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
570
def __init__(self, medium):
571
SmartClientMediumRequest.__init__(self, medium)
572
# check that we are safe concurrency wise. If some streams start
573
# allowing concurrent requests - i.e. via multiplexing - then this
574
# assert should be moved to SmartClientStreamMedium.get_request,
575
# and the setting/unsetting of _current_request likewise moved into
576
# that class : but its unneeded overhead for now. RBC 20060922
577
if self._medium._current_request is not None:
578
raise errors.TooManyConcurrentRequests(self._medium)
579
self._medium._current_request = self
581
def _accept_bytes(self, bytes):
582
"""See SmartClientMediumRequest._accept_bytes.
584
This forwards to self._medium._accept_bytes because we are operating
585
on the mediums stream.
587
self._medium._accept_bytes(bytes)
589
def _finished_reading(self):
590
"""See SmartClientMediumRequest._finished_reading.
592
This clears the _current_request on self._medium to allow a new
593
request to be created.
595
assert self._medium._current_request is self
596
self._medium._current_request = None
598
def _finished_writing(self):
599
"""See SmartClientMediumRequest._finished_writing.
601
This invokes self._medium._flush to ensure all bytes are transmitted.
603
self._medium._flush()
605
def _read_bytes(self, count):
606
"""See SmartClientMediumRequest._read_bytes.
608
This forwards to self._medium._read_bytes because we are operating
609
on the mediums stream.
611
return self._medium._read_bytes(count)