~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/remote.py

  • Committer: John Arbash Meinel
  • Date: 2007-04-12 20:36:40 UTC
  • mfrom: (2413 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2566.
  • Revision ID: john@arbash-meinel.com-20070412203640-z1jld315288moxvy
[merge] bzr.dev 2413

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Smart-server protocol, client and server.
18
 
 
19
 
Requests are sent as a command and list of arguments, followed by optional
20
 
bulk body data.  Responses are similarly a response and list of arguments,
21
 
followed by bulk body data. ::
22
 
 
23
 
  SEP := '\001'
24
 
    Fields are separated by Ctrl-A.
25
 
  BULK_DATA := CHUNK TRAILER
26
 
    Chunks can be repeated as many times as necessary.
27
 
  CHUNK := CHUNK_LEN CHUNK_BODY
28
 
  CHUNK_LEN := DIGIT+ NEWLINE
29
 
    Gives the number of bytes in the following chunk.
30
 
  CHUNK_BODY := BYTE[chunk_len]
31
 
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
 
  SUCCESS_TRAILER := 'done' NEWLINE
33
 
  ERROR_TRAILER := 
34
 
 
35
 
Paths are passed across the network.  The client needs to see a namespace that
36
 
includes any repository that might need to be referenced, and the client needs
37
 
to know about a root directory beyond which it cannot ascend.
38
 
 
39
 
Servers run over ssh will typically want to be able to access any path the user 
40
 
can access.  Public servers on the other hand (which might be over http, ssh
41
 
or tcp) will typically want to restrict access to only a particular directory 
42
 
and its children, so will want to do a software virtual root at that level.
43
 
In other words they'll want to rewrite incoming paths to be under that level
44
 
(and prevent escaping using ../ tricks.)
45
 
 
46
 
URLs that include ~ should probably be passed across to the server verbatim
47
 
and the server can expand them.  This will proably not be meaningful when 
48
 
limited to a directory?
49
 
 
50
 
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
51
 
that you have multiple requests and get a read error because the other side did
52
 
shutdown.  For pipes we have read pipe which will have a zero read which marks
53
 
end-of-file.  For HTTP server environment there is not end-of-stream because
54
 
each request coming into the server is independent.
55
 
 
56
 
So we need a wrapper around pipes and sockets to seperate out requests from
57
 
substrate and this will give us a single model which is consist for HTTP,
58
 
sockets and pipes.
59
 
 
60
 
Server-side
61
 
-----------
62
 
 
63
 
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
64
 
          uses protocol to detect end-of-request, sends written
65
 
          bytes to client) e.g. socket, pipe, HTTP request handler.
66
 
  ^
67
 
  | bytes.
68
 
  v
69
 
 
70
 
PROTOCOL  (serialization, deserialization)  accepts bytes for one
71
 
          request, decodes according to internal state, pushes
72
 
          structured data to handler.  accepts structured data from
73
 
          handler and encodes and writes to the medium.  factory for
74
 
          handler.
75
 
  ^
76
 
  | structured data
77
 
  v
78
 
 
79
 
HANDLER   (domain logic) accepts structured data, operates state
80
 
          machine until the request can be satisfied,
81
 
          sends structured data to the protocol.
82
 
 
83
 
 
84
 
Client-side
85
 
-----------
86
 
 
87
 
 CLIENT             domain logic, accepts domain requests, generated structured
88
 
                    data, reads structured data from responses and turns into
89
 
                    domain data.  Sends structured data to the protocol.
90
 
                    Operates state machines until the request can be delivered
91
 
                    (e.g. reading from a bundle generated in bzrlib to deliver a
92
 
                    complete request).
93
 
 
94
 
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
95
 
                    ...
96
 
  ^
97
 
  | structured data
98
 
  v
99
 
 
100
 
PROTOCOL  (serialization, deserialization)  accepts structured data for one
101
 
          request, encodes and writes to the medium.  Reads bytes from the
102
 
          medium, decodes and allows the client to read structured data.
103
 
  ^
104
 
  | bytes.
105
 
  v
106
 
 
107
 
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
108
 
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
109
 
