~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: John Arbash Meinel
  • Date: 2006-08-14 16:16:53 UTC
  • mto: (1946.2.6 reduce-knit-churn)
  • mto: This revision was merged to the branch mainline in revision 1919.
  • Revision ID: john@arbash-meinel.com-20060814161653-54cdcdadcd4e9003
Remove bogus entry from BRANCH.TODO

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Smart-server protocol, client and server.
18
 
 
19
 
Requests are sent as a command and list of arguments, followed by optional
20
 
bulk body data.  Responses are similarly a response and list of arguments,
21
 
followed by bulk body data. ::
22
 
 
23
 
  SEP := '\001'
24
 
    Fields are separated by Ctrl-A.
25
 
  BULK_DATA := CHUNK TRAILER
26
 
    Chunks can be repeated as many times as necessary.
27
 
  CHUNK := CHUNK_LEN CHUNK_BODY
28
 
  CHUNK_LEN := DIGIT+ NEWLINE
29
 
    Gives the number of bytes in the following chunk.
30
 
  CHUNK_BODY := BYTE[chunk_len]
31
 
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
 
  SUCCESS_TRAILER := 'done' NEWLINE
33
 
  ERROR_TRAILER := 
34
 
 
35
 
Paths are passed across the network.  The client needs to see a namespace that
36
 
includes any repository that might need to be referenced, and the client needs
37
 
to know about a root directory beyond which it cannot ascend.
38
 
 
39
 
Servers run over ssh will typically want to be able to access any path the user 
40
 
can access.  Public servers on the other hand (which might be over http, ssh
41
 
or tcp) will typically want to restrict access to only a particular directory 
42
 
and its children, so will want to do a software virtual root at that level.
43
 
In other words they'll want to rewrite incoming paths to be under that level
44
 
(and prevent escaping using ../ tricks.)
45
 
 
46
 
URLs that include ~ should probably be passed across to the server verbatim
47
 
and the server can expand them.  This will proably not be meaningful when 
48
 
limited to a directory?
49
 
 
50
 
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
51
 
that you have multiple requests and get a read error because the other side did
52
 
shutdown.  For pipes we have read pipe which will have a zero read which marks
53
 
end-of-file.  For HTTP server environment there is not end-of-stream because
54
 
each request coming into the server is independent.
55
 
 
56
 
So we need a wrapper around pipes and sockets to seperate out requests from
57
 
substrate and this will give us a single model which is consist for HTTP,
58
 
sockets and pipes.
59
 
 
60
 
Server-side
61
 
-----------
62
 
 
63
 
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
64
 
          uses protocol to detect end-of-request, sends written
65
 
          bytes to client) e.g. socket, pipe, HTTP request handler.
66
 
  ^
67
 
  | bytes.
68
 
  v
69
 
 
70
 
PROTOCOL  (serialization, deserialization)  accepts bytes for one
71
 
          request, decodes according to internal state, pushes
72
 
          structured data to handler.  accepts structured data from
73
 
          handler and encodes and writes to the medium.  factory for
74
 
          handler.
75
 
  ^
76
 
  | structured data
77
 
  v
78
 
 
79
 
HANDLER   (domain logic) accepts structured data, operates state
80
 
          machine until the request can be satisfied,
81
 
          sends structured data to the protocol.
82
 
 
83
 
 
84
 
Client-side
85
 
-----------
86
 
 
87
 
 CLIENT             domain logic, accepts domain requests, generated structured
88
 
                    data, reads structured data from responses and turns into
89
 
                    domain data.  Sends structured data to the protocol.
90
 
                    Operates state machines until the request can be delivered
91
 
                    (e.g. reading from a bundle generated in bzrlib to deliver a
92
 
                    complete request).
93
 
 
94
 
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
95
 
                    ...
96
 
  ^
97
 
  | structured data
98
 
  v
99
 
 
100
 
PROTOCOL  (serialization, deserialization)  accepts structured data for one
101
 
          request, encodes and writes to the medium.  Reads bytes from the
102
 
          medium, decodes and allows the client to read structured data.
103
 
  ^
104
 
  | bytes.
105
 
  v
106
 
 
107
 
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
108
 
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
109
 
"""
110
 
 
111
 
 
112
 
# TODO: _translate_error should be on the client, not the transport because
113
 
#     error coding is wire protocol specific.
114
 
 
115
 
# TODO: A plain integer from query_version is too simple; should give some
116
 
# capabilities too?
117
 
 
118
 
# TODO: Server should probably catch exceptions within itself and send them
119
 
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
120
 
# Also needs to somehow report protocol errors like bad requests.  Need to
121
 
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
 
# bulk transfer and then something goes wrong.
123
 
 
124
 
# TODO: Standard marker at start of request/response lines?
125
 
 
126
 
# TODO: Make each request and response self-validatable, e.g. with checksums.
127
 
#
128
 
# TODO: get/put objects could be changed to gradually read back the data as it
129
 
# comes across the network
130
 
#
131
 
# TODO: What should the server do if it hits an error and has to terminate?
132
 
#
133
 
# TODO: is it useful to allow multiple chunks in the bulk data?
134
 
#
135
 
# TODO: If we get an exception during transmission of bulk data we can't just
136
 
# emit the exception because it won't be seen.
137
 
#   John proposes:  I think it would be worthwhile to have a header on each
138
 
#   chunk, that indicates it is another chunk. Then you can send an 'error'
139
 
#   chunk as long as you finish the previous chunk.
140
 
#
141
 
# TODO: Clone method on Transport; should work up towards parent directory;
142
 
# unclear how this should be stored or communicated to the server... maybe
143
 
# just pass it on all relevant requests?
144
 
#
145
 
# TODO: Better name than clone() for changing between directories.  How about
146
 
# open_dir or change_dir or chdir?
147
 
#
148
 
# TODO: Is it really good to have the notion of current directory within the
149
 
# connection?  Perhaps all Transports should factor out a common connection
150
 
# from the thing that has the directory context?
151
 
#
152
 
# TODO: Pull more things common to sftp and ssh to a higher level.
153
 
#
154
 
# TODO: The server that manages a connection should be quite small and retain
155
 
# minimum state because each of the requests are supposed to be stateless.
156
 
# Then we can write another implementation that maps to http.
157
 
#
158
 
# TODO: What to do when a client connection is garbage collected?  Maybe just
159
 
# abruptly drop the connection?
160
 
#
161
 
# TODO: Server in some cases will need to restrict access to files outside of
162
 
# a particular root directory.  LocalTransport doesn't do anything to stop you
163
 
# ascending above the base directory, so we need to prevent paths
164
 
# containing '..' in either the server or transport layers.  (Also need to
165
 
# consider what happens if someone creates a symlink pointing outside the 
166
 
# directory tree...)
167
 
#
168
 
# TODO: Server should rebase absolute paths coming across the network to put
169
 
# them under the virtual root, if one is in use.  LocalTransport currently
170
 
# doesn't do that; if you give it an absolute path it just uses it.
171
 
172
 
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
 
# urlescape them instead.  Indeed possibly this should just literally be
174
 
# http-over-ssh.
175
 
#
176
 
# FIXME: This transport, with several others, has imperfect handling of paths
177
 
# within urls.  It'd probably be better for ".." from a root to raise an error
178
 
# rather than return the same directory as we do at present.
179
 
#
180
 
# TODO: Rather than working at the Transport layer we want a Branch,
181
 
# Repository or BzrDir objects that talk to a server.
182
 
#
183
 
# TODO: Probably want some way for server commands to gradually produce body
184
 
# data rather than passing it as a string; they could perhaps pass an
185
 
# iterator-like callback that will gradually yield data; it probably needs a
186
 
# close() method that will always be closed to do any necessary cleanup.
187
 
#
188
 
# TODO: Split the actual smart server from the ssh encoding of it.
189
 
#
190
 
# TODO: Perhaps support file-level readwrite operations over the transport
191
 
# too.
192
 
#
193
 
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
 
# branch doing file-level operations.
195
 
#
196
 
 
197
 
from cStringIO import StringIO
198
 
import os
199
 
import socket
200
 
import tempfile
201
 
import threading
202
 
import urllib
203
 
import urlparse
204
 
 
205
 
from bzrlib import (
206
 
    bzrdir,
207
 
    errors,
208
 
    revision,
209
 
    transport,
210
 
    trace,
211
 
    urlutils,
212
 
    )
213
 
from bzrlib.bundle.serializer import write_bundle
214
 
try:
215
 
    from bzrlib.transport import ssh
216
 
except errors.ParamikoNotPresent:
217
 
    # no paramiko.  SmartSSHClientMedium will break.
218
 
    pass
219
 
 
220
 
# must do this otherwise urllib can't parse the urls properly :(
221
 
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
222
 
    transport.register_urlparse_netloc_protocol(scheme)
223
 
del scheme
224
 
 
225
 
 
226
 
def _recv_tuple(from_file):
227
 
    req_line = from_file.readline()
228
 
    return _decode_tuple(req_line)
229
 
 
230
 
 
231
 
def _decode_tuple(req_line):
232
 
    if req_line == None or req_line == '':
233
 
        return None
234
 
    if req_line[-1] != '\n':
235
 
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
236
 
    try:
237
 
        return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
238
 
    except UnicodeDecodeError:
239
 
        raise errors.SmartProtocolError(
240
 
            "one or more arguments of request %r are not valid UTF-8"
241
 
            % req_line)
242
 
 
243
 
 
244
 
def _encode_tuple(args):
245
 
    """Encode the tuple args to a bytestream."""
246
 
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
247
 
 
248
 
 
249
 
class SmartProtocolBase(object):
250
 
    """Methods common to client and server"""
251
 
 
252
 
    # TODO: this only actually accomodates a single block; possibly should
253
 
    # support multiple chunks?
254
 
    def _encode_bulk_data(self, body):
255
 
        """Encode body as a bulk data chunk."""
256
 
        return ''.join(('%d\n' % len(body), body, 'done\n'))
257
 
 
258
 
    def _serialise_offsets(self, offsets):
259
 
        """Serialise a readv offset list."""
260
 
        txt = []
261
 
        for start, length in offsets:
262
 
            txt.append('%d,%d' % (start, length))
263
 
        return '\n'.join(txt)
264
 
        
265
 
 
266
 
class SmartServerRequestProtocolOne(SmartProtocolBase):
267
 
    """Server-side encoding and decoding logic for smart version 1."""
268
 
    
269
 
    def __init__(self, backing_transport, write_func):
270
 
        self._backing_transport = backing_transport
271
 
        self.excess_buffer = ''
272
 
        self._finished_reading = False
273
 
        self.in_buffer = ''
274
 
        self.has_dispatched = False
275
 
        self.request = None
276
 
        self._body_decoder = None
277
 
        self._write_func = write_func
278
 
 
279
 
    def accept_bytes(self, bytes):
280
 
        """Take bytes, and advance the internal state machine appropriately.
