~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

Delete some obsolete code and comments.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK+ TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the
 
51
idea that you have multiple requests and get have a read error because the
 
52
other side did shutdown sd send.  For pipes we have read pipe which will have a
 
53
zero read which marks end-of-file.  For HTTP server environment there is not
 
54
end-of-stream because each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out reqeusts from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialisation, deserialisation)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialisation, deserialisation)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
# TODO: jam 20060915 _decode_tuple is acting directly on input over
 
197
#       the socket, and it assumes everything is UTF8 sections separated
 
198
#       by \001. Which means a request like '\002' Will abort the connection
 
199
#       because of a UnicodeDecodeError. It does look like invalid data will
 
200
#       kill the SmartServerStreamMedium, but only with an abort + exception, and 
 
201
#       the overall server shouldn't die.
 
202
 
 
203
from cStringIO import StringIO
 
204
import os
 
205
import select
 
206
import socket
 
207
import tempfile
 
208
import threading
 
209
import urllib
 
210
import urlparse
 
211
 
 
212
from bzrlib import (
 
213
    bzrdir,
 
214
    errors,
 
215
    revision,
 
216
    transport,
 
217
    trace,
 
218
    urlutils,
 
219
    )
 
220
from bzrlib.bundle.serializer import write_bundle
 
221
 
 
222
# must do this otherwise urllib can't parse the urls properly :(
 
223
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
224
    transport.register_urlparse_netloc_protocol(scheme)
 
225
del scheme
 
226
 
 
227
 
 
228
def _recv_tuple(from_file):
 
229
    req_line = from_file.readline()
 
230
    return _decode_tuple(req_line)
 
231
 
 
232
 
 
233
def _decode_tuple(req_line):
 
234
    if req_line == None or req_line == '':
 
235
        return None
 
236
    if req_line[-1] != '\n':
 
237
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
238
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
239
 
 
240
 
 
241
def _encode_tuple(args):
 
242
    """Encode the tuple args to a bytestream."""
 
243
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
244
 
 
245
 
 
246
class SmartProtocolBase(object):
 
247
    """Methods common to client and server"""
 
248
 
 
249
    # TODO: this only actually accomodates a single block; possibly should
 
250
    # support multiple chunks?
 
251
    def _encode_bulk_data(self, body):
 
252
        """Encode body as a bulk data chunk."""
 
253
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
254
 
 
255
    def _serialise_offsets(self, offsets):
 
256
        """Serialise a readv offset list."""
 
257
        txt = []
 
258
        for start, length in offsets:
 
259
            txt.append('%d,%d' % (start, length))
 
260
        return '\n'.join(txt)
 
261
        
 
262
 
 
263
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
264
    """Server-side encoding and decoding logic for smart version 1."""
 
265
    
 
266
    def __init__(self, backing_transport, write_func):
 
267
        self._backing_transport = backing_transport
 
268
        self.excess_buffer = ''
 
269
        self._finished_reading = False
 
270
        self.in_buffer = ''
 
271
        self.has_dispatched = False
 
272
        self.request = None
 
273
        self._body_decoder = None
 
274
        self._write_func = write_func
 
275
 
 
276
    def accept_bytes(self, bytes):
 
277
        """Take bytes, and advance the internal state machine appropriately.
 
278
        
 
279
        :param bytes: must be a byte string
 
280
        """
 
281
        assert isinstance(bytes, str)
 
282
        self.in_buffer += bytes
 
283
        if not self.has_dispatched:
 
284
            if '\n' not in self.in_buffer:
 
285
                # no command line yet
 
286
                return
 
287
            self.has_dispatched = True
 
288
            try:
 
289
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
290
                first_line += '\n'
 
291
                req_args = _decode_tuple(first_line)
 
292
                self.request = SmartServerRequestHandler(
 
293
                    self._backing_transport)
 
294
                self.request.dispatch_command(req_args[0], req_args[1:])
 
295
                if self.request.finished_reading:
 
296
                    # trivial request
 
297
                    self.excess_buffer = self.in_buffer
 
298
                    self.in_buffer = ''
 
299
                    self._send_response(self.request.response.args,
 
300
                        self.request.response.body)
 
301
                self.sync_with_request(self.request)
 
302
            except KeyboardInterrupt:
 
303
                raise
 
304
            except Exception, exception:
 
305
                # everything else: pass to client, flush, and quit
 
306
                self._send_response(('error', str(exception)))
 
307
                return None
 
308
 
 
309
        if self.has_dispatched:
 
310
            if self._finished_reading:
 
311
                # nothing to do.XXX: this routine should be a single state 
 
312
                # machine too.
 
313
                self.excess_buffer += self.in_buffer
 
314
                self.in_buffer = ''
 
315
                return
 
316
            if self._body_decoder is None:
 
317
                self._body_decoder = LengthPrefixedBodyDecoder()
 
318
            self._body_decoder.accept_bytes(self.in_buffer)
 
319
            self.in_buffer = self._body_decoder.unused_data
 
320
            body_data = self._body_decoder.read_pending_data()
 
321
            self.request.accept_body(body_data)
 
322
            if self._body_decoder.finished_reading:
 
323
                self.request.end_of_body()
 
324
                assert self.request.finished_reading, \
 
325
                    "no more body, request not finished"
 
326
            self.sync_with_request(self.request)
 
327
            if self.request.response is not None:
 
328
                self._send_response(self.request.response.args,
 
329
                    self.request.response.body)
 
330
                self.excess_buffer = self.in_buffer
 
331
                self.in_buffer = ''
 
332
            else:
 
333
                assert not self.request.finished_reading, \
 
334
                    "no response and we have finished reading."
 
335
 
 
336
    def _send_response(self, args, body=None):
 
337
        """Send a smart server response down the output stream."""
 
338
        self._write_func(_encode_tuple(args))
 
339
        if body is not None:
 
340
            assert isinstance(body, str), 'body must be a str'
 
341
            bytes = self._encode_bulk_data(body)
 
342
            self._write_func(bytes)
 
343
 
 
344
    def sync_with_request(self, request):
 
345
        self._finished_reading = request.finished_reading
 
346
        
 
347
    def next_read_size(self):
 
348
        if self._finished_reading:
 
349
            return 0
 
350
        if self._body_decoder is None:
 
351
            return 1
 
352
        else:
 
353
            return self._body_decoder.next_read_size()
 
354
 
 
355
 
 
356
class LengthPrefixedBodyDecoder(object):
 
357
    """Decodes the length-prefixed bulk data."""
 
358
    
 
359
    def __init__(self):
 
360
        self.bytes_left = None
 
361
        self.finished_reading = False
 
362
        self.unused_data = ''
 
363
        self.state_accept = self._state_accept_expecting_length
 
364
        self.state_read = self._state_read_no_data
 
365
        self._in_buffer = ''
 
366
        self._trailer_buffer = ''
 
367
    
 
368
    def accept_bytes(self, bytes):
 
369
        """Decode as much of bytes as possible.
 
370
 
 
371
        If 'bytes' contains too much data it will be appended to
 
372
        self.unused_data.
 
373
 
 
374
        finished_reading will be set when no more data is required.  Further
 
375
        data will be appended to self.unused_data.
 
376
        """
 
377
        # accept_bytes is allowed to change the state
 
378
        current_state = self.state_accept
 
379
        self.state_accept(bytes)
 
380
        while current_state != self.state_accept:
 
381
            current_state = self.state_accept
 
382
            self.state_accept('')
 
383
 
 
384
    def next_read_size(self):
 
385
        if self.bytes_left is not None:
 
386
            # Ideally we want to read all the remainder of the body and the
 
387
            # trailer in one go.
 