"""
110
 
 
111
 
 
112
 
# TODO: _translate_error should be on the client, not the transport because
113
 
#     error coding is wire protocol specific.
114
 
 
115
 
# TODO: A plain integer from query_version is too simple; should give some
116
 
# capabilities too?
117
 
 
118
 
# TODO: Server should probably catch exceptions within itself and send them
119
 
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
120
 
# Also needs to somehow report protocol errors like bad requests.  Need to
121
 
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
 
# bulk transfer and then something goes wrong.
123
 
 
124
 
# TODO: Standard marker at start of request/response lines?
125
 
 
126
 
# TODO: Make each request and response self-validatable, e.g. with checksums.
127
 
#
128
 
# TODO: get/put objects could be changed to gradually read back the data as it
129
 
# comes across the network
130
 
#
131
 
# TODO: What should the server do if it hits an error and has to terminate?
132
 
#
133
 
# TODO: is it useful to allow multiple chunks in the bulk data?
134
 
#
135
 
# TODO: If we get an exception during transmission of bulk data we can't just
136
 
# emit the exception because it won't be seen.
137
 
#   John proposes:  I think it would be worthwhile to have a header on each
138
 
#   chunk, that indicates it is another chunk. Then you can send an 'error'
139
 
#   chunk as long as you finish the previous chunk.
140
 
#
141
 
# TODO: Clone method on Transport; should work up towards parent directory;
142
 
# unclear how this should be stored or communicated to the server... maybe
143
 
# just pass it on all relevant requests?
144
 
#
145
 
# TODO: Better name than clone() for changing between directories.  How about
146
 
# open_dir or change_dir or chdir?
147
 
#
148
 
# TODO: Is it really good to have the notion of current directory within the
149
 
# connection?  Perhaps all Transports should factor out a common connection
150
 
# from the thing that has the directory context?
151
 
#
152
 
# TODO: Pull more things common to sftp and ssh to a higher level.
153
 
#
154
 
# TODO: The server that manages a connection should be quite small and retain
155
 
# minimum state because each of the requests are supposed to be stateless.
156
 
# Then we can write another implementation that maps to http.
157
 
#
158
 
# TODO: What to do when a client connection is garbage collected?  Maybe just
159
 
# abruptly drop the connection?
160
 
#
161
 
# TODO: Server in some cases will need to restrict access to files outside of
162
 
# a particular root directory.  LocalTransport doesn't do anything to stop you
163
 
# ascending above the base directory, so we need to prevent paths
164
 
# containing '..' in either the server or transport layers.  (Also need to
165
 
# consider what happens if someone creates a symlink pointing outside the 
166
 
# directory tree...)
167
 
#
168
 
# TODO: Server should rebase absolute paths coming across the network to put
169
 
# them under the virtual root, if one is in use.  LocalTransport currently
170
 
# doesn't do that; if you give it an absolute path it just uses it.
171
 
172
 
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
 
# urlescape them instead.  Indeed possibly this should just literally be
174
 
# http-over-ssh.
175
 
#
176
 
# FIXME: This transport, with several others, has imperfect handling of paths
177
 
# within urls.  It'd probably be better for ".." from a root to raise an error
178
 
# rather than return the same directory as we do at present.
179
 
#
180
 
# TODO: Rather than working at the Transport layer we want a Branch,
181
 
# Repository or BzrDir objects that talk to a server.
182
 
#
183
 
# TODO: Probably want some way for server commands to gradually produce body
184
 
# data rather than passing it as a string; they could perhaps pass an
185
 
# iterator-like callback that will gradually yield data; it probably needs a
186
 
# close() method that will always be closed to do any necessary cleanup.
187
 
#
188
 
# TODO: Split the actual smart server from the ssh encoding of it.
189
 
#
190
 
# TODO: Perhaps support file-level readwrite operations over the transport
191
 
# too.
192
 
#
193
 
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
 
# branch doing file-level operations.
195
 
#
196
 
 
197
17
from cStringIO import StringIO
198
 
import os
199
 
import socket
200
 
import sys
201
 
import tempfile
202
 
import threading
203
18
import urllib
204
19
import urlparse
205
20
 
206
21
from bzrlib import (
207
 
    bzrdir,
208
22
    errors,
209
 
    revision,
210
23
    transport,
211
 
    trace,
212
 
    urlutils,
213
24
    )
214
 
from bzrlib.bundle.serializer import write_bundle
215
 
try:
216
 
    from bzrlib.transport import ssh
217
 
except errors.ParamikoNotPresent:
218
 
    # no paramiko.  SmartSSHClientMedium will break.
219
 
    pass
 
25
from bzrlib.smart.protocol import SmartClientRequestProtocolOne
 
26
from bzrlib.smart.medium import SmartTCPClientMedium, SmartSSHClientMedium
220
27
 
221
28
# must do this otherwise urllib can't parse the urls properly :(
222
29
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
228
35
BZR_DEFAULT_PORT = 4155
229
36
 
230
37
 
231
 
def _recv_tuple(from_file):
232
 
    req_line = from_file.readline()
233
 
    return _decode_tuple(req_line)
234
 
 
235
 
 
236
 
def _decode_tuple(req_line):
237
 
    if req_line == None or req_line == '':
238
 
        return None
239
 
    if req_line[-1] != '\n':
240
 
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
241
 
    return tuple(req_line[:-1].split('\x01'))
242
 
 
243
 
 
244
 
def _encode_tuple(args):
245
 
    """Encode the tuple args to a bytestream."""
246
 
    return '\x01'.join(args) + '\n'
247
 
 
248
 
 
249
 
class SmartProtocolBase(object):
250
 
    """Methods common to client and server"""
251
 
 
252
 
    # TODO: this only actually accomodates a single block; possibly should
253
 
    # support multiple chunks?
254
 
    def _encode_bulk_data(self, body):
255
 
        """Encode body as a bulk data chunk."""
256
 
        return ''.join(('%d\n' % len(body), body, 'done\n'))
257
 
 
258
 
    def _serialise_offsets(self, offsets):
259
 
        """Serialise a readv offset list."""
260
 
        txt = []
261
 
        for start, length in offsets:
262
 
            txt.append('%d,%d' % (start, length))
263
 
        return '\n'.join(txt)
264
 
        
265
 
 
266
 
class SmartServerRequestProtocolOne(SmartProtocolBase):
267
 
    """Server-side encoding and decoding logic for smart version 1."""
268
 
    
269
 
    def __init__(self, backing_transport, write_func):
270
 
        self._backing_transport = backing_transport
271
 
        self.excess_buffer = ''
272
 
        self._finished = False
273
 
        self.in_buffer = ''
274
 
        self.has_dispatched = False
275
 
        self.request = None
276
 
        self._body_decoder = None
277
 
        self._write_func = write_func
278
 
 
279
 
    def accept_bytes(self, bytes):
280
 
        """Take bytes, and advance the internal state machine appropriately.
281
 
        
282
 
        :param bytes: must be a byte string
283
 
        """
284
 
        assert isinstance(bytes, str)
285
 
        self.in_buffer += bytes
286
 
        if not self.has_dispatched:
287
 
            if '\n' not in self.in_buffer:
288
 
                # no command line yet
289
 
                return
290
 
            self.has_dispatched = True
291
 
            try:
292
 
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
293
 
                first_line += '\n'
294
 
                req_args = _decode_tuple(first_line)
295
 
                self.request = SmartServerRequestHandler(
296
 
                    self._backing_transport)
297
 
                self.request.dispatch_command(req_args[0], req_args[1:])
298
 
                if self.request.finished_reading:
299
 
                    # trivial request
300
 
                    self.excess_buffer = self.in_buffer
301
 
                    self.in_buffer = ''
302
 
                    self._send_response(self.request.response.args,
303
 
                        self.request.response.body)
304
 
            except KeyboardInterrupt:
305
 
                raise
306
 
            except Exception, exception:
307
 
                # everything else: pass to client, flush, and quit
308
 
                self._send_response(('error', str(exception)))
309
 
                return
310
 
 
311
 
        if self.has_dispatched:
312
 
            if self._finished:
313
 
                # nothing to do.XXX: this routine should be a single state 
314
 
                # machine too.
315
 
                self.excess_buffer += self.in_buffer
316
 
                self.in_buffer = ''
317
 
                return
318
 
            if self._body_decoder is None:
319
 
                self._body_decoder = LengthPrefixedBodyDecoder()
320
 
            self._body_decoder.accept_bytes(self.in_buffer)
321
 
            self.in_buffer = self._body_decoder.unused_data
322
 
            body_data = self._body_decoder.read_pending_data()
323
 
            self.request.accept_body(body_data)
324
 
            if self._body_decoder.finished_reading:
325
 
                self.request.end_of_body()
326
 
                assert self.request.finished_reading, \
327
 
                    "no more body, request not finished"
328
 
            if self.request.response is not None:
329
 
                self._send_response(self.request.response.args,
330
 
                    self.request.response.body)
331
 
                self.excess_buffer = self.in_buffer
332
 
                self.in_buffer = ''
333
 
            else:
334
 
                assert not self.request.finished_reading, \
335
 
                    "no response and we have finished reading."
336
 
 
337
 
    def _send_response(self, args, body=None):
338
 
        """Send a smart server response down the output stream."""
339
 
        assert not self._finished, 'response already sent'
340
 
        self._finished = True
341
 
        self._write_func(_encode_tuple(args))
342
 
        if body is not None:
343
 
            assert isinstance(body, str), 'body must be a str'
344
 
            bytes = self._encode_bulk_data(body)
345
 
            self._write_func(bytes)
346
 
 
347
 
    def next_read_size(self):
348
 
        if self._finished:
349
 
            return 0
350
 
        if self._body_decoder is None:
351
 
            return 1
352
 
        else:
353
 
            return self._body_decoder.next_read_size()
354
 
 
355
 
 
356
 
class LengthPrefixedBodyDecoder(object):
357
 
    """Decodes the length-prefixed bulk data."""
358
 
    
359
 
    def __init__(self):
360
 
        self.bytes_left = None
361
 
        self.finished_reading = False
362
 
        self.unused_data = ''
363
 
        self.state_accept = self._state_accept_expecting_length
364
 
        self.state_read = self._state_read_no_data
365
 
        self._in_buffer = ''
366
 
        self._trailer_buffer = ''
367
 
    
368
 
    def accept_bytes(self, bytes):
369
 
        """Decode as much of bytes as possible.
