1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
50
At the bottom level socket, pipes, HTTP server. For sockets, we have the idea
51
that you have multiple requests and get a read error because the other side did
52
shutdown. For pipes we have read pipe which will have a zero read which marks
53
end-of-file. For HTTP server environment there is not end-of-stream because
54
each request coming into the server is independent.
56
So we need a wrapper around pipes and sockets to seperate out requests from
57
substrate and this will give us a single model which is consist for HTTP,
63
MEDIUM (factory for protocol, reads bytes & pushes to protocol,
64
uses protocol to detect end-of-request, sends written
65
bytes to client) e.g. socket, pipe, HTTP request handler.
70
PROTOCOL (serialization, deserialization) accepts bytes for one
71
request, decodes according to internal state, pushes
72
structured data to handler. accepts structured data from
73
handler and encodes and writes to the medium. factory for
79
HANDLER (domain logic) accepts structured data, operates state
80
machine until the request can be satisfied,
81
sends structured data to the protocol.
87
CLIENT domain logic, accepts domain requests, generated structured
88
data, reads structured data from responses and turns into
89
domain data. Sends structured data to the protocol.
90
Operates state machines until the request can be delivered
91
(e.g. reading from a bundle generated in bzrlib to deliver a
94
Possibly this should just be RemoteBzrDir, RemoteTransport,
100
PROTOCOL (serialization, deserialization) accepts structured data for one
101
request, encodes and writes to the medium. Reads bytes from the
102
medium, decodes and allows the client to read structured data.
107
MEDIUM (accepts bytes from the protocol & delivers to the remote server.
108
Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
112
# TODO: _translate_error should be on the client, not the transport because
113
# error coding is wire protocol specific.
115
# TODO: A plain integer from query_version is too simple; should give some
118
# TODO: Server should probably catch exceptions within itself and send them
119
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
120
# Also needs to somehow report protocol errors like bad requests. Need to
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
# bulk transfer and then something goes wrong.
124
# TODO: Standard marker at start of request/response lines?
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
128
# TODO: get/put objects could be changed to gradually read back the data as it
129
# comes across the network
131
# TODO: What should the server do if it hits an error and has to terminate?
133
# TODO: is it useful to allow multiple chunks in the bulk data?
135
# TODO: If we get an exception during transmission of bulk data we can't just
136
# emit the exception because it won't be seen.
137
# John proposes: I think it would be worthwhile to have a header on each
138
# chunk, that indicates it is another chunk. Then you can send an 'error'
139
# chunk as long as you finish the previous chunk.
141
# TODO: Clone method on Transport; should work up towards parent directory;
142
# unclear how this should be stored or communicated to the server... maybe
143
# just pass it on all relevant requests?
145
# TODO: Better name than clone() for changing between directories. How about
146
# open_dir or change_dir or chdir?
148
# TODO: Is it really good to have the notion of current directory within the
149
# connection? Perhaps all Transports should factor out a common connection
150
# from the thing that has the directory context?
152
# TODO: Pull more things common to sftp and ssh to a higher level.
154
# TODO: The server that manages a connection should be quite small and retain
155
# minimum state because each of the requests are supposed to be stateless.
156
# Then we can write another implementation that maps to http.
158
# TODO: What to do when a client connection is garbage collected? Maybe just
159
# abruptly drop the connection?
161
# TODO: Server in some cases will need to restrict access to files outside of
162
# a particular root directory. LocalTransport doesn't do anything to stop you
163
# ascending above the base directory, so we need to prevent paths
164
# containing '..' in either the server or transport layers. (Also need to
165
# consider what happens if someone creates a symlink pointing outside the
168
# TODO: Server should rebase absolute paths coming across the network to put
169
# them under the virtual root, if one is in use. LocalTransport currently
170
# doesn't do that; if you give it an absolute path it just uses it.
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
# urlescape them instead. Indeed possibly this should just literally be
176
# FIXME: This transport, with several others, has imperfect handling of paths
177
# within urls. It'd probably be better for ".." from a root to raise an error
178
# rather than return the same directory as we do at present.
180
# TODO: Rather than working at the Transport layer we want a Branch,
181
# Repository or BzrDir objects that talk to a server.
183
# TODO: Probably want some way for server commands to gradually produce body
184
# data rather than passing it as a string; they could perhaps pass an
185
# iterator-like callback that will gradually yield data; it probably needs a
186
# close() method that will always be closed to do any necessary cleanup.
188
# TODO: Split the actual smart server from the ssh encoding of it.
190
# TODO: Perhaps support file-level readwrite operations over the transport
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
# branch doing file-level operations.
197
from cStringIO import StringIO
214
from bzrlib.bundle.serializer import write_bundle
216
from bzrlib.transport import ssh
217
except errors.ParamikoNotPresent:
218
# no paramiko. SmartSSHClientMedium will break.
221
# must do this otherwise urllib can't parse the urls properly :(
222
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
223
transport.register_urlparse_netloc_protocol(scheme)
227
# Port 4155 is the default port for bzr://, registered with IANA.
228
BZR_DEFAULT_PORT = 4155
231
def _recv_tuple(from_file):
232
req_line = from_file.readline()
233
return _decode_tuple(req_line)
236
def _decode_tuple(req_line):
237
if req_line == None or req_line == '':
239
if req_line[-1] != '\n':
240
raise errors.SmartProtocolError("request %r not terminated" % req_line)
241
return tuple(req_line[:-1].split('\x01'))
244
def _encode_tuple(args):
245
"""Encode the tuple args to a bytestream."""
246
return '\x01'.join(args) + '\n'
249
class SmartProtocolBase(object):
250
"""Methods common to client and server"""
252
# TODO: this only actually accomodates a single block; possibly should
253
# support multiple chunks?
254
def _encode_bulk_data(self, body):
255
"""Encode body as a bulk data chunk."""
256
return ''.join(('%d\n' % len(body), body, 'done\n'))
258
def _serialise_offsets(self, offsets):
259
"""Serialise a readv offset list."""
261
for start, length in offsets:
262
txt.append('%d,%d' % (start, length))
263
return '\n'.join(txt)
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
267
"""Server-side encoding and decoding logic for smart version 1."""
269
def __init__(self, backing_transport, write_func):
270
self._backing_transport = backing_transport
271
self.excess_buffer = ''
272
self._finished = False
274
self.has_dispatched = False
276
self._body_decoder = None
277
self._write_func = write_func
279
def accept_bytes(self, bytes):
280
"""Take bytes, and advance the internal state machine appropriately.
282
:param bytes: must be a byte string
284
assert isinstance(bytes, str)
285
self.in_buffer += bytes
286
if not self.has_dispatched:
287
if '\n' not in self.in_buffer:
288
# no command line yet
290
self.has_dispatched = True
292
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
294
req_args = _decode_tuple(first_line)
295
self.request = SmartServerRequestHandler(
296
self._backing_transport)
297
self.request.dispatch_command(req_args[0], req_args[1:])
298
if self.request.finished_reading:
300
self.excess_buffer = self.in_buffer
302
self._send_response(self.request.response.args,
303
self.request.response.body)
304
except KeyboardInterrupt:
306
except Exception, exception:
307
# everything else: pass to client, flush, and quit
308
self._send_response(('error', str(exception)))
311
if self.has_dispatched:
313
# nothing to do.XXX: this routine should be a single state
315
self.excess_buffer += self.in_buffer
318
if self._body_decoder is None:
319
self._body_decoder = LengthPrefixedBodyDecoder()
320
self._body_decoder.accept_bytes(self.in_buffer)
321
self.in_buffer = self._body_decoder.unused_data
322
body_data = self._body_decoder.read_pending_data()
323
self.request.accept_body(body_data)
324
if self._body_decoder.finished_reading:
325
self.request.end_of_body()
326
assert self.request.finished_reading, \
327
"no more body, request not finished"
328
if self.request.response is not None:
329
self._send_response(self.request.response.args,
330
self.request.response.body)
331
self.excess_buffer = self.in_buffer
334
assert not self.request.finished_reading, \
335
"no response and we have finished reading."
337
def _send_response(self, args, body=None):
338
"""Send a smart server response down the output stream."""
339
assert not self._finished, 'response already sent'
340
self._finished = True
341
self._write_func(_encode_tuple(args))
343
assert isinstance(body, str), 'body must be a str'
344
bytes = self._encode_bulk_data(body)
345
self._write_func(bytes)
347
def next_read_size(self):
350
if self._body_decoder is None:
353
return self._body_decoder.next_read_size()
356
class LengthPrefixedBodyDecoder(object):
357
"""Decodes the length-prefixed bulk data."""
360
self.bytes_left = None
361
self.finished_reading = False
362
self.unused_data = ''
363
self.state_accept = self._state_accept_expecting_length
364
self.state_read = self._state_read_no_data
366
self._trailer_buffer = ''
368
def accept_bytes(self, bytes):
369
"""Decode as much of bytes as possible.
371
If 'bytes' contains too much data it will be appended to
374
finished_reading will be set when no more data is required. Further
375
data will be appended to self.unused_data.
377
# accept_bytes is allowed to change the state
378
current_state = self.state_accept
379
self.state_accept(bytes)
380
while current_state != self.state_accept:
381
current_state = self.state_accept
382
self.state_accept('')
384
def next_read_size(self):
385
if self.bytes_left is not None:
386
# Ideally we want to read all the remainder of the body and the
388
return self.bytes_left + 5
389
elif self.state_accept == self._state_accept_reading_trailer:
390
# Just the trailer left
391
return 5 - len(self._trailer_buffer)
392
elif self.state_accept == self._state_accept_expecting_length:
393
# There's still at least 6 bytes left ('\n' to end the length, plus
397
# Reading excess data. Either way, 1 byte at a time is fine.
400
def read_pending_data(self):
401
"""Return any pending data that has been decoded."""
402
return self.state_read()
404
def _state_accept_expecting_length(self, bytes):
405
self._in_buffer += bytes
406
pos = self._in_buffer.find('\n')
409
self.bytes_left = int(self._in_buffer[:pos])
410
self._in_buffer = self._in_buffer[pos+1:]
411
self.bytes_left -= len(self._in_buffer)
412
self.state_accept = self._state_accept_reading_body
413
self.state_read = self._state_read_in_buffer
415
def _state_accept_reading_body(self, bytes):
416
self._in_buffer += bytes
417
self.bytes_left -= len(bytes)
418
if self.bytes_left <= 0:
420
if self.bytes_left != 0:
421
self._trailer_buffer = self._in_buffer[self.bytes_left:]
422
self._in_buffer = self._in_buffer[:self.bytes_left]
423
self.bytes_left = None
424
self.state_accept = self._state_accept_reading_trailer
426
def _state_accept_reading_trailer(self, bytes):
427
self._trailer_buffer += bytes
428
# TODO: what if the trailer does not match "done\n"? Should this raise
429
# a ProtocolViolation exception?
430
if self._trailer_buffer.startswith('done\n'):
431
self.unused_data = self._trailer_buffer[len('done\n'):]
432
self.state_accept = self._state_accept_reading_unused
433
self.finished_reading = True
435
def _state_accept_reading_unused(self, bytes):
436
self.unused_data += bytes
438
def _state_read_no_data(self):
441
def _state_read_in_buffer(self):
442
result = self._in_buffer
447
class SmartServerStreamMedium(object):
448
"""Handles smart commands coming over a stream.
450
The stream may be a pipe connected to sshd, or a tcp socket, or an
451
in-process fifo for testing.
453
One instance is created for each connected client; it can serve multiple
454
requests in the lifetime of the connection.
456
The server passes requests through to an underlying backing transport,
457
which will typically be a LocalTransport looking at the server's filesystem.
460
def __init__(self, backing_transport):
461
"""Construct new server.
463
:param backing_transport: Transport for the directory served.
465
# backing_transport could be passed to serve instead of __init__
466
self.backing_transport = backing_transport
467
self.finished = False
470
"""Serve requests until the client disconnects."""
471
# Keep a reference to stderr because the sys module's globals get set to
472
# None during interpreter shutdown.
473
from sys import stderr
475
while not self.finished:
476
protocol = SmartServerRequestProtocolOne(self.backing_transport,
478
self._serve_one_request(protocol)
480
stderr.write("%s terminating on exception %s\n" % (self, e))
483
def _serve_one_request(self, protocol):
484
"""Read one request from input, process, send back a response.
486
:param protocol: a SmartServerRequestProtocol.
489
self._serve_one_request_unguarded(protocol)
490
except KeyboardInterrupt:
493
self.terminate_due_to_error()
495
def terminate_due_to_error(self):
496
"""Called when an unhandled exception from the protocol occurs."""
497
raise NotImplementedError(self.terminate_due_to_error)
500
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
502
def __init__(self, sock, backing_transport):
505
:param sock: the socket the server will read from. It will be put
508
SmartServerStreamMedium.__init__(self, backing_transport)
510
sock.setblocking(True)
513
def _serve_one_request_unguarded(self, protocol):
514
while protocol.next_read_size():
516
protocol.accept_bytes(self.push_back)
519
bytes = self.socket.recv(4096)
523
protocol.accept_bytes(bytes)
525
self.push_back = protocol.excess_buffer
527
def terminate_due_to_error(self):
528
"""Called when an unhandled exception from the protocol occurs."""
529
# TODO: This should log to a server log file, but no such thing
530
# exists yet. Andrew Bennetts 2006-09-29.
534
def _write_out(self, bytes):
535
self.socket.sendall(bytes)
538
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
540
def __init__(self, in_file, out_file, backing_transport):
541
"""Construct new server.
543
:param in_file: Python file from which requests can be read.
544
:param out_file: Python file to write responses.
545
:param backing_transport: Transport for the directory served.
547
SmartServerStreamMedium.__init__(self, backing_transport)
548
if sys.platform == 'win32':
549
# force binary mode for files
551
for f in (in_file, out_file):
552
fileno = getattr(f, 'fileno', None)
554
msvcrt.setmode(fileno(), os.O_BINARY)
558
def _serve_one_request_unguarded(self, protocol):
560
bytes_to_read = protocol.next_read_size()
561
if bytes_to_read == 0:
562
# Finished serving this request.
565
bytes = self._in.read(bytes_to_read)
567
# Connection has been closed.
571
protocol.accept_bytes(bytes)
573
def terminate_due_to_error(self):
574
# TODO: This should log to a server log file, but no such thing
575
# exists yet. Andrew Bennetts 2006-09-29.
579
def _write_out(self, bytes):
580
self._out.write(bytes)
583
class SmartServerResponse(object):
584
"""Response generated by SmartServerRequestHandler."""
586
def __init__(self, args, body=None):
590
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
591
# for delivering the data for a request. This could be done with as the
592
# StreamServer, though that would create conflation between request and response
593
# which may be undesirable.
596
class SmartServerRequestHandler(object):
597
"""Protocol logic for smart server.
599
This doesn't handle serialization at all, it just processes requests and
603
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
604
# not contain encoding or decoding logic to allow the wire protocol to vary
605
# from the object protocol: we will want to tweak the wire protocol separate
606
# from the object model, and ideally we will be able to do that without
607
# having a SmartServerRequestHandler subclass for each wire protocol, rather
608
# just a Protocol subclass.
610
# TODO: Better way of representing the body for commands that take it,
611
# and allow it to be streamed into the server.
613
def __init__(self, backing_transport):
614
self._backing_transport = backing_transport
615
self._converted_command = False
616
self.finished_reading = False
617
self._body_bytes = ''
620
def accept_body(self, bytes):
623
This should be overriden for each command that desired body data to
624
handle the right format of that data. I.e. plain bytes, a bundle etc.
626
The deserialisation into that format should be done in the Protocol
627
object. Set self.desired_body_format to the format your method will
630
# default fallback is to accumulate bytes.
631
self._body_bytes += bytes
633
def _end_of_body_handler(self):
634
"""An unimplemented end of body handler."""
635
raise NotImplementedError(self._end_of_body_handler)
638
"""Answer a version request with my version."""
639
return SmartServerResponse(('ok', '1'))
641
def do_has(self, relpath):
642
r = self._backing_transport.has(relpath) and 'yes' or 'no'
643
return SmartServerResponse((r,))
645
def do_get(self, relpath):
646
backing_bytes = self._backing_transport.get_bytes(relpath)
647
return SmartServerResponse(('ok',), backing_bytes)
649
def _deserialise_optional_mode(self, mode):
650
# XXX: FIXME this should be on the protocol object.
656
def do_append(self, relpath, mode):
657
self._converted_command = True
658
self._relpath = relpath
659
self._mode = self._deserialise_optional_mode(mode)
660
self._end_of_body_handler = self._handle_do_append_end
662
def _handle_do_append_end(self):
663
old_length = self._backing_transport.append_bytes(
664
self._relpath, self._body_bytes, self._mode)
665
self.response = SmartServerResponse(('appended', '%d' % old_length))
667
def do_delete(self, relpath):
668
self._backing_transport.delete(relpath)
670
def do_iter_files_recursive(self, relpath):
671
transport = self._backing_transport.clone(relpath)
672
filenames = transport.iter_files_recursive()
673
return SmartServerResponse(('names',) + tuple(filenames))
675
def do_list_dir(self, relpath):
676
filenames = self._backing_transport.list_dir(relpath)
677
return SmartServerResponse(('names',) + tuple(filenames))
679
def do_mkdir(self, relpath, mode):
680
self._backing_transport.mkdir(relpath,
681
self._deserialise_optional_mode(mode))
683
def do_move(self, rel_from, rel_to):
684
self._backing_transport.move(rel_from, rel_to)
686
def do_put(self, relpath, mode):
687
self._converted_command = True
688
self._relpath = relpath
689
self._mode = self._deserialise_optional_mode(mode)
690
self._end_of_body_handler = self._handle_do_put
692
def _handle_do_put(self):
693
self._backing_transport.put_bytes(self._relpath,
694
self._body_bytes, self._mode)
695
self.response = SmartServerResponse(('ok',))
697
def _deserialise_offsets(self, text):
698
# XXX: FIXME this should be on the protocol object.
700
for line in text.split('\n'):
703
start, length = line.split(',')
704
offsets.append((int(start), int(length)))
707
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
708
self._converted_command = True
709
self._end_of_body_handler = self._handle_put_non_atomic
710
self._relpath = relpath
711
self._dir_mode = self._deserialise_optional_mode(dir_mode)
712
self._mode = self._deserialise_optional_mode(mode)
713
# a boolean would be nicer XXX
714
self._create_parent = (create_parent == 'T')
716
def _handle_put_non_atomic(self):
717
self._backing_transport.put_bytes_non_atomic(self._relpath,
720
create_parent_dir=self._create_parent,
721
dir_mode=self._dir_mode)
722
self.response = SmartServerResponse(('ok',))
724
def do_readv(self, relpath):
725
self._converted_command = True
726
self._end_of_body_handler = self._handle_readv_offsets
727
self._relpath = relpath
729
def end_of_body(self):
730
"""No more body data will be received."""
731
self._run_handler_code(self._end_of_body_handler, (), {})
732
# cannot read after this.
733
self.finished_reading = True
735
def _handle_readv_offsets(self):
736
"""accept offsets for a readv request."""
737
offsets = self._deserialise_offsets(self._body_bytes)
738
backing_bytes = ''.join(bytes for offset, bytes in
739
self._backing_transport.readv(self._relpath, offsets))
740
self.response = SmartServerResponse(('readv',), backing_bytes)
742
def do_rename(self, rel_from, rel_to):
743
self._backing_transport.rename(rel_from, rel_to)
745
def do_rmdir(self, relpath):
746
self._backing_transport.rmdir(relpath)
748
def do_stat(self, relpath):
749
stat = self._backing_transport.stat(relpath)
750
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
752
def do_get_bundle(self, path, revision_id):
753
# open transport relative to our base
754
t = self._backing_transport.clone(path)
755
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
756
repo = control.open_repository()
757
tmpf = tempfile.TemporaryFile()
758
base_revision = revision.NULL_REVISION
759
write_bundle(repo, revision_id, base_revision, tmpf)
761
return SmartServerResponse((), tmpf.read())
763
def dispatch_command(self, cmd, args):
764
"""Deprecated compatibility method.""" # XXX XXX
765
func = getattr(self, 'do_' + cmd, None)
767
raise errors.SmartProtocolError("bad request %r" % (cmd,))
768
self._run_handler_code(func, args, {})
770
def _run_handler_code(self, callable, args, kwargs):
771
"""Run some handler specific code 'callable'.
773
If a result is returned, it is considered to be the commands response,
774
and finished_reading is set true, and its assigned to self.response.
776
Any exceptions caught are translated and a response object created
779
result = self._call_converting_errors(callable, args, kwargs)
780
if result is not None:
781
self.response = result
782
self.finished_reading = True
783
# handle unconverted commands
784
if not self._converted_command:
785
self.finished_reading = True
787
self.response = SmartServerResponse(('ok',))
789
def _call_converting_errors(self, callable, args, kwargs):
790
"""Call callable converting errors to Response objects."""
792
return callable(*args, **kwargs)
793
except errors.NoSuchFile, e:
794
return SmartServerResponse(('NoSuchFile', e.path))
795
except errors.FileExists, e:
796
return SmartServerResponse(('FileExists', e.path))
797
except errors.DirectoryNotEmpty, e:
798
return SmartServerResponse(('DirectoryNotEmpty', e.path))
799
except errors.ShortReadvError, e:
800
return SmartServerResponse(('ShortReadvError',
801
e.path, str(e.offset), str(e.length), str(e.actual)))
802
except UnicodeError, e:
803
# If it is a DecodeError, than most likely we are starting
804
# with a plain string
805
str_or_unicode = e.object
806
if isinstance(str_or_unicode, unicode):
807
# XXX: UTF-8 might have \x01 (our seperator byte) in it. We
808
# should escape it somehow.
809
val = 'u:' + str_or_unicode.encode('utf-8')
811
val = 's:' + str_or_unicode.encode('base64')
812
# This handles UnicodeEncodeError or UnicodeDecodeError
813
return SmartServerResponse((e.__class__.__name__,
814
e.encoding, val, str(e.start), str(e.end), e.reason))
815
except errors.TransportNotPossible, e:
816
if e.msg == "readonly transport":
817
return SmartServerResponse(('ReadOnlyError', ))
822
class SmartTCPServer(object):
823
"""Listens on a TCP socket and accepts connections from smart clients"""
825
def __init__(self, backing_transport, host='127.0.0.1', port=0):
826
"""Construct a new server.
828
To actually start it running, call either start_background_thread or
831
:param host: Name of the interface to listen on.
832
:param port: TCP port to listen on, or 0 to allocate a transient port.
834
self._server_socket = socket.socket()
835
self._server_socket.bind((host, port))
836
self.port = self._server_socket.getsockname()[1]
837
self._server_socket.listen(1)
838
self._server_socket.settimeout(1)
839
self.backing_transport = backing_transport
842
# let connections timeout so that we get a chance to terminate
843
# Keep a reference to the exceptions we want to catch because the socket
844
# module's globals get set to None during interpreter shutdown.
845
from socket import timeout as socket_timeout
846
from socket import error as socket_error
847
self._should_terminate = False
848
while not self._should_terminate:
850
self.accept_and_serve()
851
except socket_timeout:
852
# just check if we're asked to stop
854
except socket_error, e:
855
trace.warning("client disconnected: %s", e)
859
"""Return the url of the server"""
860
return "bzr://%s:%d/" % self._server_socket.getsockname()
862
def accept_and_serve(self):
863
conn, client_addr = self._server_socket.accept()
864
# For WIN32, where the timeout value from the listening socket
865
# propogates to the newly accepted socket.
866
conn.setblocking(True)
867
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
868
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
869
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
870
connection_thread.setDaemon(True)
871
connection_thread.start()
873
def start_background_thread(self):
874
self._server_thread = threading.Thread(None,
876
name='server-' + self.get_url())
877
self._server_thread.setDaemon(True)
878
self._server_thread.start()
880
def stop_background_thread(self):
881
self._should_terminate = True
882
# At one point we would wait to join the threads here, but it looks
883
# like they don't actually exit. So now we just leave them running
884
# and expect to terminate the process. -- mbp 20070215
885
# self._server_socket.close()
886
## sys.stderr.write("waiting for server thread to finish...")
887
## self._server_thread.join()
890
class SmartTCPServer_for_testing(SmartTCPServer):
891
"""Server suitable for use by transport tests.
893
This server is backed by the process's cwd.
897
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
898
# The server is set up by default like for ssh access: the client
899
# passes filesystem-absolute paths; therefore the server must look
900
# them up relative to the root directory. it might be better to act
901
# a public server and have the server rewrite paths into the test
903
SmartTCPServer.__init__(self,
904
transport.get_transport(urlutils.local_path_to_url('/')))
907
"""Set up server for testing"""
908
self.start_background_thread()
911
self.stop_background_thread()
914
"""Return the url of the server"""
915
host, port = self._server_socket.getsockname()
916
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
918
def get_bogus_url(self):
919
"""Return a URL which will fail to connect"""
920
return 'bzr://127.0.0.1:1/'
923
class SmartStat(object):
925
def __init__(self, size, mode):
930
class SmartTransport(transport.Transport):
931
"""Connection to a smart server.
933
The connection holds references to pipes that can be used to send requests
936
The connection has a notion of the current directory to which it's
937
connected; this is incorporated in filenames passed to the server.
939
This supports some higher-level RPC operations and can also be treated
940
like a Transport to do file-like operations.
942
The connection can be made over a tcp socket, or (in future) an ssh pipe
943
or a series of http requests. There are concrete subclasses for each
944
type: SmartTCPTransport, etc.
947
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
948
# responsibilities: Put those on SmartClient or similar. This is vital for
949
# the ability to support multiple versions of the smart protocol over time:
950
# SmartTransport is an adapter from the Transport object model to the
951
# SmartClient model, not an encoder.
953
def __init__(self, url, clone_from=None, medium=None):
956
:param medium: The medium to use for this RemoteTransport. This must be
957
supplied if clone_from is None.
959
### Technically super() here is faulty because Transport's __init__
960
### fails to take 2 parameters, and if super were to choose a silly
961
### initialisation order things would blow up.
962
if not url.endswith('/'):
964
super(SmartTransport, self).__init__(url)
965
self._scheme, self._username, self._password, self._host, self._port, self._path = \
966
transport.split_url(url)
967
if clone_from is None:
968
self._medium = medium
970
# credentials may be stripped from the base in some circumstances
971
# as yet to be clearly defined or documented, so copy them.
972
self._username = clone_from._username
973
# reuse same connection
974
self._medium = clone_from._medium
975
assert self._medium is not None
977
def abspath(self, relpath):
978
"""Return the full url to the given relative path.
980
@param relpath: the relative path or path components
981
@type relpath: str or list
983
return self._unparse_url(self._remote_path(relpath))
985
def clone(self, relative_url):
986
"""Make a new SmartTransport related to me, sharing the same connection.
988
This essentially opens a handle on a different remote directory.
990
if relative_url is None:
991
return SmartTransport(self.base, self)
993
return SmartTransport(self.abspath(relative_url), self)
995
def is_readonly(self):
996
"""Smart server transport can do read/write file operations."""
999
def get_smart_client(self):
1002
def get_smart_medium(self):
1005
def _unparse_url(self, path):
1006
"""Return URL for a path.
1008
:see: SFTPUrlHandling._unparse_url
1010
# TODO: Eventually it should be possible to unify this with
1011
# SFTPUrlHandling._unparse_url?
1014
path = urllib.quote(path)
1015
netloc = urllib.quote(self._host)
1016
if self._username is not None:
1017
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1018
if self._port is not None:
1019
netloc = '%s:%d' % (netloc, self._port)
1020
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
1022
def _remote_path(self, relpath):
1023
"""Returns the Unicode version of the absolute path for relpath."""
1024
return self._combine_paths(self._path, relpath)
1026
def _call(self, method, *args):
1027
resp = self._call2(method, *args)
1028
self._translate_error(resp)
1030
def _call2(self, method, *args):
1031
"""Call a method on the remote server."""
1032
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1033
protocol.call(method, *args)
1034
return protocol.read_response_tuple()
1036
def _call_with_body_bytes(self, method, args, body):
1037
"""Call a method on the remote server with body bytes."""
1038
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1039
protocol.call_with_body_bytes((method, ) + args, body)
1040
return protocol.read_response_tuple()
1042
def has(self, relpath):
1043
"""Indicate whether a remote file of the given name exists or not.
1045
:see: Transport.has()
1047
resp = self._call2('has', self._remote_path(relpath))
1048
if resp == ('yes', ):
1050
elif resp == ('no', ):
1053
self._translate_error(resp)
1055
def get(self, relpath):
1056
"""Return file-like object reading the contents of a remote file.
1058
:see: Transport.get_bytes()/get_file()
1060
return StringIO(self.get_bytes(relpath))
1062
def get_bytes(self, relpath):
1063
remote = self._remote_path(relpath)
1064
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1065
protocol.call('get', remote)
1066
resp = protocol.read_response_tuple(True)
1067
if resp != ('ok', ):
1068
protocol.cancel_read_body()
1069
self._translate_error(resp, relpath)
1070
return protocol.read_body_bytes()
1072
def _serialise_optional_mode(self, mode):
1078
def mkdir(self, relpath, mode=None):
1079
resp = self._call2('mkdir', self._remote_path(relpath),
1080
self._serialise_optional_mode(mode))
1081
self._translate_error(resp)
1083
def put_bytes(self, relpath, upload_contents, mode=None):
1084
# FIXME: upload_file is probably not safe for non-ascii characters -
1085
# should probably just pass all parameters as length-delimited
1087
resp = self._call_with_body_bytes('put',
1088
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1090
self._translate_error(resp)
1092
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1093
create_parent_dir=False,
1095
"""See Transport.put_bytes_non_atomic."""
1096
# FIXME: no encoding in the transport!
1097
create_parent_str = 'F'
1098
if create_parent_dir:
1099
create_parent_str = 'T'
1101
resp = self._call_with_body_bytes(
1103
(self._remote_path(relpath), self._serialise_optional_mode(mode),
1104
create_parent_str, self._serialise_optional_mode(dir_mode)),
1106
self._translate_error(resp)
1108
def put_file(self, relpath, upload_file, mode=None):
1109
# its not ideal to seek back, but currently put_non_atomic_file depends
1110
# on transports not reading before failing - which is a faulty
1111
# assumption I think - RBC 20060915
1112
pos = upload_file.tell()
1114
return self.put_bytes(relpath, upload_file.read(), mode)
1116
upload_file.seek(pos)
1119
def put_file_non_atomic(self, relpath, f, mode=None,
1120
create_parent_dir=False,
1122
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1123
create_parent_dir=create_parent_dir,
1126
def append_file(self, relpath, from_file, mode=None):
1127
return self.append_bytes(relpath, from_file.read(), mode)
1129
def append_bytes(self, relpath, bytes, mode=None):
1130
resp = self._call_with_body_bytes(
1132
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1134
if resp[0] == 'appended':
1136
self._translate_error(resp)
1138
def delete(self, relpath):
1139
resp = self._call2('delete', self._remote_path(relpath))
1140
self._translate_error(resp)
1142
def readv(self, relpath, offsets):
1146
offsets = list(offsets)
1148
sorted_offsets = sorted(offsets)
1149
# turn the list of offsets into a stack
1150
offset_stack = iter(offsets)
1151
cur_offset_and_size = offset_stack.next()
1152
coalesced = list(self._coalesce_offsets(sorted_offsets,
1153
limit=self._max_readv_combine,
1154
fudge_factor=self._bytes_to_read_before_seek))
1156
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1157
protocol.call_with_body_readv_array(
1158
('readv', self._remote_path(relpath)),
1159
[(c.start, c.length) for c in coalesced])
1160
resp = protocol.read_response_tuple(True)
1162
if resp[0] != 'readv':
1163
# This should raise an exception
1164
protocol.cancel_read_body()
1165
self._translate_error(resp)
1168
# FIXME: this should know how many bytes are needed, for clarity.
1169
data = protocol.read_body_bytes()
1170
# Cache the results, but only until they have been fulfilled
1172
for c_offset in coalesced:
1173
if len(data) < c_offset.length:
1174
raise errors.ShortReadvError(relpath, c_offset.start,
1175
c_offset.length, actual=len(data))
1176
for suboffset, subsize in c_offset.ranges:
1177
key = (c_offset.start+suboffset, subsize)
1178
data_map[key] = data[suboffset:suboffset+subsize]
1179
data = data[c_offset.length:]
1181
# Now that we've read some data, see if we can yield anything back
1182
while cur_offset_and_size in data_map:
1183
this_data = data_map.pop(cur_offset_and_size)
1184
yield cur_offset_and_size[0], this_data
1185
cur_offset_and_size = offset_stack.next()
1187
def rename(self, rel_from, rel_to):
1188
self._call('rename',
1189
self._remote_path(rel_from),
1190
self._remote_path(rel_to))
1192
def move(self, rel_from, rel_to):
1194
self._remote_path(rel_from),
1195
self._remote_path(rel_to))
1197
def rmdir(self, relpath):
1198
resp = self._call('rmdir', self._remote_path(relpath))
1200
def _translate_error(self, resp, orig_path=None):
1201
"""Raise an exception from a response"""
1208
elif what == 'NoSuchFile':
1209
if orig_path is not None:
1210
error_path = orig_path
1212
error_path = resp[1]
1213
raise errors.NoSuchFile(error_path)
1214
elif what == 'error':
1215
raise errors.SmartProtocolError(unicode(resp[1]))
1216
elif what == 'FileExists':
1217
raise errors.FileExists(resp[1])
1218
elif what == 'DirectoryNotEmpty':
1219
raise errors.DirectoryNotEmpty(resp[1])
1220
elif what == 'ShortReadvError':
1221
raise errors.ShortReadvError(resp[1], int(resp[2]),
1222
int(resp[3]), int(resp[4]))
1223
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1224
encoding = str(resp[1]) # encoding must always be a string
1226
start = int(resp[3])
1228
reason = str(resp[5]) # reason must always be a string
1229
if val.startswith('u:'):
1230
val = val[2:].decode('utf-8')
1231
elif val.startswith('s:'):
1232
val = val[2:].decode('base64')
1233
if what == 'UnicodeDecodeError':
1234
raise UnicodeDecodeError(encoding, val, start, end, reason)
1235
elif what == 'UnicodeEncodeError':
1236
raise UnicodeEncodeError(encoding, val, start, end, reason)
1237
elif what == "ReadOnlyError":
1238
raise errors.TransportNotPossible('readonly transport')
1240
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1242
def disconnect(self):
1243
self._medium.disconnect()
1245
def delete_tree(self, relpath):
1246
raise errors.TransportNotPossible('readonly transport')
1248
def stat(self, relpath):
1249
resp = self._call2('stat', self._remote_path(relpath))
1250
if resp[0] == 'stat':
1251
return SmartStat(int(resp[1]), int(resp[2], 8))
1253
self._translate_error(resp)
1255
## def lock_read(self, relpath):
1256
## """Lock the given file for shared (read) access.
1257
## :return: A lock object, which should be passed to Transport.unlock()
1259
## # The old RemoteBranch ignore lock for reading, so we will
1260
## # continue that tradition and return a bogus lock object.
1261
## class BogusLock(object):
1262
## def __init__(self, path):
1264
## def unlock(self):
1266
## return BogusLock(relpath)
1271
def list_dir(self, relpath):
1272
resp = self._call2('list_dir', self._remote_path(relpath))
1273
if resp[0] == 'names':
1274
return [name.encode('ascii') for name in resp[1:]]
1276
self._translate_error(resp)
1278
def iter_files_recursive(self):
1279
resp = self._call2('iter_files_recursive', self._remote_path(''))
1280
if resp[0] == 'names':
1283
self._translate_error(resp)
1286
class SmartClientMediumRequest(object):
1287
"""A request on a SmartClientMedium.
1289
Each request allows bytes to be provided to it via accept_bytes, and then
1290
the response bytes to be read via read_bytes.
1293
request.accept_bytes('123')
1294
request.finished_writing()
1295
result = request.read_bytes(3)
1296
request.finished_reading()
1298
It is up to the individual SmartClientMedium whether multiple concurrent
1299
requests can exist. See SmartClientMedium.get_request to obtain instances
1300
of SmartClientMediumRequest, and the concrete Medium you are using for
1301
details on concurrency and pipelining.
1304
def __init__(self, medium):
1305
"""Construct a SmartClientMediumRequest for the medium medium."""
1306
self._medium = medium
1307
# we track state by constants - we may want to use the same
1308
# pattern as BodyReader if it gets more complex.
1309
# valid states are: "writing", "reading", "done"
1310
self._state = "writing"
1312
def accept_bytes(self, bytes):
1313
"""Accept bytes for inclusion in this request.
1315
This method may not be be called after finished_writing() has been
1316
called. It depends upon the Medium whether or not the bytes will be
1317
immediately transmitted. Message based Mediums will tend to buffer the
1318
bytes until finished_writing() is called.
1320
:param bytes: A bytestring.
1322
if self._state != "writing":
1323
raise errors.WritingCompleted(self)
1324
self._accept_bytes(bytes)
1326
def _accept_bytes(self, bytes):
1327
"""Helper for accept_bytes.
1329
Accept_bytes checks the state of the request to determing if bytes
1330
should be accepted. After that it hands off to _accept_bytes to do the
1333
raise NotImplementedError(self._accept_bytes)
1335
def finished_reading(self):
1336
"""Inform the request that all desired data has been read.
1338
This will remove the request from the pipeline for its medium (if the
1339
medium supports pipelining) and any further calls to methods on the
1340
request will raise ReadingCompleted.
1342
if self._state == "writing":
1343
raise errors.WritingNotComplete(self)
1344
if self._state != "reading":
1345
raise errors.ReadingCompleted(self)
1346
self._state = "done"
1347
self._finished_reading()
1349
def _finished_reading(self):
1350
"""Helper for finished_reading.
1352
finished_reading checks the state of the request to determine if
1353
finished_reading is allowed, and if it is hands off to _finished_reading
1354
to perform the action.
1356
raise NotImplementedError(self._finished_reading)
1358
def finished_writing(self):
1359
"""Finish the writing phase of this request.
1361
This will flush all pending data for this request along the medium.
1362
After calling finished_writing, you may not call accept_bytes anymore.
1364
if self._state != "writing":
1365
raise errors.WritingCompleted(self)
1366
self._state = "reading"
1367
self._finished_writing()
1369
def _finished_writing(self):
1370
"""Helper for finished_writing.
1372
finished_writing checks the state of the request to determine if
1373
finished_writing is allowed, and if it is hands off to _finished_writing
1374
to perform the action.
1376
raise NotImplementedError(self._finished_writing)
1378
def read_bytes(self, count):
1379
"""Read bytes from this requests response.
1381
This method will block and wait for count bytes to be read. It may not
1382
be invoked until finished_writing() has been called - this is to ensure
1383
a message-based approach to requests, for compatability with message
1384
based mediums like HTTP.
1386
if self._state == "writing":
1387
raise errors.WritingNotComplete(self)
1388
if self._state != "reading":
1389
raise errors.ReadingCompleted(self)
1390
return self._read_bytes(count)
1392
def _read_bytes(self, count):
1393
"""Helper for read_bytes.
1395
read_bytes checks the state of the request to determing if bytes
1396
should be read. After that it hands off to _read_bytes to do the
1399
raise NotImplementedError(self._read_bytes)
1402
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1403
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1405
def __init__(self, medium):
1406
SmartClientMediumRequest.__init__(self, medium)
1407
# check that we are safe concurrency wise. If some streams start
1408
# allowing concurrent requests - i.e. via multiplexing - then this
1409
# assert should be moved to SmartClientStreamMedium.get_request,
1410
# and the setting/unsetting of _current_request likewise moved into
1411
# that class : but its unneeded overhead for now. RBC 20060922
1412
if self._medium._current_request is not None:
1413
raise errors.TooManyConcurrentRequests(self._medium)
1414
self._medium._current_request = self
1416
def _accept_bytes(self, bytes):
1417
"""See SmartClientMediumRequest._accept_bytes.
1419
This forwards to self._medium._accept_bytes because we are operating
1420
on the mediums stream.
1422
self._medium._accept_bytes(bytes)
1424
def _finished_reading(self):
1425
"""See SmartClientMediumRequest._finished_reading.
1427
This clears the _current_request on self._medium to allow a new
1428
request to be created.
1430
assert self._medium._current_request is self
1431
self._medium._current_request = None
1433
def _finished_writing(self):
1434
"""See SmartClientMediumRequest._finished_writing.
1436
This invokes self._medium._flush to ensure all bytes are transmitted.
1438
self._medium._flush()
1440
def _read_bytes(self, count):
1441
"""See SmartClientMediumRequest._read_bytes.
1443
This forwards to self._medium._read_bytes because we are operating
1444
on the mediums stream.
1446
return self._medium._read_bytes(count)
1449
class SmartClientRequestProtocolOne(SmartProtocolBase):
1450
"""The client-side protocol for smart version 1."""
1452
def __init__(self, request):
1453
"""Construct a SmartClientRequestProtocolOne.
1455
:param request: A SmartClientMediumRequest to serialise onto and
1458
self._request = request
1459
self._body_buffer = None
1461
def call(self, *args):
1462
bytes = _encode_tuple(args)
1463
self._request.accept_bytes(bytes)
1464
self._request.finished_writing()
1466
def call_with_body_bytes(self, args, body):
1467
"""Make a remote call of args with body bytes 'body'.
1469
After calling this, call read_response_tuple to find the result out.
1471
bytes = _encode_tuple(args)
1472
self._request.accept_bytes(bytes)
1473
bytes = self._encode_bulk_data(body)
1474
self._request.accept_bytes(bytes)
1475
self._request.finished_writing()
1477
def call_with_body_readv_array(self, args, body):
1478
"""Make a remote call with a readv array.
1480
The body is encoded with one line per readv offset pair. The numbers in
1481
each pair are separated by a comma, and no trailing \n is emitted.
1483
bytes = _encode_tuple(args)
1484
self._request.accept_bytes(bytes)
1485
readv_bytes = self._serialise_offsets(body)
1486
bytes = self._encode_bulk_data(readv_bytes)
1487
self._request.accept_bytes(bytes)
1488
self._request.finished_writing()
1490
def cancel_read_body(self):
1491
"""After expecting a body, a response code may indicate one otherwise.
1493
This method lets the domain client inform the protocol that no body
1494
will be transmitted. This is a terminal method: after calling it the
1495
protocol is not able to be used further.
1497
self._request.finished_reading()
1499
def read_response_tuple(self, expect_body=False):
1500
"""Read a response tuple from the wire.
1502
This should only be called once.
1504
result = self._recv_tuple()
1506
self._request.finished_reading()
1509
def read_body_bytes(self, count=-1):
1510
"""Read bytes from the body, decoding into a byte stream.
1512
We read all bytes at once to ensure we've checked the trailer for
1513
errors, and then feed the buffer back as read_body_bytes is called.
1515
if self._body_buffer is not None:
1516
return self._body_buffer.read(count)
1517
_body_decoder = LengthPrefixedBodyDecoder()
1519
while not _body_decoder.finished_reading:
1520
bytes_wanted = _body_decoder.next_read_size()
1521
bytes = self._request.read_bytes(bytes_wanted)
1522
_body_decoder.accept_bytes(bytes)
1523
self._request.finished_reading()
1524
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1525
# XXX: TODO check the trailer result.
1526
return self._body_buffer.read(count)
1528
def _recv_tuple(self):
1529
"""Receive a tuple from the medium request."""
1531
while not line or line[-1] != '\n':
1532
# TODO: this is inefficient - but tuples are short.
1533
new_char = self._request.read_bytes(1)
1535
assert new_char != '', "end of file reading from server."
1536
return _decode_tuple(line)
1538
def query_version(self):
1539
"""Return protocol version number of the server."""
1541
resp = self.read_response_tuple()
1542
if resp == ('ok', '1'):
1545
raise errors.SmartProtocolError("bad response %r" % (resp,))
1548
class SmartClientMedium(object):
1549
"""Smart client is a medium for sending smart protocol requests over."""
1551
def disconnect(self):
1552
"""If this medium maintains a persistent connection, close it.
1554
The default implementation does nothing.
1558
class SmartClientStreamMedium(SmartClientMedium):
1559
"""Stream based medium common class.
1561
SmartClientStreamMediums operate on a stream. All subclasses use a common
1562
SmartClientStreamMediumRequest for their requests, and should implement
1563
_accept_bytes and _read_bytes to allow the request objects to send and
1568
self._current_request = None
1570
def accept_bytes(self, bytes):
1571
self._accept_bytes(bytes)
1574
"""The SmartClientStreamMedium knows how to close the stream when it is
1580
"""Flush the output stream.
1582
This method is used by the SmartClientStreamMediumRequest to ensure that
1583
all data for a request is sent, to avoid long timeouts or deadlocks.
1585
raise NotImplementedError(self._flush)
1587
def get_request(self):
1588
"""See SmartClientMedium.get_request().
1590
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1593
return SmartClientStreamMediumRequest(self)
1595
def read_bytes(self, count):
1596
return self._read_bytes(count)
1599
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1600
"""A client medium using simple pipes.
1602
This client does not manage the pipes: it assumes they will always be open.
1605
def __init__(self, readable_pipe, writeable_pipe):
1606
SmartClientStreamMedium.__init__(self)
1607
self._readable_pipe = readable_pipe
1608
self._writeable_pipe = writeable_pipe
1610
def _accept_bytes(self, bytes):
1611
"""See SmartClientStreamMedium.accept_bytes."""
1612
self._writeable_pipe.write(bytes)
1615
"""See SmartClientStreamMedium._flush()."""
1616
self._writeable_pipe.flush()
1618
def _read_bytes(self, count):
1619
"""See SmartClientStreamMedium._read_bytes."""
1620
return self._readable_pipe.read(count)
1623
class SmartSSHClientMedium(SmartClientStreamMedium):
1624
"""A client medium using SSH."""
1626
def __init__(self, host, port=None, username=None, password=None,
1628
"""Creates a client that will connect on the first use.
1630
:param vendor: An optional override for the ssh vendor to use. See
1631
bzrlib.transport.ssh for details on ssh vendors.
1633
SmartClientStreamMedium.__init__(self)
1634
self._connected = False
1636
self._password = password
1638
self._username = username
1639
self._read_from = None
1640
self._ssh_connection = None
1641
self._vendor = vendor
1642
self._write_to = None
1644
def _accept_bytes(self, bytes):
1645
"""See SmartClientStreamMedium.accept_bytes."""
1646
self._ensure_connection()
1647
self._write_to.write(bytes)
1649
def disconnect(self):
1650
"""See SmartClientMedium.disconnect()."""
1651
if not self._connected:
1653
self._read_from.close()
1654
self._write_to.close()
1655
self._ssh_connection.close()
1656
self._connected = False
1658
def _ensure_connection(self):
1659
"""Connect this medium if not already connected."""
1662
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1663
if self._vendor is None:
1664
vendor = ssh._get_ssh_vendor()
1666
vendor = self._vendor
1667
self._ssh_connection = vendor.connect_ssh(self._username,
1668
self._password, self._host, self._port,
1669
command=[executable, 'serve', '--inet', '--directory=/',
1671
self._read_from, self._write_to = \
1672
self._ssh_connection.get_filelike_channels()
1673
self._connected = True
1676
"""See SmartClientStreamMedium._flush()."""
1677
self._write_to.flush()
1679
def _read_bytes(self, count):
1680
"""See SmartClientStreamMedium.read_bytes."""
1681
if not self._connected:
1682
raise errors.MediumNotConnected(self)
1683
return self._read_from.read(count)
1686
class SmartTCPClientMedium(SmartClientStreamMedium):
1687
"""A client medium using TCP."""
1689
def __init__(self, host, port):
1690
"""Creates a client that will connect on the first use."""
1691
SmartClientStreamMedium.__init__(self)
1692
self._connected = False
1697
def _accept_bytes(self, bytes):
1698
"""See SmartClientMedium.accept_bytes."""
1699
self._ensure_connection()
1700
self._socket.sendall(bytes)
1702
def disconnect(self):
1703
"""See SmartClientMedium.disconnect()."""
1704
if not self._connected:
1706
self._socket.close()
1708
self._connected = False
1710
def _ensure_connection(self):
1711
"""Connect this medium if not already connected."""
1714
self._socket = socket.socket()
1715
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1716
result = self._socket.connect_ex((self._host, int(self._port)))
1718
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1719
(self._host, self._port, os.strerror(result)))
1720
self._connected = True
1723
"""See SmartClientStreamMedium._flush().
1725
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1726
add a means to do a flush, but that can be done in the future.
1729
def _read_bytes(self, count):
1730
"""See SmartClientMedium.read_bytes."""
1731
if not self._connected:
1732
raise errors.MediumNotConnected(self)
1733
return self._socket.recv(count)
1736
class SmartTCPTransport(SmartTransport):
1737
"""Connection to smart server over plain tcp.
1739
This is essentially just a factory to get 'RemoteTransport(url,
1740
SmartTCPClientMedium).
1743
def __init__(self, url):
1744
_scheme, _username, _password, _host, _port, _path = \
1745
transport.split_url(url)
1747
_port = BZR_DEFAULT_PORT
1751
except (ValueError, TypeError), e:
1752
raise errors.InvalidURL(
1753
path=url, extra="invalid port %s" % _port)
1754
medium = SmartTCPClientMedium(_host, _port)
1755
super(SmartTCPTransport, self).__init__(url, medium=medium)
1758
class SmartSSHTransport(SmartTransport):
1759
"""Connection to smart server over SSH.
1761
This is essentially just a factory to get 'RemoteTransport(url,
1762
SmartSSHClientMedium).
1765
def __init__(self, url):
1766
_scheme, _username, _password, _host, _port, _path = \
1767
transport.split_url(url)
1769
if _port is not None:
1771
except (ValueError, TypeError), e:
1772
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1774
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1775
super(SmartSSHTransport, self).__init__(url, medium=medium)
1778
class SmartHTTPTransport(SmartTransport):
1779
"""Just a way to connect between a bzr+http:// url and http://.
1781
This connection operates slightly differently than the SmartSSHTransport.
1782
It uses a plain http:// transport underneath, which defines what remote
1783
.bzr/smart URL we are connected to. From there, all paths that are sent are
1784
sent as relative paths, this way, the remote side can properly
1785
de-reference them, since it is likely doing rewrite rules to translate an
1786
HTTP path into a local path.
1789
def __init__(self, url, http_transport=None):
1790
assert url.startswith('bzr+http://')
1792
if http_transport is None:
1793
http_url = url[len('bzr+'):]
1794
self._http_transport = transport.get_transport(http_url)
1796
self._http_transport = http_transport
1797
http_medium = self._http_transport.get_smart_medium()
1798
super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1800
def _remote_path(self, relpath):
1801
"""After connecting HTTP Transport only deals in relative URLs."""
1807
def abspath(self, relpath):
1808
"""Return the full url to the given relative path.
1810
:param relpath: the relative path or path components
1811
:type relpath: str or list
1813
return self._unparse_url(self._combine_paths(self._path, relpath))
1815
def clone(self, relative_url):
1816
"""Make a new SmartHTTPTransport related to me.
1818
This is re-implemented rather than using the default
1819
SmartTransport.clone() because we must be careful about the underlying
1823
abs_url = self.abspath(relative_url)
1826
# By cloning the underlying http_transport, we are able to share the
1828
new_transport = self._http_transport.clone(relative_url)
1829
return SmartHTTPTransport(abs_url, http_transport=new_transport)
1832
def get_test_permutations():
1833
"""Return (transport, server) permutations for testing."""
1834
### We may need a little more test framework support to construct an
1835
### appropriate RemoteTransport in the future.
1836
return [(SmartTCPTransport, SmartTCPServer_for_testing)]