~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/smart.py

  • Committer: John Arbash Meinel
  • Date: 2007-03-15 22:35:35 UTC
  • mto: This revision was merged to the branch mainline in revision 2363.
  • Revision ID: john@arbash-meinel.com-20070315223535-d3d4964oe1hc8zhg
Add an overzealous test, for Unicode support of _iter_changes.
For both knowns and unknowns.
And include a basic, if suboptimal, fix.
I would rather defer the decoding until we've determined that we are going to return the tuple.
There is still something broken with added files, but I'll get to that.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Smart-server protocol, client and server.
 
18
 
 
19
Requests are sent as a command and list of arguments, followed by optional
 
20
bulk body data.  Responses are similarly a response and list of arguments,
 
21
followed by bulk body data. ::
 
22
 
 
23
  SEP := '\001'
 
24
    Fields are separated by Ctrl-A.
 
25
  BULK_DATA := CHUNK TRAILER
 
26
    Chunks can be repeated as many times as necessary.
 
27
  CHUNK := CHUNK_LEN CHUNK_BODY
 
28
  CHUNK_LEN := DIGIT+ NEWLINE
 
29
    Gives the number of bytes in the following chunk.
 
30
  CHUNK_BODY := BYTE[chunk_len]
 
31
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
 
32
  SUCCESS_TRAILER := 'done' NEWLINE
 
33
  ERROR_TRAILER := 
 
34
 
 
35
Paths are passed across the network.  The client needs to see a namespace that
 
36
includes any repository that might need to be referenced, and the client needs
 
37
to know about a root directory beyond which it cannot ascend.
 
38
 
 
39
Servers run over ssh will typically want to be able to access any path the user 
 
40
can access.  Public servers on the other hand (which might be over http, ssh
 
41
or tcp) will typically want to restrict access to only a particular directory 
 
42
and its children, so will want to do a software virtual root at that level.
 
43
In other words they'll want to rewrite incoming paths to be under that level
 
44
(and prevent escaping using ../ tricks.)
 
45
 
 
46
URLs that include ~ should probably be passed across to the server verbatim
 
47
and the server can expand them.  This will proably not be meaningful when 
 
48
limited to a directory?
 
49
 
 
50
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
 
51
that you have multiple requests and get a read error because the other side did
 
52
shutdown.  For pipes we have read pipe which will have a zero read which marks
 
53
end-of-file.  For HTTP server environment there is not end-of-stream because
 
54
each request coming into the server is independent.
 
55
 
 
56
So we need a wrapper around pipes and sockets to seperate out requests from
 
57
substrate and this will give us a single model which is consist for HTTP,
 
58
sockets and pipes.
 
59
 
 
60
Server-side
 
61
-----------
 
62
 
 
63
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
 
64
          uses protocol to detect end-of-request, sends written
 
65
          bytes to client) e.g. socket, pipe, HTTP request handler.
 
66
  ^
 
67
  | bytes.
 
68
  v
 
69
 
 
70
PROTOCOL  (serialization, deserialization)  accepts bytes for one
 
71
          request, decodes according to internal state, pushes
 
72
          structured data to handler.  accepts structured data from
 
73
          handler and encodes and writes to the medium.  factory for
 
74
          handler.
 
75
  ^
 
76
  | structured data
 
77
  v
 
78
 
 
79
HANDLER   (domain logic) accepts structured data, operates state
 
80
          machine until the request can be satisfied,
 
81
          sends structured data to the protocol.
 
82
 
 
83
 
 
84
Client-side
 
85
-----------
 
86
 
 
87
 CLIENT             domain logic, accepts domain requests, generated structured
 
88
                    data, reads structured data from responses and turns into
 
89
                    domain data.  Sends structured data to the protocol.
 
90
                    Operates state machines until the request can be delivered
 
91
                    (e.g. reading from a bundle generated in bzrlib to deliver a
 
92
                    complete request).
 
93
 
 
94
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
 
95
                    ...
 
96
  ^
 
97
  | structured data
 
98
  v
 
99
 
 
100
PROTOCOL  (serialization, deserialization)  accepts structured data for one
 
101
          request, encodes and writes to the medium.  Reads bytes from the
 
102
          medium, decodes and allows the client to read structured data.
 
103
  ^
 
104
  | bytes.
 
105
  v
 
106
 
 
107
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
 
108
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
109
"""
 
110
 
 
111
 
 
112
# TODO: _translate_error should be on the client, not the transport because
 
113
#     error coding is wire protocol specific.
 
114
 
 
115
# TODO: A plain integer from query_version is too simple; should give some
 
116
# capabilities too?
 
117
 
 
118
# TODO: Server should probably catch exceptions within itself and send them
 
119
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
 
120
# Also needs to somehow report protocol errors like bad requests.  Need to
 
121
# consider how we'll handle error reporting, e.g. if we get halfway through a
 
122
# bulk transfer and then something goes wrong.
 
123
 
 
124
# TODO: Standard marker at start of request/response lines?
 
125
 
 
126
# TODO: Make each request and response self-validatable, e.g. with checksums.
 
127
#
 
128
# TODO: get/put objects could be changed to gradually read back the data as it
 
129
# comes across the network
 
130
#
 
131
# TODO: What should the server do if it hits an error and has to terminate?
 
132
#
 
133
# TODO: is it useful to allow multiple chunks in the bulk data?
 
134
#
 
135
# TODO: If we get an exception during transmission of bulk data we can't just
 
136
# emit the exception because it won't be seen.
 
137
#   John proposes:  I think it would be worthwhile to have a header on each
 
138
#   chunk, that indicates it is another chunk. Then you can send an 'error'
 
139
#   chunk as long as you finish the previous chunk.
 
140
#
 
141
# TODO: Clone method on Transport; should work up towards parent directory;
 
142
# unclear how this should be stored or communicated to the server... maybe
 
143
# just pass it on all relevant requests?
 
144
#
 
145
# TODO: Better name than clone() for changing between directories.  How about
 
146
# open_dir or change_dir or chdir?
 
147
#
 
148
# TODO: Is it really good to have the notion of current directory within the
 
149
# connection?  Perhaps all Transports should factor out a common connection
 
150
# from the thing that has the directory context?
 
151
#
 
152
# TODO: Pull more things common to sftp and ssh to a higher level.
 
153
#
 
154
# TODO: The server that manages a connection should be quite small and retain
 
155
# minimum state because each of the requests are supposed to be stateless.
 
156
# Then we can write another implementation that maps to http.
 
157
#
 
158
# TODO: What to do when a client connection is garbage collected?  Maybe just
 
159
# abruptly drop the connection?
 
160
#
 
161
# TODO: Server in some cases will need to restrict access to files outside of
 
162
# a particular root directory.  LocalTransport doesn't do anything to stop you
 
163
# ascending above the base directory, so we need to prevent paths
 
164
# containing '..' in either the server or transport layers.  (Also need to
 
165
# consider what happens if someone creates a symlink pointing outside the 
 
166
# directory tree...)
 
167
#
 
168
# TODO: Server should rebase absolute paths coming across the network to put
 
169
# them under the virtual root, if one is in use.  LocalTransport currently
 
170
# doesn't do that; if you give it an absolute path it just uses it.
 
171
 
172
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
 
173
# urlescape them instead.  Indeed possibly this should just literally be
 
174
# http-over-ssh.
 
175
#
 
176
# FIXME: This transport, with several others, has imperfect handling of paths
 
177
# within urls.  It'd probably be better for ".." from a root to raise an error
 
178
# rather than return the same directory as we do at present.
 
179
#
 
180
# TODO: Rather than working at the Transport layer we want a Branch,
 
181
# Repository or BzrDir objects that talk to a server.
 
182
#
 
183
# TODO: Probably want some way for server commands to gradually produce body
 
184
# data rather than passing it as a string; they could perhaps pass an
 
185
# iterator-like callback that will gradually yield data; it probably needs a
 
186
# close() method that will always be closed to do any necessary cleanup.
 
187
#
 
188
# TODO: Split the actual smart server from the ssh encoding of it.
 
189
#
 
190
# TODO: Perhaps support file-level readwrite operations over the transport
 
191
# too.
 
192
#
 
193
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
 
194
# branch doing file-level operations.
 
195
#
 
196
 
 
197
from cStringIO import StringIO
 
198
import os
 
199
import socket
 
200
import sys
 
201
import tempfile
 
202
import threading
 
203
import urllib
 
204
import urlparse
 
205
 
 
206
from bzrlib import (
 
207
    bzrdir,
 
208
    errors,
 
209
    revision,
 
210
    transport,
 
211
    trace,
 
212
    urlutils,
 
213
    )
 
214
from bzrlib.bundle.serializer import write_bundle
 
215
try:
 
216
    from bzrlib.transport import ssh
 
217
except errors.ParamikoNotPresent:
 
218
    # no paramiko.  SmartSSHClientMedium will break.
 
219
    pass
 
220
 
 
221
# must do this otherwise urllib can't parse the urls properly :(
 
222
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh', 'bzr+http']:
 
223
    transport.register_urlparse_netloc_protocol(scheme)
 
224
del scheme
 
225
 
 
226
 
 
227
# Port 4155 is the default port for bzr://, registered with IANA.
 
228
BZR_DEFAULT_PORT = 4155
 
229
 
 
230
 
 
231
def _recv_tuple(from_file):
 
232
    req_line = from_file.readline()
 
233
    return _decode_tuple(req_line)
 
234
 
 
235
 
 
236
def _decode_tuple(req_line):
 
237
    if req_line == None or req_line == '':
 
238
        return None
 
239
    if req_line[-1] != '\n':
 
240
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
 
241
    return tuple(req_line[:-1].split('\x01'))
 
242
 
 
243
 
 
244
def _encode_tuple(args):
 
245
    """Encode the tuple args to a bytestream."""
 
246
    return '\x01'.join(args) + '\n'
 
247
 
 
248
 
 
249
class SmartProtocolBase(object):
 
250
    """Methods common to client and server"""
 
251
 
 
252
    # TODO: this only actually accomodates a single block; possibly should
 
253
    # support multiple chunks?
 
254
    def _encode_bulk_data(self, body):
 
255
        """Encode body as a bulk data chunk."""
 
256
        return ''.join(('%d\n' % len(body), body, 'done\n'))
 
257
 
 
258
    def _serialise_offsets(self, offsets):
 
259
        """Serialise a readv offset list."""
 
260
        txt = []
 
261
        for start, length in offsets:
 
262
            txt.append('%d,%d' % (start, length))
 
263
        return '\n'.join(txt)
 
264
        
 
265
 
 
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
 
267
    """Server-side encoding and decoding logic for smart version 1."""
 
268
    
 
269
    def __init__(self, backing_transport, write_func):
 
270
        self._backing_transport = backing_transport
 
271
        self.excess_buffer = ''
 
272
        self._finished = False
 
273
        self.in_buffer = ''
 
274
        self.has_dispatched = False
 
275
        self.request = None
 
276
        self._body_decoder = None
 
277
        self._write_func = write_func
 
278
 
 
279
    def accept_bytes(self, bytes):
 
280
        """Take bytes, and advance the internal state machine appropriately.
 
