13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""RemoteTransport client for the smart-server.
19
This module shouldn't be accessed directly. The classes defined here should be
20
imported from bzrlib.smart.
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK+ TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will probably not be meaningful when
48
limited to a directory?
23
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
52
# TODO: _translate_error should be on the client, not the transport because
53
# error coding is wire protocol specific.
55
# TODO: A plain integer from query_version is too simple; should give some
58
# TODO: Server should probably catch exceptions within itself and send them
59
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
60
# Also needs to somehow report protocol errors like bad requests. Need to
61
# consider how we'll handle error reporting, e.g. if we get halfway through a
62
# bulk transfer and then something goes wrong.
64
# TODO: Standard marker at start of request/response lines?
66
# TODO: Make each request and response self-validatable, e.g. with checksums.
68
# TODO: get/put objects could be changed to gradually read back the data as it
69
# comes across the network
71
# TODO: What should the server do if it hits an error and has to terminate?
73
# TODO: is it useful to allow multiple chunks in the bulk data?
75
# TODO: If we get an exception during transmission of bulk data we can't just
76
# emit the exception because it won't be seen.
77
# John proposes: I think it would be worthwhile to have a header on each
78
# chunk, that indicates it is another chunk. Then you can send an 'error'
79
# chunk as long as you finish the previous chunk.
81
# TODO: Clone method on Transport; should work up towards parent directory;
82
# unclear how this should be stored or communicated to the server... maybe
83
# just pass it on all relevant requests?
85
# TODO: Better name than clone() for changing between directories. How about
86
# open_dir or change_dir or chdir?
88
# TODO: Is it really good to have the notion of current directory within the
89
# connection? Perhaps all Transports should factor out a common connection
90
# from the thing that has the directory context?
92
# TODO: Pull more things common to sftp and ssh to a higher level.
94
# TODO: The server that manages a connection should be quite small and retain
95
# minimum state because each of the requests is supposed to be stateless.
96
# Then we can write another implementation that maps to http.
98
# TODO: What to do when a client connection is garbage collected? Maybe just
99
# abruptly drop the connection?
101
# TODO: Server in some cases will need to restrict access to files outside of
102
# a particular root directory. LocalTransport doesn't do anything to stop you
103
# ascending above the base directory, so we need to prevent paths
104
# containing '..' in either the server or transport layers. (Also need to
105
# consider what happens if someone creates a symlink pointing outside the
108
# TODO: Server should rebase absolute paths coming across the network to put
109
# them under the virtual root, if one is in use. LocalTransport currently
110
# doesn't do that; if you give it an absolute path it just uses it.
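The virtual-root TODOs above can be illustrated with a minimal sketch. It assumes POSIX-style paths and a purely textual check; `rebase_path` is a hypothetical helper, not part of bzrlib, and it does not address the symlink concern raised above.

```python
import posixpath


def rebase_path(virtual_root, client_path):
    """Map client_path under virtual_root, refusing '..' escapes.

    Hypothetical helper for illustration only; symlinks are not resolved.
    """
    # Treat the client's path as relative even when it begins with '/'.
    relative = client_path.lstrip('/')
    root = posixpath.normpath(virtual_root)
    candidate = posixpath.normpath(posixpath.join(root, relative))
    # After normalisation the result must be the root itself or lie below it.
    if candidate != root and not candidate.startswith(root.rstrip('/') + '/'):
        raise ValueError("path %r escapes the virtual root" % (client_path,))
    return candidate
```

For example, `rebase_path('/srv/bzr', '/project/trunk')` yields a path under `/srv/bzr`, while a request for `'../../etc/passwd'` is rejected.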
112
# XXX: Arguments can't contain newlines or Ctrl-A (\x01); possibly we should e.g.
113
# urlescape them instead. Indeed possibly this should just literally be
116
# FIXME: This transport, with several others, has imperfect handling of paths
117
# within urls. It'd probably be better for ".." from a root to raise an error
118
# rather than return the same directory as we do at present.
120
# TODO: Rather than working at the Transport layer we want Branch,
121
# Repository or BzrDir objects that talk to a server.
123
# TODO: Probably want some way for server commands to gradually produce body
124
# data rather than passing it as a string; they could perhaps pass an
125
# iterator-like callback that will gradually yield data; it probably needs a
126
# close() method that will always be called to do any necessary cleanup.
128
# TODO: Split the actual smart server from the ssh encoding of it.
130
# TODO: Perhaps support file-level readwrite operations over the transport
133
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
134
# branch doing file-level operations.
136
# TODO: jam 20060915 _decode_tuple is acting directly on input over
137
# the socket, and it assumes everything is UTF8 sections separated
138
# by \001. Which means a request like '\002' will abort the connection
139
# because of a UnicodeDecodeError. It does look like invalid data will
140
# kill the SmartStreamServer, but only with an abort + exception, and
141
# the overall server shouldn't die.
25
143
from cStringIO import StringIO
27
153
from bzrlib import (
36
from bzrlib.smart import client, medium
37
from bzrlib.symbol_versioning import (
42
class _SmartStat(object):
161
from bzrlib.bundle.serializer import write_bundle
162
from bzrlib.trace import mutter
163
from bzrlib.transport import local
165
# must do this otherwise urllib can't parse the urls properly :(
166
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
167
transport.register_urlparse_netloc_protocol(scheme)
171
def _recv_tuple(from_file):
172
req_line = from_file.readline()
173
return _decode_tuple(req_line)
176
def _decode_tuple(req_line):
177
if req_line is None or req_line == '':
179
if req_line[-1] != '\n':
180
raise errors.SmartProtocolError("request %r not terminated" % req_line)
181
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
184
def _encode_tuple(args):
185
"""Encode the tuple args to a bytestream."""
186
return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
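A round trip through the tuple encoding may make the wire format easier to see. This is a standalone sketch written for Python 3 bytes; `encode_tuple` and `decode_tuple` are hypothetical stand-ins for the helpers above, and `ValueError` stands in for `errors.SmartProtocolError`.

```python
def encode_tuple(args):
    """Join text fields with Ctrl-A and terminate the line with a newline."""
    return b'\x01'.join(a.encode('utf-8') for a in args) + b'\n'


def decode_tuple(line):
    """Inverse of encode_tuple; returns None at end of input."""
    if not line:
        return None
    if not line.endswith(b'\n'):
        raise ValueError("request %r not terminated" % (line,))
    return tuple(a.decode('utf-8') for a in line[:-1].split(b'\x01'))


# A 'has' request for the path foo/bar as it would appear on the wire.
wire = encode_tuple(('has', 'foo/bar'))
```

Because the separator and terminator are plain bytes, an argument that itself contains `\x01` or a newline cannot survive this round trip.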
189
class SmartProtocolBase(object):
190
"""Methods common to client and server"""
192
def _send_bulk_data(self, body):
193
"""Send chunked body data"""
194
assert isinstance(body, str)
195
bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
196
self._write_and_flush(bytes)
198
# TODO: this only actually accommodates a single block; possibly should support
200
def _recv_bulk(self):
201
chunk_len = self._in.readline()
203
chunk_len = int(chunk_len)
205
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
206
bulk = self._in.read(chunk_len)
207
if len(bulk) != chunk_len:
208
raise errors.SmartProtocolError("short read fetching bulk data chunk")
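The single-chunk framing used by `_send_bulk_data` and `_recv_bulk` can be sketched in isolation. The version below assumes Python 3 bytes and a file-like stream, folds the trailer check into the reader, and uses `ValueError` in place of `errors.SmartProtocolError`; the function names are hypothetical.

```python
import io


def send_bulk_data(body):
    """Frame body as one chunk: length line, raw bytes, 'done' trailer."""
    return str(len(body)).encode('ascii') + b'\n' + body + b'done\n'


def recv_bulk(stream):
    """Read one framed chunk from stream and verify the success trailer."""
    length_line = stream.readline()
    try:
        chunk_len = int(length_line.decode('ascii'))
    except ValueError:
        raise ValueError("bad chunk length line %r" % (length_line,))
    body = stream.read(chunk_len)
    if len(body) != chunk_len:
        raise ValueError("short read fetching bulk data chunk")
    trailer = stream.readline()
    if trailer != b'done\n':
        raise ValueError("unexpected trailer %r" % (trailer,))
    return body


framed = send_bulk_data(b'hello')
received = recv_bulk(io.BytesIO(framed))
```

Note that no newline separates the body from the trailer: the reader relies on the length line alone to find the end of the chunk.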
212
def _recv_tuple(self):
213
return _recv_tuple(self._in)
215
def _recv_trailer(self):
216
resp = self._recv_tuple()
217
if resp == ('done', ):
220
self._translate_error(resp)
222
def _serialise_offsets(self, offsets):
223
"""Serialise a readv offset list."""
225
for start, length in offsets:
226
txt.append('%d,%d' % (start, length))
227
return '\n'.join(txt)
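The readv offset list travels as one `start,length` pair per line. A minimal round-trip sketch follows; the function names are hypothetical, and skipping empty lines on the decoding side is an assumption rather than something shown in this listing.

```python
def serialise_offsets(offsets):
    """Encode (start, length) pairs as newline-separated 'start,length'."""
    return '\n'.join('%d,%d' % (start, length) for start, length in offsets)


def deserialise_offsets(text):
    """Decode the text produced by serialise_offsets."""
    offsets = []
    for line in text.split('\n'):
        if not line:
            # Assumed behaviour: tolerate an empty body or trailing newline.
            continue
        start, length = line.split(',')
        offsets.append((int(start), int(length)))
    return offsets


encoded = serialise_offsets([(0, 10), (100, 5)])
```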
229
def _write_and_flush(self, bytes):
230
"""Write bytes to self._out and flush it."""
231
# XXX: this will be inefficient. Just ask Robert.
232
self._out.write(bytes)
236
class SmartStreamServer(SmartProtocolBase):
237
"""Handles smart commands coming over a stream.
239
The stream may be a pipe connected to sshd, or a tcp socket, or an
240
in-process fifo for testing.
242
One instance is created for each connected client; it can serve multiple
243
requests in the lifetime of the connection.
245
The server passes requests through to an underlying backing transport,
246
which will typically be a LocalTransport looking at the server's filesystem.
249
def __init__(self, in_file, out_file, backing_transport):
250
"""Construct new server.
252
:param in_file: Python file from which requests can be read.
253
:param out_file: Python file to write responses.
254
:param backing_transport: Transport for the directory served.
258
self.smart_server = SmartServer(backing_transport)
259
# server can call back to us to get bulk data - this is not really
260
# ideal, they should get it per request instead
261
self.smart_server._recv_body = self._recv_bulk
263
def _recv_tuple(self):
264
"""Read a request from the client and return as a tuple.
266
Returns None at end of file (if the client closed the connection.)
268
return _recv_tuple(self._in)
270
def _send_tuple(self, args):
271
"""Send response header"""
272
return self._write_and_flush(_encode_tuple(args))
274
def _send_error_and_disconnect(self, exception):
275
self._send_tuple(('error', str(exception)))
279
def _serve_one_request(self):
280
"""Read one request from input, process, send back a response.
282
:return: False if the server should terminate, otherwise None.
284
req_args = self._recv_tuple()
286
# client closed connection
287
return False # shutdown server
289
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
290
self._send_tuple(response.args)
291
if response.body is not None:
292
self._send_bulk_data(response.body)
293
except KeyboardInterrupt:
296
# everything else: pass to client, flush, and quit
297
self._send_error_and_disconnect(e)
301
"""Serve requests until the client disconnects."""
302
# Keep a reference to stderr because the sys module's globals get set to
303
# None during interpreter shutdown.
304
from sys import stderr
306
while self._serve_one_request() != False:
309
stderr.write("%s terminating on exception %s\n" % (self, e))
313
class SmartServerResponse(object):
314
"""Response generated by SmartServer."""
316
def __init__(self, args, body=None):
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
321
# for delivering the data for a request. This could be done with as the
322
# StreamServer, though that would create conflation between request and response
323
# which may be undesirable.
326
class SmartServer(object):
327
"""Protocol logic for smart server.
329
This doesn't handle serialization at all, it just processes requests and
333
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
334
# encoding or decoding logic to allow the wire protocol to vary from the
335
# object protocol: we will want to tweak the wire protocol separate from
336
# the object model, and ideally we will be able to do that without having
337
# a SmartServer subclass for each wire protocol, rather just a Protocol
340
# TODO: Better way of representing the body for commands that take it,
341
# and allow it to be streamed into the server.
343
def __init__(self, backing_transport):
344
self._backing_transport = backing_transport
347
"""Answer a version request with my version."""
348
return SmartServerResponse(('ok', '1'))
350
def do_has(self, relpath):
351
r = self._backing_transport.has(relpath) and 'yes' or 'no'
352
return SmartServerResponse((r,))
354
def do_get(self, relpath):
355
backing_bytes = self._backing_transport.get_bytes(relpath)
356
return SmartServerResponse(('ok',), backing_bytes)
358
def _deserialise_optional_mode(self, mode):
359
# XXX: FIXME this should be on the protocol object.
365
def do_append(self, relpath, mode):
366
old_length = self._backing_transport.append_bytes(
367
relpath, self._recv_body(), self._deserialise_optional_mode(mode))
368
return SmartServerResponse(('appended', '%d' % old_length))
370
def do_delete(self, relpath):
371
self._backing_transport.delete(relpath)
373
def do_iter_files_recursive(self, abspath):
374
# XXX: the path handling needs some thought.
375
#relpath = self._backing_transport.relpath(abspath)
376
transport = self._backing_transport.clone(abspath)
377
filenames = transport.iter_files_recursive()
378
return SmartServerResponse(('names',) + tuple(filenames))
380
def do_list_dir(self, relpath):
381
filenames = self._backing_transport.list_dir(relpath)
382
return SmartServerResponse(('names',) + tuple(filenames))
384
def do_mkdir(self, relpath, mode):
385
self._backing_transport.mkdir(relpath,
386
self._deserialise_optional_mode(mode))
388
def do_move(self, rel_from, rel_to):
389
self._backing_transport.move(rel_from, rel_to)
391
def do_put(self, relpath, mode):
392
self._backing_transport.put_bytes(relpath,
394
self._deserialise_optional_mode(mode))
396
def _deserialise_offsets(self, text):
397
# XXX: FIXME this should be on the protocol object.
399
for line in text.split('\n'):
402
start, length = line.split(',')
403
offsets.append((int(start), int(length)))
406
def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
407
create_parent_dir = (create_parent == 'T')
408
self._backing_transport.put_bytes_non_atomic(relpath,
410
mode=self._deserialise_optional_mode(mode),
411
create_parent_dir=create_parent_dir,
412
dir_mode=self._deserialise_optional_mode(dir_mode))
414
def do_readv(self, relpath):
415
offsets = self._deserialise_offsets(self._recv_body())
416
backing_bytes = ''.join(bytes for offset, bytes in
417
self._backing_transport.readv(relpath, offsets))
418
return SmartServerResponse(('readv',), backing_bytes)
420
def do_rename(self, rel_from, rel_to):
421
self._backing_transport.rename(rel_from, rel_to)
423
def do_rmdir(self, relpath):
424
self._backing_transport.rmdir(relpath)
426
def do_stat(self, relpath):
427
stat = self._backing_transport.stat(relpath)
428
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
430
def do_get_bundle(self, path, revision_id):
431
# open transport relative to our base
432
t = self._backing_transport.clone(path)
433
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
434
repo = control.open_repository()
435
tmpf = tempfile.TemporaryFile()
436
base_revision = revision.NULL_REVISION
437
write_bundle(repo, revision_id, base_revision, tmpf)
439
return SmartServerResponse((), tmpf.read())
441
def dispatch_command(self, cmd, args):
442
func = getattr(self, 'do_' + cmd, None)
444
raise errors.SmartProtocolError("bad request %r" % (cmd,))
448
result = SmartServerResponse(('ok',))
450
except errors.NoSuchFile, e:
451
return SmartServerResponse(('NoSuchFile', e.path))
452
except errors.FileExists, e:
453
return SmartServerResponse(('FileExists', e.path))
454
except errors.DirectoryNotEmpty, e:
455
return SmartServerResponse(('DirectoryNotEmpty', e.path))
456
except errors.ShortReadvError, e:
457
return SmartServerResponse(('ShortReadvError',
458
e.path, str(e.offset), str(e.length), str(e.actual)))
459
except UnicodeError, e:
460
# If it is a DecodeError, then most likely we are starting
461
# with a plain string
462
str_or_unicode = e.object
463
if isinstance(str_or_unicode, unicode):
464
val = u'u:' + str_or_unicode
466
val = u's:' + str_or_unicode.encode('base64')
467
# This handles UnicodeEncodeError or UnicodeDecodeError
468
return SmartServerResponse((e.__class__.__name__,
469
e.encoding, val, str(e.start), str(e.end), e.reason))
470
except errors.TransportNotPossible, e:
471
if e.msg == "readonly transport":
472
return SmartServerResponse(('ReadOnlyError', ))
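The dispatch pattern above (look up a `do_<command>` method, call it, and translate known exceptions into response tuples) can be sketched with a toy in-memory backend. `Response` and `MiniServer` are hypothetical names; `KeyError` and `ValueError` stand in for the bzrlib error classes.

```python
class Response(object):
    """Response args plus an optional body, as in SmartServerResponse."""

    def __init__(self, args, body=None):
        self.args = args
        self.body = body


class MiniServer(object):
    """Toy server backed by a dict mapping relpath to bytes."""

    def __init__(self, files):
        self._files = files

    def do_has(self, relpath):
        return Response(('yes',) if relpath in self._files else ('no',))

    def do_get(self, relpath):
        return Response(('ok',), self._files[relpath])

    def dispatch_command(self, cmd, args):
        # Unknown commands have no do_ method and are rejected up front.
        func = getattr(self, 'do_' + cmd, None)
        if func is None:
            raise ValueError("bad request %r" % (cmd,))
        try:
            result = func(*args)
            if result is None:
                result = Response(('ok',))
            return result
        except KeyError as e:
            # Known failures become error tuples instead of propagating.
            return Response(('NoSuchFile', e.args[0]))


server = MiniServer({'a': b'data'})
```

Keeping the error translation in one place means individual `do_` methods can simply let exceptions from the backing store escape.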
477
class SmartTCPServer(object):
478
"""Listens on a TCP socket and accepts connections from smart clients"""
480
def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
481
"""Construct a new server.
483
To actually start it running, call either start_background_thread or
486
:param host: Name of the interface to listen on.
487
:param port: TCP port to listen on, or 0 to allocate a transient port.
489
if backing_transport is None:
490
backing_transport = memory.MemoryTransport()
491
self._server_socket = socket.socket()
492
self._server_socket.bind((host, port))
493
self.port = self._server_socket.getsockname()[1]
494
self._server_socket.listen(1)
495
self._server_socket.settimeout(1)
496
self.backing_transport = backing_transport
499
# let connections timeout so that we get a chance to terminate
500
# Keep a reference to the exceptions we want to catch because the socket
501
# module's globals get set to None during interpreter shutdown.
502
from socket import timeout as socket_timeout
503
from socket import error as socket_error
504
self._should_terminate = False
505
while not self._should_terminate:
507
self.accept_and_serve()
508
except socket_timeout:
509
# just check if we're asked to stop
511
except socket_error, e:
512
trace.warning("client disconnected: %s", e)
516
"""Return the url of the server"""
517
return "bzr://%s:%d/" % self._server_socket.getsockname()
519
def accept_and_serve(self):
520
conn, client_addr = self._server_socket.accept()
521
# For WIN32, where the timeout value from the listening socket
522
# propagates to the newly accepted socket.
523
conn.setblocking(True)
524
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
525
from_client = conn.makefile('r')
526
to_client = conn.makefile('w')
527
handler = SmartStreamServer(from_client, to_client,
528
self.backing_transport)
529
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
530
connection_thread.setDaemon(True)
531
connection_thread.start()
533
def start_background_thread(self):
534
self._server_thread = threading.Thread(None,
536
name='server-' + self.get_url())
537
self._server_thread.setDaemon(True)
538
self._server_thread.start()
540
def stop_background_thread(self):
541
self._should_terminate = True
542
# self._server_socket.close()
543
# we used to join the thread, but it's not really necessary; it will
545
## self._server_thread.join()
548
class SmartTCPServer_for_testing(SmartTCPServer):
549
"""Server suitable for use by transport tests.
551
This server is backed by the process's cwd.
555
self._homedir = os.getcwd()
556
# The server is set up by default like for ssh access: the client
557
# passes filesystem-absolute paths; therefore the server must look
558
# them up relative to the root directory. It might be better to act as
559
# a public server and have the server rewrite paths into the test
561
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
564
"""Set up server for testing"""
565
self.start_background_thread()
568
self.stop_background_thread()
571
"""Return the url of the server"""
572
host, port = self._server_socket.getsockname()
573
# XXX: I think this is likely to break on windows -- self._homedir will
574
# have backslashes (and maybe a drive letter?).
575
# -- Andrew Bennetts, 2006-08-29
576
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
578
def get_bogus_url(self):
579
"""Return a URL which will fail to connect"""
580
return 'bzr://127.0.0.1:1/'
583
class SmartStat(object):
44
585
def __init__(self, size, mode):
45
586
self.st_size = size
46
587
self.st_mode = mode
49
class RemoteTransport(transport.ConnectedTransport):
590
class SmartTransport(transport.Transport):
50
591
"""Connection to a smart server.
52
The connection holds references to the medium that can be used to send
53
requests to the server.
593
The connection holds references to pipes that can be used to send requests
55
596
The connection has a notion of the current directory to which it's
56
597
connected; this is incorporated in filenames passed to the server.
58
This supports some higher-level RPC operations and can also be treated
599
This supports some higher-level RPC operations and can also be treated
59
600
like a Transport to do file-like operations.
61
The connection can be made over a tcp socket, an ssh pipe or a series of
62
http requests. There are concrete subclasses for each type:
63
RemoteTCPTransport, etc.
602
The connection can be made over a tcp socket, or (in future) an ssh pipe
603
or a series of http requests. There are concrete subclasses for each
604
type: SmartTCPTransport, etc.
66
# When making a readv request, cap it at requesting 5MB of data
67
_max_readv_bytes = 5*1024*1024
69
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
607
# IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
70
608
# responsibilities: Put those on SmartClient or similar. This is vital for
71
609
# the ability to support multiple versions of the smart protocol over time:
72
# RemoteTransport is an adapter from the Transport object model to the
610
# SmartTransport is an adapter from the Transport object model to the
73
611
# SmartClient model, not an encoder.
75
# FIXME: the medium parameter should be private, only the tests require
76
# it. It may be even clearer to define a TestRemoteTransport that handles
77
# the specific cases of providing a _client and/or a _medium, and leave
78
# RemoteTransport as an abstract class.
79
def __init__(self, url, _from_transport=None, medium=None, _client=None):
613
def __init__(self, url, clone_from=None, client=None):
82
:param _from_transport: Another RemoteTransport instance that this
83
one is being cloned from. Attributes such as the medium will
86
:param medium: The medium to use for this RemoteTransport. If None,
87
the medium from the _from_transport is shared. If both this
88
and _from_transport are None, a new medium will be built.
89
_from_transport and medium cannot both be specified.
91
:param _client: Override the _SmartClient used by this transport. This
92
should only be used for testing purposes; normally this is
93
determined from the medium.
95
super(RemoteTransport, self).__init__(
96
url, _from_transport=_from_transport)
98
# The medium is the connection, except when we need to share it with
99
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
# what we want to share is really the shared connection.
102
if (_from_transport is not None
103
and isinstance(_from_transport, RemoteTransport)):
104
_client = _from_transport._client
105
elif _from_transport is None:
106
# If no _from_transport is specified, we need to initialize the
110
medium, credentials = self._build_medium()
111
if 'hpss' in debug.debug_flags:
112
trace.mutter('hpss: Built a new medium: %s',
113
medium.__class__.__name__)
114
self._shared_connection = transport._SharedConnection(medium,
118
# No medium was specified, so share the medium from the
120
medium = self._shared_connection.connection
122
raise AssertionError(
123
"Both _from_transport (%r) and medium (%r) passed to "
124
"RemoteTransport.__init__, but these parameters are mutually "
125
"exclusive." % (_from_transport, medium))
128
self._client = client._SmartClient(medium)
130
self._client = _client
132
def _build_medium(self):
133
"""Create the medium if _from_transport does not provide one.
135
The medium is analogous to the connection for ConnectedTransport: it
136
allows connection sharing.
141
def _report_activity(self, bytes, direction):
142
"""See Transport._report_activity.
144
Does nothing; the smart medium will report activity triggered by a
616
:param client: ignored when clone_from is not None.
618
### Technically super() here is faulty because Transport's __init__
619
### fails to take 2 parameters, and if super were to choose a silly
620
### initialisation order things would blow up.
621
if not url.endswith('/'):
623
super(SmartTransport, self).__init__(url)
624
self._scheme, self._username, self._password, self._host, self._port, self._path = \
625
transport.split_url(url)
626
if clone_from is None:
628
self._client = SmartStreamClient(self._connect_to_server)
630
self._client = client
632
# credentials may be stripped from the base in some circumstances
633
# as yet to be clearly defined or documented, so copy them.
634
self._username = clone_from._username
635
# reuse same connection
636
self._client = clone_from._client
638
def abspath(self, relpath):
639
"""Return the full url to the given relative path.
641
@param relpath: the relative path or path components
642
@type relpath: str or list
644
return self._unparse_url(self._remote_path(relpath))
646
def clone(self, relative_url):
647
"""Make a new SmartTransport related to me, sharing the same connection.
649
This essentially opens a handle on a different remote directory.
651
if relative_url is None:
652
return self.__class__(self.base, self)
654
return self.__class__(self.abspath(relative_url), self)
149
656
def is_readonly(self):
150
657
"""Smart server transport can do read/write file operations."""
152
resp = self._call2('Transport.is_readonly')
153
except errors.UnknownSmartMethod:
154
# XXX: nasty hack: servers before 0.16 don't have a
155
# 'Transport.is_readonly' verb, so we do what clients before 0.16
158
if resp == ('yes', ):
160
elif resp == ('no', ):
163
raise errors.UnexpectedSmartServerResponse(resp)
165
660
def get_smart_client(self):
166
return self._get_connection()
663
def _unparse_url(self, path):
664
"""Return URL for a path.
168
def get_smart_medium(self):
169
return self._get_connection()
666
:see: SFTPUrlHandling._unparse_url
668
# TODO: Eventually it should be possible to unify this with
669
# SFTPUrlHandling._unparse_url?
672
path = urllib.quote(path)
673
netloc = urllib.quote(self._host)
674
if self._username is not None:
675
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
676
if self._port is not None:
677
netloc = '%s:%d' % (netloc, self._port)
678
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
171
680
def _remote_path(self, relpath):
172
681
"""Returns the Unicode version of the absolute path for relpath."""
173
682
return self._combine_paths(self._path, relpath)
175
def _call(self, method, *args):
176
resp = self._call2(method, *args)
177
self._ensure_ok(resp)
179
def _call2(self, method, *args):
180
"""Call a method on the remote server."""
182
return self._client.call(method, *args)
183
except errors.ErrorFromSmartServer, err:
184
# The first argument, if present, is always a path.
186
context = {'relpath': args[0]}
189
self._translate_error(err, **context)
191
def _call_with_body_bytes(self, method, args, body):
192
"""Call a method on the remote server with body bytes."""
194
return self._client.call_with_body_bytes(method, args, body)
195
except errors.ErrorFromSmartServer, err:
196
# The first argument, if present, is always a path.
198
context = {'relpath': args[0]}
201
self._translate_error(err, **context)
203
684
def has(self, relpath):
204
685
"""Indicate whether a remote file of the given name exists or not.
206
687
:see: Transport.has()
208
resp = self._call2('has', self._remote_path(relpath))
689
resp = self._client._call('has', self._remote_path(relpath))
209
690
if resp == ('yes', ):
211
692
elif resp == ('no', ):
214
raise errors.UnexpectedSmartServerResponse(resp)
695
self._translate_error(resp)
216
697
def get(self, relpath):
217
698
"""Return file-like object reading the contents of a remote file.
219
700
:see: Transport.get_bytes()/get_file()
221
return StringIO(self.get_bytes(relpath))
223
def get_bytes(self, relpath):
224
702
remote = self._remote_path(relpath)
226
resp, response_handler = self._client.call_expecting_body('get', remote)
227
except errors.ErrorFromSmartServer, err:
228
self._translate_error(err, relpath)
703
resp = self._client._call('get', remote)
229
704
if resp != ('ok', ):
230
response_handler.cancel_read_body()
231
raise errors.UnexpectedSmartServerResponse(resp)
232
return response_handler.read_body_bytes()
705
self._translate_error(resp, relpath)
706
return StringIO(self._client._recv_bulk())
234
708
def _serialise_optional_mode(self, mode):
301
764
def append_file(self, relpath, from_file, mode=None):
302
765
return self.append_bytes(relpath, from_file.read(), mode)
304
767
def append_bytes(self, relpath, bytes, mode=None):
305
resp = self._call_with_body_bytes(
768
resp = self._client._call_with_upload(
307
770
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
309
772
if resp[0] == 'appended':
310
773
return int(resp[1])
311
raise errors.UnexpectedSmartServerResponse(resp)
774
self._translate_error(resp)
313
776
def delete(self, relpath):
314
resp = self._call2('delete', self._remote_path(relpath))
315
self._ensure_ok(resp)
317
def external_url(self):
318
"""See bzrlib.transport.Transport.external_url."""
319
# the external path for RemoteTransports is the base
322
def recommended_page_size(self):
323
"""Return the recommended page size for this transport."""
326
def _readv(self, relpath, offsets):
777
resp = self._client._call('delete', self._remote_path(relpath))
778
self._translate_error(resp)
780
def readv(self, relpath, offsets):
330
784
offsets = list(offsets)
332
786
sorted_offsets = sorted(offsets)
787
# turn the list of offsets into a stack
788
offset_stack = iter(offsets)
789
cur_offset_and_size = offset_stack.next()
333
790
coalesced = list(self._coalesce_offsets(sorted_offsets,
334
791
limit=self._max_readv_combine,
335
fudge_factor=self._bytes_to_read_before_seek,
336
max_size=self._max_readv_bytes))
338
# now that we've coalesced things, avoid making enormous requests
343
if c.length + cur_len > self._max_readv_bytes:
344
requests.append(cur_request)
348
cur_request.append(c)
351
requests.append(cur_request)
352
if 'hpss' in debug.debug_flags:
353
trace.mutter('%s.readv %s offsets => %s coalesced'
354
' => %s requests (%s)',
355
self.__class__.__name__, len(offsets), len(coalesced),
356
len(requests), sum(map(len, requests)))
792
fudge_factor=self._bytes_to_read_before_seek))
795
resp = self._client._call_with_upload(
797
(self._remote_path(relpath),),
798
self._client._serialise_offsets((c.start, c.length) for c in coalesced))
800
if resp[0] != 'readv':
801
# This should raise an exception
802
self._translate_error(resp)
805
data = self._client._recv_bulk()
357
806
# Cache the results, but only until they have been fulfilled
359
# turn the list of offsets into a single stack to iterate
360
offset_stack = iter(offsets)
361
# using a list so it can be modified when passing down and coming back
362
next_offset = [offset_stack.next()]
363
for cur_request in requests:
365
result = self._client.call_with_body_readv_array(
366
('readv', self._remote_path(relpath),),
367
[(c.start, c.length) for c in cur_request])
368
resp, response_handler = result
369
except errors.ErrorFromSmartServer, err:
370
self._translate_error(err, relpath)
372
if resp[0] != 'readv':
373
# This should raise an exception
374
response_handler.cancel_read_body()
375
raise errors.UnexpectedSmartServerResponse(resp)
377
for res in self._handle_response(offset_stack, cur_request,

def _handle_response(self, offset_stack, coalesced, response_handler,
data_map, next_offset):
cur_offset_and_size = next_offset[0]
# FIXME: this should know how many bytes are needed, for clarity.
data = response_handler.read_body_bytes()
for c_offset in coalesced:
if len(data) < c_offset.length:
raise errors.ShortReadvError(relpath, c_offset.start,
c_offset.length, actual=len(data))
for suboffset, subsize in c_offset.ranges:
key = (c_offset.start+suboffset, subsize)
this_data = data[data_offset+suboffset:
data_offset+suboffset+subsize]
# Special case when the data is in-order, rather than packing
# into a map and then back out again. Benchmarking shows that
# this has 100% hit rate, but leave in the data_map work just
# TODO: Could we get away with using buffer() to avoid the
# memory copy? Callers would need to realize they may
# not have a real string.
if key == cur_offset_and_size:
yield cur_offset_and_size[0], this_data
cur_offset_and_size = next_offset[0] = offset_stack.next()
data_map[key] = this_data
data_offset += c_offset.length
data_map[key] = data[suboffset:suboffset+subsize]
data = data[c_offset.length:]
# Now that we've read some data, see if we can yield anything back
while cur_offset_and_size in data_map:
this_data = data_map.pop(cur_offset_and_size)
yield cur_offset_and_size[0], this_data
cur_offset_and_size = next_offset[0] = offset_stack.next()
cur_offset_and_size = offset_stack.next()
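The ordering logic in the response handling above (yield directly when data arrives in the order the caller asked for, otherwise park it in a map until its turn comes) can be shown on its own. This is a simplified sketch under stated assumptions: `yield_in_requested_order`, `requested` and `arrivals` are hypothetical names, and the data is modelled as already-split chunks rather than slices of one response body.

```python
def yield_in_requested_order(requested, arrivals):
    """Yield (offset, data) pairs in the order given by `requested`.

    requested: list of (offset, size) keys in the caller's order.
    arrivals: iterable of ((offset, size), data) in the order data arrives.
    """
    pending = {}
    wanted = iter(requested)
    current = next(wanted, None)
    for key, data in arrivals:
        if key == current:
            # Fast path: the data arrived exactly when it was wanted.
            yield key[0], data
            current = next(wanted, None)
        else:
            # Out of order: keep it until the caller's order reaches it.
            pending[key] = data
        # Drain anything buffered that has now become the next wanted item.
        while current in pending:
            yield current[0], pending.pop(current)
            current = next(wanted, None)


requested = [(20, 2), (0, 3), (10, 1)]
arrivals = [((0, 3), b"abc"), ((10, 1), b"x"), ((20, 2), b"yz")]
result = list(yield_in_requested_order(requested, arrivals))
```

Entries are popped from the map as soon as they are yielded, so the buffer only holds data that has arrived but is not yet due.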

def rename(self, rel_from, rel_to):
self._remote_path(rel_from),
self._remote_path(rel_to))

def move(self, rel_from, rel_to):
self._remote_path(rel_from),
self._remote_path(rel_to))

def rmdir(self, relpath):
resp = self._call('rmdir', self._remote_path(relpath))

def _ensure_ok(self, resp):
raise errors.UnexpectedSmartServerResponse(resp)

def _translate_error(self, err, relpath=None):
remote._translate_error(err, path=relpath)

def _call(self, method, *args):
resp = self._client._call(method, *args)
self._translate_error(resp)

def _translate_error(self, resp, orig_path=None):
"""Raise an exception from a response"""
elif what == 'NoSuchFile':
if orig_path is not None:
error_path = orig_path
raise errors.NoSuchFile(error_path)
elif what == 'error':
raise errors.SmartProtocolError(unicode(resp[1]))
elif what == 'FileExists':
raise errors.FileExists(resp[1])
elif what == 'DirectoryNotEmpty':
raise errors.DirectoryNotEmpty(resp[1])
elif what == 'ShortReadvError':
raise errors.ShortReadvError(resp[1], int(resp[2]),
int(resp[3]), int(resp[4]))
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
encoding = str(resp[1]) # encoding must always be a string
reason = str(resp[5]) # reason must always be a string
if val.startswith('u:'):
elif val.startswith('s:'):
val = val[2:].decode('base64')
if what == 'UnicodeDecodeError':
raise UnicodeDecodeError(encoding, val, start, end, reason)
elif what == 'UnicodeEncodeError':
raise UnicodeEncodeError(encoding, val, start, end, reason)
elif what == "ReadOnlyError":
raise errors.TransportNotPossible('readonly transport')
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))

def _send_tuple(self, args):
self._client._send_tuple(args)

def _recv_tuple(self):
return self._client._recv_tuple()

def disconnect(self):
self.get_smart_medium().disconnect()
self._client.disconnect()

def delete_tree(self, relpath):
raise errors.TransportNotPossible('readonly transport')

def stat(self, relpath):
resp = self._call2('stat', self._remote_path(relpath))
resp = self._client._call('stat', self._remote_path(relpath))
if resp[0] == 'stat':
return _SmartStat(int(resp[1]), int(resp[2], 8))
raise errors.UnexpectedSmartServerResponse(resp)
return SmartStat(int(resp[1]), int(resp[2], 8))
self._translate_error(resp)

## def lock_read(self, relpath):
## """Lock the given file for shared (read) access.

def list_dir(self, relpath):
resp = self._call2('list_dir', self._remote_path(relpath))
resp = self._client._call('list_dir',
self._remote_path(relpath))
if resp[0] == 'names':
return [name.encode('ascii') for name in resp[1:]]
raise errors.UnexpectedSmartServerResponse(resp)
self._translate_error(resp)

def iter_files_recursive(self):
resp = self._call2('iter_files_recursive', self._remote_path(''))
resp = self._client._call('iter_files_recursive',
self._remote_path(''))
if resp[0] == 'names':
raise errors.UnexpectedSmartServerResponse(resp)

class RemoteTCPTransport(RemoteTransport):
"""Connection to smart server over plain tcp.
This is essentially just a factory to get 'RemoteTransport(url,
SmartTCPClientMedium)'.

def _build_medium(self):
client_medium = medium.SmartTCPClientMedium(
self._host, self._port, self.base)
return client_medium, None

class RemoteTCPTransportV2Only(RemoteTransport):
"""Connection to smart server over plain tcp with the client hard-coded to
assume protocol v2 and remote server version <= 1.6.
This should only be used for testing.

def _build_medium(self):
client_medium = medium.SmartTCPClientMedium(
self._host, self._port, self.base)
client_medium._protocol_version = 2
client_medium._remember_remote_is_before((1, 6))
return client_medium, None

class RemoteSSHTransport(RemoteTransport):
"""Connection to smart server over SSH.
This is essentially just a factory to get 'RemoteTransport(url,
SmartSSHClientMedium)'.

def _build_medium(self):
location_config = config.LocationConfig(self.base)
bzr_remote_path = location_config.get_bzr_remote_path()
auth = config.AuthenticationConfig()
user = auth.get_user('ssh', self._host, self._port)
client_medium = medium.SmartSSHClientMedium(self._host, self._port,
user, self._password, self.base,
bzr_remote_path=bzr_remote_path)
return client_medium, (user, self._password)

class RemoteHTTPTransport(RemoteTransport):
"""Just a way to connect between a bzr+http:// url and http://.
This connection operates slightly differently than the RemoteSSHTransport.
It uses a plain http:// transport underneath, which defines what remote
.bzr/smart URL we are connected to. From there, all paths that are sent are
sent as relative paths; this way the remote side can properly
de-reference them, since it is likely doing rewrite rules to translate an
HTTP path into a local path.

def __init__(self, base, _from_transport=None, http_transport=None):
if http_transport is None:
# FIXME: the password may be lost here because it appears in the
# url only for an initial construction (when the url came from the
http_url = base[len('bzr+'):]
self._http_transport = transport.get_transport(http_url)
self._http_transport = http_transport
super(RemoteHTTPTransport, self).__init__(
base, _from_transport=_from_transport)

def _build_medium(self):
# We let http_transport take care of the credentials
return self._http_transport.get_smart_medium(), None

def _remote_path(self, relpath):
"""After connecting, HTTP Transport only deals in relative URLs."""
# Adjust the relpath based on which URL this smart transport is
http_base = urlutils.normalize_url(self.get_smart_medium().base)
url = urlutils.join(self.base[len('bzr+'):], relpath)
url = urlutils.normalize_url(url)
return urlutils.relative_url(http_base, url)
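The path rewriting done by `_remote_path` above can be approximated with the standard library. This is a rough sketch only: it uses `posixpath` and `urllib.parse` in place of bzrlib's `urlutils`, compares URL paths without checking scheme or host, and `relative_http_path` and the example URLs are hypothetical.

```python
import posixpath
from urllib.parse import urlsplit


def relative_http_path(medium_base, smart_base, relpath):
    """Return relpath as a path relative to the HTTP medium's base URL.

    medium_base: plain http:// URL the smart medium is connected to.
    smart_base: this transport's bzr+http:// base URL.
    Assumption: both URLs point at the same host, so only paths are compared.
    """
    prefix = "bzr+"
    if not smart_base.startswith(prefix):
        raise ValueError("expected a bzr+ URL, got %r" % (smart_base,))
    # Drop the 'bzr+' prefix to get the plain HTTP URL of this transport.
    http_url = smart_base[len(prefix):]
    target = posixpath.join(urlsplit(http_url).path, relpath)
    start = urlsplit(medium_base).path or "/"
    # Express the target relative to where the medium is rooted.
    return posixpath.relpath(posixpath.normpath(target), start)


inside = relative_http_path(
    "http://host/repo/", "bzr+http://host/repo/branch/", "foo")
sibling = relative_http_path(
    "http://host/repo/a/", "bzr+http://host/repo/b/", "x")
```

Here `inside` stays below the medium's base, while `sibling` has to climb out of it with a leading `..`, which is the case a server doing rewrite rules has to be prepared to resolve.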

def clone(self, relative_url):
"""Make a new RemoteHTTPTransport related to me.
This is re-implemented rather than using the default
RemoteTransport.clone() because we must be careful about the underlying
Also, the cloned smart transport will POST to the same .bzr/smart
location as this transport (although obviously the relative paths in the
smart requests may be different). This is so that the server doesn't
have to handle .bzr/smart requests at arbitrary places inside .bzr
directories, just at the initial URL the user uses.
self._translate_error(resp)

class SmartStreamClient(SmartProtocolBase):
"""Connection to smart server over two streams"""

def __init__(self, connect_func):
self._connect_func = connect_func
self._connected = False

def _ensure_connection(self):
if not self._connected:
self._in, self._out = self._connect_func()
self._connected = True

def _send_tuple(self, args):
self._ensure_connection()
return self._write_and_flush(_encode_tuple(args))

def _send_bulk_data(self, body):
self._ensure_connection()
SmartProtocolBase._send_bulk_data(self, body)

def _recv_bulk(self):
self._ensure_connection()
return SmartProtocolBase._recv_bulk(self)

def _recv_tuple(self):
self._ensure_connection()
return SmartProtocolBase._recv_tuple(self)

def _recv_trailer(self):
self._ensure_connection()
return SmartProtocolBase._recv_trailer(self)

def disconnect(self):
"""Close connection to the server"""

def _call(self, *args):
self._send_tuple(args)
return self._recv_tuple()

def _call_with_upload(self, method, args, body):
"""Call an rpc, supplying bulk upload data.
:param method: method name to call
:param args: parameter args tuple
:param body: upload body as a byte string
abs_url = self.abspath(relative_url)
return RemoteHTTPTransport(abs_url,
_from_transport=self,
http_transport=self._http_transport)

def _redirected_to(self, source, target):
"""See transport._redirected_to"""
redirected = self._http_transport._redirected_to(source, target)
if (redirected is not None
and isinstance(redirected, type(self._http_transport))):
return RemoteHTTPTransport('bzr+' + redirected.external_url(),
http_transport=redirected)
# Either None or a transport for a different protocol

class HintingSSHTransport(transport.Transport):
"""Simple transport that handles ssh:// and points out bzr+ssh://."""

def __init__(self, url):
raise errors.UnsupportedProtocol(url,
'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
self._send_tuple((method,) + args)
self._send_bulk_data(body)
return self._recv_tuple()

def query_version(self):
"""Return protocol version number of the server."""
# XXX: should make sure it's empty
self._send_tuple(('hello',))
resp = self._recv_tuple()
if resp == ('ok', '1'):
raise errors.SmartProtocolError("bad response %r" % (resp,))

class SmartTCPTransport(SmartTransport):
"""Connection to smart server over plain tcp"""

def __init__(self, url, clone_from=None):
super(SmartTCPTransport, self).__init__(url, clone_from)
self._port = int(self._port)
except (ValueError, TypeError), e:
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)

def _connect_to_server(self):
self._socket = socket.socket()
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
result = self._socket.connect_ex((self._host, int(self._port)))
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
(self._host, self._port, os.strerror(result)))
# TODO: May be more efficient to just treat them as sockets
# throughout? But what about pipes to ssh?...
to_server = self._socket.makefile('w')
from_server = self._socket.makefile('r')
return from_server, to_server

def disconnect(self):
super(SmartTCPTransport, self).disconnect()
# XXX: Is closing the socket as well as closing the files really
if self._socket is not None:
self._socket.close()
from bzrlib.transport import sftp, ssh
except errors.ParamikoNotPresent:
# no paramiko, no SSHTransport.

class SmartSSHTransport(SmartTransport):
"""Connection to smart server over SSH."""

def __init__(self, url, clone_from=None):
# TODO: all this probably belongs in the parent class.
super(SmartSSHTransport, self).__init__(url, clone_from)
if self._port is not None:
self._port = int(self._port)
except (ValueError, TypeError), e:
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)

def _connect_to_server(self):
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
vendor = ssh._get_ssh_vendor()
self._ssh_connection = vendor.connect_ssh(self._username,
self._password, self._host, self._port,
command=[executable, 'serve', '--inet', '--directory=/',
return self._ssh_connection.get_filelike_channels()

def disconnect(self):
super(SmartSSHTransport, self).disconnect()
self._ssh_connection.close()

def get_test_permutations():
"""Return (transport, server) permutations for testing."""
### We may need a little more test framework support to construct an
### appropriate RemoteTransport in the future.
from bzrlib.tests import test_server
return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
"""Return (transport, server) permutations for testing"""
return [(SmartTCPTransport, SmartTCPServer_for_testing)]