~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: wang
  • Date: 2006-10-29 13:41:32 UTC
  • mto: (2104.4.1 wang_65714)
  • mto: This revision was merged to the branch mainline in revision 2109.
  • Revision ID: wang@ubuntu-20061029134132-3d7f4216f20c4aef
Replace python's difflib by patiencediff because the worst case 
performance is cubic for difflib and people commiting large data 
files are often hurt by this. The worst case performance of patience is 
quadratic. Fix bug 65714.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
 
51
that you have multiple requests and get a read error because the other side did
 
52
shutdown.  For pipes we have read pipe which will have a zero read which marks
 
53
end-of-file.  For HTTP server environment there is not end-of-stream because
 
54
each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out requests from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialization, deserialization)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialization, deserialization)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
 
 
197
from cStringIO import StringIO
 
198
import os
 
199
import socket
 
200
import tempfile
 
201
import threading
 
202
import urllib
 
203
import urlparse
 
204
 
 
205
from bzrlib import (
 
206
    bzrdir,
 
207
    errors,
 
208
    revision,
 
209
    transport,
 
210
    trace,
 
211
    urlutils,
 
212
    )
 
213
from bzrlib.bundle.serializer import write_bundle
 
214
try:
 
215
    from bzrlib.transport import ssh
 
216
except errors.ParamikoNotPresent:
 
217
    # no paramiko.  SmartSSHClientMedium will break.
 
218
    pass
 
219
 
 
220
# must do this otherwise urllib can't parse the urls properly :(
 
221
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
 
222
    transport.register_urlparse_netloc_protocol(scheme)
 
223
del scheme
 
224
 
 
225
 
 
226
def _recv_tuple(from_file):
 
227
    req_line = from_file.readline()
 
228
    return _decode_tuple(req_line)
 
229
 
 
230
 
 
231
def _decode_tuple(req_line):
 
232
    if req_line == None or req_line == '':
 
233
        return None
 
234
    if req_line[-1] != '\n':
 
235
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
236
    try:
 
237
        return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
238
    except UnicodeDecodeError:
 
239
        raise errors.SmartProtocolError(
 
240
            "one or more arguments of request %r are not valid UTF-8"
 
241
            % req_line)
 
242
 
 
243
 
 
244
def _encode_tuple(args):
 
245
    """Encode the tuple args to a bytestream."""
 
246
    return '\x01'.join((a.encode('utf-8') for a in args)) + '\n'
 
247
 
 
248
 
 
249
class SmartProtocolBase(object):
 
250
    """Methods common to client and server"""
 
251
 
 
252
    # TODO: this only actually accomodates a single block; possibly should
 
253
    # support multiple chunks?
 
254
    def _encode_bulk_data(self, body):
 
255
        """Encode body as a bulk data chunk."""
 
256
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
257
 
 
258
    def _serialise_offsets(self, offsets):
 
259
        """Serialise a readv offset list."""
 
260
        txt = []
 
261
        for start, length in offsets:
 
262
            txt.append('%d,%d' % (start, length))
 
263
        return '\n'.join(txt)
 
264
        
 
265
 
 
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
267
    """Server-side encoding and decoding logic for smart version 1."""
 
268
    
 
269
    def __init__(self, backing_transport, write_func):
 
270
        self._backing_transport = backing_transport
 
271
        self.excess_buffer = ''
 
272
        self._finished_reading = False
 
273
        self.in_buffer = ''
 
274
        self.has_dispatched = False
 
275
        self.request = None
 
276
        self._body_decoder = None
 
277
        self._write_func = write_func
 
278
 
 
279
    def accept_bytes(self, bytes):
 
280
        """Take bytes, and advance the internal state machine appropriately.
 
281
        
 
282
        :param bytes: must be a byte string
 
283
        """
 
284
        assert isinstance(bytes, str)
 
285
        self.in_buffer += bytes
 
286
        if not self.has_dispatched:
 
287
            if '\n' not in self.in_buffer:
 
288
                # no command line yet
 
289
                return
 
290
            self.has_dispatched = True
 
291
            try:
 
292
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
293
                first_line += '\n'
 
294
                req_args = _decode_tuple(first_line)
 
295
                self.request = SmartServerRequestHandler(
 
296
                    self._backing_transport)
 
297
                self.request.dispatch_command(req_args[0], req_args[1:])
 
298
                if self.request.finished_reading:
 
299
                    # trivial request
 
300
                    self.excess_buffer = self.in_buffer
 
301
                    self.in_buffer = ''
 
302
                    self._send_response(self.request.response.args,
 
303
                        self.request.response.body)
 
304
                self.sync_with_request(self.request)
 
305
            except KeyboardInterrupt:
 
306
                raise
 
307
            except Exception, exception:
 
308
                # everything else: pass to client, flush, and quit
 
309
                self._send_response(('error', str(exception)))
 
310
                return None
 
311
 
 
312
        if self.has_dispatched:
 
313
            if self._finished_reading:
 
314
                # nothing to do.XXX: this routine should be a single state 
 
315
                # machine too.
 
316
                self.excess_buffer += self.in_buffer
 
317
                self.in_buffer = ''
 
318
                return
 
319
            if self._body_decoder is None:
 
320
                self._body_decoder = LengthPrefixedBodyDecoder()
 
321
            self._body_decoder.accept_bytes(self.in_buffer)
 
322
            self.in_buffer = self._body_decoder.unused_data
 
323
            body_data = self._body_decoder.read_pending_data()
 
324
            self.request.accept_body(body_data)
 
325
            if self._body_decoder.finished_reading:
 
326
                self.request.end_of_body()
 
327
                assert self.request.finished_reading, \
 
328
                    "no more body, request not finished"
 
329
            self.sync_with_request(self.request)
 
330
            if self.request.response is not None:
 
331
                self._send_response(self.request.response.args,
 
332
                    self.request.response.body)
 
333
                self.excess_buffer = self.in_buffer
 
334
                self.in_buffer = ''
 
335
            else:
 
336
                assert not self.request.finished_reading, \
 
337
                    "no response and we have finished reading."
 
338
 
 
339
    def _send_response(self, args, body=None):
 
340
        """Send a smart server response down the output stream."""
 
341
        self._write_func(_encode_tuple(args))
 
342
        if body is not None:
 
343
            assert isinstance(body, str), 'body must be a str'
 
344
            bytes = self._encode_bulk_data(body)
 
345
            self._write_func(bytes)
 
346
 
 
347
    def sync_with_request(self, request):
 
348
        self._finished_reading = request.finished_reading
 
349
        
 
350
    def next_read_size(self):
 
351
        if self._finished_reading:
 
352
            return 0
 
353
        if self._body_decoder is None:
 
354
            return 1
 
355
        else:
 
356
            return self._body_decoder.next_read_size()
 
357
 
 
358
 
 
359
class LengthPrefixedBodyDecoder(object):
 
360
    """Decodes the length-prefixed bulk data."""
 
361
    
 
362
    def __init__(self):
 
363
        self.bytes_left = None
 
364
        self.finished_reading = False
 
365
        self.unused_data = ''
 
366
        self.state_accept = self._state_accept_expecting_length
 
367
        self.state_read = self._state_read_no_data
 
368
        self._in_buffer = ''
 
369
        self._trailer_buffer = ''
 
370
    
 
371
    def accept_bytes(self, bytes):
 
372
        """Decode as much of bytes as possible.
 
373
 
 
374
        If 'bytes' contains too much data it will be appended to
 
375
        self.unused_data.
 
376
 
 
377
        finished_reading will be set when no more data is required.  Further
 
378
        data will be appended to self.unused_data.
 
379
        """
 