281
 
        
282
 
        :param bytes: must be a byte string
283
 
        """
284
 
        assert isinstance(bytes, str)
285
 
        self.in_buffer += bytes
286
 
        if not self.has_dispatched:
287
 
            if '\n' not in self.in_buffer:
288
 
                # no command line yet
289
 
                return
290
 
            self.has_dispatched = True
291
 
            try:
292
 
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
293
 
                first_line += '\n'
294
 
                req_args = _decode_tuple(first_line)
295
 
                self.request = SmartServerRequestHandler(
296
 
                    self._backing_transport)
297
 
                self.request.dispatch_command(req_args[0], req_args[1:])
298
 
                if self.request.finished_reading:
299
 
                    # trivial request
300
 
                    self.excess_buffer = self.in_buffer
301
 
                    self.in_buffer = ''
302
 
                    self._send_response(self.request.response.args,
303
 
                        self.request.response.body)
304
 
                self.sync_with_request(self.request)
305
 
            except KeyboardInterrupt:
306
 
                raise
307
 
            except Exception, exception:
308
 
                # everything else: pass to client, flush, and quit
309
 
                self._send_response(('error', str(exception)))
310
 
                return None
311
 
 
312
 
        if self.has_dispatched:
313
 
            if self._finished_reading:
314
 
                # nothing to do.XXX: this routine should be a single state 
315
 
                # machine too.
316
 
                self.excess_buffer += self.in_buffer
317
 
                self.in_buffer = ''
318
 
                return
319
 
            if self._body_decoder is None:
320
 
                self._body_decoder = LengthPrefixedBodyDecoder()
321
 
            self._body_decoder.accept_bytes(self.in_buffer)
322
 
            self.in_buffer = self._body_decoder.unused_data
323
 
            body_data = self._body_decoder.read_pending_data()
324
 
            self.request.accept_body(body_data)
325
 
            if self._body_decoder.finished_reading:
326
 
                self.request.end_of_body()
327
 
                assert self.request.finished_reading, \
328
 
                    "no more body, request not finished"
329
 
            self.sync_with_request(self.request)
330
 
            if self.request.response is not None:
331
 
                self._send_response(self.request.response.args,
332
 
                    self.request.response.body)
333
 
                self.excess_buffer = self.in_buffer
334
 
                self.in_buffer = ''
335
 
            else:
336
 
                assert not self.request.finished_reading, \
337
 
                    "no response and we have finished reading."
338
 
 
339
 
    def _send_response(self, args, body=None):
340
 
        """Send a smart server response down the output stream."""
341
 
        self._write_func(_encode_tuple(args))
342
 
        if body is not None:
343
 
            assert isinstance(body, str), 'body must be a str'
344
 
            bytes = self._encode_bulk_data(body)
345
 
            self._write_func(bytes)
346
 
 
347
 
    def sync_with_request(self, request):
348
 
        self._finished_reading = request.finished_reading
349
 
        
350
 
    def next_read_size(self):
351
 
        if self._finished_reading:
352
 
            return 0
353
 
        if self._body_decoder is None:
354
 
            return 1
355
 
        else:
356
 
            return self._body_decoder.next_read_size()
357
 
 
358
 
 
359
 
class LengthPrefixedBodyDecoder(object):
360
 
    """Decodes the length-prefixed bulk data."""
361
 
    
362
 
    def __init__(self):
363
 
        self.bytes_left = None
364
 
        self.finished_reading = False
365
 
        self.unused_data = ''
366
 
        self.state_accept = self._state_accept_expecting_length
367
 
        self.state_read = self._state_read_no_data
368
 
        self._in_buffer = ''
369
 
        self._trailer_buffer = ''
370
 
    
371
 
    def accept_bytes(self, bytes):
372
 
        """Decode as much of bytes as possible.
373
 
 
374
 
        If 'bytes' contains too much data it will be appended to
375
 
        self.unused_data.
376
 
 
377
 
        finished_reading will be set when no more data is required.  Further
378
 
        data will be appended to self.unused_data.
379
 
        """
380
 
        # accept_bytes is allowed to change the state
381
 
        current_state = self.state_accept
382
 
        self.state_accept(bytes)
383
 
        while current_state != self.state_accept:
384
 
            current_state = self.state_accept
385
 
            self.state_accept('')
386
 
 
387
 
    def next_read_size(self):
388
 
        if self.bytes_left is not None:
389
 
            # Ideally we want to read all the remainder of the body and the
390
 
            # trailer in one go.
391
 
            return self.bytes_left + 5
392
 
        elif self.state_accept == self._state_accept_reading_trailer:
393
 
            # Just the trailer left
394
 
            return 5 - len(self._trailer_buffer)
395
 
        elif self.state_accept == self._state_accept_expecting_length:
396
 
            # There's still at least 6 bytes left ('\n' to end the length, plus
397
 
            # 'done\n').
398
 
            return 6
399
 
        else:
400
 
            # Reading excess data.  Either way, 1 byte at a time is fine.
401
 
            return 1
402
 
        
403
 
    def read_pending_data(self):
404
 
        """Return any pending data that has been decoded."""
405
 
        return self.state_read()
406
 
 
407
 
    def _state_accept_expecting_length(self, bytes):
408
 
        self._in_buffer += bytes
409
 
        pos = self._in_buffer.find('\n')
410
 
        if pos == -1:
411
 
            return
412
 
        self.bytes_left = int(self._in_buffer[:pos])
413
 
        self._in_buffer = self._in_buffer[pos+1:]
414
 
        self.bytes_left -= len(self._in_buffer)
415
 
        self.state_accept = self._state_accept_reading_body
416
 
        self.state_read = self._state_read_in_buffer
417
 
 
418
 
    def _state_accept_reading_body(self, bytes):
419
 
        self._in_buffer += bytes
420
 
        self.bytes_left -= len(bytes)
421
 
        if self.bytes_left <= 0:
422
 
            # Finished with body
423
 
            if self.bytes_left != 0:
424
 
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
425
 
                self._in_buffer = self._in_buffer[:self.bytes_left]
426
 
            self.bytes_left = None
427
 
            self.state_accept = self._state_accept_reading_trailer
428
 
        
429
 
    def _state_accept_reading_trailer(self, bytes):
430
 
        self._trailer_buffer += bytes
431
 
        # TODO: what if the trailer does not match "done\n"?  Should this raise
432
 
        # a ProtocolViolation exception?
433
 
        if self._trailer_buffer.startswith('done\n'):
434
 
            self.unused_data = self._trailer_buffer[len('done\n'):]
435
 
            self.state_accept = self._state_accept_reading_unused
436
 
            self.finished_reading = True
437
 
    
438
 
    def _state_accept_reading_unused(self, bytes):
439
 
        self.unused_data += bytes
440
 
 
441
 
    def _state_read_no_data(self):
442
 
        return ''
443
 
 
444
 
    def _state_read_in_buffer(self):
445
 
        result = self._in_buffer
446
 
        self._in_buffer = ''
447
 
        return result
448
 
 
449
 
 
450
 
class SmartServerStreamMedium(object):
451
 
    """Handles smart commands coming over a stream.
452
 
 
453
 
    The stream may be a pipe connected to sshd, or a tcp socket, or an
