# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Smart-server protocol, client and server.

Requests are sent as a command and list of arguments, followed by optional
bulk body data. Responses are similarly a response and list of arguments,
followed by bulk body data. ::

  Fields are separated by Ctrl-A.
  BULK_DATA := CHUNK TRAILER
    Chunks can be repeated as many times as necessary.
  CHUNK := CHUNK_LEN CHUNK_BODY
  CHUNK_LEN := DIGIT+ NEWLINE
    Gives the number of bytes in the following chunk.
  CHUNK_BODY := BYTE[chunk_len]
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
  SUCCESS_TRAILER := 'done' NEWLINE
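
The grammar above can be illustrated with a minimal sketch. It is written for
Python 3 rather than the Python 2 this module targets, it handles a single
chunk followed by the success trailer only, and the names encode_bulk_data and
decode_bulk_data are hypothetical helpers, not part of this module.

```python
def encode_bulk_data(body: bytes) -> bytes:
    # CHUNK_LEN (decimal digits plus newline), CHUNK_BODY, then the
    # success trailer. Only one chunk is emitted in this sketch.
    return b"%d\n" % len(body) + body + b"done\n"


def decode_bulk_data(data: bytes) -> bytes:
    # Split the decimal length line from everything that follows it.
    length_line, _, rest = data.partition(b"\n")
    length = int(length_line)
    body, trailer = rest[:length], rest[length:]
    # Assumption: anything other than an exact body plus 'done\n' is an error.
    if len(body) != length or trailer != b"done\n":
        raise ValueError("malformed bulk data")
    return body
```

Because the body length is sent first, the body itself may contain newlines or
the bytes 'done' without confusing the reader.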

Paths are passed across the network. The client needs to see a namespace that
includes any repository that might need to be referenced, and the client needs
to know about a root directory beyond which it cannot ascend.

Servers run over ssh will typically want to be able to access any path the user
can access. Public servers on the other hand (which might be over http, ssh
or tcp) will typically want to restrict access to only a particular directory
and its children, so will want to do a software virtual root at that level.
In other words they'll want to rewrite incoming paths to be under that level
(and prevent escaping using ../ tricks.)
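
One way such a software virtual root could work is sketched below. This is
Python 3, purely illustrative, and rebase_path is a hypothetical name; it
normalises paths lexically and therefore does nothing about symlinks that
point outside the root.

```python
import posixpath


def rebase_path(virtual_root: str, incoming: str) -> str:
    # Treat the incoming path as relative to the virtual root, even when
    # the client sent it as an absolute path.
    relative = incoming.lstrip("/")
    candidate = posixpath.normpath(posixpath.join(virtual_root, relative))
    root = posixpath.normpath(virtual_root)
    # After collapsing '..' segments the result must still be the root
    # itself or something below it.
    if candidate != root and not candidate.startswith(root.rstrip("/") + "/"):
        raise ValueError("path escapes the virtual root: %r" % incoming)
    return candidate
```

With a root of /srv/bzr, the request path /project/trunk maps to
/srv/bzr/project/trunk, while ../etc/passwd is rejected.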

URLs that include ~ should probably be passed across to the server verbatim
and the server can expand them. This will probably not be meaningful when
limited to a directory?

At the bottom level there is a socket, pipes, or an HTTP server. For sockets,
we have the idea that you have multiple requests and get a read error because
the other side did shutdown. For pipes we have a read pipe which will have a
zero read which marks end-of-file. For the HTTP server environment there is
no end-of-stream because each request coming into the server is independent.

So we need a wrapper around pipes and sockets to separate out requests from
substrate and this will give us a single model which is consistent for HTTP,

MEDIUM    (factory for protocol, reads bytes & pushes to protocol,
          uses protocol to detect end-of-request, sends written
          bytes to client) e.g. socket, pipe, HTTP request handler.

PROTOCOL  (serialization, deserialization) accepts bytes for one
          request, decodes according to internal state, pushes
          structured data to handler. Accepts structured data from
          handler and encodes and writes to the medium. factory for

HANDLER   (domain logic) accepts structured data, operates state
          machine until the request can be satisfied,
          sends structured data to the protocol.

CLIENT    domain logic, accepts domain requests, generates structured
          data, reads structured data from responses and turns into
          domain data. Sends structured data to the protocol.
          Operates state machines until the request can be delivered
          (e.g. reading from a bundle generated in bzrlib to deliver a

          Possibly this should just be RemoteBzrDir, RemoteTransport,

PROTOCOL  (serialization, deserialization) accepts structured data for one
          request, encodes and writes to the medium. Reads bytes from the
          medium, decodes and allows the client to read structured data.

MEDIUM    (accepts bytes from the protocol & delivers to the remote server.
          Allows the protocol to read bytes) e.g. socket, pipe, HTTP request.
"""

# TODO: _translate_error should be on the client, not the transport because
# error coding is wire protocol specific.

# TODO: A plain integer from query_version is too simple; should give some

# TODO: Server should probably catch exceptions within itself and send them
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
# Also needs to somehow report protocol errors like bad requests. Need to
# consider how we'll handle error reporting, e.g. if we get halfway through a
# bulk transfer and then something goes wrong.

# TODO: Standard marker at start of request/response lines?

# TODO: Make each request and response self-validatable, e.g. with checksums.

# TODO: get/put objects could be changed to gradually read back the data as it
# comes across the network

# TODO: What should the server do if it hits an error and has to terminate?

# TODO: is it useful to allow multiple chunks in the bulk data?

# TODO: If we get an exception during transmission of bulk data we can't just
# emit the exception because it won't be seen.
# John proposes: I think it would be worthwhile to have a header on each
# chunk, that indicates it is another chunk. Then you can send an 'error'
# chunk as long as you finish the previous chunk.

# TODO: Clone method on Transport; should work up towards parent directory;
# unclear how this should be stored or communicated to the server... maybe
# just pass it on all relevant requests?

# TODO: Better name than clone() for changing between directories. How about
# open_dir or change_dir or chdir?

# TODO: Is it really good to have the notion of current directory within the
# connection? Perhaps all Transports should factor out a common connection
# from the thing that has the directory context?

# TODO: Pull more things common to sftp and ssh to a higher level.

# TODO: The server that manages a connection should be quite small and retain
# minimum state because each of the requests are supposed to be stateless.
# Then we can write another implementation that maps to http.

# TODO: What to do when a client connection is garbage collected? Maybe just
# abruptly drop the connection?

# TODO: Server in some cases will need to restrict access to files outside of
# a particular root directory. LocalTransport doesn't do anything to stop you
# ascending above the base directory, so we need to prevent paths
# containing '..' in either the server or transport layers. (Also need to
# consider what happens if someone creates a symlink pointing outside the

# TODO: Server should rebase absolute paths coming across the network to put
# them under the virtual root, if one is in use. LocalTransport currently
# doesn't do that; if you give it an absolute path it just uses it.

# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
# urlescape them instead. Indeed possibly this should just literally be

# FIXME: This transport, with several others, has imperfect handling of paths
# within urls. It'd probably be better for ".." from a root to raise an error
# rather than return the same directory as we do at present.

# TODO: Rather than working at the Transport layer we want a Branch,
# Repository or BzrDir objects that talk to a server.

# TODO: Probably want some way for server commands to gradually produce body
# data rather than passing it as a string; they could perhaps pass an
# iterator-like callback that will gradually yield data; it probably needs a
# close() method that will always be closed to do any necessary cleanup.

# TODO: Split the actual smart server from the ssh encoding of it.

# TODO: Perhaps support file-level readwrite operations over the transport

# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
# branch doing file-level operations.

from cStringIO import StringIO

from bzrlib.bundle.serializer import write_bundle
try:
    from bzrlib.transport import ssh
except errors.ParamikoNotPresent:
    # no paramiko. SmartSSHClientMedium will break.
    pass

# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
    transport.register_urlparse_netloc_protocol(scheme)


def _recv_tuple(from_file):
    req_line = from_file.readline()
    return _decode_tuple(req_line)


def _decode_tuple(req_line):
    if req_line is None or req_line == '':
        return None
    if req_line[-1] != '\n':
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
    try:
        return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
    except UnicodeDecodeError:
        raise errors.SmartProtocolError(
            "one or more arguments of request %r are not valid UTF-8"
            % req_line)


def _encode_tuple(args):
    """Encode the tuple args to a bytestream."""
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
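
# Illustration only: a minimal Python 3 sketch of the same request-line
# encoding (UTF-8 fields joined by Ctrl-A, terminated by a newline). The
# names SEPARATOR, encode_tuple and decode_tuple are hypothetical, and
# ValueError stands in for errors.SmartProtocolError so that the sketch is
# self-contained.

```python
SEPARATOR = b"\x01"


def encode_tuple(args):
    # Join UTF-8 encoded fields with Ctrl-A and terminate with a newline.
    return SEPARATOR.join(a.encode("utf-8") for a in args) + b"\n"


def decode_tuple(line):
    if not line:
        return None  # nothing was read, e.g. end of stream
    if not line.endswith(b"\n"):
        raise ValueError("request %r not terminated" % (line,))
    # Drop the trailing newline, split on Ctrl-A, decode each field.
    return tuple(field.decode("utf-8") for field in line[:-1].split(SEPARATOR))
```

# Fields must not themselves contain a newline or Ctrl-A; this sketch, like
# the functions above, does no escaping.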


class SmartProtocolBase(object):
    """Methods common to client and server"""

    # TODO: this only actually accommodates a single block; possibly should
    # support multiple chunks?
    def _encode_bulk_data(self, body):
        """Encode body as a bulk data chunk."""
        return ''.join(('%d\n' % len(body), body, 'done\n'))

    def _serialise_offsets(self, offsets):
        """Serialise a readv offset list."""
        txt = []
        for start, length in offsets:
            txt.append('%d,%d' % (start, length))
        return '\n'.join(txt)
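
# Illustration only: a Python 3 sketch of the readv offset format, one
# "start,length" pair per line, together with a matching parser. The
# function names are hypothetical, and skipping empty lines is an assumption
# made here so that an empty body parses to an empty list.

```python
def serialise_offsets(offsets):
    # One "start,length" pair per line, no trailing newline.
    return "\n".join("%d,%d" % (start, length) for start, length in offsets)


def deserialise_offsets(text):
    offsets = []
    for line in text.split("\n"):
        if not line:
            continue  # tolerate an empty body or a trailing newline
        start, length = line.split(",")
        offsets.append((int(start), int(length)))
    return offsets
```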
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
267
"""Server-side encoding and decoding logic for smart version 1."""
269
def __init__(self, backing_transport, write_func):
270
self._backing_transport = backing_transport
271
self.excess_buffer = ''
272
self._finished_reading = False
274
self.has_dispatched = False
276
self._body_decoder = None
277
self._write_func = write_func
279
def accept_bytes(self, bytes):
280
"""Take bytes, and advance the internal state machine appropriately.
282
:param bytes: must be a byte string
284
assert isinstance(bytes, str)
285
self.in_buffer += bytes
286
if not self.has_dispatched:
287
if '\n' not in self.in_buffer:
288
# no command line yet
290
self.has_dispatched = True
292
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
294
req_args = _decode_tuple(first_line)
295
self.request = SmartServerRequestHandler(
296
self._backing_transport)
297
self.request.dispatch_command(req_args[0], req_args[1:])
298
if self.request.finished_reading:
300
self.excess_buffer = self.in_buffer
302
self._send_response(self.request.response.args,
303
self.request.response.body)
304
self.sync_with_request(self.request)
305
except KeyboardInterrupt:
307
except Exception, exception:
308
# everything else: pass to client, flush, and quit
309
self._send_response(('error', str(exception)))
312
if self.has_dispatched:
313
if self._finished_reading:
314
# nothing to do. XXX: this routine should be a single state
316
self.excess_buffer += self.in_buffer
319
if self._body_decoder is None:
320
self._body_decoder = LengthPrefixedBodyDecoder()
321
self._body_decoder.accept_bytes(self.in_buffer)
322
self.in_buffer = self._body_decoder.unused_data
323
body_data = self._body_decoder.read_pending_data()
324
self.request.accept_body(body_data)
325
if self._body_decoder.finished_reading:
326
self.request.end_of_body()
327
assert self.request.finished_reading, \
328
"no more body, request not finished"
329
self.sync_with_request(self.request)
330
if self.request.response is not None:
331
self._send_response(self.request.response.args,
332
self.request.response.body)
333
self.excess_buffer = self.in_buffer
336
assert not self.request.finished_reading, \
337
"no response and we have finished reading."
339
def _send_response(self, args, body=None):
340
"""Send a smart server response down the output stream."""
341
self._write_func(_encode_tuple(args))
343
assert isinstance(body, str), 'body must be a str'
344
bytes = self._encode_bulk_data(body)
345
self._write_func(bytes)
347
def sync_with_request(self, request):
348
self._finished_reading = request.finished_reading
350
def next_read_size(self):
351
if self._finished_reading:
353
if self._body_decoder is None:
356
return self._body_decoder.next_read_size()
359
class LengthPrefixedBodyDecoder(object):
360
"""Decodes the length-prefixed bulk data."""
363
self.bytes_left = None
364
self.finished_reading = False
365
self.unused_data = ''
366
self.state_accept = self._state_accept_expecting_length
367
self.state_read = self._state_read_no_data
369
self._trailer_buffer = ''
371
def accept_bytes(self, bytes):
372
"""Decode as much of bytes as possible.
374
If 'bytes' contains too much data it will be appended to
377
finished_reading will be set when no more data is required. Further
378
data will be appended to self.unused_data.
380
# accept_bytes is allowed to change the state
381
current_state = self.state_accept
382
self.state_accept(bytes)
383
while current_state != self.state_accept:
384
current_state = self.state_accept
385
self.state_accept('')
387
def next_read_size(self):
388
if self.bytes_left is not None:
389
# Ideally we want to read all the remainder of the body and the
391
return self.bytes_left + 5
392
elif self.state_accept == self._state_accept_reading_trailer:
393
# Just the trailer left
394
return 5 - len(self._trailer_buffer)
395
elif self.state_accept == self._state_accept_expecting_length:
396
# There's still at least 6 bytes left ('\n' to end the length, plus
400
# Reading excess data. Either way, 1 byte at a time is fine.
403
def read_pending_data(self):
404
"""Return any pending data that has been decoded."""
405
return self.state_read()
407
def _state_accept_expecting_length(self, bytes):
408
self._in_buffer += bytes
409
pos = self._in_buffer.find('\n')
412
self.bytes_left = int(self._in_buffer[:pos])
413
self._in_buffer = self._in_buffer[pos+1:]
414
self.bytes_left -= len(self._in_buffer)
415
self.state_accept = self._state_accept_reading_body
416
self.state_read = self._state_read_in_buffer
418
def _state_accept_reading_body(self, bytes):
419
self._in_buffer += bytes
420
self.bytes_left -= len(bytes)
421
if self.bytes_left <= 0:
423
if self.bytes_left != 0:
424
self._trailer_buffer = self._in_buffer[self.bytes_left:]
425
self._in_buffer = self._in_buffer[:self.bytes_left]
426
self.bytes_left = None
427
self.state_accept = self._state_accept_reading_trailer
429
def _state_accept_reading_trailer(self, bytes):
430
self._trailer_buffer += bytes
431
# TODO: what if the trailer does not match "done\n"? Should this raise
432
# a ProtocolViolation exception?
433
if self._trailer_buffer.startswith('done\n'):
434
self.unused_data = self._trailer_buffer[len('done\n'):]
435
self.state_accept = self._state_accept_reading_unused
436
self.finished_reading = True
438
def _state_accept_reading_unused(self, bytes):
439
self.unused_data += bytes
441
def _state_read_no_data(self):
444
def _state_read_in_buffer(self):
445
result = self._in_buffer
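
# Illustration only: a simplified Python 3 sketch of incremental decoding of
# length-prefixed bulk data. Unlike the class above it buffers the whole body
# instead of handing it out as it arrives, and it raises ValueError on a bad
# trailer, which is an assumption (the TODO above leaves that case open).
# BodyDecoder and its attribute names are hypothetical.

```python
class BodyDecoder:
    TRAILER = b"done\n"

    def __init__(self):
        self._buffer = b""
        self._length = None  # unknown until the length line has arrived
        self.body = None  # set once the complete body has been seen
        self.finished_reading = False
        self.unused_data = b""

    def accept_bytes(self, data):
        if self.finished_reading:
            # Anything after the trailer belongs to the next request.
            self.unused_data += data
            return
        self._buffer += data
        if self._length is None:
            pos = self._buffer.find(b"\n")
            if pos == -1:
                return  # length line still incomplete
            self._length = int(self._buffer[:pos])
            self._buffer = self._buffer[pos + 1:]
        needed = self._length + len(self.TRAILER)
        if len(self._buffer) < needed:
            return  # body or trailer still incomplete
        if self._buffer[self._length:needed] != self.TRAILER:
            raise ValueError("bad trailer")
        self.body = self._buffer[:self._length]
        self.unused_data = self._buffer[needed:]
        self._buffer = b""
        self.finished_reading = True
```

# The decoder can be fed arbitrary fragments, for example b"5\nhel", then
# b"lodo", then b"ne\nnext"; the surplus b"next" ends up in unused_data.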
450
class SmartServerStreamMedium(object):
451
"""Handles smart commands coming over a stream.
453
The stream may be a pipe connected to sshd, or a tcp socket, or an
454
in-process fifo for testing.
456
One instance is created for each connected client; it can serve multiple
457
requests in the lifetime of the connection.
459
The server passes requests through to an underlying backing transport,
460
which will typically be a LocalTransport looking at the server's filesystem.
463
def __init__(self, backing_transport):
464
"""Construct new server.
466
:param backing_transport: Transport for the directory served.
468
# backing_transport could be passed to serve instead of __init__
469
self.backing_transport = backing_transport
470
self.finished = False
473
"""Serve requests until the client disconnects."""
474
# Keep a reference to stderr because the sys module's globals get set to
475
# None during interpreter shutdown.
476
from sys import stderr
478
while not self.finished:
479
protocol = SmartServerRequestProtocolOne(self.backing_transport,
481
self._serve_one_request(protocol)
483
stderr.write("%s terminating on exception %s\n" % (self, e))
486
def _serve_one_request(self, protocol):
487
"""Read one request from input, process, send back a response.
489
:param protocol: a SmartServerRequestProtocol.
492
self._serve_one_request_unguarded(protocol)
493
except KeyboardInterrupt:
496
self.terminate_due_to_error()
498
def terminate_due_to_error(self):
499
"""Called when an unhandled exception from the protocol occurs."""
500
raise NotImplementedError(self.terminate_due_to_error)
503
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
505
def __init__(self, sock, backing_transport):
508
:param sock: the socket the server will read from. It will be put
511
SmartServerStreamMedium.__init__(self, backing_transport)
513
sock.setblocking(True)
516
def _serve_one_request_unguarded(self, protocol):
517
while protocol.next_read_size():
519
protocol.accept_bytes(self.push_back)
522
bytes = self.socket.recv(4096)
526
protocol.accept_bytes(bytes)
528
self.push_back = protocol.excess_buffer
530
def terminate_due_to_error(self):
531
"""Called when an unhandled exception from the protocol occurs."""
532
# TODO: This should log to a server log file, but no such thing
533
# exists yet. Andrew Bennetts 2006-09-29.
537
def _write_out(self, bytes):
538
self.socket.sendall(bytes)
541
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
543
def __init__(self, in_file, out_file, backing_transport):
544
"""Construct new server.
546
:param in_file: Python file from which requests can be read.
547
:param out_file: Python file to write responses.
548
:param backing_transport: Transport for the directory served.
550
SmartServerStreamMedium.__init__(self, backing_transport)
554
def _serve_one_request_unguarded(self, protocol):
556
bytes_to_read = protocol.next_read_size()
557
if bytes_to_read == 0:
558
# Finished serving this request.
561
bytes = self._in.read(bytes_to_read)
563
# Connection has been closed.
567
protocol.accept_bytes(bytes)
569
def terminate_due_to_error(self):
570
# TODO: This should log to a server log file, but no such thing
571
# exists yet. Andrew Bennetts 2006-09-29.
575
def _write_out(self, bytes):
576
self._out.write(bytes)
579
class SmartServerResponse(object):
580
"""Response generated by SmartServerRequestHandler."""
582
def __init__(self, args, body=None):
586
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
587
# for delivering the data for a request. This could be done with as the
588
# StreamServer, though that would create conflation between request and response
589
# which may be undesirable.
592
class SmartServerRequestHandler(object):
593
"""Protocol logic for smart server.
595
This doesn't handle serialization at all, it just processes requests and
599
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
600
# not contain encoding or decoding logic to allow the wire protocol to vary
601
# from the object protocol: we will want to tweak the wire protocol separate
602
# from the object model, and ideally we will be able to do that without
603
# having a SmartServerRequestHandler subclass for each wire protocol, rather
604
# just a Protocol subclass.
606
# TODO: Better way of representing the body for commands that take it,
607
# and allow it to be streamed into the server.
609
def __init__(self, backing_transport):
610
self._backing_transport = backing_transport
611
self._converted_command = False
612
self.finished_reading = False
613
self._body_bytes = ''
616
def accept_body(self, bytes):
619
This should be overridden for each command that desires body data to
handle the right format of that data, i.e. plain bytes, a bundle etc.
622
The deserialisation into that format should be done in the Protocol
623
object. Set self.desired_body_format to the format your method will
626
# default fallback is to accumulate bytes.
627
self._body_bytes += bytes
629
def _end_of_body_handler(self):
630
"""An unimplemented end of body handler."""
631
raise NotImplementedError(self._end_of_body_handler)
634
"""Answer a version request with my version."""
635
return SmartServerResponse(('ok', '1'))

    def do_has(self, relpath):
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
        return SmartServerResponse((r,))

    def do_get(self, relpath):
        backing_bytes = self._backing_transport.get_bytes(relpath)
        return SmartServerResponse(('ok',), backing_bytes)
645
def _deserialise_optional_mode(self, mode):
646
# XXX: FIXME this should be on the protocol object.
652
def do_append(self, relpath, mode):
653
self._converted_command = True
654
self._relpath = relpath
655
self._mode = self._deserialise_optional_mode(mode)
656
self._end_of_body_handler = self._handle_do_append_end
658
def _handle_do_append_end(self):
659
old_length = self._backing_transport.append_bytes(
660
self._relpath, self._body_bytes, self._mode)
661
self.response = SmartServerResponse(('appended', '%d' % old_length))

    def do_delete(self, relpath):
        self._backing_transport.delete(relpath)

    def do_iter_files_recursive(self, abspath):
        # XXX: the path handling needs some thought.
        #relpath = self._backing_transport.relpath(abspath)
        transport = self._backing_transport.clone(abspath)
        filenames = transport.iter_files_recursive()
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_list_dir(self, relpath):
        filenames = self._backing_transport.list_dir(relpath)
        return SmartServerResponse(('names',) + tuple(filenames))

    def do_mkdir(self, relpath, mode):
        self._backing_transport.mkdir(relpath,
                                      self._deserialise_optional_mode(mode))

    def do_move(self, rel_from, rel_to):
        self._backing_transport.move(rel_from, rel_to)
684
def do_put(self, relpath, mode):
685
self._converted_command = True
686
self._relpath = relpath
687
self._mode = self._deserialise_optional_mode(mode)
688
self._end_of_body_handler = self._handle_do_put
690
def _handle_do_put(self):
691
self._backing_transport.put_bytes(self._relpath,
692
self._body_bytes, self._mode)
693
self.response = SmartServerResponse(('ok',))
695
def _deserialise_offsets(self, text):
696
# XXX: FIXME this should be on the protocol object.
698
for line in text.split('\n'):
701
start, length = line.split(',')
702
offsets.append((int(start), int(length)))
705
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
706
self._converted_command = True
707
self._end_of_body_handler = self._handle_put_non_atomic
708
self._relpath = relpath
709
self._dir_mode = self._deserialise_optional_mode(dir_mode)
710
self._mode = self._deserialise_optional_mode(mode)
711
# a boolean would be nicer XXX
712
self._create_parent = (create_parent == 'T')
714
def _handle_put_non_atomic(self):
715
self._backing_transport.put_bytes_non_atomic(self._relpath,
718
create_parent_dir=self._create_parent,
719
dir_mode=self._dir_mode)
720
self.response = SmartServerResponse(('ok',))
722
def do_readv(self, relpath):
723
self._converted_command = True
724
self._end_of_body_handler = self._handle_readv_offsets
725
self._relpath = relpath
727
def end_of_body(self):
728
"""No more body data will be received."""
729
self._run_handler_code(self._end_of_body_handler, (), {})
730
# cannot read after this.
731
self.finished_reading = True
733
def _handle_readv_offsets(self):
734
"""accept offsets for a readv request."""
735
offsets = self._deserialise_offsets(self._body_bytes)
736
backing_bytes = ''.join(bytes for offset, bytes in
737
self._backing_transport.readv(self._relpath, offsets))
738
self.response = SmartServerResponse(('readv',), backing_bytes)

    def do_rename(self, rel_from, rel_to):
        self._backing_transport.rename(rel_from, rel_to)

    def do_rmdir(self, relpath):
        self._backing_transport.rmdir(relpath)

    def do_stat(self, relpath):
        stat = self._backing_transport.stat(relpath)
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
750
def do_get_bundle(self, path, revision_id):
751
# open transport relative to our base
752
t = self._backing_transport.clone(path)
753
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
754
repo = control.open_repository()
755
tmpf = tempfile.TemporaryFile()
756
base_revision = revision.NULL_REVISION
757
write_bundle(repo, revision_id, base_revision, tmpf)
759
return SmartServerResponse((), tmpf.read())

    def dispatch_command(self, cmd, args):
        """Deprecated compatibility method.""" # XXX XXX
        func = getattr(self, 'do_' + cmd, None)
        if func is None:
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
        self._run_handler_code(func, args, {})
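
# Illustration only: a self-contained Python 3 sketch of name-based dispatch,
# where a command word is mapped to a do_<command> method with getattr.
# EchoHandler and its commands are hypothetical, and ValueError stands in
# for errors.SmartProtocolError.

```python
class EchoHandler:
    # Each do_<name> method implements one command.
    def do_hello(self):
        return ("ok", "1")

    def do_echo(self, *words):
        return ("echo",) + words

    def dispatch_command(self, cmd, args):
        # Look up the method for this command; unknown commands have none.
        func = getattr(self, "do_" + cmd, None)
        if func is None:
            raise ValueError("bad request %r" % (cmd,))
        return func(*args)
```

# Adding a command then only requires adding a method; no dispatch table has
# to be kept in sync.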
768
def _run_handler_code(self, callable, args, kwargs):
769
"""Run some handler specific code 'callable'.
771
If a result is returned, it is considered to be the command's response,
and finished_reading is set true, and it's assigned to self.response.
774
Any exceptions caught are translated and a response object created
777
result = self._call_converting_errors(callable, args, kwargs)
778
if result is not None:
779
self.response = result
780
self.finished_reading = True
781
# handle unconverted commands
782
if not self._converted_command:
783
self.finished_reading = True
785
self.response = SmartServerResponse(('ok',))
787
def _call_converting_errors(self, callable, args, kwargs):
788
"""Call callable converting errors to Response objects."""
790
return callable(*args, **kwargs)
791
except errors.NoSuchFile, e:
792
return SmartServerResponse(('NoSuchFile', e.path))
793
except errors.FileExists, e:
794
return SmartServerResponse(('FileExists', e.path))
795
except errors.DirectoryNotEmpty, e:
796
return SmartServerResponse(('DirectoryNotEmpty', e.path))
797
except errors.ShortReadvError, e:
798
return SmartServerResponse(('ShortReadvError',
799
e.path, str(e.offset), str(e.length), str(e.actual)))
800
except UnicodeError, e:
801
# If it is a DecodeError, then most likely we are starting
# with a plain string
803
str_or_unicode = e.object
804
if isinstance(str_or_unicode, unicode):
805
val = u'u:' + str_or_unicode
807
val = u's:' + str_or_unicode.encode('base64')
808
# This handles UnicodeEncodeError or UnicodeDecodeError
809
return SmartServerResponse((e.__class__.__name__,
810
e.encoding, val, str(e.start), str(e.end), e.reason))
811
except errors.TransportNotPossible, e:
812
if e.msg == "readonly transport":
813
return SmartServerResponse(('ReadOnlyError', ))
818
class SmartTCPServer(object):
819
"""Listens on a TCP socket and accepts connections from smart clients"""
821
def __init__(self, backing_transport, host='127.0.0.1', port=0):
822
"""Construct a new server.
824
To actually start it running, call either start_background_thread or
827
:param host: Name of the interface to listen on.
828
:param port: TCP port to listen on, or 0 to allocate a transient port.
830
self._server_socket = socket.socket()
831
self._server_socket.bind((host, port))
832
self.port = self._server_socket.getsockname()[1]
833
self._server_socket.listen(1)
834
self._server_socket.settimeout(1)
835
self.backing_transport = backing_transport
838
# let connections timeout so that we get a chance to terminate
839
# Keep a reference to the exceptions we want to catch because the socket
840
# module's globals get set to None during interpreter shutdown.
841
from socket import timeout as socket_timeout
842
from socket import error as socket_error
843
self._should_terminate = False
844
while not self._should_terminate:
846
self.accept_and_serve()
847
except socket_timeout:
848
# just check if we're asked to stop
850
except socket_error, e:
851
trace.warning("client disconnected: %s", e)
855
"""Return the url of the server"""
856
return "bzr://%s:%d/" % self._server_socket.getsockname()
858
def accept_and_serve(self):
859
conn, client_addr = self._server_socket.accept()
860
# For WIN32, where the timeout value from the listening socket
# propagates to the newly accepted socket.
862
conn.setblocking(True)
863
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
864
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
865
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
866
connection_thread.setDaemon(True)
867
connection_thread.start()
869
def start_background_thread(self):
870
self._server_thread = threading.Thread(None,
872
name='server-' + self.get_url())
873
self._server_thread.setDaemon(True)
874
self._server_thread.start()
876
def stop_background_thread(self):
877
self._should_terminate = True
878
# self._server_socket.close()
879
# we used to join the thread, but it's not really necessary; it will
881
## self._server_thread.join()
884
class SmartTCPServer_for_testing(SmartTCPServer):
885
"""Server suitable for use by transport tests.
887
This server is backed by the process's cwd.
891
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
892
# The server is set up by default like for ssh access: the client
893
# passes filesystem-absolute paths; therefore the server must look
894
# them up relative to the root directory. It might be better to act as
# a public server and have the server rewrite paths into the test
897
SmartTCPServer.__init__(self,
898
transport.get_transport(urlutils.local_path_to_url('/')))
901
"""Set up server for testing"""
902
self.start_background_thread()
905
self.stop_background_thread()
908
"""Return the url of the server"""
909
host, port = self._server_socket.getsockname()
910
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
912
def get_bogus_url(self):
913
"""Return a URL which will fail to connect"""
914
return 'bzr://127.0.0.1:1/'
917
class SmartStat(object):
919
def __init__(self, size, mode):
924
class SmartTransport(transport.Transport):
925
"""Connection to a smart server.
927
The connection holds references to pipes that can be used to send requests
930
The connection has a notion of the current directory to which it's
931
connected; this is incorporated in filenames passed to the server.
933
This supports some higher-level RPC operations and can also be treated
934
like a Transport to do file-like operations.
936
The connection can be made over a tcp socket, or (in future) an ssh pipe
937
or a series of http requests. There are concrete subclasses for each
938
type: SmartTCPTransport, etc.
941
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
942
# responsibilities: Put those on SmartClient or similar. This is vital for
943
# the ability to support multiple versions of the smart protocol over time:
944
# SmartTransport is an adapter from the Transport object model to the
945
# SmartClient model, not an encoder.
947
def __init__(self, url, clone_from=None, medium=None):
950
:param medium: The medium to use for this RemoteTransport. This must be
951
supplied if clone_from is None.
953
### Technically super() here is faulty because Transport's __init__
954
### fails to take 2 parameters, and if super were to choose a silly
955
### initialisation order things would blow up.
956
if not url.endswith('/'):
958
super(SmartTransport, self).__init__(url)
959
self._scheme, self._username, self._password, self._host, self._port, self._path = \
960
transport.split_url(url)
961
if clone_from is None:
962
self._medium = medium
964
# credentials may be stripped from the base in some circumstances
965
# as yet to be clearly defined or documented, so copy them.
966
self._username = clone_from._username
967
# reuse same connection
968
self._medium = clone_from._medium
969
assert self._medium is not None

    def abspath(self, relpath):
        """Return the full url to the given relative path.

        @param relpath: the relative path or path components
        @type relpath: str or list
        """
        return self._unparse_url(self._remote_path(relpath))

    def clone(self, relative_url):
        """Make a new SmartTransport related to me, sharing the same connection.

        This essentially opens a handle on a different remote directory.
        """
        if relative_url is None:
            return SmartTransport(self.base, self)
        else:
            return SmartTransport(self.abspath(relative_url), self)

    def is_readonly(self):
        """Smart server transport can do read/write file operations."""

    def get_smart_client(self):

    def get_smart_medium(self):

    def _unparse_url(self, path):
        """Return URL for a path.

        :see: SFTPUrlHandling._unparse_url
        """
        # TODO: Eventually it should be possible to unify this with
        # SFTPUrlHandling._unparse_url?
        path = urllib.quote(path)
        netloc = urllib.quote(self._host)
        if self._username is not None:
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
        if self._port is not None:
            netloc = '%s:%d' % (netloc, self._port)
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
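
# A minimal sketch of the URL assembly that _unparse_url performs, written as
# a standalone function so it can be run on its own. It assumes Python 3's
# urllib.parse in place of the Python 2 urllib/urlparse modules used in this
# file; the function name and its parameters are hypothetical.

```python
from urllib.parse import quote, urlunparse


def unparse_url_sketch(scheme, host, path, username=None, port=None):
    """Hypothetical standalone version of the steps in _unparse_url."""
    # Quote each component separately so reserved characters are escaped.
    path = quote(path)
    netloc = quote(host)
    if username is not None:
        # Credentials, when present, go in front of the host.
        netloc = '%s@%s' % (quote(username), netloc)
    if port is not None:
        netloc = '%s:%d' % (netloc, port)
    # params, query and fragment are always empty for these URLs.
    return urlunparse((scheme, netloc, path, '', '', ''))
```

# Note that the password is deliberately left out of the rebuilt URL, which
# matches the method above: only the username and port are folded into netloc.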

    def _remote_path(self, relpath):
        """Returns the Unicode version of the absolute path for relpath."""
        return self._combine_paths(self._path, relpath)

    def _call(self, method, *args):
        resp = self._call2(method, *args)
        self._translate_error(resp)

    def _call2(self, method, *args):
        """Call a method on the remote server."""
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call(method, *args)
        return protocol.read_response_tuple()

    def _call_with_body_bytes(self, method, args, body):
        """Call a method on the remote server with body bytes."""
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call_with_body_bytes((method, ) + args, body)
        return protocol.read_response_tuple()

    def has(self, relpath):
        """Indicate whether a remote file of the given name exists or not.

        :see: Transport.has()
        """
        resp = self._call2('has', self._remote_path(relpath))
        if resp == ('yes', ):
        elif resp == ('no', ):
        else:
            self._translate_error(resp)

    def get(self, relpath):
        """Return file-like object reading the contents of a remote file.

        :see: Transport.get_bytes()/get_file()
        """
        return StringIO(self.get_bytes(relpath))

    def get_bytes(self, relpath):
        remote = self._remote_path(relpath)
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call('get', remote)
        resp = protocol.read_response_tuple(True)
        if resp != ('ok', ):
            protocol.cancel_read_body()
            self._translate_error(resp, relpath)
        return protocol.read_body_bytes()

    def _serialise_optional_mode(self, mode):

    def mkdir(self, relpath, mode=None):
        resp = self._call2('mkdir', self._remote_path(relpath),
            self._serialise_optional_mode(mode))
        self._translate_error(resp)

    def put_bytes(self, relpath, upload_contents, mode=None):
        # FIXME: upload_file is probably not safe for non-ascii characters -
        # should probably just pass all parameters as length-delimited
        resp = self._call_with_body_bytes('put',
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
        self._translate_error(resp)

    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
                             create_parent_dir=False,
        """See Transport.put_bytes_non_atomic."""
        # FIXME: no encoding in the transport!
        create_parent_str = 'F'
        if create_parent_dir:
            create_parent_str = 'T'

        resp = self._call_with_body_bytes(
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
             create_parent_str, self._serialise_optional_mode(dir_mode)),
        self._translate_error(resp)

    def put_file(self, relpath, upload_file, mode=None):
        # it's not ideal to seek back, but currently put_non_atomic_file depends
        # on transports not reading before failing - which is a faulty
        # assumption I think - RBC 20060915
        pos = upload_file.tell()
            return self.put_bytes(relpath, upload_file.read(), mode)
            upload_file.seek(pos)

    def put_file_non_atomic(self, relpath, f, mode=None,
                            create_parent_dir=False,
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
                                         create_parent_dir=create_parent_dir,

    def append_file(self, relpath, from_file, mode=None):
        return self.append_bytes(relpath, from_file.read(), mode)

    def append_bytes(self, relpath, bytes, mode=None):
        resp = self._call_with_body_bytes(
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
        if resp[0] == 'appended':
        self._translate_error(resp)

    def delete(self, relpath):
        resp = self._call2('delete', self._remote_path(relpath))
        self._translate_error(resp)

    def readv(self, relpath, offsets):
        offsets = list(offsets)

        sorted_offsets = sorted(offsets)
        # turn the list of offsets into a stack
        offset_stack = iter(offsets)
        cur_offset_and_size = offset_stack.next()
        coalesced = list(self._coalesce_offsets(sorted_offsets,
                               limit=self._max_readv_combine,
                               fudge_factor=self._bytes_to_read_before_seek))

        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
        protocol.call_with_body_readv_array(
            ('readv', self._remote_path(relpath)),
            [(c.start, c.length) for c in coalesced])
        resp = protocol.read_response_tuple(True)

        if resp[0] != 'readv':
            # This should raise an exception
            protocol.cancel_read_body()
            self._translate_error(resp)

        # FIXME: this should know how many bytes are needed, for clarity.
        data = protocol.read_body_bytes()
        # Cache the results, but only until they have been fulfilled
        for c_offset in coalesced:
            if len(data) < c_offset.length:
                raise errors.ShortReadvError(relpath, c_offset.start,
                            c_offset.length, actual=len(data))
            for suboffset, subsize in c_offset.ranges:
                key = (c_offset.start+suboffset, subsize)
                data_map[key] = data[suboffset:suboffset+subsize]
            data = data[c_offset.length:]

            # Now that we've read some data, see if we can yield anything back
            while cur_offset_and_size in data_map:
                this_data = data_map.pop(cur_offset_and_size)
                yield cur_offset_and_size[0], this_data
                cur_offset_and_size = offset_stack.next()
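
# A minimal sketch of the buffering idea at the end of readv: data arrives in
# sorted (coalesced) order, but results must be yielded in the order the caller
# asked for them, so ranges are parked in a dict until their turn comes. This
# is a standalone illustration in Python 3, not the method above; the function
# name and the shape of `arrivals` are hypothetical.

```python
def yield_in_request_order(offsets, arrivals):
    """Yield (start, data) in the order given by `offsets`.

    offsets:  list of (start, size) pairs in the caller's order.
    arrivals: iterable of ((start, size), data) in arrival order.
    """
    pending = iter(offsets)
    data_map = {}
    try:
        wanted = next(pending)
    except StopIteration:
        return  # nothing was requested
    for key, data in arrivals:
        # Park the range until the caller's order reaches it.
        data_map[key] = data
        while wanted in data_map:
            yield wanted[0], data_map.pop(wanted)
            try:
                wanted = next(pending)
            except StopIteration:
                return  # every requested range has been yielded
```

# Popping each entry as soon as it is yielded keeps the cache small: a range
# stays in memory only until the request order catches up with it.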

    def rename(self, rel_from, rel_to):
        self._call('rename',
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def move(self, rel_from, rel_to):
                   self._remote_path(rel_from),
                   self._remote_path(rel_to))

    def rmdir(self, relpath):
        resp = self._call('rmdir', self._remote_path(relpath))

    def _translate_error(self, resp, orig_path=None):
        """Raise an exception from a response"""
        elif what == 'NoSuchFile':
            if orig_path is not None:
                error_path = orig_path
            else:
                error_path = resp[1]
            raise errors.NoSuchFile(error_path)
        elif what == 'error':
            raise errors.SmartProtocolError(unicode(resp[1]))
        elif what == 'FileExists':
            raise errors.FileExists(resp[1])
        elif what == 'DirectoryNotEmpty':
            raise errors.DirectoryNotEmpty(resp[1])
        elif what == 'ShortReadvError':
            raise errors.ShortReadvError(resp[1], int(resp[2]),
                                         int(resp[3]), int(resp[4]))
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
            encoding = str(resp[1]) # encoding must always be a string
            start = int(resp[3])
            reason = str(resp[5]) # reason must always be a string
            if val.startswith('u:'):
            elif val.startswith('s:'):
                val = val[2:].decode('base64')
            if what == 'UnicodeDecodeError':
                raise UnicodeDecodeError(encoding, val, start, end, reason)
            elif what == 'UnicodeEncodeError':
                raise UnicodeEncodeError(encoding, val, start, end, reason)
        elif what == "ReadOnlyError":
            raise errors.TransportNotPossible('readonly transport')
        else:
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))

    def disconnect(self):
        self._medium.disconnect()

    def delete_tree(self, relpath):
        raise errors.TransportNotPossible('readonly transport')

    def stat(self, relpath):
        resp = self._call2('stat', self._remote_path(relpath))
        if resp[0] == 'stat':
            return SmartStat(int(resp[1]), int(resp[2], 8))
        else:
            self._translate_error(resp)

##     def lock_read(self, relpath):
##         """Lock the given file for shared (read) access.
##         :return: A lock object, which should be passed to Transport.unlock()
##         """
##         # The old RemoteBranch ignores locks for reading, so we will
##         # continue that tradition and return a bogus lock object.
##         class BogusLock(object):
##             def __init__(self, path):
##             def unlock(self):
##         return BogusLock(relpath)

    def list_dir(self, relpath):
        resp = self._call2('list_dir', self._remote_path(relpath))
        if resp[0] == 'names':
            return [name.encode('ascii') for name in resp[1:]]
        else:
            self._translate_error(resp)

    def iter_files_recursive(self):
        resp = self._call2('iter_files_recursive', self._remote_path(''))
        if resp[0] == 'names':
        else:
            self._translate_error(resp)


class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    request.accept_bytes('123')
    request.finished_writing()
    result = request.read_bytes(3)
    request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the given medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message-based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do the
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to _finished_reading
        to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to _finished_writing
        to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to ensure
        a message-based approach to requests, for compatibility with
        message-based mediums like HTTP.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        """
        raise NotImplementedError(self._read_bytes)
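
# A minimal sketch of the "writing" -> "reading" -> "done" state machine that
# SmartClientMediumRequest enforces, using an in-memory buffer that simply
# echoes back whatever was written. The class name is hypothetical and plain
# RuntimeError stands in for the bzrlib error classes (WritingCompleted,
# WritingNotComplete, ReadingCompleted), so this runs without bzrlib.

```python
class LoopbackRequestSketch(object):
    """Hypothetical in-memory request that echoes written bytes back."""

    def __init__(self):
        self._state = "writing"
        self._buffer = b""

    def accept_bytes(self, data):
        # Writing is only legal before finished_writing() has been called.
        if self._state != "writing":
            raise RuntimeError("writing already completed")
        self._buffer += data

    def finished_writing(self):
        if self._state != "writing":
            raise RuntimeError("writing already completed")
        self._state = "reading"

    def read_bytes(self, count):
        # Reading is only legal between finished_writing() and
        # finished_reading().
        if self._state == "writing":
            raise RuntimeError("writing not complete")
        if self._state != "reading":
            raise RuntimeError("reading already completed")
        data, self._buffer = self._buffer[:count], self._buffer[count:]
        return data

    def finished_reading(self):
        if self._state == "writing":
            raise RuntimeError("writing not complete")
        if self._state != "reading":
            raise RuntimeError("reading already completed")
        self._state = "done"
```

# Used in the same order as the example in the class docstring: accept_bytes,
# finished_writing, read_bytes, finished_reading. Any call made out of that
# order raises instead of silently proceeding.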


class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)


class SmartClientRequestProtocolOne(SmartProtocolBase):
    """The client-side protocol for smart version 1."""

    def __init__(self, request):
        """Construct a SmartClientRequestProtocolOne.

        :param request: A SmartClientMediumRequest to serialise onto and
        """
        self._request = request
        self._body_buffer = None

    def call(self, *args):
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def call_with_body_bytes(self, args, body):
        """Make a remote call of args with body bytes 'body'.

        After calling this, call read_response_tuple to find the result out.
        """
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        bytes = self._encode_bulk_data(body)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()

    def call_with_body_readv_array(self, args, body):
        """Make a remote call with a readv array.

        The body is encoded with one line per readv offset pair. The numbers in
        each pair are separated by a comma, and no trailing \n is emitted.
        """
        bytes = _encode_tuple(args)
        self._request.accept_bytes(bytes)
        readv_bytes = self._serialise_offsets(body)
        bytes = self._encode_bulk_data(readv_bytes)
        self._request.accept_bytes(bytes)
        self._request.finished_writing()
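
# A minimal sketch of the readv body format described in the docstring of
# call_with_body_readv_array: one "start,length" line per offset pair, with
# newlines between pairs and none after the last. The function name is
# hypothetical; the real _serialise_offsets is not shown in this listing, so
# this is an illustration of the documented format, not that method's code.

```python
def serialise_offsets_sketch(offsets):
    """Encode (start, length) pairs, one per line, with no trailing newline."""
    # join() puts the separator only between items, so the last line is
    # left unterminated, as the docstring requires.
    return "\n".join("%d,%d" % (start, length) for start, length in offsets)
```

# The resulting text is what would then be wrapped as bulk body data before
# being handed to the request.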

    def cancel_read_body(self):
        """After expecting a body, a response code may indicate otherwise.

        This method lets the domain client inform the protocol that no body
        will be transmitted. This is a terminal method: after calling it the
        protocol is not able to be used further.
        """
        self._request.finished_reading()

    def read_response_tuple(self, expect_body=False):
        """Read a response tuple from the wire.

        This should only be called once.
        """
        result = self._recv_tuple()
            self._request.finished_reading()

    def read_body_bytes(self, count=-1):
        """Read bytes from the body, decoding into a byte stream.

        We read all bytes at once to ensure we've checked the trailer for
        errors, and then feed the buffer back as read_body_bytes is called.
        """
        if self._body_buffer is not None:
            return self._body_buffer.read(count)
        _body_decoder = LengthPrefixedBodyDecoder()

        while not _body_decoder.finished_reading:
            bytes_wanted = _body_decoder.next_read_size()
            bytes = self._request.read_bytes(bytes_wanted)
            _body_decoder.accept_bytes(bytes)
        self._request.finished_reading()
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
        # XXX: TODO check the trailer result.
        return self._body_buffer.read(count)
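
# A minimal sketch of the length-prefixed body framing from the module
# docstring's grammar (CHUNK_LEN, CHUNK_BODY, then the 'done' trailer). It
# assumes a body sent as a single chunk and handles only the success trailer.
# Both function names are hypothetical; this is not LengthPrefixedBodyDecoder,
# which reads incrementally from the request rather than from one buffer.

```python
def encode_bulk_data_sketch(body):
    """Frame `body` (bytes) as one chunk followed by the success trailer."""
    return str(len(body)).encode("ascii") + b"\n" + body + b"done\n"


def decode_bulk_data_sketch(data):
    """Return the body from a single-chunk frame, or raise ValueError."""
    header, sep, rest = data.partition(b"\n")
    if not sep:
        raise ValueError("missing chunk length line")
    length = int(header.decode("ascii"))
    body, trailer = rest[:length], rest[length:]
    if len(body) != length:
        raise ValueError("body shorter than declared length")
    # Only the success trailer is handled in this sketch.
    if trailer != b"done\n":
        raise ValueError("unexpected trailer %r" % (trailer,))
    return body
```

# Checking the trailer before handing any bytes back mirrors the reason given
# in the docstring above for reading the whole body at once.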

    def _recv_tuple(self):
        """Receive a tuple from the medium request."""
        while not line or line[-1] != '\n':
            # TODO: this is inefficient - but tuples are short.
            new_char = self._request.read_bytes(1)
            assert new_char != '', "end of file reading from server."
        return _decode_tuple(line)

    def query_version(self):
        """Return protocol version number of the server."""
        resp = self.read_response_tuple()
        if resp == ('ok', '1'):
        else:
            raise errors.SmartProtocolError("bad response %r" % (resp,))


class SmartClientMedium(object):
    """Smart client is a medium for sending smart protocol requests over."""

    def disconnect(self):
        """If this medium maintains a persistent connection, close it.

        The default implementation does nothing.
        """


class SmartClientStreamMedium(SmartClientMedium):
    """Stream based medium common class.

    SmartClientStreamMediums operate on a stream. All subclasses use a common
    SmartClientStreamMediumRequest for their requests, and should implement
    _accept_bytes and _read_bytes to allow the request objects to send and
    """

        self._current_request = None

    def accept_bytes(self, bytes):
        self._accept_bytes(bytes)

        """The SmartClientStreamMedium knows how to close the stream when it is
        """

        """Flush the output stream.

        This method is used by the SmartClientStreamMediumRequest to ensure that
        all data for a request is sent, to avoid long timeouts or deadlocks.
        """
        raise NotImplementedError(self._flush)

    def get_request(self):
        """See SmartClientMedium.get_request().

        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
        """
        return SmartClientStreamMediumRequest(self)

    def read_bytes(self, count):
        return self._read_bytes(count)


class SmartSimplePipesClientMedium(SmartClientStreamMedium):
    """A client medium using simple pipes.

    This client does not manage the pipes: it assumes they will always be open.
    """

    def __init__(self, readable_pipe, writeable_pipe):
        SmartClientStreamMedium.__init__(self)
        self._readable_pipe = readable_pipe
        self._writeable_pipe = writeable_pipe

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._writeable_pipe.write(bytes)

        """See SmartClientStreamMedium._flush()."""
        self._writeable_pipe.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium._read_bytes."""
        return self._readable_pipe.read(count)


class SmartSSHClientMedium(SmartClientStreamMedium):
    """A client medium using SSH."""

    def __init__(self, host, port=None, username=None, password=None,
        """Creates a client that will connect on the first use.

        :param vendor: An optional override for the ssh vendor to use. See
            bzrlib.transport.ssh for details on ssh vendors.
        """
        SmartClientStreamMedium.__init__(self)
        self._connected = False
        self._password = password
        self._username = username
        self._read_from = None
        self._ssh_connection = None
        self._vendor = vendor
        self._write_to = None

    def _accept_bytes(self, bytes):
        """See SmartClientStreamMedium.accept_bytes."""
        self._ensure_connection()
        self._write_to.write(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
        self._read_from.close()
        self._write_to.close()
        self._ssh_connection.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
        if self._vendor is None:
            vendor = ssh._get_ssh_vendor()
        else:
            vendor = self._vendor
        self._ssh_connection = vendor.connect_ssh(self._username,
                self._password, self._host, self._port,
                command=[executable, 'serve', '--inet', '--directory=/',
        self._read_from, self._write_to = \
            self._ssh_connection.get_filelike_channels()
        self._connected = True

        """See SmartClientStreamMedium._flush()."""
        self._write_to.flush()

    def _read_bytes(self, count):
        """See SmartClientStreamMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._read_from.read(count)


class SmartTCPClientMedium(SmartClientStreamMedium):
    """A client medium using TCP."""

    def __init__(self, host, port):
        """Creates a client that will connect on the first use."""
        SmartClientStreamMedium.__init__(self)
        self._connected = False

    def _accept_bytes(self, bytes):
        """See SmartClientMedium.accept_bytes."""
        self._ensure_connection()
        self._socket.sendall(bytes)

    def disconnect(self):
        """See SmartClientMedium.disconnect()."""
        if not self._connected:
        self._socket.close()
        self._connected = False

    def _ensure_connection(self):
        """Connect this medium if not already connected."""
        self._socket = socket.socket()
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        result = self._socket.connect_ex((self._host, int(self._port)))
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
                (self._host, self._port, os.strerror(result)))
        self._connected = True

        """See SmartClientStreamMedium._flush().

        For TCP we do no flushing. We may want to turn off TCP_NODELAY and
        add a means to do a flush, but that can be done in the future.
        """

    def _read_bytes(self, count):
        """See SmartClientMedium.read_bytes."""
        if not self._connected:
            raise errors.MediumNotConnected(self)
        return self._socket.recv(count)


class SmartTCPTransport(SmartTransport):
    """Connection to smart server over plain tcp.

    This is essentially just a factory to get 'RemoteTransport(url,
    SmartTCPClientMedium)'.
    """

    def __init__(self, url):
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
        except (ValueError, TypeError), e:
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
        medium = SmartTCPClientMedium(_host, _port)
        super(SmartTCPTransport, self).__init__(url, medium=medium)


class SmartSSHTransport(SmartTransport):
    """Connection to smart server over SSH.

    This is essentially just a factory to get 'RemoteTransport(url,
    SmartSSHClientMedium)'.
    """

    def __init__(self, url):
        _scheme, _username, _password, _host, _port, _path = \
            transport.split_url(url)
            if _port is not None:
        except (ValueError, TypeError), e:
            raise errors.InvalidURL(path=url, extra="invalid port %s" %
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
        super(SmartSSHTransport, self).__init__(url, medium=medium)


def get_test_permutations():
    """Return (transport, server) permutations for testing."""
    ### We may need a little more test framework support to construct an
    ### appropriate RemoteTransport in the future.
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]