# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""The 'medium' layer for the smart servers and clients.

"Medium" here is the noun meaning "a means of transmission", not the adjective
for "the quality between big and small."

Media carry the bytes of the requests somehow (e.g. via TCP, wrapped in HTTP, or
over SSH), and pass them to and from the protocol logic.  See the overview in
bzrlib/transport/smart/__init__.py.
"""

import os
import socket
import sys
import urllib

from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
from bzrlib import (
    errors,
    osutils,
    symbol_versioning,
    urlutils,
    )
from bzrlib.smart import protocol
from bzrlib.transport import ssh
""")
45
# We must not read any more than 64k at a time so we don't risk "no buffer
46
# space available" errors on some platforms. Windows in particular is likely
47
# to give error 10053 or 10055 if we read more than 64k from a socket.
48
_MAX_READ_SIZE = 64 * 1024


def _get_protocol_factory_for_bytes(bytes):
    """Determine the right protocol factory for 'bytes'.

    This will return an appropriate protocol factory depending on the version
    of the protocol being used, as determined by inspecting the given bytes.
    The bytes should have at least one newline byte (i.e. be a whole line),
    otherwise it's possible that a request will be incorrectly identified as
    version 1.

    Typical use would be::

         factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
         server_protocol = factory(transport, write_func, root_client_path)
         server_protocol.accept_bytes(unused_bytes)

    :param bytes: a str of bytes of the start of the request.
    :returns: 2-tuple of (protocol_factory, unused_bytes).  protocol_factory is
        a callable that takes three args: transport, write_func,
        root_client_path.  unused_bytes are any bytes that were not part of a
        protocol version marker.
    """
    if bytes.startswith(protocol.MESSAGE_VERSION_THREE):
        protocol_factory = protocol.build_server_protocol_three
        bytes = bytes[len(protocol.MESSAGE_VERSION_THREE):]
    elif bytes.startswith(protocol.REQUEST_VERSION_TWO):
        protocol_factory = protocol.SmartServerRequestProtocolTwo
        bytes = bytes[len(protocol.REQUEST_VERSION_TWO):]
    else:
        # No version marker was recognised: treat the request as version one
        # and hand every byte back to the caller untouched.
        protocol_factory = protocol.SmartServerRequestProtocolOne
    return protocol_factory, bytes
