~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-09-16 14:03:54 UTC
  • mfrom: (2017.1.1 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20060916140354-1a9932f525bb7182
(robertc) Add MemoryTree and TreeBuilder test helpers. Also test behavior of transport.has('/') which caused failures in this when merging, and as a result cleanup the sftp path normalisation logic.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""RemoteTransport client for the smart-server.
18
 
 
19
 
This module shouldn't be accessed directly.  The classes defined here should be
20
 
imported from bzrlib.smart.
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
21
49
"""
22
50
 
23
 
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
 
51
 
 
52
 
 
53
# TODO: A plain integer from query_version is too simple; should give some
 
54
# capabilities too?
 
55
 
 
56
# TODO: Server should probably catch exceptions within itself and send them
 
57
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
58
# Also needs to somehow report protocol errors like bad requests.  Need to
 
59
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
60
# bulk transfer and then something goes wrong.
 
61
 
 
62
# TODO: Standard marker at start of request/response lines?
 
63
 
 
64
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
65
#
 
66
# TODO: get/put objects could be changed to gradually read back the data as it
 
67
# comes across the network
 
68
#
 
69
# TODO: What should the server do if it hits an error and has to terminate?
 
70
#
 
71
# TODO: is it useful to allow multiple chunks in the bulk data?
 
72
#
 
73
# TODO: If we get an exception during transmission of bulk data we can't just
 
74
# emit the exception because it won't be seen.
 
75
#   John proposes:  I think it would be worthwhile to have a header on each
 
76
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
77
#   chunk as long as you finish the previous chunk.
 
78
#
 
79
# TODO: Clone method on Transport; should work up towards parent directory;
 
80
# unclear how this should be stored or communicated to the server... maybe
 
81
# just pass it on all relevant requests?
 
82
#
 
83
# TODO: Better name than clone() for changing between directories.  How about
 
84
# open_dir or change_dir or chdir?
 
85
#
 
86
# TODO: Is it really good to have the notion of current directory within the
 
87
# connection?  Perhaps all Transports should factor out a common connection
 
88
# from the thing that has the directory context?
 
89
#
 
90
# TODO: Pull more things common to sftp and ssh to a higher level.
 
91
#
 
92
# TODO: The server that manages a connection should be quite small and retain
 
93
# minimum state because each of the requests are supposed to be stateless.
 
94
# Then we can write another implementation that maps to http.
 
95
#
 
96
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
97
# abruptly drop the connection?
 
98
#
 
99
# TODO: Server in some cases will need to restrict access to files outside of
 
100
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
101
# ascending above the base directory, so we need to prevent paths
 
102
# containing '..' in either the server or transport layers.  (Also need to
 
103
# consider what happens if someone creates a symlink pointing outside the 
 
104
# directory tree...)
 
105
#
 
106
# TODO: Server should rebase absolute paths coming across the network to put
 
107
# them under the virtual root, if one is in use.  LocalTransport currently
 
108
# doesn't do that; if you give it an absolute path it just uses it.
 
109
 
110
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
111
# urlescape them instead.  Indeed possibly this should just literally be
 
112
# http-over-ssh.
 
113
#
 
114
# FIXME: This transport, with several others, has imperfect handling of paths
 
115
# within urls.  It'd probably be better for ".." from a root to raise an error
 
116
# rather than return the same directory as we do at present.
 
117
#
 
118
# TODO: Rather than working at the Transport layer we want a Branch,
 
119
# Repository or BzrDir objects that talk to a server.
 
120
#
 
121
# TODO: Probably want some way for server commands to gradually produce body
 
122
# data rather than passing it as a string; they could perhaps pass an
 
123
# iterator-like callback that will gradually yield data; it probably needs a
 
124
# close() method that will always be closed to do any necessary cleanup.
 
125
#
 
126
# TODO: Split the actual smart server from the ssh encoding of it.
 
127
#
 
128
# TODO: Perhaps support file-level readwrite operations over the transport
 
129
# too.
 
130
#
 
131
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
132
# branch doing file-level operations.
 
133
#
 
134
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
135
#       the socket, and it assumes everything is UTF8 sections separated
 
136
#       by \001. Which means a request like '\002' Will abort the connection
 
137
#       because of a UnicodeDecodeError. It does look like invalid data will
 
138
#       kill the SmartStreamServer, but only with an abort + exception, and 
 
139
#       the overall server shouldn't die.
24
140
 
25
141
from cStringIO import StringIO
 
142
import errno
 
143
import os
 
144
import socket
 
145
import sys
 
146
import tempfile
 
147
import threading
 
148
import urllib
 
149
import urlparse
26
150
 
27
151
from bzrlib import (
28
 
    config,
29
 
    debug,
 
152
    bzrdir,
30
153
    errors,
31
 
    remote,
 
154
    revision,
 
155
    transport,
32
156
    trace,
33
 
    transport,
34
157
    urlutils,
35
158
    )
36
 
from bzrlib.smart import client, medium
37
 
from bzrlib.symbol_versioning import (
38
 
    deprecated_method,
39
 
    )
40
 
 
41
 
 
42
 
class _SmartStat(object):
 
159
from bzrlib.bundle.serializer import write_bundle
 
160
from bzrlib.trace import mutter
 
161
from bzrlib.transport import local
 
162
 
 
163
# must do this otherwise urllib can't parse the urls properly :(
 
164
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
165
    transport.register_urlparse_netloc_protocol(scheme)
 
166
del scheme
 
167
 
 
168
 
 
169
def _recv_tuple(from_file):
 
170
    req_line = from_file.readline()
 
171
    return _decode_tuple(req_line)
 
172
 
 
173
 
 
174
def _decode_tuple(req_line):
 
175
    if req_line == None or req_line == '':
 
176
        return None
 
177
    if req_line[-1] != '\n':
 
178
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
179
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
180
 
 
181
 
 
182
def _send_tuple(to_file, args):
 
183
    # XXX: this will be inefficient.  Just ask Robert.
 
184
    to_file.write('\x01'.join((a.encode('utf-8') for a in args)) + '\n')
 
185
    to_file.flush()
 
186
 
 
187
 
 
188
class SmartProtocolBase(object):
 
189
    """Methods common to client and server"""
 
190
 
 
191
    def _send_bulk_data(self, body):
 
192
        """Send chunked body data"""
 
193
        assert isinstance(body, str)
 
194
        self._out.write('%d\n' % len(body))
 
195
        self._out.write(body)
 
196
        self._out.write('done\n')
 
197
        self._out.flush()
 
198
 
 
199
    # TODO: this only actually accomodates a single block; possibly should support
 
200
    # multiple chunks?
 
201
    def _recv_bulk(self):
 
202
        chunk_len = self._in.readline()
 
203
        try:
 
204
            chunk_len = int(chunk_len)
 
205
        except ValueError:
 
206
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
 
207
        bulk = self._in.read(chunk_len)
 
208
        if len(bulk) != chunk_len:
 
209
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
 
210
        self._recv_trailer()
 
211
        return bulk
 
212
 
 
213
    def _recv_tuple(self):
 
214
        return _recv_tuple(self._in)
 
215
 
 
216
    def _recv_trailer(self):
 
217
        resp = self._recv_tuple()
 
218
        if resp == ('done', ):
 
219
            return
 
220
        else:
 
221
            self._translate_error(resp)
 
222
 
 
223
 
 
224
class SmartStreamServer(SmartProtocolBase):
 
225
    """Handles smart commands coming over a stream.
 
226
 
 
227
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
228
    in-process fifo for testing.
 
229
 
 
230
    One instance is created for each connected client; it can serve multiple
 
231
    requests in the lifetime of the connection.
 
232
 
 
233
    The server passes requests through to an underlying backing transport, 
 
234
    which will typically be a LocalTransport looking at the server's filesystem.
 
235
    """
 
236
 
 
237
    def __init__(self, in_file, out_file, backing_transport):
 
238
        """Construct new server.
 
239
 
 
240
        :param in_file: Python file from which requests can be read.
 
241
        :param out_file: Python file to write responses.
 
242
        :param backing_transport: Transport for the directory served.
 
243
        """
 
244
        self._in = in_file
 
245
        self._out = out_file
 
246
        self.smart_server = SmartServer(backing_transport)
 
247
        # server can call back to us to get bulk data - this is not really
 
248
        # ideal, they should get it per request instead
 
249
        self.smart_server._recv_body = self._recv_bulk
 
250
 
 
251
    def _recv_tuple(self):
 
252
        """Read a request from the client and return as a tuple.
 
253
        
 
254
        Returns None at end of file (if the client closed the connection.)
 
255
        """
 
256
        return _recv_tuple(self._in)
 
257
 
 
258
    def _send_tuple(self, args):
 
259
        """Send response header"""
 
260
        return _send_tuple(self._out, args)
 
261
 
 
262
    def _send_error_and_disconnect(self, exception):
 
263
        self._send_tuple(('error', str(exception)))
 
264
        self._out.flush()
 
265
        ## self._out.close()
 
266
        ## self._in.close()
 
267
 
 
268
    def _serve_one_request(self):
 
269
        """Read one request from input, process, send back a response.
 
270
        
 
271
        :return: False if the server should terminate, otherwise None.
 
272
        """
 
273
        req_args = self._recv_tuple()
 
274
        if req_args == None:
 
275
            # client closed connection
 
276
            return False  # shutdown server
 
277
        try:
 
278
            response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
 
279
            self._send_tuple(response.args)
 
280
            if response.body is not None:
 
281
                self._send_bulk_data(response.body)
 
282
        except KeyboardInterrupt:
 
283
            raise
 
284
        except Exception, e:
 
285
            # everything else: pass to client, flush, and quit
 
286
            self._send_error_and_disconnect(e)
 
287
            return False
 
288
 
 
289
    def serve(self):
 
290
        """Serve requests until the client disconnects."""
 
291
        # Keep a reference to stderr because the sys module's globals get set to
 
292
        # None during interpreter shutdown.
 
293
        from sys import stderr
 
294
        try:
 
295
            while self._serve_one_request() != False:
 
296
                pass
 
297
        except Exception, e:
 
298
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
299
            raise
 
300
 
 
301
 
 
302
class SmartServerResponse(object):
 
303
    """Response generated by SmartServer."""
 
304
 
 
305
    def __init__(self, args, body=None):
 
306
        self.args = args
 
307
        self.body = body
 
308
 
 
309
 
 
310
class SmartServer(object):
 
311
    """Protocol logic for smart server.
 
312
    
 
313
    This doesn't handle serialization at all, it just processes requests and
 
314
    creates responses.
 
315
    """
 
316
 
 
317
    # TODO: Better way of representing the body for commands that take it,
 
318
    # and allow it to be streamed into the server.
 
319
    
 
320
    def __init__(self, backing_transport):
 
321
        self._backing_transport = backing_transport
 
322
        
 
323
    def do_hello(self):
 
324
        """Answer a version request with my version."""
 
325
        return SmartServerResponse(('ok', '1'))
 
326
 
 
327
    def do_has(self, relpath):
 
328
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
329
        return SmartServerResponse((r,))
 
330
 
 
331
    def do_get(self, relpath):
 
332
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
333
        return SmartServerResponse(('ok',), backing_bytes)
 
334
 
 
335
    def _deserialise_optional_mode(self, mode):
 
336
        if mode == '':
 
337
            return None
 
338
        else:
 
339
            return int(mode)
 
340
 
 
341
    def do_append(self, relpath, mode):
 
342
        old_length = self._backing_transport.append_bytes(
 
343
            relpath, self._recv_body(), self._deserialise_optional_mode(mode))
 
344
        return SmartServerResponse(('appended', '%d' % old_length))
 
345
 
 
346
    def do_delete(self, relpath):
 
347
        self._backing_transport.delete(relpath)
 
348
 
 
349
    def do_iter_files_recursive(self, abspath):
 
350
        # XXX: the path handling needs some thought.
 
351
        #relpath = self._backing_transport.relpath(abspath)
 
352
        transport = self._backing_transport.clone(abspath)
 
353
        filenames = transport.iter_files_recursive()
 
354
        return SmartServerResponse(('names',) + tuple(filenames))
 
355
 
 
356
    def do_list_dir(self, relpath):
 
357
        filenames = self._backing_transport.list_dir(relpath)
 
358
        return SmartServerResponse(('names',) + tuple(filenames))
 
359
 
 
360
    def do_mkdir(self, relpath, mode):
 
361
        self._backing_transport.mkdir(relpath,
 
362
                                      self._deserialise_optional_mode(mode))
 
363
 
 
364
    def do_move(self, rel_from, rel_to):
 
365
        self._backing_transport.move(rel_from, rel_to)
 
366
 
 
367
    def do_put(self, relpath, mode):
 
368
        self._backing_transport.put_bytes(relpath,
 
369
                self._recv_body(),
 
370
                self._deserialise_optional_mode(mode))
 
371
 
 
372
    def do_rename(self, rel_from, rel_to):
 
373
        self._backing_transport.rename(rel_from, rel_to)
 
374
 
 
375
    def do_rmdir(self, relpath):
 
376
        self._backing_transport.rmdir(relpath)
 
377
 
 
378
    def do_stat(self, relpath):
 
379
        stat = self._backing_transport.stat(relpath)
 
380
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
381
        
 
382
    def do_get_bundle(self, path, revision_id):
 
383
        # open transport relative to our base
 
384
        t = self._backing_transport.clone(path)
 
385
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
386
        repo = control.open_repository()
 
387
        tmpf = tempfile.TemporaryFile()
 
388
        base_revision = revision.NULL_REVISION
 
389
        write_bundle(repo, revision_id, base_revision, tmpf)
 
390
        tmpf.seek(0)
 
391
        return SmartServerResponse((), tmpf.read())
 
392
 
 
393
    def dispatch_command(self, cmd, args):
 
394
        func = getattr(self, 'do_' + cmd, None)
 
395
        if func is None:
 
396
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
397
        try:
 
398
            result = func(*args)
 
399
            if result is None: 
 
400
                result = SmartServerResponse(('ok',))
 
401
            return result
 
402
        except errors.NoSuchFile, e:
 
403
            return SmartServerResponse(('NoSuchFile', e.path))
 
404
        except errors.FileExists, e:
 
405
            return SmartServerResponse(('FileExists', e.path))
 
406
        except errors.DirectoryNotEmpty, e:
 
407
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
408
        except UnicodeError, e:
 
409
            # If it is a DecodeError, than most likely we are starting
 
410
            # with a plain string
 
411
            str_or_unicode = e.object
 
412
            if isinstance(str_or_unicode, unicode):
 
413
                val = u'u:' + str_or_unicode
 
414
            else:
 
415
                val = u's:' + str_or_unicode.encode('base64')
 
416
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
417
            return SmartServerResponse((e.__class__.__name__,
 
418
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
419
 
 
420
 
 
421
class SmartTCPServer(object):
 
422
    """Listens on a TCP socket and accepts connections from smart clients"""
 
423
 
 
424
    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
 
425
        """Construct a new server.
 
426
 
 
427
        To actually start it running, call either start_background_thread or
 
428
        serve.
 
429
 
 
430
        :param host: Name of the interface to listen on.
 
431
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
432
        """
 
433
        if backing_transport is None:
 
434
            backing_transport = memory.MemoryTransport()
 
435
        self._server_socket = socket.socket()
 
436
        self._server_socket.bind((host, port))
 
437
        self.port = self._server_socket.getsockname()[1]
 
438
        self._server_socket.listen(1)
 
439
        self._server_socket.settimeout(1)
 
440
        self.backing_transport = backing_transport
 
441
 
 
442
    def serve(self):
 
443
        # let connections timeout so that we get a chance to terminate
 
444
        # Keep a reference to the exceptions we want to catch because the socket
 
445
        # module's globals get set to None during interpreter shutdown.
 
446
        from socket import timeout as socket_timeout
 
447
        from socket import error as socket_error
 
448
        self._should_terminate = False
 
449
        while not self._should_terminate:
 
450
            try:
 
451
                self.accept_and_serve()
 
452
            except socket_timeout:
 
453
                # just check if we're asked to stop
 
454
                pass
 
455
            except socket_error, e:
 
456
                trace.warning("client disconnected: %s", e)
 
457
                pass
 
458
 
 
459
    def get_url(self):
 
460
        """Return the url of the server"""
 
461
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
462
 
 
463
    def accept_and_serve(self):
 
464
        conn, client_addr = self._server_socket.accept()
 
465
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
466
        from_client = conn.makefile('r')
 
467
        to_client = conn.makefile('w')
 
468
        handler = SmartStreamServer(from_client, to_client,
 
469
                self.backing_transport)
 
470
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
471
        connection_thread.setDaemon(True)
 
472
        connection_thread.start()
 
473
 
 
474
    def start_background_thread(self):
 
475
        self._server_thread = threading.Thread(None,
 
476
                self.serve,
 
477
                name='server-' + self.get_url())
 
478
        self._server_thread.setDaemon(True)
 
479
        self._server_thread.start()
 
480
 
 
481
    def stop_background_thread(self):
 
482
        self._should_terminate = True
 
483
        # self._server_socket.close()
 
484
        # we used to join the thread, but it's not really necessary; it will
 
485
        # terminate in time
 
486
        ## self._server_thread.join()
 
487
 
 
488
 
 
489
class SmartTCPServer_for_testing(SmartTCPServer):
 
490
    """Server suitable for use by transport tests.
 
491
    
 
492
    This server is backed by the process's cwd.
 
493
    """
 
494
 
 
495
    def __init__(self):
 
496
        self._homedir = os.getcwd()
 
497
        # The server is set up by default like for ssh access: the client
 
498
        # passes filesystem-absolute paths; therefore the server must look
 
499
        # them up relative to the root directory.  it might be better to act
 
500
        # a public server and have the server rewrite paths into the test
 
501
        # directory.
 
502
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
503
        
 
504
    def setUp(self):
 
505
        """Set up server for testing"""
 
506
        self.start_background_thread()
 
507
 
 
508
    def tearDown(self):
 
509
        self.stop_background_thread()
 
510
 
 
511
    def get_url(self):
 
512
        """Return the url of the server"""
 
513
        host, port = self._server_socket.getsockname()
 
514
        # XXX: I think this is likely to break on windows -- self._homedir will
 
515
        # have backslashes (and maybe a drive letter?).
 
516
        #  -- Andrew Bennetts, 2006-08-29
 
517
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
518
 
 
519
    def get_bogus_url(self):
 
520
        """Return a URL which will fail to connect"""
 
521
        return 'bzr://127.0.0.1:1/'
 
522
 
 
523
 
 
524
class SmartStat(object):
43
525
 
44
526
    def __init__(self, size, mode):
45
527
        self.st_size = size
46
528
        self.st_mode = mode
47
529
 
48
530
 
49
 
class RemoteTransport(transport.ConnectedTransport):
 
531
class SmartTransport(transport.Transport):
50
532
    """Connection to a smart server.
51
533
 
52
 
    The connection holds references to the medium that can be used to send
53
 
    requests to the server.
 
534
    The connection holds references to pipes that can be used to send requests
 
535
    to the server.
54
536
 
55
537
    The connection has a notion of the current directory to which it's
56
538
    connected; this is incorporated in filenames passed to the server.
57
 
 
58
 
    This supports some higher-level RPC operations and can also be treated
 
539
    
 
540
    This supports some higher-level RPC operations and can also be treated 
59
541
    like a Transport to do file-like operations.
60
542
 
61
 
    The connection can be made over a tcp socket, an ssh pipe or a series of
62
 
    http requests.  There are concrete subclasses for each type:
63
 
    RemoteTCPTransport, etc.
 
543
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
544
    or a series of http requests.  There are concrete subclasses for each
 
545
    type: SmartTCPTransport, etc.
64
546
    """
65
547
 
66
 
    # When making a readv request, cap it at requesting 5MB of data
67
 
    _max_readv_bytes = 5*1024*1024
68
 
 
69
 
    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
70
 
    # responsibilities: Put those on SmartClient or similar. This is vital for
71
 
    # the ability to support multiple versions of the smart protocol over time:
72
 
    # RemoteTransport is an adapter from the Transport object model to the
73
 
    # SmartClient model, not an encoder.
74
 
 
75
 
    # FIXME: the medium parameter should be private, only the tests requires
76
 
    # it. It may be even clearer to define a TestRemoteTransport that handles
77
 
    # the specific cases of providing a _client and/or a _medium, and leave
78
 
    # RemoteTransport as an abstract class.
79
 
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
 
548
    def __init__(self, url, clone_from=None, client=None):
80
549
        """Constructor.
81
550
 
82
 
        :param _from_transport: Another RemoteTransport instance that this
83
 
            one is being cloned from.  Attributes such as the medium will
84
 
            be reused.
85
 
 
86
 
        :param medium: The medium to use for this RemoteTransport.  If None,
87
 
            the medium from the _from_transport is shared.  If both this
88
 
            and _from_transport are None, a new medium will be built.
89
 
            _from_transport and medium cannot both be specified.
90
 
 
91
 
        :param _client: Override the _SmartClient used by this transport.  This
92
 
            should only be used for testing purposes; normally this is
93
 
            determined from the medium.
94
 
        """
95
 
        super(RemoteTransport, self).__init__(
96
 
            url, _from_transport=_from_transport)
97
 
 
98
 
        # The medium is the connection, except when we need to share it with
99
 
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
 
        # what we want to share is really the shared connection.
101
 
 
102
 
        if (_from_transport is not None
103
 
            and isinstance(_from_transport, RemoteTransport)):
104
 
            _client = _from_transport._client
105
 
        elif _from_transport is None:
106
 
            # If no _from_transport is specified, we need to intialize the
107
 
            # shared medium.
108
 
            credentials = None
109
 
            if medium is None:
110
 
                medium, credentials = self._build_medium()
111
 
                if 'hpss' in debug.debug_flags:
112
 
                    trace.mutter('hpss: Built a new medium: %s',
113
 
                                 medium.__class__.__name__)
114
 
            self._shared_connection = transport._SharedConnection(medium,
115
 
                                                                  credentials,
116
 
                                                                  self.base)
117
 
        elif medium is None:
118
 
            # No medium was specified, so share the medium from the
119
 
            # _from_transport.
120
 
            medium = self._shared_connection.connection
121
 
        else:
122
 
            raise AssertionError(
123
 
                "Both _from_transport (%r) and medium (%r) passed to "
124
 
                "RemoteTransport.__init__, but these parameters are mutally "
125
 
                "exclusive." % (_from_transport, medium))
126
 
 
127
 
        if _client is None:
128
 
            self._client = client._SmartClient(medium)
129
 
        else:
130
 
            self._client = _client
131
 
 
132
 
    def _build_medium(self):
133
 
        """Create the medium if _from_transport does not provide one.
134
 
 
135
 
        The medium is analogous to the connection for ConnectedTransport: it
136
 
        allows connection sharing.
137
 
        """
138
 
        # No credentials
139
 
        return None, None
140
 
 
141
 
    def _report_activity(self, bytes, direction):
142
 
        """See Transport._report_activity.
143
 
 
144
 
        Does nothing; the smart medium will report activity triggered by a
145
 
        RemoteTransport.
146
 
        """
147
 
        pass
 
551
        :param client: ignored when clone_from is not None.
 
552
        """
 
553
        ### Technically super() here is faulty because Transport's __init__
 
554
        ### fails to take 2 parameters, and if super were to choose a silly
 
555
        ### initialisation order things would blow up. 
 
556
        if not url.endswith('/'):
 
557
            url += '/'
 
558
        super(SmartTransport, self).__init__(url)
 
559
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
560
                transport.split_url(url)
 
561
        if clone_from is None:
 
562
            if client is None:
 
563
                self._client = SmartStreamClient(self._connect_to_server)
 
564
            else:
 
565
                self._client = client
 
566
        else:
 
567
            # credentials may be stripped from the base in some circumstances
 
568
            # as yet to be clearly defined or documented, so copy them.
 
569
            self._username = clone_from._username
 
570
            # reuse same connection
 
571
            self._client = clone_from._client
 
572
 
 
573
    def abspath(self, relpath):
 
574
        """Return the full url to the given relative path.
 
575
        
 
576
        @param relpath: the relative path or path components
 
577
        @type relpath: str or list
 
578
        """
 
579
        return self._unparse_url(self._remote_path(relpath))
 
580
    
 
581
    def clone(self, relative_url):
 
582
        """Make a new SmartTransport related to me, sharing the same connection.
 
583
 
 
584
        This essentially opens a handle on a different remote directory.
 
585
        """
 
586
        if relative_url is None:
 
587
            return self.__class__(self.base, self)
 
588
        else:
 
589
            return self.__class__(self.abspath(relative_url), self)
148
590
 
149
591
    def is_readonly(self):
150
592
        """Smart server transport can do read/write file operations."""
151
 
        try:
152
 
            resp = self._call2('Transport.is_readonly')
153
 
        except errors.UnknownSmartMethod:
154
 
            # XXX: nasty hack: servers before 0.16 don't have a
155
 
            # 'Transport.is_readonly' verb, so we do what clients before 0.16
156
 
            # did: assume False.
157
 
            return False
158
 
        if resp == ('yes', ):
159
 
            return True
160
 
        elif resp == ('no', ):
161
 
            return False
162
 
        else:
163
 
            raise errors.UnexpectedSmartServerResponse(resp)
164
 
 
 
593
        return False
 
594
                                                   
165
595
    def get_smart_client(self):
166
 
        return self._get_connection()
 
596
        return self._client
 
597
                                                   
 
598
    def _unparse_url(self, path):
 
599
        """Return URL for a path.
167
600
 
168
 
    def get_smart_medium(self):
169
 
        return self._get_connection()
 
601
        :see: SFTPUrlHandling._unparse_url
 
602
        """
 
603
        # TODO: Eventually it should be possible to unify this with
 
604
        # SFTPUrlHandling._unparse_url?
 
605
        if path == '':
 
606
            path = '/'
 
607
        path = urllib.quote(path)
 
608
        netloc = urllib.quote(self._host)
 
609
        if self._username is not None:
 
610
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
611
        if self._port is not None:
 
612
            netloc = '%s:%d' % (netloc, self._port)
 
613
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
170
614
 
171
615
    def _remote_path(self, relpath):
172
616
        """Returns the Unicode version of the absolute path for relpath."""
173
617
        return self._combine_paths(self._path, relpath)
174
618
 
175
 
    def _call(self, method, *args):
176
 
        resp = self._call2(method, *args)
177
 
        self._ensure_ok(resp)
178
 
 
179
 
    def _call2(self, method, *args):
180
 
        """Call a method on the remote server."""
181
 
        try:
182
 
            return self._client.call(method, *args)
183
 
        except errors.ErrorFromSmartServer, err:
184
 
            # The first argument, if present, is always a path.
185
 
            if args:
186
 
                context = {'relpath': args[0]}
187
 
            else:
188
 
                context = {}
189
 
            self._translate_error(err, **context)
190
 
 
191
 
    def _call_with_body_bytes(self, method, args, body):
192
 
        """Call a method on the remote server with body bytes."""
193
 
        try:
194
 
            return self._client.call_with_body_bytes(method, args, body)
195
 
        except errors.ErrorFromSmartServer, err:
196
 
            # The first argument, if present, is always a path.
197
 
            if args:
198
 
                context = {'relpath': args[0]}
199
 
            else:
200
 
                context = {}
201
 
            self._translate_error(err, **context)
202
 
 
203
619
    def has(self, relpath):
204
620
        """Indicate whether a remote file of the given name exists or not.
205
621
 
206
622
        :see: Transport.has()
207
623
        """
208
 
        resp = self._call2('has', self._remote_path(relpath))
 
624
        resp = self._client._call('has', self._remote_path(relpath))
209
625
        if resp == ('yes', ):
210
626
            return True
211
627
        elif resp == ('no', ):
212
628
            return False
213
629
        else:
214
 
            raise errors.UnexpectedSmartServerResponse(resp)
 
630
            self._translate_error(resp)
215
631
 
216
632
    def get(self, relpath):
217
633
        """Return file-like object reading the contents of a remote file.
218
 
 
 
634
        
219
635
        :see: Transport.get_bytes()/get_file()
220
636
        """
221
 
        return StringIO(self.get_bytes(relpath))
222
 
 
223
 
    def get_bytes(self, relpath):
224
637
        remote = self._remote_path(relpath)
225
 
        try:
226
 
            resp, response_handler = self._client.call_expecting_body('get', remote)
227
 
        except errors.ErrorFromSmartServer, err:
228
 
            self._translate_error(err, relpath)
 
638
        resp = self._client._call('get', remote)
229
639
        if resp != ('ok', ):
230
 
            response_handler.cancel_read_body()
231
 
            raise errors.UnexpectedSmartServerResponse(resp)
232
 
        return response_handler.read_body_bytes()
 
640
            self._translate_error(resp, relpath)
 
641
        return StringIO(self._client._recv_bulk())
233
642
 
234
643
    def _serialise_optional_mode(self, mode):
235
644
        if mode is None:
238
647
            return '%d' % mode
239
648
 
240
649
    def mkdir(self, relpath, mode=None):
241
 
        resp = self._call2('mkdir', self._remote_path(relpath),
242
 
            self._serialise_optional_mode(mode))
243
 
 
244
 
    def open_write_stream(self, relpath, mode=None):
245
 
        """See Transport.open_write_stream."""
246
 
        self.put_bytes(relpath, "", mode)
247
 
        result = transport.AppendBasedFileStream(self, relpath)
248
 
        transport._file_streams[self.abspath(relpath)] = result
249
 
        return result
250
 
 
251
 
    def put_bytes(self, relpath, upload_contents, mode=None):
252
 
        # FIXME: upload_file is probably not safe for non-ascii characters -
253
 
        # should probably just pass all parameters as length-delimited
254
 
        # strings?
255
 
        if type(upload_contents) is unicode:
256
 
            # Although not strictly correct, we raise UnicodeEncodeError to be
257
 
            # compatible with other transports.
258
 
            raise UnicodeEncodeError(
259
 
                'undefined', upload_contents, 0, 1,
260
 
                'put_bytes must be given bytes, not unicode.')
261
 
        resp = self._call_with_body_bytes('put',
262
 
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
263
 
            upload_contents)
264
 
        self._ensure_ok(resp)
265
 
        return len(upload_contents)
266
 
 
267
 
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
268
 
                             create_parent_dir=False,
269
 
                             dir_mode=None):