454
 
    in-process fifo for testing.
455
 
 
456
 
    One instance is created for each connected client; it can serve multiple
457
 
    requests in the lifetime of the connection.
458
 
 
459
 
    The server passes requests through to an underlying backing transport, 
460
 
    which will typically be a LocalTransport looking at the server's filesystem.
461
 
    """
462
 
 
463
 
    def __init__(self, backing_transport):
464
 
        """Construct new server.
465
 
 
466
 
        :param backing_transport: Transport for the directory served.
467
 
        """
468
 
        # backing_transport could be passed to serve instead of __init__
469
 
        self.backing_transport = backing_transport
470
 
        self.finished = False
471
 
 
472
 
    def serve(self):
473
 
        """Serve requests until the client disconnects."""
474
 
        # Keep a reference to stderr because the sys module's globals get set to
475
 
        # None during interpreter shutdown.
476
 
        from sys import stderr
477
 
        try:
478
 
            while not self.finished:
479
 
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
480
 
                                                         self._write_out)
481
 
                self._serve_one_request(protocol)
482
 
        except Exception, e:
483
 
            stderr.write("%s terminating on exception %s\n" % (self, e))
484
 
            raise
485
 
 
486
 
    def _serve_one_request(self, protocol):
487
 
        """Read one request from input, process, send back a response.
488
 
        
489
 
        :param protocol: a SmartServerRequestProtocol.
490
 
        """
491
 
        try:
492
 
            self._serve_one_request_unguarded(protocol)
493
 
        except KeyboardInterrupt:
494
 
            raise
495
 
        except Exception, e:
496
 
            self.terminate_due_to_error()
497
 
 
498
 
    def terminate_due_to_error(self):
499
 
        """Called when an unhandled exception from the protocol occurs."""
500
 
        raise NotImplementedError(self.terminate_due_to_error)
501
 
 
502
 
 
503
 
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
504
 
 
505
 
    def __init__(self, sock, backing_transport):
506
 
        """Constructor.
507
 
 
508
 
        :param sock: the socket the server will read from.  It will be put
509
 
            into blocking mode.
510
 
        """
511
 
        SmartServerStreamMedium.__init__(self, backing_transport)
512
 
        self.push_back = ''
513
 
        sock.setblocking(True)
514
 
        self.socket = sock
515
 
 
516
 
    def _serve_one_request_unguarded(self, protocol):
517
 
        while protocol.next_read_size():
518
 
            if self.push_back:
519
 
                protocol.accept_bytes(self.push_back)
520
 
                self.push_back = ''
521
 
            else:
522
 
                bytes = self.socket.recv(4096)
523
 
                if bytes == '':
524
 
                    self.finished = True
525
 
                    return
526
 
                protocol.accept_bytes(bytes)
527
 
        
528
 
        self.push_back = protocol.excess_buffer
529
 
    
530
 
    def terminate_due_to_error(self):
531
 
        """Called when an unhandled exception from the protocol occurs."""
532
 
        # TODO: This should log to a server log file, but no such thing
533
 
        # exists yet.  Andrew Bennetts 2006-09-29.
534
 
        self.socket.close()
535
 
        self.finished = True
536
 
 
537
 
    def _write_out(self, bytes):
538
 
        self.socket.sendall(bytes)
539
 
 
540
 
 
541
 
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
542
 
 
543
 
    def __init__(self, in_file, out_file, backing_transport):
544
 
        """Construct new server.
545
 
 
546
 
        :param in_file: Python file from which requests can be read.
547
 
        :param out_file: Python file to write responses.
548
 
        :param backing_transport: Transport for the directory served.
549
 
        """
550
 
        SmartServerStreamMedium.__init__(self, backing_transport)
551
 
        self._in = in_file
552
 
        self._out = out_file
553
 
 
554
 
    def _serve_one_request_unguarded(self, protocol):
555
 
        while True:
556
 
            bytes_to_read = protocol.next_read_size()
557
 
            if bytes_to_read == 0:
558
 
                # Finished serving this request.
559
 
                self._out.flush()
560
 
                return
561
 
            bytes = self._in.read(bytes_to_read)
562
 
            if bytes == '':
563
 
                # Connection has been closed.
564
 
                self.finished = True
565
 
                self._out.flush()
566
 
                return
567
 
            protocol.accept_bytes(bytes)
568
 
 
569
 
    def terminate_due_to_error(self):
570
 
        # TODO: This should log to a server log file, but no such thing
571
 
        # exists yet.  Andrew Bennetts 2006-09-29.
572
 
        self._out.close()
573
 
        self.finished = True
574
 
 
575
 
    def _write_out(self, bytes):
576
 
        self._out.write(bytes)
577
 
 
578
 
 
579
 
class SmartServerResponse(object):
580
 
    """Response generated by SmartServerRequestHandler."""
581
 
 
582
 
    def __init__(self, args, body=None):
583
 
        self.args = args
584
 
        self.body = body
585
 
 
586
 
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
587
 
# for delivering the data for a request. This could be done with as the
588
 
# StreamServer, though that would create conflation between request and response
589
 
# which may be undesirable.
590
 
 
591
 
 
592
 
class SmartServerRequestHandler(object):
593
 
    """Protocol logic for smart server.
594
 
    
595
 
    This doesn't handle serialization at all, it just processes requests and
596
 
    creates responses.
597
 
    """
598
 
 
599
 
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
600
 
    # not contain encoding or decoding logic to allow the wire protocol to vary
601
 
    # from the object protocol: we will want to tweak the wire protocol separate
602
 
    # from the object model, and ideally we will be able to do that without
603
 
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
604
 
    # just a Protocol subclass.
605
 
 
606
 
    # TODO: Better way of representing the body for commands that take it,
607
 
    # and allow it to be streamed into the server.
608
 
    
609
 
    def __init__(self, backing_transport):
610
 
        self._backing_transport = backing_transport
611
 
        self._converted_command = False
612
 
        self.finished_reading = False
613
 
        self._body_bytes = ''
614
 
        self.response = None
615
 
 
616
 
    def accept_body(self, bytes):
617
 
        """Accept body data.
618
 
 
619
 
        This should be overriden for each command that desired body data to
620
 
        handle the right format of that data. I.e. plain bytes, a bundle etc.
621
 
 
622
 
        The deserialisation into that format should be done in the Protocol
623
 
        object. Set self.desired_body_format to the format your method will
624
 
        handle.