83
def _get_line(read_bytes_func):
84
"""Read bytes using read_bytes_func until a newline byte.
86
This isn't particularly efficient, so should only be used when the
87
expected size of the line is quite short.
89
:returns: a tuple of two strs: (line, excess)
93
while newline_pos == -1:
94
new_bytes = read_bytes_func(1)
97
# Ran out of bytes before receiving a complete line.
99
newline_pos = bytes.find('\n')
100
line = bytes[:newline_pos+1]
101
excess = bytes[newline_pos+1:]


class SmartMedium(object):
    """Base class for smart protocol media, both client- and server-side.

    :ivar _push_back_buffer: bytes handed back through _push_back that the
        next read_bytes call returns first, or None if nothing is buffered.
    """

    def __init__(self):
        self._push_back_buffer = None

    def _push_back(self, bytes):
        """Return unused bytes to the medium, because they belong to the next
        request(s).

        This sets the _push_back_buffer to the given bytes.

        :param bytes: the bytes to hand back; an empty string is ignored.
        :raises AssertionError: if bytes are already buffered.
        """
        if self._push_back_buffer is not None:
            raise AssertionError(
                "_push_back called when self._push_back_buffer is %r"
                % (self._push_back_buffer,))
        if bytes == '':
            # Never buffer the empty string: read_bytes would hand it out and
            # callers would mistake it for EOF.
            return
        self._push_back_buffer = bytes

    def _get_push_back_buffer(self):
        """Return the buffered bytes and clear the buffer.

        :raises AssertionError: if the buffer holds the empty string.
        """
        if self._push_back_buffer == '':
            raise AssertionError(
                '%s._push_back_buffer should never be the empty string, '
                'which can be confused with EOF' % (self,))
        buffered = self._push_back_buffer
        self._push_back_buffer = None
        return buffered

    def read_bytes(self, desired_count):
        """Read some bytes from this medium.

        Pushed-back bytes are returned first; otherwise the read is delegated
        to _read_bytes, capped at _MAX_READ_SIZE.

        :returns: some bytes, possibly more or less than the number requested
            in 'desired_count' depending on the medium.
        """
        if self._push_back_buffer is not None:
            return self._get_push_back_buffer()
        bytes_to_read = min(desired_count, _MAX_READ_SIZE)
        return self._read_bytes(bytes_to_read)

    def _read_bytes(self, count):
        """Read up to 'count' bytes; must be provided by subclasses."""
        raise NotImplementedError(self._read_bytes)

    def _get_line(self):
        """Read bytes from this request's response until a newline byte.

        This isn't particularly efficient, so should only be used when the
        expected size of the line is quite short.

        :returns: a string of bytes ending in a newline (byte 0x0A).
        """
        line, excess = _get_line(self.read_bytes)
        # Anything read past the newline belongs to whoever reads next.
        self._push_back(excess)
        return line


class SmartServerStreamMedium(SmartMedium):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.

    :ivar _push_back_buffer: a str of bytes that have been read from the stream
        but not used yet, or None if there are no buffered bytes.  Subclasses
        should make sure to exhaust this buffer before reading more bytes from
        the stream.  See also the _push_back method.
    """

    def __init__(self, backing_transport, root_client_path='/'):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        :param root_client_path: stored and passed to each protocol object
            built by _build_protocol.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        self.root_client_path = root_client_path
        self.finished = False
        SmartMedium.__init__(self)

    def serve(self):
        """Serve requests until the client disconnects.

        Any exception is reported on stderr and then re-raised.
        """
        # Keep a reference to stderr because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import exc_info, stderr
        try:
            while not self.finished:
                server_protocol = self._build_protocol()
                self._serve_one_request(server_protocol)
        except Exception:
            # exc_info() instead of "except Exception, e" keeps this valid on
            # every Python version.
            e = exc_info()[1]
            stderr.write("%s terminating on exception %s\n" % (self, e))
            raise

    def _build_protocol(self):
        """Identifies the version of the incoming request, and returns a
        protocol object that can interpret it.

        If more bytes than the version prefix of the request are read, they will
        be fed into the protocol before it is returned.

        :returns: a SmartServerRequestProtocol.
        """
        first_line = self._get_line()
        protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(
            first_line)
        server_protocol = protocol_factory(
            self.backing_transport, self._write_out, self.root_client_path)
        server_protocol.accept_bytes(unused_bytes)
        return server_protocol

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        KeyboardInterrupt propagates; any other exception shuts the medium
        down through terminate_due_to_error.

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)

    def _read_bytes(self, desired_count):
        """Get some bytes from the medium.

        :param desired_count: number of bytes we want to read.
        """
        raise NotImplementedError(self._read_bytes)


class SmartServerSocketStreamMedium(SmartServerStreamMedium):
    """Server-side stream medium that reads from and writes to a socket."""

    def __init__(self, sock, backing_transport, root_client_path='/'):
        """Constructor.

        :param sock: the socket the server will read from.  It will be put
            into blocking mode.
        :param backing_transport: Transport for the directory served.
        :param root_client_path: forwarded to SmartServerStreamMedium.
        """
        SmartServerStreamMedium.__init__(
            self, backing_transport, root_client_path=root_client_path)
        sock.setblocking(True)
        self.socket = sock

    def _serve_one_request_unguarded(self, protocol):
        """Feed socket data to 'protocol' until it wants no more bytes.

        An empty read marks the medium finished.  Bytes the protocol did not
        consume are pushed back for the next request.
        """
        while protocol.next_read_size():
            # We can safely try to read large chunks.  If there is less data
            # than _MAX_READ_SIZE ready, the socket will just return a short
            # read immediately rather than block.
            received = self.read_bytes(_MAX_READ_SIZE)
            if received == '':
                self.finished = True
                return
            protocol.accept_bytes(received)

        self._push_back(protocol.unused_data)

    def _read_bytes(self, desired_count):
        # We ignore the desired_count because on sockets it's more efficient to
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
        return self.socket.recv(_MAX_READ_SIZE)

    def terminate_due_to_error(self):
        """Close the socket and mark the medium finished."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self.socket.close()
        self.finished = True

    def _write_out(self, bytes):
        """Send all of 'bytes' over the socket."""
        osutils.send_all(self.socket, bytes)


class SmartServerPipeStreamMedium(SmartServerStreamMedium):
    """Server-side stream medium that uses a pair of file objects."""

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            import msvcrt
            for f in (in_file, out_file):
                # Objects without a fileno attribute are left untouched.
                fileno = getattr(f, 'fileno', None)
                if fileno:
                    msvcrt.setmode(fileno(), os.O_BINARY)
        self._in = in_file
        self._out = out_file

    def _serve_one_request_unguarded(self, protocol):
        """Feed pipe data to 'protocol' until the request is complete.

        The output file is flushed before returning.  An empty read also
        marks the medium finished.
        """
        while True:
            # We need to be careful not to read past the end of the current
            # request, or else the read from the pipe will block, so we use
            # protocol.next_read_size().
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
                self._out.flush()
                return
            received = self.read_bytes(bytes_to_read)
            if received == '':
                # Connection has been closed.
                self.finished = True
                self._out.flush()
                return
            protocol.accept_bytes(received)

    def _read_bytes(self, desired_count):
        """Read up to 'desired_count' bytes from the input file."""
        return self._in.read(desired_count)

    def terminate_due_to_error(self):
        """Close the output file and mark the medium finished."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.
        self._out.close()
        self.finished = True

    def _write_out(self, bytes):
        """Write 'bytes' to the output file (without flushing)."""
        self._out.write(bytes)


