~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: John Arbash Meinel
  • Date: 2006-10-13 06:41:04 UTC
  • mfrom: (2018.2.29 hpss HTTP support)
  • mto: This revision was merged to the branch mainline in revision 2075.
  • Revision ID: john@arbash-meinel.com-20061013064104-57dbb43c2ffa55ff
(Andrew Bennetts) Implement bzr+http which lets us tunnel the smart protocol through an http connection

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
 
23
23
  SEP := '\001'
24
24
    Fields are separated by Ctrl-A.
25
 
  BULK_DATA := CHUNK+ TRAILER
 
25
  BULK_DATA := CHUNK TRAILER
26
26
    Chunks can be repeated as many times as necessary.
27
27
  CHUNK := CHUNK_LEN CHUNK_BODY
28
28
  CHUNK_LEN := DIGIT+ NEWLINE
46
46
URLs that include ~ should probably be passed across to the server verbatim
47
47
and the server can expand them.  This will proably not be meaningful when 
48
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
 
51
that you have multiple requests and get a read error because the other side did
 
52
shutdown.  For pipes we have read pipe which will have a zero read which marks
 
53
end-of-file.  For HTTP server environment there is not end-of-stream because
 
54
each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out requests from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialization, deserialization)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialization, deserialization)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
49
109
"""
50
110
 
51
111
 
133
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
134
194
# branch doing file-level operations.
135
195
#
136
 
# TODO: jam 20060915 _decode_tuple is acting directly on input over
137
 
#       the socket, and it assumes everything is UTF8 sections separated
138
 
#       by \001. Which means a request like '\002' Will abort the connection
139
 
#       because of a UnicodeDecodeError. It does look like invalid data will
140
 
#       kill the SmartStreamServer, but only with an abort + exception, and 
141
 
#       the overall server shouldn't die.
142
196
 
143
197
from cStringIO import StringIO
144
 
import errno
145
198
import os
146
199
import socket
147
 
import sys
148
200
import tempfile
149
201
import threading
150
202
import urllib
159
211
    urlutils,
160
212
    )
161
213
from bzrlib.bundle.serializer import write_bundle
162
 
from bzrlib.trace import mutter
163
 
from bzrlib.transport import local
 
214
try:
 
215
    from bzrlib.transport import ssh
 
216
except errors.ParamikoNotPresent:
 
217
    # no paramiko.  SmartSSHClientMedium will break.
 
218
    pass
164
219
 
165
220
# must do this otherwise urllib can't parse the urls properly :(
166
221
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
178
233
        return None
179
234
    if req_line[-1] != '\n':
180
235
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
181
 
    return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
236
    try:
 
237
        return tuple((a.decode('utf-8') for a in req_line[:-1].split('\x01')))
 
238
    except UnicodeDecodeError:
 
239
        raise errors.SmartProtocolError(
 
240
            "one or more arguments of request %r are not valid UTF-8"
 
241
            % req_line)
182
242
 
183
243
 
184
244
def _encode_tuple(args):
189
249
class SmartProtocolBase(object):
190
250
    """Methods common to client and server"""
191
251
 
192
 
    def _send_bulk_data(self, body):
193
 
        """Send chunked body data"""
194
 
        assert isinstance(body, str)
195
 
        bytes = ''.join(('%d\n' % len(body), body, 'done\n'))
196
 
        self._write_and_flush(bytes)
197
 
 
198
 
    # TODO: this only actually accomodates a single block; possibly should support
199
 
    # multiple chunks?
200
 
    def _recv_bulk(self):
201
 
        chunk_len = self._in.readline()
202
 
        try:
203
 
            chunk_len = int(chunk_len)
204
 
        except ValueError:
205
 
            raise errors.SmartProtocolError("bad chunk length line %r" % chunk_len)
206
 
        bulk = self._in.read(chunk_len)
207
 
        if len(bulk) != chunk_len:
208
 
            raise errors.SmartProtocolError("short read fetching bulk data chunk")
209
 
        self._recv_trailer()
210
 
        return bulk
211
 
 
212
 
    def _recv_tuple(self):
213
 
        return _recv_tuple(self._in)
214
 
 
215
 
    def _recv_trailer(self):
216
 
        resp = self._recv_tuple()
217
 
        if resp == ('done', ):
218
 
            return
219
 
        else:
220
 
            self._translate_error(resp)
 
252
    # TODO: this only actually accomodates a single block; possibly should
 
253
    # support multiple chunks?
 
254
    def _encode_bulk_data(self, body):
 
255
        """Encode body as a bulk data chunk."""
 
256
        return ''.join(('%d\n' % len(body), body, 'done\n'))
221
257
 
222
258
    def _serialise_offsets(self, offsets):
223
259
        """Serialise a readv offset list."""
225
261
        for start, length in offsets:
226
262
            txt.append('%d,%d' % (start, length))
227
263
        return '\n'.join(txt)
228
 
 
229
 
    def _write_and_flush(self, bytes):
230
 
        """Write bytes to self._out and flush it."""
231
 
        # XXX: this will be inefficient.  Just ask Robert.
232
 
        self._out.write(bytes)
233
 
        self._out.flush()
234
 
 
235
 
 
236
 
class SmartStreamServer(SmartProtocolBase):
 
264
        
 
265
 
 
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
267
    """Server-side encoding and decoding logic for smart version 1."""
 
268
    
 
269
    def __init__(self, backing_transport, write_func):
 
270
        self._backing_transport = backing_transport
 
271
        self.excess_buffer = ''
 
272
        self._finished_reading = False
 
273
        self.in_buffer = ''
 
274
        self.has_dispatched = False
 
275
        self.request = None
 
276
        self._body_decoder = None
 
277
        self._write_func = write_func
 
278
 
 
279
    def accept_bytes(self, bytes):
 
280
        """Take bytes, and advance the internal state machine appropriately.
 
281
        
 
282
        :param bytes: must be a byte string
 