380
        # accept_bytes is allowed to change the state
 
381
        current_state = self.state_accept
 
382
        self.state_accept(bytes)
 
383
        while current_state != self.state_accept:
 
384
            current_state = self.state_accept
 
385
            self.state_accept('')
 
386
 
 
387
    def next_read_size(self):
 
388
        if self.bytes_left is not None:
 
389
            # Ideally we want to read all the remainder of the body and the
 
390
            # trailer in one go.
 
391
            return self.bytes_left + 5
 
392
        elif self.state_accept == self._state_accept_reading_trailer:
 
393
            # Just the trailer left
 
394
            return 5 - len(self._trailer_buffer)
 
395
        elif self.state_accept == self._state_accept_expecting_length:
 
396
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
397
            # 'done\n').
 
398
            return 6
 
399
        else:
 
400
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
401
            return 1
 
402
        
 
403
    def read_pending_data(self):
 
404
        """Return any pending data that has been decoded."""
 
405
        return self.state_read()
 
406
 
 
407
    def _state_accept_expecting_length(self, bytes):
 
408
        self._in_buffer += bytes
 
409
        pos = self._in_buffer.find('\n')
 
410
        if pos == -1:
 
411
            return
 
412
        self.bytes_left = int(self._in_buffer[:pos])
 
413
        self._in_buffer = self._in_buffer[pos+1:]
 
414
        self.bytes_left -= len(self._in_buffer)
 
415
        self.state_accept = self._state_accept_reading_body
 
416
        self.state_read = self._state_read_in_buffer
 
417
 
 
418
    def _state_accept_reading_body(self, bytes):
 
419
        self._in_buffer += bytes
 
420
        self.bytes_left -= len(bytes)
 
421
        if self.bytes_left <= 0:
 
422
            # Finished with body
 
423
            if self.bytes_left != 0:
 
424
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
425
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
426
            self.bytes_left = None
 
427
            self.state_accept = self._state_accept_reading_trailer
 
428
        
 
429
    def _state_accept_reading_trailer(self, bytes):
 
430
        self._trailer_buffer += bytes
 
431
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
432
        # a ProtocolViolation exception?
 
433
        if self._trailer_buffer.startswith('done\n'):
 
434
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
435
            self.state_accept = self._state_accept_reading_unused
 
436
            self.finished_reading = True
 
437
    
 
438
    def _state_accept_reading_unused(self, bytes):
 
439
        self.unused_data += bytes
 
440
 
 
441
    def _state_read_no_data(self):
 
442
        return ''
 
443
 
 
444
    def _state_read_in_buffer(self):
 
445
        result = self._in_buffer
 
446
        self._in_buffer = ''
 
447
        return result
 
448
 
 
449
 
 
450
class SmartServerStreamMedium(object):
 
451
    """Handles smart commands coming over a stream.
 
452
 
 
453
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
454
    in-process fifo for testing.
 
455
 
 
456
    One instance is created for each connected client; it can serve multiple
 
457
    requests in the lifetime of the connection.
 
458
 
 
459
    The server passes requests through to an underlying backing transport, 
 
460
    which will typically be a LocalTransport looking at the server's filesystem.
 
461
    """
 
462
 
 
463
    def __init__(self, backing_transport):
 
464
        """Construct new server.
 
465
 
 
466
        :param backing_transport: Transport for the directory served.
 
467
        """
 
468
        # backing_transport could be passed to serve instead of __init__
 
469
        self.backing_transport = backing_transport
 
470
        self.finished = False
 
471
 
 
472
    def serve(self):
 
473
        """Serve requests until the client disconnects."""
 
474
        # Keep a reference to stderr because the sys module's globals get set to
 
475
        # None during interpreter shutdown.
 
476
        from sys import stderr
 
477
        try:
 
478
            while not self.finished:
 
479
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
480
                                                         self._write_out)
 
481
                self._serve_one_request(protocol)
 
482
        except Exception, e:
 
483
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
484
            raise
 
485
 
 
486
    def _serve_one_request(self, protocol):
 
487
        """Read one request from input, process, send back a response.
 
488
        
 
489
        :param protocol: a SmartServerRequestProtocol.
 
490
        """
 
491
        try:
 
492
            self._serve_one_request_unguarded(protocol)
 
493
        except KeyboardInterrupt:
 
494
            raise
 
495
        except Exception, e:
 
496
            self.terminate_due_to_error()
 
497
 
 
498
    def terminate_due_to_error(self):
 
499
        """Called when an unhandled exception from the protocol occurs."""
 
500
        raise NotImplementedError(self.terminate_due_to_error)
 
501
 
 
502
 
 
503
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
504
 
 
505
    def __init__(self, sock, backing_transport):
 
506
        """Constructor.
 
507
 
 
508
        :param sock: the socket the server will read from.  It will be put
 
509
            into blocking mode.
 
510
        """
 
511
        SmartServerStreamMedium.__init__(self, backing_transport)
 
512
        self.push_back = ''
 
513
        sock.setblocking(True)
 
514
        self.socket = sock
 
515
 
 
516
    def _serve_one_request_unguarded(self, protocol):
 
517
        while protocol.next_read_size():
 
518
            if self.push_back:
 
519
                protocol.accept_bytes(self.push_back)
 
520
                self.push_back = ''
 
521
            else:
 
522
                bytes = self.socket.recv(4096)
 
523
                if bytes == '':
 
524
                    self.finished = True
 
525
                    return
 
526
                protocol.accept_bytes(bytes)
 
527
        
 
528
        self.push_back = protocol.excess_buffer
 
529
    
 
530
    def terminate_due_to_error(self):
 
531
        """Called when an unhandled exception from the protocol occurs."""
 
532
        # TODO: This should log to a server log file, but no such thing
 
533
        # exists yet.  Andrew Bennetts 2006-09-29.
 
534
        self.socket.close()
 
535
        self.finished = True
 
536
 
 
537
    def _write_out(self, bytes):
 
538
        self.socket.sendall(bytes)
 
539
 
 
540
 
 
541
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
542
 
 
543
    def __init__(self, in_file, out_file, backing_transport):
 
544
        """Construct new server.
 
545
 
 
546
        :param in_file: Python file from which requests can be read.
 
547
        :param out_file: Python file to write responses.
 
548
        :param backing_transport: Transport for the directory served.
 
549
        """
 
550
        SmartServerStreamMedium.__init__(self, backing_transport)
 
551
        self._in = in_file
 
552
        self._out = out_file
 
553
 
 
554
    def _serve_one_request_unguarded(self, protocol):
 
555
        while True:
 
556
            bytes_to_read = protocol.next_read_size()
 
557
            if bytes_to_read == 0:
 
558
                # Finished serving this request.
 
559
                self._out.flush()
 
560
                return
 
561
            bytes = self._in.read(bytes_to_read)
 
562
            if bytes == '':
 
563
                # Connection has been closed.
 
564
                self.finished = True
 
565
                self._out.flush()
 
566
                return
 
567
            protocol.accept_bytes(bytes)
 
568
 
 
569
    def terminate_due_to_error(self):
 
570
        # TODO: This should log to a server log file, but no such thing
 
571
        # exists yet.  Andrew Bennetts 2006-09-29.
 
572
        self._out.close()
 
573
        self.finished = True
 
574
 
 
575
    def _write_out(self, bytes):
 
576
        self._out.write(bytes)
 
577
 
 
578
 
 
579
class SmartServerResponse(object):
 
580
    """Response generated by SmartServerRequestHandler."""
 
581
 
 
582
    def __init__(self, args, body=None):
 
583
        self.args = args
 
584
        self.body = body
 