class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called.  It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request has left the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        Accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        actual acceptance.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if still in the "writing" state.
        :raises errors.ReadingCompleted: if already "done".
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # Record the transition before delegating to the hook.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if not in the "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.

        :raises errors.WritingNotComplete: if still in the "writing" state.
        :raises errors.ReadingCompleted: if already "done".
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for SmartClientMediumRequest.read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual read.

        By default this forwards to self._medium.read_bytes because we are
        operating on the medium's stream.
        """
        return self._medium.read_bytes(count)

    def read_line(self):
        """Read one newline-terminated line of the response.

        :raises errors.ConnectionReset: if the data read does not end with a
            newline, i.e. the stream ended first.
        """
        line = self._read_line()
        if not line.endswith('\n'):
            # end of file encountered reading from server
            raise errors.ConnectionReset(
                "please check connectivity and permissions",
                "(and try -Dhpss if further diagnosis is required)")
        return line

    def _read_line(self):
        """Helper for SmartClientMediumRequest.read_line.

        By default this forwards to self._medium._get_line because we are
        operating on the medium's stream.
        """
        return self._medium._get_line()


class SmartClientMedium(SmartMedium):
    """Smart client is a medium for sending smart protocol requests over."""

    def __init__(self, base):
        """Create a client medium.

        :param base: stored as self.base and used by
            remote_path_from_transport as the base URL.
        """
        super(SmartClientMedium, self).__init__()
        self.base = base
        self._protocol_version_error = None
        self._protocol_version = None
        self._done_hello = False
        # Be optimistic: we assume the remote end can accept new remote
        # requests until we get an error saying otherwise.
        # _remote_version_is_before tracks the bzr version the remote side
        # can be based on what we've seen so far.
        self._remote_version_is_before = None

    def _is_remote_before(self, version_tuple):
        """Is it possible the remote side supports RPCs for a given version?

        Typical use::

            needed_version = (1, 2)
            if medium._is_remote_before(needed_version):
                fallback_to_pre_1_2_rpc()
            else:
                try:
                    do_1_2_rpc()
                except UnknownSmartMethod:
                    medium._remember_remote_is_before(needed_version)
                    fallback_to_pre_1_2_rpc()

        :returns: True if the remote side is already known to be older than
            version_tuple, False otherwise.
        :seealso: _remember_remote_is_before
        """
        if self._remote_version_is_before is None:
            # So far, the remote side seems to support everything
            return False
        return version_tuple >= self._remote_version_is_before

    def _remember_remote_is_before(self, version_tuple):
        """Tell this medium that the remote side is older than the given
        version.

        :raises AssertionError: if a lower version bound was already recorded.
        :seealso: _is_remote_before
        """
        if (self._remote_version_is_before is not None and
            version_tuple > self._remote_version_is_before):
            raise AssertionError(
                "_remember_remote_is_before(%r) called, but "
                "_remember_remote_is_before(%r) was called previously."
                % (version_tuple, self._remote_version_is_before))
        self._remote_version_is_before = version_tuple

    def protocol_version(self):
        """Find out if 'hello' smart request works.

        The outcome is cached: a success is not re-checked, and a
        SmartProtocolError is re-raised on every later call.

        :returns: the string '2'.
        """
        if self._protocol_version_error is not None:
            raise self._protocol_version_error
        if not self._done_hello:
            try:
                medium_request = self.get_request()
                # Send a 'hello' request in protocol version one, for maximum
                # backwards compatibility.
                client_protocol = protocol.SmartClientRequestProtocolOne(
                    medium_request)
                client_protocol.query_version()
                self._done_hello = True
            except errors.SmartProtocolError:
                # Cache the error, just like we would cache a successful
                # result.  (exc_info() keeps this valid on every Python
                # version.)
                self._protocol_version_error = sys.exc_info()[1]
                raise
        return '2'

    def should_probe(self):
        """Should RemoteBzrDirFormat.probe_transport send a smart request on
        this medium?

        Some transports are unambiguously smart-only; there's no need to check
        if the transport is able to carry smart requests, because that's all
        it is for.  In those cases, this method should return False.

        But some HTTP transports can sometimes fail to carry smart requests,
        but still be usable for accessing remote bzrdirs via plain file
        accesses.  So for those transports, their media should return True here
        so that RemoteBzrDirFormat can determine if it is appropriate for that
        transport.
        """
        return False

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """

    def remote_path_from_transport(self, transport):
        """Convert transport into a path suitable for using in a request.

        Note that the resulting remote path doesn't encode the host name or
        anything but path, so it is only safe to use it in requests sent over
        the medium from the matching transport.
        """
        medium_base = urlutils.join(self.base, '/')
        rel_url = urlutils.relative_url(medium_base, transport.base)
        return urllib.unquote(rel_url)