281
        
 
282
        :param bytes: must be a byte string
 
283
        """
 
284
        assert isinstance(bytes, str)
 
285
        self.in_buffer += bytes
 
286
        if not self.has_dispatched:
 
287
            if '\n' not in self.in_buffer:
 
288
                # no command line yet
 
289
                return
 
290
            self.has_dispatched = True
 
291
            try:
 
292
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
 
293
                first_line += '\n'
 
294
                req_args = _decode_tuple(first_line)
 
295
                self.request = SmartServerRequestHandler(
 
296
                    self._backing_transport)
 
297
                self.request.dispatch_command(req_args[0], req_args[1:])
 
298
                if self.request.finished_reading:
 
299
                    # trivial request
 
300
                    self.excess_buffer = self.in_buffer
 
301
                    self.in_buffer = ''
 
302
                    self._send_response(self.request.response.args,
 
303
                        self.request.response.body)
 
304
            except KeyboardInterrupt:
 
305
                raise
 
306
            except Exception, exception:
 
307
                # everything else: pass to client, flush, and quit
 
308
                self._send_response(('error', str(exception)))
 
309
                return
 
310
 
 
311
        if self.has_dispatched:
 
312
            if self._finished:
 
313
                # nothing to do.XXX: this routine should be a single state 
 
314
                # machine too.
 
315
                self.excess_buffer += self.in_buffer
 
316
                self.in_buffer = ''
 
317
                return
 
318
            if self._body_decoder is None:
 
319
                self._body_decoder = LengthPrefixedBodyDecoder()
 
320
            self._body_decoder.accept_bytes(self.in_buffer)
 
321
            self.in_buffer = self._body_decoder.unused_data
 
322
            body_data = self._body_decoder.read_pending_data()
 
323
            self.request.accept_body(body_data)
 
324
            if self._body_decoder.finished_reading:
 
325
                self.request.end_of_body()
 
326
                assert self.request.finished_reading, \
 
327
                    "no more body, request not finished"
 
328
            if self.request.response is not None:
 
329
                self._send_response(self.request.response.args,
 
330
                    self.request.response.body)
 
331
                self.excess_buffer = self.in_buffer
 
332
                self.in_buffer = ''
 
333
            else:
 
334
                assert not self.request.finished_reading, \
 
335
                    "no response and we have finished reading."
 
336
 
 
337
    def _send_response(self, args, body=None):
 
338
        """Send a smart server response down the output stream."""
 
339
        assert not self._finished, 'response already sent'
 
340
        self._finished = True
 
341
        self._write_func(_encode_tuple(args))
 
342
        if body is not None:
 
343
            assert isinstance(body, str), 'body must be a str'
 
344
            bytes = self._encode_bulk_data(body)
 
345
            self._write_func(bytes)
 
346
 
 
347
    def next_read_size(self):
 
348
        if self._finished:
 
349
            return 0
 
350
        if self._body_decoder is None:
 
351
            return 1
 
352
        else:
 
353
            return self._body_decoder.next_read_size()
 
354
 
 
355
 
 
356
class LengthPrefixedBodyDecoder(object):
 
357
    """Decodes the length-prefixed bulk data."""
 
358
    
 
359
    def __init__(self):
 
360
        self.bytes_left = None
 
361
        self.finished_reading = False
 
362
        self.unused_data = ''
 
363
        self.state_accept = self._state_accept_expecting_length
 
364
        self.state_read = self._state_read_no_data
 
365
        self._in_buffer = ''
 
366
        self._trailer_buffer = ''
 
367
    
 
368
    def accept_bytes(self, bytes):
 
369
        """Decode as much of bytes as possible.
 
370
 
 
371
        If 'bytes' contains too much data it will be appended to
 
372
        self.unused_data.
 
373
 
 
374
        finished_reading will be set when no more data is required.  Further
 
375
        data will be appended to self.unused_data.
 
376
        """
 
377
        # accept_bytes is allowed to change the state
 
378
        current_state = self.state_accept
 
379
        self.state_accept(bytes)
 
380
        while current_state != self.state_accept:
 
381
            current_state = self.state_accept
 
382
            self.state_accept('')
 
383
 
 
384
    def next_read_size(self):
 
385
        if self.bytes_left is not None:
 
386
            # Ideally we want to read all the remainder of the body and the
 
387
            # trailer in one go.
 
388
            return self.bytes_left + 5
 
389
        elif self.state_accept == self._state_accept_reading_trailer:
 
390
            # Just the trailer left
 
391
            return 5 - len(self._trailer_buffer)
 
392
        elif self.state_accept == self._state_accept_expecting_length:
 
393
            # There's still at least 6 bytes left ('\n' to end the length, plus
 
394
            # 'done\n').
 
395
            return 6
 
396
        else:
 
397
            # Reading excess data.  Either way, 1 byte at a time is fine.
 
398
            return 1
 
399
        
 
400
    def read_pending_data(self):
 
401
        """Return any pending data that has been decoded."""
 
402
        return self.state_read()
 
403
 
 
404
    def _state_accept_expecting_length(self, bytes):
 
405
        self._in_buffer += bytes
 
406
        pos = self._in_buffer.find('\n')
 
407
        if pos == -1:
 
408
            return
 
409
        self.bytes_left = int(self._in_buffer[:pos])
 
410
        self._in_buffer = self._in_buffer[pos+1:]
 
411
        self.bytes_left -= len(self._in_buffer)
 
412
        self.state_accept = self._state_accept_reading_body
 
413
        self.state_read = self._state_read_in_buffer
 
414
 
 
415
    def _state_accept_reading_body(self, bytes):
 
416
        self._in_buffer += bytes
 
417
        self.bytes_left -= len(bytes)
 
418
        if self.bytes_left <= 0:
 
419
            # Finished with body
 
420
            if self.bytes_left != 0:
 
421
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
 
422
                self._in_buffer = self._in_buffer[:self.bytes_left]
 
423
            self.bytes_left = None
 
424
            self.state_accept = self._state_accept_reading_trailer
 
425
        
 
426
    def _state_accept_reading_trailer(self, bytes):
 
427
        self._trailer_buffer += bytes
 
428
        # TODO: what if the trailer does not match "done\n"?  Should this raise
 
429
        # a ProtocolViolation exception?
 
430
        if self._trailer_buffer.startswith('done\n'):
 
431
            self.unused_data = self._trailer_buffer[len('done\n'):]
 
432
            self.state_accept = self._state_accept_reading_unused
 
433
            self.finished_reading = True
 
434
    
 
435
    def _state_accept_reading_unused(self, bytes):
 
436
        self.unused_data += bytes
 
437
 
 
438
    def _state_read_no_data(self):
 
439
        return ''
 
440
 
 
441
    def _state_read_in_buffer(self):
 
442
        result = self._in_buffer
 
443
        self._in_buffer = ''
 
444
        return result
 
445
 
 
446
 
 
447
class SmartServerStreamMedium(object):
 
448
    """Handles smart commands coming over a stream.
 
449
 
 
450
    The stream may be a pipe connected to sshd, or a tcp socket, or an
 
451
    in-process fifo for testing.
 
452
 
 
453
    One instance is created for each connected client; it can serve multiple
 
454
    requests in the lifetime of the connection.
 
455
 
 
456
    The server passes requests through to an underlying backing transport, 
 
457
    which will typically be a LocalTransport looking at the server's filesystem.
 
458
    """
 
459
 
 
460
    def __init__(self, backing_transport):
 
461
        """Construct new server.
 
462
 
 
463
        :param backing_transport: Transport for the directory served.
 
