# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""The 'medium' layer for the smart servers and clients.

"Medium" here is the noun meaning "a means of transmission", not the adjective
for "the quality between big and small."

Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
over SSH), and pass them to and from the protocol logic. See the overview in
bzrlib/transport/smart/__init__.py.
"""

import os
import socket
import sys

from bzrlib import (
    errors,
    osutils,
    symbol_versioning,
    )
from bzrlib.smart.protocol import (
    REQUEST_VERSION_TWO,
    SmartClientRequestProtocolOne,
    SmartServerRequestProtocolOne,
    SmartServerRequestProtocolTwo,
    )
from bzrlib.transport import ssh


class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    This base class only drives the request loop. Concrete media must provide
    _get_bytes, _write_out, _serve_one_request_unguarded and
    terminate_due_to_error.
    """

    def __init__(self, backing_transport, root_client_path='/'):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        :param root_client_path: kept on the medium and passed to every
            protocol object created by _build_protocol.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        self.root_client_path = root_client_path
        # serve() keeps looping until something sets this flag.
        self.finished = False

    def serve(self):
        """Serve requests until the client disconnects.

        Any exception escaping the loop is reported on stderr and then
        re-raised to the caller.
        """
        # Keep a reference to stderr because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import stderr
        try:
            while not self.finished:
                server_protocol = self._build_protocol()
                self._serve_one_request(server_protocol)
        except Exception as e:
            stderr.write("%s terminating on exception %s\n" % (self, e))
            raise

    def _build_protocol(self):
        """Identifies the version of the incoming request, and returns a
        protocol object that can interpret it.

        If more bytes than the version prefix of the request are read, they will
        be fed into the protocol before it is returned.

        :returns: a SmartServerRequestProtocol.
        """
        # Identify the protocol version.
        first_line = self._get_line()
        if first_line.startswith(REQUEST_VERSION_TWO):
            protocol_class = SmartServerRequestProtocolTwo
            # Strip the version marker; the remainder belongs to the request.
            first_line = first_line[len(REQUEST_VERSION_TWO):]
        else:
            protocol_class = SmartServerRequestProtocolOne
        protocol = protocol_class(
            self.backing_transport, self._write_out, self.root_client_path)
        protocol.accept_bytes(first_line)
        return protocol

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        KeyboardInterrupt is propagated unchanged; any other Exception causes
        terminate_due_to_error() to be called.

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)

    def _get_bytes(self, desired_count):
        """Get some bytes from the medium.

        :param desired_count: number of bytes we want to read.
        """
        raise NotImplementedError(self._get_bytes)

    def _get_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A), or
            whatever was read so far if the medium ran out of bytes first.
        """
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
        line = ''
        while not line.endswith('\n'):
            new_char = self._get_bytes(1)
            if new_char == '':
                # Ran out of bytes before receiving a complete line.
                break
            line += new_char
        return line


class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """A server medium that reads requests from, and writes responses to,
    a socket.
    """

    def __init__(self, sock, backing_transport, root_client_path='/'):
        """Constructor.

        :param sock: the socket the server will read from. It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        :param root_client_path: forwarded to SmartServerStreamMedium.
        """
        SmartServerStreamMedium.__init__(
            self, backing_transport, root_client_path=root_client_path)
        # Bytes received beyond the end of one request, kept for the next one.
        self.push_back = ''
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to protocol until it asks for no more.

        Bytes left over from the previous request are delivered first. An
        empty read from the socket marks the medium as finished.

        :param protocol: a SmartServerRequestProtocol.
        """
        while protocol.next_read_size():
            if self.push_back:
                protocol.accept_bytes(self.push_back)
                self.push_back = ''
            else:
                received = self._get_bytes(4096)
                if received == '':
                    # The peer closed the connection.
                    self.finished = True
                    return
                protocol.accept_bytes(received)

        # Whatever the protocol did not consume belongs to the next request.
        self.push_back = protocol.excess_buffer

    def _get_bytes(self, desired_count):
        """Read from the socket.

        :param desired_count: ignored; see below.
        :returns: the result of a single recv(4096) call.
        """
        # We ignore the desired_count because on sockets it's more efficient to
        # read a larger fixed-size chunk at a time.
        return self.socket.recv(4096)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet. Andrew Bennetts 2006-09-29.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send all of bytes over the socket."""
        osutils.send_all(self.socket, bytes)


class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """A server medium that reads requests from one file object and writes
    responses to another.
    """

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                # File-like objects without a fileno attribute are left alone.
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed bytes to protocol until it asks for no more.

        The output file is flushed before returning, whether the request
        completed or the input reached end of file.

        :param protocol: a SmartServerRequestProtocol.
        """
        while True:
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            received = self._get_bytes(bytes_to_read)
            if received == '':
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(received)

    def _get_bytes(self, desired_count):
        """Read up to desired_count bytes from the input file."""
        return self._in.read(desired_count)

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet. Andrew Bennetts 2006-09-29.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write bytes to the output file (without flushing)."""
        self._out.write(bytes)


class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance:
    request.accept_bytes('123')
    request.finished_writing()
    result = request.read_bytes(3)
    request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request is no longer in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :raises errors.WritingNotComplete: if finished_writing() has not been
            called yet.
        :raises errors.ReadingCompleted: if reading was already finished.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.
        """
        raise NotImplementedError(self._read_bytes)

    def read_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A).
        :raises errors.ConnectionReset: if the response ends before a newline
            is seen.
        """
        # XXX: this duplicates SmartClientRequestProtocolOne._recv_tuple
        line = ''
        while not line.endswith('\n'):
            new_char = self.read_bytes(1)
            if new_char == '':
                # end of file encountered reading from server
                raise errors.ConnectionReset(
                    "please check connectivity and permissions",
                    "(and try -Dhpss if further diagnosis is required)")
            line += new_char
        return line


class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over.

    This base class does not define get_request(); protocol_version() calls
    it, so concrete media have to supply it.
    """

    def __init__(self):
        super(SmartClientMedium, self).__init__()
        # Both the negotiated version and a failed negotiation are cached, so
        # the server is asked at most once.
        self._protocol_version_error = None
        self._protocol_version = None

    def protocol_version(self):
        """Find out the best protocol version to use.

        The answer (or the SmartProtocolError raised while asking) is cached
        on the medium and reused by later calls.

        :returns: whatever the protocol's query_version() reported.
        :raises errors.SmartProtocolError: if the version query failed, now or
            on an earlier call.
        """
        if self._protocol_version_error is not None:
            raise self._protocol_version_error
        if self._protocol_version is None:
            try:
                medium_request = self.get_request()
                # Send a 'hello' request in protocol version one, for maximum
                # backwards compatibility.
                client_protocol = SmartClientRequestProtocolOne(medium_request)
                self._protocol_version = client_protocol.query_version()
            except errors.SmartProtocolError as e:
                # Cache the error, just like we would cache a successful
                # result.
                self._protocol_version_error = e
                raise
        return self._protocol_version

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """


class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.
    """

    def __init__(self):
        SmartClientMedium.__init__(self)
        # The single request currently using the stream, or None.
        self._current_request = None
        # Be optimistic: we assume the remote end can accept new remote
        # requests until we get an error saying otherwise. (1.2 adds some
        # requests that send bodies, which confuses older servers.)
        self._remote_is_at_least_1_2 = True

    def accept_bytes(self, bytes):
        """Send bytes down the stream via the subclass's _accept_bytes."""
        self._accept_bytes(bytes)

    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        """Read from the stream via the subclass's _read_bytes."""
        return self._read_bytes(count)