388
            return self.bytes_left + 5
 
389
        elif self.state_accept == self._state_accept_reading_trailer:
 
390
            # Just the trailer left
 
391
            return 5 - len(self._trailer_buffer)
 
392
        elif self.state_accept == self._state_accept_expecting_length:
 
393
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
394
            # 'done\n').
 
395
            return 6
 
396
        else:
 
397
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
398
            return 1
 
399
        
 
400
    def read_pending_data(self):
 
401
        """Return any pending data that has been decoded."""
 
402
        return self.state_read()
 
403
 
 
404
    def _state_accept_expecting_length(self, bytes):
 
405
        self._in_buffer += bytes
 
406
        pos = self._in_buffer.find('\n')
 
407
        if pos == -1:
 
408
            return
 
409
        self.bytes_left = int(self._in_buffer[:pos])
 
410
        self._in_buffer = self._in_buffer[pos+1:]
 
411
        self.bytes_left -= len(self._in_buffer)
 
412
        self.state_accept = self._state_accept_reading_body
 
413
        self.state_read = self._state_read_in_buffer
 
414
 
 
415
    def _state_accept_reading_body(self, bytes):
 
416
        self._in_buffer += bytes
 
417
        self.bytes_left -= len(bytes)
 
418
        if self.bytes_left <= 0:
 
419
            # Finished with body
 
420
            if self.bytes_left != 0:
 
421
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
422
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
423
            self.bytes_left = None
 
424
            self.state_accept = self._state_accept_reading_trailer
 
425
        
 
426
    def _state_accept_reading_trailer(self, bytes):
 
427
        self._trailer_buffer += bytes
 
428
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
429
        # a ProtocolViolation exception?
 
430
        if self._trailer_buffer.startswith('done\n'):
 
431
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
432
            self.state_accept = self._state_accept_reading_unused
 
433
            self.finished_reading = True
 
434
    
 
435
    def _state_accept_reading_unused(self, bytes):
 
436
        self.unused_data += bytes
 
437
 
 
438
    def _state_read_no_data(self):
 
439
        return ''
 
440
 
 
441
    def _state_read_in_buffer(self):
 
442
        result = self._in_buffer
 
443
        self._in_buffer = ''
 
444
        return result
 
445
 
 
446
 
 
447
class SmartServerStreamMedium(object):
 
448
    """Handles smart commands coming over a stream.
 
449
 
 
450
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
451
    in-process fifo for testing.
 
452
 
 
453
    One instance is created for each connected client; it can serve multiple
 
454
    requests in the lifetime of the connection.
 
455
 
 
456
    The server passes requests through to an underlying backing transport, 
 
457
    which will typically be a LocalTransport looking at the server's filesystem.
 
458
    """
 
459
 
 
460
    def __init__(self, backing_transport):
 
461
        """Construct new server.
 
462
 
 
463
        :param backing_transport: Transport for the directory served.
 
464
        """
 
465
        # backing_transport could be passed to serve instead of __init__
 
466
        self.backing_transport = backing_transport
 
467
        self.finished = False
 
468
 
 
469
    def serve(self):
 
470
        """Serve requests until the client disconnects."""
 
471
        # Keep a reference to stderr because the sys module's globals get set to
 
472
        # None during interpreter shutdown.
 
473
        from sys import stderr
 
474
        try:
 
475
            while not self.finished:
 
476
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
477
                                                         self._write_out)
 
478
                self._serve_one_request(protocol)
 
479
        except Exception, e:
 
480
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
481
            raise
 
482
 
 
483
    def _serve_one_request(self, protocol):
 
484
        """Read one request from input, process, send back a response.
 
485
        
 
486
        :param protocol: a SmartServerRequestProtocol.
 
487
        """
 
488
        try:
 
489
            self._serve_one_request_unguarded(protocol)
 
490
        except KeyboardInterrupt:
 
491
            raise
 
492
        except Exception, e:
 
493
            self.terminate_due_to_error()
 
494
 
 
495
    def terminate_due_to_error(self):
 
496
        """Called when an unhandled exception from the protocol occurs."""
 
497
        raise NotImplementedError(self.terminate_due_to_error)
 
498
 
 
499
 
 
500
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
501
 
 
502
    def __init__(self, sock, backing_transport):
 
503
        """Constructor.
 
504
 
 
505
        :param sock: the socket the server will read from.  It will be put
 
506
            into blocking mode.
 
507
        """
 
508
        SmartServerStreamMedium.__init__(self, backing_transport)
 
509
        self.push_back = ''
 
510
        sock.setblocking(True)
 
511
        self.socket = sock
 
512
 
 
513
    def _serve_one_request_unguarded(self, protocol):
 
514
        while protocol.next_read_size():
 
515
            if self.push_back:
 
516
                protocol.accept_bytes(self.push_back)
 
517
                self.push_back = ''
 
518
            else:
 
519
                bytes = self.socket.recv(4096)
 
520
                if bytes == '':
 
521
                    self.finished = True
 
522
                    return
 
523
                protocol.accept_bytes(bytes)
 
524
        
 
525
        self.push_back = protocol.excess_buffer
 
526
    
 
527
    def terminate_due_to_error(self):
 
528
        """Called when an unhandled exception from the protocol occurs."""
 
529
        # TODO: This should log to a server log file, but no such thing
 
530
        # exists yet.  Andrew Bennetts 2006-09-29.
 
531
        self.socket.close()
 
532
        self.finished = True
 
533
 
 
534
    def _write_out(self, bytes):
 
535
        self.socket.sendall(bytes)
 
536
 
 
537
 
 
538
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
539
 
 
540
    def __init__(self, in_file, out_file, backing_transport):
 
541
        """Construct new server.
 
542
 
 
543
        :param in_file: Python file from which requests can be read.
 
544
        :param out_file: Python file to write responses.
 
545
        :param backing_transport: Transport for the directory served.
 
546
        """
 
547
        SmartServerStreamMedium.__init__(self, backing_transport)
 
548
        self._in = in_file
 
549
        self._out = out_file
 
550
 
 
551
    def _serve_one_request_unguarded(self, protocol):
 
552
        while True:
 
553
            bytes_to_read = protocol.next_read_size()
 
554
            if bytes_to_read == 0:
 
555
                # Finished serving this request.
 
556
                self._out.flush()
 
557
                return
 
558
            bytes = self._in.read(bytes_to_read)
 
559
            if bytes == '':
 
560
                # Connection has been closed.
 
561
                self.finished = True
 
562
                self._out.flush()
 
563
                return
 
564
            protocol.accept_bytes(bytes)
 
565
 
 
566
    def terminate_due_to_error(self):
 
567
        # TODO: This should log to a server log file, but no such thing
 
568
        # exists yet.  Andrew Bennetts 2006-09-29.
 
569
        self._out.close()
 
570
        self.finished = True
 
571
 
 
572
    def _write_out(self, bytes):
 
573
        self._out.write(bytes)
 
574
 
 
575
 
 
576
class SmartServerResponse(object):
 
577
    """Response generated by SmartServerRequestHandler."""
 
578
 
 
579
    def __init__(self, args, body=None):
 
580
        self.args = args
 
581
        self.body = body
 
582
 
 
583
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
584
# for delivering the data for a request. This could be done with as the
 
585
# StreamServer, though that would create conflation between request and response
 
586
# which may be undesirable.
 
587
 
 
588
 
 
589
class SmartServerRequestHandler(object):
 
590
    """Protocol logic for smart server.
 
591
    
 
592
    This doesn't handle serialization at all, it just processes requests and
 
593
    creates responses.
 
594
    """
 
