~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/transport/remote.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-05-04 12:10:51 UTC
  • mfrom: (5819.1.4 777007-developer-doc)
  • Revision ID: pqm@pqm.ubuntu.com-20110504121051-aovlsmqiivjmc4fc
(jelmer) Small fixes to developer documentation. (Jonathan Riddell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Smart-server protocol, client and server.
18
 
 
19
 
Requests are sent as a command and list of arguments, followed by optional
20
 
bulk body data.  Responses are similarly a response and list of arguments,
21
 
followed by bulk body data. ::
22
 
 
23
 
  SEP := '\001'
24
 
    Fields are separated by Ctrl-A.
25
 
  BULK_DATA := CHUNK TRAILER
26
 
    Chunks can be repeated as many times as necessary.
27
 
  CHUNK := CHUNK_LEN CHUNK_BODY
28
 
  CHUNK_LEN := DIGIT+ NEWLINE
29
 
    Gives the number of bytes in the following chunk.
30
 
  CHUNK_BODY := BYTE[chunk_len]
31
 
  TRAILER := SUCCESS_TRAILER | ERROR_TRAILER
32
 
  SUCCESS_TRAILER := 'done' NEWLINE
33
 
  ERROR_TRAILER := 
34
 
 
35
 
Paths are passed across the network.  The client needs to see a namespace that
36
 
includes any repository that might need to be referenced, and the client needs
37
 
to know about a root directory beyond which it cannot ascend.
38
 
 
39
 
Servers run over ssh will typically want to be able to access any path the user 
40
 
can access.  Public servers on the other hand (which might be over http, ssh
41
 
or tcp) will typically want to restrict access to only a particular directory 
42
 
and its children, so will want to do a software virtual root at that level.
43
 
In other words they'll want to rewrite incoming paths to be under that level
44
 
(and prevent escaping using ../ tricks.)
45
 
 
46
 
URLs that include ~ should probably be passed across to the server verbatim
47
 
and the server can expand them.  This will proably not be meaningful when 
48
 
limited to a directory?
49
 
 
50
 
At the bottom level socket, pipes, HTTP server.  For sockets, we have the idea
51
 
that you have multiple requests and get a read error because the other side did
52
 
shutdown.  For pipes we have read pipe which will have a zero read which marks
53
 
end-of-file.  For HTTP server environment there is not end-of-stream because
54
 
each request coming into the server is independent.
55
 
 
56
 
So we need a wrapper around pipes and sockets to seperate out requests from
57
 
substrate and this will give us a single model which is consist for HTTP,
58
 
sockets and pipes.
59
 
 
60
 
Server-side
61
 
-----------
62
 
 
63
 
 MEDIUM  (factory for protocol, reads bytes & pushes to protocol,
64
 
          uses protocol to detect end-of-request, sends written
65
 
          bytes to client) e.g. socket, pipe, HTTP request handler.
66
 
  ^
67
 
  | bytes.
68
 
  v
69
 
 
70
 
PROTOCOL  (serialization, deserialization)  accepts bytes for one
71
 
          request, decodes according to internal state, pushes
72
 
          structured data to handler.  accepts structured data from
73
 
          handler and encodes and writes to the medium.  factory for
74
 
          handler.
75
 
  ^
76
 
  | structured data
77
 
  v
78
 
 
79
 
HANDLER   (domain logic) accepts structured data, operates state
80
 
          machine until the request can be satisfied,
81
 
          sends structured data to the protocol.
82
 
 
83
 
 
84
 
Client-side
85
 
-----------
86
 
 
87
 
 CLIENT             domain logic, accepts domain requests, generated structured
88
 
                    data, reads structured data from responses and turns into
89
 
                    domain data.  Sends structured data to the protocol.
90
 
                    Operates state machines until the request can be delivered
91
 
                    (e.g. reading from a bundle generated in bzrlib to deliver a
92
 
                    complete request).
93
 
 
94
 
                    Possibly this should just be RemoteBzrDir, RemoteTransport,
95
 
                    ...
96
 
  ^
97
 
  | structured data
98
 
  v
99
 
 
100
 
PROTOCOL  (serialization, deserialization)  accepts structured data for one
101
 
          request, encodes and writes to the medium.  Reads bytes from the
102
 
          medium, decodes and allows the client to read structured data.
103
 
  ^
104
 
  | bytes.
105
 
  v
106
 
 
107
 
 MEDIUM  (accepts bytes from the protocol & delivers to the remote server.
108
 
          Allows the potocol to read bytes e.g. socket, pipe, HTTP request.
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""RemoteTransport client for the smart-server.
 
18
 
 
19
This module shouldn't be accessed directly.  The classes defined here should be
 
20
imported from bzrlib.smart.
109
21
"""
110
22
 
111
 
 
112
 
# TODO: _translate_error should be on the client, not the transport because
113
 
#     error coding is wire protocol specific.
114
 
 
115
 
# TODO: A plain integer from query_version is too simple; should give some
116
 
# capabilities too?
117
 
 
118
 
# TODO: Server should probably catch exceptions within itself and send them
119
 
# back across the network.  (But shouldn't catch KeyboardInterrupt etc)
120
 
# Also needs to somehow report protocol errors like bad requests.  Need to
121
 
# consider how we'll handle error reporting, e.g. if we get halfway through a
122
 
# bulk transfer and then something goes wrong.
123
 
 
124
 
# TODO: Standard marker at start of request/response lines?
125
 
 
126
 
# TODO: Make each request and response self-validatable, e.g. with checksums.
127
 
#
128
 
# TODO: get/put objects could be changed to gradually read back the data as it
129
 
# comes across the network
130
 
#
131
 
# TODO: What should the server do if it hits an error and has to terminate?
132
 
#
133
 
# TODO: is it useful to allow multiple chunks in the bulk data?
134
 
#
135
 
# TODO: If we get an exception during transmission of bulk data we can't just
136
 
# emit the exception because it won't be seen.
137
 
#   John proposes:  I think it would be worthwhile to have a header on each
138
 
#   chunk, that indicates it is another chunk. Then you can send an 'error'
139
 
#   chunk as long as you finish the previous chunk.
140
 
#
141
 
# TODO: Clone method on Transport; should work up towards parent directory;
142
 
# unclear how this should be stored or communicated to the server... maybe
143
 
# just pass it on all relevant requests?
144
 
#
145
 
# TODO: Better name than clone() for changing between directories.  How about
146
 
# open_dir or change_dir or chdir?
147
 
#
148
 
# TODO: Is it really good to have the notion of current directory within the
149
 
# connection?  Perhaps all Transports should factor out a common connection
150
 
# from the thing that has the directory context?
151
 
#
152
 
# TODO: Pull more things common to sftp and ssh to a higher level.
153
 
#
154
 
# TODO: The server that manages a connection should be quite small and retain
155
 
# minimum state because each of the requests are supposed to be stateless.
156
 
# Then we can write another implementation that maps to http.
157
 
#
158
 
# TODO: What to do when a client connection is garbage collected?  Maybe just
159
 
# abruptly drop the connection?
160
 
#
161
 
# TODO: Server in some cases will need to restrict access to files outside of
162
 
# a particular root directory.  LocalTransport doesn't do anything to stop you
163
 
# ascending above the base directory, so we need to prevent paths
164
 
# containing '..' in either the server or transport layers.  (Also need to
165
 
# consider what happens if someone creates a symlink pointing outside the 
166
 
# directory tree...)
167
 
#
168
 
# TODO: Server should rebase absolute paths coming across the network to put
169
 
# them under the virtual root, if one is in use.  LocalTransport currently
170
 
# doesn't do that; if you give it an absolute path it just uses it.
171
 
172
 
# XXX: Arguments can't contain newlines or ascii; possibly we should e.g.
173
 
# urlescape them instead.  Indeed possibly this should just literally be
174
 
# http-over-ssh.
175
 
#
176
 
# FIXME: This transport, with several others, has imperfect handling of paths
177
 
# within urls.  It'd probably be better for ".." from a root to raise an error
178
 
# rather than return the same directory as we do at present.
179
 
#
180
 
# TODO: Rather than working at the Transport layer we want a Branch,
181
 
# Repository or BzrDir objects that talk to a server.
182
 
#
183
 
# TODO: Probably want some way for server commands to gradually produce body
184
 
# data rather than passing it as a string; they could perhaps pass an
185
 
# iterator-like callback that will gradually yield data; it probably needs a
186
 
# close() method that will always be closed to do any necessary cleanup.
187
 
#
188
 
# TODO: Split the actual smart server from the ssh encoding of it.
189
 
#
190
 
# TODO: Perhaps support file-level readwrite operations over the transport
191
 
# too.
192
 
#
193
 
# TODO: SmartBzrDir class, proxying all Branch etc methods across to another
194
 
# branch doing file-level operations.
195
 
#
 
23
__all__ = ['RemoteTransport', 'RemoteTCPTransport', 'RemoteSSHTransport']
196
24
 
197
25
from cStringIO import StringIO
198
 
import os
199
 
import socket
200
 
import tempfile
201
 
import threading
202
 
import urllib
203
 
import urlparse
204
26
 
205
27
from bzrlib import (
206
 
    bzrdir,
 
28
    config,
 
29
    debug,
207
30
    errors,
208
 
    revision,
 
31
    remote,
 
32
    trace,
209
33
    transport,
210
 
    trace,
211
34
    urlutils,
212
35
    )
213
 
from bzrlib.bundle.serializer import write_bundle
214
 
try:
215
 
    from bzrlib.transport import ssh
216
 
except errors.ParamikoNotPresent:
217
 
    # no paramiko.  SmartSSHClientMedium will break.
218
 
    pass
219
 
 
220
 
# must do this otherwise urllib can't parse the urls properly :(
221
 
for scheme in ['ssh', 'bzr', 'bzr+loopback', 'bzr+ssh']:
222
 
    transport.register_urlparse_netloc_protocol(scheme)
223
 
del scheme
224
 
 
225
 
 
226
 
def _recv_tuple(from_file):
227
 
    req_line = from_file.readline()
228
 
    return _decode_tuple(req_line)
229
 
 
230
 
 
231
 
def _decode_tuple(req_line):
232
 
    if req_line == None or req_line == '':
233
 
        return None
234
 
    if req_line[-1] != '\n':
235
 
        raise errors.SmartProtocolError("request %r not terminated" % req_line)
236
 
    return tuple(req_line[:-1].split('\x01'))
237
 
 
238
 
 
239
 
def _encode_tuple(args):
240
 
    """Encode the tuple args to a bytestream."""
241
 
    return '\x01'.join(args) + '\n'
242
 
 
243
 
 
244
 
class SmartProtocolBase(object):
245
 
    """Methods common to client and server"""
246
 
 
247
 
    # TODO: this only actually accomodates a single block; possibly should
248
 
    # support multiple chunks?
249
 
    def _encode_bulk_data(self, body):
250
 
        """Encode body as a bulk data chunk."""
251
 
        return ''.join(('%d\n' % len(body), body, 'done\n'))
252
 
 
253
 
    def _serialise_offsets(self, offsets):
254
 
        """Serialise a readv offset list."""
255
 
        txt = []
256
 
        for start, length in offsets:
257
 
            txt.append('%d,%d' % (start, length))
258
 
        return '\n'.join(txt)
259
 
        
260
 
 
261
 
class SmartServerRequestProtocolOne(SmartProtocolBase):
262
 
    """Server-side encoding and decoding logic for smart version 1."""
263
 
    
264
 
    def __init__(self, backing_transport, write_func):
265
 
        self._backing_transport = backing_transport
266
 
        self.excess_buffer = ''
267
 
        self._finished = False
268
 
        self.in_buffer = ''
269
 
        self.has_dispatched = False
270
 
        self.request = None
271
 
        self._body_decoder = None
272
 
        self._write_func = write_func
273
 
 
274
 
    def accept_bytes(self, bytes):
275
 
        """Take bytes, and advance the internal state machine appropriately.
276
 
        
277
 
        :param bytes: must be a byte string
278
 
        """
279
 
        assert isinstance(bytes, str)
280
 
        self.in_buffer += bytes
281
 
        if not self.has_dispatched:
282
 
            if '\n' not in self.in_buffer:
283
 
                # no command line yet
284
 
                return
285
 
            self.has_dispatched = True
286
 
            try:
287
 
                first_line, self.in_buffer = self.in_buffer.split('\n', 1)
288
 
                first_line += '\n'
289
 
                req_args = _decode_tuple(first_line)
290
 
                self.request = SmartServerRequestHandler(
291
 
                    self._backing_transport)
292
 
                self.request.dispatch_command(req_args[0], req_args[1:])
293
 
                if self.request.finished_reading:
294
 
                    # trivial request
295
 
                    self.excess_buffer = self.in_buffer
296
 
                    self.in_buffer = ''
297
 
                    self._send_response(self.request.response.args,
298
 
                        self.request.response.body)
299
 
            except KeyboardInterrupt:
300
 
                raise
301
 
            except Exception, exception:
302
 
                # everything else: pass to client, flush, and quit
303
 
                self._send_response(('error', str(exception)))
304
 
                return
305
 
 
306
 
        if self.has_dispatched:
307
 
            if self._finished:
308
 
                # nothing to do.XXX: this routine should be a single state 
309
 
                # machine too.
310
 
                self.excess_buffer += self.in_buffer
311
 
                self.in_buffer = ''
312
 
                return
313
 
            if self._body_decoder is None:
314
 
                self._body_decoder = LengthPrefixedBodyDecoder()
315
 
            self._body_decoder.accept_bytes(self.in_buffer)
316
 
            self.in_buffer = self._body_decoder.unused_data
317
 
            body_data = self._body_decoder.read_pending_data()
318
 
            self.request.accept_body(body_data)
319
 
            if self._body_decoder.finished_reading:
320
 
                self.request.end_of_body()
321
 
                assert self.request.finished_reading, \
322
 
                    "no more body, request not finished"
323
 
            if self.request.response is not None:
324
 
                self._send_response(self.request.response.args,
325
 
                    self.request.response.body)
326
 
                self.excess_buffer = self.in_buffer
327
 
                self.in_buffer = ''
328
 
            else:
329
 
                assert not self.request.finished_reading, \
330
 
                    "no response and we have finished reading."
331
 
 
332
 
    def _send_response(self, args, body=None):
333
 
        """Send a smart server response down the output stream."""
334
 
        assert not self._finished, 'response already sent'
335
 
        self._finished = True
336
 
        self._write_func(_encode_tuple(args))
337
 
        if body is not None:
338
 
            assert isinstance(body, str), 'body must be a str'
339
 
            bytes = self._encode_bulk_data(body)
340
 
            self._write_func(bytes)
341
 
 
342
 
    def next_read_size(self):
343
 
        if self._finished:
344
 
            return 0
345
 
        if self._body_decoder is None:
346
 
            return 1
347
 
        else:
348
 
            return self._body_decoder.next_read_size()
349
 
 
350
 
 
351
 
class LengthPrefixedBodyDecoder(object):
352
 
    """Decodes the length-prefixed bulk data."""
353
 
    
354
 
    def __init__(self):
355
 
        self.bytes_left = None
356
 
        self.finished_reading = False
357
 
        self.unused_data = ''
358
 
        self.state_accept = self._state_accept_expecting_length
359
 
        self.state_read = self._state_read_no_data
360
 
        self._in_buffer = ''
361
 
        self._trailer_buffer = ''
362
 
    
363
 
    def accept_bytes(self, bytes):
364
 
        """Decode as much of bytes as possible.
365
 
 
366
 
        If 'bytes' contains too much data it will be appended to
367
 
        self.unused_data.
368
 
 
369
 
        finished_reading will be set when no more data is required.  Further
370
 
        data will be appended to self.unused_data.
371
 
        """
372
 
        # accept_bytes is allowed to change the state
373
 
        current_state = self.state_accept
374
 
        self.state_accept(bytes)
375
 
        while current_state != self.state_accept:
376
 
            current_state = self.state_accept
377
 
            self.state_accept('')
378
 
 
379
 
    def next_read_size(self):
380
 
        if self.bytes_left is not None:
381
 
            # Ideally we want to read all the remainder of the body and the
382
 
            # trailer in one go.
383
 
            return self.bytes_left + 5
384
 
        elif self.state_accept == self._state_accept_reading_trailer:
385
 
            # Just the trailer left
386
 
            return 5 - len(self._trailer_buffer)
387
 
        elif self.state_accept == self._state_accept_expecting_length:
388
 
            # There's still at least 6 bytes left ('\n' to end the length, plus
389
 
            # 'done\n').
390
 
            return 6
391
 
        else:
392
 
            # Reading excess data.  Either way, 1 byte at a time is fine.
393
 
            return 1
394
 
        
395
 
    def read_pending_data(self):
396
 
        """Return any pending data that has been decoded."""
397
 
        return self.state_read()
398
 
 
399
 
    def _state_accept_expecting_length(self, bytes):
400
 
        self._in_buffer += bytes
401
 
        pos = self._in_buffer.find('\n')
402
 
        if pos == -1:
403
 
            return
404
 
        self.bytes_left = int(self._in_buffer[:pos])
405
 
        self._in_buffer = self._in_buffer[pos+1:]
406
 
        self.bytes_left -= len(self._in_buffer)
407
 
        self.state_accept = self._state_accept_reading_body
408
 
        self.state_read = self._state_read_in_buffer
409
 
 
410
 
    def _state_accept_reading_body(self, bytes):
411
 
        self._in_buffer += bytes
412
 
        self.bytes_left -= len(bytes)
413
 
        if self.bytes_left <= 0:
414
 
            # Finished with body
415
 
            if self.bytes_left != 0:
416
 
                self._trailer_buffer = self._in_buffer[self.bytes_left:]
417
 
                self._in_buffer = self._in_buffer[:self.bytes_left]
418
 
            self.bytes_left = None
419
 
            self.state_accept = self._state_accept_reading_trailer
420
 
        
421
 
    def _state_accept_reading_trailer(self, bytes):
422
 
        self._trailer_buffer += bytes
423
 
        # TODO: what if the trailer does not match "done\n"?  Should this raise
424
 
        # a ProtocolViolation exception?
425
 
        if self._trailer_buffer.startswith('done\n'):
426
 
            self.unused_data = self._trailer_buffer[len('done\n'):]
427
 
            self.state_accept = self._state_accept_reading_unused
428
 
            self.finished_reading = True
429
 
    
430
 
    def _state_accept_reading_unused(self, bytes):
431
 
        self.unused_data += bytes
432
 
 
433
 
    def _state_read_no_data(self):
434
 
        return ''
435
 
 
436
 
    def _state_read_in_buffer(self):
437
 
        result = self._in_buffer
438
 
        self._in_buffer = ''
439
 
        return result
440
 
 
441
 
 
442
 
class SmartServerStreamMedium(object):
443
 
    """Handles smart commands coming over a stream.
444
 
 
445
 
    The stream may be a pipe connected to sshd, or a tcp socket, or an
446
 
    in-process fifo for testing.
447
 
 
448
 
    One instance is created for each connected client; it can serve multiple
449
 
    requests in the lifetime of the connection.
450
 
 
451
 
    The server passes requests through to an underlying backing transport, 
452
 
    which will typically be a LocalTransport looking at the server's filesystem.
453
 
    """
454
 
 
455
 
    def __init__(self, backing_transport):
456
 
        """Construct new server.
457
 
 
458
 
        :param backing_transport: Transport for the directory served.
459
 
        """
460
 
        # backing_transport could be passed to serve instead of __init__
461
 
        self.backing_transport = backing_transport
462
 
        self.finished = False
463
 
 
464
 
    def serve(self):
465
 
        """Serve requests until the client disconnects."""
466
 
        # Keep a reference to stderr because the sys module's globals get set to
467
 
        # None during interpreter shutdown.
468
 
        from sys import stderr
469
 
        try:
470
 
            while not self.finished:
471
 
                protocol = SmartServerRequestProtocolOne(self.backing_transport,
472
 
                                                         self._write_out)
473
 
                self._serve_one_request(protocol)
474
 
        except Exception, e:
475
 
            stderr.write("%s terminating on exception %s\n" % (self, e))
476
 
            raise
477
 
 
478
 
    def _serve_one_request(self, protocol):
479
 
        """Read one request from input, process, send back a response.
480
 
        
481
 
        :param protocol: a SmartServerRequestProtocol.
482
 
        """
483
 
        try:
484
 
            self._serve_one_request_unguarded(protocol)
485
 
        except KeyboardInterrupt:
486
 
            raise
487
 
        except Exception, e:
488
 
            self.terminate_due_to_error()
489
 
 
490
 
    def terminate_due_to_error(self):
491
 
        """Called when an unhandled exception from the protocol occurs."""
492
 
        raise NotImplementedError(self.terminate_due_to_error)
493
 
 
494
 
 
495
 
class SmartServerSocketStreamMedium(SmartServerStreamMedium):
496
 
 
497
 
    def __init__(self, sock, backing_transport):
498
 
        """Constructor.
499
 
 
500
 
        :param sock: the socket the server will read from.  It will be put
501
 
            into blocking mode.
502
 
        """
503
 
        SmartServerStreamMedium.__init__(self, backing_transport)
504
 
        self.push_back = ''
505
 
        sock.setblocking(True)
506
 
        self.socket = sock
507
 
 
508
 
    def _serve_one_request_unguarded(self, protocol):
509
 
        while protocol.next_read_size():
510
 
            if self.push_back:
511
 
                protocol.accept_bytes(self.push_back)
512
 
                self.push_back = ''
513
 
            else:
514
 
                bytes = self.socket.recv(4096)
515
 
                if bytes == '':
516
 
                    self.finished = True
517
 
                    return
518
 
                protocol.accept_bytes(bytes)
519
 
        
520
 
        self.push_back = protocol.excess_buffer
521
 
    
522
 
    def terminate_due_to_error(self):
523
 
        """Called when an unhandled exception from the protocol occurs."""
524
 
        # TODO: This should log to a server log file, but no such thing
525
 
        # exists yet.  Andrew Bennetts 2006-09-29.
526
 
        self.socket.close()
527
 
        self.finished = True
528
 
 
529
 
    def _write_out(self, bytes):
530
 
        self.socket.sendall(bytes)
531
 
 
532
 
 
533
 
class SmartServerPipeStreamMedium(SmartServerStreamMedium):
534
 
 
535
 
    def __init__(self, in_file, out_file, backing_transport):
536
 
        """Construct new server.
537
 
 
538
 
        :param in_file: Python file from which requests can be read.
539
 
        :param out_file: Python file to write responses.
540
 
        :param backing_transport: Transport for the directory served.
541
 
        """
542
 
        SmartServerStreamMedium.__init__(self, backing_transport)
543
 
        self._in = in_file
544
 
        self._out = out_file
545
 
 
546
 
    def _serve_one_request_unguarded(self, protocol):
547
 
        while True:
548
 
            bytes_to_read = protocol.next_read_size()
549
 
            if bytes_to_read == 0:
550
 
                # Finished serving this request.
551
 
                self._out.flush()
552
 
                return
553
 
            bytes = self._in.read(bytes_to_read)
554
 
            if bytes == '':
555
 
                # Connection has been closed.
556
 
                self.finished = True
557
 
                self._out.flush()
558
 
                return
559
 
            protocol.accept_bytes(bytes)
560
 
 
561
 
    def terminate_due_to_error(self):
562
 
        # TODO: This should log to a server log file, but no such thing
563
 
        # exists yet.  Andrew Bennetts 2006-09-29.
564
 
        self._out.close()
565
 
        self.finished = True
566
 
 
567
 
    def _write_out(self, bytes):
568
 
        self._out.write(bytes)
569
 
 
570
 
 
571
 
class SmartServerResponse(object):
572
 
    """Response generated by SmartServerRequestHandler."""
573
 
 
574
 
    def __init__(self, args, body=None):
575
 
        self.args = args
576
 
        self.body = body
577
 
 
578
 
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
579
 
# for delivering the data for a request. This could be done with as the
580
 
# StreamServer, though that would create conflation between request and response
581
 
# which may be undesirable.
582
 
 
583
 
 
584
 
class SmartServerRequestHandler(object):
585
 
    """Protocol logic for smart server.
586
 
    
587
 
    This doesn't handle serialization at all, it just processes requests and
588
 
    creates responses.
589
 
    """
590
 
 
591
 
    # IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
592
 
    # not contain encoding or decoding logic to allow the wire protocol to vary
593
 
    # from the object protocol: we will want to tweak the wire protocol separate
594
 
    # from the object model, and ideally we will be able to do that without
595
 
    # having a SmartServerRequestHandler subclass for each wire protocol, rather
596
 
    # just a Protocol subclass.
597
 
 
598
 
    # TODO: Better way of representing the body for commands that take it,
599
 
    # and allow it to be streamed into the server.
600
 
    
601
 
    def __init__(self, backing_transport):
602
 
        self._backing_transport = backing_transport
603
 
        self._converted_command = False
604
 
        self.finished_reading = False
605
 
        self._body_bytes = ''
606
 
        self.response = None
607
 
 
608
 
    def accept_body(self, bytes):
609
 
        """Accept body data.
610
 
 
611
 
        This should be overriden for each command that desired body data to
612
 
        handle the right format of that data. I.e. plain bytes, a bundle etc.
613
 
 
614
 
        The deserialisation into that format should be done in the Protocol
615
 
        object. Set self.desired_body_format to the format your method will
616
 
        handle.
617
 
        """
618
 
        # default fallback is to accumulate bytes.
619
 
        self._body_bytes += bytes
620
 
        
621
 
    def _end_of_body_handler(self):
622
 
        """An unimplemented end of body handler."""
623
 
        raise NotImplementedError(self._end_of_body_handler)
624
 
        
625
 
    def do_hello(self):
626
 
        """Answer a version request with my version."""
627
 
        return SmartServerResponse(('ok', '1'))
628
 
 
629
 
    def do_has(self, relpath):
630
 
        r = self._backing_transport.has(relpath) and 'yes' or 'no'
631
 
        return SmartServerResponse((r,))
632
 
 
633
 
    def do_get(self, relpath):
634
 
        backing_bytes = self._backing_transport.get_bytes(relpath)
635
 
        return SmartServerResponse(('ok',), backing_bytes)
636
 
 
637
 
    def _deserialise_optional_mode(self, mode):
638
 
        # XXX: FIXME this should be on the protocol object.
639
 
        if mode == '':
640
 
            return None
641
 
        else:
642
 
            return int(mode)
643
 
 
644
 
    def do_append(self, relpath, mode):
645
 
        self._converted_command = True
646
 
        self._relpath = relpath
647
 
        self._mode = self._deserialise_optional_mode(mode)
648
 
        self._end_of_body_handler = self._handle_do_append_end
649
 
    
650
 
    def _handle_do_append_end(self):
651
 
        old_length = self._backing_transport.append_bytes(
652
 
            self._relpath, self._body_bytes, self._mode)
653
 
        self.response = SmartServerResponse(('appended', '%d' % old_length))
654
 
 
655
 
    def do_delete(self, relpath):
656
 
        self._backing_transport.delete(relpath)
657
 
 
658
 
    def do_iter_files_recursive(self, relpath):
659
 
        transport = self._backing_transport.clone(relpath)
660
 
        filenames = transport.iter_files_recursive()
661
 
        return SmartServerResponse(('names',) + tuple(filenames))
662
 
 
663
 
    def do_list_dir(self, relpath):
664
 
        filenames = self._backing_transport.list_dir(relpath)
665
 
        return SmartServerResponse(('names',) + tuple(filenames))
666
 
 
667
 
    def do_mkdir(self, relpath, mode):
668
 
        self._backing_transport.mkdir(relpath,
669
 
                                      self._deserialise_optional_mode(mode))
670
 
 
671
 
    def do_move(self, rel_from, rel_to):
672
 
        self._backing_transport.move(rel_from, rel_to)
673
 
 
674
 
    def do_put(self, relpath, mode):
675
 
        self._converted_command = True
676
 
        self._relpath = relpath
677
 
        self._mode = self._deserialise_optional_mode(mode)
678
 
        self._end_of_body_handler = self._handle_do_put
679
 
 
680
 
    def _handle_do_put(self):
681
 
        self._backing_transport.put_bytes(self._relpath,
682
 
                self._body_bytes, self._mode)
683
 
        self.response = SmartServerResponse(('ok',))
684
 
 
685
 
    def _deserialise_offsets(self, text):
686
 
        # XXX: FIXME this should be on the protocol object.
687
 
        offsets = []
688
 
        for line in text.split('\n'):
689
 
            if not line:
690
 
                continue
691
 
            start, length = line.split(',')
692
 
            offsets.append((int(start), int(length)))
693
 
        return offsets
694
 
 
695
 
    def do_put_non_atomic(self, relpath, mode, create_parent, dir_mode):
696
 
        self._converted_command = True
697
 
        self._end_of_body_handler = self._handle_put_non_atomic
698
 
        self._relpath = relpath
699
 
        self._dir_mode = self._deserialise_optional_mode(dir_mode)
700
 
        self._mode = self._deserialise_optional_mode(mode)
701
 
        # a boolean would be nicer XXX
702
 
        self._create_parent = (create_parent == 'T')
703
 
 
704
 
    def _handle_put_non_atomic(self):
705
 
        self._backing_transport.put_bytes_non_atomic(self._relpath,
706
 
                self._body_bytes,
707
 
                mode=self._mode,
708
 
                create_parent_dir=self._create_parent,
709
 
                dir_mode=self._dir_mode)
710
 
        self.response = SmartServerResponse(('ok',))
711
 
 
712
 
    def do_readv(self, relpath):
713
 
        self._converted_command = True
714
 
        self._end_of_body_handler = self._handle_readv_offsets
715
 
        self._relpath = relpath
716
 
 
717
 
    def end_of_body(self):
718
 
        """No more body data will be received."""
719
 
        self._run_handler_code(self._end_of_body_handler, (), {})
720
 
        # cannot read after this.
721
 
        self.finished_reading = True
722
 
 
723
 
    def _handle_readv_offsets(self):
724
 
        """accept offsets for a readv request."""
725
 
        offsets = self._deserialise_offsets(self._body_bytes)
726
 
        backing_bytes = ''.join(bytes for offset, bytes in
727
 
            self._backing_transport.readv(self._relpath, offsets))
728
 
        self.response = SmartServerResponse(('readv',), backing_bytes)
729
 
        
730
 
    def do_rename(self, rel_from, rel_to):
731
 
        self._backing_transport.rename(rel_from, rel_to)
732
 
 
733
 
    def do_rmdir(self, relpath):
734
 
        self._backing_transport.rmdir(relpath)
735
 
 
736
 
    def do_stat(self, relpath):
737
 
        stat = self._backing_transport.stat(relpath)
738
 
        return SmartServerResponse(('stat', str(stat.st_size), oct(stat.st_mode)))
739
 
        
740
 
    def do_get_bundle(self, path, revision_id):
741
 
        # open transport relative to our base
742
 
        t = self._backing_transport.clone(path)
743
 
        control, extra_path = bzrdir.BzrDir.open_containing_from_transport(t)
744
 
        repo = control.open_repository()
745
 
        tmpf = tempfile.TemporaryFile()
746
 
        base_revision = revision.NULL_REVISION
747
 
        write_bundle(repo, revision_id, base_revision, tmpf)
748
 
        tmpf.seek(0)
749
 
        return SmartServerResponse((), tmpf.read())
750
 
 
751
 
    def dispatch_command(self, cmd, args):
752
 
        """Deprecated compatibility method.""" # XXX XXX
753
 
        func = getattr(self, 'do_' + cmd, None)
754
 
        if func is None:
755
 
            raise errors.SmartProtocolError("bad request %r" % (cmd,))
756
 
        self._run_handler_code(func, args, {})
757
 
 
758
 
    def _run_handler_code(self, callable, args, kwargs):
759
 
        """Run some handler specific code 'callable'.
760
 
 
761
 
        If a result is returned, it is considered to be the commands response,
762
 
        and finished_reading is set true, and its assigned to self.response.
763
 
 
764
 
        Any exceptions caught are translated and a response object created
765
 
        from them.
766
 
        """
767
 
        result = self._call_converting_errors(callable, args, kwargs)
768
 
        if result is not None:
769
 
            self.response = result
770
 
            self.finished_reading = True
771
 
        # handle unconverted commands
772
 
        if not self._converted_command:
773
 
            self.finished_reading = True
774
 
            if result is None:
775
 
                self.response = SmartServerResponse(('ok',))
776
 
 
777
 
    def _call_converting_errors(self, callable, args, kwargs):
778
 
        """Call callable converting errors to Response objects."""
779
 
        try:
780
 
            return callable(*args, **kwargs)
781
 
        except errors.NoSuchFile, e:
782
 
            return SmartServerResponse(('NoSuchFile', e.path))
783
 
        except errors.FileExists, e:
784
 
            return SmartServerResponse(('FileExists', e.path))
785
 
        except errors.DirectoryNotEmpty, e:
786
 
            return SmartServerResponse(('DirectoryNotEmpty', e.path))
787
 
        except errors.ShortReadvError, e:
788
 
            return SmartServerResponse(('ShortReadvError',
789
 
                e.path, str(e.offset), str(e.length), str(e.actual)))
790
 
        except UnicodeError, e:
791
 
            # If it is a DecodeError, than most likely we are starting
792
 
            # with a plain string
793
 
            str_or_unicode = e.object
794
 
            if isinstance(str_or_unicode, unicode):
795
 
                # XXX: UTF-8 might have \x01 (our seperator byte) in it.  We
796
 
                # should escape it somehow.
797
 
                val = 'u:' + str_or_unicode.encode('utf-8')
798
 
            else:
799
 
                val = 's:' + str_or_unicode.encode('base64')
800
 
            # This handles UnicodeEncodeError or UnicodeDecodeError
801
 
            return SmartServerResponse((e.__class__.__name__,
802
 
                    e.encoding, val, str(e.start), str(e.end), e.reason))
803
 
        except errors.TransportNotPossible, e:
804
 
            if e.msg == "readonly transport":
805
 
                return SmartServerResponse(('ReadOnlyError', ))
806
 
            else:
807
 
                raise
808
 
 
809
 
 
810
 
class SmartTCPServer(object):
811
 
    """Listens on a TCP socket and accepts connections from smart clients"""
812
 
 
813
 
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
814
 
        """Construct a new server.
815
 
 
816
 
        To actually start it running, call either start_background_thread or
817
 
        serve.
818
 
 
819
 
        :param host: Name of the interface to listen on.
820
 
        :param port: TCP port to listen on, or 0 to allocate a transient port.
821
 
        """
822
 
        self._server_socket = socket.socket()
823
 
        self._server_socket.bind((host, port))
824
 
        self.port = self._server_socket.getsockname()[1]
825
 
        self._server_socket.listen(1)
826
 
        self._server_socket.settimeout(1)
827
 
        self.backing_transport = backing_transport
828
 
 
829
 
    def serve(self):
830
 
        # let connections timeout so that we get a chance to terminate
831
 
        # Keep a reference to the exceptions we want to catch because the socket
832
 
        # module's globals get set to None during interpreter shutdown.
833
 
        from socket import timeout as socket_timeout
834
 
        from socket import error as socket_error
835
 
        self._should_terminate = False
836
 
        while not self._should_terminate:
837
 
            try:
838
 
                self.accept_and_serve()
839
 
            except socket_timeout:
840
 
                # just check if we're asked to stop
841
 
                pass
842
 
            except socket_error, e:
843
 
                trace.warning("client disconnected: %s", e)
844
 
                pass
845
 
 
846
 
    def get_url(self):
847
 
        """Return the url of the server"""
848
 
        return "bzr://%s:%d/" % self._server_socket.getsockname()
849
 
 
850
 
    def accept_and_serve(self):
851
 
        conn, client_addr = self._server_socket.accept()
852
 
        # For WIN32, where the timeout value from the listening socket
853
 
        # propogates to the newly accepted socket.
854
 
        conn.setblocking(True)
855
 
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
856
 
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
857
 
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
858
 
        connection_thread.setDaemon(True)
859
 
        connection_thread.start()
860
 
 
861
 
    def start_background_thread(self):
862
 
        self._server_thread = threading.Thread(None,
863
 
                self.serve,
864
 
                name='server-' + self.get_url())
865
 
        self._server_thread.setDaemon(True)
866
 
        self._server_thread.start()
867
 
 
868
 
    def stop_background_thread(self):
869
 
        self._should_terminate = True
870
 
        # self._server_socket.close()
871
 
        # we used to join the thread, but it's not really necessary; it will
872
 
        # terminate in time
873
 
        ## self._server_thread.join()
874
 
 
875
 
 
876
 
class SmartTCPServer_for_testing(SmartTCPServer):
877
 
    """Server suitable for use by transport tests.
878
 
    
879
 
    This server is backed by the process's cwd.
880
 
    """
881
 
 
882
 
    def __init__(self):
883
 
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
884
 
        # The server is set up by default like for ssh access: the client
885
 
        # passes filesystem-absolute paths; therefore the server must look
886
 
        # them up relative to the root directory.  it might be better to act
887
 
        # a public server and have the server rewrite paths into the test
888
 
        # directory.
889
 
        SmartTCPServer.__init__(self,
890
 
            transport.get_transport(urlutils.local_path_to_url('/')))
891
 
        
892
 
    def setUp(self):
893
 
        """Set up server for testing"""
894
 
        self.start_background_thread()
895
 
 
896
 
    def tearDown(self):
897
 
        self.stop_background_thread()
898
 
 
899
 
    def get_url(self):
900
 
        """Return the url of the server"""
901
 
        host, port = self._server_socket.getsockname()
902
 
        return "bzr://%s:%d%s" % (host, port, urlutils.escape(self._homedir))
903
 
 
904
 
    def get_bogus_url(self):
905
 
        """Return a URL which will fail to connect"""
906
 
        return 'bzr://127.0.0.1:1/'
907
 
 
908
 
 
909
 
class SmartStat(object):
 
36
from bzrlib.smart import client, medium
 
37
from bzrlib.symbol_versioning import (
 
38
    deprecated_method,
 
39
    )
 
40
 
 
41
 
 
42
class _SmartStat(object):
910
43
 
911
44
    def __init__(self, size, mode):
912
45
        self.st_size = size
913
46
        self.st_mode = mode
914
47
 
915
48
 
916
 
class SmartTransport(transport.Transport):
 
49
class RemoteTransport(transport.ConnectedTransport):
917
50
    """Connection to a smart server.
918
51
 
919
 
    The connection holds references to pipes that can be used to send requests
920
 
    to the server.
 
52
    The connection holds references to the medium that can be used to send
 
53
    requests to the server.
921
54
 
922
55
    The connection has a notion of the current directory to which it's
923
56
    connected; this is incorporated in filenames passed to the server.
924
 
    
925
 
    This supports some higher-level RPC operations and can also be treated 
 
57
 
 
58
    This supports some higher-level RPC operations and can also be treated
926
59
    like a Transport to do file-like operations.
927
60
 
928
 
    The connection can be made over a tcp socket, or (in future) an ssh pipe
929
 
    or a series of http requests.  There are concrete subclasses for each
930
 
    type: SmartTCPTransport, etc.
 
61
    The connection can be made over a tcp socket, an ssh pipe or a series of
 
62
    http requests.  There are concrete subclasses for each type:
 
63
    RemoteTCPTransport, etc.
931
64
    """
932
65
 
933
 
    # IMPORTANT FOR IMPLEMENTORS: SmartTransport MUST NOT be given encoding
 
66
    # When making a readv request, cap it at requesting 5MB of data
 
67
    _max_readv_bytes = 5*1024*1024
 
68
 
 
69
    # IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
934
70
    # responsibilities: Put those on SmartClient or similar. This is vital for
935
71
    # the ability to support multiple versions of the smart protocol over time:
936
 
    # SmartTransport is an adapter from the Transport object model to the 
 
72
    # RemoteTransport is an adapter from the Transport object model to the
937
73
    # SmartClient model, not an encoder.
938
74
 
939
 
    def __init__(self, url, clone_from=None, medium=None):
 
75
    # FIXME: the medium parameter should be private, only the tests requires
 
76
    # it. It may be even clearer to define a TestRemoteTransport that handles
 
77
    # the specific cases of providing a _client and/or a _medium, and leave
 
78
    # RemoteTransport as an abstract class.
 
79
    def __init__(self, url, _from_transport=None, medium=None, _client=None):
940
80
        """Constructor.
941
81
 
942
 
        :param medium: The medium to use for this RemoteTransport. This must be
943
 
            supplied if clone_from is None.
944
 
        """
945
 
        ### Technically super() here is faulty because Transport's __init__
946
 
        ### fails to take 2 parameters, and if super were to choose a silly
947
 
        ### initialisation order things would blow up. 
948
 
        if not url.endswith('/'):
949
 
            url += '/'
950
 
        super(SmartTransport, self).__init__(url)
951
 
        self._scheme, self._username, self._password, self._host, self._port, self._path = \
952
 
                transport.split_url(url)
953
 
        if clone_from is None:
954
 
            self._medium = medium
955
 
        else:
956
 
            # credentials may be stripped from the base in some circumstances
957
 
            # as yet to be clearly defined or documented, so copy them.
958
 
            self._username = clone_from._username
959
 
            # reuse same connection
960
 
            self._medium = clone_from._medium
961
 
        assert self._medium is not None
962
 
 
963
 
    def abspath(self, relpath):
964
 
        """Return the full url to the given relative path.
965
 
        
966
 
        @param relpath: the relative path or path components
967
 
        @type relpath: str or list
968
 
        """
969
 
        return self._unparse_url(self._remote_path(relpath))
970
 
    
971
 
    def clone(self, relative_url):
972
 
        """Make a new SmartTransport related to me, sharing the same connection.
973
 
 
974
 
        This essentially opens a handle on a different remote directory.
975
 
        """
976
 
        if relative_url is None:
977
 
            return SmartTransport(self.base, self)
978
 
        else:
979
 
            return SmartTransport(self.abspath(relative_url), self)
 
82
        :param _from_transport: Another RemoteTransport instance that this
 
83
            one is being cloned from.  Attributes such as the medium will
 
84
            be reused.
 
85
 
 
86
        :param medium: The medium to use for this RemoteTransport.  If None,
 
87
            the medium from the _from_transport is shared.  If both this
 
88
            and _from_transport are None, a new medium will be built.
 
89
            _from_transport and medium cannot both be specified.
 
90
 
 
91
        :param _client: Override the _SmartClient used by this transport.  This
 
92
            should only be used for testing purposes; normally this is
 
93
            determined from the medium.
 
94
        """
 
95
        super(RemoteTransport, self).__init__(
 
96
            url, _from_transport=_from_transport)
 
97
 
 
98
        # The medium is the connection, except when we need to share it with
 
99
        # other objects (RemoteBzrDir, RemoteRepository etc). In these cases
 
100
        # what we want to share is really the shared connection.
 
101
 
 
102
        if (_from_transport is not None
 
103
            and isinstance(_from_transport, RemoteTransport)):
 
104
            _client = _from_transport._client
 
105
        elif _from_transport is None:
 
106
            # If no _from_transport is specified, we need to intialize the
 
107
            # shared medium.
 
108
            credentials = None
 
109
            if medium is None:
 
110
                medium, credentials = self._build_medium()
 
111
                if 'hpss' in debug.debug_flags:
 
112
                    trace.mutter('hpss: Built a new medium: %s',
 
113
                                 medium.__class__.__name__)
 
114
            self._shared_connection = transport._SharedConnection(medium,
 
115
                                                                  credentials,
 
116
                                                                  self.base)
 
117
        elif medium is None:
 
118
            # No medium was specified, so share the medium from the
 
119
            # _from_transport.
 
120
            medium = self._shared_connection.connection
 
121
        else:
 
122
            raise AssertionError(
 
123
                "Both _from_transport (%r) and medium (%r) passed to "
 
124
                "RemoteTransport.__init__, but these parameters are mutally "
 
125
                "exclusive." % (_from_transport, medium))
 
126
 
 
127
        if _client is None:
 
128
            self._client = client._SmartClient(medium)
 
129
        else:
 
130
            self._client = _client
 
131
 
 
132
    def _build_medium(self):
 
133
        """Create the medium if _from_transport does not provide one.
 
134
 
 
135
        The medium is analogous to the connection for ConnectedTransport: it
 
136
        allows connection sharing.
 
137
        """
 
138
        # No credentials
 
139
        return None, None
 
140
 
 
141
    def _report_activity(self, bytes, direction):
 
142
        """See Transport._report_activity.
 
143
 
 
144
        Does nothing; the smart medium will report activity triggered by a
 
145
        RemoteTransport.
 
146
        """
 
147
        pass
980
148
 
981
149
    def is_readonly(self):
982
150
        """Smart server transport can do read/write file operations."""
983
 
        return False
984
 
                                                   
 
151
        try:
 
152
            resp = self._call2('Transport.is_readonly')
 
153
        except errors.UnknownSmartMethod:
 
154
            # XXX: nasty hack: servers before 0.16 don't have a
 
155
            # 'Transport.is_readonly' verb, so we do what clients before 0.16
 
156
            # did: assume False.
 
157
            return False
 
158
        if resp == ('yes', ):
 
159
            return True
 
160
        elif resp == ('no', ):
 
161
            return False
 
162
        else:
 
163
            raise errors.UnexpectedSmartServerResponse(resp)
 
164
 
985
165
    def get_smart_client(self):
986
 
        return self._medium
 
166
        return self._get_connection()
987
167
 
988
168
    def get_smart_medium(self):
989
 
        return self._medium
990
 
                                                   
991
 
    def _unparse_url(self, path):
992
 
        """Return URL for a path.
993
 
 
994
 
        :see: SFTPUrlHandling._unparse_url
995
 
        """
996
 
        # TODO: Eventually it should be possible to unify this with
997
 
        # SFTPUrlHandling._unparse_url?
998
 
        if path == '':
999
 
            path = '/'
1000
 
        path = urllib.quote(path)
1001
 
        netloc = urllib.quote(self._host)
1002
 
        if self._username is not None:
1003
 
            netloc = '%s@%s' % (urllib.quote(self._username), netloc)
1004
 
        if self._port is not None:
1005
 
            netloc = '%s:%d' % (netloc, self._port)
1006
 
        return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
 
169
        return self._get_connection()
1007
170
 
1008
171
    def _remote_path(self, relpath):
1009
172
        """Returns the Unicode version of the absolute path for relpath."""
1011
174
 
1012
175
    def _call(self, method, *args):
1013
176
        resp = self._call2(method, *args)
1014
 
        self._translate_error(resp)
 
177
        self._ensure_ok(resp)
1015
178
 
1016
179
    def _call2(self, method, *args):
1017
180
        """Call a method on the remote server."""
1018
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1019
 
        protocol.call(method, *args)
1020
 
        return protocol.read_response_tuple()
 
181
        try:
 
182
            return self._client.call(method, *args)
 
183
        except errors.ErrorFromSmartServer, err:
 
184
            # The first argument, if present, is always a path.
 
185
            if args:
 
186
                context = {'relpath': args[0]}
 
187
            else:
 
188
                context = {}
 
189
            self._translate_error(err, **context)
1021
190
 
1022
191
    def _call_with_body_bytes(self, method, args, body):
1023
192
        """Call a method on the remote server with body bytes."""
1024
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1025
 
        protocol.call_with_body_bytes((method, ) + args, body)
1026
 
        return protocol.read_response_tuple()
 
193
        try:
 
194
            return self._client.call_with_body_bytes(method, args, body)
 
195
        except errors.ErrorFromSmartServer, err:
 
196
            # The first argument, if present, is always a path.
 
197
            if args:
 
198
                context = {'relpath': args[0]}
 
199
            else:
 
200
                context = {}
 
201
            self._translate_error(err, **context)
1027
202
 
1028
203
    def has(self, relpath):
1029
204
        """Indicate whether a remote file of the given name exists or not.
1036
211
        elif resp == ('no', ):
1037
212
            return False
1038
213
        else:
1039
 
            self._translate_error(resp)
 
214
            raise errors.UnexpectedSmartServerResponse(resp)
1040
215
 
1041
216
    def get(self, relpath):
1042
217
        """Return file-like object reading the contents of a remote file.
1043
 
        
 
218
 
1044
219
        :see: Transport.get_bytes()/get_file()
1045
220
        """
1046
221
        return StringIO(self.get_bytes(relpath))
1047
222
 
1048
223
    def get_bytes(self, relpath):
1049
224
        remote = self._remote_path(relpath)
1050
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1051
 
        protocol.call('get', remote)
1052
 
        resp = protocol.read_response_tuple(True)
 
225
        try:
 
226
            resp, response_handler = self._client.call_expecting_body('get', remote)
 
227
        except errors.ErrorFromSmartServer, err:
 
228
            self._translate_error(err, relpath)
1053
229
        if resp != ('ok', ):
1054
 
            protocol.cancel_read_body()
1055
 
            self._translate_error(resp, relpath)
1056
 
        return protocol.read_body_bytes()
 
230
            response_handler.cancel_read_body()
 
231
            raise errors.UnexpectedSmartServerResponse(resp)
 
232
        return response_handler.read_body_bytes()
1057
233
 
1058
234
    def _serialise_optional_mode(self, mode):
1059
235
        if mode is None:
1064
240
    def mkdir(self, relpath, mode=None):
1065
241
        resp = self._call2('mkdir', self._remote_path(relpath),
1066
242
            self._serialise_optional_mode(mode))
1067
 
        self._translate_error(resp)
 
243
 
 
244
    def open_write_stream(self, relpath, mode=None):
 
245
        """See Transport.open_write_stream."""
 
246
        self.put_bytes(relpath, "", mode)
 
247
        result = transport.AppendBasedFileStream(self, relpath)
 
248
        transport._file_streams[self.abspath(relpath)] = result
 
249
        return result
1068
250
 
1069
251
    def put_bytes(self, relpath, upload_contents, mode=None):
1070
252
        # FIXME: upload_file is probably not safe for non-ascii characters -
1071
253
        # should probably just pass all parameters as length-delimited
1072
254
        # strings?
 
255
        if type(upload_contents) is unicode:
 
256
            # Although not strictly correct, we raise UnicodeEncodeError to be
 
257
            # compatible with other transports.
 
258
            raise UnicodeEncodeError(
 
259
                'undefined', upload_contents, 0, 1,
 
260
                'put_bytes must be given bytes, not unicode.')
1073
261
        resp = self._call_with_body_bytes('put',
1074
262
            (self._remote_path(relpath), self._serialise_optional_mode(mode)),
1075
263
            upload_contents)
1076
 
        self._translate_error(resp)
 
264
        self._ensure_ok(resp)
 
265
        return len(upload_contents)
1077
266
 
1078
267
    def put_bytes_non_atomic(self, relpath, bytes, mode=None,
1079
268
                             create_parent_dir=False,
1089
278
            (self._remote_path(relpath), self._serialise_optional_mode(mode),
1090
279
             create_parent_str, self._serialise_optional_mode(dir_mode)),
1091
280
            bytes)
1092
 
        self._translate_error(resp)
 
281
        self._ensure_ok(resp)
1093
282
 
1094
283
    def put_file(self, relpath, upload_file, mode=None):
1095
284
        # its not ideal to seek back, but currently put_non_atomic_file depends
1111
300
 
1112
301
    def append_file(self, relpath, from_file, mode=None):
1113
302
        return self.append_bytes(relpath, from_file.read(), mode)
1114
 
        
 
303
 
1115
304
    def append_bytes(self, relpath, bytes, mode=None):
1116
305
        resp = self._call_with_body_bytes(
1117
306
            'append',
1119
308
            bytes)
1120
309
        if resp[0] == 'appended':
1121
310
            return int(resp[1])
1122
 
        self._translate_error(resp)
 
311
        raise errors.UnexpectedSmartServerResponse(resp)
1123
312
 
1124
313
    def delete(self, relpath):
1125
314
        resp = self._call2('delete', self._remote_path(relpath))
1126
 
        self._translate_error(resp)
1127
 
 
1128
 
    def readv(self, relpath, offsets):
 
315
        self._ensure_ok(resp)
 
316
 
 
317
    def external_url(self):
 
318
        """See bzrlib.transport.Transport.external_url."""
 
319
        # the external path for RemoteTransports is the base
 
320
        return self.base
 
321
 
 
322
    def recommended_page_size(self):
 
323
        """Return the recommended page size for this transport."""
 
324
        return 64 * 1024
 
325
 
 
326
    def _readv(self, relpath, offsets):
1129
327
        if not offsets:
1130
328
            return
1131
329
 
1132
330
        offsets = list(offsets)
1133
331
 
1134
332
        sorted_offsets = sorted(offsets)
1135
 
        # turn the list of offsets into a stack
1136
 
        offset_stack = iter(offsets)
1137
 
        cur_offset_and_size = offset_stack.next()
1138
333
        coalesced = list(self._coalesce_offsets(sorted_offsets,
1139
334
                               limit=self._max_readv_combine,
1140
 
                               fudge_factor=self._bytes_to_read_before_seek))
1141
 
 
1142
 
        protocol = SmartClientRequestProtocolOne(self._medium.get_request())
1143
 
        protocol.call_with_body_readv_array(
1144
 
            ('readv', self._remote_path(relpath)),
1145
 
            [(c.start, c.length) for c in coalesced])
1146
 
        resp = protocol.read_response_tuple(True)
1147
 
 
1148
 
        if resp[0] != 'readv':
1149
 
            # This should raise an exception
1150
 
            protocol.cancel_read_body()
1151
 
            self._translate_error(resp)
1152
 
            return
1153
 
 
1154
 
        # FIXME: this should know how many bytes are needed, for clarity.
1155
 
        data = protocol.read_body_bytes()
 
335
                               fudge_factor=self._bytes_to_read_before_seek,
 
336
                               max_size=self._max_readv_bytes))
 
337
 
 
338
        # now that we've coallesced things, avoid making enormous requests
 
339
        requests = []
 
340
        cur_request = []
 
341
        cur_len = 0
 
342
        for c in coalesced:
 
343
            if c.length + cur_len > self._max_readv_bytes:
 
344
                requests.append(cur_request)
 
345
                cur_request = [c]
 
346
                cur_len = c.length
 
347
                continue
 
348
            cur_request.append(c)
 
349
            cur_len += c.length
 
350
        if cur_request:
 
351
            requests.append(cur_request)
 
352
        if 'hpss' in debug.debug_flags:
 
353
            trace.mutter('%s.readv %s offsets => %s coalesced'
 
354
                         ' => %s requests (%s)',
 
355
                         self.__class__.__name__, len(offsets), len(coalesced),
 
356
                         len(requests), sum(map(len, requests)))
1156
357
        # Cache the results, but only until they have been fulfilled
1157
358
        data_map = {}
 
359
        # turn the list of offsets into a single stack to iterate
 
360
        offset_stack = iter(offsets)
 
361
        # using a list so it can be modified when passing down and coming back
 
362
        next_offset = [offset_stack.next()]
 
363
        for cur_request in requests:
 
364
            try:
 
365
                result = self._client.call_with_body_readv_array(
 
366
                    ('readv', self._remote_path(relpath),),
 
367
                    [(c.start, c.length) for c in cur_request])
 
368
                resp, response_handler = result
 
369
            except errors.ErrorFromSmartServer, err:
 
370
                self._translate_error(err, relpath)
 
371
 
 
372
            if resp[0] != 'readv':
 
373
                # This should raise an exception
 
374
                response_handler.cancel_read_body()
 
375
                raise errors.UnexpectedSmartServerResponse(resp)
 
376
 
 
377
            for res in self._handle_response(offset_stack, cur_request,
 
378
                                             response_handler,
 
379
                                             data_map,
 
380
                                             next_offset):
 
381
                yield res
 
382
 
 
383
    def _handle_response(self, offset_stack, coalesced, response_handler,
 
384
                         data_map, next_offset):
 
385
        cur_offset_and_size = next_offset[0]
 
386
        # FIXME: this should know how many bytes are needed, for clarity.
 
387
        data = response_handler.read_body_bytes()
 
388
        data_offset = 0
1158
389
        for c_offset in coalesced:
1159
390
            if len(data) < c_offset.length:
1160
391
                raise errors.ShortReadvError(relpath, c_offset.start,
1161
392
                            c_offset.length, actual=len(data))
1162
393
            for suboffset, subsize in c_offset.ranges:
1163
394
                key = (c_offset.start+suboffset, subsize)
1164
 
                data_map[key] = data[suboffset:suboffset+subsize]
1165
 
            data = data[c_offset.length:]
 
395
                this_data = data[data_offset+suboffset:
 
396
                                 data_offset+suboffset+subsize]
 
397
                # Special case when the data is in-order, rather than packing
 
398
                # into a map and then back out again. Benchmarking shows that
 
399
                # this has 100% hit rate, but leave in the data_map work just
 
400
                # in case.
 
401
                # TODO: Could we get away with using buffer() to avoid the
 
402
                #       memory copy?  Callers would need to realize they may
 
403
                #       not have a real string.
 
404
                if key == cur_offset_and_size:
 
405
                    yield cur_offset_and_size[0], this_data
 
406
                    cur_offset_and_size = next_offset[0] = offset_stack.next()
 
407
                else:
 
408
                    data_map[key] = this_data
 
409
            data_offset += c_offset.length
1166
410
 
1167
411
            # Now that we've read some data, see if we can yield anything back
1168
412
            while cur_offset_and_size in data_map:
1169
413
                this_data = data_map.pop(cur_offset_and_size)
1170
414
                yield cur_offset_and_size[0], this_data
1171
 
                cur_offset_and_size = offset_stack.next()
 
415
                cur_offset_and_size = next_offset[0] = offset_stack.next()
1172
416
 
1173
417
    def rename(self, rel_from, rel_to):
1174
418
        self._call('rename',
1183
427
    def rmdir(self, relpath):
1184
428
        resp = self._call('rmdir', self._remote_path(relpath))
1185
429
 
1186
 
    def _translate_error(self, resp, orig_path=None):
1187
 
        """Raise an exception from a response"""
1188
 
        if resp is None:
1189
 
            what = None
1190
 
        else:
1191
 
            what = resp[0]
1192
 
        if what == 'ok':
1193
 
            return
1194
 
        elif what == 'NoSuchFile':
1195
 
            if orig_path is not None:
1196
 
                error_path = orig_path
1197
 
            else:
1198
 
                error_path = resp[1]
1199
 
            raise errors.NoSuchFile(error_path)
1200
 
        elif what == 'error':
1201
 
            raise errors.SmartProtocolError(unicode(resp[1]))
1202
 
        elif what == 'FileExists':
1203
 
            raise errors.FileExists(resp[1])
1204
 
        elif what == 'DirectoryNotEmpty':
1205
 
            raise errors.DirectoryNotEmpty(resp[1])
1206
 
        elif what == 'ShortReadvError':
1207
 
            raise errors.ShortReadvError(resp[1], int(resp[2]),
1208
 
                                         int(resp[3]), int(resp[4]))
1209
 
        elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
1210
 
            encoding = str(resp[1]) # encoding must always be a string
1211
 
            val = resp[2]
1212
 
            start = int(resp[3])
1213
 
            end = int(resp[4])
1214
 
            reason = str(resp[5]) # reason must always be a string
1215
 
            if val.startswith('u:'):
1216
 
                val = val[2:].decode('utf-8')
1217
 
            elif val.startswith('s:'):
1218
 
                val = val[2:].decode('base64')
1219
 
            if what == 'UnicodeDecodeError':
1220
 
                raise UnicodeDecodeError(encoding, val, start, end, reason)
1221
 
            elif what == 'UnicodeEncodeError':
1222
 
                raise UnicodeEncodeError(encoding, val, start, end, reason)
1223
 
        elif what == "ReadOnlyError":
1224
 
            raise errors.TransportNotPossible('readonly transport')
1225
 
        else:
1226
 
            raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
 
430
    def _ensure_ok(self, resp):
 
431
        if resp[0] != 'ok':
 
432
            raise errors.UnexpectedSmartServerResponse(resp)
 
433
 
 
434
    def _translate_error(self, err, relpath=None):
 
435
        remote._translate_error(err, path=relpath)
1227
436
 
1228
437
    def disconnect(self):
1229
 
        self._medium.disconnect()
1230
 
 
1231
 
    def delete_tree(self, relpath):
1232
 
        raise errors.TransportNotPossible('readonly transport')
 
438
        m = self.get_smart_medium()
 
439
        if m is not None:
 
440
            m.disconnect()
1233
441
 
1234
442
    def stat(self, relpath):
1235
443
        resp = self._call2('stat', self._remote_path(relpath))
1236
444
        if resp[0] == 'stat':
1237
 
            return SmartStat(int(resp[1]), int(resp[2], 8))
1238
 
        else:
1239
 
            self._translate_error(resp)
 
445
            return _SmartStat(int(resp[1]), int(resp[2], 8))
 
446
        raise errors.UnexpectedSmartServerResponse(resp)
1240
447
 
1241
448
    ## def lock_read(self, relpath):
1242
449
    ##     """Lock the given file for shared (read) access.
1258
465
        resp = self._call2('list_dir', self._remote_path(relpath))
1259
466
        if resp[0] == 'names':
1260
467
            return [name.encode('ascii') for name in resp[1:]]
1261
 
        else:
1262
 
            self._translate_error(resp)
 
468
        raise errors.UnexpectedSmartServerResponse(resp)
1263
469
 
1264
470
    def iter_files_recursive(self):
1265
471
        resp = self._call2('iter_files_recursive', self._remote_path(''))
1266
472
        if resp[0] == 'names':
1267
473
            return resp[1:]
1268
 
        else:
1269
 
            self._translate_error(resp)
1270
 
 
1271
 
 
1272
 
class SmartClientMediumRequest(object):
1273
 
    """A request on a SmartClientMedium.
1274
 
 
1275
 
    Each request allows bytes to be provided to it via accept_bytes, and then
1276
 
    the response bytes to be read via read_bytes.
1277
 
 
1278
 
    For instance:
1279
 
    request.accept_bytes('123')
1280
 
    request.finished_writing()
1281
 
    result = request.read_bytes(3)
1282
 
    request.finished_reading()
1283
 
 
1284
 
    It is up to the individual SmartClientMedium whether multiple concurrent
1285
 
    requests can exist. See SmartClientMedium.get_request to obtain instances 
1286
 
    of SmartClientMediumRequest, and the concrete Medium you are using for 
1287
 
    details on concurrency and pipelining.
1288
 
    """
1289
 
 
1290
 
    def __init__(self, medium):
1291
 
        """Construct a SmartClientMediumRequest for the medium medium."""
1292
 
        self._medium = medium
1293
 
        # we track state by constants - we may want to use the same
1294
 
        # pattern as BodyReader if it gets more complex.
1295
 
        # valid states are: "writing", "reading", "done"
1296
 
        self._state = "writing"
1297
 
 
1298
 
    def accept_bytes(self, bytes):
1299
 
        """Accept bytes for inclusion in this request.
1300
 
 
1301
 
        This method may not be be called after finished_writing() has been
1302
 
        called.  It depends upon the Medium whether or not the bytes will be
1303
 
        immediately transmitted. Message based Mediums will tend to buffer the
1304
 
        bytes until finished_writing() is called.
1305
 
 
1306
 
        :param bytes: A bytestring.
1307
 
        """
1308
 
        if self._state != "writing":
1309
 
            raise errors.WritingCompleted(self)
1310
 
        self._accept_bytes(bytes)
1311
 
 
1312
 
    def _accept_bytes(self, bytes):
1313
 
        """Helper for accept_bytes.
1314
 
 
1315
 
        Accept_bytes checks the state of the request to determing if bytes
1316
 
        should be accepted. After that it hands off to _accept_bytes to do the
1317
 
        actual acceptance.
1318
 
        """
1319
 
        raise NotImplementedError(self._accept_bytes)
1320
 
 
1321
 
    def finished_reading(self):
1322
 
        """Inform the request that all desired data has been read.
1323
 
 
1324
 
        This will remove the request from the pipeline for its medium (if the
1325
 
        medium supports pipelining) and any further calls to methods on the
1326
 
        request will raise ReadingCompleted.
1327
 
        """
1328
 
        if self._state == "writing":
1329
 
            raise errors.WritingNotComplete(self)
1330
 
        if self._state != "reading":
1331
 
            raise errors.ReadingCompleted(self)
1332
 
        self._state = "done"
1333
 
        self._finished_reading()
1334
 
 
1335
 
    def _finished_reading(self):
1336
 
        """Helper for finished_reading.
1337
 
 
1338
 
        finished_reading checks the state of the request to determine if 
1339
 
        finished_reading is allowed, and if it is hands off to _finished_reading
1340
 
        to perform the action.
1341
 
        """
1342
 
        raise NotImplementedError(self._finished_reading)
1343
 
 
1344
 
    def finished_writing(self):
1345
 
        """Finish the writing phase of this request.
1346
 
 
1347
 
        This will flush all pending data for this request along the medium.
1348
 
        After calling finished_writing, you may not call accept_bytes anymore.
1349
 
        """
1350
 
        if self._state != "writing":
1351
 
            raise errors.WritingCompleted(self)
1352
 
        self._state = "reading"
1353
 
        self._finished_writing()
1354
 
 
1355
 
    def _finished_writing(self):
1356
 
        """Helper for finished_writing.
1357
 
 
1358
 
        finished_writing checks the state of the request to determine if 
1359
 
        finished_writing is allowed, and if it is hands off to _finished_writing
1360
 
        to perform the action.
1361
 
        """
1362
 
        raise NotImplementedError(self._finished_writing)
1363
 
 
1364
 
    def read_bytes(self, count):
1365
 
        """Read bytes from this requests response.
1366
 
 
1367
 
        This method will block and wait for count bytes to be read. It may not
1368
 
        be invoked until finished_writing() has been called - this is to ensure
1369
 
        a message-based approach to requests, for compatability with message
1370
 
        based mediums like HTTP.
1371
 
        """
1372
 
        if self._state == "writing":
1373
 
            raise errors.WritingNotComplete(self)
1374
 
        if self._state != "reading":
1375
 
            raise errors.ReadingCompleted(self)
1376
 
        return self._read_bytes(count)
1377
 
 
1378
 
    def _read_bytes(self, count):
1379
 
        """Helper for read_bytes.
1380
 
 
1381
 
        read_bytes checks the state of the request to determing if bytes
1382
 
        should be read. After that it hands off to _read_bytes to do the
1383
 
        actual read.
1384
 
        """
1385
 
        raise NotImplementedError(self._read_bytes)
1386
 
 
1387
 
 
1388
 
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1389
 
    """A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1390
 
 
1391
 
    def __init__(self, medium):
1392
 
        SmartClientMediumRequest.__init__(self, medium)
1393
 
        # check that we are safe concurrency wise. If some streams start
1394
 
        # allowing concurrent requests - i.e. via multiplexing - then this
1395
 
        # assert should be moved to SmartClientStreamMedium.get_request,
1396
 
        # and the setting/unsetting of _current_request likewise moved into
1397
 
        # that class : but its unneeded overhead for now. RBC 20060922
1398
 
        if self._medium._current_request is not None:
1399
 
            raise errors.TooManyConcurrentRequests(self._medium)
1400
 
        self._medium._current_request = self
1401
 
 
1402
 
    def _accept_bytes(self, bytes):
1403
 
        """See SmartClientMediumRequest._accept_bytes.
1404
 
        
1405
 
        This forwards to self._medium._accept_bytes because we are operating
1406
 
        on the mediums stream.
1407
 
        """
1408
 
        self._medium._accept_bytes(bytes)
1409
 
 
1410
 
    def _finished_reading(self):
1411
 
        """See SmartClientMediumRequest._finished_reading.
1412
 
 
1413
 
        This clears the _current_request on self._medium to allow a new 
1414
 
        request to be created.
1415
 
        """
1416
 
        assert self._medium._current_request is self
1417
 
        self._medium._current_request = None
1418
 
        
1419
 
    def _finished_writing(self):
1420
 
        """See SmartClientMediumRequest._finished_writing.
1421
 
 
1422
 
        This invokes self._medium._flush to ensure all bytes are transmitted.
1423
 
        """
1424
 
        self._medium._flush()
1425
 
 
1426
 
    def _read_bytes(self, count):
1427
 
        """See SmartClientMediumRequest._read_bytes.
1428
 
        
1429
 
        This forwards to self._medium._read_bytes because we are operating
1430
 
        on the mediums stream.
1431
 
        """
1432
 
        return self._medium._read_bytes(count)
1433
 
 
1434
 
 
1435
 
class SmartClientRequestProtocolOne(SmartProtocolBase):
1436
 
    """The client-side protocol for smart version 1."""
1437
 
 
1438
 
    def __init__(self, request):
1439
 
        """Construct a SmartClientRequestProtocolOne.
1440
 
 
1441
 
        :param request: A SmartClientMediumRequest to serialise onto and
1442
 
            deserialise from.
1443
 
        """
1444
 
        self._request = request
1445
 
        self._body_buffer = None
1446
 
 
1447
 
    def call(self, *args):
1448
 
        bytes = _encode_tuple(args)
1449
 
        self._request.accept_bytes(bytes)
1450
 
        self._request.finished_writing()
1451
 
 
1452
 
    def call_with_body_bytes(self, args, body):
1453
 
        """Make a remote call of args with body bytes 'body'.
1454
 
 
1455
 
        After calling this, call read_response_tuple to find the result out.
1456
 
        """
1457
 
        bytes = _encode_tuple(args)
1458
 
        self._request.accept_bytes(bytes)
1459
 
        bytes = self._encode_bulk_data(body)
1460
 
        self._request.accept_bytes(bytes)
1461
 
        self._request.finished_writing()
1462
 
 
1463
 
    def call_with_body_readv_array(self, args, body):
1464
 
        """Make a remote call with a readv array.
1465
 
 
1466
 
        The body is encoded with one line per readv offset pair. The numbers in
1467
 
        each pair are separated by a comma, and no trailing \n is emitted.
1468
 
        """
1469
 
        bytes = _encode_tuple(args)
1470
 
        self._request.accept_bytes(bytes)
1471
 
        readv_bytes = self._serialise_offsets(body)
1472
 
        bytes = self._encode_bulk_data(readv_bytes)
1473
 
        self._request.accept_bytes(bytes)
1474
 
        self._request.finished_writing()
1475
 
 
1476
 
    def cancel_read_body(self):
1477
 
        """After expecting a body, a response code may indicate one otherwise.
1478
 
 
1479
 
        This method lets the domain client inform the protocol that no body
1480
 
        will be transmitted. This is a terminal method: after calling it the
1481
 
        protocol is not able to be used further.
1482
 
        """
1483
 
        self._request.finished_reading()
1484
 
 
1485
 
    def read_response_tuple(self, expect_body=False):
1486
 
        """Read a response tuple from the wire.
1487
 
 
1488
 
        This should only be called once.
1489
 
        """
1490
 
        result = self._recv_tuple()
1491
 
        if not expect_body:
1492
 
            self._request.finished_reading()
1493
 
        return result
1494
 
 
1495
 
    def read_body_bytes(self, count=-1):
1496
 
        """Read bytes from the body, decoding into a byte stream.
1497
 
        
1498
 
        We read all bytes at once to ensure we've checked the trailer for 
1499
 
        errors, and then feed the buffer back as read_body_bytes is called.
1500
 
        """
1501
 
        if self._body_buffer is not None:
1502
 
            return self._body_buffer.read(count)
1503
 
        _body_decoder = LengthPrefixedBodyDecoder()
1504
 
 
1505
 
        while not _body_decoder.finished_reading:
1506
 
            bytes_wanted = _body_decoder.next_read_size()
1507
 
            bytes = self._request.read_bytes(bytes_wanted)
1508
 
            _body_decoder.accept_bytes(bytes)
1509
 
        self._request.finished_reading()
1510
 
        self._body_buffer = StringIO(_body_decoder.read_pending_data())
1511
 
        # XXX: TODO check the trailer result.
1512
 
        return self._body_buffer.read(count)
1513
 
 
1514
 
    def _recv_tuple(self):
1515
 
        """Receive a tuple from the medium request."""
1516
 
        line = ''
1517
 
        while not line or line[-1] != '\n':
1518
 
            # TODO: this is inefficient - but tuples are short.
1519
 
            new_char = self._request.read_bytes(1)
1520
 
            line += new_char
1521
 
            assert new_char != '', "end of file reading from server."
1522
 
        return _decode_tuple(line)
1523
 
 
1524
 
    def query_version(self):
1525
 
        """Return protocol version number of the server."""
1526
 
        self.call('hello')
1527
 
        resp = self.read_response_tuple()
1528
 
        if resp == ('ok', '1'):
1529
 
            return 1
1530
 
        else:
1531
 
            raise errors.SmartProtocolError("bad response %r" % (resp,))
1532
 
 
1533
 
 
1534
 
class SmartClientMedium(object):
1535
 
    """Smart client is a medium for sending smart protocol requests over."""
1536
 
 
1537
 
    def disconnect(self):
1538
 
        """If this medium maintains a persistent connection, close it.
1539
 
        
1540
 
        The default implementation does nothing.
1541
 
        """
1542
 
        
1543
 
 
1544
 
class SmartClientStreamMedium(SmartClientMedium):
1545
 
    """Stream based medium common class.
1546
 
 
1547
 
    SmartClientStreamMediums operate on a stream. All subclasses use a common
1548
 
    SmartClientStreamMediumRequest for their requests, and should implement
1549
 
    _accept_bytes and _read_bytes to allow the request objects to send and
1550
 
    receive bytes.
1551
 
    """
1552
 
 
1553
 
    def __init__(self):
1554
 
        self._current_request = None
1555
 
 
1556
 
    def accept_bytes(self, bytes):
1557
 
        self._accept_bytes(bytes)
1558
 
 
1559
 
    def __del__(self):
1560
 
        """The SmartClientStreamMedium knows how to close the stream when it is
1561
 
        finished with it.
1562
 
        """
1563
 
        self.disconnect()
1564
 
 
1565
 
    def _flush(self):
1566
 
        """Flush the output stream.
1567
 
        
1568
 
        This method is used by the SmartClientStreamMediumRequest to ensure that
1569
 
        all data for a request is sent, to avoid long timeouts or deadlocks.
1570
 
        """
1571
 
        raise NotImplementedError(self._flush)
1572
 
 
1573
 
    def get_request(self):
1574
 
        """See SmartClientMedium.get_request().
1575
 
 
1576
 
        SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1577
 
        for get_request.
1578
 
        """
1579
 
        return SmartClientStreamMediumRequest(self)
1580
 
 
1581
 
    def read_bytes(self, count):
1582
 
        return self._read_bytes(count)
1583
 
 
1584
 
 
1585
 
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1586
 
    """A client medium using simple pipes.
1587
 
    
1588
 
    This client does not manage the pipes: it assumes they will always be open.
1589
 
    """
1590
 
 
1591
 
    def __init__(self, readable_pipe, writeable_pipe):
1592
 
        SmartClientStreamMedium.__init__(self)
1593
 
        self._readable_pipe = readable_pipe
1594
 
        self._writeable_pipe = writeable_pipe
1595
 
 
1596
 
    def _accept_bytes(self, bytes):
1597
 
        """See SmartClientStreamMedium.accept_bytes."""
1598
 
        self._writeable_pipe.write(bytes)
1599
 
 
1600
 
    def _flush(self):
1601
 
        """See SmartClientStreamMedium._flush()."""
1602
 
        self._writeable_pipe.flush()
1603
 
 
1604
 
    def _read_bytes(self, count):
1605
 
        """See SmartClientStreamMedium._read_bytes."""
1606
 
        return self._readable_pipe.read(count)
1607
 
 
1608
 
 
1609
 
class SmartSSHClientMedium(SmartClientStreamMedium):
1610
 
    """A client medium using SSH."""
1611
 
    
1612
 
    def __init__(self, host, port=None, username=None, password=None,
1613
 
            vendor=None):
1614
 
        """Creates a client that will connect on the first use.
1615
 
        
1616
 
        :param vendor: An optional override for the ssh vendor to use. See
1617
 
            bzrlib.transport.ssh for details on ssh vendors.
1618
 
        """
1619
 
        SmartClientStreamMedium.__init__(self)
1620
 
        self._connected = False
1621
 
        self._host = host
1622
 
        self._password = password
1623
 
        self._port = port
1624
 
        self._username = username
1625
 
        self._read_from = None
1626
 
        self._ssh_connection = None
1627
 
        self._vendor = vendor
1628
 
        self._write_to = None
1629
 
 
1630
 
    def _accept_bytes(self, bytes):
1631
 
        """See SmartClientStreamMedium.accept_bytes."""
1632
 
        self._ensure_connection()
1633
 
        self._write_to.write(bytes)
1634
 
 
1635
 
    def disconnect(self):
1636
 
        """See SmartClientMedium.disconnect()."""
1637
 
        if not self._connected:
1638
 
            return
1639
 
        self._read_from.close()
1640
 
        self._write_to.close()
1641
 
        self._ssh_connection.close()
1642
 
        self._connected = False
1643
 
 
1644
 
    def _ensure_connection(self):
1645
 
        """Connect this medium if not already connected."""
1646
 
        if self._connected:
1647
 
            return
1648
 
        executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1649
 
        if self._vendor is None:
1650
 
            vendor = ssh._get_ssh_vendor()
1651
 
        else:
1652
 
            vendor = self._vendor
1653
 
        self._ssh_connection = vendor.connect_ssh(self._username,
1654
 
                self._password, self._host, self._port,
1655
 
                command=[executable, 'serve', '--inet', '--directory=/',
1656
 
                         '--allow-writes'])
1657
 
        self._read_from, self._write_to = \
1658
 
            self._ssh_connection.get_filelike_channels()
1659
 
        self._connected = True
1660
 
 
1661
 
    def _flush(self):
1662
 
        """See SmartClientStreamMedium._flush()."""
1663
 
        self._write_to.flush()
1664
 
 
1665
 
    def _read_bytes(self, count):
1666
 
        """See SmartClientStreamMedium.read_bytes."""
1667
 
        if not self._connected:
1668
 
            raise errors.MediumNotConnected(self)
1669
 
        return self._read_from.read(count)
1670
 
 
1671
 
 
1672
 
class SmartTCPClientMedium(SmartClientStreamMedium):
1673
 
    """A client medium using TCP."""
1674
 
    
1675
 
    def __init__(self, host, port):
1676
 
        """Creates a client that will connect on the first use."""
1677
 
        SmartClientStreamMedium.__init__(self)
1678
 
        self._connected = False
1679
 
        self._host = host
1680
 
        self._port = port
1681
 
        self._socket = None
1682
 
 
1683
 
    def _accept_bytes(self, bytes):
1684
 
        """See SmartClientMedium.accept_bytes."""
1685
 
        self._ensure_connection()
1686
 
        self._socket.sendall(bytes)
1687
 
 
1688
 
    def disconnect(self):
1689
 
        """See SmartClientMedium.disconnect()."""
1690
 
        if not self._connected:
1691
 
            return
1692
 
        self._socket.close()
1693
 
        self._socket = None
1694
 
        self._connected = False
1695
 
 
1696
 
    def _ensure_connection(self):
1697
 
        """Connect this medium if not already connected."""
1698
 
        if self._connected:
1699
 
            return
1700
 
        self._socket = socket.socket()
1701
 
        self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1702
 
        result = self._socket.connect_ex((self._host, int(self._port)))
1703
 
        if result:
1704
 
            raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1705
 
                    (self._host, self._port, os.strerror(result)))
1706
 
        self._connected = True
1707
 
 
1708
 
    def _flush(self):
1709
 
        """See SmartClientStreamMedium._flush().
1710
 
        
1711
 
        For TCP we do no flushing. We may want to turn off TCP_NODELAY and 
1712
 
        add a means to do a flush, but that can be done in the future.
1713
 
        """
1714
 
 
1715
 
    def _read_bytes(self, count):
1716
 
        """See SmartClientMedium.read_bytes."""
1717
 
        if not self._connected:
1718
 
            raise errors.MediumNotConnected(self)
1719
 
        return self._socket.recv(count)
1720
 
 
1721
 
 
1722
 
class SmartTCPTransport(SmartTransport):
 
474
        raise errors.UnexpectedSmartServerResponse(resp)
 
475
 
 
476
 
 
477
class RemoteTCPTransport(RemoteTransport):
1723
478
    """Connection to smart server over plain tcp.
1724
 
    
 
479
 
1725
480
    This is essentially just a factory to get 'RemoteTransport(url,
1726
481
        SmartTCPClientMedium).
1727
482
    """
1728
483
 
1729
 
    def __init__(self, url):
1730
 
        _scheme, _username, _password, _host, _port, _path = \
1731
 
            transport.split_url(url)
1732
 
        try:
1733
 
            _port = int(_port)
1734
 
        except (ValueError, TypeError), e:
1735
 
            raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1736
 
        medium = SmartTCPClientMedium(_host, _port)
1737
 
        super(SmartTCPTransport, self).__init__(url, medium=medium)
1738
 
 
1739
 
 
1740
 
class SmartSSHTransport(SmartTransport):
 
484
    def _build_medium(self):
 
485
        client_medium = medium.SmartTCPClientMedium(
 
486
            self._host, self._port, self.base)
 
487
        return client_medium, None
 
488
 
 
489
 
 
490
class RemoteTCPTransportV2Only(RemoteTransport):
 
491
    """Connection to smart server over plain tcp with the client hard-coded to
 
492
    assume protocol v2 and remote server version <= 1.6.
 
493
 
 
494
    This should only be used for testing.
 
495
    """
 
496
 
 
497
    def _build_medium(self):
 
498
        client_medium = medium.SmartTCPClientMedium(
 
499
            self._host, self._port, self.base)
 
500
        client_medium._protocol_version = 2
 
501
        client_medium._remember_remote_is_before((1, 6))
 
502
        return client_medium, None
 
503
 
 
504
 
 
505
class RemoteSSHTransport(RemoteTransport):
1741
506
    """Connection to smart server over SSH.
1742
507
 
1743
508
    This is essentially just a factory to get 'RemoteTransport(url,
1744
509
        SmartSSHClientMedium).
