# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Smart-server protocol, client and server.

Requests are sent as a command and list of arguments, followed by optional
bulk body data.  Responses are similarly a response and list of arguments,
followed by bulk body data. ::

  Fields are separated by Ctrl-A.
  BULK_DATA := CHUNK TRAILER
    Chunks can be repeated as many times as necessary.
  CHUNK := CHUNK_LEN CHUNK_BODY
  CHUNK_LEN := DIGIT+ NEWLINE
    Gives the number of bytes in the following chunk.
  CHUNK_BODY := BYTE[chunk_len]
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
  SUCCESS_TRAILER := 'done' NEWLINE
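
As a rough illustration of the grammar above, the sketch below frames a request line followed by a single-chunk body. It is written against Python 3 ``bytes`` rather than the Python 2 strings used in this module, and the names ``encode_request_line`` and ``encode_bulk_data`` are hypothetical helpers, not bzrlib API. Only the success trailer is modelled.

```python
def encode_request_line(args):
    """Join the command and its arguments with Ctrl-A, ending with a newline."""
    return b'\x01'.join(args) + b'\n'


def encode_bulk_data(body):
    """Emit one CHUNK (length line plus body) followed by the success trailer."""
    return b'%d\n' % len(body) + body + b'done\n'


# A hypothetical 'put' request carrying a five-byte body.
message = encode_request_line((b'put', b'foo.txt', b'')) + encode_bulk_data(b'hello')
print(repr(message))
```

The empty third argument stands in for an optional mode; how an absent value is spelled on the wire is an assumption of this sketch.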

Paths are passed across the network.  The client needs to see a namespace that
includes any repository that might need to be referenced, and the client needs
to know about a root directory beyond which it cannot ascend.

Servers run over ssh will typically want to be able to access any path the user
can access.  Public servers on the other hand (which might be over http, ssh
or tcp) will typically want to restrict access to only a particular directory
and its children, so will want to do a software virtual root at that level.
In other words they'll want to rewrite incoming paths to be under that level
(and prevent escaping using ../ tricks).
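
A minimal sketch of such a software virtual root, assuming POSIX-style paths: the helper ``rebase_under_root`` is hypothetical (it is not part of bzrlib), and it only normalises path text, so it does not defend against symlinks that point outside the root.

```python
import posixpath


def rebase_under_root(root, incoming):
    """Map an incoming path under root, refusing any attempt to escape it."""
    root = posixpath.normpath(root)
    # Treat absolute incoming paths as relative to the virtual root.
    relative = incoming.lstrip('/')
    candidate = posixpath.normpath(posixpath.join(root, relative))
    prefix = root.rstrip('/') + '/'
    if candidate != root and not candidate.startswith(prefix):
        raise ValueError('path escapes virtual root: %r' % (incoming,))
    return candidate


print(rebase_under_root('/srv/bzr', '/project/trunk'))
```

With this in place a request for ``../etc/passwd`` raises ``ValueError`` instead of resolving to a path above ``/srv/bzr``.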

URLs that include ~ should probably be passed across to the server verbatim
and the server can expand them.  This will probably not be meaningful when
limited to a directory?

At the bottom level there are sockets, pipes and an HTTP server.  With
sockets, multiple requests can be sent, and a read error occurs once the other
side has shut down.  With pipes, a zero-length read marks end-of-file.  In an
HTTP server environment there is no end-of-stream, because each request coming
into the server is independent.

So we need a wrapper around pipes and sockets to separate out requests from
substrate and this will give us a single model which is consistent for HTTP,

 MEDIUM   (factory for protocol, reads bytes & pushes to protocol,
          uses protocol to detect end-of-request, sends written
          bytes to client) e.g. socket, pipe, HTTP request handler.

PROTOCOL  (serialization, deserialization)  accepts bytes for one
          request, decodes according to internal state, pushes
          structured data to handler.  accepts structured data from
          handler and encodes and writes to the medium.  factory for

HANDLER   (domain logic) accepts structured data, operates state
          machine until the request can be satisfied,
          sends structured data to the protocol.

CLIENT    domain logic, accepts domain requests, generates structured
          data, reads structured data from responses and turns into
          domain data.  Sends structured data to the protocol.
          Operates state machines until the request can be delivered
          (e.g. reading from a bundle generated in bzrlib to deliver a

          Possibly this should just be RemoteBzrDir, RemoteTransport,

PROTOCOL  (serialization, deserialization)  accepts structured data for one
          request, encodes and writes to the medium.  Reads bytes from the
          medium, decodes and allows the client to read structured data.

MEDIUM    (accepts bytes from the protocol & delivers to the remote server.
          Allows the protocol to read bytes e.g. socket, pipe, HTTP request.
"""

# TODO: _translate_error should be on the client, not the transport because
# error coding is wire protocol specific.

# TODO: A plain integer from query_version is too simple; should give some

# TODO: Server should probably catch exceptions within itself and send them
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
# Also needs to somehow report protocol errors like bad requests.  Need to
# consider how we'll handle error reporting, e.g. if we get halfway through a
# bulk transfer and then something goes wrong.

# TODO: Standard marker at start of request/response lines?

# TODO: Make each request and response self-validatable, e.g. with checksums.

# TODO: get/put objects could be changed to gradually read back the data as it
# comes across the network

# TODO: What should the server do if it hits an error and has to terminate?

# TODO: is it useful to allow multiple chunks in the bulk data?

# TODO: If we get an exception during transmission of bulk data we can't just
# emit the exception because it won't be seen.
#   John proposes:  I think it would be worthwhile to have a header on each
#   chunk, that indicates it is another chunk.  Then you can send an 'error'
#   chunk as long as you finish the previous chunk.

# TODO: Clone method on Transport; should work up towards parent directory;
# unclear how this should be stored or communicated to the server... maybe
# just pass it on all relevant requests?

# TODO: Better name than clone() for changing between directories.  How about
# open_dir or change_dir or chdir?

# TODO: Is it really good to have the notion of current directory within the
# connection?  Perhaps all Transports should factor out a common connection
# from the thing that has the directory context?

# TODO: Pull more things common to sftp and ssh to a higher level.

# TODO: The server that manages a connection should be quite small and retain
# minimum state because each request is supposed to be stateless.
# Then we can write another implementation that maps to http.

# TODO: What to do when a client connection is garbage collected?  Maybe just
# abruptly drop the connection?

# TODO: Server in some cases will need to restrict access to files outside of
# a particular root directory.  LocalTransport doesn't do anything to stop you
# ascending above the base directory, so we need to prevent paths
# containing '..' in either the server or transport layers.  (Also need to
# consider what happens if someone creates a symlink pointing outside the

# TODO: Server should rebase absolute paths coming across the network to put
# them under the virtual root, if one is in use.  LocalTransport currently
# doesn't do that; if you give it an absolute path it just uses it.

# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
# urlescape them instead.  Indeed possibly this should just literally be

# FIXME: This transport, with several others, has imperfect handling of paths
# within urls.  It'd probably be better for ".." from a root to raise an error
# rather than return the same directory as we do at present.

# TODO: Rather than working at the Transport layer we want Branch,
# Repository or BzrDir objects that talk to a server.

# TODO: Probably want some way for server commands to gradually produce body
# data rather than passing it as a string; they could perhaps pass an
# iterator-like callback that will gradually yield data; it probably needs a
# close() method that will always be called to do any necessary cleanup.

# TODO: Split the actual smart server from the ssh encoding of it.

# TODO: Perhaps support file-level readwrite operations over the transport

# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
# branch doing file-level operations.

from cStringIO import StringIO

from bzrlib import (
from bzrlib.bundle.serializer import write_bundle
from bzrlib.hooks import Hooks
try:
    from bzrlib.transport import ssh
except errors.ParamikoNotPresent:
    # no paramiko.  SmartSSHClientMedium will break.
from bzrlib.smart.protocol import SmartClientRequestProtocolOne
from bzrlib.smart.medium import SmartTCPClientMedium, SmartSSHClientMedium

# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:

BZR_DEFAULT_PORT = 4155


def _recv_tuple(from_file):
    req_line = from_file.readline()
    return _decode_tuple(req_line)


def _decode_tuple(req_line):
    if req_line is None or req_line == '':
        # Nothing was read (for example at end of stream): no request.
        return None
    if req_line[-1] != '\n':
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
    return tuple(req_line[:-1].split('\x01'))


def _encode_tuple(args):
    """Encode the tuple args to a bytestream."""
    return '\x01'.join(args) + '\n'
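
The pair of helpers above can be exercised as a round trip. The sketch below is a Python 3 rendering under the assumption that arguments are already ``bytes``; ``encode_tuple`` and ``decode_tuple`` are illustrative stand-ins that raise ``ValueError`` where the original raises ``errors.SmartProtocolError``. The last line shows the limitation noted in the XXX comment earlier: an argument containing the separator byte does not survive the trip.

```python
def encode_tuple(args):
    """Encode a tuple of byte strings as one Ctrl-A separated line."""
    return b'\x01'.join(args) + b'\n'


def decode_tuple(line):
    """Decode one newline-terminated line back into a tuple of byte strings."""
    if not line:
        return None  # nothing was read, e.g. the stream has ended
    if not line.endswith(b'\n'):
        raise ValueError('request %r not terminated' % (line,))
    return tuple(line[:-1].split(b'\x01'))


print(decode_tuple(encode_tuple((b'get', b'a/b'))))
# An embedded separator byte splits one argument into two.
print(decode_tuple(encode_tuple((b'a\x01b',))))
```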


class SmartProtocolBase(object):
    """Methods common to client and server"""

    # TODO: this only actually accommodates a single block; possibly should
    # support multiple chunks?
    def _encode_bulk_data(self, body):
        """Encode body as a bulk data chunk."""
        return ''.join(('%d\n' % len(body), body, 'done\n'))

    def _serialise_offsets(self, offsets):
        """Serialise a readv offset list."""
        txt = []
        for start, length in offsets:
            txt.append('%d,%d' % (start, length))
        return '\n'.join(txt)
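
For readers following the readv path, this sketch pairs the serialiser above with a matching parser, modelled on the ``_deserialise_offsets`` method that appears later in this file. The function names are illustrative, and skipping empty lines is an assumption of the sketch.

```python
def serialise_offsets(offsets):
    """Render (start, length) pairs as 'start,length' lines."""
    return '\n'.join('%d,%d' % (start, length) for start, length in offsets)


def deserialise_offsets(text):
    """Parse the text form back into a list of (start, length) tuples."""
    offsets = []
    for line in text.split('\n'):
        if not line:
            continue  # tolerate an empty body or a trailing newline
        start, length = line.split(',')
        offsets.append((int(start), int(length)))
    return offsets


wire = serialise_offsets([(0, 10), (20, 5)])
print(repr(wire), deserialise_offsets(wire))
```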


class SmartServerRequestProtocolOne(SmartProtocolBase):
    """Server-side encoding and decoding logic for smart version 1."""

    def __init__(self, backing_transport, write_func):
        self._backing_transport = backing_transport
        self.excess_buffer = ''
        self._finished = False
        self.has_dispatched = False
        self._body_decoder = None
        self._write_func = write_func

    def accept_bytes(self, bytes):
        """Take bytes, and advance the internal state machine appropriately.

        :param bytes: must be a byte string
        """
        assert isinstance(bytes, str)
        self.in_buffer += bytes
        if not self.has_dispatched:
            if '\n' not in self.in_buffer:
                # no command line yet
                return
            self.has_dispatched = True
            try:
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
                # _decode_tuple expects a newline-terminated line.
                first_line += '\n'
                req_args = _decode_tuple(first_line)
                self.request = SmartServerRequestHandler(
                    self._backing_transport)
                self.request.dispatch_command(req_args[0], req_args[1:])
                if self.request.finished_reading:
                    self.excess_buffer = self.in_buffer
                    self._send_response(self.request.response.args,
                        self.request.response.body)
            except KeyboardInterrupt:
                raise
            except Exception, exception:
                # everything else: pass to client, flush, and quit
                self._send_response(('error', str(exception)))

        if self.has_dispatched:
                # nothing to do. XXX: this routine should be a single state
                self.excess_buffer += self.in_buffer
            if self._body_decoder is None:
                self._body_decoder = LengthPrefixedBodyDecoder()
            self._body_decoder.accept_bytes(self.in_buffer)
            self.in_buffer = self._body_decoder.unused_data
            body_data = self._body_decoder.read_pending_data()
            self.request.accept_body(body_data)
            if self._body_decoder.finished_reading:
                self.request.end_of_body()
                assert self.request.finished_reading, \
                    "no more body, request not finished"
            if self.request.response is not None:
                self._send_response(self.request.response.args,
                    self.request.response.body)
                self.excess_buffer = self.in_buffer
            else:
                assert not self.request.finished_reading, \
                    "no response and we have finished reading."

    def _send_response(self, args, body=None):
        """Send a smart server response down the output stream."""
        assert not self._finished, 'response already sent'
        self._finished = True
        self._write_func(_encode_tuple(args))
            assert isinstance(body, str), 'body must be a str'
            bytes = self._encode_bulk_data(body)
            self._write_func(bytes)

    def next_read_size(self):
        if self._body_decoder is None:
        else:
            return self._body_decoder.next_read_size()


class LengthPrefixedBodyDecoder(object):
    """Decodes the length-prefixed bulk data."""

    def __init__(self):
        self.bytes_left = None
        self.finished_reading = False
        self.unused_data = ''
        self.state_accept = self._state_accept_expecting_length
        self.state_read = self._state_read_no_data
        self._trailer_buffer = ''

    def accept_bytes(self, bytes):
        """Decode as much of bytes as possible.

        If 'bytes' contains too much data it will be appended to

        finished_reading will be set when no more data is required.  Further
        data will be appended to self.unused_data.
        """
        # accept_bytes is allowed to change the state
        current_state = self.state_accept
        self.state_accept(bytes)
        while current_state != self.state_accept:
            current_state = self.state_accept
            self.state_accept('')

    def next_read_size(self):
        if self.bytes_left is not None:
            # Ideally we want to read all the remainder of the body and the
            return self.bytes_left + 5
        elif self.state_accept == self._state_accept_reading_trailer:
            # Just the trailer left
            return 5 - len(self._trailer_buffer)
        elif self.state_accept == self._state_accept_expecting_length:
            # There's still at least 6 bytes left ('\n' to end the length, plus
        else:
            # Reading excess data.  Either way, 1 byte at a time is fine.

    def read_pending_data(self):
        """Return any pending data that has been decoded."""
        return self.state_read()

    def _state_accept_expecting_length(self, bytes):
        self._in_buffer += bytes
        pos = self._in_buffer.find('\n')
        self.bytes_left = int(self._in_buffer[:pos])
        self._in_buffer = self._in_buffer[pos+1:]
        self.bytes_left -= len(self._in_buffer)
        self.state_accept = self._state_accept_reading_body
        self.state_read = self._state_read_in_buffer

    def _state_accept_reading_body(self, bytes):
        self._in_buffer += bytes
        self.bytes_left -= len(bytes)
        if self.bytes_left <= 0:
            if self.bytes_left != 0:
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
                self._in_buffer = self._in_buffer[:self.bytes_left]
            self.bytes_left = None
            self.state_accept = self._state_accept_reading_trailer

    def _state_accept_reading_trailer(self, bytes):
        self._trailer_buffer += bytes
        # TODO: what if the trailer does not match "done\n"?  Should this raise
        # a ProtocolViolation exception?
        if self._trailer_buffer.startswith('done\n'):
            self.unused_data = self._trailer_buffer[len('done\n'):]
            self.state_accept = self._state_accept_reading_unused
            self.finished_reading = True

    def _state_accept_reading_unused(self, bytes):
        self.unused_data += bytes

    def _state_read_no_data(self):

    def _state_read_in_buffer(self):
        result = self._in_buffer
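
To make the decoding steps easier to follow, here is a compact Python 3 sketch of a length-prefixed decoder that accepts data in arbitrary pieces. It is a simplification, not the class above: it buffers the whole body instead of handing it out incrementally, the class name ``SketchBodyDecoder`` is hypothetical, and raising ``ValueError`` on a bad trailer is an assumption (the original leaves that case as a TODO).

```python
class SketchBodyDecoder:
    """Decode b'<length>\\n<body>done\\n' fed in arbitrary pieces."""

    TRAILER = b'done\n'

    def __init__(self):
        self._buffer = b''
        self._length = None       # body length, once the prefix is parsed
        self.body = None          # decoded body, once complete
        self.unused_data = b''    # bytes that arrived after the trailer
        self.finished_reading = False

    def accept_bytes(self, data):
        if self.finished_reading:
            self.unused_data += data
            return
        self._buffer += data
        if self._length is None:
            pos = self._buffer.find(b'\n')
            if pos == -1:
                return  # the length line is not complete yet
            self._length = int(self._buffer[:pos])
            self._buffer = self._buffer[pos + 1:]
        needed = self._length + len(self.TRAILER)
        if len(self._buffer) < needed:
            return  # still waiting for body or trailer bytes
        trailer = self._buffer[self._length:needed]
        if trailer != self.TRAILER:
            raise ValueError('bad trailer: %r' % (trailer,))
        self.body = self._buffer[:self._length]
        self.unused_data = self._buffer[needed:]
        self._buffer = b''
        self.finished_reading = True


decoder = SketchBodyDecoder()
for piece in (b'5\nhel', b'lodo', b'ne\nNEXT'):
    decoder.accept_bytes(piece)
print(decoder.body, decoder.unused_data)
```

Bytes left in ``unused_data`` belong to the next request, which is why the server medium pushes them back before reading from the stream again.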


class SmartServerStreamMedium(object):
    """Handles smart commands coming over a stream.

    The stream may be a pipe connected to sshd, or a tcp socket, or an
    in-process fifo for testing.

    One instance is created for each connected client; it can serve multiple
    requests in the lifetime of the connection.

    The server passes requests through to an underlying backing transport,
    which will typically be a LocalTransport looking at the server's filesystem.
    """

    def __init__(self, backing_transport):
        """Construct new server.

        :param backing_transport: Transport for the directory served.
        """
        # backing_transport could be passed to serve instead of __init__
        self.backing_transport = backing_transport
        self.finished = False

    def serve(self):
        """Serve requests until the client disconnects."""
        # Keep a reference to stderr because the sys module's globals get set to
        # None during interpreter shutdown.
        from sys import stderr
        try:
            while not self.finished:
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
                                                         self._write_out)
                self._serve_one_request(protocol)
        except Exception, e:
            stderr.write("%s terminating on exception %s\n" % (self, e))

    def _serve_one_request(self, protocol):
        """Read one request from input, process, send back a response.

        :param protocol: a SmartServerRequestProtocol.
        """
        try:
            self._serve_one_request_unguarded(protocol)
        except KeyboardInterrupt:
            raise
        except Exception, e:
            self.terminate_due_to_error()

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        raise NotImplementedError(self.terminate_due_to_error)


class SmartServerSocketStreamMedium(SmartServerStreamMedium):

    def __init__(self, sock, backing_transport):
        """
        :param sock: the socket the server will read from.  It will be put
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        sock.setblocking(True)

    def _serve_one_request_unguarded(self, protocol):
        while protocol.next_read_size():
                protocol.accept_bytes(self.push_back)
                bytes = self.socket.recv(4096)
                protocol.accept_bytes(bytes)

        self.push_back = protocol.excess_buffer

    def terminate_due_to_error(self):
        """Called when an unhandled exception from the protocol occurs."""
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.

    def _write_out(self, bytes):
        self.socket.sendall(bytes)


class SmartServerPipeStreamMedium(SmartServerStreamMedium):

    def __init__(self, in_file, out_file, backing_transport):
        """Construct new server.

        :param in_file: Python file from which requests can be read.
        :param out_file: Python file to write responses.
        :param backing_transport: Transport for the directory served.
        """
        SmartServerStreamMedium.__init__(self, backing_transport)
        if sys.platform == 'win32':
            # force binary mode for files
            for f in (in_file, out_file):
                fileno = getattr(f, 'fileno', None)
                    msvcrt.setmode(fileno(), os.O_BINARY)

    def _serve_one_request_unguarded(self, protocol):
            bytes_to_read = protocol.next_read_size()
            if bytes_to_read == 0:
                # Finished serving this request.
            bytes = self._in.read(bytes_to_read)
                # Connection has been closed.
            protocol.accept_bytes(bytes)

    def terminate_due_to_error(self):
        # TODO: This should log to a server log file, but no such thing
        # exists yet.  Andrew Bennetts 2006-09-29.

    def _write_out(self, bytes):
        self._out.write(bytes)


class SmartServerResponse(object):
    """Response generated by SmartServerRequestHandler."""

    def __init__(self, args, body=None):

# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
# for delivering the data for a request.  This could be done with the
# StreamServer, though that would create conflation between request and response
# which may be undesirable.


class SmartServerRequestHandler(object):
    """Protocol logic for smart server.

    This doesn't handle serialization at all, it just processes requests and
    """

    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
    # not contain encoding or decoding logic to allow the wire protocol to vary
    # from the object protocol: we will want to tweak the wire protocol separate
    # from the object model, and ideally we will be able to do that without
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
    # just a Protocol subclass.

    # TODO: Better way of representing the body for commands that take it,
    # and allow it to be streamed into the server.

    def __init__(self, backing_transport):
        self._backing_transport = backing_transport
        self._converted_command = False
        self.finished_reading = False
        self._body_bytes = ''

    def accept_body(self, bytes):
        """
        This should be overridden for each command that desires body data to
        handle the right format of that data.  I.e. plain bytes, a bundle etc.

        The deserialisation into that format should be done in the Protocol
        object.  Set self.desired_body_format to the format your method will
        """
        # default fallback is to accumulate bytes.
        self._body_bytes += bytes

    def _end_of_body_handler(self):
        """An unimplemented end of body handler."""
        raise NotImplementedError(self._end_of_body_handler)

        """Answer a version request with my version."""
        return SmartServerResponse(('ok', '1'))

    def do_has(self, relpath):
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
        return SmartServerResponse((r,))

    def do_get(self, relpath):
        backing_bytes = self._backing_transport.get_bytes(relpath)
        return SmartServerResponse(('ok',), backing_bytes)

    def _deserialise_optional_mode(self, mode):
        # XXX: FIXME this should be on the protocol object.

    def do_append(self, relpath, mode):
        self._converted_command = True
        self._relpath = relpath
        self._mode = self._deserialise_optional_mode(mode)
        self._end_of_body_handler = self._handle_do_append_end

    def _handle_do_append_end(self):
        old_length = self._backing_transport.append_bytes(
            self._relpath, self._body_bytes, self._mode)
        self.response = SmartServerResponse(('appended', '%d' % old_length))

    def do_delete(self, relpath):
        self._backing_transport.delete(relpath)

    def do_iter_files_recursive(self, relpath):
        transport = self._backing_transport.clone(relpath)
        filenames = transport.iter_files_recursive()
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_list_dir(self, relpath):
        filenames = self._backing_transport.list_dir(relpath)
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_mkdir(self, relpath, mode):
        self._backing_transport.mkdir(relpath,
                                      self._deserialise_optional_mode(mode))

    def do_move(self, rel_from, rel_to):
        self._backing_transport.move(rel_from, rel_to)

    def do_put(self, relpath, mode):
        self._converted_command = True
        self._relpath = relpath
        self._mode = self._deserialise_optional_mode(mode)
        self._end_of_body_handler = self._handle_do_put

    def _handle_do_put(self):
        self._backing_transport.put_bytes(self._relpath,
                self._body_bytes, self._mode)
        self.response = SmartServerResponse(('ok',))

    def _deserialise_offsets(self, text):
        # XXX: FIXME this should be on the protocol object.
        for line in text.split('\n'):
            start, length = line.split(',')
            offsets.append((int(start), int(length)))

    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
        self._converted_command = True
        self._end_of_body_handler = self._handle_put_non_atomic
        self._relpath = relpath
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
        self._mode = self._deserialise_optional_mode(mode)
        # a boolean would be nicer XXX
        self._create_parent = (create_parent == 'T')

    def _handle_put_non_atomic(self):
        self._backing_transport.put_bytes_non_atomic(self._relpath,
                create_parent_dir=self._create_parent,
                dir_mode=self._dir_mode)
        self.response = SmartServerResponse(('ok',))

    def do_readv(self, relpath):
        self._converted_command = True
        self._end_of_body_handler = self._handle_readv_offsets
        self._relpath = relpath

    def end_of_body(self):
        """No more body data will be received."""
        self._run_handler_code(self._end_of_body_handler, (), {})
        # cannot read after this.
        self.finished_reading = True

    def _handle_readv_offsets(self):
        """Accept offsets for a readv request."""
        offsets = self._deserialise_offsets(self._body_bytes)
        backing_bytes = ''.join(bytes for offset, bytes in
            self._backing_transport.readv(self._relpath, offsets))
        self.response = SmartServerResponse(('readv',), backing_bytes)

    def do_rename(self, rel_from, rel_to):
        self._backing_transport.rename(rel_from, rel_to)

    def do_rmdir(self, relpath):
        self._backing_transport.rmdir(relpath)

    def do_stat(self, relpath):
        stat = self._backing_transport.stat(relpath)
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))

    def do_get_bundle(self, path, revision_id):
        # open transport relative to our base
        t = self._backing_transport.clone(path)
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
        repo = control.open_repository()
        tmpf = tempfile.TemporaryFile()
        base_revision = revision.NULL_REVISION
        write_bundle(repo, revision_id, base_revision, tmpf)
        return SmartServerResponse((), tmpf.read())

    def dispatch_command(self, cmd, args):
        """Deprecated compatibility method.""" # XXX XXX
        func = getattr(self, 'do_' + cmd, None)
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
        self._run_handler_code(func, args, {})

    def _run_handler_code(self, callable, args, kwargs):
        """Run some handler specific code 'callable'.

        If a result is returned, it is considered to be the command's response:
        finished_reading is set true and the result is assigned to
        self.response.

        Any exceptions caught are translated and a response object created
        """
        result = self._call_converting_errors(callable, args, kwargs)
        if result is not None:
            self.response = result
            self.finished_reading = True
        # handle unconverted commands
        if not self._converted_command:
            self.finished_reading = True
                self.response = SmartServerResponse(('ok',))

    def _call_converting_errors(self, callable, args, kwargs):
        """Call callable converting errors to Response objects."""
        try:
            return callable(*args, **kwargs)
        except errors.NoSuchFile, e:
            return SmartServerResponse(('NoSuchFile', e.path))
        except errors.FileExists, e:
            return SmartServerResponse(('FileExists', e.path))
        except errors.DirectoryNotEmpty, e:
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
        except errors.ShortReadvError, e:
            return SmartServerResponse(('ShortReadvError',
                e.path, str(e.offset), str(e.length), str(e.actual)))
        except UnicodeError, e:
            # If it is a DecodeError, then most likely we are starting
            # with a plain string
            str_or_unicode = e.object
            if isinstance(str_or_unicode, unicode):
                # XXX: UTF-8 might have \x01 (our separator byte) in it.  We
                # should escape it somehow.
                val = 'u:' + str_or_unicode.encode('utf-8')
            else:
                val = 's:' + str_or_unicode.encode('base64')
            # This handles UnicodeEncodeError or UnicodeDecodeError
            return SmartServerResponse((e.__class__.__name__,
                    e.encoding, val, str(e.start), str(e.end), e.reason))
        except errors.TransportNotPossible, e:
            if e.msg == "readonly transport":
                return SmartServerResponse(('ReadOnlyError', ))


class SmartTCPServer(object):
    """Listens on a TCP socket and accepts connections from smart clients

    hooks: An instance of SmartServerHooks.
    """

    def __init__(self, backing_transport, host='127.0.0.1', port=0):
        """Construct a new server.

        To actually start it running, call either start_background_thread or

        :param host: Name of the interface to listen on.
        :param port: TCP port to listen on, or 0 to allocate a transient port.
        """
        # let connections timeout so that we get a chance to terminate
        # Keep a reference to the exceptions we want to catch because the socket
        # module's globals get set to None during interpreter shutdown.
        from socket import timeout as socket_timeout
        from socket import error as socket_error
        self._socket_error = socket_error
        self._socket_timeout = socket_timeout
        self._server_socket = socket.socket()
        self._server_socket.bind((host, port))
        self._sockname = self._server_socket.getsockname()
        self.port = self._sockname[1]
        self._server_socket.listen(1)
        self._server_socket.settimeout(1)
        self.backing_transport = backing_transport
        self._started = threading.Event()
        self._stopped = threading.Event()

    def serve(self):
        self._should_terminate = False
        for hook in SmartTCPServer.hooks['server_started']:
            hook(self.backing_transport.base, self.get_url())
        try:
            try:
                while not self._should_terminate:
                    try:
                        conn, client_addr = self._server_socket.accept()
                    except self._socket_timeout:
                        # just check if we're asked to stop
                    except self._socket_error, e:
                        # if the socket is closed by stop_background_thread
                        # we might get a EBADF here, any other socket errors
                        if e.args[0] != errno.EBADF:
                            trace.warning("listening socket error: %s", e)
                    else:
                        self.serve_conn(conn)
            except KeyboardInterrupt:
                # don't log when CTRL-C'd.
                raise
            except Exception, e:
                trace.error("Unhandled smart server error.")
                trace.log_exception_quietly()
        finally:
            try:
                # ensure the server socket is closed.
                self._server_socket.close()
            except self._socket_error:
                # ignore errors on close
            for hook in SmartTCPServer.hooks['server_stopped']:
                hook(self.backing_transport.base, self.get_url())

    def get_url(self):
        """Return the url of the server"""
        return "bzr://%s:%d/" % self._sockname

    def serve_conn(self, conn):
        # For WIN32, where the timeout value from the listening socket
        # propagates to the newly accepted socket.
        conn.setblocking(True)
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
        connection_thread.setDaemon(True)
        connection_thread.start()

    def start_background_thread(self):
        self._started.clear()
        self._server_thread = threading.Thread(None,
                name='server-' + self.get_url())
        self._server_thread.setDaemon(True)
        self._server_thread.start()

    def stop_background_thread(self):
        self._stopped.clear()
        # tell the main loop to quit on the next iteration.
        self._should_terminate = True
        # close the socket - gives error to connections from here on in,
        # rather than a connection reset error to connections made during
        # the period between setting _should_terminate = True and
        # the current request completing/aborting. It may also break out the
        # main loop if it was currently in accept() (on some platforms).
        try:
            self._server_socket.close()
        except self._socket_error:
            # ignore errors on close
        if not self._stopped.isSet():
            # server has not stopped (though it may be stopping)
            # it's likely in accept(), so give it a connection
            temp_socket = socket.socket()
            temp_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            if not temp_socket.connect_ex(self._sockname):
                # and close it immediately: we don't choose to send any requests.
        self._server_thread.join()


class SmartServerHooks(Hooks):
    """Hooks for the smart server."""

    def __init__(self):
        """Create the default hooks.

        These are all empty initially, because by default nothing should get
        """
        # Introduced in 0.16:
        # invoked whenever the server starts serving a directory.
        # The api signature is (backing url, public url).
        self['server_started'] = []
        # Introduced in 0.16:
        # invoked whenever the server stops serving a directory.
        # The api signature is (backing url, public url).
        self['server_stopped'] = []

SmartTCPServer.hooks = SmartServerHooks()


class SmartTCPServer_for_testing(SmartTCPServer):
    """Server suitable for use by transport tests.

    This server is backed by the process's cwd.
    """

    def __init__(self):
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
        # The server is set up by default like for ssh access: the client
        # passes filesystem-absolute paths; therefore the server must look
        # them up relative to the root directory.  It might be better to act
        # as a public server and have the server rewrite paths into the test
        SmartTCPServer.__init__(self,
            transport.get_transport(urlutils.local_path_to_url('/')))

    def get_backing_transport(self, backing_transport_server):
        """Get a backing transport from a server we are decorating."""
        return transport.get_transport(backing_transport_server.get_url())

    def setUp(self, backing_transport_server=None):
        """Set up server for testing"""
        from bzrlib.transport.chroot import TestingChrootServer
        if backing_transport_server is None:
            from bzrlib.transport.local import LocalURLServer
            backing_transport_server = LocalURLServer()
        self.chroot_server = TestingChrootServer()
        self.chroot_server.setUp(backing_transport_server)
        self.backing_transport = transport.get_transport(
            self.chroot_server.get_url())
        self.start_background_thread()

        self.stop_background_thread()

    def get_bogus_url(self):
        """Return a URL which will fail to connect"""
        return 'bzr://127.0.0.1:1/'

class SmartStat(object):

    def __init__(self, size, mode):

        self._translate_error(resp)

class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    request.accept_bytes('123')
    request.finished_writing()
    result = request.read_bytes(3)
    request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with message
        based mediums like HTTP.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        """
        raise NotImplementedError(self._read_bytes)
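
# Illustrative sketch only, not part of bzrlib: the "writing" -> "reading" ->
# "done" state checks described in SmartClientMediumRequest can be mirrored by
# a small in-memory class. ToyRequest, its canned response and the use of
# RuntimeError (in place of the bzrlib errors classes) are assumptions made
# for this example.

```python
class ToyRequest(object):
    """Hypothetical in-memory request that mirrors the state checks above."""

    def __init__(self, canned_response):
        self._state = "writing"
        self._sent = []
        self._response = canned_response

    def accept_bytes(self, data):
        # Bytes are only accepted during the writing phase.
        if self._state != "writing":
            raise RuntimeError("writing already completed")
        self._sent.append(data)

    def finished_writing(self):
        if self._state != "writing":
            raise RuntimeError("writing already completed")
        self._state = "reading"

    def read_bytes(self, count):
        # Reading is only legal after finished_writing().
        if self._state == "writing":
            raise RuntimeError("writing not complete")
        if self._state != "reading":
            raise RuntimeError("reading already completed")
        data, self._response = self._response[:count], self._response[count:]
        return data

    def finished_reading(self):
        if self._state == "writing":
            raise RuntimeError("writing not complete")
        if self._state != "reading":
            raise RuntimeError("reading already completed")
        self._state = "done"


def run_exchange():
    """Walk one request through the same calls as the class docstring."""
    request = ToyRequest(b"abc")
    request.accept_bytes(b"123")
    request.finished_writing()
    result = request.read_bytes(3)
    request.finished_reading()
    return result, request._state
```

# Calling read_bytes() before finished_writing() raises in this sketch, which
# is the ordering rule that keeps requests message-shaped.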

class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)
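
# Illustrative sketch only, not part of bzrlib: the one-request-at-a-time
# guard used by SmartClientStreamMediumRequest, reduced to its bookkeeping.
# ToyStreamMedium and ToyStreamRequest are hypothetical names, and
# RuntimeError stands in for errors.TooManyConcurrentRequests.

```python
class ToyStreamMedium(object):
    """Hypothetical medium that tracks at most one in-flight request."""

    def __init__(self):
        self._current_request = None

    def get_request(self):
        return ToyStreamRequest(self)


class ToyStreamRequest(object):
    """Hypothetical request that registers itself on its medium."""

    def __init__(self, medium):
        # Refuse to start while another request still owns the stream.
        if medium._current_request is not None:
            raise RuntimeError("too many concurrent requests")
        self._medium = medium
        medium._current_request = self

    def finished_reading(self):
        # Release the stream so that a new request can be created.
        assert self._medium._current_request is self
        self._medium._current_request = None


def demo():
    medium = ToyStreamMedium()
    first = medium.get_request()
    try:
        medium.get_request()
        second_allowed = True
    except RuntimeError:
        second_allowed = False
    first.finished_reading()
    third = medium.get_request()
    return second_allowed, medium._current_request is third
```

# In this sketch a second get_request() fails until the first request has
# called finished_reading(), matching the comment in __init__ above.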

class SmartClientRequestProtocolOne(SmartProtocolBase):
    """The client-side protocol for smart version 1."""

    def __init__(self, request):
        """Construct a SmartClientRequestProtocolOne.

        :param request: A SmartClientMediumRequest to serialise onto and
        """
        self._request = request
        self._body_buffer = None

    def call(self, *args):
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def call_with_body_bytes(self, args, body):
        """Make a remote call of args with body bytes 'body'.

        After calling this, call read_response_tuple to find the result out.
        """
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        bytes = self._encode_bulk_data(body)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def call_with_body_readv_array(self, args, body):
        """Make a remote call with a readv array.

        The body is encoded with one line per readv offset pair. The numbers in
        each pair are separated by a comma, and no trailing \n is emitted.
        """
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        readv_bytes = self._serialise_offsets(body)
        bytes = self._encode_bulk_data(readv_bytes)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def cancel_read_body(self):
        """After expecting a body, a response code may indicate one otherwise.

        This method lets the domain client inform the protocol that no body
        will be transmitted. This is a terminal method: after calling it the
        protocol is not able to be used further.
        """
        self._request.finished_reading()

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        This should only be called once.
        """
        result = self._recv_tuple()
            self._request.finished_reading()

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.
        """
        if self._body_buffer is not None:
            return self._body_buffer.read(count)
        _body_decoder = LengthPrefixedBodyDecoder()

        while not _body_decoder.finished_reading:
            bytes_wanted = _body_decoder.next_read_size()
            bytes = self._request.read_bytes(bytes_wanted)
            _body_decoder.accept_bytes(bytes)
        self._request.finished_reading()
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
        # XXX: TODO check the trailer result.
        return self._body_buffer.read(count)

    def _recv_tuple(self):
        """Receive a tuple from the medium request."""
        while not line or line[-1] != '\n':
            # TODO: this is inefficient - but tuples are short.
            new_char = self._request.read_bytes(1)
            assert new_char != '', "end of file reading from server."
        return _decode_tuple(line)

    def query_version(self):
        """Return protocol version number of the server."""
        resp = self.read_response_tuple()
        if resp == ('ok', '1'):
            raise errors.SmartProtocolError("bad response %r" % (resp,))
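
# Illustrative sketch only, not the bzrlib implementation: the wire format
# described in the module docstring (fields separated by Ctrl-A, a
# length-prefixed chunk, a 'done' trailer) and the readv body described in
# call_with_body_readv_array. The helper names below are hypothetical, and
# the sketch assumes a single chunk followed by the success trailer.

```python
def encode_tuple(args):
    """Join byte-string fields with Ctrl-A and terminate with a newline."""
    return b"\x01".join(args) + b"\n"


def decode_tuple(line):
    """Inverse of encode_tuple for a newline-terminated line."""
    return tuple(line[:-1].split(b"\x01"))


def serialise_offsets(offsets):
    """One 'start,length' pair per line, with no trailing newline."""
    return b"\n".join(
        ("%d,%d" % (start, length)).encode("ascii")
        for start, length in offsets)


def encode_bulk_data(body):
    """CHUNK_LEN NEWLINE, then the chunk body, then the success trailer."""
    return str(len(body)).encode("ascii") + b"\n" + body + b"done\n"


def encode_readv_request(args, offsets):
    """Request line followed by the bulk-encoded readv offsets."""
    return encode_tuple(args) + encode_bulk_data(serialise_offsets(offsets))
```

# Under these assumptions, a request with two offset pairs is the request
# line followed by one length-prefixed chunk and the 'done' trailer.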

class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over."""

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """


class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    """

        self._current_request = None

    def accept_bytes(self, bytes):
        self._accept_bytes(bytes)

        """The SmartClientStreamMedium knows how to close the stream when it is
        """

        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        return self._read_bytes(count)

class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)
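
# Illustrative sketch only, not part of bzrlib: why a pipe-backed medium needs
# an explicit flush. Writes to a buffered file object may sit in the buffer
# until flush() is called, so a reader could otherwise wait indefinitely.
# pipe_round_trip is a hypothetical helper that loops an OS pipe back to the
# same process; it assumes the payload fits in the pipe buffer.

```python
import os


def pipe_round_trip(payload):
    """Write payload to a pipe, flush it, and read the same bytes back."""
    read_fd, write_fd = os.pipe()
    readable = os.fdopen(read_fd, "rb")
    writeable = os.fdopen(write_fd, "wb")
    try:
        writeable.write(payload)  # may only reach the userspace buffer
        writeable.flush()         # push the bytes into the pipe itself
        return readable.read(len(payload))
    finally:
        writeable.close()
        readable.close()
```

# The write/flush/read order here corresponds to _accept_bytes, _flush and
# _read_bytes on the medium above.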

class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._password = password
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[executable, 'serve', '--inet', '--directory=/',
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)

class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use."""
        SmartClientStreamMedium.__init__(self)
        self._connected = False

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
        self._socket.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        result = self._socket.connect_ex((self._host, int(self._port)))
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                (self._host, self._port, os.strerror(result)))
        self._connected = True

        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)

class SmartTCPTransport(SmartTransport):
    """Connection to smart server over plain tcp.