270
 
        """See Transport.put_bytes_non_atomic."""
271
 
        # FIXME: no encoding in the transport!
272
 
        create_parent_str = 'F'
273
 
        if create_parent_dir:
274
 
            create_parent_str = 'T'
275
 
 
276
 
        resp = self._call_with_body_bytes(
277
 
            'put_non_atomic',
278
 
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
279
 
             create_parent_str, self._serialise_optional_mode(dir_mode)),
280
 
            bytes)
281
 
        self._ensure_ok(resp)
 
650
        resp = self._client._call('mkdir', 
 
651
                                  self._remote_path(relpath), 
 
652
                                  self._serialise_optional_mode(mode))
 
653
        self._translate_error(resp)
282
654
 
283
655
    def put_file(self, relpath, upload_file, mode=None):
284
656
        # its not ideal to seek back, but currently put_non_atomic_file depends
291
663
            upload_file.seek(pos)
292
664
            raise
293
665
 
294
 
    def put_file_non_atomic(self, relpath, f, mode=None,
295
 
                            create_parent_dir=False,
296
 
                            dir_mode=None):
297
 
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
298
 
                                         create_parent_dir=create_parent_dir,
299
 
                                         dir_mode=dir_mode)
 
666
    def put_bytes(self, relpath, upload_contents, mode=None):
 