585
 
 
586
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
587
# for delivering the data for a request. This could be done with as the
 
588
# StreamServer, though that would create conflation between request and response
 
589
# which may be undesirable.
 
590
 
 
591
 
 
592
class SmartServerRequestHandler(object):
 
593
    """Protocol logic for smart server.
 
594
    
 
595
    This doesn't handle serialization at all, it just processes requests and
 
596
    creates responses.
 
597
    """
 
598
 
 
599
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
600
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
601
    # from the object protocol: we will want to tweak the wire protocol separate
 
602
    # from the object model, and ideally we will be able to do that without
 
603
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
604
    # just a Protocol subclass.
 
605
 
 
606
    # TODO: Better way of representing the body for commands that take it,
 
607
    # and allow it to be streamed into the server.
 
608
    
 
609
    def __init__(self, backing_transport):
 
610
        self._backing_transport = backing_transport
 
611
        self._converted_command = False
 
612
        self.finished_reading = False
 
613
        self._body_bytes = ''
 
614
        self.response = None
 
615
 
 
616
    def accept_body(self, bytes):
 
617
        """Accept body data.
 
618
 
 
619
        This should be overriden for each command that desired body data to
 
620
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
621
 
 
622
        The deserialisation into that format should be done in the Protocol
 
623
        object. Set self.desired_body_format to the format your method will
 
624
        handle.
 
625
        """
 
626
        # default fallback is to accumulate bytes.
 
627
        self._body_bytes += bytes
 
628
        
 
629
    def _end_of_body_handler(self):
 
630
        """An unimplemented end of body handler."""
 
631
        raise NotImplementedError(self._end_of_body_handler)
 
632
        
 
633
    def do_hello(self):
 
634
        """Answer a version request with my version."""
 
635
        return SmartServerResponse(('ok', '1'))
 
636
 
 
637
    def do_has(self, relpath):
 
638
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
639
        return SmartServerResponse((r,))
 
640
 
 
641
    def do_get(self, relpath):
 
642
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
643
        return SmartServerResponse(('ok',), backing_bytes)
 
644
 
 
645
    def _deserialise_optional_mode(self, mode):
 
646
        # XXX: FIXME this should be on the protocol object.
 
647
        if mode == '':
 
648
            return None
 
649
        else:
 
650
            return int(mode)
 
651
 
 
652
    def do_append(self, relpath, mode):
 
653
        self._converted_command = True
 
654
        self._relpath = relpath
 
655
        self._mode = self._deserialise_optional_mode(mode)
 
656
        self._end_of_body_handler = self._handle_do_append_end
 
657
    
 
658
    def _handle_do_append_end(self):
 
659
        old_length = self._backing_transport.append_bytes(
 
660
            self._relpath, self._body_bytes, self._mode)
 
661
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
662
 
 
663
    def do_delete(self, relpath):
 
664
        self._backing_transport.delete(relpath)
 
665
 
 
666
    def do_iter_files_recursive(self, abspath):
 
667
        # XXX: the path handling needs some thought.
 
668
        #relpath = self._backing_transport.relpath(abspath)
 
669
        transport = self._backing_transport.clone(abspath)
 
670
        filenames = transport.iter_files_recursive()
 
671
        return SmartServerResponse(('names',) + tuple(filenames))
 
672
 
 
673
    def do_list_dir(self, relpath):
 
674
        filenames = self._backing_transport.list_dir(relpath)
 
675
        return SmartServerResponse(('names',) + tuple(filenames))
 
676
 
 
677
    def do_mkdir(self, relpath, mode):
 
678
        self._backing_transport.mkdir(relpath,
 
679
                                      self._deserialise_optional_mode(mode))
 
680
 
 
681
    def do_move(self, rel_from, rel_to):
 
682
        self._backing_transport.move(rel_from, rel_to)
 
683
 
 
684
    def do_put(self, relpath, mode):
 
685
        self._converted_command = True
 
686
        self._relpath = relpath
 
687
        self._mode = self._deserialise_optional_mode(mode)
 
688
        self._end_of_body_handler = self._handle_do_put
 
689
 
 
690
    def _handle_do_put(self):
 
691
        self._backing_transport.put_bytes(self._relpath,
 
692
                self._body_bytes, self._mode)
 
693
        self.response = SmartServerResponse(('ok',))
 
694
 
 
695
    def _deserialise_offsets(self, text):
 
696
        # XXX: FIXME this should be on the protocol object.
 
697
        offsets = []
 
698
        for line in text.split('\n'):
 
699
            if not line:
 
700
                continue
 
701
            start, length = line.split(',')
 
702
            offsets.append((int(start), int(length)))
 
703
        return offsets
 
704
 
 
705
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
706
        self._converted_command = True
 
707
        self._end_of_body_handler = self._handle_put_non_atomic
 
708
        self._relpath = relpath
 
709
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
710
        self._mode = self._deserialise_optional_mode(mode)
 
711
        # a boolean would be nicer XXX
 
712
        self._create_parent = (create_parent == 'T')
 
713
 
 
714
    def _handle_put_non_atomic(self):
 
715
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
716
                self._body_bytes,
 
717
                mode=self._mode,
 
718
                create_parent_dir=self._create_parent,
 
719
                dir_mode=self._dir_mode)
 
720
        self.response = SmartServerResponse(('ok',))
 
721
 
 
722
    def do_readv(self, relpath):
 
723
        self._converted_command = True
 
724
        self._end_of_body_handler = self._handle_readv_offsets
 
725
        self._relpath = relpath
 
726
 
 
727
    def end_of_body(self):
 
728
        """No more body data will be received."""
 
729
        self._run_handler_code(self._end_of_body_handler, (), {})
 
730
        # cannot read after this.
 
731
        self.finished_reading = True
 
732
 
 
733
    def _handle_readv_offsets(self):
 
734
        """accept offsets for a readv request."""
 
735
        offsets = self._deserialise_offsets(self._body_bytes)
 
736
        backing_bytes = ''.join(bytes for offset, bytes in
 
737
            self._backing_transport.readv(self._relpath, offsets))
 
738
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
739
        
 
740
    def do_rename(self, rel_from, rel_to):
 
741
        self._backing_transport.rename(rel_from, rel_to)
 
742
 
 
743
    def do_rmdir(self, relpath):
 
744
        self._backing_transport.rmdir(relpath)
 
745
 
 
746
    def do_stat(self, relpath):
 
747
        stat = self._backing_transport.stat(relpath)
 
748
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
749
        
 
750
    def do_get_bundle(self, path, revision_id):
 
751
        # open transport relative to our base
 
752
        t = self._backing_transport.clone(path)
 
753
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
754
        repo = control.open_repository()
 
755
        tmpf = tempfile.TemporaryFile()
 
756
        base_revision = revision.NULL_REVISION
 
757
        write_bundle(repo, revision_id, base_revision, tmpf)
 
758
        tmpf.seek(0)
 
759
        return SmartServerResponse((), tmpf.read())
 
760
 
 
761
    def dispatch_command(self, cmd, args):
 
762
        """Deprecated compatibility method.""" # XXX XXX
 
763
        func = getattr(self, 'do_' + cmd, None)
 
764
        if func is None:
 
765
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
766
        self._run_handler_code(func, args, {})
 
767
 
 
768
    def _run_handler_code(self, callable, args, kwargs):
 
769
        """Run some handler specific code 'callable'.
 
770
 
 
771
        If a result is returned, it is considered to be the commands response,
 
772
        and finished_reading is set true, and its assigned to self.response.
 
773
 
 
774
        Any exceptions caught are translated and a response object created
 
775
        from them.
 
776
        """
 
777
        result = self._call_converting_errors(callable, args, kwargs)
 