370
 
 
371
 
        If 'bytes' contains too much data it will be appended to
372
 
        self.unused_data.
373
 
 
374
 
        finished_reading will be set when no more data is required.  Further
375
 
        data will be appended to self.unused_data.
376
 
        """
377
 
        # accept_bytes is allowed to change the state
378
 
        current_state = self.state_accept
379
 
        self.state_accept(bytes)
380
 
        while current_state != self.state_accept:
381
 
            current_state = self.state_accept
382
 
            self.state_accept('')
383
 
 
384
 
    def next_read_size(self):
385
 
        if self.bytes_left is not None:
386
 
            # Ideally we want to read all the remainder of the body and the
387
 
            # trailer in one go.
388
 
            return self.bytes_left + 5
389
 
        elif self.state_accept == self._state_accept_reading_trailer:
390
 
            # Just the trailer left
391
 
            return 5 - len(self._trailer_buffer)
392
 
        elif self.state_accept == self._state_accept_expecting_length:
393
 
            # There's still at least 6 bytes left ('\n' to end the length, plus
394
 
            # 'done\n').
395
 
            return 6
396
 
        else:
397
 
            # Reading excess data.  Either way, 1 byte at a time is fine.
398
 
            return 1
399
 
        
400
 
    def read_pending_data(self):
401
 
        """Return any pending data that has been decoded."""
402
 
        return self.state_read()
403
 
 
404
 
    def _state_accept_expecting_length(self, bytes):
405
 
        self._in_buffer += bytes
406
 
        pos = self._in_buffer.find('\n')
407
 
        if pos == -1:
408
 
            return
409
 
        self.bytes_left = int(self._in_buffer[:pos])
410
 
        self._in_buffer = self._in_buffer[pos+1:]
411
 
        self.bytes_left -= len(self._in_buffer)
412
 
        self.state_accept = self._state_accept_reading_body
413
 
        self.state_read = self._state_read_in_buffer
414
 
 
415
 
    def _state_accept_reading_body(self, bytes):
416
 
        self._in_buffer += bytes
417
 
        self.bytes_left -= len(bytes)
418
 
        if self.bytes_left <= 0:
419
 
            # Finished with body
420
 
            if self.bytes_left != 0:
421
 
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
422
 
                self._in_buffer = self._in_buffer[:self.bytes_left]
423
 
            self.bytes_left = None
424
 
            self.state_accept = self._state_accept_reading_trailer
425
 
        
426
 
    def _state_accept_reading_trailer(self, bytes):
427
 
        self._trailer_buffer += bytes
428
 
        # TODO: what if the trailer does not match "done\n"?  Should this raise
429
 
        # a ProtocolViolation exception?
430
 
        if self._trailer_buffer.startswith('done\n'):
431
 
            self.unused_data = self._trailer_buffer[len('done\n'):]
432
 
            self.state_accept = self._state_accept_reading_unused
433
 
            self.finished_reading = True
434
 
    
435
 
    def _state_accept_reading_unused(self, bytes):
436
 
        self.unused_data += bytes
437
 
 
438
 
    def _state_read_no_data(self):
439
 
        return ''
440
 
 
441
 
    def _state_read_in_buffer(self):
442
 
        result = self._in_buffer
443
 
        self._in_buffer = ''
444
 
        return result
445
 
 
446
 
 
447
 
class SmartServerStreamMedium(object):
448
 
    """Handles smart commands coming over a stream.
449
 
 
450
 
    The stream may be a pipe connected to sshd, or a tcp socket, or an
451
 
    in-process fifo for testing.
452
 
 
453
 
    One instance is created for each connected client; it can serve multiple
454
 
    requests in the lifetime of the connection.
455
 
 
456
 
    The server passes requests through to an underlying backing transport, 
457
 
    which will typically be a LocalTransport looking at the server's filesystem.
458
 
    """
459
 
 
460
 
    def __init__(self, backing_transport):
461
 
        """Construct new server.
462
 
 
463
 
        :param backing_transport: Transport for the directory served.
464
 
        """
465
 
        # backing_transport could be passed to serve instead of __init__
466
 
        self.backing_transport = backing_transport
467
 
        self.finished = False
468
 
 
469
 
    def serve(self):
470
 
        """Serve requests until the client disconnects."""
471
 
        # Keep a reference to stderr because the sys module's globals get set to
472
 
        # None during interpreter shutdown.
473
 
        from sys import stderr
474
 
        try:
475
 
            while not self.finished:
476
 
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
477
 
                                                         self._write_out)
478
 
                self._serve_one_request(protocol)
479
 
        except Exception, e:
480
 
            stderr.write("%s terminating on exception %s\n" % (self, e))
481
 
            raise
482
 
 
483
 
    def _serve_one_request(self, protocol):
484
 
        """Read one request from input, process, send back a response.
485
 
        
486
 
        :param protocol: a SmartServerRequestProtocol.
487
 
        """
488
 
        try:
489
 
            self._serve_one_request_unguarded(protocol)
490
 
        except KeyboardInterrupt:
491
 
            raise
492
 
        except Exception, e:
493
 
            self.terminate_due_to_error()
494
 
 
495
 
    def terminate_due_to_error(self):
496
 
        """Called when an unhandled exception from the protocol occurs."""
497
 
        raise NotImplementedError(self.terminate_due_to_error)
498
 
 
499
 
 
500
 
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
501
 
 
502
 
    def __init__(self, sock, backing_transport):
503
 
        """Constructor.
504
 
 
505
 
        :param sock: the socket the server will read from.  It will be put
506
 
            into blocking mode.
507
 
        """
508
 
        SmartServerStreamMedium.__init__(self, backing_transport)
509
 
        self.push_back = ''
510
 
        sock.setblocking(True)
511
 
        self.socket = sock
512
 
 
513
 
    def _serve_one_request_unguarded(self, protocol):
514
 
        while protocol.next_read_size():
515
 
            if self.push_back:
516
 
                protocol.accept_bytes(self.push_back)
517
 
                self.push_back = ''
518
 
            else:
519
 
                bytes = self.socket.recv(4096)
520
 
                if bytes == '':
521
 
                    self.finished = True
522
 
                    return
523
 
                protocol.accept_bytes(bytes)
524
 
        
525
 
        self.push_back = protocol.excess_buffer
526
 
    
527
 
    def terminate_due_to_error(self):
528
 
        """Called when an unhandled exception from the protocol occurs."""
529
 
        # TODO: This should log to a server log file, but no such thing
530
 
        # exists yet.  Andrew Bennetts 2006-09-29.
531
 
        self.socket.close()
532
 
        self.finished = True
533
 
 
534
 
    def _write_out(self, bytes):
535
 
        self.socket.sendall(bytes)
536
 
 
537
 
 
538
 
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
539
 
 
540
 
    def __init__(self, in_file, out_file, backing_transport):
541
 
        """Construct new server.