1745
510
    """
1746
511
 
 
512
    def _build_medium(self):
 
513
        location_config = config.LocationConfig(self.base)
 
514
        bzr_remote_path = location_config.get_bzr_remote_path()
 
515
        user = self._user
 
516
        if user is None:
 
517
            auth = config.AuthenticationConfig()
 
518
            user = auth.get_user('ssh', self._host, self._port)
 
519
        ssh_params = medium.SSHParams(self._host, self._port, user,
 
520
            self._password, bzr_remote_path)
 
521
        client_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
 
522
        return client_medium, (user, self._password)
 
523
 
 
524
 
 
525
class RemoteHTTPTransport(RemoteTransport):
 
526
    """Just a way to connect between a bzr+http:// url and http://.
 
527
 
 
528
    This connection operates slightly differently than the RemoteSSHTransport.
 
529
    It uses a plain http:// transport underneath, which defines what remote
 
530
    .bzr/smart URL we are connected to. From there, all paths that are sent are
 
531
    sent as relative paths, this way, the remote side can properly
 
532
    de-reference them, since it is likely doing rewrite rules to translate an
 
533
    HTTP path into a local path.
 
534
    """
 
535
 
 
536
    def __init__(self, base, _from_transport=None, http_transport=None):
 
537
        if http_transport is None:
 
538
            # FIXME: the password may be lost here because it appears in the
 
539
            # url only for an intial construction (when the url came from the
 
540
            # command-line).
 
541
            http_url = base[len('bzr+'):]
 
542
            self._http_transport = transport.get_transport(http_url)
 
543
        else:
 
544
            self._http_transport = http_transport
 
545
        super(RemoteHTTPTransport, self).__init__(
 
546
            base, _from_transport=_from_transport)
 
547
 
 
548
    def _build_medium(self):
 
549
        # We let http_transport take care of the credentials
 
550
        return self._http_transport.get_smart_medium(), None
 
551
 
 
552
    def _remote_path(self, relpath):
 
553
        """After connecting, HTTP Transport only deals in relative URLs."""
 
554
        # Adjust the relpath based on which URL this smart transport is
 
555
        # connected to.
 
556
        http_base = urlutils.normalize_url(self.get_smart_medium().base)
 
557
        url = urlutils.join(self.base[len('bzr+'):], relpath)
 
558
        url = urlutils.normalize_url(url)
 
559
        return urlutils.relative_url(http_base, url)
 
560
 
 
561
    def clone(self, relative_url):
 
562
        """Make a new RemoteHTTPTransport related to me.
 