778
        if result is not None:
 
779
            self.response = result
 
780
            self.finished_reading = True
 
781
        # handle unconverted commands
 
782
        if not self._converted_command:
 
783
            self.finished_reading = True
 
784
            if result is None:
 
785
                self.response = SmartServerResponse(('ok',))
 
786
 
 
787
    def _call_converting_errors(self, callable, args, kwargs):
 
788
        """Call callable converting errors to Response objects."""
 
789
        try:
 
790
            return callable(*args, **kwargs)
 
791
        except errors.NoSuchFile, e:
 
792
            return SmartServerResponse(('NoSuchFile', e.path))
 
793
        except errors.FileExists, e:
 
794
            return SmartServerResponse(('FileExists', e.path))
 
795
        except errors.DirectoryNotEmpty, e:
 
796
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
797
        except errors.ShortReadvError, e:
 
798
            return SmartServerResponse(('ShortReadvError',
 
799
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
800
        except UnicodeError, e:
 
801
            # If it is a DecodeError, than most likely we are starting
 
802
            # with a plain string
 
803
            str_or_unicode = e.object
 
804
            if isinstance(str_or_unicode, unicode):
 
805
                val = u'u:' + str_or_unicode
 
806
            else:
 
807
                val = u's:' + str_or_unicode.encode('base64')
 
808
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
809
            return SmartServerResponse((e.__class__.__name__,
 
810
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
811
        except errors.TransportNotPossible, e:
 
812
            if e.msg == "readonly transport":
 
813
                return SmartServerResponse(('ReadOnlyError', ))
 
814
            else:
 
815
                raise
 
816
 
 
817
 
 
818
class SmartTCPServer(object):
 
819
    """Listens on a TCP socket and accepts connections from smart clients"""
 
820
 
 
821
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
822
        """Construct a new server.
 
823
 
 
824
        To actually start it running, call either start_background_thread or
 
825
        serve.
 
826
 
 
827
        :param host: Name of the interface to listen on.
 
828
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
829
        """
 
830
        self._server_socket = socket.socket()
 
831
        self._server_socket.bind((host, port))
 
832
        self.port = self._server_socket.getsockname()[1]
 
833
        self._server_socket.listen(1)
 
834
        self._server_socket.settimeout(1)
 
835
        self.backing_transport = backing_transport
 
836
 
 
837
    def serve(self):
 
838
        # let connections timeout so that we get a chance to terminate
 
839
        # Keep a reference to the exceptions we want to catch because the socket
 
840
        # module's globals get set to None during interpreter shutdown.
 
841
        from socket import timeout as socket_timeout
 
842
        from socket import error as socket_error
 
843
        self._should_terminate = False
 
844
        while not self._should_terminate:
 
845
            try:
 
846
                self.accept_and_serve()
 
847
            except socket_timeout:
 
848
                # just check if we're asked to stop
 
849
                pass
 
850
            except socket_error, e:
 
851
                trace.warning("client disconnected: %s", e)
 
852
                pass
 
853
 
 
854
    def get_url(self):
 
855
        """Return the url of the server"""
 
856
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
857
 
 
858
    def accept_and_serve(self):
 
859
        conn, client_addr = self._server_socket.accept()
 
860
        # For WIN32, where the timeout value from the listening socket
 
861
        # propogates to the newly accepted socket.
 
862
        conn.setblocking(True)
 
863
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
864
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
865
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
866
        connection_thread.setDaemon(True)
 
867
        connection_thread.start()
 
868
 
 
869
    def start_background_thread(self):
 
870
        self._server_thread = threading.Thread(None,
 
871
                self.serve,
 
872
                name='server-' + self.get_url())
 
873
        self._server_thread.setDaemon(True)
 
874
        self._server_thread.start()
 
875
 
 
876
    def stop_background_thread(self):
 
877
        self._should_terminate = True
 
878
        # self._server_socket.close()
 
879
        # we used to join the thread, but it's not really necessary; it will
 
880
        # terminate in time
 
881
        ## self._server_thread.join()
 
882
 
 
883
 
 
884
class SmartTCPServer_for_testing(SmartTCPServer):
 
885
    """Server suitable for use by transport tests.
 
886
    
 
887
    This server is backed by the process's cwd.
 
888
    """
 
889
 
 
890
    def __init__(self):
 
891
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
 
892
        # The server is set up by default like for ssh access: the client
 
893
        # passes filesystem-absolute paths; therefore the server must look
 
894
        # them up relative to the root directory.  it might be better to act
 
895
        # a public server and have the server rewrite paths into the test
 
896
        # directory.
 
897
        SmartTCPServer.__init__(self,
 
898
            transport.get_transport(urlutils.local_path_to_url('/')))
 
899
        
 
900
    def setUp(self):
 
901
        """Set up server for testing"""
 
902
        self.start_background_thread()
 
903
 
 
904
    def tearDown(self):
 
905
        self.stop_background_thread()
 
906
 
 
907
    def get_url(self):
 
908
        """Return the url of the server"""
 
909
        host, port = self._server_socket.getsockname()
 
910
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
911
 
 
912
    def get_bogus_url(self):
 
913
        """Return a URL which will fail to connect"""
 
914
        return 'bzr://127.0.0.1:1/'
 
915
 
 
916
 
 
917
class SmartStat(object):
 
918
 
 
919
    def __init__(self, size, mode):
 
920
        self.st_size = size
 
921
        self.st_mode = mode
 
922
 
 
923
 
 
924
class SmartTransport(transport.Transport):
 
925
    """Connection to a smart server.
 
926
 
 
927
    The connection holds references to pipes that can be used to send requests
 
928
    to the server.
 
929
 
 
930
    The connection has a notion of the current directory to which it's
 
931
    connected; this is incorporated in filenames passed to the server.
 
932
    
 
933
    This supports some higher-level RPC operations and can also be treated 
 
934
    like a Transport to do file-like operations.
 
935
 
 
936
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
937
    or a series of http requests.  There are concrete subclasses for each
 
938
    type: SmartTCPTransport, etc.
 
939
    """
 
940
 
 
941
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
942
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
943
    # the ability to support multiple versions of the smart protocol over time:
 
944
    # SmartTransport is an adapter from the Transport object model to the 
 
945
    # SmartClient model, not an encoder.
 
946
 
 
947
    def __init__(self, url, clone_from=None, medium=None):
 
948
        """Constructor.
 
949
 
 
950
        :param medium: The medium to use for this RemoteTransport. This must be
 
951
            supplied if clone_from is None.
 
952
        """
 
953
        ### Technically super() here is faulty because Transport's __init__
 
954
        ### fails to take 2 parameters, and if super were to choose a silly
 
955
        ### initialisation order things would blow up. 
 
956
        if not url.endswith('/'):
 
957
            url += '/'
 
958
        super(SmartTransport, self).__init__(url)
 
959
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
960
                transport.split_url(url)
 
961
        if clone_from is None:
 
962
            self._medium = medium
 
963
        else:
 
964
            # credentials may be stripped from the base in some circumstances
 
965
            # as yet to be clearly defined or documented, so copy them.
 
966
            self._username = clone_from._username
 
967
            # reuse same connection
 
968
            self._medium = clone_from._medium
 
969
        assert self._medium is not None
 
970
 
 
971
    def abspath(self, relpath):
 
972
        """Return the full url to the given relative path.
 
973
        
 
974
        @param relpath: the relative path or path components
 
975
        @type relpath: str or list
 
976
        """
 
977
        return self._unparse_url(self._remote_path(relpath))
 
978
    
 
979
    def clone(self, relative_url):
 
980
        """Make a new SmartTransport related to me, sharing the same connection.
 
981
 
 
982
        This essentially opens a handle on a different remote directory.
 
983
        """
 
984
        if relative_url is None:
 
985
            return SmartTransport(self.base, self)
 
986
        else:
 
987
            return SmartTransport(self.abspath(relative_url), self)
 
988
 
 
989
    def is_readonly(self):
 
990
        """Smart server transport can do read/write file operations."""
 
991
        return False
 
992
                                                   
 
993
    def get_smart_client(self):
 
994
        return self._medium
 
995
 
 
996
    def get_smart_medium(self):
 
997
        return self._medium
 
998
                                                   
 
999
    def _unparse_url(self, path):
 
1000
        """Return URL for a path.
 
1001
 
 
1002
        :see: SFTPUrlHandling._unparse_url
 
1003
        """
 
1004
        # TODO: Eventually it should be possible to unify this with
 
1005
        # SFTPUrlHandling._unparse_url?
 
1006
        if path == '':
 
1007
            path = '/'
 
1008
        path = urllib.quote(path)
 
1009
        netloc = urllib.quote(self._host)
 
1010
        if self._username is not None:
 
1011
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
1012
        if self._port is not None:
 
1013
            netloc = '%s:%d' % (netloc, self._port)
 
1014
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
1015
 
 
1016
    def _remote_path(self, relpath):
 
1017
        """Returns the Unicode version of the absolute path for relpath."""
 
1018
        return self._combine_paths(self._path, relpath)
 
1019
 
 
1020
    def _call(self, method, *args):
 
1021
        resp = self._call2(method, *args)
 
1022
        self._translate_error(resp)
 
1023
 
 
1024
    def _call2(self, method, *args):
 
1025
        """Call a method on the remote server."""
 
1026
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1027
        protocol.call(method, *args)
 
1028
        return protocol.read_response_tuple()
 
1029
 
 
1030
    def _call_with_body_bytes(self, method, args, body):
 
1031
        """Call a method on the remote server with body bytes."""
 
1032
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1033
        protocol.call_with_body_bytes((method, ) + args, body)
 
1034
        return protocol.read_response_tuple()
 
1035
 
 
1036
    def has(self, relpath):
 
1037
        """Indicate whether a remote file of the given name exists or not.
 
1038
 
 
1039
        :see: Transport.has()
 
1040
        """
 
1041
        resp = self._call2('has', self._remote_path(relpath))
 
1042
        if resp == ('yes', ):
 
1043
            return True
 
1044
        elif resp == ('no', ):
 
1045
            return False
 
1046
        else:
 
1047
            self._translate_error(resp)
 
1048
 
 
1049
    def get(self, relpath):
 
1050
        """Return file-like object reading the contents of a remote file.
 
1051
        
 
1052
        :see: Transport.get_bytes()/get_file()
 
1053
        """
 
1054
        return StringIO(self.get_bytes(relpath))
 
1055
 
 
1056
    def get_bytes(self, relpath):
 
1057
        remote = self._remote_path(relpath)
 
1058
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1059
        protocol.call('get', remote)
 
1060
        resp = protocol.read_response_tuple(True)
 
1061
        if resp != ('ok', ):
 
1062
            protocol.cancel_read_body()
 
1063
            self._translate_error(resp, relpath)
 
1064
        return protocol.read_body_bytes()
 
1065
 
 
1066
    def _serialise_optional_mode(self, mode):
 
1067
        if mode is None:
 
1068
            return ''
 
1069
        else:
 
1070
            return '%d' % mode
 
1071
 
 
1072
    def mkdir(self, relpath, mode=None):
 
1073
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1074
            self._serialise_optional_mode(mode))
 
1075
        self._translate_error(resp)
 
1076
 
 
1077
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1078
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1079
        # should probably just pass all parameters as length-delimited
 
1080
        # strings?
 
1081
        resp = self._call_with_body_bytes('put',
 
1082
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1083
            upload_contents)
 
1084
        self._translate_error(resp)
 
1085
 
 
1086
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1087
                             create_parent_dir=False,
 
1088
                             dir_mode=None):
 
1089
        """See Transport.put_bytes_non_atomic."""
 
1090
        # FIXME: no encoding in the transport!
 
1091
        create_parent_str = 'F'
 
1092
        if create_parent_dir:
 
1093
            create_parent_str = 'T'
 
1094
 
 
1095
        resp = self._call_with_body_bytes(
 
1096
            'put_non_atomic',
 
1097
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1098
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1099
            bytes)
 
1100
        self._translate_error(resp)
 
1101
 
 
1102
    def put_file(self, relpath, upload_file, mode=None):
 
1103
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1104
        # on transports not reading before failing - which is a faulty
 
1105
        # assumption I think - RBC 20060915
 
1106
        pos = upload_file.tell()
 
1107
        try:
 
1108
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1109
        except:
 
1110
            upload_file.seek(pos)
 
1111
            raise
 
1112
 
 
1113
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1114
                            create_parent_dir=False,
 
1115
                            dir_mode=None):
 
1116
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1117
                                         create_parent_dir=create_parent_dir,
 
1118
                                         dir_mode=dir_mode)
 