464
        """
 
465
        # backing_transport could be passed to serve instead of __init__
 
466
        self.backing_transport = backing_transport
 
467
        self.finished = False
 
468
 
 
469
    def serve(self):
 
470
        """Serve requests until the client disconnects."""
 
471
        # Keep a reference to stderr because the sys module's globals get set to
 
472
        # None during interpreter shutdown.
 
473
        from sys import stderr
 
474
        try:
 
475
            while not self.finished:
 
476
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
 
477
                                                         self._write_out)
 
478
                self._serve_one_request(protocol)
 
479
        except Exception, e:
 
480
            stderr.write("%s terminating on exception %s\n" % (self, e))
 
481
            raise
 
482
 
 
483
    def _serve_one_request(self, protocol):
 
484
        """Read one request from input, process, send back a response.
 
485
        
 
486
        :param protocol: a SmartServerRequestProtocol.
 
487
        """
 
488
        try:
 
489
            self._serve_one_request_unguarded(protocol)
 
490
        except KeyboardInterrupt:
 
491
            raise
 
492
        except Exception, e:
 
493
            self.terminate_due_to_error()
 
494
 
 
495
    def terminate_due_to_error(self):
 
496
        """Called when an unhandled exception from the protocol occurs."""
 
497
        raise NotImplementedError(self.terminate_due_to_error)
 
498
 
 
499
 
 
500
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
 
501
 
 
502
    def __init__(self, sock, backing_transport):
 
503
        """Constructor.
 
504
 
 
505
        :param sock: the socket the server will read from.  It will be put
 
506
            into blocking mode.
 
507
        """
 
508
        SmartServerStreamMedium.__init__(self, backing_transport)
 
509
        self.push_back = ''
 
510
        sock.setblocking(True)
 
511
        self.socket = sock
 
512
 
 
513
    def _serve_one_request_unguarded(self, protocol):
 
514
        while protocol.next_read_size():
 
515
            if self.push_back:
 
516
                protocol.accept_bytes(self.push_back)
 
517
                self.push_back = ''
 
518
            else:
 
519
                bytes = self.socket.recv(4096)
 
520
                if bytes == '':
 
521
                    self.finished = True
 
522
                    return
 
523
                protocol.accept_bytes(bytes)
 
524
        
 
525
        self.push_back = protocol.excess_buffer
 
526
    
 
527
    def terminate_due_to_error(self):
 
528
        """Called when an unhandled exception from the protocol occurs."""
 
529
        # TODO: This should log to a server log file, but no such thing
 
530
        # exists yet.  Andrew Bennetts 2006-09-29.
 
531
        self.socket.close()
 
532
        self.finished = True
 
533
 
 
534
    def _write_out(self, bytes):
 
535
        self.socket.sendall(bytes)
 
536
 
 
537
 
 
538
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
 
539
 
 
540
    def __init__(self, in_file, out_file, backing_transport):
 
541
        """Construct new server.
 
542
 
 
543
        :param in_file: Python file from which requests can be read.
 
544
        :param out_file: Python file to write responses.
 
545
        :param backing_transport: Transport for the directory served.
 
546
        """
 
547
        SmartServerStreamMedium.__init__(self, backing_transport)
 
548
        if sys.platform == 'win32':
 
549
            # force binary mode for files
 
550
            import msvcrt
 
551
            for f in (in_file, out_file):
 
552
                fileno = getattr(f, 'fileno', None)
 
553
                if fileno:
 
554
                    msvcrt.setmode(fileno(), os.O_BINARY)
 
555
        self._in = in_file
 
556
        self._out = out_file
 
557
 
 
558
    def _serve_one_request_unguarded(self, protocol):
 
559
        while True:
 
560
            bytes_to_read = protocol.next_read_size()
 
561
            if bytes_to_read == 0:
 
562
                # Finished serving this request.
 
563
                self._out.flush()
 
564
                return
 
565
            bytes = self._in.read(bytes_to_read)
 
566
            if bytes == '':
 
567
                # Connection has been closed.
 
568
                self.finished = True
 
569
                self._out.flush()
 
570
                return
 
571
            protocol.accept_bytes(bytes)
 
572
 
 
573
    def terminate_due_to_error(self):
 
574
        # TODO: This should log to a server log file, but no such thing
 
575
        # exists yet.  Andrew Bennetts 2006-09-29.
 
576
        self._out.close()
 
577
        self.finished = True
 
578
 
 
579
    def _write_out(self, bytes):
 
580
        self._out.write(bytes)
 
581
 
 
582
 
 
583
class SmartServerResponse(object):
 
584
    """Response generated by SmartServerRequestHandler."""
 
585
 
 
586
    def __init__(self, args, body=None):
 
587
        self.args = args
 
588
        self.body = body
 
589
 
 
590
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
 
591
# for delivering the data for a request. This could be done with as the
 
592
# StreamServer, though that would create conflation between request and response
 
593
# which may be undesirable.
 
594
 
 
595
 
 
596
class SmartServerRequestHandler(object):
 
597
    """Protocol logic for smart server.
 
598
    
 
599
    This doesn't handle serialization at all, it just processes requests and
 
600
    creates responses.
 
601
    """
 
602
 
 
603
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
 
604
    # not contain encoding or decoding logic to allow the wire protocol to vary
 
605
    # from the object protocol: we will want to tweak the wire protocol separate
 
606
    # from the object model, and ideally we will be able to do that without
 
607
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
 
608
    # just a Protocol subclass.
 
609
 
 
610
    # TODO: Better way of representing the body for commands that take it,
 
611
    # and allow it to be streamed into the server.
 
612
    
 
613
    def __init__(self, backing_transport):
 
614
        self._backing_transport = backing_transport
 
615
        self._converted_command = False
 
616
        self.finished_reading = False
 
617
        self._body_bytes = ''
 
618
        self.response = None
 
619
 
 
620
    def accept_body(self, bytes):
 
621
        """Accept body data.
 
622
 
 
623
        This should be overriden for each command that desired body data to
 
624
        handle the right format of that data. I.e. plain bytes, a bundle etc.
 
625
 
 
626
        The deserialisation into that format should be done in the Protocol
 
627
        object. Set self.desired_body_format to the format your method will
 
628
        handle.
 
629
        """
 
630
        # default fallback is to accumulate bytes.
 
631
        self._body_bytes += bytes
 
632
        
 
633
    def _end_of_body_handler(self):
 
634
        """An unimplemented end of body handler."""
 
635
        raise NotImplementedError(self._end_of_body_handler)
 
636
        
 
637
    def do_hello(self):
 
638
        """Answer a version request with my version."""
 
639
        return SmartServerResponse(('ok', '1'))
 
640
 
 
641
    def do_has(self, relpath):
 
642
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
 
643
        return SmartServerResponse((r,))
 
644
 
 
645
    def do_get(self, relpath):
 
646
        backing_bytes = self._backing_transport.get_bytes(relpath)
 
647
        return SmartServerResponse(('ok',), backing_bytes)
 
648
 
 
649
    def _deserialise_optional_mode(self, mode):
 
650
        # XXX: FIXME this should be on the protocol object.
 
651
        if mode == '':
 
652
            return None
 
653
        else:
 
654
            return int(mode)
 
655
 
 
656
    def do_append(self, relpath, mode):
 
657
        self._converted_command = True
 
658
        self._relpath = relpath
 
659
        self._mode = self._deserialise_optional_mode(mode)
 
660
        self._end_of_body_handler = self._handle_do_append_end
 
661
    
 
662
    def _handle_do_append_end(self):
 
663
        old_length = self._backing_transport.append_bytes(
 
664
            self._relpath, self._body_bytes, self._mode)
 
665
        self.response = SmartServerResponse(('appended', '%d' % old_length))
 
666
 
 
667
    def do_delete(self, relpath):
 
668
        self._backing_transport.delete(relpath)
 
669
 
 
670
    def do_iter_files_recursive(self, relpath):
 
671
        transport = self._backing_transport.clone(relpath)
 
672
        filenames = transport.iter_files_recursive()
 
673
        return SmartServerResponse(('names',) + tuple(filenames))
 
674
 
 
675
    def do_list_dir(self, relpath):
 
676
        filenames = self._backing_transport.list_dir(relpath)
 
677
        return SmartServerResponse(('names',) + tuple(filenames))
 
678
 
 
679
    def do_mkdir(self, relpath, mode):
 
680
        self._backing_transport.mkdir(relpath,
 
681
                                      self._deserialise_optional_mode(mode))
 
682
 
 
683
    def do_move(self, rel_from, rel_to):
 
684
        self._backing_transport.move(rel_from, rel_to)
 
685
 
 
686
    def do_put(self, relpath, mode):
 
687
        self._converted_command = True
 
688
        self._relpath = relpath
 
689
        self._mode = self._deserialise_optional_mode(mode)
 
690
        self._end_of_body_handler = self._handle_do_put
 
691
 
 
692
    def _handle_do_put(self):
 
693
        self._backing_transport.put_bytes(self._relpath,
 
694
                self._body_bytes, self._mode)
 
695
        self.response = SmartServerResponse(('ok',))
 
696
 
 
697
    def _deserialise_offsets(self, text):
 
698
        # XXX: FIXME this should be on the protocol object.
 
699
        offsets = []
 
700
        for line in text.split('\n'):
 
701
            if not line:
 
702
                continue
 
703
            start, length = line.split(',')
 
704
            offsets.append((int(start), int(length)))
 
705
        return offsets
 
706
 
 
707
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
 
708
        self._converted_command = True
 
709
        self._end_of_body_handler = self._handle_put_non_atomic
 
710
        self._relpath = relpath
 
711
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
 
712
        self._mode = self._deserialise_optional_mode(mode)
 
713
        # a boolean would be nicer XXX
 
714
        self._create_parent = (create_parent == 'T')
 
715
 
 
716
    def _handle_put_non_atomic(self):
 
717
        self._backing_transport.put_bytes_non_atomic(self._relpath,
 
718
                self._body_bytes,
 
719
                mode=self._mode,
 
720
                create_parent_dir=self._create_parent,
 
721
                dir_mode=self._dir_mode)
 
722
        self.response = SmartServerResponse(('ok',))
 
723
 
 
724
    def do_readv(self, relpath):
 
725
        self._converted_command = True
 
726
        self._end_of_body_handler = self._handle_readv_offsets
 
727
        self._relpath = relpath
 
728
 
 
729
    def end_of_body(self):
 
730
        """No more body data will be received."""
 
731
        self._run_handler_code(self._end_of_body_handler, (), {})
 
732
        # cannot read after this.
 
733
        self.finished_reading = True
 
734
 
 
735
    def _handle_readv_offsets(self):
 
736
        """accept offsets for a readv request."""
 
737
        offsets = self._deserialise_offsets(self._body_bytes)
 
738
        backing_bytes = ''.join(bytes for offset, bytes in
 
739
            self._backing_transport.readv(self._relpath, offsets))
 
740
        self.response = SmartServerResponse(('readv',), backing_bytes)
 
741
        
 
742
    def do_rename(self, rel_from, rel_to):
 
743
        self._backing_transport.rename(rel_from, rel_to)
 
744
 
 
745
    def do_rmdir(self, relpath):
 
746
        self._backing_transport.rmdir(relpath)
 
747
 
 
748
    def do_stat(self, relpath):
 
749
        stat = self._backing_transport.stat(relpath)
 
750
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
 
751
        
 
752
    def do_get_bundle(self, path, revision_id):
 
753
        # open transport relative to our base
 
754
        t = self._backing_transport.clone(path)
 
755
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
 
756
        repo = control.open_repository()
 
757
        tmpf = tempfile.TemporaryFile()
 
758
        base_revision = revision.NULL_REVISION
 
759
        write_bundle(repo, revision_id, base_revision, tmpf)
 
760
        tmpf.seek(0)
 
761
        return SmartServerResponse((), tmpf.read())
 
762
 
 
763
    def dispatch_command(self, cmd, args):
 
764
        """Deprecated compatibility method.""" # XXX XXX
 
765
        func = getattr(self, 'do_' + cmd, None)
 
766
        if func is None:
 
767
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
 
768
        self._run_handler_code(func, args, {})
 
769
 
 
770
    def _run_handler_code(self, callable, args, kwargs):
 
771
        """Run some handler specific code 'callable'.
 