595
 
 
596
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
597
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
598
    # from the object protocol: we will want to tweak the wire protocol separate
 
599
    # from the object model, and ideally we will be able to do that without
 
600
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
601
    # just a Protocol subclass.
 
602
 
 
603
    # TODO: Better way of representing the body for commands that take it,
 
604
    # and allow it to be streamed into the server.
 
605
    
 
606
    def __init__(self, backing_transport):
 
607
        self._backing_transport = backing_transport
 
608
        self._converted_command = False
 
609
        self.finished_reading = False
 
610
        self._body_bytes = ''
 
611
        self.response = None
 
612
 
 
613
    def accept_body(self, bytes):
 
614
        """Accept body data.
 
615
 
 
616
        This should be overriden for each command that desired body data to
 
617
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
618
 
 
619
        The deserialisation into that format should be done in the Protocol
 
620
        object. Set self.desired_body_format to the format your method will
 
621
        handle.
 
622
        """
 
623
        # default fallback is to accumulate bytes.
 
624
        self._body_bytes += bytes
 
625
        
 
626
    def _end_of_body_handler(self):
 
627
        """An unimplemented end of body handler."""
 
628
        raise NotImplementedError(self._end_of_body_handler)
 
629
        
 
630
    def do_hello(self):
 
631
        """Answer a version request with my version."""
 
632
        return SmartServerResponse(('ok', '1'))
 
633
 
 
634
    def do_has(self, relpath):
 
635
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
636
        return SmartServerResponse((r,))
 
637
 
 
638
    def do_get(self, relpath):
 
639
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
640
        return SmartServerResponse(('ok',), backing_bytes)
 
641
 
 
642
    def _deserialise_optional_mode(self, mode):
 
643
        # XXX: FIXME this should be on the protocol object.
 
644
        if mode == '':
 
645
            return None
 
646
        else:
 
647
            return int(mode)
 
648
 
 
649
    def do_append(self, relpath, mode):
 
650
        self._converted_command = True
 
651
        self._relpath = relpath
 
652
        self._mode = self._deserialise_optional_mode(mode)
 
653
        self._end_of_body_handler = self._handle_do_append_end
 
654
    
 
655
    def _handle_do_append_end(self):
 
656
        old_length = self._backing_transport.append_bytes(
 
657
            self._relpath, self._body_bytes, self._mode)
 
658
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
659
 
 
660
    def do_delete(self, relpath):
 
661
        self._backing_transport.delete(relpath)
 
662
 
 
663
    def do_iter_files_recursive(self, abspath):
 
664
        # XXX: the path handling needs some thought.
 
665
        #relpath = self._backing_transport.relpath(abspath)
 
666
        transport = self._backing_transport.clone(abspath)
 
667
        filenames = transport.iter_files_recursive()
 
668
        return SmartServerResponse(('names',) + tuple(filenames))
 
669
 
 
670
    def do_list_dir(self, relpath):
 
671
        filenames = self._backing_transport.list_dir(relpath)
 
672
        return SmartServerResponse(('names',) + tuple(filenames))
 
673
 
 
674
    def do_mkdir(self, relpath, mode):
 
675
        self._backing_transport.mkdir(relpath,
 
676
                                      self._deserialise_optional_mode(mode))
 
677
 
 
678
    def do_move(self, rel_from, rel_to):
 
679
        self._backing_transport.move(rel_from, rel_to)
 
680
 
 
681
    def do_put(self, relpath, mode):
 
682
        self._converted_command = True
 
683
        self._relpath = relpath
 
684
        self._mode = self._deserialise_optional_mode(mode)
 
685
        self._end_of_body_handler = self._handle_do_put
 
686
 
 
687
    def _handle_do_put(self):
 
688
        self._backing_transport.put_bytes(self._relpath,
 
689
                self._body_bytes, self._mode)
 
690
        self.response = SmartServerResponse(('ok',))
 
691
 
 
692
    def _deserialise_offsets(self, text):
 
693
        # XXX: FIXME this should be on the protocol object.
 
694
        offsets = []
 
695
        for line in text.split('\n'):
 
696
            if not line:
 
697
                continue
 
698
            start, length = line.split(',')
 
699
            offsets.append((int(start), int(length)))
 
700
        return offsets
 
701
 
 
702
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
703
        self._converted_command = True
 
704
        self._end_of_body_handler = self._handle_put_non_atomic
 
705
        self._relpath = relpath
 
706
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
707
        self._mode = self._deserialise_optional_mode(mode)
 
708
        # a boolean would be nicer XXX
 
709
        self._create_parent = (create_parent == 'T')
 
710
 
 
711
    def _handle_put_non_atomic(self):
 
712
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
713
                self._body_bytes,
 
714
                mode=self._mode,
 
715
                create_parent_dir=self._create_parent,
 
716
                dir_mode=self._dir_mode)
 
717
        self.response = SmartServerResponse(('ok',))
 
718
 
 
719
    def do_readv(self, relpath):
 
720
        self._converted_command = True
 
721
        self._end_of_body_handler = self._handle_readv_offsets
 
722
        self._relpath = relpath
 
723
 
 
724
    def end_of_body(self):
 
725
        """No more body data will be received."""
 
726
        self._run_handler_code(self._end_of_body_handler, (), {})
 
727
        # cannot read after this.
 
728
        self.finished_reading = True
 
729
 
 
730
    def _handle_readv_offsets(self):
 
731
        """accept offsets for a readv request."""
 
732
        offsets = self._deserialise_offsets(self._body_bytes)
 
733
        backing_bytes = ''.join(bytes for offset, bytes in
 
734
            self._backing_transport.readv(self._relpath, offsets))
 
735
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
736
        
 
737
    def do_rename(self, rel_from, rel_to):
 
738
        self._backing_transport.rename(rel_from, rel_to)
 
739
 
 
740
    def do_rmdir(self, relpath):
 
741
        self._backing_transport.rmdir(relpath)
 
742
 
 
743
    def do_stat(self, relpath):
 
744
        stat = self._backing_transport.stat(relpath)
 
745
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
746
        
 
747
    def do_get_bundle(self, path, revision_id):
 
748
        # open transport relative to our base
 
749
        t = self._backing_transport.clone(path)
 
750
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
751
        repo = control.open_repository()
 
752
        tmpf = tempfile.TemporaryFile()
 
753
        base_revision = revision.NULL_REVISION
 
754
        write_bundle(repo, revision_id, base_revision, tmpf)
 
755
        tmpf.seek(0)
 
756
        return SmartServerResponse((), tmpf.read())
 
757
 
 
758
    def dispatch_command(self, cmd, args):
 
759
        """Deprecated compatibility method.""" # XXX XXX
 
760
        func = getattr(self, 'do_' + cmd, None)
 
761
        if func is None:
 
762
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
763
        self._run_handler_code(func, args, {})
 
764
 
 
765
    def _run_handler_code(self, callable, args, kwargs):
 
766
        """Run some handler specific code 'callable'.
 
767
 
 
768
        If a result is returned, it is considered to be the commands response,
 
769
        and finished_reading is set true, and its assigned to self.response.
 
770
 
 
771
        Any exceptions caught are translated and a response object created
 
772
        from them.
 
773
        """
 
774
        result = self._call_converting_errors(callable, args, kwargs)
 
775
        if result is not None:
 
776
            self.response = result
 
777
            self.finished_reading = True
 
778
        # handle unconverted commands
 
779
        if not self._converted_command:
 
780
            self.finished_reading = True
 
781
            if result is None:
 
782
                self.response = SmartServerResponse(('ok',))
 
783
 
 
784
    def _call_converting_errors(self, callable, args, kwargs):
 
785
        """Call callable converting errors to Response objects."""
 