625
 
        """
626
 
        # default fallback is to accumulate bytes.
627
 
        self._body_bytes += bytes
628
 
        
629
 
    def _end_of_body_handler(self):
630
 
        """An unimplemented end of body handler."""
631
 
        raise NotImplementedError(self._end_of_body_handler)
632
 
        
633
 
    def do_hello(self):
634
 
        """Answer a version request with my version."""
635
 
        return SmartServerResponse(('ok', '1'))
636
 
 
637
 
    def do_has(self, relpath):
638
 
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
639
 
        return SmartServerResponse((r,))
640
 
 
641
 
    def do_get(self, relpath):
642
 
        backing_bytes = self._backing_transport.get_bytes(relpath)
643
 
        return SmartServerResponse(('ok',), backing_bytes)
644
 
 
645
 
    def _deserialise_optional_mode(self, mode):
646
 
        # XXX: FIXME this should be on the protocol object.
647
 
        if mode == '':
648
 
            return None
649
 
        else:
650
 
            return int(mode)
651
 
 
652
 
    def do_append(self, relpath, mode):
653
 
        self._converted_command = True
654
 
        self._relpath = relpath
655
 
        self._mode = self._deserialise_optional_mode(mode)
656
 
        self._end_of_body_handler = self._handle_do_append_end
657
 
    
658
 
    def _handle_do_append_end(self):
659
 
        old_length = self._backing_transport.append_bytes(
660
 
            self._relpath, self._body_bytes, self._mode)
661
 
        self.response = SmartServerResponse(('appended', '%d' % old_length))
662
 
 
663
 
    def do_delete(self, relpath):
664
 
        self._backing_transport.delete(relpath)
665
 
 
666
 
    def do_iter_files_recursive(self, abspath):
667
 
        # XXX: the path handling needs some thought.
668
 
        #relpath = self._backing_transport.relpath(abspath)
669
 
        transport = self._backing_transport.clone(abspath)
670
 
        filenames = transport.iter_files_recursive()
671
 
        return SmartServerResponse(('names',) + tuple(filenames))
672
 
 
673
 
    def do_list_dir(self, relpath):
674
 
        filenames = self._backing_transport.list_dir(relpath)
675
 
        return SmartServerResponse(('names',) + tuple(filenames))
676
 
 
677
 
    def do_mkdir(self, relpath, mode):
678
 
        self._backing_transport.mkdir(relpath,
679
 
                                      self._deserialise_optional_mode(mode))
680
 
 
681
 
    def do_move(self, rel_from, rel_to):
682
 
        self._backing_transport.move(rel_from, rel_to)
683
 
 
684
 
    def do_put(self, relpath, mode):
685
 
        self._converted_command = True
686
 
        self._relpath = relpath
687
 
        self._mode = self._deserialise_optional_mode(mode)
688
 
        self._end_of_body_handler = self._handle_do_put
689
 
 
690
 
    def _handle_do_put(self):
691
 
        self._backing_transport.put_bytes(self._relpath,
692
 
                self._body_bytes, self._mode)
693
 
        self.response = SmartServerResponse(('ok',))
694
 
 
695
 
    def _deserialise_offsets(self, text):
696
 
        # XXX: FIXME this should be on the protocol object.
697
 
        offsets = []
698
 
        for line in text.split('\n'):
699
 
            if not line:
700
 
                continue
701
 
            start, length = line.split(',')
702
 
            offsets.append((int(start), int(length)))
703
 
        return offsets
704
 
 
705
 
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
706
 
        self._converted_command = True
707
 
        self._end_of_body_handler = self._handle_put_non_atomic
708
 
        self._relpath = relpath
709
 
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
710
 
        self._mode = self._deserialise_optional_mode(mode)
711
 
        # a boolean would be nicer XXX
712
 
        self._create_parent = (create_parent == 'T')
713
 
 
714
 
    def _handle_put_non_atomic(self):
715
 
        self._backing_transport.put_bytes_non_atomic(self._relpath,
716
 
                self._body_bytes,
717
 
                mode=self._mode,
718
 
                create_parent_dir=self._create_parent,
719
 
                dir_mode=self._dir_mode)
720
 
        self.response = SmartServerResponse(('ok',))
721
 
 
722
 
    def do_readv(self, relpath):
723
 
        self._converted_command = True
724
 
        self._end_of_body_handler = self._handle_readv_offsets
725
 
        self._relpath = relpath
726
 
 
727
 
    def end_of_body(self):
728
 
        """No more body data will be received."""
729
 
        self._run_handler_code(self._end_of_body_handler, (), {})
730
 
        # cannot read after this.
731
 
        self.finished_reading = True
732
 
 
733
 
    def _handle_readv_offsets(self):
734
 
        """accept offsets for a readv request."""
735
 
        offsets = self._deserialise_offsets(self._body_bytes)
736
 
        backing_bytes = ''.join(bytes for offset, bytes in
737
 
            self._backing_transport.readv(self._relpath, offsets))
738
 
        self.response = SmartServerResponse(('readv',), backing_bytes)
739
 
        
740
 
    def do_rename(self, rel_from, rel_to):
741
 
        self._backing_transport.rename(rel_from, rel_to)
742
 
 
743
 
    def do_rmdir(self, relpath):
744
 
        self._backing_transport.rmdir(relpath)
745
 
 
746
 
    def do_stat(self, relpath):
747
 
        stat = self._backing_transport.stat(relpath)
748
 
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
749
 
        
750
 
    def do_get_bundle(self, path, revision_id):
751
 
        # open transport relative to our base
752
 
        t = self._backing_transport.clone(path)
753
 
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
754
 
        repo = control.open_repository()
755
 
        tmpf = tempfile.TemporaryFile()
756
 
        base_revision = revision.NULL_REVISION
757
 
        write_bundle(repo, revision_id, base_revision, tmpf)
758
 
        tmpf.seek(0)
759
 
        return SmartServerResponse((), tmpf.read())
760
 
 
761
 
    def dispatch_command(self, cmd, args):
762
 
        """Deprecated compatibility method.""" # XXX XXX
763
 
        func = getattr(self, 'do_' + cmd, None)
764
 
        if func is None:
765
 
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
766
 
        self._run_handler_code(func, args, {})
767
 
 
768
 
    def _run_handler_code(self, callable, args, kwargs):
769
 
        """Run some handler specific code 'callable'.
770
 
 
771
 
        If a result is returned, it is considered to be the commands response,
772
 
        and finished_reading is set true, and its assigned to self.response.
773
 
 
774
 
        Any exceptions caught are translated and a response object created
775
 
        from them.
776
 
        """
777
 
        result = self._call_converting_errors(callable, args, kwargs)
778
 
        if result is not None:
779
 
            self.response = result
780
 
            self.finished_reading = True
781
 
        # handle unconverted commands
782
 
        if not self._converted_command:
783
 
            self.finished_reading = True
784
 
            if result is None:
785
 
                self.response = SmartServerResponse(('ok',))
786
 
 
787
 
    def _call_converting_errors(self, callable, args, kwargs):
788
 
        """Call callable converting errors to Response objects."""
789
 
        try:
790
 
            return callable(*args, **kwargs)
791
 
        except errors.NoSuchFile, e:
792
 
            return SmartServerResponse(('NoSuchFile', e.path))
793
 
        except errors.FileExists, e:
794
 
            return SmartServerResponse(('FileExists', e.path))
795
 
        except errors.DirectoryNotEmpty, e:
796
 
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
797
 
        except errors.ShortReadvError, e:
798
 
            return SmartServerResponse(('ShortReadvError',
799
 
                e.path, str(e.offset), str(e.length), str(e.actual)))
800
 
        except UnicodeError, e:
801
 
            # If it is a DecodeError, than most likely we are starting
802
 
            # with a plain string
803
 
            str_or_unicode = e.object
804
 
            if isinstance(str_or_unicode, unicode):
805
 
                val = u'u:' + str_or_unicode
806
 
            else:
807
 
                val = u's:' + str_or_unicode.encode('base64')
808
 
            # This handles UnicodeEncodeError or UnicodeDecodeError
809
 
            return SmartServerResponse((e.__class__.__name__,
810
 
                    e.encoding, val, str(e.start), str(e.end), e.reason))
811
 
        except errors.TransportNotPossible, e:
812
 
            if e.msg == "readonly transport":
813
 
                return SmartServerResponse(('ReadOnlyError', ))
814
 
            else:
815
 
                raise
816
 
 
817
 
 
818
 
class SmartTCPServer(object):
819
 
    """Listens on a TCP socket and accepts connections from smart clients"""
820
 
 
821
 
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
822
 
        """Construct a new server.
823
 
 
824
 
        To actually start it running, call either start_background_thread or
825
 
        serve.
826
 
 
827
 
        :param host: Name of the interface to listen on.
828
 
        :param port: TCP port to listen on, or 0 to allocate a transient port.
829
 
        """
830
 
        self._server_socket = socket.socket()
831
 
        self._server_socket.bind((host, port))
832
 
        self.port = self._server_socket.getsockname()[1]
833
 
        self._server_socket.listen(1)
834
 
        self._server_socket.settimeout(1)
835
 
        self.backing_transport = backing_transport
836
 
 
837
 
    def serve(self):
838
 
        # let connections timeout so that we get a chance to terminate
839
 
        # Keep a reference to the exceptions we want to catch because the socket
840
 
        # module's globals get set to None during interpreter shutdown.
841
 
        from socket import timeout as socket_timeout
842
 
        from socket import error as socket_error
843
 
        self._should_terminate = False
844
 
        while not self._should_terminate:
845
 
            try:
846
 
                self.accept_and_serve()
847
 
            except socket_timeout:
848
 
                # just check if we're asked to stop
849
 
                pass
850
 
            except socket_error, e:
851
 
                trace.warning("client disconnected: %s", e)
852
 
                pass
853
 
 
854
 
    def get_url(self):
855
 
        """Return the url of the server"""
856
 
        return "bzr://%s:%d/" % self._server_socket.getsockname()
857
 
 
858
 
    def accept_and_serve(self):
859
 
        conn, client_addr = self._server_socket.accept()
860
 
        # For WIN32, where the timeout value from the listening socket
861
 
        # propogates to the newly accepted socket.
862
 
        conn.setblocking(True)
863
 
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
864
 
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
865
 
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
866
 
        connection_thread.setDaemon(True)
867
 
        connection_thread.start()
868
 
 
869
 
    def start_background_thread(self):
870
 
        self._server_thread = threading.Thread(None,
871
 
                self.serve,
872
 
                name='server-' + self.get_url())
873
 
        self._server_thread.setDaemon(True)
874
 
        self._server_thread.start()
875
 
 
876
 
    def stop_background_thread(self):
877
 
        self._should_terminate = True
878
 
        # self._server_socket.close()
879
 
        # we used to join the thread, but it's not really necessary; it will
880
 
        # terminate in time
881
 
        ## self._server_thread.join()
882
 
 
883
 
 
884
 
class SmartTCPServer_for_testing(SmartTCPServer):
885
 
    """Server suitable for use by transport tests.
886
 
    
887
 
    This server is backed by the process's cwd.