283
        """
 
284
        assert isinstance(bytes, str)
 
285
        self.in_buffer += bytes
 
286
        if not self.has_dispatched:
 
287
            if '\n' not in self.in_buffer:
 
288
                # no command line yet
 
289
                return
 
290
            self.has_dispatched = True
 
291
            try:
 
292
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
293
                first_line += '\n'
 
294
                req_args = _decode_tuple(first_line)
 
295
                self.request = SmartServerRequestHandler(
 
296
                    self._backing_transport)
 
297
                self.request.dispatch_command(req_args[0], req_args[1:])
 
298
                if self.request.finished_reading:
 
299
                    # trivial request
 
300
                    self.excess_buffer = self.in_buffer
 
301
                    self.in_buffer = ''
 
302
                    self._send_response(self.request.response.args,
 
303
                        self.request.response.body)
 
304
                self.sync_with_request(self.request)
 
305
            except KeyboardInterrupt:
 
306
                raise
 
307
            except Exception, exception:
 
308
                # everything else: pass to client, flush, and quit
 
309
                self._send_response(('error', str(exception)))
 
310
                return None
 
311
 
 
312
        if self.has_dispatched:
 
313
            if self._finished_reading:
 
314
                # nothing to do.XXX: this routine should be a single state 
 
315
                # machine too.
 
316
                self.excess_buffer += self.in_buffer
 
317
                self.in_buffer = ''
 
318
                return
 
319
            if self._body_decoder is None:
 
320
                self._body_decoder = LengthPrefixedBodyDecoder()
 
321
            self._body_decoder.accept_bytes(self.in_buffer)
 
322
            self.in_buffer = self._body_decoder.unused_data
 
323
            body_data = self._body_decoder.read_pending_data()
 
324
            self.request.accept_body(body_data)
 
325
            if self._body_decoder.finished_reading:
 
326
                self.request.end_of_body()
 
327
                assert self.request.finished_reading, \
 
328
                    "no more body, request not finished"
 
329
            self.sync_with_request(self.request)
 
330
            if self.request.response is not None:
 
331
                self._send_response(self.request.response.args,
 
332
                    self.request.response.body)
 
333
                self.excess_buffer = self.in_buffer
 
334
                self.in_buffer = ''
 
335
            else:
 
336
                assert not self.request.finished_reading, \
 
337
                    "no response and we have finished reading."
 
338
 
 
339
    def _send_response(self, args, body=None):
 
340
        """Send a smart server response down the output stream."""
 
341
        self._write_func(_encode_tuple(args))
 
342
        if body is not None:
 
343
            assert isinstance(body, str), 'body must be a str'
 
344
            bytes = self._encode_bulk_data(body)
 
345
            self._write_func(bytes)
 
346
 
 
347
    def sync_with_request(self, request):
 
348
        self._finished_reading = request.finished_reading
 
349
        
 
350
    def next_read_size(self):
 
351
        if self._finished_reading:
 
352
            return 0
 
353
        if self._body_decoder is None:
 
354
            return 1
 
355
        else:
 
356
            return self._body_decoder.next_read_size()
 
357
 
 
358
 
 
359
class LengthPrefixedBodyDecoder(object):
 
360
    """Decodes the length-prefixed bulk data."""
 
361
    
 
362
    def __init__(self):
 
363
        self.bytes_left = None
 
364
        self.finished_reading = False
 
365
        self.unused_data = ''
 
366
        self.state_accept = self._state_accept_expecting_length
 
367
        self.state_read = self._state_read_no_data
 
368
        self._in_buffer = ''
 
369
        self._trailer_buffer = ''
 
370
    
 
371
    def accept_bytes(self, bytes):
 
372
        """Decode as much of bytes as possible.
 
373
 
 
374
        If 'bytes' contains too much data it will be appended to
 
375
        self.unused_data.
 
376
 
 
377
        finished_reading will be set when no more data is required.  Further
 
378
        data will be appended to self.unused_data.
 
379
        """
 
380
        # accept_bytes is allowed to change the state
 
381
        current_state = self.state_accept
 
382
        self.state_accept(bytes)
 
383
        while current_state != self.state_accept:
 
384
            current_state = self.state_accept
 
385
            self.state_accept('')
 
386
 
 
387
    def next_read_size(self):
 
388
        if self.bytes_left is not None:
 
389
            # Ideally we want to read all the remainder of the body and the
 
390
            # trailer in one go.
 
391
            return self.bytes_left + 5
 
392
        elif self.state_accept == self._state_accept_reading_trailer:
 
393
            # Just the trailer left
 
394
            return 5 - len(self._trailer_buffer)
 
395
        elif self.state_accept == self._state_accept_expecting_length:
 
396
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
397
            # 'done\n').
 
398
            return 6
 
399
        else:
 
400
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
401
            return 1
 
402
        
 
403
    def read_pending_data(self):
 
404
        """Return any pending data that has been decoded."""
 
405
        return self.state_read()
 
406
 
 
407
    def _state_accept_expecting_length(self, bytes):
 
408
        self._in_buffer += bytes
 
409
        pos = self._in_buffer.find('\n')
 
410
        if pos == -1:
 
411
            return
 
412
        self.bytes_left = int(self._in_buffer[:pos])
 
413
        self._in_buffer = self._in_buffer[pos+1:]
 
414
        self.bytes_left -= len(self._in_buffer)
 
415
        self.state_accept = self._state_accept_reading_body
 
416
        self.state_read = self._state_read_in_buffer
 
417
 
 
418
    def _state_accept_reading_body(self, bytes):
 
419
        self._in_buffer += bytes
 
420
        self.bytes_left -= len(bytes)
 
421
        if self.bytes_left <= 0:
 
422
            # Finished with body
 
423
            if self.bytes_left != 0:
 
424
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
425
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
426
            self.bytes_left = None
 
427
            self.state_accept = self._state_accept_reading_trailer
 
428
        
 
429
    def _state_accept_reading_trailer(self, bytes):
 
430
        self._trailer_buffer += bytes
 
431
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
432
        # a ProtocolViolation exception?
 
433
        if self._trailer_buffer.startswith('done\n'):
 
434
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
435
            self.state_accept = self._state_accept_reading_unused
 
436
            self.finished_reading = True
 
437
    
 
438
    def _state_accept_reading_unused(self, bytes):
 
439
        self.unused_data += bytes
 
440
 
 
441
    def _state_read_no_data(self):
 
442
        return ''
 
443
 
 
444
    def _state_read_in_buffer(self):
 
445
        result = self._in_buffer
 
446
        self._in_buffer = ''
 
447
        return result
 
448
 
 
449
 
 
450
class SmartServerStreamMedium(object):
237
451
    """Handles smart commands coming over a stream.
238
452
 
239
453
    The stream may be a pipe connected to sshd, or a tcp socket, or an
246
460
    which will typically be a LocalTransport looking at the server's filesystem.
247
461
    """
248
462
 
 
463
    def __init__(self, backing_transport):
 
464
        """Construct new server.
 
465
 
 
466
        :param backing_transport: Transport for the directory served.
 
467
        """
 
468
        # backing_transport could be passed to serve instead of __init__
 
469
        self.backing_transport = backing_transport
 
470
        self.finished = False
 
471
 
 
472
    def serve(self):
 
473
        """Serve requests until the client disconnects."""
 
474
        # Keep a reference to stderr because the sys module's globals get set to
 
475
        # None during interpreter shutdown.
 
476
        from sys import stderr
 
477
        try:
 
478
            while not self.finished:
 
479
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
480
                                                         self._write_out)
 
481
                self._serve_one_request(protocol)
 
482
        except Exception, e:
 
483
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
484
            raise
 
485
 
 
486
    def _serve_one_request(self, protocol):
 
487
        """Read one request from input, process, send back a response.
 
488
        
 
489
        :param protocol: a SmartServerRequestProtocol.
 
490
        """
 
491
        try:
 
492
            self._serve_one_request_unguarded(protocol)
 
493
        except KeyboardInterrupt:
 
494
            raise
 
495
        except Exception, e:
 
496
            self.terminate_due_to_error()
 
497
 
 
498
    def terminate_due_to_error(self):
 
499
        """Called when an unhandled exception from the protocol occurs."""
 
500
        raise NotImplementedError(self.terminate_due_to_error)
 
501
 
 
502
 
 
503
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
504
 
 
505
    def __init__(self, sock, backing_transport):
 
506
        """Constructor.
 
507
 
 
508
        :param sock: the socket the server will read from.  It will be put
 
509
            into blocking mode.
 
510
        """
 
511
        SmartServerStreamMedium.__init__(self, backing_transport)
 
512
        self.push_back = ''
 
513
        sock.setblocking(True)
 
514
        self.socket = sock
 
515
 
 
516
    def _serve_one_request_unguarded(self, protocol):
 
517
        while protocol.next_read_size():
 
518
            if self.push_back:
 
519
                protocol.accept_bytes(self.push_back)
 
520
                self.push_back = ''
 
521
            else:
 
522
                bytes = self.socket.recv(4096)
 
523
                if bytes == '':
 
524
                    self.finished = True
 
525
                    return
 
526
                protocol.accept_bytes(bytes)
 
527
        
 
528
        self.push_back = protocol.excess_buffer
 
529
    
 
530
    def terminate_due_to_error(self):
 
531
        """Called when an unhandled exception from the protocol occurs."""
 
532
        # TODO: This should log to a server log file, but no such thing
 
533
        # exists yet.  Andrew Bennetts 2006-09-29.
 
534
        self.socket.close()
 
535
        self.finished = True
 
536
 
 
537
    def _write_out(self, bytes):
 
538
        self.socket.sendall(bytes)
 
539
 
 
540
 
 
541
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
542
 
249
543
    def __init__(self, in_file, out_file, backing_transport):
250
544
        """Construct new server.