772
 
 
773
        If a result is returned, it is considered to be the commands response,
 
774
        and finished_reading is set true, and its assigned to self.response.
 
775
 
 
776
        Any exceptions caught are translated and a response object created
 
777
        from them.
 
778
        """
 
779
        result = self._call_converting_errors(callable, args, kwargs)
 
780
        if result is not None:
 
781
            self.response = result
 
782
            self.finished_reading = True
 
783
        # handle unconverted commands
 
784
        if not self._converted_command:
 
785
            self.finished_reading = True
 
786
            if result is None:
 
787
                self.response = SmartServerResponse(('ok',))
 
788
 
 
789
    def _call_converting_errors(self, callable, args, kwargs):
 
790
        """Call callable converting errors to Response objects."""
 
791
        try:
 
792
            return callable(*args, **kwargs)
 
793
        except errors.NoSuchFile, e:
 
794
            return SmartServerResponse(('NoSuchFile', e.path))
 
795
        except errors.FileExists, e:
 
796
            return SmartServerResponse(('FileExists', e.path))
 
797
        except errors.DirectoryNotEmpty, e:
 
798
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
 
799
        except errors.ShortReadvError, e:
 
800
            return SmartServerResponse(('ShortReadvError',
 
801
                e.path, str(e.offset), str(e.length), str(e.actual)))
 
802
        except UnicodeError, e:
 
803
            # If it is a DecodeError, than most likely we are starting
 
804
            # with a plain string
 
805
            str_or_unicode = e.object
 
806
            if isinstance(str_or_unicode, unicode):
 
807
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
 
808
                # should escape it somehow.
 
809
                val = 'u:' + str_or_unicode.encode('utf-8')
 
810
            else:
 
811
                val = 's:' + str_or_unicode.encode('base64')
 
812
            # This handles UnicodeEncodeError or UnicodeDecodeError
 
813
            return SmartServerResponse((e.__class__.__name__,
 
814
                    e.encoding, val, str(e.start), str(e.end), e.reason))
 
815
        except errors.TransportNotPossible, e:
 
816
            if e.msg == "readonly transport":
 
817
                return SmartServerResponse(('ReadOnlyError', ))
 
818
            else:
 
819
                raise
 
820
 
 
821
 
 
822
class SmartTCPServer(object):
 
823
    """Listens on a TCP socket and accepts connections from smart clients"""
 
824
 
 
825
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
826
        """Construct a new server.
 
827
 
 
828
        To actually start it running, call either start_background_thread or
 
829
        serve.
 
830
 
 
831
        :param host: Name of the interface to listen on.
 
832
        :param port: TCP port to listen on, or 0 to allocate a transient port.
 
833
        """
 
834
        self._server_socket = socket.socket()
 
835
        self._server_socket.bind((host, port))
 
836
        self.port = self._server_socket.getsockname()[1]
 
837
        self._server_socket.listen(1)
 
838
        self._server_socket.settimeout(1)
 
839
        self.backing_transport = backing_transport
 
840
 
 
841
    def serve(self):
 
842
        # let connections timeout so that we get a chance to terminate
 
843
        # Keep a reference to the exceptions we want to catch because the socket
 
844
        # module's globals get set to None during interpreter shutdown.
 
845
        from socket import timeout as socket_timeout
 
846
        from socket import error as socket_error
 
847
        self._should_terminate = False
 
848
        while not self._should_terminate:
 
849
            try:
 
850
                self.accept_and_serve()
 
851
            except socket_timeout:
 
852
                # just check if we're asked to stop
 
853
                pass
 
854
            except socket_error, e:
 
855
                trace.warning("client disconnected: %s", e)
 
856
                pass
 
857
 
 
858
    def get_url(self):
 
859
        """Return the url of the server"""
 
860
        return "bzr://%s:%d/" % self._server_socket.getsockname()
 
861
 
 
862
    def accept_and_serve(self):
 
863
        conn, client_addr = self._server_socket.accept()
 
864
        # For WIN32, where the timeout value from the listening socket
 
865
        # propogates to the newly accepted socket.
 
866
        conn.setblocking(True)
 
867
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
868
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
 
869
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
870
        connection_thread.setDaemon(True)
 
871
        connection_thread.start()
 
872
 
 
873
    def start_background_thread(self):
 
874
        self._server_thread = threading.Thread(None,
 
875
                self.serve,
 
876
                name='server-' + self.get_url())
 
877
        self._server_thread.setDaemon(True)
 
878
        self._server_thread.start()
 
879
 
 
880
    def stop_background_thread(self):
 
881
        self._should_terminate = True
 
882
        # At one point we would wait to join the threads here, but it looks
 
883
        # like they don't actually exit.  So now we just leave them running
 
884
        # and expect to terminate the process. -- mbp 20070215
 
885
        # self._server_socket.close()
 
886
        ## sys.stderr.write("waiting for server thread to finish...")
 
887
        ## self._server_thread.join()
 
888
 
 
889
 
 
890
class SmartTCPServer_for_testing(SmartTCPServer):
 
891
    """Server suitable for use by transport tests.
 
892
    
 
893
    This server is backed by the process's cwd.
 
894
    """
 
895
 
 
896
    def __init__(self):
 
897
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
 
898
        # The server is set up by default like for ssh access: the client
 
899
        # passes filesystem-absolute paths; therefore the server must look
 
900
        # them up relative to the root directory.  it might be better to act
 
901
        # a public server and have the server rewrite paths into the test
 
902
        # directory.
 
903
        SmartTCPServer.__init__(self,
 
904
            transport.get_transport(urlutils.local_path_to_url('/')))
 
905
        
 
906
    def setUp(self):
 
907
        """Set up server for testing"""
 
908
        self.start_background_thread()
 
909
 
 
910
    def tearDown(self):
 
911
        self.stop_background_thread()
 
912
 
 
913
    def get_url(self):
 
914
        """Return the url of the server"""
 
915
        host, port = self._server_socket.getsockname()
 
916
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
 
917
 
 
918
    def get_bogus_url(self):
 
919
        """Return a URL which will fail to connect"""
 
920
        return 'bzr://127.0.0.1:1/'
 
921
 
 
922
 
 
923
class SmartStat(object):
 
924
 
 
925
    def __init__(self, size, mode):
 
926
        self.st_size = size
 
927
        self.st_mode = mode
 
928
 
 
929
 
 
930
class SmartTransport(transport.Transport):
 
931
    """Connection to a smart server.
 
932
 
 
933
    The connection holds references to pipes that can be used to send requests
 
934
    to the server.
 
935
 
 
936
    The connection has a notion of the current directory to which it's
 
937
    connected; this is incorporated in filenames passed to the server.
 
938
    
 
939
    This supports some higher-level RPC operations and can also be treated 
 
940
    like a Transport to do file-like operations.
 