888
 
    """
889
 
 
890
 
    def __init__(self):
891
 
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
892
 
        # The server is set up by default like for ssh access: the client
893
 
        # passes filesystem-absolute paths; therefore the server must look
894
 
        # them up relative to the root directory.  it might be better to act
895
 
        # a public server and have the server rewrite paths into the test
896
 
        # directory.
897
 
        SmartTCPServer.__init__(self,
898
 
            transport.get_transport(urlutils.local_path_to_url('/')))
899
 
        
900
 
    def setUp(self):
901
 
        """Set up server for testing"""
902
 
        self.start_background_thread()
903
 
 
904
 
    def tearDown(self):
905
 
        self.stop_background_thread()
906
 
 
907
 
    def get_url(self):
908
 
        """Return the url of the server"""
909
 
        host, port = self._server_socket.getsockname()
910
 
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
911
 
 
912
 
    def get_bogus_url(self):
913
 
        """Return a URL which will fail to connect"""
914
 
        return 'bzr://127.0.0.1:1/'
915
 
 
916
 
 
917
 
class SmartStat(object):
918
 
 
919
 
    def __init__(self, size, mode):
920
 
        self.st_size = size
921
 
        self.st_mode = mode
922
 
 
923
 
 
924
 
class SmartTransport(transport.Transport):
925
 
    """Connection to a smart server.
926
 
 
927
 
    The connection holds references to pipes that can be used to send requests
928
 
    to the server.
929
 
 
930
 
    The connection has a notion of the current directory to which it's
931
 
    connected; this is incorporated in filenames passed to the server.
932
 
    
933
 
    This supports some higher-level RPC operations and can also be treated 
934
 
    like a Transport to do file-like operations.
935
 
 
936
 
    The connection can be made over a tcp socket, or (in future) an ssh pipe
937
 
    or a series of http requests.  There are concrete subclasses for each
938
 
    type: SmartTCPTransport, etc.
939
 
    """
940
 
 
941
 
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
942
 
    # responsibilities: Put those on SmartClient or similar. This is vital for
943
 
    # the ability to support multiple versions of the smart protocol over time:
944
 
    # SmartTransport is an adapter from the Transport object model to the 
945
 
    # SmartClient model, not an encoder.
946
 
 
947
 
    def __init__(self, url, clone_from=None, medium=None):
948
 
        """Constructor.
949
 
 
950
 
        :param medium: The medium to use for this RemoteTransport. This must be
951
 
            supplied if clone_from is None.
952
 
        """
953
 
        ### Technically super() here is faulty because Transport's __init__
954
 
        ### fails to take 2 parameters, and if super were to choose a silly
955
 
        ### initialisation order things would blow up. 
956
 
        if not url.endswith('/'):
957
 
            url += '/'
958
 
        super(SmartTransport, self).__init__(url)
959
 
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
960
 
                transport.split_url(url)
961
 
        if clone_from is None:
962
 
            self._medium = medium
963
 
        else:
964
 
            # credentials may be stripped from the base in some circumstances
965
 
            # as yet to be clearly defined or documented, so copy them.
966
 
            self._username = clone_from._username
967
 
            # reuse same connection
968
 
            self._medium = clone_from._medium
969
 
        assert self._medium is not None
970
 
 
971
 
    def abspath(self, relpath):
972
 
        """Return the full url to the given relative path.
973
 
        
974
 
        @param relpath: the relative path or path components
975
 
        @type relpath: str or list
976
 
        """
977
 
        return self._unparse_url(self._remote_path(relpath))
978
 
    
979
 
    def clone(self, relative_url):
980
 
        """Make a new SmartTransport related to me, sharing the same connection.
981
 
 
982
 
        This essentially opens a handle on a different remote directory.
983
 
        """
984
 
        if relative_url is None:
985
 
            return SmartTransport(self.base, self)
986
 
        else:
987
 
            return SmartTransport(self.abspath(relative_url), self)
988
 
 
989
 
    def is_readonly(self):
990
 
        """Smart server transport can do read/write file operations."""
991
 
        return False
992
 
                                                   
993
 
    def get_smart_client(self):
994
 
        return self._medium
995
 
 
996
 
    def get_smart_medium(self):
997
 
        return self._medium
998
 
                                                   
999
 
    def _unparse_url(self, path):
1000
 
        """Return URL for a path.
1001
 
 
1002
 
        :see: SFTPUrlHandling._unparse_url
1003
 
        """
1004
 
        # TODO: Eventually it should be possible to unify this with
1005
 
        # SFTPUrlHandling._unparse_url?
1006
 
        if path == '':
1007
 
            path = '/'
1008
 
        path = urllib.quote(path)
1009
 
        netloc = urllib.quote(self._host)
1010
 
        if self._username is not None:
1011
 
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1012
 
        if self._port is not None:
1013
 
            netloc = '%s:%d' % (netloc, self._port)
1014
 
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
1015
 
 
1016
 
    def _remote_path(self, relpath):
1017
 
        """Returns the Unicode version of the absolute path for relpath."""
1018
 
        return self._combine_paths(self._path, relpath)
1019
 
 
1020
 
    def _call(self, method, *args):
1021
 
        resp = self._call2(method, *args)
1022
 
        self._translate_error(resp)
1023
 
 
1024
 
    def _call2(self, method, *args):
1025
 
        """Call a method on the remote server."""
1026
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1027
 
        protocol.call(method, *args)
1028
 
        return protocol.read_response_tuple()
1029
 
 
1030
 
    def _call_with_body_bytes(self, method, args, body):
1031
 
        """Call a method on the remote server with body bytes."""
1032
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1033
 
        protocol.call_with_body_bytes((method, ) + args, body)
1034
 
        return protocol.read_response_tuple()
1035
 
 
1036
 
    def has(self, relpath):
1037
 
        """Indicate whether a remote file of the given name exists or not.
1038
 
 
1039
 
        :see: Transport.has()
1040
 
        """
1041
 
        resp = self._call2('has', self._remote_path(relpath))
1042
 
        if resp == ('yes', ):
1043
 
            return True
1044
 
        elif resp == ('no', ):
1045
 
            return False
1046
 
        else:
1047
 
            self._translate_error(resp)
1048
 
 
1049
 
    def get(self, relpath):
1050
 
        """Return file-like object reading the contents of a remote file.
1051
 
        
1052
 
        :see: Transport.get_bytes()/get_file()