251
545
 
253
547
        :param out_file: Python file to write responses.
254
548
        :param backing_transport: Transport for the directory served.
255
549
        """
 
550
        SmartServerStreamMedium.__init__(self, backing_transport)
256
551
        self._in = in_file
257
552
        self._out = out_file
258
 
        self.smart_server = SmartServer(backing_transport)
259
 
        # server can call back to us to get bulk data - this is not really
260
 
        # ideal, they should get it per request instead
261
 
        self.smart_server._recv_body = self._recv_bulk
262
 
 
263
 
    def _recv_tuple(self):
264
 
        """Read a request from the client and return as a tuple.
265
 
        
266
 
        Returns None at end of file (if the client closed the connection.)
267
 
        """
268
 
        return _recv_tuple(self._in)
269
 
 
270
 
    def _send_tuple(self, args):
271
 
        """Send response header"""
272
 
        return self._write_and_flush(_encode_tuple(args))
273
 
 
274
 
    def _send_error_and_disconnect(self, exception):
275
 
        self._send_tuple(('error', str(exception)))
276
 
        ## self._out.close()
277
 
        ## self._in.close()
278
 
 
279
 
    def _serve_one_request(self):
280
 
        """Read one request from input, process, send back a response.
281
 
        
282
 
        :return: False if the server should terminate, otherwise None.
283
 
        """
284
 
        req_args = self._recv_tuple()
285
 
        if req_args == None:
286
 
            # client closed connection
287
 
            return False  # shutdown server
288
 
        try:
289
 
            response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
290
 
            self._send_tuple(response.args)
291
 
            if response.body is not None:
292
 
                self._send_bulk_data(response.body)
293
 
        except KeyboardInterrupt:
294
 
            raise
295
 
        except Exception, e:
296
 
            # everything else: pass to client, flush, and quit
297
 
            self._send_error_and_disconnect(e)
298
 
            return False
299
 
 
300
 
    def serve(self):
301
 
        """Serve requests until the client disconnects."""
302
 
        # Keep a reference to stderr because the sys module's globals get set to
303
 
        # None during interpreter shutdown.
304
 
        from sys import stderr
305
 
        try:
306
 
            while self._serve_one_request() != False:
307
 
                pass
308
 
        except Exception, e:
309
 
            stderr.write("%s terminating on exception %s\n" % (self, e))
310
 
            raise
 
553
 
 
554
    def _serve_one_request_unguarded(self, protocol):
 
555
        while True:
 
556
            bytes_to_read = protocol.next_read_size()
 
557
            if bytes_to_read == 0:
 
558
                # Finished serving this request.
 
559
                self._out.flush()
 
560
                return
 
561
            bytes = self._in.read(bytes_to_read)
 
562
            if bytes == '':
 
563
                # Connection has been closed.
 
564
                self.finished = True
 
565
                self._out.flush()
 
566
                return
 
567
            protocol.accept_bytes(bytes)
 
568
 
 
569
    def terminate_due_to_error(self):
 
570
        # TODO: This should log to a server log file, but no such thing
 
571
        # exists yet.  Andrew Bennetts 2006-09-29.
 
572
        self._out.close()
 
573
        self.finished = True
 
574
 
 
575
    def _write_out(self, bytes):
 
576
        self._out.write(bytes)
311
577
 
312
578
 
313
579
class SmartServerResponse(object):
314
 
    """Response generated by SmartServer."""
 
580
    """Response generated by SmartServerRequestHandler."""
315
581
 
316
582
    def __init__(self, args, body=None):
317
583
        self.args = args
318
584
        self.body = body
319
585
 
320
 
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
 
586
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
321
587
# for delivering the data for a request. This could be done with as the
322
588
# StreamServer, though that would create conflation between request and response
323
589
# which may be undesirable.
324
590
 
325
591
 
326
 
class SmartServer(object):
 
592
class SmartServerRequestHandler(object):
327
593
    """Protocol logic for smart server.
328
594
    
329
595
    This doesn't handle serialization at all, it just processes requests and
330
596
    creates responses.
331
597
    """
332
598
 
333
 
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
334
 
    # encoding or decoding logic to allow the wire protocol to vary from the
335
 
    # object protocol: we will want to tweak the wire protocol separate from
336
 
    # the object model, and ideally we will be able to do that without having
337
 
    # a SmartServer subclass for each wire protocol, rather just a Protocol
338
 
    # subclass.
 
599
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
600
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
601
    # from the object protocol: we will want to tweak the wire protocol separate
 
602
    # from the object model, and ideally we will be able to do that without
 
603
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
604
    # just a Protocol subclass.
339
605
 
340
606
    # TODO: Better way of representing the body for commands that take it,
341
607
    # and allow it to be streamed into the server.
342
608
    
343
609
    def __init__(self, backing_transport):
344
610
        self._backing_transport = backing_transport
 
611
        self._converted_command = False
 
612
        self.finished_reading = False
 
613
        self._body_bytes = ''
 
614
        self.response = None
 
615
 
 
616
    def accept_body(self, bytes):
 
617
        """Accept body data.
 
618
 
 
619
        This should be overriden for each command that desired body data to
 
620
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
621
 
 
622
        The deserialisation into that format should be done in the Protocol
 
623
        object. Set self.desired_body_format to the format your method will
 
624
        handle.
 