941
 
 
942
    The connection can be made over a tcp socket, or (in future) an ssh pipe
 
943
    or a series of http requests.  There are concrete subclasses for each
 
944
    type: SmartTCPTransport, etc.
 
945
    """
 
946
 
 
947
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
948
    # responsibilities: Put those on SmartClient or similar. This is vital for
 
949
    # the ability to support multiple versions of the smart protocol over time:
 
950
    # SmartTransport is an adapter from the Transport object model to the 
 
951
    # SmartClient model, not an encoder.
 
952
 
 
953
    def __init__(self, url, clone_from=None, medium=None):
 
954
        """Constructor.
 
955
 
 
956
        :param medium: The medium to use for this RemoteTransport. This must be
 
957
            supplied if clone_from is None.
 
958
        """
 
959
        ### Technically super() here is faulty because Transport's __init__
 
960
        ### fails to take 2 parameters, and if super were to choose a silly
 
961
        ### initialisation order things would blow up. 
 
962
        if not url.endswith('/'):
 
963
            url += '/'
 
964
        super(SmartTransport, self).__init__(url)
 
965
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
 
966
                transport.split_url(url)
 
967
        if clone_from is None:
 
968
            self._medium = medium
 
969
        else:
 
970
            # credentials may be stripped from the base in some circumstances
 
971
            # as yet to be clearly defined or documented, so copy them.
 
972
            self._username = clone_from._username
 
973
            # reuse same connection
 
974
            self._medium = clone_from._medium
 
975
        assert self._medium is not None
 
976
 
 
977
    def abspath(self, relpath):
 
978
        """Return the full url to the given relative path.
 
979
        
 
980
        @param relpath: the relative path or path components
 
981
        @type relpath: str or list
 
982
        """
 
983
        return self._unparse_url(self._remote_path(relpath))
 
984
    
 
985
    def clone(self, relative_url):
 
986
        """Make a new SmartTransport related to me, sharing the same connection.
 
987
 
 
988
        This essentially opens a handle on a different remote directory.
 
989
        """
 
990
        if relative_url is None:
 
991
            return SmartTransport(self.base, self)
 
992
        else:
 
993
            return SmartTransport(self.abspath(relative_url), self)
 
994
 
 
995
    def is_readonly(self):
 
996
        """Smart server transport can do read/write file operations."""
 
997
        return False
 
998
                                                   
 
999
    def get_smart_client(self):
 
1000
        return self._medium
 
1001
 
 
1002
    def get_smart_medium(self):
 
1003
        return self._medium
 
1004
                                                   
 
1005
    def _unparse_url(self, path):
 
1006
        """Return URL for a path.
 
1007
 
 
1008
        :see: SFTPUrlHandling._unparse_url
 
1009
        """
 
1010
        # TODO: Eventually it should be possible to unify this with
 
1011
        # SFTPUrlHandling._unparse_url?
 
1012
        if path == '':
 
1013
            path = '/'
 
1014
        path = urllib.quote(path)
 
1015
        netloc = urllib.quote(self._host)
 
1016
        if self._username is not None:
 
1017
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
 
1018
        if self._port is not None:
 
1019
            netloc = '%s:%d' % (netloc, self._port)
 
1020
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
1021
 
 
1022
    def _remote_path(self, relpath):
 
1023
        """Returns the Unicode version of the absolute path for relpath."""
 
1024
        return self._combine_paths(self._path, relpath)
 
1025
 
 
1026
    def _call(self, method, *args):
 
1027
        resp = self._call2(method, *args)
 
1028
        self._translate_error(resp)
 
1029
 
 
1030
    def _call2(self, method, *args):
 
1031
        """Call a method on the remote server."""
 
1032
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1033
        protocol.call(method, *args)
 
1034
        return protocol.read_response_tuple()
 
1035
 
 
1036
    def _call_with_body_bytes(self, method, args, body):
 
1037
        """Call a method on the remote server with body bytes."""
 
1038
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1039
        protocol.call_with_body_bytes((method, ) + args, body)
 
1040
        return protocol.read_response_tuple()
 
1041
 
 
1042
    def has(self, relpath):
 
1043
        """Indicate whether a remote file of the given name exists or not.
 
1044
 
 
1045
        :see: Transport.has()
 
1046
        """
 
1047
        resp = self._call2('has', self._remote_path(relpath))
 
1048
        if resp == ('yes', ):
 
1049
            return True
 
1050
        elif resp == ('no', ):
 
1051
            return False
 
1052
        else:
 
1053
            self._translate_error(resp)
 
1054
 
 
1055
    def get(self, relpath):
 
1056
        """Return file-like object reading the contents of a remote file.
 
1057
        
 
1058
        :see: Transport.get_bytes()/get_file()
 
1059
        """
 
1060
        return StringIO(self.get_bytes(relpath))
 
1061
 
 
1062
    def get_bytes(self, relpath):
 
1063
        remote = self._remote_path(relpath)
 
1064
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1065
        protocol.call('get', remote)
 
1066
        resp = protocol.read_response_tuple(True)
 
1067
        if resp != ('ok', ):
 
1068
            protocol.cancel_read_body()
 
1069
            self._translate_error(resp, relpath)
 
1070
        return protocol.read_body_bytes()
 
1071
 
 
1072
    def _serialise_optional_mode(self, mode):
 
1073
        if mode is None:
 
1074
            return ''
 
1075
        else:
 
1076
            return '%d' % mode
 
1077
 
 
1078
    def mkdir(self, relpath, mode=None):
 
1079
        resp = self._call2('mkdir', self._remote_path(relpath),
 
1080
            self._serialise_optional_mode(mode))
 
1081
        self._translate_error(resp)
 
1082
 
 
1083
    def put_bytes(self, relpath, upload_contents, mode=None):
 
1084
        # FIXME: upload_file is probably not safe for non-ascii characters -
 
1085
        # should probably just pass all parameters as length-delimited
 
1086
        # strings?
 
1087
        resp = self._call_with_body_bytes('put',
 
1088
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1089
            upload_contents)
 
1090
        self._translate_error(resp)
 
1091
 
 
1092
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
 
1093
                             create_parent_dir=False,
 
1094
                             dir_mode=None):
 
1095
        """See Transport.put_bytes_non_atomic."""
 
1096
        # FIXME: no encoding in the transport!
 
1097
        create_parent_str = 'F'
 
1098
        if create_parent_dir:
 
1099
            create_parent_str = 'T'
 
1100
 
 
1101
        resp = self._call_with_body_bytes(
 
1102
            'put_non_atomic',
 
1103
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
 
1104
             create_parent_str, self._serialise_optional_mode(dir_mode)),
 
1105
            bytes)
 
1106
        self._translate_error(resp)
 
1107
 
 
1108
    def put_file(self, relpath, upload_file, mode=None):
 
1109
        # its not ideal to seek back, but currently put_non_atomic_file depends
 
1110
        # on transports not reading before failing - which is a faulty
 
1111
        # assumption I think - RBC 20060915
 
1112
        pos = upload_file.tell()
 
1113
        try:
 
1114
            return self.put_bytes(relpath, upload_file.read(), mode)
 
1115
        except:
 
1116
            upload_file.seek(pos)
 
1117
            raise
 
1118
 
 
1119
    def put_file_non_atomic(self, relpath, f, mode=None,
 
1120
                            create_parent_dir=False,
 
1121
                            dir_mode=None):
 
1122
        return self.put_bytes_non_atomic(relpath, f.read(), mode=mode,
 
1123
                                         create_parent_dir=create_parent_dir,
 
1124
                                         dir_mode=dir_mode)
 
1125
 
 
1126
    def append_file(self, relpath, from_file, mode=None):
 
1127
        return self.append_bytes(relpath, from_file.read(), mode)
 
1128
        
 
1129
    def append_bytes(self, relpath, bytes, mode=None):
 
1130
        resp = self._call_with_body_bytes(
 
1131
            'append',
 
1132
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
 
1133
            bytes)
 
1134
        if resp[0] == 'appended':
 
1135
            return int(resp[1])
 
1136
        self._translate_error(resp)
 
1137
 
 
1138
    def delete(self, relpath):
 
1139
        resp = self._call2('delete', self._remote_path(relpath))
 
1140
        self._translate_error(resp)
 
1141
 
 
1142
    def readv(self, relpath, offsets):
 
1143
        if not offsets:
 
1144
            return
 
1145
 
 
1146
        offsets = list(offsets)
 
1147
 
 
1148
        sorted_offsets = sorted(offsets)
 
1149
        # turn the list of offsets into a stack
 
1150
        offset_stack = iter(offsets)
 
1151
        cur_offset_and_size = offset_stack.next()
 
1152
        coalesced = list(self._coalesce_offsets(sorted_offsets,
 
1153
                               limit=self._max_readv_combine,
 
1154
                               fudge_factor=self._bytes_to_read_before_seek))
 
1155
 
 
1156
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
 
1157
        protocol.call_with_body_readv_array(
 
1158
            ('readv', self._remote_path(relpath)),
 
1159
            [(c.start, c.length) for c in coalesced])
 
1160
        resp = protocol.read_response_tuple(True)
 
1161
 
 
1162
        if resp[0] != 'readv':
 
1163
            # This should raise an exception
 
1164
            protocol.cancel_read_body()
 
1165
            self._translate_error(resp)
 
1166
            return
 
1167
 
 
1168
        # FIXME: this should know how many bytes are needed, for clarity.
 
1169
        data = protocol.read_body_bytes()
 
1170
        # Cache the results, but only until they have been fulfilled
 
1171
        data_map = {}
 
1172
        for c_offset in coalesced:
 
1173
            if len(data) < c_offset.length:
 
1174
                raise errors.ShortReadvError(relpath, c_offset.start,
 
1175
                            c_offset.length, actual=len(data))
 
1176
            for suboffset, subsize in c_offset.ranges:
 
1177
                key = (c_offset.start+suboffset, subsize)
 
1178
                data_map[key] = data[suboffset:suboffset+subsize]
 
1179
            data = data[c_offset.length:]
 
1180
 
 
1181
            # Now that we've read some data, see if we can yield anything back
 
1182
            while cur_offset_and_size in data_map:
 
1183
                this_data = data_map.pop(cur_offset_and_size)
 
1184
                yield cur_offset_and_size[0], this_data
 
1185
                cur_offset_and_size = offset_stack.next()
 
1186
 
 
1187
    def rename(self, rel_from, rel_to):
 
1188
        self._call('rename',
 
1189
                   self._remote_path(rel_from),
 
1190
                   self._remote_path(rel_to))
 
1191
 
 
1192
    def move(self, rel_from, rel_to):
 
1193
        self._call('move',
 
1194
                   self._remote_path(rel_from),
 
1195
                   self._remote_path(rel_to))
 
1196
 
 
1197
    def rmdir(self, relpath):
 
1198
        resp = self._call('rmdir', self._remote_path(relpath))
 
1199
 
 
1200
    def _translate_error(self, resp, orig_path=None):
 
1201
        """Raise an exception from a response"""
 
1202
        if resp is None:
 
1203
            what = None
 
1204
        else:
 
1205
            what = resp[0]
 
1206
        if what == 'ok':
 
1207
            return
 
1208
        elif what == 'NoSuchFile':
 
1209
            if orig_path is not None:
 
1210
                error_path = orig_path
 
1211
            else:
 
1212
                error_path = resp[1]
 
1213
            raise errors.NoSuchFile(error_path)
 
1214
        elif what == 'error':
 
1215
            raise errors.SmartProtocolError(unicode(resp[1]))
 
1216
        elif what == 'FileExists':
 
1217
            raise errors.FileExists(resp[1])
 
1218
        elif what == 'DirectoryNotEmpty':
 
1219
            raise errors.DirectoryNotEmpty(resp[1])
 
1220
        elif what == 'ShortReadvError':
 
1221
            raise errors.ShortReadvError(resp[1], int(resp[2]),
 
1222
                                         int(resp[3]), int(resp[4]))
 
1223
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
 
1224
            encoding = str(resp[1]) # encoding must always be a string
 
1225
            val = resp[2]
 
1226
            start = int(resp[3])
 
1227
            end = int(resp[4])
 
1228
            reason = str(resp[5]) # reason must always be a string
 
1229
            if val.startswith('u:'):
 
1230
                val = val[2:].decode('utf-8')
 
1231
            elif val.startswith('s:'):
 
1232
                val = val[2:].decode('base64')
 
1233
            if what == 'UnicodeDecodeError':
 
1234
                raise UnicodeDecodeError(encoding, val, start, end, reason)
 
1235
            elif what == 'UnicodeEncodeError':
 
1236
                raise UnicodeEncodeError(encoding, val, start, end, reason)
 
1237
        elif what == "ReadOnlyError":
 
1238
            raise errors.TransportNotPossible('readonly transport')
 
1239
        else:
 
1240
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
1241
 
 
1242
    def disconnect(self):
 
1243
        self._medium.disconnect()
 
1244
 
 
1245
    def delete_tree(self, relpath):
 
1246
        raise errors.TransportNotPossible('readonly transport')
 
1247
 
 
1248
    def stat(self, relpath):
 
1249
        resp = self._call2('stat', self._remote_path(relpath))
 
1250
        if resp[0] == 'stat':
 
1251
            return SmartStat(int(resp[1]), int(resp[2], 8))
 
1252
        else:
 
1253
            self._translate_error(resp)
 
1254
 
 
1255
    ## def lock_read(self, relpath):
 
1256
    ##     """Lock the given file for shared (read) access.
 