542
 
 
543
 
        :param in_file: Python file from which requests can be read.
544
 
        :param out_file: Python file to write responses.
545
 
        :param backing_transport: Transport for the directory served.
546
 
        """
547
 
        SmartServerStreamMedium.__init__(self, backing_transport)
548
 
        if sys.platform == 'win32':
549
 
            # force binary mode for files
550
 
            import msvcrt
551
 
            for f in (in_file, out_file):
552
 
                fileno = getattr(f, 'fileno', None)
553
 
                if fileno:
554
 
                    msvcrt.setmode(fileno(), os.O_BINARY)
555
 
        self._in = in_file
556
 
        self._out = out_file
557
 
 
558
 
    def _serve_one_request_unguarded(self, protocol):
559
 
        while True:
560
 
            bytes_to_read = protocol.next_read_size()
561
 
            if bytes_to_read == 0:
562
 
                # Finished serving this request.
563
 
                self._out.flush()
564
 
                return
565
 
            bytes = self._in.read(bytes_to_read)
566
 
            if bytes == '':
567
 
                # Connection has been closed.
568
 
                self.finished = True
569
 
                self._out.flush()
570
 
                return
571
 
            protocol.accept_bytes(bytes)
572
 
 
573
 
    def terminate_due_to_error(self):
574
 
        # TODO: This should log to a server log file, but no such thing
575
 
        # exists yet.  Andrew Bennetts 2006-09-29.
576
 
        self._out.close()
577
 
        self.finished = True
578
 
 
579
 
    def _write_out(self, bytes):
580
 
        self._out.write(bytes)
581
 
 
582
 
 
583
 
class SmartServerResponse(object):
584
 
    """Response generated by SmartServerRequestHandler."""
585
 
 
586
 
    def __init__(self, args, body=None):
587
 
        self.args = args
588
 
        self.body = body
589
 
 
590
 
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
591
 
# for delivering the data for a request. This could be done with as the
592
 
# StreamServer, though that would create conflation between request and response
593
 
# which may be undesirable.
594
 
 
595
 
 
596
 
class SmartServerRequestHandler(object):
597
 
    """Protocol logic for smart server.
598
 
    
599
 
    This doesn't handle serialization at all, it just processes requests and
600
 
    creates responses.
601
 
    """
602
 
 
603
 
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
604
 
    # not contain encoding or decoding logic to allow the wire protocol to vary
605
 
    # from the object protocol: we will want to tweak the wire protocol separate
606
 
    # from the object model, and ideally we will be able to do that without
607
 
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
608
 
    # just a Protocol subclass.
609
 
 
610
 
    # TODO: Better way of representing the body for commands that take it,
611
 
    # and allow it to be streamed into the server.
612
 
    
613
 
    def __init__(self, backing_transport):
614
 
        self._backing_transport = backing_transport
615
 
        self._converted_command = False
616
 
        self.finished_reading = False
617
 
        self._body_bytes = ''
618
 
        self.response = None
619
 
 
620
 
    def accept_body(self, bytes):
621
 
        """Accept body data.
622
 
 
623
 
        This should be overriden for each command that desired body data to
624
 
        handle the right format of that data. I.e. plain bytes, a bundle etc.
625
 
 
626
 
        The deserialisation into that format should be done in the Protocol
627
 
        object. Set self.desired_body_format to the format your method will
628
 
        handle.
629
 
        """
630
 
        # default fallback is to accumulate bytes.
631
 
        self._body_bytes += bytes
632
 
        
633
 
    def _end_of_body_handler(self):
634
 
        """An unimplemented end of body handler."""
635
 
        raise NotImplementedError(self._end_of_body_handler)
636
 
        
637
 
    def do_hello(self):
638
 
        """Answer a version request with my version."""
639
 
        return SmartServerResponse(('ok', '1'))
640
 
 
641
 
    def do_has(self, relpath):
642
 
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
643
 
        return SmartServerResponse((r,))
644
 
 
645
 
    def do_get(self, relpath):
646
 
        backing_bytes = self._backing_transport.get_bytes(relpath)
647
 
        return SmartServerResponse(('ok',), backing_bytes)
648
 
 
649
 
    def _deserialise_optional_mode(self, mode):
650
 
        # XXX: FIXME this should be on the protocol object.
651
 
        if mode == '':
652
 
            return None
653
 
        else:
654
 
            return int(mode)
655
 
 
656
 
    def do_append(self, relpath, mode):
657
 
        self._converted_command = True
658
 
        self._relpath = relpath
659
 
        self._mode = self._deserialise_optional_mode(mode)
660
 
        self._end_of_body_handler = self._handle_do_append_end
661
 
    
662
 
    def _handle_do_append_end(self):
663
 
        old_length = self._backing_transport.append_bytes(
664
 
            self._relpath, self._body_bytes, self._mode)
665
 
        self.response = SmartServerResponse(('appended', '%d' % old_length))
666
 
 
667
 
    def do_delete(self, relpath):
668
 
        self._backing_transport.delete(relpath)
669
 
 
670
 
    def do_iter_files_recursive(self, relpath):
671
 
        transport = self._backing_transport.clone(relpath)
672
 
        filenames = transport.iter_files_recursive()
673
 
        return SmartServerResponse(('names',) + tuple(filenames))
674
 
 
675
 
    def do_list_dir(self, relpath):
676
 
        filenames = self._backing_transport.list_dir(relpath)
677
 
        return SmartServerResponse(('names',) + tuple(filenames))
678
 
 
679
 
    def do_mkdir(self, relpath, mode):
680
 
        self._backing_transport.mkdir(relpath,
681
 
                                      self._deserialise_optional_mode(mode))
682
 
 
683
 
    def do_move(self, rel_from, rel_to):
684
 
        self._backing_transport.move(rel_from, rel_to)
685
 
 
686
 
    def do_put(self, relpath, mode):
687
 
        self._converted_command = True
688
 
        self._relpath = relpath
689
 
        self._mode = self._deserialise_optional_mode(mode)
690
 
        self._end_of_body_handler = self._handle_do_put
691
 
 
692
 
    def _handle_do_put(self):
693
 
        self._backing_transport.put_bytes(self._relpath,
694
 
                self._body_bytes, self._mode)
695
 
        self.response = SmartServerResponse(('ok',))
696
 
 
697
 
    def _deserialise_offsets(self, text):
698
 
        # XXX: FIXME this should be on the protocol object.
699
 
        offsets = []
700
 
        for line in text.split('\n'):
701
 
            if not line:
702
 
                continue
703
 
            start, length = line.split(',')
704
 
            offsets.append((int(start), int(length)))
705
 
        return offsets
706
 
 
707
 
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
708
 
        self._converted_command = True
709
 
        self._end_of_body_handler = self._handle_put_non_atomic
710
 
        self._relpath = relpath
711
 
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
712
 
        self._mode = self._deserialise_optional_mode(mode)
713
 
        # a boolean would be nicer XXX
714
 
        self._create_parent = (create_parent == 'T')
715
 
 
716
 
    def _handle_put_non_atomic(self):
717
 
        self._backing_transport.put_bytes_non_atomic(self._relpath,
718
 
                self._body_bytes,
719
 
                mode=self._mode,
720
 
                create_parent_dir=self._create_parent,
721
 
                dir_mode=self._dir_mode)
722
 
        self.response = SmartServerResponse(('ok',))
723
 
 
724
 
    def do_readv(self, relpath):
725
 
        self._converted_command = True
726
 
        self._end_of_body_handler = self._handle_readv_offsets
727
 
        self._relpath = relpath
728
 
 
729
 
    def end_of_body(self):
730
 
        """No more body data will be received."""
731
 
        self._run_handler_code(self._end_of_body_handler, (), {})
732
 
        # cannot read after this.
733
 
        self.finished_reading = True
734
 
 
735
 
    def _handle_readv_offsets(self):
736
 
        """accept offsets for a readv request."""
737
 
        offsets = self._deserialise_offsets(self._body_bytes)
738
 
        backing_bytes = ''.join(bytes for offset, bytes in
739
 
            self._backing_transport.readv(self._relpath, offsets))
740
 
        self.response = SmartServerResponse(('readv',), backing_bytes)
741
 
        
742
 
    def do_rename(self, rel_from, rel_to):
743
 
        self._backing_transport.rename(rel_from, rel_to)
744
 
 
745
 
    def do_rmdir(self, relpath):
746
 
        self._backing_transport.rmdir(relpath)
747
 
 
748
 
    def do_stat(self, relpath):
749
 
        stat = self._backing_transport.stat(relpath)
750
 
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
751
 
        
752
 
    def do_get_bundle(self, path, revision_id):
753
 
        # open transport relative to our base
754
 
        t = self._backing_transport.clone(path)
755
 
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
756
 
        repo = control.open_repository()
757
 
        tmpf = tempfile.TemporaryFile()
758
 
        base_revision = revision.NULL_REVISION
759
 
        write_bundle(repo, revision_id, base_revision, tmpf)
760
 
        tmpf.seek(0)
761
 
        return SmartServerResponse((), tmpf.read())
762
 
 
763
 
    def dispatch_command(self, cmd, args):
764
 
        """Deprecated compatibility method.""" # XXX XXX
765
 
        func = getattr(self, 'do_' + cmd, None)
766
 
        if func is None:
767
 
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
768
 
        self._run_handler_code(func, args, {})
769
 
 
770
 
    def _run_handler_code(self, callable, args, kwargs):
771
 
        """Run some handler specific code 'callable'.