625
        """
 
626
        # default fallback is to accumulate bytes.
 
627
        self._body_bytes += bytes
 
628
        
 
629
    def _end_of_body_handler(self):
 
630
        """An unimplemented end of body handler."""
 
631
        raise NotImplementedError(self._end_of_body_handler)
345
632
        
346
633
    def do_hello(self):
347
634
        """Answer a version request with my version."""
363
650
            return int(mode)
364
651
 
365
652
    def do_append(self, relpath, mode):
 
653
        self._converted_command = True
 
654
        self._relpath = relpath
 
655
        self._mode = self._deserialise_optional_mode(mode)
 
656
        self._end_of_body_handler = self._handle_do_append_end
 
657
    
 
658
    def _handle_do_append_end(self):
366
659
        old_length = self._backing_transport.append_bytes(
367
 
            relpath, self._recv_body(), self._deserialise_optional_mode(mode))
368
 
        return SmartServerResponse(('appended', '%d' % old_length))
 
660
            self._relpath, self._body_bytes, self._mode)
 
661
        self.response = SmartServerResponse(('appended', '%d' % old_length))
369
662
 
370
663
    def do_delete(self, relpath):
371
664
        self._backing_transport.delete(relpath)
389
682
        self._backing_transport.move(rel_from, rel_to)
390
683
 
391
684
    def do_put(self, relpath, mode):
392
 
        self._backing_transport.put_bytes(relpath,
393
 
                self._recv_body(),
394
 
                self._deserialise_optional_mode(mode))
 
685
        self._converted_command = True
 
686
        self._relpath = relpath
 
687
        self._mode = self._deserialise_optional_mode(mode)
 
688
        self._end_of_body_handler = self._handle_do_put
 
689
 
 
690
    def _handle_do_put(self):
 
691
        self._backing_transport.put_bytes(self._relpath,
 
692
                self._body_bytes, self._mode)
 
693
        self.response = SmartServerResponse(('ok',))
395
694
 
396
695
    def _deserialise_offsets(self, text):
397
696
        # XXX: FIXME this should be on the protocol object.
404
703
        return offsets
405
704
 
406
705
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
407
 
        create_parent_dir = (create_parent == 'T')
408
 
        self._backing_transport.put_bytes_non_atomic(relpath,
409
 
                self._recv_body(),
410
 
                mode=self._deserialise_optional_mode(mode),
411
 
                create_parent_dir=create_parent_dir,
412
 
                dir_mode=self._deserialise_optional_mode(dir_mode))
 
706
        self._converted_command = True
 
707
        self._end_of_body_handler = self._handle_put_non_atomic
 
708
        self._relpath = relpath
 
709
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
710
        self._mode = self._deserialise_optional_mode(mode)
 
711
        # a boolean would be nicer XXX
 
712
        self._create_parent = (create_parent == 'T')
 
713
 
 
714
    def _handle_put_non_atomic(self):
 
715
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
716
                self._body_bytes,
 
717
                mode=self._mode,
 
718
                create_parent_dir=self._create_parent,
 
719
                dir_mode=self._dir_mode)
 
720
        self.response = SmartServerResponse(('ok',))
413
721
 
414
722
    def do_readv(self, relpath):
415
 
        offsets = self._deserialise_offsets(self._recv_body())
 
723
        self._converted_command = True
 
724
        self._end_of_body_handler = self._handle_readv_offsets
 
725
        self._relpath = relpath
 
726
 
 
727
    def end_of_body(self):
 
728
        """No more body data will be received."""
 
729
        self._run_handler_code(self._end_of_body_handler, (), {})
 
730
        # cannot read after this.
 
731
        self.finished_reading = True
 
732
 
 
733
    def _handle_readv_offsets(self):
 
734
        """accept offsets for a readv request."""
 
735
        offsets = self._deserialise_offsets(self._body_bytes)
416
736
        backing_bytes = ''.join(bytes for offset, bytes in
417
 
                             self._backing_transport.readv(relpath, offsets))
418
 
        return SmartServerResponse(('readv',), backing_bytes)
 
737
            self._backing_transport.readv(self._relpath, offsets))
 
738
        self.response = SmartServerResponse(('readv',), backing_bytes)
419
739
        
420
740
    def do_rename(self, rel_from, rel_to):
421
741
        self._backing_transport.rename(rel_from, rel_to)
439
759
        return SmartServerResponse((), tmpf.read())
440
760
 
441
761
    def dispatch_command(self, cmd, args):
 
762
        """Deprecated compatibility method.""" # XXX XXX
442
763
        func = getattr(self, 'do_' + cmd, None)
443
764
        if func is None:
444
765
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
766
        self._run_handler_code(func, args, {})
 
767
 
 
768
    def _run_handler_code(self, callable, args, kwargs):
 
769
        """Run some handler specific code 'callable'.
 
770
 
 
771
        If a result is returned, it is considered to be the commands response,
 
772
        and finished_reading is set true, and its assigned to self.response.
 
773
 
 
774
        Any exceptions caught are translated and a response object created
 
775
        from them.
 
776
        """
 
777
        result = self._call_converting_errors(callable, args, kwargs)
 
778
        if result is not None:
 
779
            self.response = result
 
780
            self.finished_reading = True
 
781
        # handle unconverted commands
 
782
        if not self._converted_command:
 
783
            self.finished_reading = True
 
784
            if result is None:
 
785
                self.response = SmartServerResponse(('ok',))
 
786
 
 
787
    def _call_converting_errors(self, callable, args, kwargs):
 
788
        """Call callable converting errors to Response objects."""
445
789
        try:
446
 
            result = func(*args)
447
 
            if result is None: 
448
 
                result = SmartServerResponse(('ok',))
449
 
            return result
 
790
            return callable(*args, **kwargs)
450
791
        except errors.NoSuchFile, e:
451
792
            return SmartServerResponse(('NoSuchFile', e.path))
452
793
        except errors.FileExists, e:
477
818
class SmartTCPServer(object):
478
819
    """Listens on a TCP socket and accepts connections from smart clients"""
479
820
 
480
 
    def __init__(self, backing_transport=None, host='127.0.0.1', port=0):
 
821
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
481
822
        """Construct a new server.
482
823
 
483
824
        To actually start it running, call either start_background_thread or
486
827
        :param host: Name of the interface to listen on.
487
828
        :param port: TCP port to listen on, or 0 to allocate a transient port.
488
829
        """
489
 
        if backing_transport is None:
490
 
            backing_transport = memory.MemoryTransport()
491
830
        self._server_socket = socket.socket()
492
831
        self._server_socket.bind((host, port))
493
832
        self.port = self._server_socket.getsockname()[1]
522
861
        # propogates to the newly accepted socket.
523
862
        conn.setblocking(True)
524
863
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
525
 
        from_client = conn.makefile('r')
526
 
        to_client = conn.makefile('w')
527
 
        handler = SmartStreamServer(from_client, to_client,
528
 
                self.backing_transport)
 
864
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
529
865
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
530
866
        connection_thread.setDaemon(True)
531
867
        connection_thread.start()
608
944
    # SmartTransport is an adapter from the Transport object model to the 
609
945
    # SmartClient model, not an encoder.
610
946
 
611
 
    def __init__(self, url, clone_from=None, client=None):
 
947
    def __init__(self, url, clone_from=None, medium=None):
612
948
        """Constructor.
613
949
 
614
 
        :param client: ignored when clone_from is not None.
 
950
        :param medium: The medium to use for this RemoteTransport. This must be
 
951
            supplied if clone_from is None.
615
952
        """
616
953
        ### Technically super() here is faulty because Transport's __init__
617
954
        ### fails to take 2 parameters, and if super were to choose a silly
622
959
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
623
960
                transport.split_url(url)
624
961
        if clone_from is None:
625
 
            if client is None:
626
 
                self._client = SmartStreamClient(self._connect_to_server)
627
 
            else:
628
 
                self._client = client
 
962
            self._medium = medium
629
963
        else:
630
964
            # credentials may be stripped from the base in some circumstances
631
965
            # as yet to be clearly defined or documented, so copy them.
632
966
            self._username = clone_from._username
633
967
            # reuse same connection
634
 
            self._client = clone_from._client
 
968
            self._medium = clone_from._medium
 
969
        assert self._medium is not None
635
970
 
636
971
    def abspath(self, relpath):
637
972
        """Return the full url to the given relative path.
647
982
        This essentially opens a handle on a different remote directory.
648
983
        """
649
984
        if relative_url is None:
650
 
            return self.__class__(self.base, self)
 
985
            return SmartTransport(self.base, self)
651
986
        else:
652
 
            return self.__class__(self.abspath(relative_url), self)
 
987
            return SmartTransport(self.abspath(relative_url), self)
653
988
 
654
989
    def is_readonly(self):
655
990
        """Smart server transport can do read/write file operations."""
656
991
        return False
657
992
                                                   
658
993
    def get_smart_client(self):
659
 
        return self._client
 
994
        return self._medium
 
995
 
 
996
    def get_smart_medium(self):
 
997
        return self._medium
660
998
                                                   
661
999
    def _unparse_url(self, path):
662
1000
        """Return URL for a path.
679
1017
        """Returns the Unicode version of the absolute path for relpath."""
680
1018
        return self._combine_paths(self._path, relpath)
681
1019
 
 
1020
    def _call(self, method, *args):
 
1021
        resp = self._call2(method, *args)
 
1022
        self._translate_error(resp)
 
1023
 
 
1024
    def _call2(self, method, *args):
 
1025
        """Call a method on the remote server."""
 
1026
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1027
        protocol.call(method, *args)
 
1028
        return protocol.read_response_tuple()
 
1029
 
 
1030
    def _call_with_body_bytes(self, method, args, body):
 