class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    receive bytes.

    :ivar _current_request: initialised to None here;
        SmartClientStreamMediumRequest sets and clears it.
    """

    def __init__(self, base):
        SmartClientMedium.__init__(self, base)
        self._current_request = None

    def accept_bytes(self, bytes):
        """Forward 'bytes' to the subclass's _accept_bytes."""
        self._accept_bytes(bytes)

    def __del__(self):
        """The SmartClientStreamMedium knows how to close the stream when it is
        finished with it.
        """
        self.disconnect()

    def _flush(self):
        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        for get_request.
        """
        return SmartClientStreamMediumRequest(self)


class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe, base):
        """Create a medium over an existing pair of pipes.

        :param readable_pipe: file-like object that responses are read from.
        :param writeable_pipe: file-like object that requests are written to.
        :param base: passed to SmartClientStreamMedium.
        """
        SmartClientStreamMedium.__init__(self, base)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)


class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
            base=None, vendor=None, bzr_remote_path=None):
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        :param bzr_remote_path: the command run on the remote side.  Omitting
            it is deprecated; the BZR_REMOTE_PATH environment variable, or
            'bzr', is then used.
        """
        SmartClientStreamMedium.__init__(self, base)
        self._connected = False
        self._host = host
        self._password = password
        self._port = port
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None
        self._bzr_remote_path = bzr_remote_path
        if self._bzr_remote_path is None:
            # stacklevel=2 attributes the warning to our caller.
            symbol_versioning.warn(
                'bzr_remote_path is required as of bzr 0.92',
                DeprecationWarning, stacklevel=2)
            self._bzr_remote_path = os.environ.get('BZR_REMOTE_PATH', 'bzr')

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        if self._connected:
            return
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[self._bzr_remote_path, 'serve', '--inet',
                         '--directory=/', '--allow-writes'])
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        bytes_to_read = min(count, _MAX_READ_SIZE)
        return self._read_from.read(bytes_to_read)


# Port 4155 is the default port for bzr://, registered with IANA.
BZR_DEFAULT_INTERFACE = '0.0.0.0'
BZR_DEFAULT_PORT = 4155


class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port, base):
        """Creates a client that will connect on the first use.

        :param host: host to connect to.
        :param port: port to connect to; None selects BZR_DEFAULT_PORT.
        :param base: passed to SmartClientStreamMedium.
        """
        SmartClientStreamMedium.__init__(self, base)
        self._connected = False
        self._host = host
        self._port = port
        self._socket = None

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        osutils.send_all(self._socket, bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
            return
        self._socket.close()
        self._socket = None
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected.

        :raises errors.ConnectionError: if the socket cannot be connected.
        """
        if self._connected:
            return
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        if self._port is None:
            port = BZR_DEFAULT_PORT
        else:
            port = int(self._port)
        try:
            self._socket.connect((self._host, port))
        except socket.error:
            err = sys.exc_info()[1]
            # socket errors either have a (string) or (errno, string) as their
            # args.  args is always a tuple, so pick the message by length
            # rather than by type.
            if len(err.args) >= 2:
                err_msg = err.args[1]
            else:
                err_msg = str(err)
            # Do not leak the half-opened socket: disconnect() ignores it
            # while _connected is False.
            self._socket.close()
            self._socket = None
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                    (self._host, port, err_msg))
        self._connected = True

    def _flush(self):
        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        # We ignore the desired_count because on sockets it's more efficient to
        # read large chunks (of _MAX_READ_SIZE bytes) at a time.
        return self._socket.recv(_MAX_READ_SIZE)


class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""

    def __init__(self, medium):
        """Register this request as the medium's current request.

        :raises errors.TooManyConcurrentRequests: if the medium already has a
            current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but its unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        if self._medium._current_request is not self:
            raise AssertionError()
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()