772
 
 
773
 
        If a result is returned, it is considered to be the commands response,
774
 
        and finished_reading is set true, and its assigned to self.response.
775
 
 
776
 
        Any exceptions caught are translated and a response object created
777
 
        from them.
778
 
        """
779
 
        result = self._call_converting_errors(callable, args, kwargs)
780
 
        if result is not None:
781
 
            self.response = result
782
 
            self.finished_reading = True
783
 
        # handle unconverted commands
784
 
        if not self._converted_command:
785
 
            self.finished_reading = True
786
 
            if result is None:
787
 
                self.response = SmartServerResponse(('ok',))
788
 
 
789
 
    def _call_converting_errors(self, callable, args, kwargs):
790
 
        """Call callable converting errors to Response objects."""
791
 
        try:
792
 
            return callable(*args, **kwargs)
793
 
        except errors.NoSuchFile, e:
794
 
            return SmartServerResponse(('NoSuchFile', e.path))
795
 
        except errors.FileExists, e:
796
 
            return SmartServerResponse(('FileExists', e.path))
797
 
        except errors.DirectoryNotEmpty, e:
798
 
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
799
 
        except errors.ShortReadvError, e:
800
 
            return SmartServerResponse(('ShortReadvError',
801
 
                e.path, str(e.offset), str(e.length), str(e.actual)))
802
 
        except UnicodeError, e:
803
 
            # If it is a DecodeError, than most likely we are starting
804
 
            # with a plain string
805
 
            str_or_unicode = e.object
806
 
            if isinstance(str_or_unicode, unicode):
807
 
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
808
 
                # should escape it somehow.
809
 
                val = 'u:' + str_or_unicode.encode('utf-8')
810
 
            else:
811
 
                val = 's:' + str_or_unicode.encode('base64')
812
 
            # This handles UnicodeEncodeError or UnicodeDecodeError
813
 
            return SmartServerResponse((e.__class__.__name__,
814
 
                    e.encoding, val, str(e.start), str(e.end), e.reason))
815
 
        except errors.TransportNotPossible, e:
816
 
            if e.msg == "readonly transport":
817
 
                return SmartServerResponse(('ReadOnlyError', ))
818
 
            else:
819
 
                raise
820
 
 
821
 
 
822
 
class SmartTCPServer(object):
823
 
    """Listens on a TCP socket and accepts connections from smart clients"""
824
 
 
825
 
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
826
 
        """Construct a new server.
827
 
 
828
 
        To actually start it running, call either start_background_thread or
829
 
        serve.
830
 
 
831
 
        :param host: Name of the interface to listen on.
832
 
        :param port: TCP port to listen on, or 0 to allocate a transient port.
833
 
        """
834
 
        self._server_socket = socket.socket()
835
 
        self._server_socket.bind((host, port))
836
 
        self.port = self._server_socket.getsockname()[1]
837
 
        self._server_socket.listen(1)
838
 
        self._server_socket.settimeout(1)
839
 
        self.backing_transport = backing_transport
840
 
 
841
 
    def serve(self):
842
 
        # let connections timeout so that we get a chance to terminate
843
 
        # Keep a reference to the exceptions we want to catch because the socket
844
 
        # module's globals get set to None during interpreter shutdown.
845
 
        from socket import timeout as socket_timeout
846
 
        from socket import error as socket_error
847
 
        self._should_terminate = False
848
 
        while not self._should_terminate:
849
 
            try:
850
 
                self.accept_and_serve()
851
 
            except socket_timeout:
852
 
                # just check if we're asked to stop
853
 
                pass
854
 
            except socket_error, e:
855
 
                trace.warning("client disconnected: %s", e)
856
 
                pass
857
 
 
858
 
    def get_url(self):
859
 
        """Return the url of the server"""
860
 
        return "bzr://%s:%d/" % self._server_socket.getsockname()
861
 
 
862
 
    def accept_and_serve(self):
863
 
        conn, client_addr = self._server_socket.accept()
864
 
        # For WIN32, where the timeout value from the listening socket
865
 
        # propogates to the newly accepted socket.
866
 
        conn.setblocking(True)
867
 
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
868
 
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
869
 
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
870
 
        connection_thread.setDaemon(True)
871
 
        connection_thread.start()
872
 
 
873
 
    def start_background_thread(self):
874
 
        self._server_thread = threading.Thread(None,
875
 
                self.serve,
876
 
                name='server-' + self.get_url())
877
 
        self._server_thread.setDaemon(True)
878
 
        self._server_thread.start()
879
 
 
880
 
    def stop_background_thread(self):
881
 
        self._should_terminate = True
882
 
        # At one point we would wait to join the threads here, but it looks
883
 
        # like they don't actually exit.  So now we just leave them running
884
 
        # and expect to terminate the process. -- mbp 20070215
885
 
        # self._server_socket.close()
886
 
        ## sys.stderr.write("waiting for server thread to finish...")
887
 
        ## self._server_thread.join()
888
 
 
889
 
 
890
 
class SmartTCPServer_for_testing(SmartTCPServer):
891
 
    """Server suitable for use by transport tests.