1257
    ##     :return: A lock object, which should be passed to Transport.unlock()
 
1258
    ##     """
 
1259
    ##     # The old RemoteBranch ignore lock for reading, so we will
 
1260
    ##     # continue that tradition and return a bogus lock object.
 
1261
    ##     class BogusLock(object):
 
1262
    ##         def __init__(self, path):
 
1263
    ##             self.path = path
 
1264
    ##         def unlock(self):
 
1265
    ##             pass
 
1266
    ##     return BogusLock(relpath)
 
1267
 
 
1268
    def listable(self):
 
1269
        return True
 
1270
 
 
1271
    def list_dir(self, relpath):
 
1272
        resp = self._call2('list_dir', self._remote_path(relpath))
 
1273
        if resp[0] == 'names':
 
1274
            return [name.encode('ascii') for name in resp[1:]]
 
1275
        else:
 
1276
            self._translate_error(resp)
 
1277
 
 
1278
    def iter_files_recursive(self):
 
1279
        resp = self._call2('iter_files_recursive', self._remote_path(''))
 
1280
        if resp[0] == 'names':
 
1281
            return resp[1:]
 
1282
        else:
 
1283
            self._translate_error(resp)
 
1284
 
 
1285
 
 
1286
class SmartClientMediumRequest(object):
 
1287
    """A request on a SmartClientMedium.
 
1288
 
 
1289
    Each request allows bytes to be provided to it via accept_bytes, and then
 
1290
    the response bytes to be read via read_bytes.
 
1291
 
 
1292
    For instance:
 
1293
    request.accept_bytes('123')
 
1294
    request.finished_writing()
 
1295
    result = request.read_bytes(3)
 
1296
    request.finished_reading()
 
1297
 
 
1298
    It is up to the individual SmartClientMedium whether multiple concurrent
 
1299
    requests can exist. See SmartClientMedium.get_request to obtain instances 
 
1300
    of SmartClientMediumRequest, and the concrete Medium you are using for 
 
1301
    details on concurrency and pipelining.
 
1302
    """
 
1303
 
 
1304
    def __init__(self, medium):
 
1305
        """Construct a SmartClientMediumRequest for the medium medium."""
 
1306
        self._medium = medium
 
1307
        # we track state by constants - we may want to use the same
 
1308
        # pattern as BodyReader if it gets more complex.
 
1309
        # valid states are: "writing", "reading", "done"
 
1310
        self._state = "writing"
 
1311
 
 
1312
    def accept_bytes(self, bytes):
 
1313
        """Accept bytes for inclusion in this request.
 
1314
 
 
1315
        This method may not be be called after finished_writing() has been
 
1316
        called.  It depends upon the Medium whether or not the bytes will be
 
1317
        immediately transmitted. Message based Mediums will tend to buffer the
 
1318
        bytes until finished_writing() is called.
 
1319
 
 
1320
        :param bytes: A bytestring.
 
1321
        """
 
1322
        if self._state != "writing":
 
1323
            raise errors.WritingCompleted(self)
 
1324
        self._accept_bytes(bytes)
 
1325
 
 
1326
    def _accept_bytes(self, bytes):
 
1327
        """Helper for accept_bytes.
 
1328
 
 
1329
        Accept_bytes checks the state of the request to determing if bytes
 
1330
        should be accepted. After that it hands off to _accept_bytes to do the
 
1331
        actual acceptance.
 
1332
        """
 
1333
        raise NotImplementedError(self._accept_bytes)
 
1334
 
 
1335
    def finished_reading(self):
 
1336
        """Inform the request that all desired data has been read.
 
1337
 
 
1338
        This will remove the request from the pipeline for its medium (if the
 
1339
        medium supports pipelining) and any further calls to methods on the
 
1340
        request will raise ReadingCompleted.
 
1341
        """
 
1342
        if self._state == "writing":
 
1343
            raise errors.WritingNotComplete(self)
 
1344
        if self._state != "reading":
 
1345
            raise errors.ReadingCompleted(self)
 
1346
        self._state = "done"
 
1347
        self._finished_reading()
 
1348
 
 
1349
    def _finished_reading(self):
 
1350
        """Helper for finished_reading.
 
1351
 
 
1352
        finished_reading checks the state of the request to determine if 
 
1353
        finished_reading is allowed, and if it is hands off to _finished_reading
 
1354
        to perform the action.
 
1355
        """
 
1356
        raise NotImplementedError(self._finished_reading)
 
1357
 
 
1358
    def finished_writing(self):
 
1359
        """Finish the writing phase of this request.
 
1360
 
 
1361
        This will flush all pending data for this request along the medium.
 
1362
        After calling finished_writing, you may not call accept_bytes anymore.
 
1363
        """
 
1364
        if self._state != "writing":
 
1365
            raise errors.WritingCompleted(self)
 
1366
        self._state = "reading"
 
1367
        self._finished_writing()
 
1368
 
 
1369
    def _finished_writing(self):
 
1370
        """Helper for finished_writing.
 
1371
 
 
1372
        finished_writing checks the state of the request to determine if 
 
1373
        finished_writing is allowed, and if it is hands off to _finished_writing
 
1374
        to perform the action.
 
1375
        """
 
1376
        raise NotImplementedError(self._finished_writing)
 
1377
 
 
1378
    def read_bytes(self, count):
 