1053
 
        """
1054
 
        return StringIO(self.get_bytes(relpath))
1055
 
 
1056
 
    def get_bytes(self, relpath):
1057
 
        remote = self._remote_path(relpath)
1058
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1059
 
        protocol.call('get', remote)
1060
 
        resp = protocol.read_response_tuple(True)
1061
 
        if resp != ('ok', ):
1062
 
            protocol.cancel_read_body()
1063
 
            self._translate_error(resp, relpath)
1064
 
        return protocol.read_body_bytes()
1065
 
 
1066
 
    def _serialise_optional_mode(self, mode):
1067
 
        if mode is None:
1068
 
            return ''
1069
 
        else:
1070
 
            return '%d' % mode
1071
 
 
1072
 
    def mkdir(self, relpath, mode=None):
1073
 
        resp = self._call2('mkdir', self._remote_path(relpath),
1074
 
            self._serialise_optional_mode(mode))
1075
 
        self._translate_error(resp)
1076
 
 
1077
 
    def put_bytes(self, relpath, upload_contents, mode=None):
1078
 
        # FIXME: upload_file is probably not safe for non-ascii characters -
1079
 
        # should probably just pass all parameters as length-delimited
1080
 
        # strings?
1081
 
        resp = self._call_with_body_bytes('put',
1082
 
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
1083
 
            upload_contents)
1084
 
        self._translate_error(resp)
1085
 
 
1086
 
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1087
 
                             create_parent_dir=False,
1088
 
                             dir_mode=None):
1089
 
        """See Transport.put_bytes_non_atomic."""
1090
 
        # FIXME: no encoding in the transport!
1091
 
        create_parent_str = 'F'
1092
 
        if create_parent_dir:
1093
 
            create_parent_str = 'T'
1094
 
 
1095
 
        resp = self._call_with_body_bytes(
1096
 
            'put_non_atomic',
1097
 
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
1098
 
             create_parent_str, self._serialise_optional_mode(dir_mode)),
1099
 
            bytes)
1100
 
        self._translate_error(resp)
1101
 
 
1102
 
    def put_file(self, relpath, upload_file, mode=None):
1103
 
        # its not ideal to seek back, but currently put_non_atomic_file depends
1104
 
        # on transports not reading before failing - which is a faulty
1105
 
        # assumption I think - RBC 20060915
1106
 
        pos = upload_file.tell()
1107
 
        try:
1108
 
            return self.put_bytes(relpath, upload_file.read(), mode)
1109
 
        except:
1110
 
            upload_file.seek(pos)
1111
 
            raise
1112
 
 
1113
 
    def put_file_non_atomic(self, relpath, f, mode=None,
1114
 
                            create_parent_dir=False,
1115
 
                            dir_mode=None):
1116
 
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
1117
 
                                         create_parent_dir=create_parent_dir,
1118
 
                                         dir_mode=dir_mode)
1119
 
 
1120
 
    def append_file(self, relpath, from_file, mode=None):
1121
 
        return self.append_bytes(relpath, from_file.read(), mode)
1122
 
        
1123
 
    def append_bytes(self, relpath, bytes, mode=None):
1124
 
        resp = self._call_with_body_bytes(
1125
 
            'append',
1126
 
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
1127
 
            bytes)
1128
 
        if resp[0] == 'appended':
1129
 
            return int(resp[1])
1130
 
        self._translate_error(resp)
1131
 
 
1132
 
    def delete(self, relpath):
1133
 
        resp = self._call2('delete', self._remote_path(relpath))
1134
 
        self._translate_error(resp)
1135
 
 
1136
 
    def readv(self, relpath, offsets):
1137
 
        if not offsets:
1138
 
            return
1139
 
 
1140
 
        offsets = list(offsets)
1141
 
 
1142
 
        sorted_offsets = sorted(offsets)
1143
 
        # turn the list of offsets into a stack
1144
 
        offset_stack = iter(offsets)
1145
 
        cur_offset_and_size = offset_stack.next()
1146
 
        coalesced = list(self._coalesce_offsets(sorted_offsets,
1147
 
                               limit=self._max_readv_combine,
1148
 
                               fudge_factor=self._bytes_to_read_before_seek))
1149
 
 
1150
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1151
 
        protocol.call_with_body_readv_array(
1152
 
            ('readv', self._remote_path(relpath)),
1153
 
            [(c.start, c.length) for c in coalesced])
1154
 
        resp = protocol.read_response_tuple(True)
1155
 
 
1156
 
        if resp[0] != 'readv':
1157
 
            # This should raise an exception
1158
 
            protocol.cancel_read_body()
1159
 
            self._translate_error(resp)
1160
 
            return
1161
 
 
1162
 
        # FIXME: this should know how many bytes are needed, for clarity.
1163
 
        data = protocol.read_body_bytes()
1164
 
        # Cache the results, but only until they have been fulfilled
1165
 
        data_map = {}
1166
 
        for c_offset in coalesced:
1167
 
            if len(data) < c_offset.length:
1168
 
                raise errors.ShortReadvError(relpath, c_offset.start,
1169
 
                            c_offset.length, actual=len(data))
1170
 
            for suboffset, subsize in c_offset.ranges:
1171
 
                key = (c_offset.start+suboffset, subsize)
1172
 
                data_map[key] = data[suboffset:suboffset+subsize]
1173
 
            data = data[c_offset.length:]
1174
 
 
1175
 
            # Now that we've read some data, see if we can yield anything back
1176
 
            while cur_offset_and_size in data_map:
1177
 
                this_data = data_map.pop(cur_offset_and_size)
1178
 
                yield cur_offset_and_size[0], this_data
1179
 
                cur_offset_and_size = offset_stack.next()
1180
 
 
1181
 
    def rename(self, rel_from, rel_to):
1182
 
        self._call('rename',
1183
 
                   self._remote_path(rel_from),
1184
 
                   self._remote_path(rel_to))
1185
 
 
1186
 
    def move(self, rel_from, rel_to):
1187
 
        self._call('move',
1188
 
                   self._remote_path(rel_from),
1189
 
                   self._remote_path(rel_to))
1190
 
 
1191
 
    def rmdir(self, relpath):
1192
 
        resp = self._call('rmdir', self._remote_path(relpath))
1193
 
 
1194
 
    def _translate_error(self, resp, orig_path=None):
1195
 
        """Raise an exception from a response"""
1196
 
        if resp is None:
1197
 
            what = None
1198
 
        else:
1199
 
            what = resp[0]
1200
 
        if what == 'ok':
1201
 
            return
1202
 
        elif what == 'NoSuchFile':
1203
 
            if orig_path is not None:
1204
 
                error_path = orig_path
1205
 
            else:
1206
 
                error_path = resp[1]
1207
 
            raise errors.NoSuchFile(error_path)
1208
 
        elif what == 'error':
1209
 
            raise errors.SmartProtocolError(unicode(resp[1]))
1210
 
        elif what == 'FileExists':
1211
 
            raise errors.FileExists(resp[1])
1212
 
        elif what == 'DirectoryNotEmpty':
1213
 
            raise errors.DirectoryNotEmpty(resp[1])
1214
 
        elif what == 'ShortReadvError':
1215
 
            raise errors.ShortReadvError(resp[1], int(resp[2]),
1216
 
                                         int(resp[3]), int(resp[4]))
1217
 
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1218
 
            encoding = str(resp[1]) # encoding must always be a string
1219
 
            val = resp[2]
1220
 
            start = int(resp[3])
1221
 
            end = int(resp[4])
1222
 
            reason = str(resp[5]) # reason must always be a string
1223
 
            if val.startswith('u:'):
1224
 
                val = val[2:]
1225
 
            elif val.startswith('s:'):
1226
 
                val = val[2:].decode('base64')
1227
 
            if what == 'UnicodeDecodeError':
1228
 
                raise UnicodeDecodeError(encoding, val, start, end, reason)
1229
 
            elif what == 'UnicodeEncodeError':
1230
 
                raise UnicodeEncodeError(encoding, val, start, end, reason)
1231
 
        elif what == "ReadOnlyError":
1232
 
            raise errors.TransportNotPossible('readonly transport')
1233
 
        else:
1234
 
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
1235
 
 
1236
 
    def disconnect(self):
1237
 
        self._medium.disconnect()
1238
 
 
1239
 
    def delete_tree(self, relpath):
1240
 
        raise errors.TransportNotPossible('readonly transport')
1241
 
 
1242
 
    def stat(self, relpath):
1243
 
        resp = self._call2('stat', self._remote_path(relpath))
1244
 
        if resp[0] == 'stat':
1245
 
            return SmartStat(int(resp[1]), int(resp[2], 8))
1246
 
        else:
1247
 
            self._translate_error(resp)
1248
 
 
1249
 
    ## def lock_read(self, relpath):
1250
 
    ##     """Lock the given file for shared (read) access.
1251
 
    ##     :return: A lock object, which should be passed to Transport.unlock()
1252
 
    ##     """
1253
 
    ##     # The old RemoteBranch ignore lock for reading, so we will
1254
 
    ##     # continue that tradition and return a bogus lock object.
1255
 
    ##     class BogusLock(object):
1256
 
    ##         def __init__(self, path):
1257
 
    ##             self.path = path
1258
 
    ##         def unlock(self):
1259
 
    ##             pass
1260
 
    ##     return BogusLock(relpath)
1261
 
 
1262
 
    def listable(self):
1263
 
        return True
1264
 
 
1265
 
    def list_dir(self, relpath):
1266
 
        resp = self._call2('list_dir', self._remote_path(relpath))
1267
 
        if resp[0] == 'names':
1268
 
            return [name.encode('ascii') for name in resp[1:]]
1269
 
        else:
1270
 
            self._translate_error(resp)
1271
 
 
1272
 
    def iter_files_recursive(self):
1273
 
        resp = self._call2('iter_files_recursive', self._remote_path(''))
1274
 
        if resp[0] == 'names':
1275
 
            return resp[1:]
1276
 
        else:
1277
 
            self._translate_error(resp)
1278
 
 
1279
 
 
1280
 
class SmartClientMediumRequest(object):
1281
 
    """A request on a SmartClientMedium.
1282
 
 
1283
 
    Each request allows bytes to be provided to it via accept_bytes, and then
1284
 
    the response bytes to be read via read_bytes.
1285
 
 
1286
 
    For instance:
1287
 
    request.accept_bytes('123')
1288
 
    request.finished_writing()
1289
 
    result = request.read_bytes(3)
1290
 
    request.finished_reading()
1291
 
 
1292
 
    It is up to the individual SmartClientMedium whether multiple concurrent
1293
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
1294
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
1295
 
    details on concurrency and pipelining.
1296
 
    """
1297
 
 
1298
 
    def __init__(self, medium):
1299
 
        """Construct a SmartClientMediumRequest for the medium medium."""
1300
 
        self._medium = medium
1301
 
        # we track state by constants - we may want to use the same
1302
 
        # pattern as BodyReader if it gets more complex.
1303
 
        # valid states are: "writing", "reading", "done"
1304
 
        self._state = "writing"
1305
 
 
1306
 
    def accept_bytes(self, bytes):
1307
 
        """Accept bytes for inclusion in this request.
1308
 
 
1309
 
        This method may not be be called after finished_writing() has been
1310
 
        called.  It depends upon the Medium whether or not the bytes will be
1311
 
        immediately transmitted. Message based Mediums will tend to buffer the
1312
 
        bytes until finished_writing() is called.
1313
 
 
1314
 
        :param bytes: A bytestring.
1315
 
        """
1316
 
        if self._state != "writing":
1317
 
            raise errors.WritingCompleted(self)
1318
 
        self._accept_bytes(bytes)
1319
 
 
1320
 
    def _accept_bytes(self, bytes):
1321
 
        """Helper for accept_bytes.
1322
 
 
1323
 
        Accept_bytes checks the state of the request to determing if bytes
1324
 
        should be accepted. After that it hands off to _accept_bytes to do the
1325
 
        actual acceptance.
1326
 
        """
1327
 
        raise NotImplementedError(self._accept_bytes)
1328
 
 
1329
 
    def finished_reading(self):