892
 
    
893
 
    This server is backed by the process's cwd.
894
 
    """
895
 
 
896
 
    def __init__(self):
897
 
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
898
 
        # The server is set up by default like for ssh access: the client
899
 
        # passes filesystem-absolute paths; therefore the server must look
900
 
        # them up relative to the root directory.  it might be better to act
901
 
        # a public server and have the server rewrite paths into the test
902
 
        # directory.
903
 
        SmartTCPServer.__init__(self,
904
 
            transport.get_transport(urlutils.local_path_to_url('/')))
905
 
        
906
 
    def get_backing_transport(self, backing_transport_server):
907
 
        """Get a backing transport from a server we are decorating."""
908
 
        return transport.get_transport(backing_transport_server.get_url())
909
 
 
910
 
    def setUp(self, backing_transport_server=None):
911
 
        """Set up server for testing"""
912
 
        from bzrlib.transport.chroot import TestingChrootServer
913
 
        if backing_transport_server is None:
914
 
            from bzrlib.transport.local import LocalURLServer
915
 
            backing_transport_server = LocalURLServer()
916
 
        self.chroot_server = TestingChrootServer()
917
 
        self.chroot_server.setUp(backing_transport_server)
918
 
        self.backing_transport = transport.get_transport(
919
 
            self.chroot_server.get_url())
920
 
        self.start_background_thread()
921
 
 
922
 
    def tearDown(self):
923
 
        self.stop_background_thread()
924
 
 
925
 
    def get_bogus_url(self):
926
 
        """Return a URL which will fail to connect"""
927
 
        return 'bzr://127.0.0.1:1/'
928
 
 
929
 
 
930
38
class SmartStat(object):
931
39
 
932
40
    def __init__(self, size, mode):
1290
398
            self._translate_error(resp)
1291
399
 
1292
400
 
1293
 
class SmartClientMediumRequest(object):
1294
 
    """A request on a SmartClientMedium.
1295
 
 
1296
 
    Each request allows bytes to be provided to it via accept_bytes, and then
1297
 
    the response bytes to be read via read_bytes.
1298
 
 
1299
 
    For instance:
1300
 
    request.accept_bytes('123')
1301
 
    request.finished_writing()
1302
 
    result = request.read_bytes(3)
1303
 
    request.finished_reading()
1304
 
 
1305
 
    It is up to the individual SmartClientMedium whether multiple concurrent
1306
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
1307
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
1308
 
    details on concurrency and pipelining.
1309
 
    """
1310
 
 
1311
 
    def __init__(self, medium):
1312
 
        """Construct a SmartClientMediumRequest for the medium medium."""
1313
 
        self._medium = medium
1314
 
        # we track state by constants - we may want to use the same
1315
 
        # pattern as BodyReader if it gets more complex.
1316
 
        # valid states are: "writing", "reading", "done"
1317
 
        self._state = "writing"
1318
 
 
1319
 
    def accept_bytes(self, bytes):
1320
 
        """Accept bytes for inclusion in this request.
1321
 
 
1322
 
        This method may not be be called after finished_writing() has been
1323
 
        called.  It depends upon the Medium whether or not the bytes will be
1324
 
        immediately transmitted. Message based Mediums will tend to buffer the
1325
 
        bytes until finished_writing() is called.
1326
 
 
1327
 
        :param bytes: A bytestring.
1328
 
        """
1329
 
        if self._state != "writing":
1330
 
            raise errors.WritingCompleted(self)
1331
 
        self._accept_bytes(bytes)
1332
 
 
1333
 
    def _accept_bytes(self, bytes):
1334
 
        """Helper for accept_bytes.
1335
 
 
1336
 
        Accept_bytes checks the state of the request to determing if bytes
1337
 
        should be accepted. After that it hands off to _accept_bytes to do the
1338
 
        actual acceptance.
1339
 
        """
1340
 
        raise NotImplementedError(self._accept_bytes)
1341
 
 
1342
 
    def finished_reading(self):
1343
 
        """Inform the request that all desired data has been read.
1344
 
 
1345
 
        This will remove the request from the pipeline for its medium (if the
1346
 
        medium supports pipelining) and any further calls to methods on the
1347
 
        request will raise ReadingCompleted.
1348
 
        """
1349
 
        if self._state == "writing":
1350
 
            raise errors.WritingNotComplete(self)
1351
 
        if self._state != "reading":
1352
 
            raise errors.ReadingCompleted(self)
1353
 
        self._state = "done"
1354
 
        self._finished_reading()
1355
 
 
1356
 
    def _finished_reading(self):
1357
 
        """Helper for finished_reading.
1358
 
 
1359
 
        finished_reading checks the state of the request to determine if 
1360
 
        finished_reading is allowed, and if it is hands off to _finished_reading
1361
 
        to perform the action.
1362
 
        """
1363
 
        raise NotImplementedError(self._finished_reading)
1364
 
 
1365
 
    def finished_writing(self):
1366
 
        """Finish the writing phase of this request.
1367
 
 
1368
 
        This will flush all pending data for this request along the medium.
1369
 
        After calling finished_writing, you may not call accept_bytes anymore.
1370
 
        """
1371
 
        if self._state != "writing":
1372
 
            raise errors.WritingCompleted(self)
1373
 
        self._state = "reading"
1374
 
        self._finished_writing()
1375
 
 
1376
 
    def _finished_writing(self):
1377
 
        """Helper for finished_writing.
1378
 
 
1379
 
        finished_writing checks the state of the request to determine if 
1380
 
        finished_writing is allowed, and if it is hands off to _finished_writing
1381
 
        to perform the action.
1382
 
        """
1383
 
        raise NotImplementedError(self._finished_writing)
1384
 
 
1385
 
    def read_bytes(self, count):
1386
 
        """Read bytes from this requests response.
1387
 
 
1388
 
        This method will block and wait for count bytes to be read. It may not
1389
 
        be invoked until finished_writing() has been called - this is to ensure
1390
 
        a message-based approach to requests, for compatability with message
1391
 
        based mediums like HTTP.
1392
 
        """
1393
 
        if self._state == "writing":
1394
 
            raise errors.WritingNotComplete(self)
1395
 
        if self._state != "reading":
1396
 
            raise errors.ReadingCompleted(self)
1397
 
        return self._read_bytes(count)
1398
 
 
1399
 
    def _read_bytes(self, count):
1400
 
        """Helper for read_bytes.
1401
 
 
1402
 
        read_bytes checks the state of the request to determing if bytes
1403
 
        should be read. After that it hands off to _read_bytes to do the
1404
 
        actual read.