1379
        """Read bytes from this requests response.
 
1380
 
 
1381
        This method will block and wait for count bytes to be read. It may not
 
1382
        be invoked until finished_writing() has been called - this is to ensure
 
1383
        a message-based approach to requests, for compatability with message
 
1384
        based mediums like HTTP.
 
1385
        """
 
1386
        if self._state == "writing":
 
1387
            raise errors.WritingNotComplete(self)
 
1388
        if self._state != "reading":
 
1389
            raise errors.ReadingCompleted(self)
 
1390
        return self._read_bytes(count)
 
1391
 
 
1392
    def _read_bytes(self, count):
 
1393
        """Helper for read_bytes.
 
1394
 
 
1395
        read_bytes checks the state of the request to determing if bytes
 
1396
        should be read. After that it hands off to _read_bytes to do the
 
1397
        actual read.
 
1398
        """
 
1399
        raise NotImplementedError(self._read_bytes)
 
1400
 
 
1401
 
 
1402
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
 
1403
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
 
1404
 
 
1405
    def __init__(self, medium):
 
1406
        SmartClientMediumRequest.__init__(self, medium)
 
1407
        # check that we are safe concurrency wise. If some streams start
 
1408
        # allowing concurrent requests - i.e. via multiplexing - then this
 
1409
        # assert should be moved to SmartClientStreamMedium.get_request,
 
1410
        # and the setting/unsetting of _current_request likewise moved into
 
1411
        # that class : but its unneeded overhead for now. RBC 20060922
 
1412
        if self._medium._current_request is not None:
 
1413
            raise errors.TooManyConcurrentRequests(self._medium)
 
1414
        self._medium._current_request = self
 
1415
 
 
1416
    def _accept_bytes(self, bytes):
 
1417
        """See SmartClientMediumRequest._accept_bytes.
 
1418
        
 
1419
        This forwards to self._medium._accept_bytes because we are operating
 
1420
        on the mediums stream.
 
1421
        """
 
1422
        self._medium._accept_bytes(bytes)
 
1423
 
 
1424
    def _finished_reading(self):
 
1425
        """See SmartClientMediumRequest._finished_reading.
 
1426
 
 
1427
        This clears the _current_request on self._medium to allow a new 
 
1428
        request to be created.
 
1429
        """
 
1430
        assert self._medium._current_request is self
 
1431
        self._medium._current_request = None
 
1432
        
 
1433
    def _finished_writing(self):
 
1434
        """See SmartClientMediumRequest._finished_writing.
 
1435
 
 
1436
        This invokes self._medium._flush to ensure all bytes are transmitted.
 
1437
        """
 
1438
        self._medium._flush()
 
1439
 
 
1440
    def _read_bytes(self, count):
 
1441
        """See SmartClientMediumRequest._read_bytes.
 
1442
        
 
1443
        This forwards to self._medium._read_bytes because we are operating
 
1444
        on the mediums stream.
 
1445
        """
 
1446
        return self._medium._read_bytes(count)
 
1447
 
 
1448
 
 
1449
class SmartClientRequestProtocolOne(SmartProtocolBase):
 
1450
    """The client-side protocol for smart version 1."""
 
1451
 
 
1452
    def __init__(self, request):
 
1453
        """Construct a SmartClientRequestProtocolOne.
 
1454
 
 
1455
        :param request: A SmartClientMediumRequest to serialise onto and
 
1456
            deserialise from.
 
1457
        """
 
1458
        self._request = request
 
1459
        self._body_buffer = None
 
1460
 
 
1461
    def call(self, *args):
 
1462
        bytes = _encode_tuple(args)
 
1463
        self._request.accept_bytes(bytes)
 
1464
        self._request.finished_writing()
 
1465
 
 
1466
    def call_with_body_bytes(self, args, body):
 
1467
        """Make a remote call of args with body bytes 'body'.
 
1468
 
 
1469
        After calling this, call read_response_tuple to find the result out.
 
1470
        """
 
1471
        bytes = _encode_tuple(args)
 
1472
        self._request.accept_bytes(bytes)
 
1473
        bytes = self._encode_bulk_data(body)
 
1474
        self._request.accept_bytes(bytes)
 
1475
        self._request.finished_writing()
 
1476
 
 
1477
    def call_with_body_readv_array(self, args, body):
 
1478
        """Make a remote call with a readv array.
 
1479
 
 
1480
        The body is encoded with one line per readv offset pair. The numbers in
 
1481
        each pair are separated by a comma, and no trailing \n is emitted.
 
1482
        """
 
1483
        bytes = _encode_tuple(args)
 
1484
        self._request.accept_bytes(bytes)
 
1485
        readv_bytes = self._serialise_offsets(body)
 
1486
        bytes = self._encode_bulk_data(readv_bytes)
 
1487
        self._request.accept_bytes(bytes)
 
1488
        self._request.finished_writing()
 
1489
 
 
1490
    def cancel_read_body(self):
 
1491
        """After expecting a body, a response code may indicate one otherwise.
 
1492
 
 
1493
        This method lets the domain client inform the protocol that no body
 
1494
        will be transmitted. This is a terminal method: after calling it the
 
1495
        protocol is not able to be used further.
 
1496
        """
 
1497
        self._request.finished_reading()
 
1498
 
 
1499
    def read_response_tuple(self, expect_body=False):
 
1500
        """Read a response tuple from the wire.
 
1501
 
 
1502
        This should only be called once.
 
1503
        """
 
1504
        result = self._recv_tuple()
 
1505
        if not expect_body:
 
1506
            self._request.finished_reading()
 
1507
        return result
 
1508
 
 
1509
    def read_body_bytes(self, count=-1):
 
1510
        """Read bytes from the body, decoding into a byte stream.
 
1511
        
 
1512
        We read all bytes at once to ensure we've checked the trailer for 
 
1513
        errors, and then feed the buffer back as read_body_bytes is called.
 
1514
        """
 
1515
        if self._body_buffer is not None:
 
1516
            return self._body_buffer.read(count)
 
1517
        _body_decoder = LengthPrefixedBodyDecoder()
 
1518
 
 
1519
        while not _body_decoder.finished_reading:
 
1520
            bytes_wanted = _body_decoder.next_read_size()
 
1521
            bytes = self._request.read_bytes(bytes_wanted)
 
1522
            _body_decoder.accept_bytes(bytes)
 
1523
        self._request.finished_reading()
 
1524
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
 
1525
        # XXX: TODO check the trailer result.
 
1526
        return self._body_buffer.read(count)
 
1527
 
 
1528
    def _recv_tuple(self):
 
1529
        """Receive a tuple from the medium request."""
 
1530
        line = ''
 
1531
        while not line or line[-1] != '\n':
 
1532
            # TODO: this is inefficient - but tuples are short.
 
1533
            new_char = self._request.read_bytes(1)
 
1534
            line += new_char
 
1535
            assert new_char != '', "end of file reading from server."
 
1536
        return _decode_tuple(line)
 
1537
 
 
1538
    def query_version(self):
 
1539
        """Return protocol version number of the server."""
 
1540
        self.call('hello')
 
1541
        resp = self.read_response_tuple()
 
1542
        if resp == ('ok', '1'):
 
1543
            return 1
 
1544
        else:
 
1545
            raise errors.SmartProtocolError("bad response %r" % (resp,))
 
1546
 
 
1547
 
 
1548
class SmartClientMedium(object):
 
1549
    """Smart client is a medium for sending smart protocol requests over."""
 
1550
 
 
1551
    def disconnect(self):
 
1552
        """If this medium maintains a persistent connection, close it.
 
1553
        
 
1554
        The default implementation does nothing.
 
1555
        """
 
1556
        
 
1557
 
 
1558
class SmartClientStreamMedium(SmartClientMedium):
 
1559
    """Stream based medium common class.
 
1560
 
 
1561
    SmartClientStreamMediums operate on a stream. All subclasses use a common
 
1562
    SmartClientStreamMediumRequest for their requests, and should implement
 
1563
    _accept_bytes and _read_bytes to allow the request objects to send and
 
1564
    receive bytes.
 
1565
    """
 
1566
 
 
1567
    def __init__(self):
 
1568
        self._current_request = None
 
1569
 
 
1570
    def accept_bytes(self, bytes):
 
1571
        self._accept_bytes(bytes)
 
1572
 
 
1573
    def __del__(self):
 
1574
        """The SmartClientStreamMedium knows how to close the stream when it is
 
1575
        finished with it.
 
1576
        """
 
1577
        self.disconnect()
 
1578
 
 
1579
    def _flush(self):
 
1580
        """Flush the output stream.
 
1581
        
 
1582
        This method is used by the SmartClientStreamMediumRequest to ensure that
 
1583
        all data for a request is sent, to avoid long timeouts or deadlocks.
 
1584
        """
 
1585
        raise NotImplementedError(self._flush)
 
1586
 
 
1587
    def get_request(self):
 
1588
        """See SmartClientMedium.get_request().
 
1589
 
 
1590
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
 
1591
        for get_request.
 
1592
        """
 
1593
        return SmartClientStreamMediumRequest(self)
 
1594
 
 
1595
    def read_bytes(self, count):
 
1596
        return self._read_bytes(count)
 
1597
 
 
1598
 
 
1599
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
 
1600
    """A client medium using simple pipes.
 
1601
    
 
1602
    This client does not manage the pipes: it assumes they will always be open.
 
1603
    """
 
1604
 
 
1605
    def __init__(self, readable_pipe, writeable_pipe):
 
1606
        SmartClientStreamMedium.__init__(self)
 