786
        try:
 
787
            return callable(*args, **kwargs)
 
788
        except errors.NoSuchFile, e:
 
789
            return SmartServerResponse(('NoSuchFile', e.path))
 
790
        except errors.FileExists, e:
 
791
            return SmartServerResponse(('FileExists', e.path))
 
792
        except errors.DirectoryNotEmpty, e:
 
793
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
794
        except errors.ShortReadvError, e:
 
795
            return SmartServerResponse(('ShortReadvError',
 
796
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
797
        except UnicodeError, e:
 
798
            # If it is a DecodeError, than most likely we are starting
 
799
            # with a plain string
 
800
            str_or_unicode = e.object
 
801
            if isinstance(str_or_unicode, unicode):
 
802
                val = u'u:' + str_or_unicode
 
803
            else:
 
804
                val = u's:' + str_or_unicode.encode('base64')
 
805
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
806
            return SmartServerResponse((e.__class__.__name__,
 
807
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
808
        except errors.TransportNotPossible, e:
 
809
            if e.msg == "readonly transport":
 
810
                return SmartServerResponse(('ReadOnlyError', ))
 
811
            else:
 
812
                raise
 
813
 
 
814
 
 
815
class SmartTCPServer(object):
 
816
    """Listens on a TCP socket and accepts connections from smart clients"""
 
817
 
 
818
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
819
        """Construct a new server.
 
820
 
 
821
        To actually start it running, call either start_background_thread or
 
822
        serve.
 
823
 
 
824
        :param host: Name of the interface to listen on.
 
825
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
826
        """
 
827
        self._server_socket = socket.socket()
 
828
        self._server_socket.bind((host, port))
 
829
        self.port = self._server_socket.getsockname()[1]
 
830
        self._server_socket.listen(1)
 
831
        self._server_socket.settimeout(1)
 
832
        self.backing_transport = backing_transport
 
833
 
 
834
    def serve(self):
 
835
        # let connections timeout so that we get a chance to terminate
 
836
        # Keep a reference to the exceptions we want to catch because the socket
 
837
        # module's globals get set to None during interpreter shutdown.
 
838
        from socket import timeout as socket_timeout
 
839
        from socket import error as socket_error
 
840
        self._should_terminate = False
 
841
        while not self._should_terminate:
 
842
            try:
 
843
                self.accept_and_serve()
 
844
            except socket_timeout:
 
845
                # just check if we're asked to stop
 
846
                pass
 
847
            except socket_error, e:
 
848
                trace.warning("client disconnected: %s", e)
 
849
                pass
 
850
 
 
851
    def get_url(self):
 
852
        """Return the url of the server"""
 
853
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
854
 
 
855
    def accept_and_serve(self):
 
856
        conn, client_addr = self._server_socket.accept()
 
857
        # For WIN32, where the timeout value from the listening socket
 
858
        # propogates to the newly accepted socket.
 
859
        conn.setblocking(True)
 
860
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
861
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
862
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
863
        connection_thread.setDaemon(True)
 
864
        connection_thread.start()
 
865
 
 
866
    def start_background_thread(self):
 
867
        self._server_thread = threading.Thread(None,
 
868
                self.serve,
 
869
                name='server-' + self.get_url())
 
870
        self._server_thread.setDaemon(True)
 
871
        self._server_thread.start()
 
872
 
 
873
    def stop_background_thread(self):
 
874
        self._should_terminate = True
 
875
        # self._server_socket.close()
 
876
        # we used to join the thread, but it's not really necessary; it will
 
877
        # terminate in time
 
878
        ## self._server_thread.join()
 
879
 
 
880
 
 
881
class SmartTCPServer_for_testing(SmartTCPServer):
 
882
    """Server suitable for use by transport tests.
 
883
    
 
884
    This server is backed by the process's cwd.
 
885
    """
 
886
 
 
887
    def __init__(self):
 
888
        self._homedir = os.getcwd()
 
889
        # The server is set up by default like for ssh access: the client
 
890
        # passes filesystem-absolute paths; therefore the server must look
 
891
        # them up relative to the root directory.  it might be better to act
 
892
        # a public server and have the server rewrite paths into the test
 
893
        # directory.
 
894
        SmartTCPServer.__init__(self, transport.get_transport("file:///"))
 
895
        
 
896
    def setUp(self):
 
897
        """Set up server for testing"""
 
898
        self.start_background_thread()
 
899
 
 
900
    def tearDown(self):
 
901
        self.stop_background_thread()
 
902
 
 
903
    def get_url(self):
 
904
        """Return the url of the server"""
 
905
        host, port = self._server_socket.getsockname()
 
906
        # XXX: I think this is likely to break on windows -- self._homedir will
 
907
        # have backslashes (and maybe a drive letter?).
 
908
        #  -- Andrew Bennetts, 2006-08-29
 
909
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
910
 
 
911
    def get_bogus_url(self):
 
912
        """Return a URL which will fail to connect"""
 
913
        return 'bzr://127.0.0.1:1/'
 
914
 
 
915
 
 
916
class SmartStat(object):
 
917
 
 
918
    def __init__(self, size, mode):
 
919
        self.st_size = size
 
920
        self.st_mode = mode
 
921
 
 
922
 
 
923
class SmartTransport(transport.Transport):
 
924
    """Connection to a smart server.
 
925
 
 
926
    The connection holds references to pipes that can be used to send requests
 
927
    to the server.
 
928
 
 
929
    The connection has a notion of the current directory to which it's
 
930
    connected; this is incorporated in filenames passed to the server.
 
931
    
 
932
    This supports some higher-level RPC operations and can also be treated 
 
933
    like a Transport to do file-like operations.
 
934
 
 
935
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
936
    or a series of http requests.  There are concrete subclasses for each
 
937
    type: SmartTCPTransport, etc.
 
938
    """
 
939
 
 
940
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
941
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
942
    # the ability to support multiple versions of the smart protocol over time:
 
943
    # SmartTransport is an adapter from the Transport object model to the 
 
944
    # SmartClient model, not an encoder.
 
945
 
 
946
    def __init__(self, url, clone_from=None, medium=None):
 
947
        """Constructor.
 
948
 
 
949
        :param medium: The medium to use for this RemoteTransport. This must be
 
950
            supplied if clone_from is None.
 
951
        """
 
952
        ### Technically super() here is faulty because Transport's __init__
 
953
        ### fails to take 2 parameters, and if super were to choose a silly
 
954
        ### initialisation order things would blow up. 
 
955
        if not url.endswith('/'):
 
956
            url += '/'
 
957
        super(SmartTransport, self).__init__(url)
 
958
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
959
                transport.split_url(url)
 
960
        if clone_from is None:
 
961
            self._medium = medium
 
962
        else:
 
963
            # credentials may be stripped from the base in some circumstances
 
964
            # as yet to be clearly defined or documented, so copy them.
 
965
            self._username = clone_from._username
 
966
            # reuse same connection
 
967
            self._medium = clone_from._medium
 
968
        assert self._medium is not None
 
969
 
 
970
    def abspath(self, relpath):
 
971
        """Return the full url to the given relative path.
 
972
        
 
973
        @param relpath: the relative path or path components
 
974
        @type relpath: str or list
 
975
        """
 
976
        return self._unparse_url(self._remote_path(relpath))
 
977
    
 
978
    def clone(self, relative_url):
 
979
        """Make a new SmartTransport related to me, sharing the same connection.
 
980
 
 
981
        This essentially opens a handle on a different remote directory.
 
982
        """
 
983
        if relative_url is None:
 
984
            return SmartTransport(self.base, self)
 
985
        else:
 
986
            return SmartTransport(self.abspath(relative_url), self)
 
987
 
 
988
    def is_readonly(self):
 
989
        """Smart server transport can do read/write file operations."""
 
990
        return False
 
991
                                                   
 
992
    def get_smart_client(self):
 
993
        return self._medium
 
994
 
 
995
    def get_smart_medium(self):
 
996
        return self._medium
 
997
                                                   
 
998
    def _unparse_url(self, path):
 
999
        """Return URL for a path.
 
1000
 
 
1001
        :see: SFTPUrlHandling._unparse_url
 
1002
        """
 
1003
        # TODO: Eventually it should be possible to unify this with
 
1004
        # SFTPUrlHandling._unparse_url?
 
1005
        if path == '':
 
1006
            path = '/'
 
1007
        path = urllib.quote(path)
 
1008
        netloc = urllib.quote(self._host)
 
1009
        if self._username is not None:
 
1010
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
1011
        if self._port is not None:
 
1012
            netloc = '%s:%d' % (netloc, self._port)
 
1013
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
1014
 
 
1015
    def _remote_path(self, relpath):
 
1016
        """Returns the Unicode version of the absolute path for relpath."""
 
1017
        return self._combine_paths(self._path, relpath)
 
1018
 
 
1019
    def _call(self, method, *args):
 
1020
        resp = self._call2(method, *args)
 
1021
        self._translate_error(resp)
 
1022
 
 
1023
    def _call2(self, method, *args):
 
1024
        """Call a method on the remote server."""
 
1025
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1026
        protocol.call(method, *args)
 
1027
        return protocol.read_response_tuple()
 
1028
 
 
1029
    def _call_with_body_bytes(self, method, args, body):
 
1030
        """Call a method on the remote server with body bytes."""
 
1031
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1032
        protocol.call_with_body_bytes((method, ) + args, body)
 
1033
        return protocol.read_response_tuple()
 
1034
 
 
1035
    def has(self, relpath):
 
1036
        """Indicate whether a remote file of the given name exists or not.
 
1037
 
 
1038
        :see: Transport.has()
 
1039
        """
 
1040
        resp = self._call2('has', self._remote_path(relpath))
 
1041
        if resp == ('yes', ):
 
1042
            return True
 
1043
        elif resp == ('no', ):
 
1044
            return False
 
1045
        else:
 
1046
            self._translate_error(resp)
 
1047
 
 
1048
    def get(self, relpath):
 
1049
        """Return file-like object reading the contents of a remote file.
 
1050
        
 
1051
        :see: Transport.get_bytes()/get_file()
 
1052
        """
 
1053
        return StringIO(self.get_bytes(relpath))
 
1054
 
 
1055
    def get_bytes(self, relpath):
 
1056
        remote = self._remote_path(relpath)
 
1057
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1058
        protocol.call('get', remote)
 
1059
        resp = protocol.read_response_tuple(True)
 
1060
        if resp != ('ok', ):
 
1061
            protocol.cancel_read_body()
 
1062
            self._translate_error(resp, relpath)
 
1063
        return protocol.read_body_bytes()
 
1064
 
 
1065
    def _serialise_optional_mode(self, mode):
 
1066
        if mode is None:
 
1067
            return ''
 
1068
        else:
 
1069
            return '%d' % mode
 
1070
 
 
1071
    def mkdir(self, relpath, mode=None):
 
1072
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1073
            self._serialise_optional_mode(mode))
 
1074
        self._translate_error(resp)
 
1075
 
 
1076
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1077
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1078
        # should probably just pass all parameters as length-delimited
 
1079
        # strings?
 
1080
        resp = self._call_with_body_bytes('put',
 
1081
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1082
            upload_contents)
 
1083
        self._translate_error(resp)
 
1084
 
 
1085
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1086
                             create_parent_dir=False,
 
1087
                             dir_mode=None):
 
1088
        """See Transport.put_bytes_non_atomic."""
 
1089
        # FIXME: no encoding in the transport!
 
1090
        create_parent_str = 'F'
 
1091
        if create_parent_dir:
 
1092
            create_parent_str = 'T'
 
1093
 
 
1094
        resp = self._call_with_body_bytes(
 
1095
            'put_non_atomic',
 
1096
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1097
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1098
            bytes)
 
1099
        self._translate_error(resp)
 
1100
 
 
1101
    def put_file(self, relpath, upload_file, mode=None):
 
1102
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1103
        # on transports not reading before failing - which is a faulty
 
1104
        # assumption I think - RBC 20060915
 
1105
        pos = upload_file.tell()
 
1106
        try:
 
1107
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1108
        except:
 
1109
            upload_file.seek(pos)
 
1110
            raise
 
1111
 
 
1112
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1113
                            create_parent_dir=False,
 
1114
                            dir_mode=None):
 
1115
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1116
                                         create_parent_dir=create_parent_dir,
 
1117
                                         dir_mode=dir_mode)
 