1405
 
        """
1406
 
        raise NotImplementedError(self._read_bytes)
1407
 
 
1408
 
 
1409
 
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1410
 
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1411
 
 
1412
 
    def __init__(self, medium):
1413
 
        SmartClientMediumRequest.__init__(self, medium)
1414
 
        # check that we are safe concurrency wise. If some streams start
1415
 
        # allowing concurrent requests - i.e. via multiplexing - then this
1416
 
        # assert should be moved to SmartClientStreamMedium.get_request,
1417
 
        # and the setting/unsetting of _current_request likewise moved into
1418
 
        # that class : but its unneeded overhead for now. RBC 20060922
1419
 
        if self._medium._current_request is not None:
1420
 
            raise errors.TooManyConcurrentRequests(self._medium)
1421
 
        self._medium._current_request = self
1422
 
 
1423
 
    def _accept_bytes(self, bytes):
1424
 
        """See SmartClientMediumRequest._accept_bytes.
1425
 
        
1426
 
        This forwards to self._medium._accept_bytes because we are operating
1427
 
        on the mediums stream.
1428
 
        """
1429
 
        self._medium._accept_bytes(bytes)
1430
 
 
1431
 
    def _finished_reading(self):
1432
 
        """See SmartClientMediumRequest._finished_reading.
1433
 
 
1434
 
        This clears the _current_request on self._medium to allow a new 
1435
 
        request to be created.
1436
 
        """
1437
 
        assert self._medium._current_request is self
1438
 
        self._medium._current_request = None
1439
 
        
1440
 
    def _finished_writing(self):
1441
 
        """See SmartClientMediumRequest._finished_writing.
1442
 
 
1443
 
        This invokes self._medium._flush to ensure all bytes are transmitted.
1444
 
        """
1445
 
        self._medium._flush()
1446
 
 
1447
 
    def _read_bytes(self, count):
1448
 
        """See SmartClientMediumRequest._read_bytes.
1449
 
        
1450
 
        This forwards to self._medium._read_bytes because we are operating
1451
 
        on the mediums stream.
1452
 
        """
1453
 
        return self._medium._read_bytes(count)
1454
 
 
1455
 
 
1456
 
class SmartClientRequestProtocolOne(SmartProtocolBase):
1457
 
    """The client-side protocol for smart version 1."""
1458
 
 
1459
 
    def __init__(self, request):
1460
 
        """Construct a SmartClientRequestProtocolOne.
1461
 
 
1462
 
        :param request: A SmartClientMediumRequest to serialise onto and
1463
 
            deserialise from.
1464
 
        """
1465
 
        self._request = request
1466
 
        self._body_buffer = None
1467
 
 
1468
 
    def call(self, *args):
1469
 
        bytes = _encode_tuple(args)
1470
 
        self._request.accept_bytes(bytes)
1471
 
        self._request.finished_writing()
1472
 
 
1473
 
    def call_with_body_bytes(self, args, body):
1474
 
        """Make a remote call of args with body bytes 'body'.
1475
 
 
1476
 
        After calling this, call read_response_tuple to find the result out.
1477
 
        """
1478
 
        bytes = _encode_tuple(args)
1479
 
        self._request.accept_bytes(bytes)
1480
 
        bytes = self._encode_bulk_data(body)
1481
 
        self._request.accept_bytes(bytes)
1482
 
        self._request.finished_writing()
1483
 
 
1484
 
    def call_with_body_readv_array(self, args, body):
1485
 
        """Make a remote call with a readv array.
1486
 
 
1487
 
        The body is encoded with one line per readv offset pair. The numbers in
1488
 
        each pair are separated by a comma, and no trailing \n is emitted.
1489
 
        """
1490
 
        bytes = _encode_tuple(args)
1491
 
        self._request.accept_bytes(bytes)
1492
 
        readv_bytes = self._serialise_offsets(body)
1493
 
        bytes = self._encode_bulk_data(readv_bytes)
1494
 
        self._request.accept_bytes(bytes)
1495
 
        self._request.finished_writing()
1496
 
 
1497
 
    def cancel_read_body(self):
1498
 
        """After expecting a body, a response code may indicate one otherwise.
1499
 
 
1500
 
        This method lets the domain client inform the protocol that no body
1501
 
        will be transmitted. This is a terminal method: after calling it the
1502
 
        protocol is not able to be used further.
1503
 
        """
1504
 
        self._request.finished_reading()
1505
 
 
1506
 
    def read_response_tuple(self, expect_body=False):
1507
 
        """Read a response tuple from the wire.
1508
 
 
1509
 
        This should only be called once.
1510
 
        """
1511
 
        result = self._recv_tuple()
1512
 
        if not expect_body:
1513
 
            self._request.finished_reading()
1514
 
        return result
1515
 
 
1516
 
    def read_body_bytes(self, count=-1):
1517
 
        """Read bytes from the body, decoding into a byte stream.
1518
 
        
1519
 
        We read all bytes at once to ensure we've checked the trailer for 
1520
 
        errors, and then feed the buffer back as read_body_bytes is called.
1521
 
        """
1522
 
        if self._body_buffer is not None:
1523
 
            return self._body_buffer.read(count)
1524
 
        _body_decoder = LengthPrefixedBodyDecoder()
1525
 
 
1526
 
        while not _body_decoder.finished_reading:
1527
 
            bytes_wanted = _body_decoder.next_read_size()
1528
 
            bytes = self._request.read_bytes(bytes_wanted)
1529
 
            _body_decoder.accept_bytes(bytes)
1530
 
        self._request.finished_reading()
1531
 
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
1532
 
        # XXX: TODO check the trailer result.
1533
 
        return self._body_buffer.read(count)
1534
 
 
1535
 
    def _recv_tuple(self):
1536
 
        """Receive a tuple from the medium request."""
1537
 
        line = ''
1538
 
        while not line or line[-1] != '\n':
1539
 
            # TODO: this is inefficient - but tuples are short.
1540
 
            new_char = self._request.read_bytes(1)
1541
 
            line += new_char
1542
 
            assert new_char != '', "end of file reading from server."
1543
 
        return _decode_tuple(line)
1544
 
 
1545
 
    def query_version(self):
1546
 
        """Return protocol version number of the server."""
1547
 
        self.call('hello')
1548
 
        resp = self.read_response_tuple()
1549
 
        if resp == ('ok', '1'):
1550
 
            return 1
1551
 
        else:
1552
 
            raise errors.SmartProtocolError("bad response %r" % (resp,))
1553
 
 
1554
 
 
1555
 
class SmartClientMedium(object):
1556
 
    """Smart client is a medium for sending smart protocol requests over."""
1557
 
 
1558
 
    def disconnect(self):
1559
 
        """If this medium maintains a persistent connection, close it.
1560
 
        
1561
 
        The default implementation does nothing.
1562
 
        """
1563
 
        
1564
 
 
1565
 
class SmartClientStreamMedium(SmartClientMedium):
1566
 
    """Stream based medium common class.
1567
 
 
1568
 
    SmartClientStreamMediums operate on a stream. All subclasses use a common
1569
 
    SmartClientStreamMediumRequest for their requests, and should implement
1570
 
    _accept_bytes and _read_bytes to allow the request objects to send and
1571
 
    receive bytes.