1607
        self._readable_pipe = readable_pipe
 
1608
        self._writeable_pipe = writeable_pipe
 
1609
 
 
1610
    def _accept_bytes(self, bytes):
 
1611
        """See SmartClientStreamMedium.accept_bytes."""
 
1612
        self._writeable_pipe.write(bytes)
 
1613
 
 
1614
    def _flush(self):
 
1615
        """See SmartClientStreamMedium._flush()."""
 
1616
        self._writeable_pipe.flush()
 
1617
 
 
1618
    def _read_bytes(self, count):
 
1619
        """See SmartClientStreamMedium._read_bytes."""
 
1620
        return self._readable_pipe.read(count)
 
1621
 
 
1622
 
 
1623
class SmartSSHClientMedium(SmartClientStreamMedium):
 
1624
    """A client medium using SSH."""
 
1625
    
 
1626
    def __init__(self, host, port=None, username=None, password=None,
 
1627
            vendor=None):
 
1628
        """Creates a client that will connect on the first use.
 
1629
        
 
1630
        :param vendor: An optional override for the ssh vendor to use. See
 
1631
            bzrlib.transport.ssh for details on ssh vendors.
 
1632
        """
 
1633
        SmartClientStreamMedium.__init__(self)
 
1634
        self._connected = False
 
1635
        self._host = host
 
1636
        self._password = password
 
1637
        self._port = port
 
1638
        self._username = username
 
1639
        self._read_from = None
 
1640
        self._ssh_connection = None
 
1641
        self._vendor = vendor
 
1642
        self._write_to = None
 
1643
 
 
1644
    def _accept_bytes(self, bytes):
 
1645
        """See SmartClientStreamMedium.accept_bytes."""
 
1646
        self._ensure_connection()
 
1647
        self._write_to.write(bytes)
 
1648
 
 
1649
    def disconnect(self):
 
1650
        """See SmartClientMedium.disconnect()."""
 
1651
        if not self._connected:
 
1652
            return
 
1653
        self._read_from.close()
 
1654
        self._write_to.close()
 
1655
        self._ssh_connection.close()
 
1656
        self._connected = False
 
1657
 
 
1658
    def _ensure_connection(self):
 
1659
        """Connect this medium if not already connected."""
 
1660
        if self._connected:
 
1661
            return
 
1662
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
 
1663
        if self._vendor is None:
 
1664
            vendor = ssh._get_ssh_vendor()
 
1665
        else:
 
1666
            vendor = self._vendor
 
1667
        self._ssh_connection = vendor.connect_ssh(self._username,
 
1668
                self._password, self._host, self._port,
 
1669
                command=[executable, 'serve', '--inet', '--directory=/',
 
1670
                         '--allow-writes'])
 
1671
        self._read_from, self._write_to = \
 
1672
            self._ssh_connection.get_filelike_channels()
 
1673
        self._connected = True
 
1674
 
 
1675
    def _flush(self):
 
1676
        """See SmartClientStreamMedium._flush()."""
 
1677
        self._write_to.flush()
 
1678
 
 
1679
    def _read_bytes(self, count):
 
1680
        """See SmartClientStreamMedium.read_bytes."""
 
1681
        if not self._connected:
 
1682
            raise errors.MediumNotConnected(self)
 
1683
        return self._read_from.read(count)
 
1684
 
 
1685
 
 
1686
class SmartTCPClientMedium(SmartClientStreamMedium):
 
1687
    """A client medium using TCP."""
 
1688
    
 
1689
    def __init__(self, host, port):
 
1690
        """Creates a client that will connect on the first use."""
 
1691
        SmartClientStreamMedium.__init__(self)
 
1692
        self._connected = False
 
1693
        self._host = host
 
1694
        self._port = port
 
1695
        self._socket = None
 
1696
 
 
1697
    def _accept_bytes(self, bytes):
 
1698
        """See SmartClientMedium.accept_bytes."""
 
1699
        self._ensure_connection()
 
1700
        self._socket.sendall(bytes)
 
1701
 
 
1702
    def disconnect(self):
 
1703
        """See SmartClientMedium.disconnect()."""
 
1704
        if not self._connected:
 
1705
            return
 
1706
        self._socket.close()
 
1707
        self._socket = None
 
1708
        self._connected = False
 
1709
 
 
1710
    def _ensure_connection(self):
 
1711
        """Connect this medium if not already connected."""
 
1712
        if self._connected:
 
1713
            return
 
1714
        self._socket = socket.socket()
 
1715
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
 
1716
        result = self._socket.connect_ex((self._host, int(self._port)))
 
1717
        if result:
 
1718
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
 
1719
                    (self._host, self._port, os.strerror(result)))
 
1720
        self._connected = True
 
1721
 
 
1722
    def _flush(self):
 
1723
        """See SmartClientStreamMedium._flush().
 
1724
        
 
1725
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
 
1726
        add a means to do a flush, but that can be done in the future.
 
1727
        """
 
1728
 
 
1729
    def _read_bytes(self, count):
 
1730
        """See SmartClientMedium.read_bytes."""
 
1731
        if not self._connected:
 
1732
            raise errors.MediumNotConnected(self)
 
1733
        return self._socket.recv(count)
 
1734
 
 
1735
 
 
1736
class SmartTCPTransport(SmartTransport):
 
1737
    """Connection to smart server over plain tcp.
 
1738
    
 
1739
    This is essentially just a factory to get 'RemoteTransport(url,
 
1740
        SmartTCPClientMedium).
 
1741
    """
 
1742
 
 
1743
    def __init__(self, url):
 
1744
        _scheme, _username, _password, _host, _port, _path = \
 
1745
            transport.split_url(url)
 
1746
        if _port is None:
 
1747
            _port = BZR_DEFAULT_PORT
 
1748
        else:
 
1749
            try:
 
1750
                _port = int(_port)
 
1751
            except (ValueError, TypeError), e:
 
1752
                raise errors.InvalidURL(
 
1753
                    path=url, extra="invalid port %s" % _port)
 
1754
        medium = SmartTCPClientMedium(_host, _port)
 
1755
        super(SmartTCPTransport, self).__init__(url, medium=medium)
 
1756
 
 
1757
 
 
1758
class SmartSSHTransport(SmartTransport):
 
1759
    """Connection to smart server over SSH.
 
1760
 
 
1761
    This is essentially just a factory to get 'RemoteTransport(url,
 
1762
        SmartSSHClientMedium).
 
1763
    """
 
1764
 
 
1765
    def __init__(self, url):
 
1766
        _scheme, _username, _password, _host, _port, _path = \
 
1767
            transport.split_url(url)
 
1768
        try:
 
1769
            if _port is not None:
 
1770
                _port = int(_port)
 
1771
        except (ValueError, TypeError), e:
 
1772
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
 
1773
                _port)
 
1774
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
 
1775
        super(SmartSSHTransport, self).__init__(url, medium=medium)
 
1776
 
 
1777
 
 
1778
class SmartHTTPTransport(SmartTransport):
 
1779
    """Just a way to connect between a bzr+http:// url and http://.
 
1780
    
 
1781
    This connection operates slightly differently than the SmartSSHTransport.
 
1782
    It uses a plain http:// transport underneath, which defines what remote
 
1783
    .bzr/smart URL we are connected to. From there, all paths that are sent are
 
1784
    sent as relative paths, this way, the remote side can properly
 
1785
    de-reference them, since it is likely doing rewrite rules to translate an
 
1786
    HTTP path into a local path.
 
1787
    """
 
1788
 
 
1789
    def __init__(self, url, http_transport=None):
 
1790
        assert url.startswith('bzr+http://')
 
1791
 
 
1792
        if http_transport is None:
 
1793
            http_url = url[len('bzr+'):]
 
1794
            self._http_transport = transport.get_transport(http_url)
 
1795
        else:
 
1796
            self._http_transport = http_transport
 
1797
        http_medium = self._http_transport.get_smart_medium()
 
1798
        super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
 
1799
 
 
1800
    def _remote_path(self, relpath):
 
1801
        """After connecting HTTP Transport only deals in relative URLs."""
 
1802
        if relpath == '.':
 
1803
            return ''
 
1804
        else:
 
1805
            return relpath
 
1806
 
 
1807
    def abspath(self, relpath):
 
1808
        """Return the full url to the given relative path.
 
1809
        
 
1810
        :param relpath: the relative path or path components
 
1811
        :type relpath: str or list
 
1812
        """
 
1813
        return self._unparse_url(self._combine_paths(self._path, relpath))
 
1814
 
 
1815
    def clone(self, relative_url):
 
1816
        """Make a new SmartHTTPTransport related to me.
 
1817
 
 
1818
        This is re-implemented rather than using the default
 
1819
        SmartTransport.clone() because we must be careful about the underlying
 
1820
        http transport.
 
1821
        """
 
1822
        if relative_url:
 
1823
            abs_url = self.abspath(relative_url)
 
1824
        else:
 
1825
            abs_url = self.base
 
1826
        # By cloning the underlying http_transport, we are able to share the
 
1827
        # connection.
 
1828
        new_transport = self._http_transport.clone(relative_url)
 
1829
        return SmartHTTPTransport(abs_url, http_transport=new_transport)
 
1830
 
 
1831
 
 
1832
def get_test_permutations():
 
1833
    """Return (transport, server) permutations for testing."""
 
1834
    ### We may need a little more test framework support to construct an
 
1835
    ### appropriate RemoteTransport in the future.
 
1836
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]