1118
 
 
1119
    def append_file(self, relpath, from_file, mode=None):
 
1120
        return self.append_bytes(relpath, from_file.read(), mode)
 
1121
        
 
1122
    def append_bytes(self, relpath, bytes, mode=None):
 
1123
        resp = self._call_with_body_bytes(
 
1124
            'append',
 
1125
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1126
            bytes)
 
1127
        if resp[0] == 'appended':
 
1128
            return int(resp[1])
 
1129
        self._translate_error(resp)
 
1130
 
 
1131
    def delete(self, relpath):
 
1132
        resp = self._call2('delete', self._remote_path(relpath))
 
1133
        self._translate_error(resp)
 
1134
 
 
1135
    def readv(self, relpath, offsets):
 
1136
        if not offsets:
 
1137
            return
 
1138
 
 
1139
        offsets = list(offsets)
 
1140
 
 
1141
        sorted_offsets = sorted(offsets)
 
1142
        # turn the list of offsets into a stack
 
1143
        offset_stack = iter(offsets)
 
1144
        cur_offset_and_size = offset_stack.next()
 
1145
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1146
                               limit=self._max_readv_combine,
 
1147
                               fudge_factor=self._bytes_to_read_before_seek))
 
1148
 
 
1149
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1150
        protocol.call_with_body_readv_array(
 
1151
            ('readv', self._remote_path(relpath)),
 
1152
            [(c.start, c.length) for c in coalesced])
 
1153
        resp = protocol.read_response_tuple(True)
 
1154
 
 
1155
        if resp[0] != 'readv':
 
1156
            # This should raise an exception
 
1157
            protocol.cancel_read_body()
 
1158
            self._translate_error(resp)
 
1159
            return
 
1160
 
 
1161
        # FIXME: this should know how many bytes are needed, for clarity.
 
1162
        data = protocol.read_body_bytes()
 
1163
        # Cache the results, but only until they have been fulfilled
 
1164
        data_map = {}
 
1165
        for c_offset in coalesced:
 
1166
            if len(data) < c_offset.length:
 
1167
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1168
                            c_offset.length, actual=len(data))
 
1169
            for suboffset, subsize in c_offset.ranges:
 
1170
                key = (c_offset.start+suboffset, subsize)
 
1171
                data_map[key] = data[suboffset:suboffset+subsize]
 
1172
            data = data[c_offset.length:]
 
1173
 
 
1174
            # Now that we've read some data, see if we can yield anything back
 
1175
            while cur_offset_and_size in data_map:
 
1176
                this_data = data_map.pop(cur_offset_and_size)
 
1177
                yield cur_offset_and_size[0], this_data
 
1178
                cur_offset_and_size = offset_stack.next()
 
1179
 
 
1180
    def rename(self, rel_from, rel_to):
 