1031
        """Call a method on the remote server with body bytes."""
 
1032
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1033
        protocol.call_with_body_bytes((method, ) + args, body)
 
1034
        return protocol.read_response_tuple()
 
1035
 
682
1036
    def has(self, relpath):
683
1037
        """Indicate whether a remote file of the given name exists or not.
684
1038
 
685
1039
        :see: Transport.has()
686
1040
        """
687
 
        resp = self._client._call('has', self._remote_path(relpath))
 
1041
        resp = self._call2('has', self._remote_path(relpath))
688
1042
        if resp == ('yes', ):
689
1043
            return True
690
1044
        elif resp == ('no', ):
697
1051
        
698
1052
        :see: Transport.get_bytes()/get_file()
699
1053
        """
 
1054
        return StringIO(self.get_bytes(relpath))
 
1055
 
 
1056
    def get_bytes(self, relpath):
700
1057
        remote = self._remote_path(relpath)
701
 
        resp = self._client._call('get', remote)
 
1058
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1059
        protocol.call('get', remote)
 
1060
        resp = protocol.read_response_tuple(True)
702
1061
        if resp != ('ok', ):
 
1062
            protocol.cancel_read_body()
703
1063
            self._translate_error(resp, relpath)
704
 
        return StringIO(self._client._recv_bulk())
 
1064
        return protocol.read_body_bytes()
705
1065
 
706
1066
    def _serialise_optional_mode(self, mode):
707
1067
        if mode is None:
710
1070
            return '%d' % mode
711
1071
 
712
1072
    def mkdir(self, relpath, mode=None):
713
 
        resp = self._client._call('mkdir', 
714
 
                                  self._remote_path(relpath), 
715
 
                                  self._serialise_optional_mode(mode))
 
1073
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1074
            self._serialise_optional_mode(mode))
716
1075
        self._translate_error(resp)
717
1076
 
718
1077
    def put_bytes(self, relpath, upload_contents, mode=None):
719
1078
        # FIXME: upload_file is probably not safe for non-ascii characters -
720
1079
        # should probably just pass all parameters as length-delimited
721
1080
        # strings?
722
 
        resp = self._client._call_with_upload(
723
 
            'put',
 
1081
        resp = self._call_with_body_bytes('put',
724
1082
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
725
1083
            upload_contents)
726
1084
        self._translate_error(resp)
734
1092
        if create_parent_dir:
735
1093
            create_parent_str = 'T'
736
1094
 
737
 
        resp = self._client._call_with_upload(
 
1095
        resp = self._call_with_body_bytes(
738
1096
            'put_non_atomic',
739
1097
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
740
1098
             create_parent_str, self._serialise_optional_mode(dir_mode)),
763
1121
        return self.append_bytes(relpath, from_file.read(), mode)
764
1122
        
765
1123
    def append_bytes(self, relpath, bytes, mode=None):
766
 
        resp = self._client._call_with_upload(
 
1124
        resp = self._call_with_body_bytes(
767
1125
            'append',
768
1126
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
769
1127
            bytes)
772
1130
        self._translate_error(resp)
773
1131
 
774
1132
    def delete(self, relpath):
775
 
        resp = self._client._call('delete', self._remote_path(relpath))
 
1133
        resp = self._call2('delete', self._remote_path(relpath))
776
1134
        self._translate_error(resp)
777
1135
 
778
1136
    def readv(self, relpath, offsets):
789
1147
                               limit=self._max_readv_combine,
790
1148
                               fudge_factor=self._bytes_to_read_before_seek))
791
1149
 
792
 
 
793
 
        resp = self._client._call_with_upload(
794
 
            'readv',
795
 
            (self._remote_path(relpath),),
796
 
            self._client._serialise_offsets((c.start, c.length) for c in coalesced))
 
1150
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1151
        protocol.call_with_body_readv_array(
 
1152
            ('readv', self._remote_path(relpath)),
 
1153
            [(c.start, c.length) for c in coalesced])
 
1154
        resp = protocol.read_response_tuple(True)
797
1155
 
798
1156
        if resp[0] != 'readv':
799
1157
            # This should raise an exception
 
1158
            protocol.cancel_read_body()
800
1159
            self._translate_error(resp)
801
1160
            return
802
1161
 
803
 
        data = self._client._recv_bulk()
 
1162
        # FIXME: this should know how many bytes are needed, for clarity.
 
1163
        data = protocol.read_body_bytes()
804
1164
        # Cache the results, but only until they have been fulfilled
805
1165
        data_map = {}
806
1166
        for c_offset in coalesced:
819
1179
                cur_offset_and_size = offset_stack.next()
820
1180
 
821
1181
    def rename(self, rel_from, rel_to):
822
 
        self._call('rename', 
 
1182
        self._call('rename',
823
1183
                   self._remote_path(rel_from),
824
1184
                   self._remote_path(rel_to))
825
1185
 
826
1186
    def move(self, rel_from, rel_to):
827
 
        self._call('move', 
 
1187
        self._call('move',
828
1188
                   self._remote_path(rel_from),
829
1189
                   self._remote_path(rel_to))
830
1190
 
831
1191
    def rmdir(self, relpath):
832
1192
        resp = self._call('rmdir', self._remote_path(relpath))
833
1193
 
834
 
    def _call(self, method, *args):
835
 
        resp = self._client._call(method, *args)
836
 
        self._translate_error(resp)
837
 
 
838
1194
    def _translate_error(self, resp, orig_path=None):
839
1195
        """Raise an exception from a response"""
840
1196
        if resp is None:
877
1233
        else:
878
1234
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
879
1235
 
880
 
    def _send_tuple(self, args):
881
 
        self._client._send_tuple(args)
882
 
 
883
 
    def _recv_tuple(self):
884
 
        return self._client._recv_tuple()
885
 
 
886
1236
    def disconnect(self):
887
 
        self._client.disconnect()
 
1237
        self._medium.disconnect()
888
1238
 
889
1239
    def delete_tree(self, relpath):
890
1240
        raise errors.TransportNotPossible('readonly transport')
891
1241
 
892
1242
    def stat(self, relpath):
893
 
        resp = self._client._call('stat', self._remote_path(relpath))
 
1243
        resp = self._call2('stat', self._remote_path(relpath))
894
1244
        if resp[0] == 'stat':
895
1245
            return SmartStat(int(resp[1]), int(resp[2], 8))
896
1246
        else:
913
1263
        return True
914
1264
 
915
1265
    def list_dir(self, relpath):
916
 
        resp = self._client._call('list_dir',
917
 
                                  self._remote_path(relpath))
 
1266
        resp = self._call2('list_dir', self._remote_path(relpath))
918
1267
        if resp[0] == 'names':
919
1268
            return [name.encode('ascii') for name in resp[1:]]
920
1269
        else:
921
1270
            self._translate_error(resp)
922
1271
 
923
1272
    def iter_files_recursive(self):
924
 
        resp = self._client._call('iter_files_recursive',
925
 
                                  self._remote_path(''))
 
1273
        resp = self._call2('iter_files_recursive', self._remote_path(''))
926
1274
        if resp[0] == 'names':
927
1275
            return resp[1:]
928
1276
        else:
929
1277
            self._translate_error(resp)
930
1278
 
931
1279
 
932
 
class SmartStreamClient(SmartProtocolBase):
933
 
    """Connection to smart server over two streams"""
934
 
 
935
 
    def __init__(self, connect_func):
936
 
        self._connect_func = connect_func
937
 
        self._connected = False
938
 
 
939
 
    def __del__(self):
940
 
        self.disconnect()
941
 
 
942
 
    def _ensure_connection(self):
943
 
        if not self._connected:
944
 
            self._in, self._out = self._connect_func()
945
 
            self._connected = True
946
 
 
947
 
    def _send_tuple(self, args):
948
 
        self._ensure_connection()
949
 
        return self._write_and_flush(_encode_tuple(args))
950
 
 
951
 
    def _send_bulk_data(self, body):
952
 
        self._ensure_connection()
953
 
        SmartProtocolBase._send_bulk_data(self, body)
954
 
        
955
 
    def _recv_bulk(self):
956
 
        self._ensure_connection()
957
 
        return SmartProtocolBase._recv_bulk(self)
 
1280
class SmartClientMediumRequest(object):
 
1281
    """A request on a SmartClientMedium.
 