class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        """Wrap an existing pair of pipes.

        :param readable_pipe: file-like object responses are read from.
        :param writeable_pipe: file-like object requests are written to.
        """
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)


class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            vendor=None, bzr_remote_path=None):
        """Creates a client that will connect on the first use.

        :param host: passed to the ssh vendor when connecting.
        :param port: passed to the ssh vendor when connecting.
        :param username: passed to the ssh vendor when connecting.
        :param password: passed to the ssh vendor when connecting.
        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        :param bzr_remote_path: the command to run on the remote side. Leaving
            it as None is deprecated; in that case the BZR_REMOTE_PATH
            environment variable is used, falling back to 'bzr'.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None
        self._bzr_remote_path = bzr_remote_path
        if self._bzr_remote_path is None:
            symbol_versioning.warn(
                'bzr_remote_path is required as of bzr 0.92',
                DeprecationWarning, stacklevel=2)
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[self._bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes.

        :raises errors.MediumNotConnected: if nothing has been sent yet, so
            no connection exists.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)


# Port 4155 is the default port for bzr://, registered with IANA.
BZR_DEFAULT_INTERFACE = '0.0.0.0'
BZR_DEFAULT_PORT = 4155


class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; None selects BZR_DEFAULT_PORT.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        osutils.send_all(self._socket, bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        :raises errors.ConnectionError: if the socket cannot be connected.
        """
        if self._connected:
            return
        if self._port is None:
            port = BZR_DEFAULT_PORT
        else:
            port = int(self._port)
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        try:
            self._socket.connect((self._host, port))
        except socket.error as err:
            # Don't leave a half-initialised socket behind on failure.
            self._socket.close()
            self._socket = None
            # socket errors either have a (string) or (errno, string) as their
            # args. args is always a tuple, so pick the message by length
            # rather than by type.
            if len(err.args) > 1:
                err_msg = err.args[1]
            elif err.args:
                err_msg = err.args[0]
            else:
                err_msg = str(err)
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, err_msg))
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes.

        :raises errors.MediumNotConnected: if nothing has been sent yet, so
            no connection exists.
        """
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)


class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""

    def __init__(self, medium):
        """Claim medium's stream for this request.

        :param medium: the SmartClientStreamMedium to send and receive on.
        :raises errors.TooManyConcurrentRequests: if another request on the
            medium has not finished reading yet.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but its unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the mediums stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the mediums stream.
        """
        return self._medium._read_bytes(count)