1181
        self._call('rename',
 
1182
                   self._remote_path(rel_from),
 
1183
                   self._remote_path(rel_to))
 
1184
 
 
1185
    def move(self, rel_from, rel_to):
 
1186
        self._call('move',
 
1187
                   self._remote_path(rel_from),
 
1188
                   self._remote_path(rel_to))
 
1189
 
 
1190
    def rmdir(self, relpath):
 
1191
        resp = self._call('rmdir', self._remote_path(relpath))
 
1192
 
 
1193
    def _translate_error(self, resp, orig_path=None):
 
1194
        """Raise an exception from a response"""
 
1195
        if resp is None:
 
1196
            what = None
 
1197
        else:
 
1198
            what = resp[0]
 
1199
        if what == 'ok':
 
1200
            return
 
1201
        elif what == 'NoSuchFile':
 
1202
            if orig_path is not None:
 
1203
                error_path = orig_path
 
1204
            else:
 
1205
                error_path = resp[1]
 
1206
            raise errors.NoSuchFile(error_path)
 
1207
        elif what == 'error':
 
1208
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1209
        elif what == 'FileExists':
 
1210
            raise errors.FileExists(resp[1])
 
1211
        elif what == 'DirectoryNotEmpty':
 
1212
            raise errors.DirectoryNotEmpty(resp[1])
 
1213
        elif what == 'ShortReadvError':
 
1214
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1215
                                         int(resp[3]), int(resp[4]))
 
1216
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1217
            encoding = str(resp[1]) # encoding must always be a string
 
1218
            val = resp[2]
 
1219
            start = int(resp[3])
 
1220
            end = int(resp[4])
 
1221
            reason = str(resp[5]) # reason must always be a string
 
1222
            if val.startswith('u:'):
 
1223
                val = val[2:]
 
1224
            elif val.startswith('s:'):
 
1225
                val = val[2:].decode('base64')
 
1226
            if what == 'UnicodeDecodeError':
 
1227
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1228
            elif what == 'UnicodeEncodeError':
 
1229
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1230
        elif what == "ReadOnlyError":
 
1231
            raise errors.TransportNotPossible('readonly transport')
 
1232
        else:
 
1233
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1234
 
 
1235
    def disconnect(self):
 
1236
        self._medium.disconnect()
 
1237
 
 
1238
    def delete_tree(self, relpath):
 
1239
        raise errors.TransportNotPossible('readonly transport')
 
1240
 
 
1241
    def stat(self, relpath):
 
1242
        resp = self._call2('stat', self._remote_path(relpath))
 
1243
        if resp[0] == 'stat':
 
1244
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1245
        else:
 
1246
            self._translate_error(resp)
 
1247
 
 
1248
    ## def lock_read(self, relpath):
 
1249
    ##     """Lock the given file for shared (read) access.
 
1250
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1251
    ##     """
 
1252
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1253
    ##     # continue that tradition and return a bogus lock object.
 
1254
    ##     class BogusLock(object):
 
1255
    ##         def __init__(self, path):
 
1256
    ##             self.path = path
 
1257
    ##         def unlock(self):
 
1258
    ##             pass
 
1259
    ##     return BogusLock(relpath)
 
1260
 
 
1261
    def listable(self):
 
1262
        return True
 
1263
 
 
1264
    def list_dir(self, relpath):
 
1265
        resp = self._call2('list_dir', self._remote_path(relpath))
 
1266
        if resp[0] == 'names':
 
1267
            return [name.encode('ascii') for name in resp[1:]]
 
1268
        else:
 
1269
            self._translate_error(resp)
 
1270
 
 
1271
    def iter_files_recursive(self):
 
1272
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
1273
        if resp[0] == 'names':
 
1274
            return resp[1:]
 
1275
        else:
 
1276
            self._translate_error(resp)
 
1277
 
 
1278
 
 
1279
class SmartClientMediumRequest(object):
 
1280
    """A request on a SmartClientMedium.
 
1281
 
 
1282
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1283
    the response bytes to be read via read_bytes.
 
1284
 
 
1285
    For instance:
 
1286
    request.accept_bytes('123')
 
1287
    request.finished_writing()
 
1288
    result = request.read_bytes(3)
 
1289
    request.finished_reading()
 
1290
 
 
1291
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1292
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1293
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1294
    details on concurrency and pipelining.
 
1295
    """
 
1296
 
 
1297
    def __init__(self, medium):
 
1298
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1299
        self._medium = medium
 
1300
        # we track state by constants - we may want to use the same
 
1301
        # pattern as BodyReader if it gets more complex.
 
1302
        # valid states are: "writing", "reading", "done"
 
1303
        self._state = "writing"
 
1304
 
 
1305
    def accept_bytes(self, bytes):
 
1306
        """Accept bytes for inclusion in this request.
 
1307
 
 
1308
        This method may not be be called after finished_writing() has been
 
1309
        called.  It depends upon the Medium whether or not the bytes will be
 
1310
        immediately transmitted. Message based Mediums will tend to buffer the
 
1311
        bytes until finished_writing() is called.
 
1312
 
 
1313
        :param bytes: A bytestring.
 
1314
        """
 
1315
        if self._state != "writing":
 
1316
            raise errors.WritingCompleted(self)
 
1317
        self._accept_bytes(bytes)
 
1318
 
 
1319
    def _accept_bytes(self, bytes):
 
1320
        """Helper for accept_bytes.
 
1321
 
 
1322
        Accept_bytes checks the state of the request to determing if bytes
 
1323
        should be accepted. After that it hands off to _accept_bytes to do the
 
1324
        actual acceptance.
 
1325
        """
 
1326
        raise NotImplementedError(self._accept_bytes)
 
1327
 
 
1328
    def finished_reading(self):
 
1329
        """Inform the request that all desired data has been read.
 
1330
 
 
1331
        This will remove the request from the pipeline for its medium (if the
 
1332
        medium supports pipelining) and any further calls to methods on the
 
1333
        request will raise ReadingCompleted.
 
1334
        """
 
1335
        if self._state == "writing":
 
1336
            raise errors.WritingNotComplete(self)
 
1337
        if self._state != "reading":
 
1338
            raise errors.ReadingCompleted(self)
 
1339
        self._state = "done"
 
1340
        self._finished_reading()
 
1341
 
 
1342
    def _finished_reading(self):
 
1343
        """Helper for finished_reading.
 
1344
 
 
1345
        finished_reading checks the state of the request to determine if 
 
1346
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1347
        to perform the action.
 
1348
        """
 
1349
        raise NotImplementedError(self._finished_reading)
 
1350
 
 
1351
    def finished_writing(self):
 
1352
        """Finish the writing phase of this request.
 
1353
 
 
1354
        This will flush all pending data for this request along the medium.
 
1355
        After calling finished_writing, you may not call accept_bytes anymore.
 
1356
        """
 
1357
        if self._state != "writing":
 
1358
            raise errors.WritingCompleted(self)
 
1359
        self._state = "reading"
 
1360
        self._finished_writing()
 
1361
 
 
1362
    def _finished_writing(self):
 
1363
        """Helper for finished_writing.
 
1364
 
 
1365
        finished_writing checks the state of the request to determine if 
 
1366
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1367
        to perform the action.
 
1368
        """
 
1369
        raise NotImplementedError(self._finished_writing)
 
1370
 
 
1371
    def read_bytes(self, count):
 
1372
        """Read bytes from this requests response.
 
1373
 
 
1374
        This method will block and wait for count bytes to be read. It may not
 
1375
        be invoked until finished_writing() has been called - this is to ensure
 
1376
        a message-based approach to requests, for compatability with message
 
1377
        based mediums like HTTP.
 
1378
        """
 