667
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
668
        # should probably just pass all parameters as length-delimited
 
669
        # strings?
 
670
        resp = self._client._call_with_upload(
 
671
            'put',
 
672
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
673
            upload_contents)
 
674
        self._translate_error(resp)
300
675
 
301
676
    def append_file(self, relpath, from_file, mode=None):
302
677
        return self.append_bytes(relpath, from_file.read(), mode)
303
 
 
 
678
        
304
679
    def append_bytes(self, relpath, bytes, mode=None):
305
 
        resp = self._call_with_body_bytes(
 
680
        resp = self._client._call_with_upload(
306
681
            'append',
307
682
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
308
683
            bytes)
309
684
        if resp[0] == 'appended':
310
685
            return int(resp[1])
311
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
686
        self._translate_error(resp)
312
687
 
313
688
    def delete(self, relpath):
314
 
        resp = self._call2('delete', self._remote_path(relpath))
315
 
        self._ensure_ok(resp)
316
 
 
317
 
    def external_url(self):
318
 
        """See bzrlib.transport.Transport.external_url."""
319
 
        # the external path for RemoteTransports is the base
320
 
        return self.base
321
 
 
322
 
    def recommended_page_size(self):
323
 
        """Return the recommended page size for this transport."""
324
 
        return 64 * 1024
325
 
 
326
 
    def _readv(self, relpath, offsets):
327
 
        if not offsets:
328
 
            return
329
 
 
330
 
        offsets = list(offsets)
331
 
 
332
 
        sorted_offsets = sorted(offsets)
333
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
334
 
                               limit=self._max_readv_combine,
335
 
                               fudge_factor=self._bytes_to_read_before_seek,
336
 
                               max_size=self._max_readv_bytes))
