1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK+ TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
50
At the bottom level socket, pipes, HTTP server. For sockets, we have the
51
idea that you have multiple requests and get have a read error because the
52
other side did shutdown sd send. For pipes we have read pipe which will have a
53
zero read which marks end-of-file. For HTTP server environment there is not
54
end-of-stream because each request coming into the server is independent.
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
57
substrate and this will give us a single model which is consist for HTTP,
63
MEDIUM (factory for protocol, reads bytes & pushes to protocol,
64
uses protocol to detect end-of-request, sends written
65
bytes to client) e.g. socket, pipe, HTTP request handler.
70
PROTOCOL (serialisation, deserialisation) accepts bytes for one
71
request, decodes according to internal state, pushes
72
structured data to handler. accepts structured data from
73
handler and encodes and writes to the medium. factory for
79
HANDLER (domain logic) accepts structured data, operates state
80
machine until the request can be satisfied,
81
sends structured data to the protocol.
87
CLIENT domain logic, accepts domain requests, generated structured
88
data, reads structured data from responses and turns into
89
domain data. Sends structured data to the protocol.
90
Operates state machines until the request can be delivered
91
(e.g. reading from a bundle generated in bzrlib to deliver a
94
Possibly this should just be RemoteBzrDir, RemoteTransport,
100
PROTOCOL (serialisation, deserialisation) accepts structured data for one
101
request, encodes and writes to the medium. Reads bytes from the
102
medium, decodes and allows the client to read structured data.
107
MEDIUM (accepts bytes from the protocol & delivers to the remote server.
108
Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
112
# TODO: _translate_error should be on the client, not the transport because
113
# error coding is wire protocol specific.
115
# TODO: A plain integer from query_version is too simple; should give some
118
# TODO: Server should probably catch exceptions within itself and send them
119
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
120
# Also needs to somehow report protocol errors like bad requests. Need to
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
# bulk transfer and then something goes wrong.
124
# TODO: Standard marker at start of request/response lines?
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
128
# TODO: get/put objects could be changed to gradually read back the data as it
129
# comes across the network
131
# TODO: What should the server do if it hits an error and has to terminate?
133
# TODO: is it useful to allow multiple chunks in the bulk data?
135
# TODO: If we get an exception during transmission of bulk data we can't just
136
# emit the exception because it won't be seen.
137
# John proposes: I think it would be worthwhile to have a header on each
138
# chunk, that indicates it is another chunk. Then you can send an 'error'
139
# chunk as long as you finish the previous chunk.
141
# TODO: Clone method on Transport; should work up towards parent directory;
142
# unclear how this should be stored or communicated to the server... maybe
143
# just pass it on all relevant requests?
145
# TODO: Better name than clone() for changing between directories. How about
146
# open_dir or change_dir or chdir?
148
# TODO: Is it really good to have the notion of current directory within the
149
# connection? Perhaps all Transports should factor out a common connection
150
# from the thing that has the directory context?
152
# TODO: Pull more things common to sftp and ssh to a higher level.
154
# TODO: The server that manages a connection should be quite small and retain
155
# minimum state because each of the requests are supposed to be stateless.
156
# Then we can write another implementation that maps to http.
158
# TODO: What to do when a client connection is garbage collected? Maybe just
159
# abruptly drop the connection?
161
# TODO: Server in some cases will need to restrict access to files outside of
162
# a particular root directory. LocalTransport doesn't do anything to stop you
163
# ascending above the base directory, so we need to prevent paths
164
# containing '..' in either the server or transport layers. (Also need to
165
# consider what happens if someone creates a symlink pointing outside the
168
# TODO: Server should rebase absolute paths coming across the network to put
169
# them under the virtual root, if one is in use. LocalTransport currently
170
# doesn't do that; if you give it an absolute path it just uses it.
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
# urlescape them instead. Indeed possibly this should just literally be
176
# FIXME: This transport, with several others, has imperfect handling of paths
177
# within urls. It'd probably be better for ".." from a root to raise an error
178
# rather than return the same directory as we do at present.
180
# TODO: Rather than working at the Transport layer we want a Branch,
181
# Repository or BzrDir objects that talk to a server.
183
# TODO: Probably want some way for server commands to gradually produce body
184
# data rather than passing it as a string; they could perhaps pass an
185
# iterator-like callback that will gradually yield data; it probably needs a
186
# close() method that will always be closed to do any necessary cleanup.
188
# TODO: Split the actual smart server from the ssh encoding of it.
190
# TODO: Perhaps support file-level readwrite operations over the transport
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
# branch doing file-level operations.
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
197
# the socket, and it assumes everything is UTF8 sections separated
198
# by \001. Which means a request like '\002' Will abort the connection
199
# because of a UnicodeDecodeError. It does look like invalid data will
200
# kill the SmartServerStreamMedium, but only with an abort + exception, and
201
# the overall server shouldn't die.
203
from cStringIO import StringIO
220
from bzrlib.bundle.serializer import write_bundle
222
# must do this otherwise urllib can't parse the urls properly :(
223
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
224
transport.register_urlparse_netloc_protocol(scheme)
228
def _recv_tuple(from_file):
229
req_line = from_file.readline()
230
return _decode_tuple(req_line)
233
def _decode_tuple(req_line):
234
if req_line == None or req_line == '':
236
if req_line[-1] != '\n':
237
raise errors.SmartProtocolError("request %r not terminated" % req_line)
238
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
241
def _encode_tuple(args):
242
"""Encode the tuple args to a bytestream."""
243
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
246
class SmartProtocolBase(object):
247
"""Methods common to client and server"""
249
# TODO: this only actually accomodates a single block; possibly should
250
# support multiple chunks?
251
def _encode_bulk_data(self, body):
252
"""Encode body as a bulk data chunk."""
253
return ''.join(('%d\n' % len(body), body, 'done\n'))
255
def _serialise_offsets(self, offsets):
256
"""Serialise a readv offset list."""
258
for start, length in offsets:
259
txt.append('%d,%d' % (start, length))
260
return '\n'.join(txt)
263
class SmartServerRequestProtocolOne(SmartProtocolBase):
264
"""Server-side encoding and decoding logic for smart version 1."""
266
def __init__(self, backing_transport, write_func):
267
self._backing_transport = backing_transport
268
self.excess_buffer = ''
269
self._finished_reading = False
271
self.has_dispatched = False
273
self._body_decoder = None
274
self._write_func = write_func
276
def accept_bytes(self, bytes):
277
"""Take bytes, and advance the internal state machine appropriately.
279
:param bytes: must be a byte string
281
assert isinstance(bytes, str)
282
self.in_buffer += bytes
283
if not self.has_dispatched:
284
if '\n' not in self.in_buffer:
285
# no command line yet
287
self.has_dispatched = True
289
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
291
req_args = _decode_tuple(first_line)
292
self.request = SmartServerRequestHandler(
293
self._backing_transport)
294
self.request.dispatch_command(req_args[0], req_args[1:])
295
if self.request.finished_reading:
297
self.excess_buffer = self.in_buffer
299
self._send_response(self.request.response.args,
300
self.request.response.body)
301
self.sync_with_request(self.request)
302
except KeyboardInterrupt:
304
except Exception, exception:
305
# everything else: pass to client, flush, and quit
306
self._send_response(('error', str(exception)))
309
if self.has_dispatched:
310
if self._finished_reading:
311
# nothing to do.XXX: this routine should be a single state
313
self.excess_buffer += self.in_buffer
316
if self._body_decoder is None:
317
self._body_decoder = LengthPrefixedBodyDecoder()
318
self._body_decoder.accept_bytes(self.in_buffer)
319
self.in_buffer = self._body_decoder.unused_data
320
body_data = self._body_decoder.read_pending_data()
321
self.request.accept_body(body_data)
322
if self._body_decoder.finished_reading:
323
self.request.end_of_body()
324
assert self.request.finished_reading, \
325
"no more body, request not finished"
326
self.sync_with_request(self.request)
327
if self.request.response is not None:
328
self._send_response(self.request.response.args,
329
self.request.response.body)
330
self.excess_buffer = self.in_buffer
333
assert not self.request.finished_reading, \
334
"no response and we have finished reading."
336
def _send_response(self, args, body=None):
337
"""Send a smart server response down the output stream."""
338
self._write_func(_encode_tuple(args))
340
assert isinstance(body, str), 'body must be a str'
341
bytes = self._encode_bulk_data(body)
342
self._write_func(bytes)
344
def sync_with_request(self, request):
345
self._finished_reading = request.finished_reading
347
def next_read_size(self):
348
if self._finished_reading:
350
if self._body_decoder is None:
353
return self._body_decoder.next_read_size()
356
class LengthPrefixedBodyDecoder(object):
357
"""Decodes the length-prefixed bulk data."""
360
self.bytes_left = None
361
self.finished_reading = False
362
self.unused_data = ''
363
self.state_accept = self._state_accept_expecting_length
364
self.state_read = self._state_read_no_data
366
self._trailer_buffer = ''
368
def accept_bytes(self, bytes):
369
"""Decode as much of bytes as possible.
371
If 'bytes' contains too much data it will be appended to
374
finished_reading will be set when no more data is required. Further
375
data will be appended to self.unused_data.
377
# accept_bytes is allowed to change the state
378
current_state = self.state_accept
379
self.state_accept(bytes)
380
while current_state != self.state_accept:
381
current_state = self.state_accept
382
self.state_accept('')
384
def next_read_size(self):
385
if self.bytes_left is not None:
386
# Ideally we want to read all the remainder of the body and the
388
return self.bytes_left + 5
389
elif self.state_accept == self._state_accept_reading_trailer:
390
# Just the trailer left
391
return 5 - len(self._trailer_buffer)
392
elif self.state_accept == self._state_accept_expecting_length:
393
# There's still at least 6 bytes left ('\n' to end the length, plus
397
# Reading excess data. Either way, 1 byte at a time is fine.
400
def read_pending_data(self):
401
"""Return any pending data that has been decoded."""
402
return self.state_read()
404
def _state_accept_expecting_length(self, bytes):
405
self._in_buffer += bytes
406
pos = self._in_buffer.find('\n')
409
self.bytes_left = int(self._in_buffer[:pos])
410
self._in_buffer = self._in_buffer[pos+1:]
411
self.bytes_left -= len(self._in_buffer)
412
self.state_accept = self._state_accept_reading_body
413
self.state_read = self._state_read_in_buffer
415
def _state_accept_reading_body(self, bytes):
416
self._in_buffer += bytes
417
self.bytes_left -= len(bytes)
418
if self.bytes_left <= 0:
420
if self.bytes_left != 0:
421
self._trailer_buffer = self._in_buffer[self.bytes_left:]
422
self._in_buffer = self._in_buffer[:self.bytes_left]
423
self.bytes_left = None
424
self.state_accept = self._state_accept_reading_trailer
426
def _state_accept_reading_trailer(self, bytes):
427
self._trailer_buffer += bytes
428
# TODO: what if the trailer does not match "done\n"? Should this raise
429
# a ProtocolViolation exception?
430
if self._trailer_buffer.startswith('done\n'):
431
self.unused_data = self._trailer_buffer[len('done\n'):]
432
self.state_accept = self._state_accept_reading_unused
433
self.finished_reading = True
435
def _state_accept_reading_unused(self, bytes):
436
self.unused_data += bytes
438
def _state_read_no_data(self):
441
def _state_read_in_buffer(self):
442
result = self._in_buffer
447
class SmartServerStreamMedium(object):
448
"""Handles smart commands coming over a stream.
450
The stream may be a pipe connected to sshd, or a tcp socket, or an
451
in-process fifo for testing.
453
One instance is created for each connected client; it can serve multiple
454
requests in the lifetime of the connection.
456
The server passes requests through to an underlying backing transport,
457
which will typically be a LocalTransport looking at the server's filesystem.
460
def __init__(self, backing_transport):
461
"""Construct new server.
463
:param backing_transport: Transport for the directory served.
465
# backing_transport could be passed to serve instead of __init__
466
self.backing_transport = backing_transport
467
self.finished = False
470
"""Serve requests until the client disconnects."""
471
# Keep a reference to stderr because the sys module's globals get set to
472
# None during interpreter shutdown.
473
from sys import stderr
475
while not self.finished:
476
protocol = SmartServerRequestProtocolOne(self.backing_transport,
478
self._serve_one_request(protocol)
480
stderr.write("%s terminating on exception %s\n" % (self, e))
483
def _serve_one_request(self, protocol):
484
"""Read one request from input, process, send back a response.
486
:param protocol: a SmartServerRequestProtocol.
489
self._serve_one_request_unguarded(protocol)
490
except KeyboardInterrupt:
493
self.terminate_due_to_error()
495
def terminate_due_to_error(self):
496
"""Called when an unhandled exception from the protocol occurs."""
497
raise NotImplementedError(self.terminate_due_to_error)
500
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
502
def __init__(self, sock, backing_transport):
505
:param sock: the socket the server will read from. It will be put
508
SmartServerStreamMedium.__init__(self, backing_transport)
510
sock.setblocking(True)
513
def _serve_one_request_unguarded(self, protocol):
514
while protocol.next_read_size():
516
protocol.accept_bytes(self.push_back)
519
bytes = self.socket.recv(4096)
523
protocol.accept_bytes(bytes)
525
self.push_back = protocol.excess_buffer
527
def terminate_due_to_error(self):
528
"""Called when an unhandled exception from the protocol occurs."""
529
# TODO: This should log to a server log file, but no such thing
530
# exists yet. Andrew Bennetts 2006-09-29.
534
def _write_out(self, bytes):
535
self.socket.sendall(bytes)
538
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
540
def __init__(self, in_file, out_file, backing_transport):
541
"""Construct new server.
543
:param in_file: Python file from which requests can be read.
544
:param out_file: Python file to write responses.
545
:param backing_transport: Transport for the directory served.
547
SmartServerStreamMedium.__init__(self, backing_transport)
551
def _serve_one_request_unguarded(self, protocol):
553
bytes_to_read = protocol.next_read_size()
554
if bytes_to_read == 0:
555
# Finished serving this request.
558
bytes = self._in.read(bytes_to_read)
560
# Connection has been closed.
564
protocol.accept_bytes(bytes)
566
def terminate_due_to_error(self):
567
# TODO: This should log to a server log file, but no such thing
568
# exists yet. Andrew Bennetts 2006-09-29.
572
def _write_out(self, bytes):
573
self._out.write(bytes)
576
class SmartServerResponse(object):
577
"""Response generated by SmartServerRequestHandler."""
579
def __init__(self, args, body=None):
583
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
584
# for delivering the data for a request. This could be done with as the
585
# StreamServer, though that would create conflation between request and response
586
# which may be undesirable.
589
class SmartServerRequestHandler(object):
590
"""Protocol logic for smart server.
592
This doesn't handle serialization at all, it just processes requests and
596
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
597
# not contain encoding or decoding logic to allow the wire protocol to vary
598
# from the object protocol: we will want to tweak the wire protocol separate
599
# from the object model, and ideally we will be able to do that without
600
# having a SmartServerRequestHandler subclass for each wire protocol, rather
601
# just a Protocol subclass.
603
# TODO: Better way of representing the body for commands that take it,
604
# and allow it to be streamed into the server.
606
def __init__(self, backing_transport):
607
self._backing_transport = backing_transport
608
self._converted_command = False
609
self.finished_reading = False
610
self._body_bytes = ''
613
def accept_body(self, bytes):
616
This should be overriden for each command that desired body data to
617
handle the right format of that data. I.e. plain bytes, a bundle etc.
619
The deserialisation into that format should be done in the Protocol
620
object. Set self.desired_body_format to the format your method will
623
# default fallback is to accumulate bytes.
624
self._body_bytes += bytes
626
def _end_of_body_handler(self):
627
"""An unimplemented end of body handler."""
628
raise NotImplementedError(self._end_of_body_handler)
631
"""Answer a version request with my version."""
632
return SmartServerResponse(('ok', '1'))
634
def do_has(self, relpath):
635
r = self._backing_transport.has(relpath) and 'yes' or 'no'
636
return SmartServerResponse((r,))
638
def do_get(self, relpath):
639
backing_bytes = self._backing_transport.get_bytes(relpath)
640
return SmartServerResponse(('ok',), backing_bytes)
642
def _deserialise_optional_mode(self, mode):
643
# XXX: FIXME this should be on the protocol object.
649
def do_append(self, relpath, mode):
650
self._converted_command = True
651
self._relpath = relpath
652
self._mode = self._deserialise_optional_mode(mode)
653
self._end_of_body_handler = self._handle_do_append_end
655
def _handle_do_append_end(self):
656
old_length = self._backing_transport.append_bytes(
657
self._relpath, self._body_bytes, self._mode)
658
self.response = SmartServerResponse(('appended', '%d' % old_length))
660
def do_delete(self, relpath):
661
self._backing_transport.delete(relpath)
663
def do_iter_files_recursive(self, abspath):
664
# XXX: the path handling needs some thought.
665
#relpath = self._backing_transport.relpath(abspath)
666
transport = self._backing_transport.clone(abspath)
667
filenames = transport.iter_files_recursive()
668
return SmartServerResponse(('names',) + tuple(filenames))
670
def do_list_dir(self, relpath):
671
filenames = self._backing_transport.list_dir(relpath)
672
return SmartServerResponse(('names',) + tuple(filenames))
674
def do_mkdir(self, relpath, mode):
675
self._backing_transport.mkdir(relpath,
676
self._deserialise_optional_mode(mode))
678
def do_move(self, rel_from, rel_to):
679
self._backing_transport.move(rel_from, rel_to)
681
def do_put(self, relpath, mode):
682
self._converted_command = True
683
self._relpath = relpath
684
self._mode = self._deserialise_optional_mode(mode)
685
self._end_of_body_handler = self._handle_do_put
687
def _handle_do_put(self):
688
self._backing_transport.put_bytes(self._relpath,
689
self._body_bytes, self._mode)
690
self.response = SmartServerResponse(('ok',))
692
def _deserialise_offsets(self, text):
693
# XXX: FIXME this should be on the protocol object.
695
for line in text.split('\n'):
698
start, length = line.split(',')
699
offsets.append((int(start), int(length)))
702
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
703
self._converted_command = True
704
self._end_of_body_handler = self._handle_put_non_atomic
705
self._relpath = relpath
706
self._dir_mode = self._deserialise_optional_mode(dir_mode)
707
self._mode = self._deserialise_optional_mode(mode)
708
# a boolean would be nicer XXX
709
self._create_parent = (create_parent == 'T')
711
def _handle_put_non_atomic(self):
712
self._backing_transport.put_bytes_non_atomic(self._relpath,
715
create_parent_dir=self._create_parent,
716
dir_mode=self._dir_mode)
717
self.response = SmartServerResponse(('ok',))
719
def do_readv(self, relpath):
720
self._converted_command = True
721
self._end_of_body_handler = self._handle_readv_offsets
722
self._relpath = relpath
724
def end_of_body(self):
725
"""No more body data will be received."""
726
self._run_handler_code(self._end_of_body_handler, (), {})
727
# cannot read after this.
728
self.finished_reading = True
730
def _handle_readv_offsets(self):
731
"""accept offsets for a readv request."""
732
offsets = self._deserialise_offsets(self._body_bytes)
733
backing_bytes = ''.join(bytes for offset, bytes in
734
self._backing_transport.readv(self._relpath, offsets))
735
self.response = SmartServerResponse(('readv',), backing_bytes)
737
def do_rename(self, rel_from, rel_to):
738
self._backing_transport.rename(rel_from, rel_to)
740
def do_rmdir(self, relpath):
741
self._backing_transport.rmdir(relpath)
743
def do_stat(self, relpath):
744
stat = self._backing_transport.stat(relpath)
745
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
747
def do_get_bundle(self, path, revision_id):
748
# open transport relative to our base
749
t = self._backing_transport.clone(path)
750
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
751
repo = control.open_repository()
752
tmpf = tempfile.TemporaryFile()
753
base_revision = revision.NULL_REVISION
754
write_bundle(repo, revision_id, base_revision, tmpf)
756
return SmartServerResponse((), tmpf.read())
758
def dispatch_command(self, cmd, args):
759
"""Deprecated compatibility method.""" # XXX XXX
760
func = getattr(self, 'do_' + cmd, None)
762
raise errors.SmartProtocolError("bad request %r" % (cmd,))
763
self._run_handler_code(func, args, {})
765
def _run_handler_code(self, callable, args, kwargs):
766
"""Run some handler specific code 'callable'.
768
If a result is returned, it is considered to be the commands response,
769
and finished_reading is set true, and its assigned to self.response.
771
Any exceptions caught are translated and a response object created
774
result = self._call_converting_errors(callable, args, kwargs)
775
if result is not None:
776
self.response = result
777
self.finished_reading = True
778
# handle unconverted commands
779
if not self._converted_command:
780
self.finished_reading = True
782
self.response = SmartServerResponse(('ok',))
784
def _call_converting_errors(self, callable, args, kwargs):
785
"""Call callable converting errors to Response objects."""
787
return callable(*args, **kwargs)
788
except errors.NoSuchFile, e:
789
return SmartServerResponse(('NoSuchFile', e.path))
790
except errors.FileExists, e:
791
return SmartServerResponse(('FileExists', e.path))
792
except errors.DirectoryNotEmpty, e:
793
return SmartServerResponse(('DirectoryNotEmpty', e.path))
794
except errors.ShortReadvError, e:
795
return SmartServerResponse(('ShortReadvError',
796
e.path, str(e.offset), str(e.length), str(e.actual)))
797
except UnicodeError, e:
798
# If it is a DecodeError, than most likely we are starting
799
# with a plain string
800
str_or_unicode = e.object
801
if isinstance(str_or_unicode, unicode):
802
val = u'u:' + str_or_unicode
804
val = u's:' + str_or_unicode.encode('base64')
805
# This handles UnicodeEncodeError or UnicodeDecodeError
806
return SmartServerResponse((e.__class__.__name__,
807
e.encoding, val, str(e.start), str(e.end), e.reason))
808
except errors.TransportNotPossible, e:
809
if e.msg == "readonly transport":
810
return SmartServerResponse(('ReadOnlyError', ))
815
class SmartTCPServer(object):
816
"""Listens on a TCP socket and accepts connections from smart clients"""
818
def __init__(self, backing_transport, host='127.0.0.1', port=0):
819
"""Construct a new server.
821
To actually start it running, call either start_background_thread or
824
:param host: Name of the interface to listen on.
825
:param port: TCP port to listen on, or 0 to allocate a transient port.
827
self._server_socket = socket.socket()
828
self._server_socket.bind((host, port))
829
self.port = self._server_socket.getsockname()[1]
830
self._server_socket.listen(1)
831
self._server_socket.settimeout(1)
832
self.backing_transport = backing_transport
835
# let connections timeout so that we get a chance to terminate
836
# Keep a reference to the exceptions we want to catch because the socket
837
# module's globals get set to None during interpreter shutdown.
838
from socket import timeout as socket_timeout
839
from socket import error as socket_error
840
self._should_terminate = False
841
while not self._should_terminate:
843
self.accept_and_serve()
844
except socket_timeout:
845
# just check if we're asked to stop
847
except socket_error, e:
848
trace.warning("client disconnected: %s", e)
852
"""Return the url of the server"""
853
return "bzr://%s:%d/" % self._server_socket.getsockname()
855
def accept_and_serve(self):
856
conn, client_addr = self._server_socket.accept()
857
# For WIN32, where the timeout value from the listening socket
858
# propogates to the newly accepted socket.
859
conn.setblocking(True)
860
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
861
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
862
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
863
connection_thread.setDaemon(True)
864
connection_thread.start()
866
def start_background_thread(self):
867
self._server_thread = threading.Thread(None,
869
name='server-' + self.get_url())
870
self._server_thread.setDaemon(True)
871
self._server_thread.start()
873
def stop_background_thread(self):
874
self._should_terminate = True
875
# self._server_socket.close()
876
# we used to join the thread, but it's not really necessary; it will
878
## self._server_thread.join()
881
class SmartTCPServer_for_testing(SmartTCPServer):
882
"""Server suitable for use by transport tests.
884
This server is backed by the process's cwd.
888
self._homedir = os.getcwd()
889
# The server is set up by default like for ssh access: the client
890
# passes filesystem-absolute paths; therefore the server must look
891
# them up relative to the root directory. it might be better to act
892
# a public server and have the server rewrite paths into the test
894
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
897
"""Set up server for testing"""
898
self.start_background_thread()
901
self.stop_background_thread()
904
"""Return the url of the server"""
905
host, port = self._server_socket.getsockname()
906
# XXX: I think this is likely to break on windows -- self._homedir will
907
# have backslashes (and maybe a drive letter?).
908
# -- Andrew Bennetts, 2006-08-29
909
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
911
def get_bogus_url(self):
912
"""Return a URL which will fail to connect"""
913
return 'bzr://127.0.0.1:1/'
916
class SmartStat(object):
918
def __init__(self, size, mode):
923
class SmartTransport(transport.Transport):
924
"""Connection to a smart server.
926
The connection holds references to pipes that can be used to send requests
929
The connection has a notion of the current directory to which it's
930
connected; this is incorporated in filenames passed to the server.
932
This supports some higher-level RPC operations and can also be treated
933
like a Transport to do file-like operations.
935
The connection can be made over a tcp socket, or (in future) an ssh pipe
936
or a series of http requests. There are concrete subclasses for each
937
type: SmartTCPTransport, etc.
940
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
941
# responsibilities: Put those on SmartClient or similar. This is vital for
942
# the ability to support multiple versions of the smart protocol over time:
943
# SmartTransport is an adapter from the Transport object model to the
944
# SmartClient model, not an encoder.
946
def __init__(self, url, clone_from=None, medium=None):
949
:param medium: The medium to use for this RemoteTransport. This must be
950
supplied if clone_from is None.
952
### Technically super() here is faulty because Transport's __init__
953
### fails to take 2 parameters, and if super were to choose a silly
954
### initialisation order things would blow up.
955
if not url.endswith('/'):
957
super(SmartTransport, self).__init__(url)
958
self._scheme, self._username, self._password, self._host, self._port, self._path = \
959
transport.split_url(url)
960
if clone_from is None:
961
self._medium = medium
963
# credentials may be stripped from the base in some circumstances
964
# as yet to be clearly defined or documented, so copy them.
965
self._username = clone_from._username
966
# reuse same connection
967
self._medium = clone_from._medium
968
assert self._medium is not None
970
def abspath(self, relpath):
971
"""Return the full url to the given relative path.
973
@param relpath: the relative path or path components
974
@type relpath: str or list
976
return self._unparse_url(self._remote_path(relpath))
978
def clone(self, relative_url):
979
"""Make a new SmartTransport related to me, sharing the same connection.
981
This essentially opens a handle on a different remote directory.
983
if relative_url is None:
984
return SmartTransport(self.base, self)
986
return SmartTransport(self.abspath(relative_url), self)
988
def is_readonly(self):
989
"""Smart server transport can do read/write file operations."""
992
def get_smart_client(self):
995
def get_smart_medium(self):
998
def _unparse_url(self, path):
999
"""Return URL for a path.
1001
:see: SFTPUrlHandling._unparse_url
1003
# TODO: Eventually it should be possible to unify this with
1004
# SFTPUrlHandling._unparse_url?
1007
path = urllib.quote(path)
1008
netloc = urllib.quote(self._host)
1009
if self._username is not None:
1010
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1011
if self._port is not None:
1012
netloc = '%s:%d' % (netloc, self._port)
1013
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
1015
def _remote_path(self, relpath):
1016
"""Returns the Unicode version of the absolute path for relpath."""
1017
return self._combine_paths(self._path, relpath)
1019
def _call(self, method, *args):
1020
resp = self._call2(method, *args)
1021
self._translate_error(resp)
1023
def _call2(self, method, *args):
1024
"""Call a method on the remote server."""
1025
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1026
protocol.call(method, *args)
1027
return protocol.read_response_tuple()
1029
def _call_with_body_bytes(self, method, args, body):
1030
"""Call a method on the remote server with body bytes."""
1031
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1032
protocol.call_with_body_bytes((method, ) + args, body)
1033
return protocol.read_response_tuple()
1035
def has(self, relpath):
1036
"""Indicate whether a remote file of the given name exists or not.
1038
:see: Transport.has()
1040
resp = self._call2('has', self._remote_path(relpath))
1041
if resp == ('yes', ):
1043
elif resp == ('no', ):
1046
self._translate_error(resp)
1048
def get(self, relpath):
1049
"""Return file-like object reading the contents of a remote file.
1051
:see: Transport.get_bytes()/get_file()
1053
return StringIO(self.get_bytes(relpath))
1055
def get_bytes(self, relpath):
1056
remote = self._remote_path(relpath)
1057
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1058
protocol.call('get', remote)
1059
resp = protocol.read_response_tuple(True)
1060
if resp != ('ok', ):
1061
protocol.cancel_read_body()
1062
self._translate_error(resp, relpath)
1063
return protocol.read_body_bytes()
1065
def _serialise_optional_mode(self, mode):
1071
def mkdir(self, relpath, mode=None):
1072
resp = self._call2('mkdir', self._remote_path(relpath),
1073
self._serialise_optional_mode(mode))
1074
self._translate_error(resp)
1076
def put_bytes(self, relpath, upload_contents, mode=None):
1077
# FIXME: upload_file is probably not safe for non-ascii characters -
1078
# should probably just pass all parameters as length-delimited
1080
resp = self._call_with_body_bytes('put',
1081
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1083
self._translate_error(resp)
1085
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1086
create_parent_dir=False,
1088
"""See Transport.put_bytes_non_atomic."""
1089
# FIXME: no encoding in the transport!
1090
create_parent_str = 'F'
1091
if create_parent_dir:
1092
create_parent_str = 'T'
1094
resp = self._call_with_body_bytes(
1096
(self._remote_path(relpath), self._serialise_optional_mode(mode),
1097
create_parent_str, self._serialise_optional_mode(dir_mode)),
1099
self._translate_error(resp)
1101
def put_file(self, relpath, upload_file, mode=None):
1102
# its not ideal to seek back, but currently put_non_atomic_file depends
1103
# on transports not reading before failing - which is a faulty
1104
# assumption I think - RBC 20060915
1105
pos = upload_file.tell()
1107
return self.put_bytes(relpath, upload_file.read(), mode)
1109
upload_file.seek(pos)
1112
def put_file_non_atomic(self, relpath, f, mode=None,
1113
create_parent_dir=False,
1115
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1116
create_parent_dir=create_parent_dir,
1119
def append_file(self, relpath, from_file, mode=None):
1120
return self.append_bytes(relpath, from_file.read(), mode)
1122
def append_bytes(self, relpath, bytes, mode=None):
1123
resp = self._call_with_body_bytes(
1125
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1127
if resp[0] == 'appended':
1129
self._translate_error(resp)
1131
def delete(self, relpath):
1132
resp = self._call2('delete', self._remote_path(relpath))
1133
self._translate_error(resp)
1135
def readv(self, relpath, offsets):
1139
offsets = list(offsets)
1141
sorted_offsets = sorted(offsets)
1142
# turn the list of offsets into a stack
1143
offset_stack = iter(offsets)
1144
cur_offset_and_size = offset_stack.next()
1145
coalesced = list(self._coalesce_offsets(sorted_offsets,
1146
limit=self._max_readv_combine,
1147
fudge_factor=self._bytes_to_read_before_seek))
1149
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1150
protocol.call_with_body_readv_array(
1151
('readv', self._remote_path(relpath)),
1152
[(c.start, c.length) for c in coalesced])
1153
resp = protocol.read_response_tuple(True)
1155
if resp[0] != 'readv':
1156
# This should raise an exception
1157
protocol.cancel_read_body()
1158
self._translate_error(resp)
1161
# FIXME: this should know how many bytes are needed, for clarity.
1162
data = protocol.read_body_bytes()
1163
# Cache the results, but only until they have been fulfilled
1165
for c_offset in coalesced:
1166
if len(data) < c_offset.length:
1167
raise errors.ShortReadvError(relpath, c_offset.start,
1168
c_offset.length, actual=len(data))
1169
for suboffset, subsize in c_offset.ranges:
1170
key = (c_offset.start+suboffset, subsize)
1171
data_map[key] = data[suboffset:suboffset+subsize]
1172
data = data[c_offset.length:]
1174
# Now that we've read some data, see if we can yield anything back
1175
while cur_offset_and_size in data_map:
1176
this_data = data_map.pop(cur_offset_and_size)
1177
yield cur_offset_and_size[0], this_data
1178
cur_offset_and_size = offset_stack.next()
1180
def rename(self, rel_from, rel_to):
1181
self._call('rename',
1182
self._remote_path(rel_from),
1183
self._remote_path(rel_to))
1185
def move(self, rel_from, rel_to):
1187
self._remote_path(rel_from),
1188
self._remote_path(rel_to))
1190
def rmdir(self, relpath):
1191
resp = self._call('rmdir', self._remote_path(relpath))
1193
def _translate_error(self, resp, orig_path=None):
1194
"""Raise an exception from a response"""
1201
elif what == 'NoSuchFile':
1202
if orig_path is not None:
1203
error_path = orig_path
1205
error_path = resp[1]
1206
raise errors.NoSuchFile(error_path)
1207
elif what == 'error':
1208
raise errors.SmartProtocolError(unicode(resp[1]))
1209
elif what == 'FileExists':
1210
raise errors.FileExists(resp[1])
1211
elif what == 'DirectoryNotEmpty':
1212
raise errors.DirectoryNotEmpty(resp[1])
1213
elif what == 'ShortReadvError':
1214
raise errors.ShortReadvError(resp[1], int(resp[2]),
1215
int(resp[3]), int(resp[4]))
1216
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1217
encoding = str(resp[1]) # encoding must always be a string
1219
start = int(resp[3])
1221
reason = str(resp[5]) # reason must always be a string
1222
if val.startswith('u:'):
1224
elif val.startswith('s:'):
1225
val = val[2:].decode('base64')
1226
if what == 'UnicodeDecodeError':
1227
raise UnicodeDecodeError(encoding, val, start, end, reason)
1228
elif what == 'UnicodeEncodeError':
1229
raise UnicodeEncodeError(encoding, val, start, end, reason)
1230
elif what == "ReadOnlyError":
1231
raise errors.TransportNotPossible('readonly transport')
1233
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1235
def disconnect(self):
1236
self._medium.disconnect()
1238
def delete_tree(self, relpath):
1239
raise errors.TransportNotPossible('readonly transport')
1241
def stat(self, relpath):
1242
resp = self._call2('stat', self._remote_path(relpath))
1243
if resp[0] == 'stat':
1244
return SmartStat(int(resp[1]), int(resp[2], 8))
1246
self._translate_error(resp)
1248
## def lock_read(self, relpath):
1249
## """Lock the given file for shared (read) access.
1250
## :return: A lock object, which should be passed to Transport.unlock()
1252
## # The old RemoteBranch ignore lock for reading, so we will
1253
## # continue that tradition and return a bogus lock object.
1254
## class BogusLock(object):
1255
## def __init__(self, path):
1257
## def unlock(self):
1259
## return BogusLock(relpath)
1264
def list_dir(self, relpath):
1265
resp = self._call2('list_dir', self._remote_path(relpath))
1266
if resp[0] == 'names':
1267
return [name.encode('ascii') for name in resp[1:]]
1269
self._translate_error(resp)
1271
def iter_files_recursive(self):
1272
resp = self._call2('iter_files_recursive', self._remote_path(''))
1273
if resp[0] == 'names':
1276
self._translate_error(resp)
1279
class SmartClientMediumRequest(object):
1280
"""A request on a SmartClientMedium.
1282
Each request allows bytes to be provided to it via accept_bytes, and then
1283
the response bytes to be read via read_bytes.
1286
request.accept_bytes('123')
1287
request.finished_writing()
1288
result = request.read_bytes(3)
1289
request.finished_reading()
1291
It is up to the individual SmartClientMedium whether multiple concurrent
1292
requests can exist. See SmartClientMedium.get_request to obtain instances
1293
of SmartClientMediumRequest, and the concrete Medium you are using for
1294
details on concurrency and pipelining.
1297
def __init__(self, medium):
1298
"""Construct a SmartClientMediumRequest for the medium medium."""
1299
self._medium = medium
1300
# we track state by constants - we may want to use the same
1301
# pattern as BodyReader if it gets more complex.
1302
# valid states are: "writing", "reading", "done"
1303
self._state = "writing"
1305
def accept_bytes(self, bytes):
1306
"""Accept bytes for inclusion in this request.
1308
This method may not be be called after finished_writing() has been
1309
called. It depends upon the Medium whether or not the bytes will be
1310
immediately transmitted. Message based Mediums will tend to buffer the
1311
bytes until finished_writing() is called.
1313
:param bytes: A bytestring.
1315
if self._state != "writing":
1316
raise errors.WritingCompleted(self)
1317
self._accept_bytes(bytes)
1319
def _accept_bytes(self, bytes):
1320
"""Helper for accept_bytes.
1322
Accept_bytes checks the state of the request to determing if bytes
1323
should be accepted. After that it hands off to _accept_bytes to do the
1326
raise NotImplementedError(self._accept_bytes)
1328
def finished_reading(self):
1329
"""Inform the request that all desired data has been read.
1331
This will remove the request from the pipeline for its medium (if the
1332
medium supports pipelining) and any further calls to methods on the
1333
request will raise ReadingCompleted.
1335
if self._state == "writing":
1336
raise errors.WritingNotComplete(self)
1337
if self._state != "reading":
1338
raise errors.ReadingCompleted(self)
1339
self._state = "done"
1340
self._finished_reading()
1342
def _finished_reading(self):
1343
"""Helper for finished_reading.
1345
finished_reading checks the state of the request to determine if
1346
finished_reading is allowed, and if it is hands off to _finished_reading
1347
to perform the action.
1349
raise NotImplementedError(self._finished_reading)
1351
def finished_writing(self):
1352
"""Finish the writing phase of this request.
1354
This will flush all pending data for this request along the medium.
1355
After calling finished_writing, you may not call accept_bytes anymore.
1357
if self._state != "writing":
1358
raise errors.WritingCompleted(self)
1359
self._state = "reading"
1360
self._finished_writing()
1362
def _finished_writing(self):
1363
"""Helper for finished_writing.
1365
finished_writing checks the state of the request to determine if
1366
finished_writing is allowed, and if it is hands off to _finished_writing
1367
to perform the action.
1369
raise NotImplementedError(self._finished_writing)
1371
def read_bytes(self, count):
1372
"""Read bytes from this requests response.
1374
This method will block and wait for count bytes to be read. It may not
1375
be invoked until finished_writing() has been called - this is to ensure
1376
a message-based approach to requests, for compatability with message
1377
based mediums like HTTP.
1379
if self._state == "writing":
1380
raise errors.WritingNotComplete(self)
1381
if self._state != "reading":
1382
raise errors.ReadingCompleted(self)
1383
return self._read_bytes(count)
1385
def _read_bytes(self, count):
1386
"""Helper for read_bytes.
1388
read_bytes checks the state of the request to determing if bytes
1389
should be read. After that it hands off to _read_bytes to do the
1392
raise NotImplementedError(self._read_bytes)
1395
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1396
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1398
def __init__(self, medium):
1399
SmartClientMediumRequest.__init__(self, medium)
1400
# check that we are safe concurrency wise. If some streams start
1401
# allowing concurrent requests - i.e. via multiplexing - then this
1402
# assert should be moved to SmartClientStreamMedium.get_request,
1403
# and the setting/unsetting of _current_request likewise moved into
1404
# that class : but its unneeded overhead for now. RBC 20060922
1405
if self._medium._current_request is not None:
1406
raise errors.TooManyConcurrentRequests(self._medium)
1407
self._medium._current_request = self
1409
def _accept_bytes(self, bytes):
1410
"""See SmartClientMediumRequest._accept_bytes.
1412
This forwards to self._medium._accept_bytes because we are operating
1413
on the mediums stream.
1415
self._medium._accept_bytes(bytes)
1417
def _finished_reading(self):
1418
"""See SmartClientMediumRequest._finished_reading.
1420
This clears the _current_request on self._medium to allow a new
1421
request to be created.
1423
assert self._medium._current_request is self
1424
self._medium._current_request = None
1426
def _finished_writing(self):
1427
"""See SmartClientMediumRequest._finished_writing.
1429
This invokes self._medium._flush to ensure all bytes are transmitted.
1431
self._medium._flush()
1433
def _read_bytes(self, count):
1434
"""See SmartClientMediumRequest._read_bytes.
1436
This forwards to self._medium._read_bytes because we are operating
1437
on the mediums stream.
1439
return self._medium._read_bytes(count)
1442
class SmartClientRequestProtocolOne(SmartProtocolBase):
1443
"""The client-side protocol for smart version 1."""
1445
def __init__(self, request):
1446
"""Construct a SmartClientRequestProtocolOne.
1448
:param request: A SmartClientMediumRequest to serialise onto and
1451
self._request = request
1452
self._body_buffer = None
1454
def call(self, *args):
1455
bytes = _encode_tuple(args)
1456
self._request.accept_bytes(bytes)
1457
self._request.finished_writing()
1459
def call_with_body_bytes(self, args, body):
1460
"""Make a remote call of args with body bytes 'body'.
1462
After calling this, call read_response_tuple to find the result out.
1464
bytes = _encode_tuple(args)
1465
self._request.accept_bytes(bytes)
1466
bytes = self._encode_bulk_data(body)
1467
self._request.accept_bytes(bytes)
1468
self._request.finished_writing()
1470
def call_with_body_readv_array(self, args, body):
1471
"""Make a remote call with a readv array.
1473
The body is encoded with one line per readv offset pair. The numbers in
1474
each pair are separated by a comma, and no trailing \n is emitted.
1476
bytes = _encode_tuple(args)
1477
self._request.accept_bytes(bytes)
1478
readv_bytes = self._serialise_offsets(body)
1479
bytes = self._encode_bulk_data(readv_bytes)
1480
self._request.accept_bytes(bytes)
1481
self._request.finished_writing()
1483
def cancel_read_body(self):
1484
"""After expecting a body, a response code may indicate one otherwise.
1486
This method lets the domain client inform the protocol that no body
1487
will be transmitted. This is a terminal method: after calling it the
1488
protocol is not able to be used further.
1490
self._request.finished_reading()
1492
def read_response_tuple(self, expect_body=False):
1493
"""Read a response tuple from the wire.
1495
This should only be called once.
1497
result = self._recv_tuple()
1499
self._request.finished_reading()
1502
def read_body_bytes(self, count=-1):
1503
"""Read bytes from the body, decoding into a byte stream.
1505
We read all bytes at once to ensure we've checked the trailer for
1506
errors, and then feed the buffer back as read_body_bytes is called.
1508
if self._body_buffer is not None:
1509
return self._body_buffer.read(count)
1510
_body_decoder = LengthPrefixedBodyDecoder()
1512
while not _body_decoder.finished_reading:
1513
bytes_wanted = _body_decoder.next_read_size()
1514
bytes = self._request.read_bytes(bytes_wanted)
1515
_body_decoder.accept_bytes(bytes)
1516
self._request.finished_reading()
1517
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1518
# XXX: TODO check the trailer result.
1519
return self._body_buffer.read(count)
1521
def _recv_tuple(self):
1522
"""Receive a tuple from the medium request."""
1524
while not line or line[-1] != '\n':
1525
# TODO: this is inefficient - but tuples are short.
1526
new_char = self._request.read_bytes(1)
1528
assert new_char != '', "end of file reading from server."
1529
return _decode_tuple(line)
1531
def query_version(self):
1532
"""Return protocol version number of the server."""
1534
resp = self.read_response_tuple()
1535
if resp == ('ok', '1'):
1538
raise errors.SmartProtocolError("bad response %r" % (resp,))
1541
class SmartClientMedium(object):
1542
"""Smart client is a medium for sending smart protocol requests over."""
1544
def disconnect(self):
1545
"""If this medium maintains a persistent connection, close it.
1547
The default implementation does nothing.
1551
class SmartClientStreamMedium(SmartClientMedium):
1552
"""Stream based medium common class.
1554
SmartClientStreamMediums operate on a stream. All subclasses use a common
1555
SmartClientStreamMediumRequest for their requests, and should implement
1556
_accept_bytes and _read_bytes to allow the request objects to send and
1561
self._current_request = None
1563
def accept_bytes(self, bytes):
1564
self._accept_bytes(bytes)
1567
"""The SmartClientStreamMedium knows how to close the stream when it is
1573
"""Flush the output stream.
1575
This method is used by the SmartClientStreamMediumRequest to ensure that
1576
all data for a request is sent, to avoid long timeouts or deadlocks.
1578
raise NotImplementedError(self._flush)
1580
def get_request(self):
1581
"""See SmartClientMedium.get_request().
1583
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1586
return SmartClientStreamMediumRequest(self)
1588
def read_bytes(self, count):
1589
return self._read_bytes(count)
1592
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1593
"""A client medium using simple pipes.
1595
This client does not manage the pipes: it assumes they will always be open.
1598
def __init__(self, readable_pipe, writeable_pipe):
1599
SmartClientStreamMedium.__init__(self)
1600
self._readable_pipe = readable_pipe
1601
self._writeable_pipe = writeable_pipe
1603
def _accept_bytes(self, bytes):
1604
"""See SmartClientStreamMedium.accept_bytes."""
1605
self._writeable_pipe.write(bytes)
1608
"""See SmartClientStreamMedium._flush()."""
1609
self._writeable_pipe.flush()
1611
def _read_bytes(self, count):
1612
"""See SmartClientStreamMedium._read_bytes."""
1613
return self._readable_pipe.read(count)
1616
class SmartSSHClientMedium(SmartClientStreamMedium):
1617
"""A client medium using SSH."""
1619
def __init__(self, host, port=None, username=None, password=None,
1621
"""Creates a client that will connect on the first use.
1623
:param vendor: An optional override for the ssh vendor to use. See
1624
bzrlib.transport.ssh for details on ssh vendors.
1626
SmartClientStreamMedium.__init__(self)
1627
self._connected = False
1629
self._password = password
1631
self._username = username
1632
self._read_from = None
1633
self._ssh_connection = None
1634
self._vendor = vendor
1635
self._write_to = None
1637
def _accept_bytes(self, bytes):
1638
"""See SmartClientStreamMedium.accept_bytes."""
1639
self._ensure_connection()
1640
self._write_to.write(bytes)
1642
def disconnect(self):
1643
"""See SmartClientMedium.disconnect()."""
1644
if not self._connected:
1646
self._read_from.close()
1647
self._write_to.close()
1648
self._ssh_connection.close()
1649
self._connected = False
1651
def _ensure_connection(self):
1652
"""Connect this medium if not already connected."""
1655
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1656
if self._vendor is None:
1657
vendor = ssh._get_ssh_vendor()
1659
vendor = self._vendor
1660
self._ssh_connection = vendor.connect_ssh(self._username,
1661
self._password, self._host, self._port,
1662
command=[executable, 'serve', '--inet', '--directory=/',
1664
self._read_from, self._write_to = \
1665
self._ssh_connection.get_filelike_channels()
1666
self._connected = True
1669
"""See SmartClientStreamMedium._flush()."""
1670
self._write_to.flush()
1672
def _read_bytes(self, count):
1673
"""See SmartClientStreamMedium.read_bytes."""
1674
if not self._connected:
1675
raise errors.MediumNotConnected(self)
1676
return self._read_from.read(count)
1679
class SmartTCPClientMedium(SmartClientStreamMedium):
1680
"""A client medium using TCP."""
1682
def __init__(self, host, port):
1683
"""Creates a client that will connect on the first use."""
1684
SmartClientStreamMedium.__init__(self)
1685
self._connected = False
1690
def _accept_bytes(self, bytes):
1691
"""See SmartClientMedium.accept_bytes."""
1692
self._ensure_connection()
1693
self._socket.sendall(bytes)
1695
def disconnect(self):
1696
"""See SmartClientMedium.disconnect()."""
1697
if not self._connected:
1699
self._socket.close()
1701
self._connected = False
1703
def _ensure_connection(self):
1704
"""Connect this medium if not already connected."""
1707
self._socket = socket.socket()
1708
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1709
result = self._socket.connect_ex((self._host, int(self._port)))
1711
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1712
(self._host, self._port, os.strerror(result)))
1713
self._connected = True
1716
"""See SmartClientStreamMedium._flush().
1718
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1719
add a means to do a flush, but that can be done in the future.
1722
def _read_bytes(self, count):
1723
"""See SmartClientMedium.read_bytes."""
1724
if not self._connected:
1725
raise errors.MediumNotConnected(self)
1726
return self._socket.recv(count)
1729
class SmartTCPTransport(SmartTransport):
1730
"""Connection to smart server over plain tcp.
1732
This is essentially just a factory to get 'RemoteTransport(url,
1733
SmartTCPClientMedium).
1736
def __init__(self, url):
1737
_scheme, _username, _password, _host, _port, _path = \
1738
transport.split_url(url)
1741
except (ValueError, TypeError), e:
1742
raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1743
medium = SmartTCPClientMedium(_host, _port)
1744
super(SmartTCPTransport, self).__init__(url, medium=medium)
1748
from bzrlib.transport import ssh
1749
except errors.ParamikoNotPresent:
1750
# no paramiko, no SSHTransport.
1753
class SmartSSHTransport(SmartTransport):
1754
"""Connection to smart server over SSH.
1756
This is essentially just a factory to get 'RemoteTransport(url,
1757
SmartSSHClientMedium).
1760
def __init__(self, url):
1761
_scheme, _username, _password, _host, _port, _path = \
1762
transport.split_url(url)
1764
if _port is not None:
1766
except (ValueError, TypeError), e:
1767
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1769
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1770
super(SmartSSHTransport, self).__init__(url, medium=medium)
1773
def get_test_permutations():
1774
"""Return (transport, server) permutations for testing."""
1775
### We may need a little more test framework support to construct an
1776
### appropriate RemoteTransport in the future.
1777
return [(SmartTCPTransport, SmartTCPServer_for_testing)]