1282
 
 
1283
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1284
    the response bytes to be read via read_bytes.
 
1285
 
 
1286
    For instance:
 
1287
    request.accept_bytes('123')
 
1288
    request.finished_writing()
 
1289
    result = request.read_bytes(3)
 
1290
    request.finished_reading()
 
1291
 
 
1292
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1293
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1294
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1295
    details on concurrency and pipelining.
 
1296
    """
 
1297
 
 
1298
    def __init__(self, medium):
 
1299
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1300
        self._medium = medium
 
1301
        # we track state by constants - we may want to use the same
 
1302
        # pattern as BodyReader if it gets more complex.
 
1303
        # valid states are: "writing", "reading", "done"
 
1304
        self._state = "writing"
 
1305
 
 
1306
    def accept_bytes(self, bytes):
 
1307
        """Accept bytes for inclusion in this request.
 
1308
 
 
1309
        This method may not be be called after finished_writing() has been
 
1310
        called.  It depends upon the Medium whether or not the bytes will be
 
1311
        immediately transmitted. Message based Mediums will tend to buffer the
 
1312
        bytes until finished_writing() is called.
 
1313
 
 
1314
        :param bytes: A bytestring.
 
1315
        """
 
1316
        if self._state != "writing":
 
1317
            raise errors.WritingCompleted(self)
 
1318
        self._accept_bytes(bytes)
 
1319
 
 
1320
    def _accept_bytes(self, bytes):
 
1321
        """Helper for accept_bytes.
 
1322
 
 
1323
        Accept_bytes checks the state of the request to determing if bytes
 
1324
        should be accepted. After that it hands off to _accept_bytes to do the
 
1325
        actual acceptance.
 
1326
        """
 
1327
        raise NotImplementedError(self._accept_bytes)
 
1328
 
 
1329
    def finished_reading(self):
 
1330
        """Inform the request that all desired data has been read.
 
1331
 
 
1332
        This will remove the request from the pipeline for its medium (if the
 
1333
        medium supports pipelining) and any further calls to methods on the
 
1334
        request will raise ReadingCompleted.
 
1335
        """
 
1336
        if self._state == "writing":
 
1337
            raise errors.WritingNotComplete(self)
 
1338
        if self._state != "reading":
 
1339
            raise errors.ReadingCompleted(self)
 
1340
        self._state = "done"
 
1341
        self._finished_reading()
 
1342
 
 
1343
    def _finished_reading(self):
 
1344
        """Helper for finished_reading.
 
1345
 
 
1346
        finished_reading checks the state of the request to determine if 
 
1347
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1348
        to perform the action.
 
1349
        """
 
1350
        raise NotImplementedError(self._finished_reading)
 
1351
 
 
1352
    def finished_writing(self):
 
1353
        """Finish the writing phase of this request.
 
1354
 
 
1355
        This will flush all pending data for this request along the medium.
 
1356
        After calling finished_writing, you may not call accept_bytes anymore.
 
1357
        """
 
1358
        if self._state != "writing":
 
1359
            raise errors.WritingCompleted(self)
 
1360
        self._state = "reading"
 
1361
        self._finished_writing()
 
1362
 
 
1363
    def _finished_writing(self):
 
1364
        """Helper for finished_writing.
 
1365
 
 
1366
        finished_writing checks the state of the request to determine if 
 
1367
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1368
        to perform the action.
 
1369
        """
 
1370
        raise NotImplementedError(self._finished_writing)
 
1371
 
 
1372
    def read_bytes(self, count):
 
1373
        """Read bytes from this requests response.
 
1374
 
 
1375
        This method will block and wait for count bytes to be read. It may not
 
1376
        be invoked until finished_writing() has been called - this is to ensure
 
1377
        a message-based approach to requests, for compatability with message
 
1378
        based mediums like HTTP.
 
1379
        """
 
1380
        if self._state == "writing":
 
1381
            raise errors.WritingNotComplete(self)
 
1382
        if self._state != "reading":
 
1383
            raise errors.ReadingCompleted(self)
 
1384
        return self._read_bytes(count)
 
1385
 
 
1386
    def _read_bytes(self, count):
 
1387
        """Helper for read_bytes.
 
1388
 
 
1389
        read_bytes checks the state of the request to determing if bytes
 
1390
        should be read. After that it hands off to _read_bytes to do the
 
1391
        actual read.
 
1392
        """
 
1393
        raise NotImplementedError(self._read_bytes)
 
1394
 
 
1395
 
 
1396
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1397
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1398
 
 
1399
    def __init__(self, medium):
 
1400
        SmartClientMediumRequest.__init__(self, medium)
 
1401
        # check that we are safe concurrency wise. If some streams start
 
1402
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1403
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1404
        # and the setting/unsetting of _current_request likewise moved into
 
1405
        # that class : but its unneeded overhead for now. RBC 20060922
 
1406
        if self._medium._current_request is not None:
 
1407
            raise errors.TooManyConcurrentRequests(self._medium)
 
1408
        self._medium._current_request = self
 
1409
 
 
1410
    def _accept_bytes(self, bytes):
 
1411
        """See SmartClientMediumRequest._accept_bytes.
 
1412
        
 
1413
        This forwards to self._medium._accept_bytes because we are operating
 
1414
        on the mediums stream.
 
1415
        """
 
1416
        self._medium._accept_bytes(bytes)
 
1417
 
 
1418
    def _finished_reading(self):
 
1419
        """See SmartClientMediumRequest._finished_reading.
 
1420
 
 
1421
        This clears the _current_request on self._medium to allow a new 
 
1422
        request to be created.
 
1423
        """
 
1424
        assert self._medium._current_request is self
 
1425
        self._medium._current_request = None
 
1426
        
 
1427
    def _finished_writing(self):
 
1428
        """See SmartClientMediumRequest._finished_writing.
 
1429
 
 
1430
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1431
        """
 
1432
        self._medium._flush()
 
1433
 
 
1434
    def _read_bytes(self, count):
 
1435
        """See SmartClientMediumRequest._read_bytes.
 
1436
        
 
1437
        This forwards to self._medium._read_bytes because we are operating
 
1438
        on the mediums stream.
 
1439
        """
 
1440
        return self._medium._read_bytes(count)
 
1441
 
 
1442
 
 
1443
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1444
    """The client-side protocol for smart version 1."""
 
1445
 
 
1446
    def __init__(self, request):
 
1447
        """Construct a SmartClientRequestProtocolOne.
 
1448
 
 
1449
        :param request: A SmartClientMediumRequest to serialise onto and
 
1450
            deserialise from.
 
1451
        """
 
1452
        self._request = request
 
1453
        self._body_buffer = None
 
1454
 
 
1455
    def call(self, *args):
 
1456
        bytes = _encode_tuple(args)
 
1457
        self._request.accept_bytes(bytes)
 
1458
        self._request.finished_writing()
 
1459
 
 
1460
    def call_with_body_bytes(self, args, body):
 
1461
        """Make a remote call of args with body bytes 'body'.
 
1462
 
 
1463
        After calling this, call read_response_tuple to find the result out.
 