337
 
 
338
 
        # now that we've coallesced things, avoid making enormous requests
339
 
        requests = []
340
 
        cur_request = []
341
 
        cur_len = 0
342
 
        for c in coalesced:
343
 
            if c.length + cur_len > self._max_readv_bytes:
344
 
                requests.append(cur_request)
345
 
                cur_request = [c]
346
 
                cur_len = c.length
347
 
                continue
348
 
            cur_request.append(c)
349
 
            cur_len += c.length
350
 
        if cur_request:
351
 
            requests.append(cur_request)
352
 
        if 'hpss' in debug.debug_flags:
353
 
            trace.mutter('%s.readv %s offsets => %s coalesced'
354
 
                         ' => %s requests (%s)',
355
 
                         self.__class__.__name__, len(offsets), len(coalesced),
356
 
                         len(requests), sum(map(len, requests)))
357
 
        # Cache the results, but only until they have been fulfilled
358
 
        data_map = {}
359
 
        # turn the list of offsets into a single stack to iterate
360
 
        offset_stack = iter(offsets)
361
 
        # using a list so it can be modified when passing down and coming back
362
 
        next_offset = [offset_stack.next()]
363
 
        for cur_request in requests:
364
 
            try:
365
 
                result = self._client.call_with_body_readv_array(
366
 
                    ('readv', self._remote_path(relpath),),
367
 
                    [(c.start, c.length) for c in cur_request])
