13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""RemoteTransport client for the smart-server.
19
This module shouldn't be accessed directly. The classes defined here should be
20
imported from bzrlib.smart.
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Smart-server protocol, client and server.
19
Requests are sent as a command and list of arguments, followed by optional
20
bulk body data. Responses are similarly a response and list of arguments,
21
followed by bulk body data. ::
24
Fields are separated by Ctrl-A.
25
BULK_DATA := CHUNK+ TRAILER
26
Chunks can be repeated as many times as necessary.
27
CHUNK := CHUNK_LEN CHUNK_BODY
28
CHUNK_LEN := DIGIT+ NEWLINE
29
Gives the number of bytes in the following chunk.
30
CHUNK_BODY := BYTE[chunk_len]
31
TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
SUCCESS_TRAILER := 'done' NEWLINE
35
Paths are passed across the network. The client needs to see a namespace that
36
includes any repository that might need to be referenced, and the client needs
37
to know about a root directory beyond which it cannot ascend.
39
Servers run over ssh will typically want to be able to access any path the user
40
can access. Public servers on the other hand (which might be over http, ssh
41
or tcp) will typically want to restrict access to only a particular directory
42
and its children, so will want to do a software virtual root at that level.
43
In other words they'll want to rewrite incoming paths to be under that level
44
(and prevent escaping using ../ tricks.)
46
URLs that include ~ should probably be passed across to the server verbatim
47
and the server can expand them. This will proably not be meaningful when
48
limited to a directory?
23
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
53
# TODO: A plain integer from query_version is too simple; should give some
56
# TODO: Server should probably catch exceptions within itself and send them
57
# back across the network. (But shouldn't catch KeyboardInterrupt etc)
58
# Also needs to somehow report protocol errors like bad requests. Need to
59
# consider how we'll handle error reporting, e.g. if we get halfway through a
60
# bulk transfer and then something goes wrong.
62
# TODO: Standard marker at start of request/response lines?
64
# TODO: Make each request and response self-validatable, e.g. with checksums.
66
# TODO: get/put objects could be changed to gradually read back the data as it
67
# comes across the network
69
# TODO: What should the server do if it hits an error and has to terminate?
71
# TODO: is it useful to allow multiple chunks in the bulk data?
73
# TODO: If we get an exception during transmission of bulk data we can't just
74
# emit the exception because it won't be seen.
75
# John proposes: I think it would be worthwhile to have a header on each
76
# chunk, that indicates it is another chunk. Then you can send an 'error'
77
# chunk as long as you finish the previous chunk.
79
# TODO: Clone method on Transport; should work up towards parent directory;
80
# unclear how this should be stored or communicated to the server... maybe
81
# just pass it on all relevant requests?
83
# TODO: Better name than clone() for changing between directories. How about
84
# open_dir or change_dir or chdir?
86
# TODO: Is it really good to have the notion of current directory within the
87
# connection? Perhaps all Transports should factor out a common connection
88
# from the thing that has the directory context?
90
# TODO: Pull more things common to sftp and ssh to a higher level.
92
# TODO: The server that manages a connection should be quite small and retain
93
# minimum state because each of the requests are supposed to be stateless.
94
# Then we can write another implementation that maps to http.
96
# TODO: What to do when a client connection is garbage collected? Maybe just
97
# abruptly drop the connection?
99
# TODO: Server in some cases will need to restrict access to files outside of
100
# a particular root directory. LocalTransport doesn't do anything to stop you
101
# ascending above the base directory, so we need to prevent paths
102
# containing '..' in either the server or transport layers. (Also need to
103
# consider what happens if someone creates a symlink pointing outside the
106
# TODO: Server should rebase absolute paths coming across the network to put
107
# them under the virtual root, if one is in use. LocalTransport currently
108
# doesn't do that; if you give it an absolute path it just uses it.
110
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
111
# urlescape them instead. Indeed possibly this should just literally be
114
# FIXME: This transport, with several others, has imperfect handling of paths
115
# within urls. It'd probably be better for ".." from a root to raise an error
116
# rather than return the same directory as we do at present.
118
# TODO: Rather than working at the Transport layer we want a Branch,
119
# Repository or BzrDir objects that talk to a server.
121
# TODO: Probably want some way for server commands to gradually produce body
122
# data rather than passing it as a string; they could perhaps pass an
123
# iterator-like callback that will gradually yield data; it probably needs a
124
# close() method that will always be closed to do any necessary cleanup.
126
# TODO: Split the actual smart server from the ssh encoding of it.
128
# TODO: Perhaps support file-level readwrite operations over the transport
131
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
132
# branch doing file-level operations.
134
# TODO: jam 20060915 _decode_tuple is acting directly on input over
135
# the socket, and it assumes everything is UTF8 sections separated
136
# by \001. Which means a request like '\002' Will abort the connection
137
# because of a UnicodeDecodeError. It does look like invalid data will
138
# kill the SmartStreamServer, but only with an abort + exception, and
139
# the overall server shouldn't die.
25
141
from cStringIO import StringIO
27
151
from bzrlib import (
36
from bzrlib.smart import client, medium
37
from bzrlib.symbol_versioning import (
42
class _SmartStat(object):
159
from bzrlib.bundle.serializer import write_bundle
160
from bzrlib.trace import mutter
161
from bzrlib.transport import local
163
# must do this otherwise urllib can't parse the urls properly :(
164
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
165
transport.register_urlparse_netloc_protocol(scheme)
169
def _recv_tuple(from_file):
170
req_line = from_file.readline()
171
return _decode_tuple(req_line)
174
def _decode_tuple(req_line):
175
if req_line == None or req_line == '':
177
if req_line[-1] != '\n':
178
raise errors.SmartProtocolError("request %r not terminated" % req_line)
179
return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
182
def _send_tuple(to_file, args):
183
# XXX: this will be inefficient. Just ask Robert.
184
to_file.write('\x01'.join((a.encode('utf-8') for a in args)) + '\n')
188
class SmartProtocolBase(object):
189
"""Methods common to client and server"""
191
def _send_bulk_data(self, body):
192
"""Send chunked body data"""
193
assert isinstance(body, str)
194
self._out.write('%d\n' % len(body))
195
self._out.write(body)
196
self._out.write('done\n')
199
# TODO: this only actually accomodates a single block; possibly should support
201
def _recv_bulk(self):
202
chunk_len = self._in.readline()
204
chunk_len = int(chunk_len)
206
raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
207
bulk = self._in.read(chunk_len)
208
if len(bulk) != chunk_len:
209
raise errors.SmartProtocolError("short read fetching bulk data chunk")
213
def _recv_tuple(self):
214
return _recv_tuple(self._in)
216
def _recv_trailer(self):
217
resp = self._recv_tuple()
218
if resp == ('done', ):
221
self._translate_error(resp)
224
class SmartStreamServer(SmartProtocolBase):
225
"""Handles smart commands coming over a stream.
227
The stream may be a pipe connected to sshd, or a tcp socket, or an
228
in-process fifo for testing.
230
One instance is created for each connected client; it can serve multiple
231
requests in the lifetime of the connection.
233
The server passes requests through to an underlying backing transport,
234
which will typically be a LocalTransport looking at the server's filesystem.
237
def __init__(self, in_file, out_file, backing_transport):
238
"""Construct new server.
240
:param in_file: Python file from which requests can be read.
241
:param out_file: Python file to write responses.
242
:param backing_transport: Transport for the directory served.
246
self.smart_server = SmartServer(backing_transport)
247
# server can call back to us to get bulk data - this is not really
248
# ideal, they should get it per request instead
249
self.smart_server._recv_body = self._recv_bulk
251
def _recv_tuple(self):
252
"""Read a request from the client and return as a tuple.
254
Returns None at end of file (if the client closed the connection.)
256
return _recv_tuple(self._in)
258
def _send_tuple(self, args):
259
"""Send response header"""
260
return _send_tuple(self._out, args)
262
def _send_error_and_disconnect(self, exception):
263
self._send_tuple(('error', str(exception)))
268
def _serve_one_request(self):
269
"""Read one request from input, process, send back a response.
271
:return: False if the server should terminate, otherwise None.
273
req_args = self._recv_tuple()
275
# client closed connection
276
return False # shutdown server
278
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
279
self._send_tuple(response.args)
280
if response.body is not None:
281
self._send_bulk_data(response.body)
282
except KeyboardInterrupt:
285
# everything else: pass to client, flush, and quit
286
self._send_error_and_disconnect(e)
290
"""Serve requests until the client disconnects."""
291
# Keep a reference to stderr because the sys module's globals get set to
292
# None during interpreter shutdown.
293
from sys import stderr
295
while self._serve_one_request() != False:
298
stderr.write("%s terminating on exception %s\n" % (self, e))
302
class SmartServerResponse(object):
303
"""Response generated by SmartServer."""
305
def __init__(self, args, body=None):
310
class SmartServer(object):
311
"""Protocol logic for smart server.
313
This doesn't handle serialization at all, it just processes requests and
317
# TODO: Better way of representing the body for commands that take it,
318
# and allow it to be streamed into the server.
320
def __init__(self, backing_transport):
321
self._backing_transport = backing_transport
324
"""Answer a version request with my version."""
325
return SmartServerResponse(('ok', '1'))
327
def do_has(self, relpath):
328
r = self._backing_transport.has(relpath) and 'yes' or 'no'
329
return SmartServerResponse((r,))
331
def do_get(self, relpath):
332
backing_bytes = self._backing_transport.get_bytes(relpath)
333
return SmartServerResponse(('ok',), backing_bytes)
335
def _deserialise_optional_mode(self, mode):
341
def do_append(self, relpath, mode):
342
old_length = self._backing_transport.append_bytes(
343
relpath, self._recv_body(), self._deserialise_optional_mode(mode))
344
return SmartServerResponse(('appended', '%d' % old_length))
346
def do_delete(self, relpath):
347
self._backing_transport.delete(relpath)
349
def do_iter_files_recursive(self, abspath):
350
# XXX: the path handling needs some thought.
351
#relpath = self._backing_transport.relpath(abspath)
352
transport = self._backing_transport.clone(abspath)
353
filenames = transport.iter_files_recursive()
354
return SmartServerResponse(('names',) + tuple(filenames))
356
def do_list_dir(self, relpath):
357
filenames = self._backing_transport.list_dir(relpath)
358
return SmartServerResponse(('names',) + tuple(filenames))
360
def do_mkdir(self, relpath, mode):
361
self._backing_transport.mkdir(relpath,
362
self._deserialise_optional_mode(mode))
364
def do_move(self, rel_from, rel_to):
365
self._backing_transport.move(rel_from, rel_to)
367
def do_put(self, relpath, mode):
368
self._backing_transport.put_bytes(relpath,
370
self._deserialise_optional_mode(mode))
372
def do_rename(self, rel_from, rel_to):
373
self._backing_transport.rename(rel_from, rel_to)
375
def do_rmdir(self, relpath):
376
self._backing_transport.rmdir(relpath)
378
def do_stat(self, relpath):
379
stat = self._backing_transport.stat(relpath)
380
return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
382
def do_get_bundle(self, path, revision_id):
383
# open transport relative to our base
384
t = self._backing_transport.clone(path)
385
control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
386
repo = control.open_repository()
387
tmpf = tempfile.TemporaryFile()
388
base_revision = revision.NULL_REVISION
389
write_bundle(repo, revision_id, base_revision, tmpf)
391
return SmartServerResponse((), tmpf.read())
393
def dispatch_command(self, cmd, args):
394
func = getattr(self, 'do_' + cmd, None)
396
raise errors.SmartProtocolError("bad request %r" % (cmd,))
400
result = SmartServerResponse(('ok',))
402
except errors.NoSuchFile, e:
403
return SmartServerResponse(('NoSuchFile', e.path))
404
except errors.FileExists, e:
405
return SmartServerResponse(('FileExists', e.path))
406
except errors.DirectoryNotEmpty, e:
407
return SmartServerResponse(('DirectoryNotEmpty', e.path))
408
except UnicodeError, e:
409
# If it is a DecodeError, than most likely we are starting
410
# with a plain string
411
str_or_unicode = e.object
412
if isinstance(str_or_unicode, unicode):
413
val = u'u:' + str_or_unicode
415
val = u's:' + str_or_unicode.encode('base64')
416
# This handles UnicodeEncodeError or UnicodeDecodeError
417
return SmartServerResponse((e.__class__.__name__,
418
e.encoding, val, str(e.start), str(e.end), e.reason))
421
class SmartTCPServer(object):
422
"""Listens on a TCP socket and accepts connections from smart clients"""
424
def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
425
"""Construct a new server.
427
To actually start it running, call either start_background_thread or
430
:param host: Name of the interface to listen on.
431
:param port: TCP port to listen on, or 0 to allocate a transient port.
433
if backing_transport is None:
434
backing_transport = memory.MemoryTransport()
435
self._server_socket = socket.socket()
436
self._server_socket.bind((host, port))
437
self.port = self._server_socket.getsockname()[1]
438
self._server_socket.listen(1)
439
self._server_socket.settimeout(1)
440
self.backing_transport = backing_transport
443
# let connections timeout so that we get a chance to terminate
444
# Keep a reference to the exceptions we want to catch because the socket
445
# module's globals get set to None during interpreter shutdown.
446
from socket import timeout as socket_timeout
447
from socket import error as socket_error
448
self._should_terminate = False
449
while not self._should_terminate:
451
self.accept_and_serve()
452
except socket_timeout:
453
# just check if we're asked to stop
455
except socket_error, e:
456
trace.warning("client disconnected: %s", e)
460
"""Return the url of the server"""
461
return "bzr://%s:%d/" % self._server_socket.getsockname()
463
def accept_and_serve(self):
464
conn, client_addr = self._server_socket.accept()
465
conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
466
from_client = conn.makefile('r')
467
to_client = conn.makefile('w')
468
handler = SmartStreamServer(from_client, to_client,
469
self.backing_transport)
470
connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
471
connection_thread.setDaemon(True)
472
connection_thread.start()
474
def start_background_thread(self):
475
self._server_thread = threading.Thread(None,
477
name='server-' + self.get_url())
478
self._server_thread.setDaemon(True)
479
self._server_thread.start()
481
def stop_background_thread(self):
482
self._should_terminate = True
483
# self._server_socket.close()
484
# we used to join the thread, but it's not really necessary; it will
486
## self._server_thread.join()
489
class SmartTCPServer_for_testing(SmartTCPServer):
490
"""Server suitable for use by transport tests.
492
This server is backed by the process's cwd.
496
self._homedir = os.getcwd()
497
# The server is set up by default like for ssh access: the client
498
# passes filesystem-absolute paths; therefore the server must look
499
# them up relative to the root directory. it might be better to act
500
# a public server and have the server rewrite paths into the test
502
SmartTCPServer.__init__(self, transport.get_transport("file:///"))
505
"""Set up server for testing"""
506
self.start_background_thread()
509
self.stop_background_thread()
512
"""Return the url of the server"""
513
host, port = self._server_socket.getsockname()
514
# XXX: I think this is likely to break on windows -- self._homedir will
515
# have backslashes (and maybe a drive letter?).
516
# -- Andrew Bennetts, 2006-08-29
517
return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
519
def get_bogus_url(self):
520
"""Return a URL which will fail to connect"""
521
return 'bzr://127.0.0.1:1/'
524
class SmartStat(object):
44
526
def __init__(self, size, mode):
45
527
self.st_size = size
46
528
self.st_mode = mode
49
class RemoteTransport(transport.ConnectedTransport):
531
class SmartTransport(transport.Transport):
50
532
"""Connection to a smart server.
52
The connection holds references to the medium that can be used to send
53
requests to the server.
534
The connection holds references to pipes that can be used to send requests
55
537
The connection has a notion of the current directory to which it's
56
538
connected; this is incorporated in filenames passed to the server.
58
This supports some higher-level RPC operations and can also be treated
540
This supports some higher-level RPC operations and can also be treated
59
541
like a Transport to do file-like operations.
61
The connection can be made over a tcp socket, an ssh pipe or a series of
62
http requests. There are concrete subclasses for each type:
63
RemoteTCPTransport, etc.
543
The connection can be made over a tcp socket, or (in future) an ssh pipe
544
or a series of http requests. There are concrete subclasses for each
545
type: SmartTCPTransport, etc.
66
# When making a readv request, cap it at requesting 5MB of data
67
_max_readv_bytes = 5*1024*1024
69
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
70
# responsibilities: Put those on SmartClient or similar. This is vital for
71
# the ability to support multiple versions of the smart protocol over time:
72
# RemoteTransport is an adapter from the Transport object model to the
73
# SmartClient model, not an encoder.
75
# FIXME: the medium parameter should be private, only the tests requires
76
# it. It may be even clearer to define a TestRemoteTransport that handles
77
# the specific cases of providing a _client and/or a _medium, and leave
78
# RemoteTransport as an abstract class.
79
def __init__(self, url, _from_transport=None, medium=None, _client=None):
548
def __init__(self, url, clone_from=None, client=None):
82
:param _from_transport: Another RemoteTransport instance that this
83
one is being cloned from. Attributes such as the medium will
86
:param medium: The medium to use for this RemoteTransport. If None,
87
the medium from the _from_transport is shared. If both this
88
and _from_transport are None, a new medium will be built.
89
_from_transport and medium cannot both be specified.
91
:param _client: Override the _SmartClient used by this transport. This
92
should only be used for testing purposes; normally this is
93
determined from the medium.
95
super(RemoteTransport, self).__init__(
96
url, _from_transport=_from_transport)
98
# The medium is the connection, except when we need to share it with
99
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
# what we want to share is really the shared connection.
102
if (_from_transport is not None
103
and isinstance(_from_transport, RemoteTransport)):
104
_client = _from_transport._client
105
elif _from_transport is None:
106
# If no _from_transport is specified, we need to intialize the
110
medium, credentials = self._build_medium()
111
if 'hpss' in debug.debug_flags:
112
trace.mutter('hpss: Built a new medium: %s',
113
medium.__class__.__name__)
114
self._shared_connection = transport._SharedConnection(medium,
118
# No medium was specified, so share the medium from the
120
medium = self._shared_connection.connection
122
raise AssertionError(
123
"Both _from_transport (%r) and medium (%r) passed to "
124
"RemoteTransport.__init__, but these parameters are mutally "
125
"exclusive." % (_from_transport, medium))
128
self._client = client._SmartClient(medium)
130
self._client = _client
132
def _build_medium(self):
133
"""Create the medium if _from_transport does not provide one.
135
The medium is analogous to the connection for ConnectedTransport: it
136
allows connection sharing.
141
def _report_activity(self, bytes, direction):
142
"""See Transport._report_activity.
144
Does nothing; the smart medium will report activity triggered by a
551
:param client: ignored when clone_from is not None.
553
### Technically super() here is faulty because Transport's __init__
554
### fails to take 2 parameters, and if super were to choose a silly
555
### initialisation order things would blow up.
556
if not url.endswith('/'):
558
super(SmartTransport, self).__init__(url)
559
self._scheme, self._username, self._password, self._host, self._port, self._path = \
560
transport.split_url(url)
561
if clone_from is None:
563
self._client = SmartStreamClient(self._connect_to_server)
565
self._client = client
567
# credentials may be stripped from the base in some circumstances
568
# as yet to be clearly defined or documented, so copy them.
569
self._username = clone_from._username
570
# reuse same connection
571
self._client = clone_from._client
573
def abspath(self, relpath):
574
"""Return the full url to the given relative path.
576
@param relpath: the relative path or path components
577
@type relpath: str or list
579
return self._unparse_url(self._remote_path(relpath))
581
def clone(self, relative_url):
582
"""Make a new SmartTransport related to me, sharing the same connection.
584
This essentially opens a handle on a different remote directory.
586
if relative_url is None:
587
return self.__class__(self.base, self)
589
return self.__class__(self.abspath(relative_url), self)
149
591
def is_readonly(self):
150
592
"""Smart server transport can do read/write file operations."""
152
resp = self._call2('Transport.is_readonly')
153
except errors.UnknownSmartMethod:
154
# XXX: nasty hack: servers before 0.16 don't have a
155
# 'Transport.is_readonly' verb, so we do what clients before 0.16
158
if resp == ('yes', ):
160
elif resp == ('no', ):
163
raise errors.UnexpectedSmartServerResponse(resp)
165
595
def get_smart_client(self):
166
return self._get_connection()
598
def _unparse_url(self, path):
599
"""Return URL for a path.
168
def get_smart_medium(self):
169
return self._get_connection()
601
:see: SFTPUrlHandling._unparse_url
603
# TODO: Eventually it should be possible to unify this with
604
# SFTPUrlHandling._unparse_url?
607
path = urllib.quote(path)
608
netloc = urllib.quote(self._host)
609
if self._username is not None:
610
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
611
if self._port is not None:
612
netloc = '%s:%d' % (netloc, self._port)
613
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
171
615
def _remote_path(self, relpath):
172
616
"""Returns the Unicode version of the absolute path for relpath."""
173
617
return self._combine_paths(self._path, relpath)
175
def _call(self, method, *args):
176
resp = self._call2(method, *args)
177
self._ensure_ok(resp)
179
def _call2(self, method, *args):
180
"""Call a method on the remote server."""
182
return self._client.call(method, *args)
183
except errors.ErrorFromSmartServer, err:
184
# The first argument, if present, is always a path.
186
context = {'relpath': args[0]}
189
self._translate_error(err, **context)
191
def _call_with_body_bytes(self, method, args, body):
192
"""Call a method on the remote server with body bytes."""
194
return self._client.call_with_body_bytes(method, args, body)
195
except errors.ErrorFromSmartServer, err:
196
# The first argument, if present, is always a path.
198
context = {'relpath': args[0]}
201
self._translate_error(err, **context)
203
619
def has(self, relpath):
204
620
"""Indicate whether a remote file of the given name exists or not.
206
622
:see: Transport.has()
208
resp = self._call2('has', self._remote_path(relpath))
624
resp = self._client._call('has', self._remote_path(relpath))
209
625
if resp == ('yes', ):
211
627
elif resp == ('no', ):
214
raise errors.UnexpectedSmartServerResponse(resp)
630
self._translate_error(resp)
216
632
def get(self, relpath):
217
633
"""Return file-like object reading the contents of a remote file.
219
635
:see: Transport.get_bytes()/get_file()
221
return StringIO(self.get_bytes(relpath))
223
def get_bytes(self, relpath):
224
637
remote = self._remote_path(relpath)
226
resp, response_handler = self._client.call_expecting_body('get', remote)
227
except errors.ErrorFromSmartServer, err:
228
self._translate_error(err, relpath)
638
resp = self._client._call('get', remote)
229
639
if resp != ('ok', ):
230
response_handler.cancel_read_body()
231
raise errors.UnexpectedSmartServerResponse(resp)
232
return response_handler.read_body_bytes()
640
self._translate_error(resp, relpath)
641
return StringIO(self._client._recv_bulk())
234
643
def _serialise_optional_mode(self, mode):
291
663
upload_file.seek(pos)
294
def put_file_non_atomic(self, relpath, f, mode=None,
295
create_parent_dir=False,
297
return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
298
create_parent_dir=create_parent_dir,
666
def put_bytes(self, relpath, upload_contents, mode=None):
667
# FIXME: upload_file is probably not safe for non-ascii characters -
668
# should probably just pass all parameters as length-delimited
670
resp = self._client._call_with_upload(
672
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
674
self._translate_error(resp)
301
676
def append_file(self, relpath, from_file, mode=None):
302
677
return self.append_bytes(relpath, from_file.read(), mode)
304
679
def append_bytes(self, relpath, bytes, mode=None):
305
resp = self._call_with_body_bytes(
680
resp = self._client._call_with_upload(
307
682
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
309
684
if resp[0] == 'appended':
310
685
return int(resp[1])
311
raise errors.UnexpectedSmartServerResponse(resp)
686
self._translate_error(resp)
313
688
def delete(self, relpath):
314
resp = self._call2('delete', self._remote_path(relpath))
315
self._ensure_ok(resp)
317
def external_url(self):
318
"""See bzrlib.transport.Transport.external_url."""
319
# the external path for RemoteTransports is the base
322
def recommended_page_size(self):
323
"""Return the recommended page size for this transport."""
326
def _readv(self, relpath, offsets):
330
offsets = list(offsets)
332
sorted_offsets = sorted(offsets)
333
coalesced = list(self._coalesce_offsets(sorted_offsets,
334
limit=self._max_readv_combine,
335
fudge_factor=self._bytes_to_read_before_seek,
336
max_size=self._max_readv_bytes))
338
# now that we've coallesced things, avoid making enormous requests
343
if c.length + cur_len > self._max_readv_bytes:
344
requests.append(cur_request)
348
cur_request.append(c)
351
requests.append(cur_request)
352
if 'hpss' in debug.debug_flags:
353
trace.mutter('%s.readv %s offsets => %s coalesced'
354
' => %s requests (%s)',
355
self.__class__.__name__, len(offsets), len(coalesced),
356
len(requests), sum(map(len, requests)))
357
# Cache the results, but only until they have been fulfilled
359
# turn the list of offsets into a single stack to iterate
360
offset_stack = iter(offsets)
361
# using a list so it can be modified when passing down and coming back
362
next_offset = [offset_stack.next()]
363
for cur_request in requests:
365
result = self._client.call_with_body_readv_array(
366
('readv', self._remote_path(relpath),),
367
[(c.start, c.length) for c in cur_request])
368
resp, response_handler = result
369
except errors.ErrorFromSmartServer, err:
370
self._translate_error(err, relpath)
372
if resp[0] != 'readv':
373
# This should raise an exception
374
response_handler.cancel_read_body()
375
raise errors.UnexpectedSmartServerResponse(resp)
377
for res in self._handle_response(offset_stack, cur_request,
383
def _handle_response(self, offset_stack, coalesced, response_handler,
384
data_map, next_offset):
385
cur_offset_and_size = next_offset[0]
386
# FIXME: this should know how many bytes are needed, for clarity.
387
data = response_handler.read_body_bytes()
389
for c_offset in coalesced:
390
if len(data) < c_offset.length:
391
raise errors.ShortReadvError(relpath, c_offset.start,
392
c_offset.length, actual=len(data))
393
for suboffset, subsize in c_offset.ranges:
394
key = (c_offset.start+suboffset, subsize)
395
this_data = data[data_offset+suboffset:
396
data_offset+suboffset+subsize]
397
# Special case when the data is in-order, rather than packing
398
# into a map and then back out again. Benchmarking shows that
399
# this has 100% hit rate, but leave in the data_map work just
401
# TODO: Could we get away with using buffer() to avoid the
402
# memory copy? Callers would need to realize they may
403
# not have a real string.
404
if key == cur_offset_and_size:
405
yield cur_offset_and_size[0], this_data
406
cur_offset_and_size = next_offset[0] = offset_stack.next()
408
data_map[key] = this_data
409
data_offset += c_offset.length
411
# Now that we've read some data, see if we can yield anything back
412
while cur_offset_and_size in data_map:
413
this_data = data_map.pop(cur_offset_and_size)
414
yield cur_offset_and_size[0], this_data
415
cur_offset_and_size = next_offset[0] = offset_stack.next()
689
resp = self._client._call('delete', self._remote_path(relpath))
690
self._translate_error(resp)
417
692
def rename(self, rel_from, rel_to):
419
694
self._remote_path(rel_from),
420
695
self._remote_path(rel_to))
422
697
def move(self, rel_from, rel_to):
424
699
self._remote_path(rel_from),
425
700
self._remote_path(rel_to))
427
702
def rmdir(self, relpath):
428
703
resp = self._call('rmdir', self._remote_path(relpath))
430
def _ensure_ok(self, resp):
432
raise errors.UnexpectedSmartServerResponse(resp)
434
def _translate_error(self, err, relpath=None):
435
remote._translate_error(err, path=relpath)
705
def _call(self, method, *args):
706
resp = self._client._call(method, *args)
707
self._translate_error(resp)
709
def _translate_error(self, resp, orig_path=None):
710
"""Raise an exception from a response"""
714
elif what == 'NoSuchFile':
715
if orig_path is not None:
716
error_path = orig_path
719
raise errors.NoSuchFile(error_path)
720
elif what == 'error':
721
raise errors.SmartProtocolError(unicode(resp[1]))
722
elif what == 'FileExists':
723
raise errors.FileExists(resp[1])
724
elif what == 'DirectoryNotEmpty':
725
raise errors.DirectoryNotEmpty(resp[1])
726
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
727
encoding = str(resp[1]) # encoding must always be a string
731
reason = str(resp[5]) # reason must always be a string
732
if val.startswith('u:'):
734
elif val.startswith('s:'):
735
val = val[2:].decode('base64')
736
if what == 'UnicodeDecodeError':
737
raise UnicodeDecodeError(encoding, val, start, end, reason)
738
elif what == 'UnicodeEncodeError':
739
raise UnicodeEncodeError(encoding, val, start, end, reason)
741
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
743
def _send_tuple(self, args):
744
self._client._send_tuple(args)
746
def _recv_tuple(self):
747
return self._client._recv_tuple()
437
749
def disconnect(self):
438
m = self.get_smart_medium()
750
self._client.disconnect()
752
def delete_tree(self, relpath):
753
raise errors.TransportNotPossible('readonly transport')
442
755
def stat(self, relpath):
443
resp = self._call2('stat', self._remote_path(relpath))
756
resp = self._client._call('stat', self._remote_path(relpath))
444
757
if resp[0] == 'stat':
445
return _SmartStat(int(resp[1]), int(resp[2], 8))
446
raise errors.UnexpectedSmartServerResponse(resp)
758
return SmartStat(int(resp[1]), int(resp[2], 8))
760
self._translate_error(resp)
448
762
## def lock_read(self, relpath):
449
763
## """Lock the given file for shared (read) access.
464
778
def list_dir(self, relpath):
465
resp = self._call2('list_dir', self._remote_path(relpath))
779
resp = self._client._call('list_dir',
780
self._remote_path(relpath))
466
781
if resp[0] == 'names':
467
782
return [name.encode('ascii') for name in resp[1:]]
468
raise errors.UnexpectedSmartServerResponse(resp)
784
self._translate_error(resp)
470
786
def iter_files_recursive(self):
471
resp = self._call2('iter_files_recursive', self._remote_path(''))
787
resp = self._client._call('iter_files_recursive',
788
self._remote_path(''))
472
789
if resp[0] == 'names':
474
raise errors.UnexpectedSmartServerResponse(resp)
477
class RemoteTCPTransport(RemoteTransport):
478
"""Connection to smart server over plain tcp.
480
This is essentially just a factory to get 'RemoteTransport(url,
481
SmartTCPClientMedium).
484
def _build_medium(self):
485
client_medium = medium.SmartTCPClientMedium(
486
self._host, self._port, self.base)
487
return client_medium, None
490
class RemoteTCPTransportV2Only(RemoteTransport):
491
"""Connection to smart server over plain tcp with the client hard-coded to
492
assume protocol v2 and remote server version <= 1.6.
494
This should only be used for testing.
497
def _build_medium(self):
498
client_medium = medium.SmartTCPClientMedium(
499
self._host, self._port, self.base)
500
client_medium._protocol_version = 2
501
client_medium._remember_remote_is_before((1, 6))
502
return client_medium, None
505
class RemoteSSHTransport(RemoteTransport):
506
"""Connection to smart server over SSH.
508
This is essentially just a factory to get 'RemoteTransport(url,
509
SmartSSHClientMedium).
512
def _build_medium(self):
513
location_config = config.LocationConfig(self.base)
514
bzr_remote_path = location_config.get_bzr_remote_path()
517
auth = config.AuthenticationConfig()
518
user = auth.get_user('ssh', self._host, self._port)
519
ssh_params = medium.SSHParams(self._host, self._port, user,
520
self._password, bzr_remote_path)
521
client_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
522
return client_medium, (user, self._password)
525
class RemoteHTTPTransport(RemoteTransport):
526
"""Just a way to connect between a bzr+http:// url and http://.
528
This connection operates slightly differently than the RemoteSSHTransport.
529
It uses a plain http:// transport underneath, which defines what remote
530
.bzr/smart URL we are connected to. From there, all paths that are sent are
531
sent as relative paths, this way, the remote side can properly
532
de-reference them, since it is likely doing rewrite rules to translate an
533
HTTP path into a local path.
536
def __init__(self, base, _from_transport=None, http_transport=None):
537
if http_transport is None:
538
# FIXME: the password may be lost here because it appears in the
539
# url only for an intial construction (when the url came from the
541
http_url = base[len('bzr+'):]
542
self._http_transport = transport.get_transport(http_url)
544
self._http_transport = http_transport
545
super(RemoteHTTPTransport, self).__init__(
546
base, _from_transport=_from_transport)
548
def _build_medium(self):
549
# We let http_transport take care of the credentials
550
return self._http_transport.get_smart_medium(), None
552
def _remote_path(self, relpath):
553
"""After connecting, HTTP Transport only deals in relative URLs."""
554
# Adjust the relpath based on which URL this smart transport is
556
http_base = urlutils.normalize_url(self.get_smart_medium().base)
557
url = urlutils.join(self.base[len('bzr+'):], relpath)
558
url = urlutils.normalize_url(url)
559
return urlutils.relative_url(http_base, url)
561
def clone(self, relative_url):
562
"""Make a new RemoteHTTPTransport related to me.
564
This is re-implemented rather than using the default
565
RemoteTransport.clone() because we must be careful about the underlying
568
Also, the cloned smart transport will POST to the same .bzr/smart
569
location as this transport (although obviously the relative paths in the
570
smart requests may be different). This is so that the server doesn't
571
have to handle .bzr/smart requests at arbitrary places inside .bzr
572
directories, just at the initial URL the user uses.
792
self._translate_error(resp)
795
class SmartStreamClient(SmartProtocolBase):
796
"""Connection to smart server over two streams"""
798
def __init__(self, connect_func):
799
self._connect_func = connect_func
800
self._connected = False
805
def _ensure_connection(self):
806
if not self._connected:
807
self._in, self._out = self._connect_func()
808
self._connected = True
810
def _send_tuple(self, args):
811
self._ensure_connection()
812
_send_tuple(self._out, args)
814
def _send_bulk_data(self, body):
815
self._ensure_connection()
816
SmartProtocolBase._send_bulk_data(self, body)
818
def _recv_bulk(self):
819
self._ensure_connection()
820
return SmartProtocolBase._recv_bulk(self)
822
def _recv_tuple(self):
823
self._ensure_connection()
824
return SmartProtocolBase._recv_tuple(self)
826
def _recv_trailer(self):
827
self._ensure_connection()
828
return SmartProtocolBase._recv_trailer(self)
830
def disconnect(self):
831
"""Close connection to the server"""
836
def _call(self, *args):
837
self._send_tuple(args)
838
return self._recv_tuple()
840
def _call_with_upload(self, method, args, body):
841
"""Call an rpc, supplying bulk upload data.
843
:param method: method name to call
844
:param args: parameter args tuple
845
:param body: upload body as a byte string
575
abs_url = self.abspath(relative_url)
578
return RemoteHTTPTransport(abs_url,
579
_from_transport=self,
580
http_transport=self._http_transport)
582
def _redirected_to(self, source, target):
583
"""See transport._redirected_to"""
584
redirected = self._http_transport._redirected_to(source, target)
585
if (redirected is not None
586
and isinstance(redirected, type(self._http_transport))):
587
return RemoteHTTPTransport('bzr+' + redirected.external_url(),
588
http_transport=redirected)
590
# Either None or a transport for a different protocol
594
class HintingSSHTransport(transport.Transport):
595
"""Simple transport that handles ssh:// and points out bzr+ssh://."""
597
def __init__(self, url):
598
raise errors.UnsupportedProtocol(url,
599
'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
847
self._send_tuple((method,) + args)
848
self._send_bulk_data(body)
849
return self._recv_tuple()
851
def query_version(self):
852
"""Return protocol version number of the server."""
853
# XXX: should make sure it's empty
854
self._send_tuple(('hello',))
855
resp = self._recv_tuple()
856
if resp == ('ok', '1'):
859
raise errors.SmartProtocolError("bad response %r" % (resp,))
862
class SmartTCPTransport(SmartTransport):
863
"""Connection to smart server over plain tcp"""
865
def __init__(self, url, clone_from=None):
866
super(SmartTCPTransport, self).__init__(url, clone_from)
868
self._port = int(self._port)
869
except (ValueError, TypeError), e:
870
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
873
def _connect_to_server(self):
874
self._socket = socket.socket()
875
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
876
result = self._socket.connect_ex((self._host, int(self._port)))
878
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
879
(self._host, self._port, os.strerror(result)))
880
# TODO: May be more efficient to just treat them as sockets
881
# throughout? But what about pipes to ssh?...
882
to_server = self._socket.makefile('w')
883
from_server = self._socket.makefile('r')
884
return from_server, to_server
886
def disconnect(self):
887
super(SmartTCPTransport, self).disconnect()
888
# XXX: Is closing the socket as well as closing the files really
890
if self._socket is not None:
894
from bzrlib.transport import sftp
895
except errors.ParamikoNotPresent:
896
# no paramiko, no SSHTransport.
899
class SmartSSHTransport(SmartTransport):
900
"""Connection to smart server over SSH."""
902
def __init__(self, url, clone_from=None):
903
# TODO: all this probably belongs in the parent class.
904
super(SmartSSHTransport, self).__init__(url, clone_from)
906
if self._port is not None:
907
self._port = int(self._port)
908
except (ValueError, TypeError), e:
909
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
911
def _connect_to_server(self):
912
# XXX: don't hardcode vendor
913
# XXX: cannot pass password to SSHSubprocess yet
914
if self._password is not None:
915
raise errors.InvalidURL("SSH smart transport doesn't handle passwords")
916
self._ssh_connection = sftp.SSHSubprocess(self._host, 'openssh',
917
port=self._port, user=self._username,
918
command=['bzr', 'serve', '--inet'])
919
return self._ssh_connection.get_filelike_channels()
921
def disconnect(self):
922
super(SmartSSHTransport, self).disconnect()
923
self._ssh_connection.close()
602
926
def get_test_permutations():
603
"""Return (transport, server) permutations for testing."""
604
### We may need a little more test framework support to construct an
605
### appropriate RemoteTransport in the future.
606
from bzrlib.tests import test_server
607
return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
927
"""Return (transport, server) permutations for testing"""
928
return [(SmartTCPTransport, SmartTCPServer_for_testing)]