# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Smart-server protocol, client and server.

Requests are sent as a command and list of arguments, followed by optional
bulk body data. Responses are similarly a response and list of arguments,
followed by bulk body data. ::

Fields are separated by Ctrl-A.
BULK_DATA := CHUNK+ TRAILER
Chunks can be repeated as many times as necessary.
CHUNK := CHUNK_LEN CHUNK_BODY
CHUNK_LEN := DIGIT+ NEWLINE
Gives the number of bytes in the following chunk.
CHUNK_BODY := BYTE[chunk_len]
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
SUCCESS_TRAILER := 'done' NEWLINE
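
The chunked BULK_DATA grammar above can be exercised with a short Python 3 sketch. `encode_bulk`, `decode_bulk` and `SmartProtocolError` are hypothetical names, not bzrlib APIs. This excerpt does not spell out the ERROR_TRAILER form, so the sketch handles only the `done` success trailer. It is also more lenient than the grammar, because it accepts zero chunks. Note that `_send_bulk_data` below always writes a single chunk, while the grammar allows several.

```python
import io


class SmartProtocolError(Exception):
    """Hypothetical stand-in for a protocol-level error."""


def encode_bulk(chunks):
    """Encode byte strings as CHUNK+ followed by the SUCCESS_TRAILER."""
    out = io.BytesIO()
    for chunk in chunks:
        out.write(b'%d\n' % len(chunk))  # CHUNK_LEN := DIGIT+ NEWLINE
        out.write(chunk)                  # CHUNK_BODY := BYTE[chunk_len]
    out.write(b'done\n')                  # SUCCESS_TRAILER := 'done' NEWLINE
    return out.getvalue()


def decode_bulk(stream):
    """Read chunks from a binary file-like object up to the 'done' trailer."""
    parts = []
    while True:
        line = stream.readline()
        if line == b'done\n':
            return b''.join(parts)
        # Only a newline-terminated run of ASCII digits is a valid length.
        digits = line[:-1] if line.endswith(b'\n') else b''
        if not digits.isdigit():
            raise SmartProtocolError('bad chunk length line %r' % (line,))
        length = int(digits)
        body = stream.read(length)
        if len(body) != length:
            raise SmartProtocolError('short read fetching bulk data chunk')
        parts.append(body)
```

The length prefix is decimal text, so a chunk body can safely contain newlines or the bytes `done`.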

Paths are passed across the network. The client needs to see a namespace that
includes any repository that might need to be referenced, and the client needs
to know about a root directory beyond which it cannot ascend.

Servers run over ssh will typically want to be able to access any path the user
can access. Public servers on the other hand (which might be over http, ssh
or tcp) will typically want to restrict access to only a particular directory
and its children, so will want to implement a software virtual root at that level.
In other words they'll want to rewrite incoming paths to be under that level
(and prevent escaping using ../ tricks).
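
A public server could implement the software virtual root described above by normalising each incoming path and refusing anything that climbs above the root. This Python 3 sketch uses only `posixpath`. `rebase_under_root` and `PathEscapeError` are hypothetical names. The sketch does not resolve symlinks, which the TODO comments below list as a separate concern.

```python
import posixpath


class PathEscapeError(ValueError):
    """Hypothetical error for a client path that would leave the root."""


def rebase_under_root(root, client_path):
    """Map a client-supplied path onto a path under ``root``.

    Absolute client paths are treated as relative to the virtual root, and
    any path whose normalised form climbs above the root is rejected.
    """
    # Treat '/foo' and 'foo' alike: both are relative to the virtual root.
    relative = client_path.lstrip('/')
    # normpath collapses 'a/./b' and 'a/../b'; an escaping '..' survives.
    normalised = posixpath.normpath(relative) if relative else '.'
    if normalised == '..' or normalised.startswith('../'):
        raise PathEscapeError(client_path)
    if normalised == '.':
        return root
    return posixpath.join(root, normalised)
```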

URLs that include ~ should probably be passed across to the server verbatim
and the server can expand them. This will probably not be meaningful when
limited to a directory?

# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""RemoteTransport client for the smart-server.

This module shouldn't be accessed directly. The classes defined here should be
imported from bzrlib.smart.

# TODO: _translate_error should be on the client, not the transport because
# error coding is wire protocol specific.

# TODO: A plain integer from query_version is too simple; should give some

# TODO: Server should probably catch exceptions within itself and send them
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
# Also needs to somehow report protocol errors like bad requests. Need to
# consider how we'll handle error reporting, e.g. if we get halfway through a
# bulk transfer and then something goes wrong.

# TODO: Standard marker at start of request/response lines?

# TODO: Make each request and response self-validatable, e.g. with checksums.

# TODO: get/put objects could be changed to gradually read back the data as it
# comes across the network

# TODO: What should the server do if it hits an error and has to terminate?

# TODO: is it useful to allow multiple chunks in the bulk data?

# TODO: If we get an exception during transmission of bulk data we can't just
# emit the exception because it won't be seen.
# John proposes: I think it would be worthwhile to have a header on each
# chunk, that indicates it is another chunk. Then you can send an 'error'
# chunk as long as you finish the previous chunk.

# TODO: Clone method on Transport; should work up towards parent directory;
# unclear how this should be stored or communicated to the server... maybe
# just pass it on all relevant requests?

# TODO: Better name than clone() for changing between directories. How about
# open_dir or change_dir or chdir?

# TODO: Is it really good to have the notion of current directory within the
# connection? Perhaps all Transports should factor out a common connection
# from the thing that has the directory context?

# TODO: Pull more things common to sftp and ssh to a higher level.

# TODO: The server that manages a connection should be quite small and retain
# minimum state because each request is supposed to be stateless.
# Then we can write another implementation that maps to http.

# TODO: What to do when a client connection is garbage collected? Maybe just
# abruptly drop the connection?

# TODO: Server in some cases will need to restrict access to files outside of
# a particular root directory. LocalTransport doesn't do anything to stop you
# ascending above the base directory, so we need to prevent paths
# containing '..' in either the server or transport layers. (Also need to
# consider what happens if someone creates a symlink pointing outside the

# TODO: Server should rebase absolute paths coming across the network to put
# them under the virtual root, if one is in use. LocalTransport currently
# doesn't do that; if you give it an absolute path it just uses it.

# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
# urlescape them instead. Indeed possibly this should just literally be

# FIXME: This transport, with several others, has imperfect handling of paths
# within urls. It'd probably be better for ".." from a root to raise an error
# rather than return the same directory as we do at present.

# TODO: Rather than working at the Transport layer we want Branch,
# Repository or BzrDir objects that talk to a server.

# TODO: Probably want some way for server commands to gradually produce body
# data rather than passing it as a string; they could perhaps pass an
# iterator-like callback that will gradually yield data; it probably needs a
# close() method that will always be called to do any necessary cleanup.

# TODO: Split the actual smart server from the ssh encoding of it.

# TODO: Perhaps support file-level readwrite operations over the transport

# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
# branch doing file-level operations.

# TODO: jam 20060915 _decode_tuple is acting directly on input over
# the socket, and it assumes everything is UTF8 sections separated
# by \001. Which means a request like '\002' will abort the connection
# because of a UnicodeDecodeError. It does look like invalid data will
# kill the SmartStreamServer, but only with an abort + exception, and
# the overall server shouldn't die.

from __future__ import absolute_import

__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']

from cStringIO import StringIO

from bzrlib import (
from bzrlib.bundle.serializer import write_bundle
from bzrlib.trace import mutter
from bzrlib.transport import local

# must do this otherwise urllib can't parse the urls properly :(
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
transport.register_urlparse_netloc_protocol(scheme)

def _recv_tuple(from_file):
req_line = from_file.readline()
return _decode_tuple(req_line)

def _decode_tuple(req_line):
if req_line == None or req_line == '':
if req_line[-1] != '\n':
raise errors.SmartProtocolError("request %r not terminated" % req_line)
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))

def _encode_tuple(args):
"""Encode the tuple args to a bytestream."""
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
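
Requests and responses are single lines of UTF-8 fields joined by `\x01`, as `_encode_tuple` and `_decode_tuple` show. The Python 3 sketch below mirrors them with bytes on the wire. It also shows one possible answer to the jam 20060915 TODO above: undecodable input is reported as a protocol error instead of escaping as a bare `UnicodeDecodeError`. `encode_tuple`, `decode_tuple` and `SmartProtocolError` are local, hypothetical names.

```python
class SmartProtocolError(Exception):
    """Hypothetical stand-in for bzrlib's protocol error class."""


def encode_tuple(args):
    """Join UTF-8 encoded fields with \\x01 and terminate with a newline."""
    return b'\x01'.join(a.encode('utf-8') for a in args) + b'\n'


def decode_tuple(line):
    """Inverse of encode_tuple; returns None at end of stream."""
    if not line:
        return None
    if not line.endswith(b'\n'):
        raise SmartProtocolError('request %r not terminated' % (line,))
    try:
        # tuple() consumes the generator here, inside the try block.
        return tuple(f.decode('utf-8') for f in line[:-1].split(b'\x01'))
    except UnicodeDecodeError as e:
        # Report malformed input as a protocol error rather than letting a
        # bare UnicodeDecodeError abort the connection handler.
        raise SmartProtocolError('undecodable request %r: %s' % (line, e))
```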

class SmartProtocolBase(object):
"""Methods common to client and server"""

def _send_bulk_data(self, body):
"""Send chunked body data"""
assert isinstance(body, str)
bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
self._write_and_flush(bytes)

# TODO: this only actually accommodates a single block; possibly should support
def _recv_bulk(self):
chunk_len = self._in.readline()
chunk_len = int(chunk_len)
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
bulk = self._in.read(chunk_len)
if len(bulk) != chunk_len:
raise errors.SmartProtocolError("short read fetching bulk data chunk")

def _recv_tuple(self):
return _recv_tuple(self._in)

def _recv_trailer(self):
resp = self._recv_tuple()
if resp == ('done', ):
self._translate_error(resp)

def _serialise_offsets(self, offsets):
"""Serialise a readv offset list."""
for start, length in offsets:
txt.append('%d,%d' % (start, length))
return '\n'.join(txt)

def _write_and_flush(self, bytes):
"""Write bytes to self._out and flush it."""
# XXX: this will be inefficient. Just ask Robert.
self._out.write(bytes)

class SmartStreamServer(SmartProtocolBase):
"""Handles smart commands coming over a stream.

The stream may be a pipe connected to sshd, or a tcp socket, or an
in-process fifo for testing.

One instance is created for each connected client; it can serve multiple
requests in the lifetime of the connection.

The server passes requests through to an underlying backing transport,
which will typically be a LocalTransport looking at the server's filesystem.

def __init__(self, in_file, out_file, backing_transport):
"""Construct new server.

:param in_file: Python file from which requests can be read.
:param out_file: Python file to write responses.
:param backing_transport: Transport for the directory served.
self.smart_server = SmartServer(backing_transport)
# server can call back to us to get bulk data - this is not really
# ideal, they should get it per request instead
self.smart_server._recv_body = self._recv_bulk

def _recv_tuple(self):
"""Read a request from the client and return as a tuple.

Returns None at end of file (if the client closed the connection).
return _recv_tuple(self._in)

def _send_tuple(self, args):
"""Send response header"""
return self._write_and_flush(_encode_tuple(args))

def _send_error_and_disconnect(self, exception):
self._send_tuple(('error', str(exception)))

def _serve_one_request(self):
"""Read one request from input, process, send back a response.

:return: False if the server should terminate, otherwise None.
req_args = self._recv_tuple()
# client closed connection
return False # shutdown server
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
self._send_tuple(response.args)
if response.body is not None:
self._send_bulk_data(response.body)
except KeyboardInterrupt:
# everything else: pass to client, flush, and quit
self._send_error_and_disconnect(e)

"""Serve requests until the client disconnects."""
# Keep a reference to stderr because the sys module's globals get set to
# None during interpreter shutdown.
from sys import stderr
while self._serve_one_request() != False:
stderr.write("%s terminating on exception %s\n" % (self, e))

class SmartServerResponse(object):
"""Response generated by SmartServer."""

def __init__(self, args, body=None):

# XXX: TODO: Create a SmartServerRequest which will take the responsibility
# for delivering the data for a request. This could be done with as the
# StreamServer, though that would create conflation between request and response
# which may be undesirable.

class SmartServer(object):
"""Protocol logic for smart server.

This doesn't handle serialization at all, it just processes requests and

# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
# encoding or decoding logic to allow the wire protocol to vary from the
# object protocol: we will want to tweak the wire protocol separate from
# the object model, and ideally we will be able to do that without having
# a SmartServer subclass for each wire protocol, rather just a Protocol

# TODO: Better way of representing the body for commands that take it,
# and allow it to be streamed into the server.

def __init__(self, backing_transport):
self._backing_transport = backing_transport

"""Answer a version request with my version."""
return SmartServerResponse(('ok', '1'))

def do_has(self, relpath):
r = self._backing_transport.has(relpath) and 'yes' or 'no'
return SmartServerResponse((r,))

def do_get(self, relpath):
backing_bytes = self._backing_transport.get_bytes(relpath)
return SmartServerResponse(('ok',), backing_bytes)

def _deserialise_optional_mode(self, mode):
# XXX: FIXME this should be on the protocol object.

def do_append(self, relpath, mode):
old_length = self._backing_transport.append_bytes(
relpath, self._recv_body(), self._deserialise_optional_mode(mode))
return SmartServerResponse(('appended', '%d' % old_length))

def do_delete(self, relpath):
self._backing_transport.delete(relpath)

def do_iter_files_recursive(self, abspath):
# XXX: the path handling needs some thought.
#relpath = self._backing_transport.relpath(abspath)
transport = self._backing_transport.clone(abspath)
filenames = transport.iter_files_recursive()
return SmartServerResponse(('names',) + tuple(filenames))

def do_list_dir(self, relpath):
filenames = self._backing_transport.list_dir(relpath)
return SmartServerResponse(('names',) + tuple(filenames))

def do_mkdir(self, relpath, mode):
self._backing_transport.mkdir(relpath,
self._deserialise_optional_mode(mode))

def do_move(self, rel_from, rel_to):
self._backing_transport.move(rel_from, rel_to)

def do_put(self, relpath, mode):
self._backing_transport.put_bytes(relpath,
self._deserialise_optional_mode(mode))

def _deserialise_offsets(self, text):
# XXX: FIXME this should be on the protocol object.
for line in text.split('\n'):
start, length = line.split(',')
offsets.append((int(start), int(length)))
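
The readv offset list travels as plain text. `_serialise_offsets` writes one `start,length` pair per line, and `_deserialise_offsets` parses the list back. Here is a Python 3 sketch of the round trip. The function names are illustrative. Skipping empty lines is an assumption about what the lines elided from `_deserialise_offsets` do.

```python
def serialise_offsets(offsets):
    """Encode (start, length) pairs as 'start,length' lines joined by newlines."""
    return '\n'.join('%d,%d' % (start, length) for start, length in offsets)


def deserialise_offsets(text):
    """Parse the text produced by serialise_offsets back into a list."""
    offsets = []
    for line in text.split('\n'):
        if not line:
            # Assumed behaviour: an empty body means no offsets.
            continue
        start, length = line.split(',')
        offsets.append((int(start), int(length)))
    return offsets
```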

def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
create_parent_dir = (create_parent == 'T')
self._backing_transport.put_bytes_non_atomic(relpath,
mode=self._deserialise_optional_mode(mode),
create_parent_dir=create_parent_dir,
dir_mode=self._deserialise_optional_mode(dir_mode))

def do_readv(self, relpath):
offsets = self._deserialise_offsets(self._recv_body())
backing_bytes = ''.join(bytes for offset, bytes in
self._backing_transport.readv(relpath, offsets))
return SmartServerResponse(('readv',), backing_bytes)

def do_rename(self, rel_from, rel_to):
self._backing_transport.rename(rel_from, rel_to)

def do_rmdir(self, relpath):
self._backing_transport.rmdir(relpath)

def do_stat(self, relpath):
stat = self._backing_transport.stat(relpath)
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))

def do_get_bundle(self, path, revision_id):
# open transport relative to our base
t = self._backing_transport.clone(path)
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
repo = control.open_repository()
tmpf = tempfile.TemporaryFile()
base_revision = revision.NULL_REVISION
write_bundle(repo, revision_id, base_revision, tmpf)
return SmartServerResponse((), tmpf.read())

def dispatch_command(self, cmd, args):
func = getattr(self, 'do_' + cmd, None)
raise errors.SmartProtocolError("bad request %r" % (cmd,))
result = SmartServerResponse(('ok',))
except errors.NoSuchFile, e:
return SmartServerResponse(('NoSuchFile', e.path))
except errors.FileExists, e:
return SmartServerResponse(('FileExists', e.path))
except errors.DirectoryNotEmpty, e:
return SmartServerResponse(('DirectoryNotEmpty', e.path))
except errors.ShortReadvError, e:
return SmartServerResponse(('ShortReadvError',
e.path, str(e.offset), str(e.length), str(e.actual)))
except UnicodeError, e:
# If it is a DecodeError, then most likely we are starting
# with a plain string
str_or_unicode = e.object
if isinstance(str_or_unicode, unicode):
val = u'u:' + str_or_unicode
val = u's:' + str_or_unicode.encode('base64')
# This handles UnicodeEncodeError or UnicodeDecodeError
return SmartServerResponse((e.__class__.__name__,
e.encoding, val, str(e.start), str(e.end), e.reason))
except errors.TransportNotPossible, e:
if e.msg == "readonly transport":
return SmartServerResponse(('ReadOnlyError', ))
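
`dispatch_command` above routes a verb such as `get` to the method `do_get` and converts known exceptions into ordinary response tuples. The Python 3 sketch below shows that pattern in isolation. `Dispatcher`, `Response` and `NoSuchFile` are hypothetical stand-ins, and an in-memory dict replaces the backing transport. Treating a `None` return as plain success is an assumption drawn from the visible `result = SmartServerResponse(('ok',))` line.

```python
class NoSuchFile(Exception):
    """Hypothetical stand-in for bzrlib's errors.NoSuchFile."""

    def __init__(self, path):
        Exception.__init__(self, path)
        self.path = path


class Response(object):
    """Minimal stand-in for SmartServerResponse: args plus optional body."""

    def __init__(self, args, body=None):
        self.args = args
        self.body = body


class Dispatcher(object):
    """Hypothetical server that routes verb 'x' to method do_x."""

    def __init__(self, files):
        self._files = files  # dict of relpath -> bytes, for illustration

    def do_has(self, relpath):
        return Response(('yes' if relpath in self._files else 'no',))

    def do_get(self, relpath):
        if relpath not in self._files:
            raise NoSuchFile(relpath)
        return Response(('ok',), self._files[relpath])

    def do_delete(self, relpath):
        # Returns None on success; dispatch_command fills in ('ok',).
        if relpath not in self._files:
            raise NoSuchFile(relpath)
        del self._files[relpath]

    def dispatch_command(self, cmd, args):
        func = getattr(self, 'do_' + cmd, None)
        if func is None:
            raise ValueError('bad request %r' % (cmd,))
        try:
            result = func(*args)
            if result is None:
                result = Response(('ok',))
            return result
        except NoSuchFile as e:
            # Known errors become ordinary responses the client can decode.
            return Response(('NoSuchFile', e.path))
```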

class SmartTCPServer(object):
"""Listens on a TCP socket and accepts connections from smart clients"""

def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
"""Construct a new server.

To actually start it running, call either start_background_thread or
:param host: Name of the interface to listen on.
:param port: TCP port to listen on, or 0 to allocate a transient port.
if backing_transport is None:
backing_transport = memory.MemoryTransport()
self._server_socket = socket.socket()
self._server_socket.bind((host, port))
self.port = self._server_socket.getsockname()[1]
self._server_socket.listen(1)
self._server_socket.settimeout(1)
self.backing_transport = backing_transport

# let connections timeout so that we get a chance to terminate
# Keep a reference to the exceptions we want to catch because the socket
# module's globals get set to None during interpreter shutdown.
from socket import timeout as socket_timeout
from socket import error as socket_error
self._should_terminate = False
while not self._should_terminate:
self.accept_and_serve()
except socket_timeout:
# just check if we're asked to stop
except socket_error, e:
trace.warning("client disconnected: %s", e)

"""Return the url of the server"""
return "bzr://%s:%d/" % self._server_socket.getsockname()

def accept_and_serve(self):
conn, client_addr = self._server_socket.accept()
# For WIN32, where the timeout value from the listening socket
# propagates to the newly accepted socket.
conn.setblocking(True)
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
from_client = conn.makefile('r')
to_client = conn.makefile('w')
handler = SmartStreamServer(from_client, to_client,
self.backing_transport)
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
connection_thread.setDaemon(True)
connection_thread.start()

def start_background_thread(self):
self._server_thread = threading.Thread(None,
name='server-' + self.get_url())
self._server_thread.setDaemon(True)
self._server_thread.start()

def stop_background_thread(self):
self._should_terminate = True
# self._server_socket.close()
# we used to join the thread, but it's not really necessary; it will
## self._server_thread.join()

class SmartTCPServer_for_testing(SmartTCPServer):
"""Server suitable for use by transport tests.

This server is backed by the process's cwd.
self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
# The server is set up by default like for ssh access: the client
# passes filesystem-absolute paths; therefore the server must look
# them up relative to the root directory. It might be better to act as
# a public server and have the server rewrite paths into the test
SmartTCPServer.__init__(self,
transport.get_transport(urlutils.local_path_to_url('/')))

"""Set up server for testing"""
self.start_background_thread()

self.stop_background_thread()

"""Return the url of the server"""
host, port = self._server_socket.getsockname()
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))

def get_bogus_url(self):
"""Return a URL which will fail to connect"""
return 'bzr://127.0.0.1:1/'

class SmartStat(object):
from bzrlib.smart import client, medium

class _SmartStat(object):

def __init__(self, size, mode):
self.st_size = size
self.st_mode = mode

class SmartTransport(transport.Transport):
class RemoteTransport(transport.ConnectedTransport):
"""Connection to a smart server.

The connection holds references to pipes that can be used to send requests
The connection holds references to the medium that can be used to send
requests to the server.

The connection has a notion of the current directory to which it's
connected; this is incorporated in filenames passed to the server.

This supports some higher-level RPC operations and can also be treated
like a Transport to do file-like operations.

The connection can be made over a tcp socket, or (in future) an ssh pipe
or a series of http requests. There are concrete subclasses for each
type: SmartTCPTransport, etc.
The connection can be made over a tcp socket, an ssh pipe or a series of
http requests. There are concrete subclasses for each type:
RemoteTCPTransport, etc.

# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
# When making a readv request, cap it at requesting 5MB of data
_max_readv_bytes = 5*1024*1024

# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
# responsibilities: Put those on SmartClient or similar. This is vital for
# the ability to support multiple versions of the smart protocol over time:
# SmartTransport is an adapter from the Transport object model to the
# RemoteTransport is an adapter from the Transport object model to the
# SmartClient model, not an encoder.

def __init__(self, url, clone_from=None, client=None):
# FIXME: the medium parameter should be private, only the tests require
# it. It may be even clearer to define a TestRemoteTransport that handles
# the specific cases of providing a _client and/or a _medium, and leave
# RemoteTransport as an abstract class.
def __init__(self, url, _from_transport=None, medium=None, _client=None):
:param client: ignored when clone_from is not None.
### Technically super() here is faulty because Transport's __init__
### fails to take 2 parameters, and if super were to choose a silly
### initialisation order things would blow up.
if not url.endswith('/'):
super(SmartTransport, self).__init__(url)
self._scheme, self._username, self._password, self._host, self._port, self._path = \
transport.split_url(url)
if clone_from is None:
self._client = SmartStreamClient(self._connect_to_server)
self._client = client
# credentials may be stripped from the base in some circumstances
# as yet to be clearly defined or documented, so copy them.
self._username = clone_from._username
# reuse same connection
self._client = clone_from._client

def abspath(self, relpath):
"""Return the full url to the given relative path.

@param relpath: the relative path or path components
@type relpath: str or list
return self._unparse_url(self._remote_path(relpath))

def clone(self, relative_url):
"""Make a new SmartTransport related to me, sharing the same connection.

This essentially opens a handle on a different remote directory.
if relative_url is None:
return self.__class__(self.base, self)
return self.__class__(self.abspath(relative_url), self)
:param _from_transport: Another RemoteTransport instance that this
one is being cloned from. Attributes such as the medium will
:param medium: The medium to use for this RemoteTransport. If None,
the medium from the _from_transport is shared. If both this
and _from_transport are None, a new medium will be built.
_from_transport and medium cannot both be specified.

:param _client: Override the _SmartClient used by this transport. This
should only be used for testing purposes; normally this is
determined from the medium.
super(RemoteTransport, self).__init__(
url, _from_transport=_from_transport)

# The medium is the connection, except when we need to share it with
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
# what we want to share is really the shared connection.

if (_from_transport is not None
and isinstance(_from_transport, RemoteTransport)):
_client = _from_transport._client
elif _from_transport is None:
# If no _from_transport is specified, we need to initialize the
medium, credentials = self._build_medium()
if 'hpss' in debug.debug_flags:
trace.mutter('hpss: Built a new medium: %s',
medium.__class__.__name__)
self._shared_connection = transport._SharedConnection(medium,
# No medium was specified, so share the medium from the
medium = self._shared_connection.connection
raise AssertionError(
"Both _from_transport (%r) and medium (%r) passed to "
"RemoteTransport.__init__, but these parameters are mutually "
"exclusive." % (_from_transport, medium))
self._client = client._SmartClient(medium)
self._client = _client

def _build_medium(self):
"""Create the medium if _from_transport does not provide one.

The medium is analogous to the connection for ConnectedTransport: it
allows connection sharing.

def _report_activity(self, bytes, direction):
"""See Transport._report_activity.

Does nothing; the smart medium will report activity triggered by a

def is_readonly(self):
"""Smart server transport can do read/write file operations."""
resp = self._call2('Transport.is_readonly')
except errors.UnknownSmartMethod:
# XXX: nasty hack: servers before 0.16 don't have a
# 'Transport.is_readonly' verb, so we do what clients before 0.16
if resp == ('yes', ):
elif resp == ('no', ):
raise errors.UnexpectedSmartServerResponse(resp)
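
`is_readonly` calls a newer verb and falls back when an older server does not recognise it. The Python 3 sketch below shows that pattern. A caller-supplied `call` function stands in for `_call2`, and the exception classes are hypothetical stand-ins. The lines saying what pre-0.16 clients did are elided here, so returning False on the fallback path is an assumption.

```python
class UnknownSmartMethod(Exception):
    """Hypothetical stand-in raised when the server lacks a verb."""


class UnexpectedResponse(Exception):
    """Hypothetical stand-in for UnexpectedSmartServerResponse."""


def is_readonly(call):
    """Ask the server whether it is read-only, tolerating older servers.

    call(verb) sends one request and returns the response tuple, or raises
    UnknownSmartMethod if the server does not implement the verb.
    """
    try:
        resp = call('Transport.is_readonly')
    except UnknownSmartMethod:
        # Assumption: an old server without the verb is treated as writable.
        return False
    if resp == ('yes',):
        return True
    if resp == ('no',):
        return False
    raise UnexpectedResponse(resp)
```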

def get_smart_client(self):
def _unparse_url(self, path):
"""Return URL for a path.
return self._get_connection()
:see: SFTPUrlHandling._unparse_url
# TODO: Eventually it should be possible to unify this with
# SFTPUrlHandling._unparse_url?
path = urllib.quote(path)
netloc = urllib.quote(self._host)
if self._username is not None:
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
if self._port is not None:
netloc = '%s:%d' % (netloc, self._port)
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
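
`_unparse_url` quotes the host, the optional user name and the path separately before reassembling the URL. In Python 3 the same functions live in `urllib.parse`. This sketch takes the URL parts as parameters instead of reading `self._scheme`, `self._host` and the other attributes. `unparse_url` is an illustrative name.

```python
from urllib.parse import quote, urlunparse


def unparse_url(scheme, host, path, username=None, port=None):
    """Rebuild a URL from its parts, percent-quoting each component."""
    netloc = quote(host)
    if username is not None:
        # quote() keeps '/' by default but escapes '@', so a user name
        # containing '@' cannot be confused with the host separator.
        netloc = '%s@%s' % (quote(username), netloc)
    if port is not None:
        netloc = '%s:%d' % (netloc, port)
    return urlunparse((scheme, netloc, quote(path), '', '', ''))
```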

def get_smart_medium(self):
return self._get_connection()

def _remote_path(self, relpath):
"""Returns the Unicode version of the absolute path for relpath."""
return self._combine_paths(self._path, relpath)
return urlutils.URL._combine_paths(self._parsed_url.path, relpath)

def _call(self, method, *args):
resp = self._call2(method, *args)
self._ensure_ok(resp)

def _call2(self, method, *args):
"""Call a method on the remote server."""
return self._client.call(method, *args)
except errors.ErrorFromSmartServer, err:
# The first argument, if present, is always a path.
context = {'relpath': args[0]}
self._translate_error(err, **context)

def _call_with_body_bytes(self, method, args, body):
"""Call a method on the remote server with body bytes."""
return self._client.call_with_body_bytes(method, args, body)
except errors.ErrorFromSmartServer, err:
# The first argument, if present, is always a path.
context = {'relpath': args[0]}
self._translate_error(err, **context)

def has(self, relpath):
"""Indicate whether a remote file of the given name exists or not.

:see: Transport.has()
resp = self._client._call('has', self._remote_path(relpath))
resp = self._call2('has', self._remote_path(relpath))
if resp == ('yes', ):
elif resp == ('no', ):
self._translate_error(resp)
raise errors.UnexpectedSmartServerResponse(resp)

def get(self, relpath):
"""Return file-like object reading the contents of a remote file.

:see: Transport.get_bytes()/get_file()
return StringIO(self.get_bytes(relpath))

def get_bytes(self, relpath):
remote = self._remote_path(relpath)
resp = self._client._call('get', remote)
resp, response_handler = self._client.call_expecting_body('get', remote)
except errors.ErrorFromSmartServer, err:
self._translate_error(err, relpath)
if resp != ('ok', ):
self._translate_error(resp, relpath)
return StringIO(self._client._recv_bulk())
response_handler.cancel_read_body()
raise errors.UnexpectedSmartServerResponse(resp)
return response_handler.read_body_bytes()

def _serialise_optional_mode(self, mode):

def append_file(self, relpath, from_file, mode=None):
return self.append_bytes(relpath, from_file.read(), mode)

def append_bytes(self, relpath, bytes, mode=None):
resp = self._client._call_with_upload(
resp = self._call_with_body_bytes(
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
if resp[0] == 'appended':
return int(resp[1])
self._translate_error(resp)
raise errors.UnexpectedSmartServerResponse(resp)

def delete(self, relpath):
resp = self._client._call('delete', self._remote_path(relpath))
self._translate_error(resp)
def readv(self, relpath, offsets):
resp = self._call2('delete', self._remote_path(relpath))
self._ensure_ok(resp)

def external_url(self):
"""See bzrlib.transport.Transport.external_url."""
# the external path for RemoteTransports is the base

def recommended_page_size(self):
"""Return the recommended page size for this transport."""

def _readv(self, relpath, offsets):
offsets = list(offsets)
sorted_offsets = sorted(offsets)
# turn the list of offsets into a stack
offset_stack = iter(offsets)
cur_offset_and_size = offset_stack.next()
coalesced = list(self._coalesce_offsets(sorted_offsets,
limit=self._max_readv_combine,
fudge_factor=self._bytes_to_read_before_seek))
resp = self._client._call_with_upload(
(self._remote_path(relpath),),
self._client._serialise_offsets((c.start, c.length) for c in coalesced))
if resp[0] != 'readv':
# This should raise an exception
self._translate_error(resp)
data = self._client._recv_bulk()
fudge_factor=self._bytes_to_read_before_seek,
max_size=self._max_readv_bytes))
# now that we've coalesced things, avoid making enormous requests
if c.length + cur_len > self._max_readv_bytes:
requests.append(cur_request)
cur_request.append(c)
requests.append(cur_request)
if 'hpss' in debug.debug_flags:
trace.mutter('%s.readv %s offsets => %s coalesced'
' => %s requests (%s)',
self.__class__.__name__, len(offsets), len(coalesced),
len(requests), sum(map(len, requests)))
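
The loop above groups the coalesced ranges into several requests so that no single request asks for more than `_max_readv_bytes`. Several lines of that loop are elided in this excerpt, so the Python 3 sketch below is one plausible way to write it, not the original code. `Coalesced` and `split_requests` are hypothetical names. A single range larger than the cap still gets its own request.

```python
from collections import namedtuple

# Hypothetical stand-in for the coalesced-offset objects used above.
Coalesced = namedtuple('Coalesced', ['start', 'length'])


def split_requests(coalesced, max_bytes):
    """Group coalesced ranges into requests totalling at most max_bytes.

    A range is never split; if it alone exceeds max_bytes it forms a
    request of its own.
    """
    requests = []
    cur_request = []
    cur_len = 0
    for c in coalesced:
        if cur_request and c.length + cur_len > max_bytes:
            # Adding c would exceed the cap: close the current request.
            requests.append(cur_request)
            cur_request, cur_len = [], 0
        cur_request.append(c)
        cur_len += c.length
    if cur_request:
        requests.append(cur_request)
    return requests
```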
# Cache the results, but only until they have been fulfilled
# turn the list of offsets into a single stack to iterate
offset_stack = iter(offsets)
# using a list so it can be modified when passing down and coming back
next_offset = [offset_stack.next()]
for cur_request in requests:
result = self._client.call_with_body_readv_array(
('readv', self._remote_path(relpath),),
[(c.start, c.length) for c in cur_request])
resp, response_handler = result
except errors.ErrorFromSmartServer, err:
self._translate_error(err, relpath)
if resp[0] != 'readv':
# This should raise an exception
response_handler.cancel_read_body()
raise errors.UnexpectedSmartServerResponse(resp)
for res in self._handle_response(offset_stack, cur_request,

def _handle_response(self, offset_stack, coalesced, response_handler,
data_map, next_offset):
cur_offset_and_size = next_offset[0]
# FIXME: this should know how many bytes are needed, for clarity.
data = response_handler.read_body_bytes()
for c_offset in coalesced:
if len(data) < c_offset.length:
raise errors.ShortReadvError(relpath, c_offset.start,
c_offset.length, actual=len(data))
for suboffset, subsize in c_offset.ranges:
key = (c_offset.start+suboffset, subsize)
data_map[key] = data[suboffset:suboffset+subsize]
data = data[c_offset.length:]
this_data = data[data_offset+suboffset:
data_offset+suboffset+subsize]
# Special case when the data is in-order, rather than packing
# into a map and then back out again. Benchmarking shows that
# this has 100% hit rate, but leave in the data_map work just
# TODO: Could we get away with using buffer() to avoid the
# memory copy? Callers would need to realize they may
# not have a real string.
if key == cur_offset_and_size:
yield cur_offset_and_size[0], this_data
cur_offset_and_size = next_offset[0] = offset_stack.next()
data_map[key] = this_data
data_offset += c_offset.length
# Now that we've read some data, see if we can yield anything back
while cur_offset_and_size in data_map:
this_data = data_map.pop(cur_offset_and_size)
yield cur_offset_and_size[0], this_data
cur_offset_and_size = offset_stack.next()
cur_offset_and_size = next_offset[0] = offset_stack.next()
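# _handle_response yields data in the order the caller requested it, even
# though the server returns it in coalesced order. Pieces that arrive early
# are parked in data_map until their turn comes. The generator below is a
# simplified Python 3 sketch of that buffering pattern. `yield_in_order` and
# its arguments are hypothetical. It uses next(iterator, None) where the
# original calls offset_stack.next().

```python
def yield_in_order(requested, arrivals):
    """Yield (offset, data) in the order the (offset, size) keys were requested.

    `arrivals` yields ((offset, size), data) pairs in any order; pieces that
    are not yet wanted wait in a dict until they become next in line.
    """
    pending = {}
    order = iter(requested)
    want = next(order, None)
    for key, data in arrivals:
        if key == want:
            # Fast path: the piece arrived exactly when needed.
            yield key[0], data
            want = next(order, None)
        else:
            pending[key] = data
        # Drain buffered pieces that are now next in line.
        while want is not None and want in pending:
            yield want[0], pending.pop(want)
            want = next(order, None)
```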

def rename(self, rel_from, rel_to):
self._remote_path(rel_from),
self._remote_path(rel_to))

def move(self, rel_from, rel_to):
self._remote_path(rel_from),
self._remote_path(rel_to))

def rmdir(self, relpath):
resp = self._call('rmdir', self._remote_path(relpath))

def _call(self, method, *args):
resp = self._client._call(method, *args)
self._translate_error(resp)

def _translate_error(self, resp, orig_path=None):
"""Raise an exception from a response"""
elif what == 'NoSuchFile':
if orig_path is not None:
error_path = orig_path
raise errors.NoSuchFile(error_path)
elif what == 'error':
raise errors.SmartProtocolError(unicode(resp[1]))
elif what == 'FileExists':
raise errors.FileExists(resp[1])
elif what == 'DirectoryNotEmpty':
raise errors.DirectoryNotEmpty(resp[1])
elif what == 'ShortReadvError':
raise errors.ShortReadvError(resp[1], int(resp[2]),
int(resp[3]), int(resp[4]))
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
encoding = str(resp[1]) # encoding must always be a string
reason = str(resp[5]) # reason must always be a string
if val.startswith('u:'):
elif val.startswith('s:'):
val = val[2:].decode('base64')
if what == 'UnicodeDecodeError':
raise UnicodeDecodeError(encoding, val, start, end, reason)
elif what == 'UnicodeEncodeError':
raise UnicodeEncodeError(encoding, val, start, end, reason)
elif what == "ReadOnlyError":
raise errors.TransportNotPossible('readonly transport')
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
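# _translate_error maps the error name in a server response tuple onto a
# local exception. Here is a compact, table-driven sketch of the same idea
# for a few of the cases shown. The exception classes are local stand-ins
# for bzrlib.errors. translate_error returns the exception instead of
# raising it, which keeps it easy to test. Byte values in a
# UnicodeDecodeError response travel base64-encoded behind an 's:' prefix.
# The body of the 'u:' branch is not shown in the fragment, so this sketch
# does not handle it.

```python
import base64


class NoSuchFile(Exception):
    """Local stand-in for bzrlib.errors.NoSuchFile."""


class FileExists(Exception):
    """Local stand-in for bzrlib.errors.FileExists."""


class SmartProtocolError(Exception):
    """Local stand-in for bzrlib.errors.SmartProtocolError."""


# Error name -> factory building an exception from the remaining fields.
_SIMPLE_ERRORS = {
    'FileExists': lambda args: FileExists(args[0]),
    'error': lambda args: SmartProtocolError(args[0]),
}


def translate_error(resp, orig_path=None):
    """Return (rather than raise) the exception described by an error tuple."""
    what, args = resp[0], resp[1:]
    if what == 'NoSuchFile':
        # Report the caller's own path when it is known.
        return NoSuchFile(orig_path if orig_path is not None else args[0])
    if what in _SIMPLE_ERRORS:
        return _SIMPLE_ERRORS[what](args)
    if what == 'UnicodeDecodeError' and args[1].startswith('s:'):
        encoding, val, start, end, reason = args
        # Byte values arrive base64-encoded behind an 's:' prefix.
        raw = base64.b64decode(val[2:])
        return UnicodeDecodeError(encoding, raw, int(start), int(end), reason)
    return SmartProtocolError('unexpected smart server error: %r' % (resp,))
```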

def _send_tuple(self, args):
self._client._send_tuple(args)

def _recv_tuple(self):
return self._client._recv_tuple()

def _ensure_ok(self, resp):
raise errors.UnexpectedSmartServerResponse(resp)

def _translate_error(self, err, relpath=None):
remote._translate_error(err, path=relpath)

def disconnect(self):
self._client.disconnect()

def delete_tree(self, relpath):
raise errors.TransportNotPossible('readonly transport')
m = self.get_smart_medium()

def stat(self, relpath):
resp = self._client._call('stat', self._remote_path(relpath))
resp = self._call2('stat', self._remote_path(relpath))
if resp[0] == 'stat':
return SmartStat(int(resp[1]), int(resp[2], 8))
self._translate_error(resp)
return _SmartStat(int(resp[1]), int(resp[2], 8))
raise errors.UnexpectedSmartServerResponse(resp)
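# stat replies are ('stat', size, mode) tuples in which the mode is an octal
# string, hence int(resp[2], 8). The following is a minimal sketch of that
# parsing. SmartStatSketch is a hypothetical stand-in for _SmartStat, and
# ValueError stands in for errors.UnexpectedSmartServerResponse.

```python
import stat
from collections import namedtuple

# Hypothetical stand-in for _SmartStat: only size and mode are carried.
SmartStatSketch = namedtuple('SmartStatSketch', ['st_size', 'st_mode'])


def parse_stat_response(resp):
    """Turn a ('stat', size, octal_mode) response tuple into a stat-like object."""
    if resp[0] != 'stat':
        raise ValueError('unexpected smart server response: %r' % (resp,))
    # Size is sent in decimal, the mode as an octal string.
    return SmartStatSketch(int(resp[1]), int(resp[2], 8))


info = parse_stat_response(('stat', '1234', '100644'))
print(info.st_size, oct(stat.S_IMODE(info.st_mode)), stat.S_ISREG(info.st_mode))
# prints: 1234 0o644 True
```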

## def lock_read(self, relpath):
## """Lock the given file for shared (read) access.

def list_dir(self, relpath):
resp = self._client._call('list_dir',
self._remote_path(relpath))
resp = self._call2('list_dir', self._remote_path(relpath))
if resp[0] == 'names':
return [name.encode('ascii') for name in resp[1:]]
self._translate_error(resp)
raise errors.UnexpectedSmartServerResponse(resp)

def iter_files_recursive(self):
resp = self._client._call('iter_files_recursive',
self._remote_path(''))
resp = self._call2('iter_files_recursive', self._remote_path(''))
if resp[0] == 'names':
raise errors.UnexpectedSmartServerResponse(resp)

class RemoteTCPTransport(RemoteTransport):
"""Connection to smart server over plain tcp.
This is essentially just a factory to get 'RemoteTransport(url,
SmartTCPClientMedium).

def _build_medium(self):
client_medium = medium.SmartTCPClientMedium(
self._parsed_url.host, self._parsed_url.port, self.base)
return client_medium, None

class RemoteTCPTransportV2Only(RemoteTransport):
"""Connection to smart server over plain tcp with the client hard-coded to
assume protocol v2 and remote server version <= 1.6.
This should only be used for testing.

def _build_medium(self):
client_medium = medium.SmartTCPClientMedium(
self._parsed_url.host, self._parsed_url.port, self.base)
client_medium._protocol_version = 2
client_medium._remember_remote_is_before((1, 6))
return client_medium, None
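# RemoteTCPTransportV2Only forces protocol version 2 and tells the medium
# that the remote server is older than 1.6. The class below is a
# hypothetical illustration of how such an upper bound could be recorded and
# queried with tuple comparison. Its names and rules are assumptions, not
# the medium's actual implementation.

```python
class RemoteVersionBound:
    """Hypothetical record of 'the remote server is older than version X'."""

    def __init__(self):
        self._before = None  # None: nothing is known about the remote yet

    def remember_remote_is_before(self, version):
        # Keep the tightest (lowest) upper bound seen so far.
        if self._before is None or version < self._before:
            self._before = version

    def is_remote_before(self, version):
        # Older than the bound implies older than any version >= the bound.
        # For lower versions nothing is known, so report False.
        return self._before is not None and self._before <= version
```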

class RemoteSSHTransport(RemoteTransport):
"""Connection to smart server over SSH.
This is essentially just a factory to get 'RemoteTransport(url,
SmartSSHClientMedium).

def _build_medium(self):
location_config = config.LocationConfig(self.base)
bzr_remote_path = location_config.get_bzr_remote_path()
user = self._parsed_url.user
auth = config.AuthenticationConfig()
user = auth.get_user('ssh', self._parsed_url.host,
self._parsed_url.port)
ssh_params = medium.SSHParams(self._parsed_url.host,
self._parsed_url.port, user, self._parsed_url.password,
client_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
return client_medium, (user, self._parsed_url.password)

class RemoteHTTPTransport(RemoteTransport):
"""Just a way to connect between a bzr+http:// url and http://.
This connection operates slightly differently than the RemoteSSHTransport.
It uses a plain http:// transport underneath, which defines what remote
.bzr/smart URL we are connected to. From there, all paths that are sent are
sent as relative paths; this way the remote side can properly
de-reference them, since it is likely doing rewrite rules to translate an
HTTP path into a local path.

def __init__(self, base, _from_transport=None, http_transport=None):
if http_transport is None:
# FIXME: the password may be lost here because it appears in the
# url only for an initial construction (when the url came from the
http_url = base[len('bzr+'):]
self._http_transport = transport.get_transport_from_url(http_url)
self._translate_error(resp)

class SmartStreamClient(SmartProtocolBase):
"""Connection to smart server over two streams"""

def __init__(self, connect_func):
self._connect_func = connect_func
self._connected = False

def _ensure_connection(self):
if not self._connected:
self._in, self._out = self._connect_func()
self._connected = True
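# SmartStreamClient defers calling connect_func until the first send or
# receive, through _ensure_connection. The following is a minimal Python 3
# sketch of that lazy-connection pattern. LazyStreamClient, send_line and
# recv_line are hypothetical names. connect_func is any zero-argument
# callable returning an (in, out) pair of binary streams.

```python
class LazyStreamClient:
    """Open the (in, out) streams only when the first call needs them."""

    def __init__(self, connect_func):
        self._connect_func = connect_func
        self._connected = False

    def _ensure_connection(self):
        # Connect at most once; later calls reuse the same streams.
        if not self._connected:
            self._in, self._out = self._connect_func()
            self._connected = True

    def send_line(self, line):
        self._ensure_connection()
        self._out.write(line)
        self._out.flush()

    def recv_line(self):
        self._ensure_connection()
        return self._in.readline()
```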

def _send_tuple(self, args):
self._ensure_connection()
return self._write_and_flush(_encode_tuple(args))

def _send_bulk_data(self, body):
self._ensure_connection()
SmartProtocolBase._send_bulk_data(self, body)

def _recv_bulk(self):
self._ensure_connection()
return SmartProtocolBase._recv_bulk(self)

def _recv_tuple(self):
self._ensure_connection()
return SmartProtocolBase._recv_tuple(self)

def _recv_trailer(self):
self._ensure_connection()
return SmartProtocolBase._recv_trailer(self)

def disconnect(self):
"""Close connection to the server"""

def _call(self, *args):
self._send_tuple(args)
return self._recv_tuple()

def _call_with_upload(self, method, args, body):
"""Call an rpc, supplying bulk upload data.
:param method: method name to call
:param args: parameter args tuple
:param body: upload body as a byte string
self._http_transport = http_transport
super(RemoteHTTPTransport, self).__init__(
base, _from_transport=_from_transport)

def _build_medium(self):
# We let http_transport take care of the credentials
return self._http_transport.get_smart_medium(), None

def _remote_path(self, relpath):
"""After connecting, HTTP Transport only deals in relative URLs."""
# Adjust the relpath based on which URL this smart transport is
http_base = urlutils.normalize_url(self.get_smart_medium().base)
url = urlutils.join(self.base[len('bzr+'):], relpath)
url = urlutils.normalize_url(url)
return urlutils.relative_url(http_base, url)
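# RemoteHTTPTransport._remote_path sends paths relative to the smart
# medium's base URL, so the server can map them through its own rewrite
# rules. Below is a rough standard-library approximation of those urlutils
# calls. `smart_relpath` is hypothetical, and it treats http_base as a
# directory. bzrlib's urlutils.relative_url may behave differently in edge
# cases.

```python
import posixpath
from urllib.parse import urljoin, urlsplit


def smart_relpath(http_base, bzr_base, relpath):
    """Express bzr_base + relpath relative to the HTTP smart medium's base."""
    # Drop the 'bzr+' prefix, then resolve relpath (including '..').
    url = urljoin(bzr_base[len('bzr+'):], relpath)
    base_parts, url_parts = urlsplit(http_base), urlsplit(url)
    if (base_parts.scheme, base_parts.netloc) != (url_parts.scheme, url_parts.netloc):
        return url  # unrelated URL: nothing to make relative
    return posixpath.relpath(url_parts.path, base_parts.path or '/')
```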

def clone(self, relative_url):
"""Make a new RemoteHTTPTransport related to me.
This is re-implemented rather than using the default
RemoteTransport.clone() because we must be careful about the underlying
Also, the cloned smart transport will POST to the same .bzr/smart
location as this transport (although obviously the relative paths in the
smart requests may be different). This is so that the server doesn't
have to handle .bzr/smart requests at arbitrary places inside .bzr
directories, just at the initial URL the user uses.
self._send_tuple((method,) + args)
self._send_bulk_data(body)
return self._recv_tuple()

def query_version(self):
"""Return protocol version number of the server."""
# XXX: should make sure it's empty
self._send_tuple(('hello',))
resp = self._recv_tuple()
if resp == ('ok', '1'):
raise errors.SmartProtocolError("bad response %r" % (resp,))
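# query_version performs a hello handshake. It sends ('hello',) and accepts
# only ('ok', '1'). Per the module docstring, tuple fields are separated by
# Ctrl-A (\x01). The following sketch runs over in-memory streams. Several
# details are assumptions of this sketch: the newline terminator, the UTF-8
# encoding, returning 1 on success, and raising ValueError in place of
# SmartProtocolError.

```python
def encode_tuple(args):
    # Fields joined by Ctrl-A; newline terminator and UTF-8 are assumptions.
    return '\x01'.join(args).encode('utf-8') + b'\n'


def decode_tuple(line):
    # Inverse of encode_tuple: drop the newline and split on Ctrl-A.
    return tuple(line.rstrip(b'\n').decode('utf-8').split('\x01'))


def query_version(to_server, from_server):
    """Send ('hello',) and return 1 only if the server answers ('ok', '1')."""
    to_server.write(encode_tuple(('hello',)))
    to_server.flush()
    resp = decode_tuple(from_server.readline())
    if resp == ('ok', '1'):
        return 1
    raise ValueError('bad response %r' % (resp,))
```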

class SmartTCPTransport(SmartTransport):
"""Connection to smart server over plain tcp"""

def __init__(self, url, clone_from=None):
super(SmartTCPTransport, self).__init__(url, clone_from)
self._port = int(self._port)
except (ValueError, TypeError), e:
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)

def _connect_to_server(self):
self._socket = socket.socket()
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
result = self._socket.connect_ex((self._host, int(self._port)))
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
(self._host, self._port, os.strerror(result)))
# TODO: May be more efficient to just treat them as sockets
# throughout? But what about pipes to ssh?...
to_server = self._socket.makefile('w')
from_server = self._socket.makefile('r')
return from_server, to_server
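# SmartTCPTransport._connect_to_server takes three steps. It disables
# Nagle's algorithm with TCP_NODELAY, so small request tuples go out
# immediately. It uses connect_ex, so a failure comes back as an errno. It
# then wraps the socket in file objects. The following is a Python 3 sketch
# of the same steps. `connect_streams` is a hypothetical name. It raises
# the built-in ConnectionError rather than bzrlib's errors.ConnectionError,
# and it opens the files in binary mode.

```python
import os
import socket


def connect_streams(host, port):
    """Connect over TCP and return (sock, from_server, to_server)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Send small request/response messages without Nagle batching delays.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    # connect_ex returns an errno value instead of raising on failure.
    result = sock.connect_ex((host, port))
    if result != 0:
        sock.close()
        raise ConnectionError('failed to connect to %s:%d: %s'
                              % (host, port, os.strerror(result)))
    return sock, sock.makefile('rb'), sock.makefile('wb')
```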

def disconnect(self):
super(SmartTCPTransport, self).disconnect()
# XXX: Is closing the socket as well as closing the files really
if self._socket is not None:
self._socket.close()

class SmartSSHTransport(SmartTransport):
"""Connection to smart server over SSH."""

def __init__(self, url, clone_from=None):
# TODO: all this probably belongs in the parent class.
super(SmartSSHTransport, self).__init__(url, clone_from)
if self._port is not None:
self._port = int(self._port)
except (ValueError, TypeError), e:
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)

def _connect_to_server(self):
from bzrlib.transport import ssh
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
vendor = ssh._get_ssh_vendor()
self._ssh_connection = vendor.connect_ssh(self._username,
self._password, self._host, self._port,
command=[executable, 'serve', '--inet', '--directory=/',
return self._ssh_connection.get_filelike_channels()

def disconnect(self):
super(SmartSSHTransport, self).disconnect()
self._ssh_connection.close()
abs_url = self.abspath(relative_url)
return RemoteHTTPTransport(abs_url,
_from_transport=self,
http_transport=self._http_transport)

def _redirected_to(self, source, target):
"""See transport._redirected_to"""
redirected = self._http_transport._redirected_to(source, target)
if (redirected is not None
and isinstance(redirected, type(self._http_transport))):
return RemoteHTTPTransport('bzr+' + redirected.external_url(),
http_transport=redirected)
# Either None or a transport for a different protocol

class HintingSSHTransport(transport.Transport):
"""Simple transport that handles ssh:// and points out bzr+ssh://."""

def __init__(self, url):
raise errors.UnsupportedProtocol(url,
'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)

def get_test_permutations():
"""Return (transport, server) permutations for testing"""
return [(SmartTCPTransport, SmartTCPServer_for_testing)]
"""Return (transport, server) permutations for testing."""
### We may need a little more test framework support to construct an
### appropriate RemoteTransport in the future.
from bzrlib.tests import test_server
return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]