1330
 
        """Inform the request that all desired data has been read.
1331
 
 
1332
 
        This will remove the request from the pipeline for its medium (if the
1333
 
        medium supports pipelining) and any further calls to methods on the
1334
 
        request will raise ReadingCompleted.
1335
 
        """
1336
 
        if self._state == "writing":
1337
 
            raise errors.WritingNotComplete(self)
1338
 
        if self._state != "reading":
1339
 
            raise errors.ReadingCompleted(self)
1340
 
        self._state = "done"
1341
 
        self._finished_reading()
1342
 
 
1343
 
    def _finished_reading(self):
1344
 
        """Helper for finished_reading.
1345
 
 
1346
 
        finished_reading checks the state of the request to determine if 
1347
 
        finished_reading is allowed, and if it is hands off to _finished_reading
1348
 
        to perform the action.
1349
 
        """
1350
 
        raise NotImplementedError(self._finished_reading)
1351
 
 
1352
 
    def finished_writing(self):
1353
 
        """Finish the writing phase of this request.
1354
 
 
1355
 
        This will flush all pending data for this request along the medium.
1356
 
        After calling finished_writing, you may not call accept_bytes anymore.
1357
 
        """
1358
 
        if self._state != "writing":
1359
 
            raise errors.WritingCompleted(self)
1360
 
        self._state = "reading"
1361
 
        self._finished_writing()
1362
 
 
1363
 
    def _finished_writing(self):
1364
 
        """Helper for finished_writing.
1365
 
 
1366
 
        finished_writing checks the state of the request to determine if 
1367
 
        finished_writing is allowed, and if it is hands off to _finished_writing
1368
 
        to perform the action.
1369
 
        """
1370
 
        raise NotImplementedError(self._finished_writing)
1371
 
 
1372
 
    def read_bytes(self, count):
1373
 
        """Read bytes from this requests response.
1374
 
 
1375
 
        This method will block and wait for count bytes to be read. It may not
1376
 
        be invoked until finished_writing() has been called - this is to ensure
1377
 
        a message-based approach to requests, for compatability with message
1378
 
        based mediums like HTTP.
1379
 
        """
1380
 
        if self._state == "writing":
1381
 
            raise errors.WritingNotComplete(self)
1382
 
        if self._state != "reading":
1383
 
            raise errors.ReadingCompleted(self)
1384
 
        return self._read_bytes(count)
1385
 
 
1386
 
    def _read_bytes(self, count):
1387
 
        """Helper for read_bytes.
1388
 
 
1389
 
        read_bytes checks the state of the request to determing if bytes
1390
 
        should be read. After that it hands off to _read_bytes to do the
1391
 
        actual read.
1392
 
        """
1393
 
        raise NotImplementedError(self._read_bytes)
1394
 
 
1395
 
 
1396
 
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1397
 
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1398
 
 
1399
 
    def __init__(self, medium):
1400
 
        SmartClientMediumRequest.__init__(self, medium)
1401
 
        # check that we are safe concurrency wise. If some streams start
1402
 
        # allowing concurrent requests - i.e. via multiplexing - then this
1403
 
        # assert should be moved to SmartClientStreamMedium.get_request,
1404
 
        # and the setting/unsetting of _current_request likewise moved into
1405
 
        # that class : but its unneeded overhead for now. RBC 20060922
1406
 
        if self._medium._current_request is not None:
1407
 
            raise errors.TooManyConcurrentRequests(self._medium)
1408
 
        self._medium._current_request = self
1409
 
 
1410
 
    def _accept_bytes(self, bytes):
1411
 
        """See SmartClientMediumRequest._accept_bytes.
1412
 
        
1413
 
        This forwards to self._medium._accept_bytes because we are operating
1414
 
        on the mediums stream.
1415
 
        """
1416
 
        self._medium._accept_bytes(bytes)
1417
 
 
1418
 
    def _finished_reading(self):
1419
 
        """See SmartClientMediumRequest._finished_reading.
1420
 
 
1421
 
        This clears the _current_request on self._medium to allow a new 
1422
 
        request to be created.
1423
 
        """
1424
 
        assert self._medium._current_request is self
1425
 
        self._medium._current_request = None
1426
 
        
1427
 
    def _finished_writing(self):
1428
 
        """See SmartClientMediumRequest._finished_writing.
1429
 
 
1430
 
        This invokes self._medium._flush to ensure all bytes are transmitted.
1431
 
        """
1432
 
        self._medium._flush()
1433
 
 
1434
 
    def _read_bytes(self, count):
1435
 
        """See SmartClientMediumRequest._read_bytes.
1436
 
        
1437
 
        This forwards to self._medium._read_bytes because we are operating
1438
 
        on the mediums stream.
1439
 
        """
1440
 
        return self._medium._read_bytes(count)
1441
 
 
1442
 
 
1443
 
class SmartClientRequestProtocolOne(SmartProtocolBase):
1444
 
    """The client-side protocol for smart version 1."""
1445
 
 
1446
 
    def __init__(self, request):
1447
 
        """Construct a SmartClientRequestProtocolOne.
1448
 
 
1449
 
        :param request: A SmartClientMediumRequest to serialise onto and
1450
 
            deserialise from.
1451
 
        """
1452
 
        self._request = request
1453
 
        self._body_buffer = None
1454
 
 
1455
 
    def call(self, *args):
1456
 
        bytes = _encode_tuple(args)
1457
 
        self._request.accept_bytes(bytes)
1458
 
        self._request.finished_writing()
1459
 
 
1460
 
    def call_with_body_bytes(self, args, body):
1461
 
        """Make a remote call of args with body bytes 'body'.
1462
 
 
1463
 
        After calling this, call read_response_tuple to find the result out.
1464
 
        """
1465
 
        bytes = _encode_tuple(args)
1466
 
        self._request.accept_bytes(bytes)
1467
 
        bytes = self._encode_bulk_data(body)
1468
 
        self._request.accept_bytes(bytes)
1469
 
        self._request.finished_writing()
1470
 
 
1471
 
    def call_with_body_readv_array(self, args, body):
1472
 
        """Make a remote call with a readv array.
1473
 
 
1474
 
        The body is encoded with one line per readv offset pair. The numbers in
1475
 
        each pair are separated by a comma, and no trailing \n is emitted.
1476
 
        """
1477
 
        bytes = _encode_tuple(args)
1478
 
        self._request.accept_bytes(bytes)
1479
 
        readv_bytes = self._serialise_offsets(body)
1480
 
        bytes = self._encode_bulk_data(readv_bytes)
1481
 
        self._request.accept_bytes(bytes)
1482
 
        self._request.finished_writing()
1483
 
 
1484
 
    def cancel_read_body(self):
1485
 
        """After expecting a body, a response code may indicate one otherwise.
1486
 
 
1487
 
        This method lets the domain client inform the protocol that no body
1488
 
        will be transmitted. This is a terminal method: after calling it the
1489
 
        protocol is not able to be used further.
1490
 
        """
1491
 
        self._request.finished_reading()
1492
 
 
1493
 
    def read_response_tuple(self, expect_body=False):
1494
 
        """Read a response tuple from the wire.
1495
 
 
1496
 
        This should only be called once.
1497
 
        """
1498
 
        result = self._recv_tuple()
1499
 
        if not expect_body:
1500
 
            self._request.finished_reading()
1501
 
        return result
1502
 
 
1503
 
    def read_body_bytes(self, count=-1):
1504
 
        """Read bytes from the body, decoding into a byte stream.
1505
 
        
1506
 
        We read all bytes at once to ensure we've checked the trailer for 
1507
 
        errors, and then feed the buffer back as read_body_bytes is called.
1508
 
        """
1509
 
        if self._body_buffer is not None:
1510
 
            return self._body_buffer.read(count)
1511
 
        _body_decoder = LengthPrefixedBodyDecoder()
1512
 
 
1513
 
        while not _body_decoder.finished_reading:
1514
 
            bytes_wanted = _body_decoder.next_read_size()
1515
 
            bytes = self._request.read_bytes(bytes_wanted)
1516
 
            _body_decoder.accept_bytes(bytes)
1517
 
        self._request.finished_reading()
1518
 
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
1519
 
        # XXX: TODO check the trailer result.
1520
 
        return self._body_buffer.read(count)
1521
 
 
1522
 
    def _recv_tuple(self):
1523
 
        """Receive a tuple from the medium request."""
1524
 
        line = ''
1525
 
        while not line or line[-1] != '\n':
1526
 
            # TODO: this is inefficient - but tuples are short.
1527
 
            new_char = self._request.read_bytes(1)
1528
 
            line += new_char
1529
 
            assert new_char != '', "end of file reading from server."
1530
 
        return _decode_tuple(line)
1531
 
 
1532
 
    def query_version(self):
1533
 
        """Return protocol version number of the server."""
1534
 
        self.call('hello')
1535
 
        resp = self.read_response_tuple()
1536
 
        if resp == ('ok', '1'):
1537
 
            return 1
1538
 
        else:
1539
 
            raise errors.SmartProtocolError("bad response %r" % (resp,))
1540
 
 
1541
 
 
1542
 
class SmartClientMedium(object):
1543
 
    """Smart client is a medium for sending smart protocol requests over."""