1119
 
 
1120
    def append_file(self, relpath, from_file, mode=None):
 
1121
        return self.append_bytes(relpath, from_file.read(), mode)
 
1122
        
 
1123
    def append_bytes(self, relpath, bytes, mode=None):
 
1124
        resp = self._call_with_body_bytes(
 
1125
            'append',
 
1126
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1127
            bytes)
 
1128
        if resp[0] == 'appended':
 
1129
            return int(resp[1])
 
1130
        self._translate_error(resp)
 
1131
 
 
1132
    def delete(self, relpath):
 
1133
        resp = self._call2('delete', self._remote_path(relpath))
 
1134
        self._translate_error(resp)
 
1135
 
 
1136
    def readv(self, relpath, offsets):
 
1137
        if not offsets:
 
1138
            return
 
1139
 
 
1140
        offsets = list(offsets)
 
1141
 
 
1142
        sorted_offsets = sorted(offsets)
 
1143
        # turn the list of offsets into a stack
 
1144
        offset_stack = iter(offsets)
 
1145
        cur_offset_and_size = offset_stack.next()
 
1146
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1147
                               limit=self._max_readv_combine,
 
1148
                               fudge_factor=self._bytes_to_read_before_seek))
 
1149
 
 
1150
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1151
        protocol.call_with_body_readv_array(
 
1152
            ('readv', self._remote_path(relpath)),
 
1153
            [(c.start, c.length) for c in coalesced])
 
1154
        resp = protocol.read_response_tuple(True)
 
1155
 
 
1156
        if resp[0] != 'readv':
 
1157
            # This should raise an exception
 
1158
            protocol.cancel_read_body()
 
1159
            self._translate_error(resp)
 
1160
            return
 
1161
 
 
1162
        # FIXME: this should know how many bytes are needed, for clarity.
 
1163
        data = protocol.read_body_bytes()
 
1164
        # Cache the results, but only until they have been fulfilled
 
1165
        data_map = {}
 
1166
        for c_offset in coalesced:
 
1167
            if len(data) < c_offset.length:
 
1168
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1169
                            c_offset.length, actual=len(data))
 
1170
            for suboffset, subsize in c_offset.ranges:
 
1171
                key = (c_offset.start+suboffset, subsize)
 
1172
                data_map[key] = data[suboffset:suboffset+subsize]
 
1173
            data = data[c_offset.length:]
 
1174
 
 
1175
            # Now that we've read some data, see if we can yield anything back
 
1176
            while cur_offset_and_size in data_map:
 
1177
                this_data = data_map.pop(cur_offset_and_size)
 