563
 
 
564
        This is re-implemented rather than using the default
 
565
        RemoteTransport.clone() because we must be careful about the underlying
 
566
        http transport.
 
567
 
 
568
        Also, the cloned smart transport will POST to the same .bzr/smart
 
569
        location as this transport (although obviously the relative paths in the
 
570
        smart requests may be different).  This is so that the server doesn't
 
571
        have to handle .bzr/smart requests at arbitrary places inside .bzr
 
572
        directories, just at the initial URL the user uses.
 
573
        """
 
574
        if relative_url:
 
575
            abs_url = self.abspath(relative_url)
 
576
        else:
 
577
            abs_url = self.base
 
578
        return RemoteHTTPTransport(abs_url,
 
579
                                   _from_transport=self,
 
580
                                   http_transport=self._http_transport)
 
581
 
 
582
    def _redirected_to(self, source, target):
 
583
        """See transport._redirected_to"""
 
584
        redirected = self._http_transport._redirected_to(source, target)
 
585
        if (redirected is not None
 
586
            and isinstance(redirected, type(self._http_transport))):
 
587
            return RemoteHTTPTransport('bzr+' + redirected.external_url(),
 
588
                                       http_transport=redirected)
 
589
        else:
 
590
            # Either None or a transport for a different protocol
 
591
            return redirected
 
592
 
 
593
 
 
594
class HintingSSHTransport(transport.Transport):
 
595
    """Simple transport that handles ssh:// and points out bzr+ssh://."""
 
596
 
1747
597
    def __init__(self, url):
1748
 
        _scheme, _username, _password, _host, _port, _path = \
1749
 
            transport.split_url(url)
1750
 
        try:
1751
 
            if _port is not None:
1752
 
                _port = int(_port)
1753
 
        except (ValueError, TypeError), e:
1754
 
            raise errors.InvalidURL(path=url, extra="invalid port %s" % 
1755
 
                _port)
1756
 
        medium = SmartSSHClientMedium(_host, _port, _username, _password)
1757
 
        super(SmartSSHTransport, self).__init__(url, medium=medium)
 
598
        raise errors.UnsupportedProtocol(url,
 
599
            'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
1758
600
 
1759
601
 
1760
602
def get_test_permutations():
1761
603
    """Return (transport, server) permutations for testing."""
1762
604
    ### We may need a little more test framework support to construct an
1763
605
    ### appropriate RemoteTransport in the future.
1764
 
    return [(SmartTCPTransport, SmartTCPServer_for_testing)]
 
606
    from bzrlib.tests import test_server
 
607
    return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]