368
 
                resp, response_handler = result
369
 
            except errors.ErrorFromSmartServer, err:
370
 
                self._translate_error(err, relpath)
371
 
 
372
 
            if resp[0] != 'readv':
373
 
                # This should raise an exception
374
 
                response_handler.cancel_read_body()
375
 
                raise errors.UnexpectedSmartServerResponse(resp)
376
 
 
377
 
            for res in self._handle_response(offset_stack, cur_request,
378
 
                                             response_handler,
379
 
                                             data_map,
380
 
                                             next_offset):
381
 
                yield res
382
 
 
383
 
    def _handle_response(self, offset_stack, coalesced, response_handler,
384
 
                         data_map, next_offset):
385
 
        cur_offset_and_size = next_offset[0]
386
 
        # FIXME: this should know how many bytes are needed, for clarity.
387
 
        data = response_handler.read_body_bytes()
388
 
        data_offset = 0
389
 
        for c_offset in coalesced:
390
 
            if len(data) < c_offset.length:
391
 
                raise errors.ShortReadvError(relpath, c_offset.start,
392
 
                            c_offset.length, actual=len(data))
393
 
            for suboffset, subsize in c_offset.ranges:
394
 
                key = (c_offset.start+suboffset, subsize)
395
 
                this_data = data[data_offset+suboffset:
396
 
                                 data_offset+suboffset+subsize]