1178
                yield cur_offset_and_size[0], this_data
 
1179
                cur_offset_and_size = offset_stack.next()
 
1180
 
 
1181
    def rename(self, rel_from, rel_to):
 
1182
        self._call('rename',
 
1183
                   self._remote_path(rel_from),
 
1184
                   self._remote_path(rel_to))
 
1185
 
 
1186
    def move(self, rel_from, rel_to):
 
1187
        self._call('move',
 
1188
                   self._remote_path(rel_from),
 
1189
                   self._remote_path(rel_to))
 
1190
 
 
1191
    def rmdir(self, relpath):
 
1192
        resp = self._call('rmdir', self._remote_path(relpath))
 
1193
 
 
1194
    def _translate_error(self, resp, orig_path=None):
 
1195
        """Raise an exception from a response"""
 
1196
        if resp is None:
 
1197
            what = None
 
1198
        else:
 
1199
            what = resp[0]
 
1200
        if what == 'ok':
 
1201
            return
 
1202
        elif what == 'NoSuchFile':
 
1203
            if orig_path is not None:
 
1204
                error_path = orig_path
 
1205
            else:
 
1206
                error_path = resp[1]
 
1207
            raise errors.NoSuchFile(error_path)
 
1208
        elif what == 'error':
 
1209
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1210
        elif what == 'FileExists':
 
1211
            raise errors.FileExists(resp[1])
 
1212
        elif what == 'DirectoryNotEmpty':
 
1213
            raise errors.DirectoryNotEmpty(resp[1])
 
1214
        elif what == 'ShortReadvError':
 
1215
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1216
                                         int(resp[3]), int(resp[4]))
 
1217
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1218
            encoding = str(resp[1]) # encoding must always be a string
 
1219
            val = resp[2]
 
1220
            start = int(resp[3])
 
1221
            end = int(resp[4])
 
1222
            reason = str(resp[5]) # reason must always be a string
 
1223
            if val.startswith('u:'):
 
1224
                val = val[2:]
 
1225
            elif val.startswith('s:'):
 
1226
                val = val[2:].decode('base64')
 
1227
            if what == 'UnicodeDecodeError':
 
1228
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1229
            elif what == 'UnicodeEncodeError':
 
1230
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1231
        elif what == "ReadOnlyError":
 
1232
            raise errors.TransportNotPossible('readonly transport')
 
1233
        else:
 
1234
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1235
 
 
1236
    def disconnect(self):
 
1237
        self._medium.disconnect()
 
1238
 
 
1239
    def delete_tree(self, relpath):
 
1240
        raise errors.TransportNotPossible('readonly transport')
 
1241
 
 
1242
    def stat(self, relpath):
 
1243
        resp = self._call2('stat', self._remote_path(relpath))
 
1244
        if resp[0] == 'stat':
 
1245
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1246
        else:
 
1247
            self._translate_error(resp)
 
1248
 
 
1249
    ## def lock_read(self, relpath):
 
1250
    ##     """Lock the given file for shared (read) access.
 
1251
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1252
    ##     """
 
1253
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1254
    ##     # continue that tradition and return a bogus lock object.
 
1255
    ##     class BogusLock(object):
 
1256
    ##         def __init__(self, path):
 
1257
    ##             self.path = path
 
1258
    ##         def unlock(self):
 
1259
    ##             pass
 
1260
    ##     return BogusLock(relpath)
 
1261
 
 
1262
    def listable(self):
 
1263
        return True
 
1264
 
 
1265
    def list_dir(self, relpath):
 
1266
        resp = self._call2('list_dir', self._remote_path(relpath))
 
1267
        if resp[0] == 'names':
 
1268
            return [name.encode('ascii') for name in resp[1:]]
 
1269
        else:
 
1270
            self._translate_error(resp)
 
1271
 
 
1272
    def iter_files_recursive(self):
 
1273
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
1274
        if resp[0] == 'names':
 
1275
            return resp[1:]
 
1276
        else:
 
1277
            self._translate_error(resp)
 
1278
 
 
1279
 
 
1280
class SmartClientMediumRequest(object):
 
1281
    """A request on a SmartClientMedium.
 
1282
 
 
1283
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1284
    the response bytes to be read via read_bytes.
 
1285
 
 
1286
    For instance:
 
1287
    request.accept_bytes('123')
 
1288
    request.finished_writing()
 
1289
    result = request.read_bytes(3)
 
1290
    request.finished_reading()
 
1291
 
 
1292
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1293
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1294
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1295
    details on concurrency and pipelining.
 
1296
    """
 
1297
 
 
1298
    def __init__(self, medium):
 
1299
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1300
        self._medium = medium
 
1301
        # we track state by constants - we may want to use the same
 
1302
        # pattern as BodyReader if it gets more complex.
 
1303
        # valid states are: "writing", "reading", "done"
 
1304
        self._state = "writing"
 
1305
 
 
1306
    def accept_bytes(self, bytes):
 
1307
        """Accept bytes for inclusion in this request.
 
1308
 
 
1309
        This method may not be be called after finished_writing() has been
 
1310
        called.  It depends upon the Medium whether or not the bytes will be
 
1311
        immediately transmitted. Message based Mediums will tend to buffer the
 
1312
        bytes until finished_writing() is called.
 
1313
 
 
1314
        :param bytes: A bytestring.
 
1315
        """
 
1316
        if self._state != "writing":
 
1317
            raise errors.WritingCompleted(self)
 
1318
        self._accept_bytes(bytes)
 
1319
 
 
1320
    def _accept_bytes(self, bytes):
 
1321
        """Helper for accept_bytes.
 
1322
 
 
1323
        Accept_bytes checks the state of the request to determing if bytes
 
1324
        should be accepted. After that it hands off to _accept_bytes to do the
 
1325
        actual acceptance.
 
1326
        """
 
1327
        raise NotImplementedError(self._accept_bytes)
 
1328
 
 
1329
    def finished_reading(self):
 
1330
        """Inform the request that all desired data has been read.
 
1331
 
 
1332
        This will remove the request from the pipeline for its medium (if the
 
1333
        medium supports pipelining) and any further calls to methods on the
 
1334
        request will raise ReadingCompleted.
 
1335
        """
 
1336
        if self._state == "writing":
 
1337
            raise errors.WritingNotComplete(self)
 
1338
        if self._state != "reading":
 
1339
            raise errors.ReadingCompleted(self)
 
1340
        self._state = "done"
 
1341
        self._finished_reading()
 
1342
 
 
1343
    def _finished_reading(self):
 
1344
        """Helper for finished_reading.
 
1345
 
 
1346
        finished_reading checks the state of the request to determine if 
 
1347
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1348
        to perform the action.
 
1349
        """
 
1350
        raise NotImplementedError(self._finished_reading)
 
1351
 
 
1352
    def finished_writing(self):
 
1353
        """Finish the writing phase of this request.
 
1354
 
 
1355
        This will flush all pending data for this request along the medium.
 
1356
        After calling finished_writing, you may not call accept_bytes anymore.
 
1357
        """
 
1358
        if self._state != "writing":
 
1359
            raise errors.WritingCompleted(self)
 
1360
        self._state = "reading"
 
1361
        self._finished_writing()
 
1362
 
 
1363
    def _finished_writing(self):
 
1364
        """Helper for finished_writing.
 
1365
 
 
1366
        finished_writing checks the state of the request to determine if 
 
1367
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1368
        to perform the action.
 
1369
        """
 
1370
        raise NotImplementedError(self._finished_writing)
 
1371
 
 
1372
    def read_bytes(self, count):
 
