1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
50
At the bottom level socket, pipes, HTTP server. For sockets, we have the idea
51
that you have multiple requests and get a read error because the other side did
52
shutdown. For pipes we have read pipe which will have a zero read which marks
53
end-of-file. For HTTP server environment there is not end-of-stream because
54
each request coming into the server is independent.
56
So we need a wrapper around pipes and sockets to seperate out requests from
57
substrate and this will give us a single model which is consist for HTTP,
63
MEDIUM (factory for protocol, reads bytes & pushes to protocol,
64
uses protocol to detect end-of-request, sends written
65
bytes to client) e.g. socket, pipe, HTTP request handler.
70
PROTOCOL (serialization, deserialization) accepts bytes for one
71
request, decodes according to internal state, pushes
72
structured data to handler. accepts structured data from
73
handler and encodes and writes to the medium. factory for
79
HANDLER (domain logic) accepts structured data, operates state
80
machine until the request can be satisfied,
81
sends structured data to the protocol.
87
CLIENT domain logic, accepts domain requests, generated structured
88
data, reads structured data from responses and turns into
89
domain data. Sends structured data to the protocol.
90
Operates state machines until the request can be delivered
91
(e.g. reading from a bundle generated in bzrlib to deliver a
94
Possibly this should just be RemoteBzrDir, RemoteTransport,
100
PROTOCOL (serialization, deserialization) accepts structured data for one
101
request, encodes and writes to the medium. Reads bytes from the
102
medium, decodes and allows the client to read structured data.
107
MEDIUM (accepts bytes from the protocol & delivers to the remote server.
108
Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
112
# TODO: _translate_error should be on the client, not the transport because
113
# error coding is wire protocol specific.
115
# TODO: A plain integer from query_version is too simple; should give some
118
# TODO: Server should probably catch exceptions within itself and send them
119
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
120
# Also needs to somehow report protocol errors like bad requests. Need to
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
# bulk transfer and then something goes wrong.
124
# TODO: Standard marker at start of request/response lines?
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
128
# TODO: get/put objects could be changed to gradually read back the data as it
129
# comes across the network
131
# TODO: What should the server do if it hits an error and has to terminate?
133
# TODO: is it useful to allow multiple chunks in the bulk data?
135
# TODO: If we get an exception during transmission of bulk data we can't just
136
# emit the exception because it won't be seen.
137
# John proposes: I think it would be worthwhile to have a header on each
138
# chunk, that indicates it is another chunk. Then you can send an 'error'
139
# chunk as long as you finish the previous chunk.
141
# TODO: Clone method on Transport; should work up towards parent directory;
142
# unclear how this should be stored or communicated to the server... maybe
143
# just pass it on all relevant requests?
145
# TODO: Better name than clone() for changing between directories. How about
146
# open_dir or change_dir or chdir?
148
# TODO: Is it really good to have the notion of current directory within the
149
# connection? Perhaps all Transports should factor out a common connection
150
# from the thing that has the directory context?
152
# TODO: Pull more things common to sftp and ssh to a higher level.
154
# TODO: The server that manages a connection should be quite small and retain
155
# minimum state because each of the requests are supposed to be stateless.
156
# Then we can write another implementation that maps to http.
158
# TODO: What to do when a client connection is garbage collected? Maybe just
159
# abruptly drop the connection?
161
# TODO: Server in some cases will need to restrict access to files outside of
162
# a particular root directory. LocalTransport doesn't do anything to stop you
163
# ascending above the base directory, so we need to prevent paths
164
# containing '..' in either the server or transport layers. (Also need to
165
# consider what happens if someone creates a symlink pointing outside the
168
# TODO: Server should rebase absolute paths coming across the network to put
169
# them under the virtual root, if one is in use. LocalTransport currently
170
# doesn't do that; if you give it an absolute path it just uses it.
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
# urlescape them instead. Indeed possibly this should just literally be
176
# FIXME: This transport, with several others, has imperfect handling of paths
177
# within urls. It'd probably be better for ".." from a root to raise an error
178
# rather than return the same directory as we do at present.
180
# TODO: Rather than working at the Transport layer we want a Branch,
181
# Repository or BzrDir objects that talk to a server.
183
# TODO: Probably want some way for server commands to gradually produce body
184
# data rather than passing it as a string; they could perhaps pass an
185
# iterator-like callback that will gradually yield data; it probably needs a
186
# close() method that will always be closed to do any necessary cleanup.
188
# TODO: Split the actual smart server from the ssh encoding of it.
190
# TODO: Perhaps support file-level readwrite operations over the transport
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
# branch doing file-level operations.
197
from cStringIO import StringIO
213
from bzrlib.bundle.serializer import write_bundle
215
from bzrlib.transport import ssh
216
except errors.ParamikoNotPresent:
217
# no paramiko. SmartSSHClientMedium will break.
220
# must do this otherwise urllib can't parse the urls properly :(
221
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
222
transport.register_urlparse_netloc_protocol(scheme)
226
def _recv_tuple(from_file):
227
req_line = from_file.readline()
228
return _decode_tuple(req_line)
231
def _decode_tuple(req_line):
232
if req_line == None or req_line == '':
234
if req_line[-1] != '\n':
235
raise errors.SmartProtocolError("request %r not terminated" % req_line)
236
return tuple(req_line[:-1].split('\x01'))
239
def _encode_tuple(args):
240
"""Encode the tuple args to a bytestream."""
241
return '\x01'.join(args) + '\n'
244
class SmartProtocolBase(object):
245
"""Methods common to client and server"""
247
# TODO: this only actually accomodates a single block; possibly should
248
# support multiple chunks?
249
def _encode_bulk_data(self, body):
250
"""Encode body as a bulk data chunk."""
251
return ''.join(('%d\n' % len(body), body, 'done\n'))
253
def _serialise_offsets(self, offsets):
254
"""Serialise a readv offset list."""
256
for start, length in offsets:
257
txt.append('%d,%d' % (start, length))
258
return '\n'.join(txt)
261
class SmartServerRequestProtocolOne(SmartProtocolBase):
262
"""Server-side encoding and decoding logic for smart version 1."""
264
def __init__(self, backing_transport, write_func):
265
self._backing_transport = backing_transport
266
self.excess_buffer = ''
267
self._finished = False
269
self.has_dispatched = False
271
self._body_decoder = None
272
self._write_func = write_func
274
def accept_bytes(self, bytes):
275
"""Take bytes, and advance the internal state machine appropriately.
277
:param bytes: must be a byte string
279
assert isinstance(bytes, str)
280
self.in_buffer += bytes
281
if not self.has_dispatched:
282
if '\n' not in self.in_buffer:
283
# no command line yet
285
self.has_dispatched = True
287
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
289
req_args = _decode_tuple(first_line)
290
self.request = SmartServerRequestHandler(
291
self._backing_transport)
292
self.request.dispatch_command(req_args[0], req_args[1:])
293
if self.request.finished_reading:
295
self.excess_buffer = self.in_buffer
297
self._send_response(self.request.response.args,
298
self.request.response.body)
299
except KeyboardInterrupt:
301
except Exception, exception:
302
# everything else: pass to client, flush, and quit
303
self._send_response(('error', str(exception)))
306
if self.has_dispatched:
308
# nothing to do.XXX: this routine should be a single state
310
self.excess_buffer += self.in_buffer
313
if self._body_decoder is None:
314
self._body_decoder = LengthPrefixedBodyDecoder()
315
self._body_decoder.accept_bytes(self.in_buffer)
316
self.in_buffer = self._body_decoder.unused_data
317
body_data = self._body_decoder.read_pending_data()
318
self.request.accept_body(body_data)
319
if self._body_decoder.finished_reading:
320
self.request.end_of_body()
321
assert self.request.finished_reading, \
322
"no more body, request not finished"
323
if self.request.response is not None:
324
self._send_response(self.request.response.args,
325
self.request.response.body)
326
self.excess_buffer = self.in_buffer
329
assert not self.request.finished_reading, \
330
"no response and we have finished reading."
332
def _send_response(self, args, body=None):
333
"""Send a smart server response down the output stream."""
334
assert not self._finished, 'response already sent'
335
self._finished = True
336
self._write_func(_encode_tuple(args))
338
assert isinstance(body, str), 'body must be a str'
339
bytes = self._encode_bulk_data(body)
340
self._write_func(bytes)
342
def next_read_size(self):
345
if self._body_decoder is None:
348
return self._body_decoder.next_read_size()
351
class LengthPrefixedBodyDecoder(object):
352
"""Decodes the length-prefixed bulk data."""
355
self.bytes_left = None
356
self.finished_reading = False
357
self.unused_data = ''
358
self.state_accept = self._state_accept_expecting_length
359
self.state_read = self._state_read_no_data
361
self._trailer_buffer = ''
363
def accept_bytes(self, bytes):
364
"""Decode as much of bytes as possible.
366
If 'bytes' contains too much data it will be appended to
369
finished_reading will be set when no more data is required. Further
370
data will be appended to self.unused_data.
372
# accept_bytes is allowed to change the state
373
current_state = self.state_accept
374
self.state_accept(bytes)
375
while current_state != self.state_accept:
376
current_state = self.state_accept
377
self.state_accept('')
379
def next_read_size(self):
380
if self.bytes_left is not None:
381
# Ideally we want to read all the remainder of the body and the
383
return self.bytes_left + 5
384
elif self.state_accept == self._state_accept_reading_trailer:
385
# Just the trailer left
386
return 5 - len(self._trailer_buffer)
387
elif self.state_accept == self._state_accept_expecting_length:
388
# There's still at least 6 bytes left ('\n' to end the length, plus
392
# Reading excess data. Either way, 1 byte at a time is fine.
395
def read_pending_data(self):
396
"""Return any pending data that has been decoded."""
397
return self.state_read()
399
def _state_accept_expecting_length(self, bytes):
400
self._in_buffer += bytes
401
pos = self._in_buffer.find('\n')
404
self.bytes_left = int(self._in_buffer[:pos])
405
self._in_buffer = self._in_buffer[pos+1:]
406
self.bytes_left -= len(self._in_buffer)
407
self.state_accept = self._state_accept_reading_body
408
self.state_read = self._state_read_in_buffer
410
def _state_accept_reading_body(self, bytes):
411
self._in_buffer += bytes
412
self.bytes_left -= len(bytes)
413
if self.bytes_left <= 0:
415
if self.bytes_left != 0:
416
self._trailer_buffer = self._in_buffer[self.bytes_left:]
417
self._in_buffer = self._in_buffer[:self.bytes_left]
418
self.bytes_left = None
419
self.state_accept = self._state_accept_reading_trailer
421
def _state_accept_reading_trailer(self, bytes):
422
self._trailer_buffer += bytes
423
# TODO: what if the trailer does not match "done\n"? Should this raise
424
# a ProtocolViolation exception?
425
if self._trailer_buffer.startswith('done\n'):
426
self.unused_data = self._trailer_buffer[len('done\n'):]
427
self.state_accept = self._state_accept_reading_unused
428
self.finished_reading = True
430
def _state_accept_reading_unused(self, bytes):
431
self.unused_data += bytes
433
def _state_read_no_data(self):
436
def _state_read_in_buffer(self):
437
result = self._in_buffer
442
class SmartServerStreamMedium(object):
443
"""Handles smart commands coming over a stream.
445
The stream may be a pipe connected to sshd, or a tcp socket, or an
446
in-process fifo for testing.
448
One instance is created for each connected client; it can serve multiple
449
requests in the lifetime of the connection.
451
The server passes requests through to an underlying backing transport,
452
which will typically be a LocalTransport looking at the server's filesystem.
455
def __init__(self, backing_transport):
456
"""Construct new server.
458
:param backing_transport: Transport for the directory served.
460
# backing_transport could be passed to serve instead of __init__
461
self.backing_transport = backing_transport
462
self.finished = False
465
"""Serve requests until the client disconnects."""
466
# Keep a reference to stderr because the sys module's globals get set to
467
# None during interpreter shutdown.
468
from sys import stderr
470
while not self.finished:
471
protocol = SmartServerRequestProtocolOne(self.backing_transport,
473
self._serve_one_request(protocol)
475
stderr.write("%s terminating on exception %s\n" % (self, e))
478
def _serve_one_request(self, protocol):
479
"""Read one request from input, process, send back a response.
481
:param protocol: a SmartServerRequestProtocol.
484
self._serve_one_request_unguarded(protocol)
485
except KeyboardInterrupt:
488
self.terminate_due_to_error()
490
def terminate_due_to_error(self):
491
"""Called when an unhandled exception from the protocol occurs."""
492
raise NotImplementedError(self.terminate_due_to_error)
495
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
497
def __init__(self, sock, backing_transport):
500
:param sock: the socket the server will read from. It will be put
503
SmartServerStreamMedium.__init__(self, backing_transport)
505
sock.setblocking(True)
508
def _serve_one_request_unguarded(self, protocol):
509
while protocol.next_read_size():
511
protocol.accept_bytes(self.push_back)
514
bytes = self.socket.recv(4096)
518
protocol.accept_bytes(bytes)
520
self.push_back = protocol.excess_buffer
522
def terminate_due_to_error(self):
523
"""Called when an unhandled exception from the protocol occurs."""
524
# TODO: This should log to a server log file, but no such thing
525
# exists yet. Andrew Bennetts 2006-09-29.
529
def _write_out(self, bytes):
530
self.socket.sendall(bytes)
533
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
535
def __init__(self, in_file, out_file, backing_transport):
536
"""Construct new server.
538
:param in_file: Python file from which requests can be read.
539
:param out_file: Python file to write responses.
540
:param backing_transport: Transport for the directory served.
542
SmartServerStreamMedium.__init__(self, backing_transport)
546
def _serve_one_request_unguarded(self, protocol):
548
bytes_to_read = protocol.next_read_size()
549
if bytes_to_read == 0:
550
# Finished serving this request.
553
bytes = self._in.read(bytes_to_read)
555
# Connection has been closed.
559
protocol.accept_bytes(bytes)
561
def terminate_due_to_error(self):
562
# TODO: This should log to a server log file, but no such thing
563
# exists yet. Andrew Bennetts 2006-09-29.
567
def _write_out(self, bytes):
568
self._out.write(bytes)
571
class SmartServerResponse(object):
572
"""Response generated by SmartServerRequestHandler."""
574
def __init__(self, args, body=None):
578
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
579
# for delivering the data for a request. This could be done with as the
580
# StreamServer, though that would create conflation between request and response
581
# which may be undesirable.
584
class SmartServerRequestHandler(object):
585
"""Protocol logic for smart server.
587
This doesn't handle serialization at all, it just processes requests and
591
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
592
# not contain encoding or decoding logic to allow the wire protocol to vary
593
# from the object protocol: we will want to tweak the wire protocol separate
594
# from the object model, and ideally we will be able to do that without
595
# having a SmartServerRequestHandler subclass for each wire protocol, rather
596
# just a Protocol subclass.
598
# TODO: Better way of representing the body for commands that take it,
599
# and allow it to be streamed into the server.
601
def __init__(self, backing_transport):
602
self._backing_transport = backing_transport
603
self._converted_command = False
604
self.finished_reading = False
605
self._body_bytes = ''
608
def accept_body(self, bytes):
611
This should be overriden for each command that desired body data to
612
handle the right format of that data. I.e. plain bytes, a bundle etc.
614
The deserialisation into that format should be done in the Protocol
615
object. Set self.desired_body_format to the format your method will
618
# default fallback is to accumulate bytes.
619
self._body_bytes += bytes
621
def _end_of_body_handler(self):
622
"""An unimplemented end of body handler."""
623
raise NotImplementedError(self._end_of_body_handler)
626
"""Answer a version request with my version."""
627
return SmartServerResponse(('ok', '1'))
629
def do_has(self, relpath):
630
r = self._backing_transport.has(relpath) and 'yes' or 'no'
631
return SmartServerResponse((r,))
633
def do_get(self, relpath):
634
backing_bytes = self._backing_transport.get_bytes(relpath)
635
return SmartServerResponse(('ok',), backing_bytes)
637
def _deserialise_optional_mode(self, mode):
638
# XXX: FIXME this should be on the protocol object.
644
def do_append(self, relpath, mode):
645
self._converted_command = True
646
self._relpath = relpath
647
self._mode = self._deserialise_optional_mode(mode)
648
self._end_of_body_handler = self._handle_do_append_end
650
def _handle_do_append_end(self):
651
old_length = self._backing_transport.append_bytes(
652
self._relpath, self._body_bytes, self._mode)
653
self.response = SmartServerResponse(('appended', '%d' % old_length))
655
def do_delete(self, relpath):
656
self._backing_transport.delete(relpath)
658
def do_iter_files_recursive(self, relpath):
659
transport = self._backing_transport.clone(relpath)
660
filenames = transport.iter_files_recursive()
661
return SmartServerResponse(('names',) + tuple(filenames))
663
def do_list_dir(self, relpath):
664
filenames = self._backing_transport.list_dir(relpath)
665
return SmartServerResponse(('names',) + tuple(filenames))
667
def do_mkdir(self, relpath, mode):
668
self._backing_transport.mkdir(relpath,
669
self._deserialise_optional_mode(mode))
671
def do_move(self, rel_from, rel_to):
672
self._backing_transport.move(rel_from, rel_to)
674
def do_put(self, relpath, mode):
675
self._converted_command = True
676
self._relpath = relpath
677
self._mode = self._deserialise_optional_mode(mode)
678
self._end_of_body_handler = self._handle_do_put
680
def _handle_do_put(self):
681
self._backing_transport.put_bytes(self._relpath,
682
self._body_bytes, self._mode)
683
self.response = SmartServerResponse(('ok',))
685
def _deserialise_offsets(self, text):
686
# XXX: FIXME this should be on the protocol object.
688
for line in text.split('\n'):
691
start, length = line.split(',')
692
offsets.append((int(start), int(length)))
695
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
696
self._converted_command = True
697
self._end_of_body_handler = self._handle_put_non_atomic
698
self._relpath = relpath
699
self._dir_mode = self._deserialise_optional_mode(dir_mode)
700
self._mode = self._deserialise_optional_mode(mode)
701
# a boolean would be nicer XXX
702
self._create_parent = (create_parent == 'T')
704
def _handle_put_non_atomic(self):
705
self._backing_transport.put_bytes_non_atomic(self._relpath,
708
create_parent_dir=self._create_parent,
709
dir_mode=self._dir_mode)
710
self.response = SmartServerResponse(('ok',))
712
def do_readv(self, relpath):
713
self._converted_command = True
714
self._end_of_body_handler = self._handle_readv_offsets
715
self._relpath = relpath
717
def end_of_body(self):
718
"""No more body data will be received."""
719
self._run_handler_code(self._end_of_body_handler, (), {})
720
# cannot read after this.
721
self.finished_reading = True
723
def _handle_readv_offsets(self):
724
"""accept offsets for a readv request."""
725
offsets = self._deserialise_offsets(self._body_bytes)
726
backing_bytes = ''.join(bytes for offset, bytes in
727
self._backing_transport.readv(self._relpath, offsets))
728
self.response = SmartServerResponse(('readv',), backing_bytes)
730
def do_rename(self, rel_from, rel_to):
731
self._backing_transport.rename(rel_from, rel_to)
733
def do_rmdir(self, relpath):
734
self._backing_transport.rmdir(relpath)
736
def do_stat(self, relpath):
737
stat = self._backing_transport.stat(relpath)
738
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
740
def do_get_bundle(self, path, revision_id):
741
# open transport relative to our base
742
t = self._backing_transport.clone(path)
743
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
744
repo = control.open_repository()
745
tmpf = tempfile.TemporaryFile()
746
base_revision = revision.NULL_REVISION
747
write_bundle(repo, revision_id, base_revision, tmpf)
749
return SmartServerResponse((), tmpf.read())
751
def dispatch_command(self, cmd, args):
752
"""Deprecated compatibility method.""" # XXX XXX
753
func = getattr(self, 'do_' + cmd, None)
755
raise errors.SmartProtocolError("bad request %r" % (cmd,))
756
self._run_handler_code(func, args, {})
758
def _run_handler_code(self, callable, args, kwargs):
759
"""Run some handler specific code 'callable'.
761
If a result is returned, it is considered to be the commands response,
762
and finished_reading is set true, and its assigned to self.response.
764
Any exceptions caught are translated and a response object created
767
result = self._call_converting_errors(callable, args, kwargs)
768
if result is not None:
769
self.response = result
770
self.finished_reading = True
771
# handle unconverted commands
772
if not self._converted_command:
773
self.finished_reading = True
775
self.response = SmartServerResponse(('ok',))
777
def _call_converting_errors(self, callable, args, kwargs):
778
"""Call callable converting errors to Response objects."""
780
return callable(*args, **kwargs)
781
except errors.NoSuchFile, e:
782
return SmartServerResponse(('NoSuchFile', e.path))
783
except errors.FileExists, e:
784
return SmartServerResponse(('FileExists', e.path))
785
except errors.DirectoryNotEmpty, e:
786
return SmartServerResponse(('DirectoryNotEmpty', e.path))
787
except errors.ShortReadvError, e:
788
return SmartServerResponse(('ShortReadvError',
789
e.path, str(e.offset), str(e.length), str(e.actual)))
790
except UnicodeError, e:
791
# If it is a DecodeError, than most likely we are starting
792
# with a plain string
793
str_or_unicode = e.object
794
if isinstance(str_or_unicode, unicode):
795
# XXX: UTF-8 might have \x01 (our seperator byte) in it. We
796
# should escape it somehow.
797
val = 'u:' + str_or_unicode.encode('utf-8')
799
val = 's:' + str_or_unicode.encode('base64')
800
# This handles UnicodeEncodeError or UnicodeDecodeError
801
return SmartServerResponse((e.__class__.__name__,
802
e.encoding, val, str(e.start), str(e.end), e.reason))
803
except errors.TransportNotPossible, e:
804
if e.msg == "readonly transport":
805
return SmartServerResponse(('ReadOnlyError', ))
810
class SmartTCPServer(object):
811
"""Listens on a TCP socket and accepts connections from smart clients"""
813
def __init__(self, backing_transport, host='127.0.0.1', port=0):
814
"""Construct a new server.
816
To actually start it running, call either start_background_thread or
819
:param host: Name of the interface to listen on.
820
:param port: TCP port to listen on, or 0 to allocate a transient port.
822
self._server_socket = socket.socket()
823
self._server_socket.bind((host, port))
824
self.port = self._server_socket.getsockname()[1]
825
self._server_socket.listen(1)
826
self._server_socket.settimeout(1)
827
self.backing_transport = backing_transport
830
# let connections timeout so that we get a chance to terminate
831
# Keep a reference to the exceptions we want to catch because the socket
832
# module's globals get set to None during interpreter shutdown.
833
from socket import timeout as socket_timeout
834
from socket import error as socket_error
835
self._should_terminate = False
836
while not self._should_terminate:
838
self.accept_and_serve()
839
except socket_timeout:
840
# just check if we're asked to stop
842
except socket_error, e:
843
trace.warning("client disconnected: %s", e)
847
"""Return the url of the server"""
848
return "bzr://%s:%d/" % self._server_socket.getsockname()
850
def accept_and_serve(self):
851
conn, client_addr = self._server_socket.accept()
852
# For WIN32, where the timeout value from the listening socket
853
# propogates to the newly accepted socket.
854
conn.setblocking(True)
855
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
856
handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
857
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
858
connection_thread.setDaemon(True)
859
connection_thread.start()
861
def start_background_thread(self):
862
self._server_thread = threading.Thread(None,
864
name='server-' + self.get_url())
865
self._server_thread.setDaemon(True)
866
self._server_thread.start()
868
def stop_background_thread(self):
869
self._should_terminate = True
870
# At one point we would wait to join the threads here, but it looks
871
# like they don't actually exit. So now we just leave them running
872
# and expect to terminate the process. -- mbp 20070215
873
# self._server_socket.close()
874
## sys.stderr.write("waiting for server thread to finish...")
875
## self._server_thread.join()
878
class SmartTCPServer_for_testing(SmartTCPServer):
879
"""Server suitable for use by transport tests.
881
This server is backed by the process's cwd.
885
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
886
# The server is set up by default like for ssh access: the client
887
# passes filesystem-absolute paths; therefore the server must look
888
# them up relative to the root directory. it might be better to act
889
# a public server and have the server rewrite paths into the test
891
SmartTCPServer.__init__(self,
892
transport.get_transport(urlutils.local_path_to_url('/')))
895
"""Set up server for testing"""
896
self.start_background_thread()
899
self.stop_background_thread()
902
"""Return the url of the server"""
903
host, port = self._server_socket.getsockname()
904
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
906
def get_bogus_url(self):
907
"""Return a URL which will fail to connect"""
908
return 'bzr://127.0.0.1:1/'
911
class SmartStat(object):
913
def __init__(self, size, mode):
918
class SmartTransport(transport.Transport):
919
"""Connection to a smart server.
921
The connection holds references to pipes that can be used to send requests
924
The connection has a notion of the current directory to which it's
925
connected; this is incorporated in filenames passed to the server.
927
This supports some higher-level RPC operations and can also be treated
928
like a Transport to do file-like operations.
930
The connection can be made over a tcp socket, or (in future) an ssh pipe
931
or a series of http requests. There are concrete subclasses for each
932
type: SmartTCPTransport, etc.
935
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
936
# responsibilities: Put those on SmartClient or similar. This is vital for
937
# the ability to support multiple versions of the smart protocol over time:
938
# SmartTransport is an adapter from the Transport object model to the
939
# SmartClient model, not an encoder.
941
def __init__(self, url, clone_from=None, medium=None):
944
:param medium: The medium to use for this RemoteTransport. This must be
945
supplied if clone_from is None.
947
### Technically super() here is faulty because Transport's __init__
948
### fails to take 2 parameters, and if super were to choose a silly
949
### initialisation order things would blow up.
950
if not url.endswith('/'):
952
super(SmartTransport, self).__init__(url)
953
self._scheme, self._username, self._password, self._host, self._port, self._path = \
954
transport.split_url(url)
955
if clone_from is None:
956
self._medium = medium
958
# credentials may be stripped from the base in some circumstances
959
# as yet to be clearly defined or documented, so copy them.
960
self._username = clone_from._username
961
# reuse same connection
962
self._medium = clone_from._medium
963
assert self._medium is not None
965
def abspath(self, relpath):
966
"""Return the full url to the given relative path.
968
@param relpath: the relative path or path components
969
@type relpath: str or list
971
return self._unparse_url(self._remote_path(relpath))
973
def clone(self, relative_url):
974
"""Make a new SmartTransport related to me, sharing the same connection.
976
This essentially opens a handle on a different remote directory.
978
if relative_url is None:
979
return SmartTransport(self.base, self)
981
return SmartTransport(self.abspath(relative_url), self)
983
def is_readonly(self):
984
"""Smart server transport can do read/write file operations."""
987
def get_smart_client(self):
990
def get_smart_medium(self):
993
def _unparse_url(self, path):
994
"""Return URL for a path.
996
:see: SFTPUrlHandling._unparse_url
998
# TODO: Eventually it should be possible to unify this with
999
# SFTPUrlHandling._unparse_url?
1002
path = urllib.quote(path)
1003
netloc = urllib.quote(self._host)
1004
if self._username is not None:
1005
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1006
if self._port is not None:
1007
netloc = '%s:%d' % (netloc, self._port)
1008
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
1010
def _remote_path(self, relpath):
1011
"""Returns the Unicode version of the absolute path for relpath."""
1012
return self._combine_paths(self._path, relpath)
1014
def _call(self, method, *args):
1015
resp = self._call2(method, *args)
1016
self._translate_error(resp)
1018
def _call2(self, method, *args):
1019
"""Call a method on the remote server."""
1020
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1021
protocol.call(method, *args)
1022
return protocol.read_response_tuple()
1024
def _call_with_body_bytes(self, method, args, body):
1025
"""Call a method on the remote server with body bytes."""
1026
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1027
protocol.call_with_body_bytes((method, ) + args, body)
1028
return protocol.read_response_tuple()
1030
def has(self, relpath):
1031
"""Indicate whether a remote file of the given name exists or not.
1033
:see: Transport.has()
1035
resp = self._call2('has', self._remote_path(relpath))
1036
if resp == ('yes', ):
1038
elif resp == ('no', ):
1041
self._translate_error(resp)
1043
def get(self, relpath):
1044
"""Return file-like object reading the contents of a remote file.
1046
:see: Transport.get_bytes()/get_file()
1048
return StringIO(self.get_bytes(relpath))
1050
def get_bytes(self, relpath):
1051
remote = self._remote_path(relpath)
1052
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1053
protocol.call('get', remote)
1054
resp = protocol.read_response_tuple(True)
1055
if resp != ('ok', ):
1056
protocol.cancel_read_body()
1057
self._translate_error(resp, relpath)
1058
return protocol.read_body_bytes()
1060
def _serialise_optional_mode(self, mode):
1066
def mkdir(self, relpath, mode=None):
1067
resp = self._call2('mkdir', self._remote_path(relpath),
1068
self._serialise_optional_mode(mode))
1069
self._translate_error(resp)
1071
def put_bytes(self, relpath, upload_contents, mode=None):
1072
# FIXME: upload_file is probably not safe for non-ascii characters -
1073
# should probably just pass all parameters as length-delimited
1075
resp = self._call_with_body_bytes('put',
1076
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1078
self._translate_error(resp)
1080
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1081
create_parent_dir=False,
1083
"""See Transport.put_bytes_non_atomic."""
1084
# FIXME: no encoding in the transport!
1085
create_parent_str = 'F'
1086
if create_parent_dir:
1087
create_parent_str = 'T'
1089
resp = self._call_with_body_bytes(
1091
(self._remote_path(relpath), self._serialise_optional_mode(mode),
1092
create_parent_str, self._serialise_optional_mode(dir_mode)),
1094
self._translate_error(resp)
1096
def put_file(self, relpath, upload_file, mode=None):
1097
# its not ideal to seek back, but currently put_non_atomic_file depends
1098
# on transports not reading before failing - which is a faulty
1099
# assumption I think - RBC 20060915
1100
pos = upload_file.tell()
1102
return self.put_bytes(relpath, upload_file.read(), mode)
1104
upload_file.seek(pos)
1107
def put_file_non_atomic(self, relpath, f, mode=None,
1108
create_parent_dir=False,
1110
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1111
create_parent_dir=create_parent_dir,
1114
def append_file(self, relpath, from_file, mode=None):
1115
return self.append_bytes(relpath, from_file.read(), mode)
1117
def append_bytes(self, relpath, bytes, mode=None):
1118
resp = self._call_with_body_bytes(
1120
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
1122
if resp[0] == 'appended':
1124
self._translate_error(resp)
1126
def delete(self, relpath):
1127
resp = self._call2('delete', self._remote_path(relpath))
1128
self._translate_error(resp)
1130
def readv(self, relpath, offsets):
1134
offsets = list(offsets)
1136
sorted_offsets = sorted(offsets)
1137
# turn the list of offsets into a stack
1138
offset_stack = iter(offsets)
1139
cur_offset_and_size = offset_stack.next()
1140
coalesced = list(self._coalesce_offsets(sorted_offsets,
1141
limit=self._max_readv_combine,
1142
fudge_factor=self._bytes_to_read_before_seek))
1144
protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1145
protocol.call_with_body_readv_array(
1146
('readv', self._remote_path(relpath)),
1147
[(c.start, c.length) for c in coalesced])
1148
resp = protocol.read_response_tuple(True)
1150
if resp[0] != 'readv':
1151
# This should raise an exception
1152
protocol.cancel_read_body()
1153
self._translate_error(resp)
1156
# FIXME: this should know how many bytes are needed, for clarity.
1157
data = protocol.read_body_bytes()
1158
# Cache the results, but only until they have been fulfilled
1160
for c_offset in coalesced:
1161
if len(data) < c_offset.length:
1162
raise errors.ShortReadvError(relpath, c_offset.start,
1163
c_offset.length, actual=len(data))
1164
for suboffset, subsize in c_offset.ranges:
1165
key = (c_offset.start+suboffset, subsize)
1166
data_map[key] = data[suboffset:suboffset+subsize]
1167
data = data[c_offset.length:]
1169
# Now that we've read some data, see if we can yield anything back
1170
while cur_offset_and_size in data_map:
1171
this_data = data_map.pop(cur_offset_and_size)
1172
yield cur_offset_and_size[0], this_data
1173
cur_offset_and_size = offset_stack.next()
1175
def rename(self, rel_from, rel_to):
1176
self._call('rename',
1177
self._remote_path(rel_from),
1178
self._remote_path(rel_to))
1180
def move(self, rel_from, rel_to):
1182
self._remote_path(rel_from),
1183
self._remote_path(rel_to))
1185
def rmdir(self, relpath):
1186
resp = self._call('rmdir', self._remote_path(relpath))
1188
def _translate_error(self, resp, orig_path=None):
1189
"""Raise an exception from a response"""
1196
elif what == 'NoSuchFile':
1197
if orig_path is not None:
1198
error_path = orig_path
1200
error_path = resp[1]
1201
raise errors.NoSuchFile(error_path)
1202
elif what == 'error':
1203
raise errors.SmartProtocolError(unicode(resp[1]))
1204
elif what == 'FileExists':
1205
raise errors.FileExists(resp[1])
1206
elif what == 'DirectoryNotEmpty':
1207
raise errors.DirectoryNotEmpty(resp[1])
1208
elif what == 'ShortReadvError':
1209
raise errors.ShortReadvError(resp[1], int(resp[2]),
1210
int(resp[3]), int(resp[4]))
1211
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1212
encoding = str(resp[1]) # encoding must always be a string
1214
start = int(resp[3])
1216
reason = str(resp[5]) # reason must always be a string
1217
if val.startswith('u:'):
1218
val = val[2:].decode('utf-8')
1219
elif val.startswith('s:'):
1220
val = val[2:].decode('base64')
1221
if what == 'UnicodeDecodeError':
1222
raise UnicodeDecodeError(encoding, val, start, end, reason)
1223
elif what == 'UnicodeEncodeError':
1224
raise UnicodeEncodeError(encoding, val, start, end, reason)
1225
elif what == "ReadOnlyError":
1226
raise errors.TransportNotPossible('readonly transport')
1228
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1230
def disconnect(self):
1231
self._medium.disconnect()
1233
def delete_tree(self, relpath):
1234
raise errors.TransportNotPossible('readonly transport')
1236
def stat(self, relpath):
1237
resp = self._call2('stat', self._remote_path(relpath))
1238
if resp[0] == 'stat':
1239
return SmartStat(int(resp[1]), int(resp[2], 8))
1241
self._translate_error(resp)
1243
## def lock_read(self, relpath):
1244
## """Lock the given file for shared (read) access.
1245
## :return: A lock object, which should be passed to Transport.unlock()
1247
## # The old RemoteBranch ignore lock for reading, so we will
1248
## # continue that tradition and return a bogus lock object.
1249
## class BogusLock(object):
1250
## def __init__(self, path):
1252
## def unlock(self):
1254
## return BogusLock(relpath)
1259
def list_dir(self, relpath):
1260
resp = self._call2('list_dir', self._remote_path(relpath))
1261
if resp[0] == 'names':
1262
return [name.encode('ascii') for name in resp[1:]]
1264
self._translate_error(resp)
1266
def iter_files_recursive(self):
1267
resp = self._call2('iter_files_recursive', self._remote_path(''))
1268
if resp[0] == 'names':
1271
self._translate_error(resp)
1274
class SmartClientMediumRequest(object):
1275
"""A request on a SmartClientMedium.
1277
Each request allows bytes to be provided to it via accept_bytes, and then
1278
the response bytes to be read via read_bytes.
1281
request.accept_bytes('123')
1282
request.finished_writing()
1283
result = request.read_bytes(3)
1284
request.finished_reading()
1286
It is up to the individual SmartClientMedium whether multiple concurrent
1287
requests can exist. See SmartClientMedium.get_request to obtain instances
1288
of SmartClientMediumRequest, and the concrete Medium you are using for
1289
details on concurrency and pipelining.
1292
def __init__(self, medium):
1293
"""Construct a SmartClientMediumRequest for the medium medium."""
1294
self._medium = medium
1295
# we track state by constants - we may want to use the same
1296
# pattern as BodyReader if it gets more complex.
1297
# valid states are: "writing", "reading", "done"
1298
self._state = "writing"
1300
def accept_bytes(self, bytes):
1301
"""Accept bytes for inclusion in this request.
1303
This method may not be be called after finished_writing() has been
1304
called. It depends upon the Medium whether or not the bytes will be
1305
immediately transmitted. Message based Mediums will tend to buffer the
1306
bytes until finished_writing() is called.
1308
:param bytes: A bytestring.
1310
if self._state != "writing":
1311
raise errors.WritingCompleted(self)
1312
self._accept_bytes(bytes)
1314
def _accept_bytes(self, bytes):
1315
"""Helper for accept_bytes.
1317
Accept_bytes checks the state of the request to determing if bytes
1318
should be accepted. After that it hands off to _accept_bytes to do the
1321
raise NotImplementedError(self._accept_bytes)
1323
def finished_reading(self):
1324
"""Inform the request that all desired data has been read.
1326
This will remove the request from the pipeline for its medium (if the
1327
medium supports pipelining) and any further calls to methods on the
1328
request will raise ReadingCompleted.
1330
if self._state == "writing":
1331
raise errors.WritingNotComplete(self)
1332
if self._state != "reading":
1333
raise errors.ReadingCompleted(self)
1334
self._state = "done"
1335
self._finished_reading()
1337
def _finished_reading(self):
1338
"""Helper for finished_reading.
1340
finished_reading checks the state of the request to determine if
1341
finished_reading is allowed, and if it is hands off to _finished_reading
1342
to perform the action.
1344
raise NotImplementedError(self._finished_reading)
1346
def finished_writing(self):
1347
"""Finish the writing phase of this request.
1349
This will flush all pending data for this request along the medium.
1350
After calling finished_writing, you may not call accept_bytes anymore.
1352
if self._state != "writing":
1353
raise errors.WritingCompleted(self)
1354
self._state = "reading"
1355
self._finished_writing()
1357
def _finished_writing(self):
1358
"""Helper for finished_writing.
1360
finished_writing checks the state of the request to determine if
1361
finished_writing is allowed, and if it is hands off to _finished_writing
1362
to perform the action.
1364
raise NotImplementedError(self._finished_writing)
1366
def read_bytes(self, count):
1367
"""Read bytes from this requests response.
1369
This method will block and wait for count bytes to be read. It may not
1370
be invoked until finished_writing() has been called - this is to ensure
1371
a message-based approach to requests, for compatability with message
1372
based mediums like HTTP.
1374
if self._state == "writing":
1375
raise errors.WritingNotComplete(self)
1376
if self._state != "reading":
1377
raise errors.ReadingCompleted(self)
1378
return self._read_bytes(count)
1380
def _read_bytes(self, count):
1381
"""Helper for read_bytes.
1383
read_bytes checks the state of the request to determing if bytes
1384
should be read. After that it hands off to _read_bytes to do the
1387
raise NotImplementedError(self._read_bytes)
1390
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1391
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1393
def __init__(self, medium):
1394
SmartClientMediumRequest.__init__(self, medium)
1395
# check that we are safe concurrency wise. If some streams start
1396
# allowing concurrent requests - i.e. via multiplexing - then this
1397
# assert should be moved to SmartClientStreamMedium.get_request,
1398
# and the setting/unsetting of _current_request likewise moved into
1399
# that class : but its unneeded overhead for now. RBC 20060922
1400
if self._medium._current_request is not None:
1401
raise errors.TooManyConcurrentRequests(self._medium)
1402
self._medium._current_request = self
1404
def _accept_bytes(self, bytes):
1405
"""See SmartClientMediumRequest._accept_bytes.
1407
This forwards to self._medium._accept_bytes because we are operating
1408
on the mediums stream.
1410
self._medium._accept_bytes(bytes)
1412
def _finished_reading(self):
1413
"""See SmartClientMediumRequest._finished_reading.
1415
This clears the _current_request on self._medium to allow a new
1416
request to be created.
1418
assert self._medium._current_request is self
1419
self._medium._current_request = None
1421
def _finished_writing(self):
1422
"""See SmartClientMediumRequest._finished_writing.
1424
This invokes self._medium._flush to ensure all bytes are transmitted.
1426
self._medium._flush()
1428
def _read_bytes(self, count):
1429
"""See SmartClientMediumRequest._read_bytes.
1431
This forwards to self._medium._read_bytes because we are operating
1432
on the mediums stream.
1434
return self._medium._read_bytes(count)
1437
class SmartClientRequestProtocolOne(SmartProtocolBase):
1438
"""The client-side protocol for smart version 1."""
1440
def __init__(self, request):
1441
"""Construct a SmartClientRequestProtocolOne.
1443
:param request: A SmartClientMediumRequest to serialise onto and
1446
self._request = request
1447
self._body_buffer = None
1449
def call(self, *args):
1450
bytes = _encode_tuple(args)
1451
self._request.accept_bytes(bytes)
1452
self._request.finished_writing()
1454
def call_with_body_bytes(self, args, body):
1455
"""Make a remote call of args with body bytes 'body'.
1457
After calling this, call read_response_tuple to find the result out.
1459
bytes = _encode_tuple(args)
1460
self._request.accept_bytes(bytes)
1461
bytes = self._encode_bulk_data(body)
1462
self._request.accept_bytes(bytes)
1463
self._request.finished_writing()
1465
def call_with_body_readv_array(self, args, body):
1466
"""Make a remote call with a readv array.
1468
The body is encoded with one line per readv offset pair. The numbers in
1469
each pair are separated by a comma, and no trailing \n is emitted.
1471
bytes = _encode_tuple(args)
1472
self._request.accept_bytes(bytes)
1473
readv_bytes = self._serialise_offsets(body)
1474
bytes = self._encode_bulk_data(readv_bytes)
1475
self._request.accept_bytes(bytes)
1476
self._request.finished_writing()
1478
def cancel_read_body(self):
1479
"""After expecting a body, a response code may indicate one otherwise.
1481
This method lets the domain client inform the protocol that no body
1482
will be transmitted. This is a terminal method: after calling it the
1483
protocol is not able to be used further.
1485
self._request.finished_reading()
1487
def read_response_tuple(self, expect_body=False):
1488
"""Read a response tuple from the wire.
1490
This should only be called once.
1492
result = self._recv_tuple()
1494
self._request.finished_reading()
1497
def read_body_bytes(self, count=-1):
1498
"""Read bytes from the body, decoding into a byte stream.
1500
We read all bytes at once to ensure we've checked the trailer for
1501
errors, and then feed the buffer back as read_body_bytes is called.
1503
if self._body_buffer is not None:
1504
return self._body_buffer.read(count)
1505
_body_decoder = LengthPrefixedBodyDecoder()
1507
while not _body_decoder.finished_reading:
1508
bytes_wanted = _body_decoder.next_read_size()
1509
bytes = self._request.read_bytes(bytes_wanted)
1510
_body_decoder.accept_bytes(bytes)
1511
self._request.finished_reading()
1512
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1513
# XXX: TODO check the trailer result.
1514
return self._body_buffer.read(count)
1516
def _recv_tuple(self):
1517
"""Receive a tuple from the medium request."""
1519
while not line or line[-1] != '\n':
1520
# TODO: this is inefficient - but tuples are short.
1521
new_char = self._request.read_bytes(1)
1523
assert new_char != '', "end of file reading from server."
1524
return _decode_tuple(line)
1526
def query_version(self):
1527
"""Return protocol version number of the server."""
1529
resp = self.read_response_tuple()
1530
if resp == ('ok', '1'):
1533
raise errors.SmartProtocolError("bad response %r" % (resp,))
1536
class SmartClientMedium(object):
1537
"""Smart client is a medium for sending smart protocol requests over."""
1539
def disconnect(self):
1540
"""If this medium maintains a persistent connection, close it.
1542
The default implementation does nothing.
1546
class SmartClientStreamMedium(SmartClientMedium):
1547
"""Stream based medium common class.
1549
SmartClientStreamMediums operate on a stream. All subclasses use a common
1550
SmartClientStreamMediumRequest for their requests, and should implement
1551
_accept_bytes and _read_bytes to allow the request objects to send and
1556
self._current_request = None
1558
def accept_bytes(self, bytes):
1559
self._accept_bytes(bytes)
1562
"""The SmartClientStreamMedium knows how to close the stream when it is
1568
"""Flush the output stream.
1570
This method is used by the SmartClientStreamMediumRequest to ensure that
1571
all data for a request is sent, to avoid long timeouts or deadlocks.
1573
raise NotImplementedError(self._flush)
1575
def get_request(self):
1576
"""See SmartClientMedium.get_request().
1578
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1581
return SmartClientStreamMediumRequest(self)
1583
def read_bytes(self, count):
1584
return self._read_bytes(count)
1587
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1588
"""A client medium using simple pipes.
1590
This client does not manage the pipes: it assumes they will always be open.
1593
def __init__(self, readable_pipe, writeable_pipe):
1594
SmartClientStreamMedium.__init__(self)
1595
self._readable_pipe = readable_pipe
1596
self._writeable_pipe = writeable_pipe
1598
def _accept_bytes(self, bytes):
1599
"""See SmartClientStreamMedium.accept_bytes."""
1600
self._writeable_pipe.write(bytes)
1603
"""See SmartClientStreamMedium._flush()."""
1604
self._writeable_pipe.flush()
1606
def _read_bytes(self, count):
1607
"""See SmartClientStreamMedium._read_bytes."""
1608
return self._readable_pipe.read(count)
1611
class SmartSSHClientMedium(SmartClientStreamMedium):
1612
"""A client medium using SSH."""
1614
def __init__(self, host, port=None, username=None, password=None,
1616
"""Creates a client that will connect on the first use.
1618
:param vendor: An optional override for the ssh vendor to use. See
1619
bzrlib.transport.ssh for details on ssh vendors.
1621
SmartClientStreamMedium.__init__(self)
1622
self._connected = False
1624
self._password = password
1626
self._username = username
1627
self._read_from = None
1628
self._ssh_connection = None
1629
self._vendor = vendor
1630
self._write_to = None
1632
def _accept_bytes(self, bytes):
1633
"""See SmartClientStreamMedium.accept_bytes."""
1634
self._ensure_connection()
1635
self._write_to.write(bytes)
1637
def disconnect(self):
1638
"""See SmartClientMedium.disconnect()."""
1639
if not self._connected:
1641
self._read_from.close()
1642
self._write_to.close()
1643
self._ssh_connection.close()
1644
self._connected = False
1646
def _ensure_connection(self):
1647
"""Connect this medium if not already connected."""
1650
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1651
if self._vendor is None:
1652
vendor = ssh._get_ssh_vendor()
1654
vendor = self._vendor
1655
self._ssh_connection = vendor.connect_ssh(self._username,
1656
self._password, self._host, self._port,
1657
command=[executable, 'serve', '--inet', '--directory=/',
1659
self._read_from, self._write_to = \
1660
self._ssh_connection.get_filelike_channels()
1661
self._connected = True
1664
"""See SmartClientStreamMedium._flush()."""
1665
self._write_to.flush()
1667
def _read_bytes(self, count):
1668
"""See SmartClientStreamMedium.read_bytes."""
1669
if not self._connected:
1670
raise errors.MediumNotConnected(self)
1671
return self._read_from.read(count)
1674
class SmartTCPClientMedium(SmartClientStreamMedium):
1675
"""A client medium using TCP."""
1677
def __init__(self, host, port):
1678
"""Creates a client that will connect on the first use."""
1679
SmartClientStreamMedium.__init__(self)
1680
self._connected = False
1685
def _accept_bytes(self, bytes):
1686
"""See SmartClientMedium.accept_bytes."""
1687
self._ensure_connection()
1688
self._socket.sendall(bytes)
1690
def disconnect(self):
1691
"""See SmartClientMedium.disconnect()."""
1692
if not self._connected:
1694
self._socket.close()
1696
self._connected = False
1698
def _ensure_connection(self):
1699
"""Connect this medium if not already connected."""
1702
self._socket = socket.socket()
1703
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1704
result = self._socket.connect_ex((self._host, int(self._port)))
1706
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1707
(self._host, self._port, os.strerror(result)))
1708
self._connected = True
1711
"""See SmartClientStreamMedium._flush().
1713
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1714
add a means to do a flush, but that can be done in the future.
1717
def _read_bytes(self, count):
1718
"""See SmartClientMedium.read_bytes."""
1719
if not self._connected:
1720
raise errors.MediumNotConnected(self)
1721
return self._socket.recv(count)
1724
class SmartTCPTransport(SmartTransport):
1725
"""Connection to smart server over plain tcp.
1727
This is essentially just a factory to get 'RemoteTransport(url,
1728
SmartTCPClientMedium).
1731
def __init__(self, url):
1732
_scheme, _username, _password, _host, _port, _path = \
1733
transport.split_url(url)
1736
except (ValueError, TypeError), e:
1737
raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1738
medium = SmartTCPClientMedium(_host, _port)
1739
super(SmartTCPTransport, self).__init__(url, medium=medium)
1742
class SmartSSHTransport(SmartTransport):
1743
"""Connection to smart server over SSH.
1745
This is essentially just a factory to get 'RemoteTransport(url,
1746
SmartSSHClientMedium).
1749
def __init__(self, url):
1750
_scheme, _username, _password, _host, _port, _path = \
1751
transport.split_url(url)
1753
if _port is not None:
1755
except (ValueError, TypeError), e:
1756
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1758
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1759
super(SmartSSHTransport, self).__init__(url, medium=medium)
1762
class SmartHTTPTransport(SmartTransport):
1763
"""Just a way to connect between a bzr+http:// url and http://.
1765
This connection operates slightly differently than the SmartSSHTransport.
1766
It uses a plain http:// transport underneath, which defines what remote
1767
.bzr/smart URL we are connected to. From there, all paths that are sent are
1768
sent as relative paths, this way, the remote side can properly
1769
de-reference them, since it is likely doing rewrite rules to translate an
1770
HTTP path into a local path.
1773
def __init__(self, url, http_transport=None):
1774
assert url.startswith('bzr+http://')
1776
if http_transport is None:
1777
http_url = url[len('bzr+'):]
1778
self._http_transport = transport.get_transport(http_url)
1780
self._http_transport = http_transport
1781
http_medium = self._http_transport.get_smart_medium()
1782
super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1784
def _remote_path(self, relpath):
1785
"""After connecting HTTP Transport only deals in relative URLs."""
1791
def abspath(self, relpath):
1792
"""Return the full url to the given relative path.
1794
:param relpath: the relative path or path components
1795
:type relpath: str or list
1797
return self._unparse_url(self._combine_paths(self._path, relpath))
1799
def clone(self, relative_url):
1800
"""Make a new SmartHTTPTransport related to me.
1802
This is re-implemented rather than using the default
1803
SmartTransport.clone() because we must be careful about the underlying
1807
abs_url = self.abspath(relative_url)
1810
# By cloning the underlying http_transport, we are able to share the
1812
new_transport = self._http_transport.clone(relative_url)
1813
return SmartHTTPTransport(abs_url, http_transport=new_transport)
1816
def get_test_permutations():
1817
"""Return (transport, server) permutations for testing."""
1818
### We may need a little more test framework support to construct an
1819
### appropriate RemoteTransport in the future.
1820
return [(SmartTCPTransport, SmartTCPServer_for_testing)]