1464
        """
 
1465
        bytes = _encode_tuple(args)
 
1466
        self._request.accept_bytes(bytes)
 
1467
        bytes = self._encode_bulk_data(body)
 
1468
        self._request.accept_bytes(bytes)
 
1469
        self._request.finished_writing()
 
1470
 
 
1471
    def call_with_body_readv_array(self, args, body):
 
1472
        """Make a remote call with a readv array.
 
1473
 
 
1474
        The body is encoded with one line per readv offset pair. The numbers in
 
1475
        each pair are separated by a comma, and no trailing \n is emitted.
 
1476
        """
 
1477
        bytes = _encode_tuple(args)
 
1478
        self._request.accept_bytes(bytes)
 
1479
        readv_bytes = self._serialise_offsets(body)
 
1480
        bytes = self._encode_bulk_data(readv_bytes)
 
1481
        self._request.accept_bytes(bytes)
 
1482
        self._request.finished_writing()
 
1483
 
 
1484
    def cancel_read_body(self):
 
1485
        """After expecting a body, a response code may indicate one otherwise.
 
1486
 
 
1487
        This method lets the domain client inform the protocol that no body
 
1488
        will be transmitted. This is a terminal method: after calling it the
 
1489
        protocol is not able to be used further.
 
1490
        """
 
1491
        self._request.finished_reading()
 
1492
 
 
1493
    def read_response_tuple(self, expect_body=False):
 
1494
        """Read a response tuple from the wire.
 
1495
 
 
1496
        This should only be called once.
 
1497
        """
 
1498
        result = self._recv_tuple()
 
1499
        if not expect_body:
 
1500
            self._request.finished_reading()
 
1501
        return result
 
1502
 
 
1503
    def read_body_bytes(self, count=-1):
 
1504
        """Read bytes from the body, decoding into a byte stream.
 
1505
        
 
1506
        We read all bytes at once to ensure we've checked the trailer for 
 
1507
        errors, and then feed the buffer back as read_body_bytes is called.
 
1508
        """
 
1509
        if self._body_buffer is not None:
 
1510
            return self._body_buffer.read(count)
 
1511
        _body_decoder = LengthPrefixedBodyDecoder()
 
1512
 
 
1513
        while not _body_decoder.finished_reading:
 
1514
            bytes_wanted = _body_decoder.next_read_size()
 
1515
            bytes = self._request.read_bytes(bytes_wanted)
 
1516
            _body_decoder.accept_bytes(bytes)
 
1517
        self._request.finished_reading()
 
1518
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1519
        # XXX: TODO check the trailer result.
 
1520
        return self._body_buffer.read(count)
958
1521
 
959
1522
    def _recv_tuple(self):
960
 
        self._ensure_connection()
961
 
        return SmartProtocolBase._recv_tuple(self)
962
 
 
963
 
    def _recv_trailer(self):
964
 
        self._ensure_connection()
965
 
        return SmartProtocolBase._recv_trailer(self)
966
 
 
967
 
    def disconnect(self):
968
 
        """Close connection to the server"""
969
 
        if self._connected:
970
 
            self._out.close()
971
 
            self._in.close()
972
 
 
973
 
    def _call(self, *args):
974
 
        self._send_tuple(args)
975
 
        return self._recv_tuple()
976
 
 
977
 
    def _call_with_upload(self, method, args, body):
978
 
        """Call an rpc, supplying bulk upload data.
979
 
 
980
 
        :param method: method name to call
981
 
        :param args: parameter args tuple
982
 
        :param body: upload body as a byte string
983
 
        """
984
 
        self._send_tuple((method,) + args)
985
 
        self._send_bulk_data(body)
986
 
        return self._recv_tuple()
 
1523
        """Receive a tuple from the medium request."""
 
1524
        line = ''
 
1525
        while not line or line[-1] != '\n':
 
1526
            # TODO: this is inefficient - but tuples are short.
 
1527
            new_char = self._request.read_bytes(1)
 
1528
            line += new_char
 
1529
            assert new_char != '', "end of file reading from server."
 
1530
        return _decode_tuple(line)
987
1531
 
988
1532
    def query_version(self):
989
1533
        """Return protocol version number of the server."""
990
 
        # XXX: should make sure it's empty
991
 
        self._send_tuple(('hello',))
992
 
        resp = self._recv_tuple()
 
1534
        self.call('hello')
 
1535
        resp = self.read_response_tuple()
993
1536
        if resp == ('ok', '1'):
994
1537
            return 1
995
1538
        else:
996
1539
            raise errors.SmartProtocolError("bad response %r" % (resp,))
997
1540
 
998
1541
 
999
 
class SmartTCPTransport(SmartTransport):
1000
 
    """Connection to smart server over plain tcp"""
1001
 
 
1002
 
    def __init__(self, url, clone_from=None):
1003
 
        super(SmartTCPTransport, self).__init__(url, clone_from)
1004
 
        try:
1005
 
            self._port = int(self._port)
1006
 
        except (ValueError, TypeError), e:
1007
 
            raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1008
 
        self._socket = None
1009
 
 
1010
 
    def _connect_to_server(self):
 
1542
class SmartClientMedium(object):
 
1543
    """Smart client is a medium for sending smart protocol requests over."""
 
1544
 
 
1545
    def disconnect(self):
 
1546
        """If this medium maintains a persistent connection, close it.
 
1547
        
 
1548
        The default implementation does nothing.
 
1549
        """
 
1550
        
 
1551
 
 
1552
class SmartClientStreamMedium(SmartClientMedium):
 
1553
    """Stream based medium common class.
 
1554
 
 
1555
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1556
    SmartClientStreamMediumRequest for their requests, and should implement
 
1557
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1558
    receive bytes.
 
1559
    """
 
1560
 
 
1561
    def __init__(self):
 
1562
        self._current_request = None
 
1563
 
 
1564
    def accept_bytes(self, bytes):
 
1565
        self._accept_bytes(bytes)
 
1566
 
 
1567
    def __del__(self):
 
1568
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1569
        finished with it.
 
1570
        """
 
1571
        self.disconnect()
 
1572
 
 
1573
    def _flush(self):
 
1574
        """Flush the output stream.
 
1575
        
 
1576
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1577
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1578
        """
 
1579
        raise NotImplementedError(self._flush)
 
1580
 
 
1581
    def get_request(self):
 
1582
        """See SmartClientMedium.get_request().
 
1583
 
 
1584
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1585
        for get_request.
 
1586
        """
 
1587
        return SmartClientStreamMediumRequest(self)
 
1588
 
 
1589
    def read_bytes(self, count):
 
1590
        return self._read_bytes(count)
 
1591
 
 
1592
 
 
1593
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1594
    """A client medium using simple pipes.
 
1595
    
 
1596
    This client does not manage the pipes: it assumes they will always be open.
 
1597
    """
 
1598
 
 
1599
    def __init__(self, readable_pipe, writeable_pipe):
 
1600
        SmartClientStreamMedium.__init__(self)
 
1601
        self._readable_pipe = readable_pipe
 
1602
        self._writeable_pipe = writeable_pipe
 
1603
 
 
1604
    def _accept_bytes(self, bytes):
 
1605
        """See SmartClientStreamMedium.accept_bytes."""
 
1606
        self._writeable_pipe.write(bytes)
 
1607
 
 
1608
    def _flush(self):
 
1609
        """See SmartClientStreamMedium._flush()."""
 
1610
        self._writeable_pipe.flush()
 
1611
 
 
1612
    def _read_bytes(self, count):
 
1613
        """See SmartClientStreamMedium._read_bytes."""
 
1614
        return self._readable_pipe.read(count)
 