1544
 
 
1545
 
    def disconnect(self):
1546
 
        """If this medium maintains a persistent connection, close it.
1547
 
        
1548
 
        The default implementation does nothing.
1549
 
        """
1550
 
        
1551
 
 
1552
 
class SmartClientStreamMedium(SmartClientMedium):
1553
 
    """Stream based medium common class.
1554
 
 
1555
 
    SmartClientStreamMediums operate on a stream. All subclasses use a common
1556
 
    SmartClientStreamMediumRequest for their requests, and should implement
1557
 
    _accept_bytes and _read_bytes to allow the request objects to send and
1558
 
    receive bytes.
1559
 
    """
1560
 
 
1561
 
    def __init__(self):
1562
 
        self._current_request = None
1563
 
 
1564
 
    def accept_bytes(self, bytes):
1565
 
        self._accept_bytes(bytes)
1566
 
 
1567
 
    def __del__(self):
1568
 
        """The SmartClientStreamMedium knows how to close the stream when it is
1569
 
        finished with it.
1570
 
        """
1571
 
        self.disconnect()
1572
 
 
1573
 
    def _flush(self):
1574
 
        """Flush the output stream.
1575
 
        
1576
 
        This method is used by the SmartClientStreamMediumRequest to ensure that
1577
 
        all data for a request is sent, to avoid long timeouts or deadlocks.
1578
 
        """
1579
 
        raise NotImplementedError(self._flush)
1580
 
 
1581
 
    def get_request(self):
1582
 
        """See SmartClientMedium.get_request().
1583
 
 
1584
 
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1585
 
        for get_request.
1586
 
        """
1587
 
        return SmartClientStreamMediumRequest(self)
1588
 
 
1589
 
    def read_bytes(self, count):
1590
 
        return self._read_bytes(count)
1591
 
 
1592
 
 
1593
 
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1594
 
    """A client medium using simple pipes.
1595
 
    
1596
 
    This client does not manage the pipes: it assumes they will always be open.
1597
 
    """
1598
 
 
1599
 
    def __init__(self, readable_pipe, writeable_pipe):
1600
 
        SmartClientStreamMedium.__init__(self)
1601
 
        self._readable_pipe = readable_pipe
1602
 
        self._writeable_pipe = writeable_pipe
1603
 
 
1604
 
    def _accept_bytes(self, bytes):
1605
 
        """See SmartClientStreamMedium.accept_bytes."""
1606
 
        self._writeable_pipe.write(bytes)
1607
 
 
1608
 
    def _flush(self):
1609
 
        """See SmartClientStreamMedium._flush()."""
1610
 
        self._writeable_pipe.flush()
1611
 
 
1612
 
    def _read_bytes(self, count):
1613
 
        """See SmartClientStreamMedium._read_bytes."""
1614
 
        return self._readable_pipe.read(count)
1615
 
 
1616
 
 
1617
 
class SmartSSHClientMedium(SmartClientStreamMedium):
1618
 
    """A client medium using SSH."""
1619
 
    
1620
 
    def __init__(self, host, port=None, username=None, password=None,
1621
 
            vendor=None):
1622
 
        """Creates a client that will connect on the first use.
1623
 
        
1624
 
        :param vendor: An optional override for the ssh vendor to use. See
1625
 
            bzrlib.transport.ssh for details on ssh vendors.
1626
 
        """
1627
 
        SmartClientStreamMedium.__init__(self)
1628
 
        self._connected = False
1629
 
        self._host = host
1630
 
        self._password = password
1631
 
        self._port = port
1632
 
        self._username = username
1633
 
        self._read_from = None
1634
 
        self._ssh_connection = None
1635
 
        self._vendor = vendor
1636
 
        self._write_to = None
1637
 
 
1638
 
    def _accept_bytes(self, bytes):
1639
 
        """See SmartClientStreamMedium.accept_bytes."""
1640
 
        self._ensure_connection()
1641
 
        self._write_to.write(bytes)
1642
 
 
1643
 
    def disconnect(self):
1644
 
        """See SmartClientMedium.disconnect()."""
1645
 
        if not self._connected:
1646
 
            return
1647
 
        self._read_from.close()
1648
 
        self._write_to.close()
1649
 
        self._ssh_connection.close()
1650
 
        self._connected = False
1651
 
 
1652
 
    def _ensure_connection(self):
1653
 
        """Connect this medium if not already connected."""
1654
 
        if self._connected:
1655
 
            return
1656
 
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1657
 
        if self._vendor is None:
1658
 
            vendor = ssh._get_ssh_vendor()
1659
 
        else:
1660
 
            vendor = self._vendor
1661
 
        self._ssh_connection = vendor.connect_ssh(self._username,
1662
 
                self._password, self._host, self._port,
1663
 
                command=[executable, 'serve', '--inet', '--directory=/',
1664
 
                         '--allow-writes'])
1665
 
        self._read_from, self._write_to = \
1666
 
            self._ssh_connection.get_filelike_channels()
1667
 
        self._connected = True
1668
 
 
1669
 
    def _flush(self):
1670
 
        """See SmartClientStreamMedium._flush()."""
1671
 
        self._write_to.flush()
1672
 
 
1673
 
    def _read_bytes(self, count):
1674
 
        """See SmartClientStreamMedium.read_bytes."""
1675
 
        if not self._connected:
1676
 
            raise errors.MediumNotConnected(self)
1677
 
        return self._read_from.read(count)
1678
 
 
1679
 
 
1680
 
class SmartTCPClientMedium(SmartClientStreamMedium):
1681
 
    """A client medium using TCP."""
1682
 
    
1683
 
    def __init__(self, host, port):
1684
 
        """Creates a client that will connect on the first use."""
1685
 
        SmartClientStreamMedium.__init__(self)
1686
 
        self._connected = False
1687
 
        self._host = host
1688
 
        self._port = port
1689
 
        self._socket = None
1690
 
 
1691
 
    def _accept_bytes(self, bytes):
1692
 
        """See SmartClientMedium.accept_bytes."""
1693
 
        self._ensure_connection()
1694
 
        self._socket.sendall(bytes)
1695
 
 
1696
 
    def disconnect(self):
1697
 
        """See SmartClientMedium.disconnect()."""
1698
 
        if not self._connected:
1699
 
            return
1700
 
        self._socket.close()
1701
 
        self._socket = None
1702
 
        self._connected = False
1703
 
 
1704
 
    def _ensure_connection(self):
1705
 
        """Connect this medium if not already connected."""
1706
 
        if self._connected:
1707
 
            return
1708
 
        self._socket = socket.socket()
1709
 
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1710
 
        result = self._socket.connect_ex((self._host, int(self._port)))
1711
 
        if result:
1712
 
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1713
 
                    (self._host, self._port, os.strerror(result)))
1714
 
        self._connected = True
1715
 
 
1716
 
    def _flush(self):
1717
 
        """See SmartClientStreamMedium._flush().
1718
 
        
1719
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
1720
 
        add a means to do a flush, but that can be done in the future.
1721
 
        """
1722
 
 
1723
 
    def _read_bytes(self, count):
1724
 
        """See SmartClientMedium.read_bytes."""
1725
 
        if not self._connected:
1726
 
            raise errors.MediumNotConnected(self)
1727
 
        return self._socket.recv(count)
1728
 
 
1729
 
 
1730
 
class SmartTCPTransport(SmartTransport):
1731
 
    """Connection to smart server over plain tcp.
1732
 
    
1733
 
    This is essentially just a factory to get 'RemoteTransport(url,
1734
 
        SmartTCPClientMedium).
1735
 
    """
1736
 
 
1737
 
    def __init__(self, url):
1738
 
        _scheme, _username, _password, _host, _port, _path = \
1739
 
            transport.split_url(url)
1740
 
        try:
1741
 
            _port = int(_port)
1742
 
        except (ValueError, TypeError), e:
1743
 
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1744
 
        medium = SmartTCPClientMedium(_host, _port)
1745
 
        super(SmartTCPTransport, self).__init__(url, medium=medium)
1746
 
 
1747
 
 
1748
 
class SmartSSHTransport(SmartTransport):
1749
 
    """Connection to smart server over SSH.
1750
 
 
1751
 
    This is essentially just a factory to get 'RemoteTransport(url,
1752
 
        SmartSSHClientMedium).
1753
 
    """
1754
 
 
1755
 
    def __init__(self, url):
1756
 
        _scheme, _username, _password, _host, _port, _path = \
1757
 
            transport.split_url(url)
1758
 
        try:
1759
 
            if _port is not None:
1760
 
                _port = int(_port)
1761
 
        except (ValueError, TypeError), e:
1762
 
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
1763
 
                _port)
1764
 
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
1765
 
        super(SmartSSHTransport, self).__init__(url, medium=medium)
1766
 
 
1767
 
 
1768
 
def get_test_permutations():
1769
 
    """Return (transport, server) permutations for testing."""
1770
 
    ### We may need a little more test framework support to construct an
1771
 
    ### appropriate RemoteTransport in the future.
1772
 
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]