1373
        """Read bytes from this requests response.
 
1374
 
 
1375
        This method will block and wait for count bytes to be read. It may not
 
1376
        be invoked until finished_writing() has been called - this is to ensure
 
1377
        a message-based approach to requests, for compatability with message
 
1378
        based mediums like HTTP.
 
1379
        """
 
1380
        if self._state == "writing":
 
1381
            raise errors.WritingNotComplete(self)
 
1382
        if self._state != "reading":
 
1383
            raise errors.ReadingCompleted(self)
 
1384
        return self._read_bytes(count)
 
1385
 
 
1386
    def _read_bytes(self, count):
 
1387
        """Helper for read_bytes.
 
1388
 
 
1389
        read_bytes checks the state of the request to determing if bytes
 
1390
        should be read. After that it hands off to _read_bytes to do the
 
1391
        actual read.
 
1392
        """
 
1393
        raise NotImplementedError(self._read_bytes)
 
1394
 
 
1395
 
 
1396
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1397
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1398
 
 
1399
    def __init__(self, medium):
 
1400
        SmartClientMediumRequest.__init__(self, medium)
 
1401
        # check that we are safe concurrency wise. If some streams start
 
1402
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1403
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1404
        # and the setting/unsetting of _current_request likewise moved into
 
1405
        # that class : but its unneeded overhead for now. RBC 20060922
 
1406
        if self._medium._current_request is not None:
 
1407
            raise errors.TooManyConcurrentRequests(self._medium)
 
1408
        self._medium._current_request = self
 
1409
 
 
1410
    def _accept_bytes(self, bytes):
 
1411
        """See SmartClientMediumRequest._accept_bytes.
 
1412
        
 
1413
        This forwards to self._medium._accept_bytes because we are operating
 
1414
        on the mediums stream.
 
1415
        """
 
1416
        self._medium._accept_bytes(bytes)
 
1417
 
 
1418
    def _finished_reading(self):
 
1419
        """See SmartClientMediumRequest._finished_reading.
 
1420
 
 
1421
        This clears the _current_request on self._medium to allow a new 
 
1422
        request to be created.
 
1423
        """
 
1424
        assert self._medium._current_request is self
 
1425
        self._medium._current_request = None
 
1426
        
 
1427
    def _finished_writing(self):
 
1428
        """See SmartClientMediumRequest._finished_writing.
 
1429
 
 
1430
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1431
        """
 
1432
        self._medium._flush()
 
1433
 
 
1434
    def _read_bytes(self, count):
 
1435
        """See SmartClientMediumRequest._read_bytes.
 
1436
        
 
1437
        This forwards to self._medium._read_bytes because we are operating
 
1438
        on the mediums stream.
 
1439
        """
 
1440
        return self._medium._read_bytes(count)
 
1441
 
 
1442
 
 
1443
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1444
    """The client-side protocol for smart version 1."""
 
1445
 
 
1446
    def __init__(self, request):
 
1447
        """Construct a SmartClientRequestProtocolOne.
 
1448
 
 
1449
        :param request: A SmartClientMediumRequest to serialise onto and
 
1450
            deserialise from.
 
1451
        """
 
1452
        self._request = request
 
1453
        self._body_buffer = None
 
1454
 
 
1455
    def call(self, *args):
 
1456
        bytes = _encode_tuple(args)
 
1457
        self._request.accept_bytes(bytes)
 
1458
        self._request.finished_writing()
 
1459
 
 
1460
    def call_with_body_bytes(self, args, body):
 
1461
        """Make a remote call of args with body bytes 'body'.
 
1462
 
 
1463
        After calling this, call read_response_tuple to find the result out.
 
1464
        """
 
1465
        bytes = _encode_tuple(args)
 
1466
        self._request.accept_bytes(bytes)
 
1467
        bytes = self._encode_bulk_data(body)
 
1468
        self._request.accept_bytes(bytes)
 
1469
        self._request.finished_writing()
 
1470
 
 
1471
    def call_with_body_readv_array(self, args, body):
 
1472
        """Make a remote call with a readv array.
 
1473
 
 
1474
        The body is encoded with one line per readv offset pair. The numbers in
 
1475
        each pair are separated by a comma, and no trailing \n is emitted.
 
1476
        """
 
1477
        bytes = _encode_tuple(args)
 
1478
        self._request.accept_bytes(bytes)
 
1479
        readv_bytes = self._serialise_offsets(body)
 
1480
        bytes = self._encode_bulk_data(readv_bytes)
 
1481
        self._request.accept_bytes(bytes)
 
1482
        self._request.finished_writing()
 
1483
 
 
1484
    def cancel_read_body(self):
 
1485
        """After expecting a body, a response code may indicate one otherwise.
 
1486
 
 
1487
        This method lets the domain client inform the protocol that no body
 
1488
        will be transmitted. This is a terminal method: after calling it the
 
1489
        protocol is not able to be used further.
 
1490
        """
 
1491
        self._request.finished_reading()
 
1492
 
 
1493
    def read_response_tuple(self, expect_body=False):
 
1494
        """Read a response tuple from the wire.
 
1495
 
 
1496
        This should only be called once.
 
1497
        """
 
1498
        result = self._recv_tuple()
 
1499
        if not expect_body:
 
1500
            self._request.finished_reading()
 
1501
        return result
 
1502
 
 
1503
    def read_body_bytes(self, count=-1):
 
1504
        """Read bytes from the body, decoding into a byte stream.
 
1505
        
 
1506
        We read all bytes at once to ensure we've checked the trailer for 
 
1507
        errors, and then feed the buffer back as read_body_bytes is called.
 
1508
        """
 
1509
        if self._body_buffer is not None:
 
1510
            return self._body_buffer.read(count)
 
1511
        _body_decoder = LengthPrefixedBodyDecoder()
 
1512
 
 
1513
        while not _body_decoder.finished_reading:
 
1514
            bytes_wanted = _body_decoder.next_read_size()
 
1515
            bytes = self._request.read_bytes(bytes_wanted)
 
1516
            _body_decoder.accept_bytes(bytes)
 
1517
        self._request.finished_reading()
 
1518
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1519
        # XXX: TODO check the trailer result.
 
1520
        return self._body_buffer.read(count)
 
1521
 
 
1522
    def _recv_tuple(self):
 
1523
        """Receive a tuple from the medium request."""
 
1524
        line = ''
 
1525
        while not line or line[-1] != '\n':
 
1526
            # TODO: this is inefficient - but tuples are short.
 
1527
            new_char = self._request.read_bytes(1)
 
1528
            line += new_char
 
1529
            assert new_char != '', "end of file reading from server."
 
1530
        return _decode_tuple(line)
 
1531
 
 
1532
    def query_version(self):
 
1533
        """Return protocol version number of the server."""
 
1534
        self.call('hello')
 
1535
        resp = self.read_response_tuple()
 
1536
        if resp == ('ok', '1'):
 
1537
            return 1
 
1538
        else:
 
1539
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1540
 
 
1541
 
 
1542
class SmartClientMedium(object):
 
1543
    """Smart client is a medium for sending smart protocol requests over."""
 
1544
 
 
1545
    def disconnect(self):
 
1546
        """If this medium maintains a persistent connection, close it.
 
1547
        
 
1548
        The default implementation does nothing.
 
1549
        """
 
1550
        
 
1551
 
 
1552
class SmartClientStreamMedium(SmartClientMedium):
 
1553
    """Stream based medium common class.
 
1554
 
 
1555
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1556
    SmartClientStreamMediumRequest for their requests, and should implement
 
1557
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1558
    receive bytes.
 
1559
    """
 
1560
 
 
1561
    def __init__(self):
 
1562
        self._current_request = None
 