1572
 
    """
1573
 
 
1574
 
    def __init__(self):
1575
 
        self._current_request = None
1576
 
 
1577
 
    def accept_bytes(self, bytes):
1578
 
        self._accept_bytes(bytes)
1579
 
 
1580
 
    def __del__(self):
1581
 
        """The SmartClientStreamMedium knows how to close the stream when it is
1582
 
        finished with it.
1583
 
        """
1584
 
        self.disconnect()
1585
 
 
1586
 
    def _flush(self):
1587
 
        """Flush the output stream.
1588
 
        
1589
 
        This method is used by the SmartClientStreamMediumRequest to ensure that
1590
 
        all data for a request is sent, to avoid long timeouts or deadlocks.
1591
 
        """
1592
 
        raise NotImplementedError(self._flush)
1593
 
 
1594
 
    def get_request(self):
1595
 
        """See SmartClientMedium.get_request().
1596
 
 
1597
 
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1598
 
        for get_request.
1599
 
        """
1600
 
        return SmartClientStreamMediumRequest(self)
1601
 
 
1602
 
    def read_bytes(self, count):
1603
 
        return self._read_bytes(count)
1604
 
 
1605
 
 
1606
 
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1607
 
    """A client medium using simple pipes.
1608
 
    
1609
 
    This client does not manage the pipes: it assumes they will always be open.
1610
 
    """
1611
 
 
1612
 
    def __init__(self, readable_pipe, writeable_pipe):
1613
 
        SmartClientStreamMedium.__init__(self)
1614
 
        self._readable_pipe = readable_pipe
1615
 
        self._writeable_pipe = writeable_pipe
1616
 
 
1617
 
    def _accept_bytes(self, bytes):
1618
 
        """See SmartClientStreamMedium.accept_bytes."""
1619
 
        self._writeable_pipe.write(bytes)
1620
 
 
1621
 
    def _flush(self):
1622
 
        """See SmartClientStreamMedium._flush()."""
1623
 
        self._writeable_pipe.flush()
1624
 
 
1625
 
    def _read_bytes(self, count):
1626
 
        """See SmartClientStreamMedium._read_bytes."""
1627
 
        return self._readable_pipe.read(count)
1628
 
 
1629
 
 
1630
 
class SmartSSHClientMedium(SmartClientStreamMedium):
1631
 
    """A client medium using SSH."""
1632
 
    
1633
 
    def __init__(self, host, port=None, username=None, password=None,
1634
 
            vendor=None):
1635
 
        """Creates a client that will connect on the first use.
1636
 
        
1637
 
        :param vendor: An optional override for the ssh vendor to use. See
1638
 
            bzrlib.transport.ssh for details on ssh vendors.
1639
 
        """
1640
 
        SmartClientStreamMedium.__init__(self)
1641
 
        self._connected = False
1642
 
        self._host = host
1643
 
        self._password = password
1644
 
        self._port = port
1645
 
        self._username = username
1646
 
        self._read_from = None
1647
 
        self._ssh_connection = None
1648
 
        self._vendor = vendor
1649
 
        self._write_to = None
1650
 
 
1651
 
    def _accept_bytes(self, bytes):
1652
 
        """See SmartClientStreamMedium.accept_bytes."""
1653
 
        self._ensure_connection()
1654
 
        self._write_to.write(bytes)
1655
 
 
1656
 
    def disconnect(self):
1657
 
        """See SmartClientMedium.disconnect()."""
1658
 
        if not self._connected:
1659
 
            return
1660
 
        self._read_from.close()
1661
 
        self._write_to.close()
1662
 
        self._ssh_connection.close()
1663
 
        self._connected = False
1664
 
 
1665
 
    def _ensure_connection(self):
1666
 
        """Connect this medium if not already connected."""
1667
 
        if self._connected:
1668
 
            return
1669
 
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1670
 
        if self._vendor is None:
1671
 
            vendor = ssh._get_ssh_vendor()
1672
 
        else:
1673
 
            vendor = self._vendor
1674
 
        self._ssh_connection = vendor.connect_ssh(self._username,
1675
 
                self._password, self._host, self._port,
1676
 
                command=[executable, 'serve', '--inet', '--directory=/',
1677
 
                         '--allow-writes'])
1678
 
        self._read_from, self._write_to = \
1679
 
            self._ssh_connection.get_filelike_channels()
1680
 
        self._connected = True
1681
 
 
1682
 
    def _flush(self):
1683
 
        """See SmartClientStreamMedium._flush()."""
1684
 
        self._write_to.flush()
1685
 
 
1686
 
    def _read_bytes(self, count):
1687
 
        """See SmartClientStreamMedium.read_bytes."""
1688
 
        if not self._connected:
1689
 
            raise errors.MediumNotConnected(self)
1690
 
        return self._read_from.read(count)
1691
 
 
1692
 
 
1693
 
class SmartTCPClientMedium(SmartClientStreamMedium):
1694
 
    """A client medium using TCP."""
1695
 
    
1696
 
    def __init__(self, host, port):
1697
 
        """Creates a client that will connect on the first use."""
1698
 
        SmartClientStreamMedium.__init__(self)
1699
 
        self._connected = False
1700
 
        self._host = host
1701
 
        self._port = port
1702
 
        self._socket = None
1703
 
 
1704
 
    def _accept_bytes(self, bytes):
1705
 
        """See SmartClientMedium.accept_bytes."""
1706
 
        self._ensure_connection()
1707
 
        self._socket.sendall(bytes)
1708
 
 
1709
 
    def disconnect(self):
1710
 
        """See SmartClientMedium.disconnect()."""
1711
 
        if not self._connected:
1712
 
            return
1713
 
        self._socket.close()
1714
 
        self._socket = None
1715
 
        self._connected = False
1716
 
 
1717
 
    def _ensure_connection(self):
1718
 
        """Connect this medium if not already connected."""
1719
 
        if self._connected:
1720
 
            return
1721
 
        self._socket = socket.socket()
1722
 
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1723
 
        result = self._socket.connect_ex((self._host, int(self._port)))
1724
 
        if result:
1725
 
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1726
 
                    (self._host, self._port, os.strerror(result)))
1727
 
        self._connected = True
1728
 
 
1729
 
    def _flush(self):
1730
 
        """See SmartClientStreamMedium._flush().
1731
 
        
1732
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
1733
 
        add a means to do a flush, but that can be done in the future.
1734
 
        """
1735
 
 
1736
 
    def _read_bytes(self, count):
1737
 
        """See SmartClientMedium.read_bytes."""
1738
 
        if not self._connected:
1739
 
            raise errors.MediumNotConnected(self)
1740
 
        return self._socket.recv(count)
1741
 
 
1742
401
 
1743
402
class SmartTCPTransport(SmartTransport):
1744
403
    """Connection to smart server over plain tcp.
1838
497
 
1839
498
def get_test_permutations():
1840
499
    """Return (transport, server) permutations for testing."""
 
500
    from bzrlib.smart import server
1841
501
    ### We may need a little more test framework support to construct an
1842
502
    ### appropriate RemoteTransport in the future.
1843
 
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]
 
503
    return [(SmartTCPTransport, server.SmartTCPServer_for_testing)]