397
 
                # Special case when the data is in-order, rather than packing
398
 
                # into a map and then back out again. Benchmarking shows that
399
 
                # this has 100% hit rate, but leave in the data_map work just
400
 
                # in case.
401
 
                # TODO: Could we get away with using buffer() to avoid the
402
 
                #       memory copy?  Callers would need to realize they may
403
 
                #       not have a real string.
404
 
                if key == cur_offset_and_size:
405
 
                    yield cur_offset_and_size[0], this_data
406
 
                    cur_offset_and_size = next_offset[0] = offset_stack.next()
407
 
                else:
408
 
                    data_map[key] = this_data
409
 
            data_offset += c_offset.length
410
 
 
411
 
            # Now that we've read some data, see if we can yield anything back
412
 
            while cur_offset_and_size in data_map:
413
 
                this_data = data_map.pop(cur_offset_and_size)
414
 
                yield cur_offset_and_size[0], this_data
415
 
                cur_offset_and_size = next_offset[0] = offset_stack.next()
 
689
        resp = self._client._call('delete', self._remote_path(relpath))
 
690
        self._translate_error(resp)
416
691
 
417
692
    def rename(self, rel_from, rel_to):
418
 
        self._call('rename',
 
693
        self._call('rename', 
419
694
                   self._remote_path(rel_from),
420
695
                   self._remote_path(rel_to))
421
696
 
422
697
    def move(self, rel_from, rel_to):
423
 
        self._call('move',
 
698
        self._call('move', 
424
699
                   self._remote_path(rel_from),
425
700
                   self._remote_path(rel_to))
426
701
 
427
702
    def rmdir(self, relpath):
428
703
        resp = self._call('rmdir', self._remote_path(relpath))
429
704
 
430
 
    def _ensure_ok(self, resp):
431
 
        if resp[0] != 'ok':
432
 
            raise errors.UnexpectedSmartServerResponse(resp)
433
 
 
434
 
    def _translate_error(self, err, relpath=None):
435
 
        remote._translate_error(err, path=relpath)
 
705
    def _call(self, method, *args):
 
706
        resp = self._client._call(method, *args)
 
707
        self._translate_error(resp)
 
708
 
 
709
    def _translate_error(self, resp, orig_path=None):
 
710
        """Raise an exception from a response"""
 
711
        what = resp[0]
 
712
        if what == 'ok':
 
713
            return
 
714
        elif what == 'NoSuchFile':
 
715
            if orig_path is not None:
 
716
                error_path = orig_path
 
717
            else:
 
718
                error_path = resp[1]
 
719
            raise errors.NoSuchFile(error_path)
 
720
        elif what == 'error':
 
721
            raise errors.SmartProtocolError(unicode(resp[1]))
 
722
        elif what == 'FileExists':
 
723
            raise errors.FileExists(resp[1])
 
724
        elif what == 'DirectoryNotEmpty':
 
725
            raise errors.DirectoryNotEmpty(resp[1])
 
726
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
727
            encoding = str(resp[1]) # encoding must always be a string
 
728
            val = resp[2]
 
729
            start = int(resp[3])
 
730
            end = int(resp[4])
 
731
            reason = str(resp[5]) # reason must always be a string
 
732
            if val.startswith('u:'):
 
733
                val = val[2:]
 
734
            elif val.startswith('s:'):
 
735
                val = val[2:].decode('base64')
 
736
            if what == 'UnicodeDecodeError':
 
737
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
738
            elif what == 'UnicodeEncodeError':
 
739
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
740
        else:
 
741
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
742
 
 
743
    def _send_tuple(self, args):
 
744
        self._client._send_tuple(args)
 
745
 
 
746
    def _recv_tuple(self):
 
747
        return self._client._recv_tuple()
436
748
 
437
749
    def disconnect(self):
438
 
        m = self.get_smart_medium()
439
 
        if m is not None:
440
 
            m.disconnect()
 
750
        self._client.disconnect()
 
751
 
 
752
    def delete_tree(self, relpath):
 
753
        raise errors.TransportNotPossible('readonly transport')
441
754
 
442
755
    def stat(self, relpath):
443
 
        resp = self._call2('stat', self._remote_path(relpath))
 
756
        resp = self._client._call('stat', self._remote_path(relpath))
444
757
        if resp[0] == 'stat':
445
 
            return _SmartStat(int(resp[1]), int(resp[2], 8))
446
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
758
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
759
        else:
 
760
            self._translate_error(resp)
447
761
 
448
762
    ## def lock_read(self, relpath):
449
763
    ##     """Lock the given file for shared (read) access.
462
776
        return True
463
777
 
464
778
    def list_dir(self, relpath):
465
 
        resp = self._call2('list_dir', self._remote_path(relpath))
 
779
        resp = self._client._call('list_dir',
 
780
                                  self._remote_path(relpath))
466
781
        if resp[0] == 'names':
467
782
            return [name.encode('ascii') for name in resp[1:]]
468
 
        raise errors.UnexpectedSmartServerResponse(resp)
 
783
        else:
 
784
            self._translate_error(resp)
469
785
 
470
786
    def iter_files_recursive(self):
471
 
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
787
        resp = self._client._call('iter_files_recursive',
 
788
                                  self._remote_path(''))
472
789
        if resp[0] == 'names':
473
790
            return resp[1:]
474
 
        raise errors.UnexpectedSmartServerResponse(resp)
475
 
 
476
 
 
477
 
class RemoteTCPTransport(RemoteTransport):
478
 
    """Connection to smart server over plain tcp.
479
 
 
480
 
    This is essentially just a factory to get 'RemoteTransport(url,
481
 
        SmartTCPClientMedium).
482
 
    """
483
 
 
484
 
    def _build_medium(self):
485
 
        client_medium = medium.SmartTCPClientMedium(
486
 
            self._host, self._port, self.base)
487
 
        return client_medium, None
488
 
 
489
 
 
490
 
class RemoteTCPTransportV2Only(RemoteTransport):
491
 
    """Connection to smart server over plain tcp with the client hard-coded to
492
 
    assume protocol v2 and remote server version <= 1.6.
493
 
 
494
 
    This should only be used for testing.
495
 
    """
496
 
 
497
 
    def _build_medium(self):
498
 
        client_medium = medium.SmartTCPClientMedium(
499
 
            self._host, self._port, self.base)
500
 
        client_medium._protocol_version = 2
501
 
        client_medium._remember_remote_is_before((1, 6))
502
 
        return client_medium, None
503
 
 
504
 
 
505
 
class RemoteSSHTransport(RemoteTransport):
506
 
    """Connection to smart server over SSH.
507
 
 
508
 
    This is essentially just a factory to get 'RemoteTransport(url,
509
 
        SmartSSHClientMedium).
510
 
    """
511
 
 
512
 
    def _build_medium(self):
513
 
        location_config = config.LocationConfig(self.base)
514
 
        bzr_remote_path = location_config.get_bzr_remote_path()
515
 
        user = self._user
516
 
        if user is None:
517
 
            auth = config.AuthenticationConfig()
518
 
            user = auth.get_user('ssh', self._host, self._port)
519
 
        ssh_params = medium.SSHParams(self._host, self._port, user,
520
 
            self._password, bzr_remote_path)
521
 
        client_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
522
 
        return client_medium, (user, self._password)
523
 
 
524
 
 
525
 
class RemoteHTTPTransport(RemoteTransport):
526
 
    """Just a way to connect between a bzr+http:// url and http://.
527
 
 
528
 
    This connection operates slightly differently than the RemoteSSHTransport.
529
 
    It uses a plain http:// transport underneath, which defines what remote
530
 
    .bzr/smart URL we are connected to. From there, all paths that are sent are
531
 
    sent as relative paths, this way, the remote side can properly
532
 
    de-reference them, since it is likely doing rewrite rules to translate an
533
 
    HTTP path into a local path.
534
 
    """
535
 
 
536
 
    def __init__(self, base, _from_transport=None, http_transport=None):
537
 
        if http_transport is None:
538
 
            # FIXME: the password may be lost here because it appears in the
539
 
            # url only for an intial construction (when the url came from the
540
 
            # command-line).
541
 
            http_url = base[len('bzr+'):]
542
 
            self._http_transport = transport.get_transport(http_url)
543
791
        else:
544
 
            self._http_transport = http_transport
545
 
        super(RemoteHTTPTransport, self).__init__(
546
 
            base, _from_transport=_from_transport)
547
 
 
548
 
    def _build_medium(self):
549
 
        # We let http_transport take care of the credentials
550
 
        return self._http_transport.get_smart_medium(), None
551
 
 
552
 
    def _remote_path(self, relpath):
553
 
        """After connecting, HTTP Transport only deals in relative URLs."""
554
 
        # Adjust the relpath based on which URL this smart transport is
555
 
        # connected to.
556
 
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
557
 
        url = urlutils.join(self.base[len('bzr+'):], relpath)
558
 
        url = urlutils.normalize_url(url)
559
 
        return urlutils.relative_url(http_base, url)
560
 
 
561
 
    def clone(self, relative_url):
562
 
        """Make a new RemoteHTTPTransport related to me.
563
 
 
564
 
        This is re-implemented rather than using the default
565
 
        RemoteTransport.clone() because we must be careful about the underlying
566
 
        http transport.
567
 
 
568
 
        Also, the cloned smart transport will POST to the same .bzr/smart
569
 
        location as this transport (although obviously the relative paths in the
570
 
        smart requests may be different).  This is so that the server doesn't
571
 
        have to handle .bzr/smart requests at arbitrary places inside .bzr
572
 
        directories, just at the initial URL the user uses.
 
792
            self._translate_error(resp)
 
793
 
 
794
 
 
795
class SmartStreamClient(SmartProtocolBase):
 
796
    """Connection to smart server over two streams"""
 
797
 
 
798
    def __init__(self, connect_func):
 
799
        self._connect_func = connect_func
 
800
        self._connected = False
 
801
 
 
802
    def __del__(self):
 
803
        self.disconnect()
 
804
 
 
805
    def _ensure_connection(self):
 
806
        if not self._connected:
 
807
            self._in, self._out = self._connect_func()
 
808
            self._connected = True
 
809
 
 
810
    def _send_tuple(self, args):
 
811
        self._ensure_connection()
 
812
        _send_tuple(self._out, args)
 
813
 
 
814
    def _send_bulk_data(self, body):
 
815
        self._ensure_connection()
 
816
        SmartProtocolBase._send_bulk_data(self, body)
 
817
        
 
818
    def _recv_bulk(self):
 
819
        self._ensure_connection()
 
820
        return SmartProtocolBase._recv_bulk(self)
 
821
 
 
822
    def _recv_tuple(self):
 
823
        self._ensure_connection()
 
824
        return SmartProtocolBase._recv_tuple(self)
 
825
 
 
826
    def _recv_trailer(self):
 
827
        self._ensure_connection()
 
828
        return SmartProtocolBase._recv_trailer(self)
 
829
 
 
830
    def disconnect(self):
 
831
        """Close connection to the server"""
 
832
        if self._connected:
 
833
            self._out.close()
 
834
            self._in.close()
 
835
 
 
836
    def _call(self, *args):
 
837
        self._send_tuple(args)
 
838
        return self._recv_tuple()
 
839
 
 
840
    def _call_with_upload(self, method, args, body):
 
841
        """Call an rpc, supplying bulk upload data.
 
842
 
 
843
        :param method: method name to call
 
844
        :param args: parameter args tuple
 
845
        :param body: upload body as a byte string
573
846
        """
574
 
        if relative_url:
575
 
            abs_url = self.abspath(relative_url)
576
 
        else:
577
 
            abs_url = self.base
578
 
        return RemoteHTTPTransport(abs_url,
579
 
                                   _from_transport=self,
580
 
                                   http_transport=self._http_transport)
581
 
 
582
 
    def _redirected_to(self, source, target):
583
 
        """See transport._redirected_to"""
584
 
        redirected = self._http_transport._redirected_to(source, target)
585
 
        if (redirected is not None
586
 
            and isinstance(redirected, type(self._http_transport))):
587
 
            return RemoteHTTPTransport('bzr+' + redirected.external_url(),
588
 
                                       http_transport=redirected)
589
 
        else:
590
 
            # Either None or a transport for a different protocol
591
 
            return redirected
592
 
 
593
 
 
594
 
class HintingSSHTransport(transport.Transport):
595
 
    """Simple transport that handles ssh:// and points out bzr+ssh://."""
596
 
 
597
 
    def __init__(self, url):
598
 
        raise errors.UnsupportedProtocol(url,
599
 
            'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
 
847
        self._send_tuple((method,) + args)
 
848
        self._send_bulk_data(body)
 
849
        return self._recv_tuple()
 
850
 
 
851
    def query_version(self):
 
852
        """Return protocol version number of the server."""
 
853
        # XXX: should make sure it's empty
 
854
        self._send_tuple(('hello',))
 
855
        resp = self._recv_tuple()
 
856
        if resp == ('ok', '1'):
 
857
            return 1
 
858
        else:
 
859
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
860
 
 
861
 
 
862
class SmartTCPTransport(SmartTransport):
 
863
    """Connection to smart server over plain tcp"""
 
864
 
 
865
    def __init__(self, url, clone_from=None):
 
866
        super(SmartTCPTransport, self).__init__(url, clone_from)
 
867
        try:
 
868
            self._port = int(self._port)
 
869
        except (ValueError, TypeError), e:
 
870
            raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
 
871
        self._socket = None
 
872
 
 
873
    def _connect_to_server(self):
 
874
        self._socket = socket.socket()
 
875
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
876
        result = self._socket.connect_ex((self._host, int(self._port)))
 
877
        if result:
 
878
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
879
                    (self._host, self._port, os.strerror(result)))
 
880
        # TODO: May be more efficient to just treat them as sockets
 
881
        # throughout?  But what about pipes to ssh?...
 
882
        to_server = self._socket.makefile('w')
 
883
        from_server = self._socket.makefile('r')
 
884
        return from_server, to_server
 
885
 
 
886
    def disconnect(self):
 
887
        super(SmartTCPTransport, self).disconnect()
 
888
        # XXX: Is closing the socket as well as closing the files really
 
889
        # necessary?
 
890
        if self._socket is not None:
 
891
            self._socket.close()
 
892
 
 
893
try:
 
894
    from bzrlib.transport import sftp
 
895
except errors.ParamikoNotPresent:
 
896
    # no paramiko, no SSHTransport.
 
897
    pass
 
898
else:
 
899
    class SmartSSHTransport(SmartTransport):
 
900
        """Connection to smart server over SSH."""
 
901
 
 
902
        def __init__(self, url, clone_from=None):
 
903
            # TODO: all this probably belongs in the parent class.
 
904
            super(SmartSSHTransport, self).__init__(url, clone_from)
 
905
            try:
 
906
                if self._port is not None:
 
907
                    self._port = int(self._port)
 
908
            except (ValueError, TypeError), e:
 
909
                raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
 
910
 
 
911
        def _connect_to_server(self):
 
912
            # XXX: don't hardcode vendor
 
913
            # XXX: cannot pass password to SSHSubprocess yet
 
914
            if self._password is not None:
 
915
                raise errors.InvalidURL("SSH smart transport doesn't handle passwords")
 
916
            self._ssh_connection = sftp.SSHSubprocess(self._host, 'openssh',
 
917
                    port=self._port, user=self._username,
 
918
                    command=['bzr', 'serve', '--inet'])
 
919
            return self._ssh_connection.get_filelike_channels()
 
920
 
 
921
        def disconnect(self):
 
922
            super(SmartSSHTransport, self).disconnect()
 
923
            self._ssh_connection.close()
600
924
 
601
925
 
602
926
def get_test_permutations():
603
 
    """Return (transport, server) permutations for testing."""
604
 
    ### We may need a little more test framework support to construct an
605
 
    ### appropriate RemoteTransport in the future.
606
 
    from bzrlib.tests import test_server
607
 
    return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]
 
927
    """Return (transport, server) permutations for testing"""
 
928
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]