1563
 
 
1564
    def accept_bytes(self, bytes):
 
1565
        self._accept_bytes(bytes)
 
1566
 
 
1567
    def __del__(self):
 
1568
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1569
        finished with it.
 
1570
        """
 
1571
        self.disconnect()
 
1572
 
 
1573
    def _flush(self):
 
1574
        """Flush the output stream.
 
1575
        
 
1576
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1577
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1578
        """
 
1579
        raise NotImplementedError(self._flush)
 
1580
 
 
1581
    def get_request(self):
 
1582
        """See SmartClientMedium.get_request().
 
1583
 
 
1584
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1585
        for get_request.
 
1586
        """
 
1587
        return SmartClientStreamMediumRequest(self)
 
1588
 
 
1589
    def read_bytes(self, count):
 
1590
        return self._read_bytes(count)
 
1591
 
 
1592
 
 
1593
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1594
    """A client medium using simple pipes.
 
1595
    
 
1596
    This client does not manage the pipes: it assumes they will always be open.
 
1597
    """
 
1598
 
 
1599
    def __init__(self, readable_pipe, writeable_pipe):
 
1600
        SmartClientStreamMedium.__init__(self)
 
1601
        self._readable_pipe = readable_pipe
 
1602
        self._writeable_pipe = writeable_pipe
 
1603
 
 
1604
    def _accept_bytes(self, bytes):
 
1605
        """See SmartClientStreamMedium.accept_bytes."""
 
1606
        self._writeable_pipe.write(bytes)
 
1607
 
 
1608
    def _flush(self):
 
1609
        """See SmartClientStreamMedium._flush()."""
 
1610
        self._writeable_pipe.flush()
 
1611
 
 
1612
    def _read_bytes(self, count):
 
1613
        """See SmartClientStreamMedium._read_bytes."""
 
1614
        return self._readable_pipe.read(count)
 
1615
 
 
1616
 
 
1617
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1618
    """A client medium using SSH."""
 
1619
    
 
1620
    def __init__(self, host, port=None, username=None, password=None,
 
1621
            vendor=None):
 
1622
        """Creates a client that will connect on the first use.
 
1623
        
 
1624
        :param vendor: An optional override for the ssh vendor to use. See
 
1625
            bzrlib.transport.ssh for details on ssh vendors.
 
1626
        """
 
1627
        SmartClientStreamMedium.__init__(self)
 
1628
        self._connected = False
 
1629
        self._host = host
 
1630
        self._password = password
 
1631
        self._port = port
 
1632
        self._username = username
 
1633
        self._read_from = None
 
1634
        self._ssh_connection = None
 
1635
        self._vendor = vendor
 
1636
        self._write_to = None
 
1637
 
 
1638
    def _accept_bytes(self, bytes):
 
1639
        """See SmartClientStreamMedium.accept_bytes."""
 
1640
        self._ensure_connection()
 
1641
        self._write_to.write(bytes)
 
1642
 
 
1643
    def disconnect(self):
 
1644
        """See SmartClientMedium.disconnect()."""
 
1645
        if not self._connected:
 
1646
            return
 
1647
        self._read_from.close()
 
1648
        self._write_to.close()
 
1649
        self._ssh_connection.close()
 
1650
        self._connected = False
 
1651
 
 
1652
    def _ensure_connection(self):
 
1653
        """Connect this medium if not already connected."""
 
1654
        if self._connected:
 
1655
            return
 
1656
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1657
        if self._vendor is None:
 
1658
            vendor = ssh._get_ssh_vendor()
 
1659
        else:
 
1660
            vendor = self._vendor
 
1661
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1662
                self._password, self._host, self._port,
 
1663
                command=[executable, 'serve', '--inet', '--directory=/',
 
1664
                         '--allow-writes'])
 
1665
        self._read_from, self._write_to = \
 
1666
            self._ssh_connection.get_filelike_channels()
 
1667
        self._connected = True
 
1668
 
 
1669
    def _flush(self):
 
1670
        """See SmartClientStreamMedium._flush()."""
 
1671
        self._write_to.flush()
 
1672
 
 
1673
    def _read_bytes(self, count):
 
1674
        """See SmartClientStreamMedium.read_bytes."""
 
1675
        if not self._connected:
 
1676
            raise errors.MediumNotConnected(self)
 
1677
        return self._read_from.read(count)
 
1678
 
 
1679
 
 
1680
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1681
    """A client medium using TCP."""
 
1682
    
 
1683
    def __init__(self, host, port):
 
1684
        """Creates a client that will connect on the first use."""
 
1685
        SmartClientStreamMedium.__init__(self)
 
1686
        self._connected = False
 
1687
        self._host = host
 
1688
        self._port = port
 
1689
        self._socket = None
 
1690
 
 
1691
    def _accept_bytes(self, bytes):
 
1692
        """See SmartClientMedium.accept_bytes."""
 
1693
        self._ensure_connection()
 
1694
        self._socket.sendall(bytes)
 
1695
 
 
1696
    def disconnect(self):
 
1697
        """See SmartClientMedium.disconnect()."""
 
1698
        if not self._connected:
 
1699
            return
 
1700
        self._socket.close()
 
1701
        self._socket = None
 
1702
        self._connected = False
 
1703
 
 
1704
    def _ensure_connection(self):
 
1705
        """Connect this medium if not already connected."""
 
1706
        if self._connected:
 
1707
            return
 
1708
        self._socket = socket.socket()
 
1709
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1710
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1711
        if result:
 
1712
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1713
                    (self._host, self._port, os.strerror(result)))
 
1714
        self._connected = True
 
1715
 
 
1716
    def _flush(self):
 
1717
        """See SmartClientStreamMedium._flush().
 
1718
        
 
1719
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1720
        add a means to do a flush, but that can be done in the future.
 
1721
        """
 
1722
 
 
1723
    def _read_bytes(self, count):
 
1724
        """See SmartClientMedium.read_bytes."""
 
1725
        if not self._connected:
 
1726
            raise errors.MediumNotConnected(self)
 
1727
        return self._socket.recv(count)
 
1728
 
 
1729
 
 
1730
class SmartTCPTransport(SmartTransport):
 
1731
    """Connection to smart server over plain tcp.
 
1732
    
 
1733
    This is essentially just a factory to get 'RemoteTransport(url,
 
1734
        SmartTCPClientMedium).
 
1735
    """
 
1736
 
 
1737
    def __init__(self, url):
 
1738
        _scheme, _username, _password, _host, _port, _path = \
 
1739
            transport.split_url(url)
 
1740
        try:
 
1741
            _port = int(_port)
 
1742
        except (ValueError, TypeError), e:
 
1743
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1744
        medium = SmartTCPClientMedium(_host, _port)
 
1745
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1746
 
 
1747
 
 
1748
class SmartSSHTransport(SmartTransport):
 
1749
    """Connection to smart server over SSH.
 
1750
 
 
1751
    This is essentially just a factory to get 'RemoteTransport(url,
 
1752
        SmartSSHClientMedium).
 
1753
    """
 
1754
 
 
1755
    def __init__(self, url):
 
1756
        _scheme, _username, _password, _host, _port, _path = \
 
1757
            transport.split_url(url)
 
1758
        try:
 
1759
            if _port is not None:
 
1760
                _port = int(_port)
 
1761
        except (ValueError, TypeError), e:
 
1762
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1763
                _port)
 
1764
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1765
        super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1766
 
 
1767
 
 
1768
def get_test_permutations():
 
1769
    """Return (transport, server) permutations for testing."""
 
1770
    ### We may need a little more test framework support to construct an
 
1771
    ### appropriate RemoteTransport in the future.
 
1772
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]