1615
 
 
1616
 
 
1617
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1618
    """A client medium using SSH."""
 
1619
    
 
1620
    def __init__(self, host, port=None, username=None, password=None,
 
1621
            vendor=None):
 
1622
        """Creates a client that will connect on the first use.
 
1623
        
 
1624
        :param vendor: An optional override for the ssh vendor to use. See
 
1625
            bzrlib.transport.ssh for details on ssh vendors.
 
1626
        """
 
1627
        SmartClientStreamMedium.__init__(self)
 
1628
        self._connected = False
 
1629
        self._host = host
 
1630
        self._password = password
 
1631
        self._port = port
 
1632
        self._username = username
 
1633
        self._read_from = None
 
1634
        self._ssh_connection = None
 
1635
        self._vendor = vendor
 
1636
        self._write_to = None
 
1637
 
 
1638
    def _accept_bytes(self, bytes):
 
1639
        """See SmartClientStreamMedium.accept_bytes."""
 
1640
        self._ensure_connection()
 
1641
        self._write_to.write(bytes)
 
1642
 
 
1643
    def disconnect(self):
 
1644
        """See SmartClientMedium.disconnect()."""
 
1645
        if not self._connected:
 
1646
            return
 
1647
        self._read_from.close()
 
1648
        self._write_to.close()
 
1649
        self._ssh_connection.close()
 
1650
        self._connected = False
 
1651
 
 
1652
    def _ensure_connection(self):
 
1653
        """Connect this medium if not already connected."""
 
1654
        if self._connected:
 
1655
            return
 
1656
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1657
        if self._vendor is None:
 
1658
            vendor = ssh._get_ssh_vendor()
 
1659
        else:
 
1660
            vendor = self._vendor
 
1661
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1662
                self._password, self._host, self._port,
 
1663
                command=[executable, 'serve', '--inet', '--directory=/',
 
1664
                         '--allow-writes'])
 
1665
        self._read_from, self._write_to = \
 
1666
            self._ssh_connection.get_filelike_channels()
 
1667
        self._connected = True
 
1668
 
 
1669
    def _flush(self):
 
1670
        """See SmartClientStreamMedium._flush()."""
 
1671
        self._write_to.flush()
 
1672
 
 
1673
    def _read_bytes(self, count):
 
1674
        """See SmartClientStreamMedium.read_bytes."""
 
1675
        if not self._connected:
 
1676
            raise errors.MediumNotConnected(self)
 
1677
        return self._read_from.read(count)
 
1678
 
 
1679
 
 
1680
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1681
    """A client medium using TCP."""
 
1682
    
 
1683
    def __init__(self, host, port):
 
1684
        """Creates a client that will connect on the first use."""
 
1685
        SmartClientStreamMedium.__init__(self)
 
1686
        self._connected = False
 
1687
        self._host = host
 
1688
        self._port = port
 
1689
        self._socket = None
 
1690
 
 
1691
    def _accept_bytes(self, bytes):
 
1692
        """See SmartClientMedium.accept_bytes."""
 
1693
        self._ensure_connection()
 
1694
        self._socket.sendall(bytes)
 
1695
 
 
1696
    def disconnect(self):
 
1697
        """See SmartClientMedium.disconnect()."""
 
1698
        if not self._connected:
 
1699
            return
 
1700
        self._socket.close()
 
1701
        self._socket = None
 
1702
        self._connected = False
 
1703
 
 
1704
    def _ensure_connection(self):
 
1705
        """Connect this medium if not already connected."""
 
1706
        if self._connected:
 
1707
            return
1011
1708
        self._socket = socket.socket()
1012
1709
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1013
1710
        result = self._socket.connect_ex((self._host, int(self._port)))
1014
1711
        if result:
1015
1712
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1016
1713
                    (self._host, self._port, os.strerror(result)))
1017
 
        # TODO: May be more efficient to just treat them as sockets
1018
 
        # throughout?  But what about pipes to ssh?...
1019
 
        to_server = self._socket.makefile('w')
1020
 
        from_server = self._socket.makefile('r')
1021
 
        return from_server, to_server
1022
 
 
1023
 
    def disconnect(self):
1024
 
        super(SmartTCPTransport, self).disconnect()
1025
 
        # XXX: Is closing the socket as well as closing the files really
1026
 
        # necessary?
1027
 
        if self._socket is not None:
1028
 
            self._socket.close()
 
1714
        self._connected = True
 
1715
 
 
1716
    def _flush(self):
 
1717
        """See SmartClientStreamMedium._flush().
 
1718
        
 
1719
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1720
        add a means to do a flush, but that can be done in the future.
 
1721
        """
 
1722
 
 
1723
    def _read_bytes(self, count):
 
1724
        """See SmartClientMedium.read_bytes."""
 
1725
        if not self._connected:
 
1726
            raise errors.MediumNotConnected(self)
 
1727
        return self._socket.recv(count)
 
1728
 
 
1729
 
 
1730
class SmartTCPTransport(SmartTransport):
 
1731
    """Connection to smart server over plain tcp.
 
1732
    
 
1733
    This is essentially just a factory to get 'RemoteTransport(url,
 
1734
        SmartTCPClientMedium).
 
1735
    """
 
1736
 
 
1737
    def __init__(self, url):
 
1738
        _scheme, _username, _password, _host, _port, _path = \
 
1739
            transport.split_url(url)
 
1740
        try:
 
1741
            _port = int(_port)
 
1742
        except (ValueError, TypeError), e:
 
1743
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
 
1744
        medium = SmartTCPClientMedium(_host, _port)
 
1745
        super(SmartTCPTransport, self).__init__(url, medium=medium)
1029
1746
 
1030
1747
 
1031
1748
class SmartSSHTransport(SmartTransport):
1032
 
    """Connection to smart server over SSH."""
1033
 
 
1034
 
    def __init__(self, url, clone_from=None):
1035
 
        # TODO: all this probably belongs in the parent class.
1036
 
        super(SmartSSHTransport, self).__init__(url, clone_from)
 
1749
    """Connection to smart server over SSH.
 
1750
 
 
1751
    This is essentially just a factory to get 'RemoteTransport(url,
 
1752
        SmartSSHClientMedium).
 
1753
    """
 
1754
 
 
1755
    def __init__(self, url):
 
1756
        _scheme, _username, _password, _host, _port, _path = \
 
1757
            transport.split_url(url)
1037
1758
        try:
1038
 
            if self._port is not None:
1039
 
                self._port = int(self._port)
 
1759
            if _port is not None:
 
1760
                _port = int(_port)
1040
1761
        except (ValueError, TypeError), e:
1041
 
            raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1042
 
 
1043
 
    def _connect_to_server(self):
1044
 
        from bzrlib.transport import ssh
1045
 
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1046
 
        vendor = ssh._get_ssh_vendor()
1047
 
        self._ssh_connection = vendor.connect_ssh(self._username,
1048
 
                self._password, self._host, self._port,
1049
 
                command=[executable, 'serve', '--inet', '--directory=/',
1050
 
                         '--allow-writes'])
1051
 
        return self._ssh_connection.get_filelike_channels()
1052
 
 
1053
 
    def disconnect(self):
1054
 
        super(SmartSSHTransport, self).disconnect()
1055
 
        self._ssh_connection.close()
 
1762
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1763
                _port)
 
1764
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1765
        super(SmartSSHTransport, self).__init__(url, medium=medium)
1056
1766
 
1057
1767
 
1058
1768
def get_test_permutations():
1059
 
    """Return (transport, server) permutations for testing"""
 
1769
    """Return (transport, server) permutations for testing."""
 
1770
    ### We may need a little more test framework support to construct an
 
1771
    ### appropriate RemoteTransport in the future.
1060
1772
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]