1379
        if self._state == "writing":
 
1380
            raise errors.WritingNotComplete(self)
 
1381
        if self._state != "reading":
 
1382
            raise errors.ReadingCompleted(self)
 
1383
        return self._read_bytes(count)
 
1384
 
 
1385
    def _read_bytes(self, count):
 
1386
        """Helper for read_bytes.
 
1387
 
 
1388
        read_bytes checks the state of the request to determing if bytes
 
1389
        should be read. After that it hands off to _read_bytes to do the
 
1390
        actual read.
 
1391
        """
 
1392
        raise NotImplementedError(self._read_bytes)
 
1393
 
 
1394
 
 
1395
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1396
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1397
 
 
1398
    def __init__(self, medium):
 
1399
        SmartClientMediumRequest.__init__(self, medium)
 
1400
        # check that we are safe concurrency wise. If some streams start
 
1401
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1402
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1403
        # and the setting/unsetting of _current_request likewise moved into
 
1404
        # that class : but its unneeded overhead for now. RBC 20060922
 
1405
        if self._medium._current_request is not None:
 
1406
            raise errors.TooManyConcurrentRequests(self._medium)
 
1407
        self._medium._current_request = self
 
1408
 
 
1409
    def _accept_bytes(self, bytes):
 
1410
        """See SmartClientMediumRequest._accept_bytes.
 
1411
        
 
1412
        This forwards to self._medium._accept_bytes because we are operating
 
1413
        on the mediums stream.
 
1414
        """
 
1415
        self._medium._accept_bytes(bytes)
 
1416
 
 
1417
    def _finished_reading(self):
 
1418
        """See SmartClientMediumRequest._finished_reading.
 
1419
 
 
1420
        This clears the _current_request on self._medium to allow a new 
 
1421
        request to be created.
 
1422
        """
 
1423
        assert self._medium._current_request is self
 
1424
        self._medium._current_request = None
 
1425
        
 
1426
    def _finished_writing(self):
 
1427
        """See SmartClientMediumRequest._finished_writing.
 
1428
 
 
1429
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1430
        """
 
1431
        self._medium._flush()
 
1432
 
 
1433
    def _read_bytes(self, count):
 
1434
        """See SmartClientMediumRequest._read_bytes.
 
1435
        
 
1436
        This forwards to self._medium._read_bytes because we are operating
 
1437
        on the mediums stream.
 
1438
        """
 
1439
        return self._medium._read_bytes(count)
 
1440
 
 
1441
 
 
1442
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1443
    """The client-side protocol for smart version 1."""
 
1444
 
 
1445
    def __init__(self, request):
 
1446
        """Construct a SmartClientRequestProtocolOne.
 
1447
 
 
1448
        :param request: A SmartClientMediumRequest to serialise onto and
 
1449
            deserialise from.
 
1450
        """
 
1451
        self._request = request
 
1452
        self._body_buffer = None
 
1453
 
 
1454
    def call(self, *args):
 
1455
        bytes = _encode_tuple(args)
 
1456
        self._request.accept_bytes(bytes)
 
1457
        self._request.finished_writing()
 
1458
 
 
1459
    def call_with_body_bytes(self, args, body):
 
1460
        """Make a remote call of args with body bytes 'body'.
 
1461
 
 
1462
        After calling this, call read_response_tuple to find the result out.
 
1463
        """
 
1464
        bytes = _encode_tuple(args)
 
1465
        self._request.accept_bytes(bytes)
 
1466
        bytes = self._encode_bulk_data(body)
 
1467
        self._request.accept_bytes(bytes)
 
1468
        self._request.finished_writing()
 
1469
 
 
1470
    def call_with_body_readv_array(self, args, body):
 
1471
        """Make a remote call with a readv array.
 
1472
 
 
1473
        The body is encoded with one line per readv offset pair. The numbers in
 
1474
        each pair are separated by a comma, and no trailing \n is emitted.
 
1475
        """
 
1476
        bytes = _encode_tuple(args)
 
1477
        self._request.accept_bytes(bytes)
 
1478
        readv_bytes = self._serialise_offsets(body)
 
1479
        bytes = self._encode_bulk_data(readv_bytes)
 
1480
        self._request.accept_bytes(bytes)
 
1481
        self._request.finished_writing()
 
1482
 
 
1483
    def cancel_read_body(self):
 
1484
        """After expecting a body, a response code may indicate one otherwise.
 
1485
 
 
1486
        This method lets the domain client inform the protocol that no body
 
1487
        will be transmitted. This is a terminal method: after calling it the
 
1488
        protocol is not able to be used further.
 
1489
        """
 
1490
        self._request.finished_reading()
 
1491
 
 
1492
    def read_response_tuple(self, expect_body=False):
 
1493
        """Read a response tuple from the wire.
 
1494
 
 
1495
        This should only be called once.
 
1496
        """
 
1497
        result = self._recv_tuple()
 
1498
        if not expect_body:
 
1499
            self._request.finished_reading()
 
1500
        return result
 
1501
 
 
1502
    def read_body_bytes(self, count=-1):
 
1503
        """Read bytes from the body, decoding into a byte stream.
 
1504
        
 
1505
        We read all bytes at once to ensure we've checked the trailer for 
 
1506
        errors, and then feed the buffer back as read_body_bytes is called.
 
1507
        """
 
1508
        if self._body_buffer is not None:
 
1509
            return self._body_buffer.read(count)
 
1510
        _body_decoder = LengthPrefixedBodyDecoder()
 
1511
 
 
1512
        while not _body_decoder.finished_reading:
 
1513
            bytes_wanted = _body_decoder.next_read_size()
 
1514
            bytes = self._request.read_bytes(bytes_wanted)
 
1515
            _body_decoder.accept_bytes(bytes)
 
1516
        self._request.finished_reading()
 
1517
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1518
        # XXX: TODO check the trailer result.
 
1519
        return self._body_buffer.read(count)
 
1520
 
 
1521
    def _recv_tuple(self):
 
1522
        """Receive a tuple from the medium request."""
 
1523
        line = ''
 
1524
        while not line or line[-1] != '\n':
 
1525
            # TODO: this is inefficient - but tuples are short.
 
1526
            new_char = self._request.read_bytes(1)
 
1527
            line += new_char
 
1528
            assert new_char != '', "end of file reading from server."
 
1529
        return _decode_tuple(line)
 
1530
 
 
1531
    def query_version(self):
 
1532
        """Return protocol version number of the server."""
 
1533
        self.call('hello')
 
1534
        resp = self.read_response_tuple()
 
1535
        if resp == ('ok', '1'):
 
1536
            return 1
 
1537
        else:
 
1538
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1539
 
 
1540
 
 
1541
class SmartClientMedium(object):
 
1542
    """Smart client is a medium for sending smart protocol requests over."""
 
1543
 
 
1544
    def disconnect(self):
 
1545
        """If this medium maintains a persistent connection, close it.
 
1546
        
 
1547
        The default implementation does nothing.
 
1548
        """
 
1549
        
 
1550
 
 
1551
class SmartClientStreamMedium(SmartClientMedium):
 
1552
    """Stream based medium common class.
 
1553
 
 
1554
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1555
    SmartClientStreamMediumRequest for their requests, and should implement
 
1556
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1557
    receive bytes.
 
1558
    """
 
1559
 
 
1560
    def __init__(self):
 
1561
        self._current_request = None
 
1562
 
 
1563
    def accept_bytes(self, bytes):
 
1564
        self._accept_bytes(bytes)
 
1565
 
 
1566
    def __del__(self):
 
1567
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1568
        finished with it.
 
1569
        """
 
1570
        self.disconnect()
 
1571
 
 
1572
    def _flush(self):
 
1573
        """Flush the output stream.
 
1574
        
 
1575
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1576
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1577
        """
 
1578
        raise NotImplementedError(self._flush)
 
1579
 
 
1580
    def get_request(self):
 
1581
        """See SmartClientMedium.get_request().
 
1582
 
 
1583
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1584
        for get_request.
 
1585
        """
 
1586
        return SmartClientStreamMediumRequest(self)
 
1587
 
 
1588
    def read_bytes(self, count):
 
1589
        return self._read_bytes(count)
 
1590
 
 
1591
 
 
1592
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1593
    """A client medium using simple pipes.
 
1594
    
 
1595
    This client does not manage the pipes: it assumes they will always be open.
 
1596
    """
 
1597
 
 
1598
    def __init__(self, readable_pipe, writeable_pipe):
 
1599
        SmartClientStreamMedium.__init__(self)
 
1600
        self._readable_pipe = readable_pipe
 
1601
        self._writeable_pipe = writeable_pipe
 
1602
 
 
1603
    def _accept_bytes(self, bytes):
 
1604
        """See SmartClientStreamMedium.accept_bytes."""
 
1605
        self._writeable_pipe.write(bytes)
 
1606
 
 
1607
    def _flush(self):
 
1608
        """See SmartClientStreamMedium._flush()."""
 
1609
        self._writeable_pipe.flush()
 
1610
 
 
1611
    def _read_bytes(self, count):
 
1612
        """See SmartClientStreamMedium._read_bytes."""
 
1613
        return self._readable_pipe.read(count)
 
1614
 
 
1615
 
 
1616
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1617
    """A client medium using SSH."""
 
1618
    
 
1619
    def __init__(self, host, port=None, username=None, password=None,
 
1620
            vendor=None):
 
1621
        """Creates a client that will connect on the first use.
 
1622
        
 
1623
        :param vendor: An optional override for the ssh vendor to use. See
 
1624
            bzrlib.transport.ssh for details on ssh vendors.
 
1625
        """
 
1626
        SmartClientStreamMedium.__init__(self)
 
1627
        self._connected = False
 
1628
        self._host = host
 
1629
        self._password = password
 
1630
        self._port = port
 
1631
        self._username = username
 
1632
        self._read_from = None
 
1633
        self._ssh_connection = None
 
1634
        self._vendor = vendor
 
1635
        self._write_to = None
 
1636
 
 
1637
    def _accept_bytes(self, bytes):
 
1638
        """See SmartClientStreamMedium.accept_bytes."""
 
1639
        self._ensure_connection()
 
1640
        self._write_to.write(bytes)
 
1641
 
 
1642
    def disconnect(self):
 
1643
        """See SmartClientMedium.disconnect()."""
 
1644
        if not self._connected:
 
1645
            return
 
1646
        self._read_from.close()
 
1647
        self._write_to.close()
 
1648
        self._ssh_connection.close()
 
1649
        self._connected = False
 
1650
 
 
1651
    def _ensure_connection(self):
 
1652
        """Connect this medium if not already connected."""
 
1653
        if self._connected:
 
1654
            return
 
1655
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1656
        if self._vendor is None:
 
1657
            vendor = ssh._get_ssh_vendor()
 
1658
        else:
 
1659
            vendor = self._vendor
 
1660
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1661
                self._password, self._host, self._port,
 
1662
                command=[executable, 'serve', '--inet', '--directory=/',
 
1663
                         '--allow-writes'])
 
1664
        self._read_from, self._write_to = \
 
1665
            self._ssh_connection.get_filelike_channels()
 
1666
        self._connected = True
 
1667
 
 
1668
    def _flush(self):
 
1669
        """See SmartClientStreamMedium._flush()."""
 
1670
        self._write_to.flush()
 
1671
 
 
1672
    def _read_bytes(self, count):
 
1673
        """See SmartClientStreamMedium.read_bytes."""
 
1674
        if not self._connected:
 
1675
            raise errors.MediumNotConnected(self)
 
1676
        return self._read_from.read(count)
 
1677
 
 
1678
 
 
1679
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1680
    """A client medium using TCP."""
 
1681
    
 
1682
    def __init__(self, host, port):
 
1683
        """Creates a client that will connect on the first use."""
 
1684
        SmartClientStreamMedium.__init__(self)
 
1685
        self._connected = False
 
1686
        self._host = host
 
1687
        self._port = port
 
1688
        self._socket = None
 
1689
 
 
1690
    def _accept_bytes(self, bytes):
 
1691
        """See SmartClientMedium.accept_bytes."""
 
1692
        self._ensure_connection()
 
1693
        self._socket.sendall(bytes)
 
1694
 
 
1695
    def disconnect(self):
 
1696
        """See SmartClientMedium.disconnect()."""
 
1697
        if not self._connected:
 
1698
            return
 
1699
        self._socket.close()
 
1700
        self._socket = None
 
1701
        self._connected = False
 
1702
 
 
1703
    def _ensure_connection(self):
 
1704
        """Connect this medium if not already connected."""
 
1705
        if self._connected:
 
1706
            return
 
1707
        self._socket = socket.socket()
 
1708
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1709
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1710
        if result:
 
1711
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1712
                    (self._host, self._port, os.strerror(result)))
 
1713
        self._connected = True
 
1714
 
 
1715
    def _flush(self):
 
1716
        """See SmartClientStreamMedium._flush().
 
1717
        
 
1718
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1719
        add a means to do a flush, but that can be done in the future.
 
1720
        """
 
1721
 
 
1722
    def _read_bytes(self, count):
 
1723
        """See SmartClientMedium.read_bytes."""
 
1724
        if not self._connected:
 
1725
            raise errors.MediumNotConnected(self)
 
1726
        return self._socket.recv(count)
 
1727
 
 
1728
 
 
1729
class SmartTCPTransport(SmartTransport):
 
1730
    """Connection to smart server over plain tcp.
 
1731
    
 
1732
    This is essentially just a factory to get 'RemoteTransport(url,
 
1733
        SmartTCPClientMedium).
 
1734
    """
 
1735
 
 
1736
    def __init__(self, url):
 
1737
        _scheme, _username, _password, _host, _port, _path = \
 
1738
            transport.split_url(url)
 
1739
        try:
 
1740
            _port = int(_port)
 
1741
        except (ValueError, TypeError), e:
 
1742
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1743
        medium = SmartTCPClientMedium(_host, _port)
 
1744
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1745
 
 
1746
 
 
1747
try:
 
1748
    from bzrlib.transport import ssh
 
1749
except errors.ParamikoNotPresent:
 
1750
    # no paramiko, no SSHTransport.
 
1751
    pass
 
1752
else:
 
1753
    class SmartSSHTransport(SmartTransport):
 
1754
        """Connection to smart server over SSH.
 
1755
 
 
1756
        This is essentially just a factory to get 'RemoteTransport(url,
 
1757
            SmartSSHClientMedium).
 
1758
        """
 
1759
 
 
1760
        def __init__(self, url):
 
1761
            _scheme, _username, _password, _host, _port, _path = \
 
1762
                transport.split_url(url)
 
1763
            try:
 
1764
                if _port is not None:
 
1765
                    _port = int(_port)
 
1766
            except (ValueError, TypeError), e:
 
1767
                raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1768
                    _port)
 
1769
            medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1770
            super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1771
 
 
1772
 
 
1773
def get_test_permutations():
 
1774
    """Return (transport, server) permutations for testing."""
 
1775
    ### We may need a little more test framework support to construct an
 
1776
    ### appropriate RemoteTransport in the future.
 
1777
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]