225
261
for start, length in offsets:
226
262
txt.append('%d,%d' % (start, length))
227
263
return '\n'.join(txt)
229
def _write_and_flush(self, bytes):
230
"""Write bytes to self._out and flush it."""
231
# XXX: this will be inefficient. Just ask Robert.
232
self._out.write(bytes)
236
class SmartStreamServer(SmartProtocolBase):
266
class SmartServerRequestProtocolOne(SmartProtocolBase):
267
"""Server-side encoding and decoding logic for smart version 1."""
269
def __init__(self, backing_transport, write_func):
270
self._backing_transport = backing_transport
271
self.excess_buffer = ''
272
self._finished_reading = False
274
self.has_dispatched = False
276
self._body_decoder = None
277
self._write_func = write_func
279
def accept_bytes(self, bytes):
280
"""Take bytes, and advance the internal state machine appropriately.
282
:param bytes: must be a byte string
284
assert isinstance(bytes, str)
285
self.in_buffer += bytes
286
if not self.has_dispatched:
287
if '\n' not in self.in_buffer:
288
# no command line yet
290
self.has_dispatched = True
292
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
294
req_args = _decode_tuple(first_line)
295
self.request = SmartServerRequestHandler(
296
self._backing_transport)
297
self.request.dispatch_command(req_args[0], req_args[1:])
298
if self.request.finished_reading:
300
self.excess_buffer = self.in_buffer
302
self._send_response(self.request.response.args,
303
self.request.response.body)
304
self.sync_with_request(self.request)
305
except KeyboardInterrupt:
307
except Exception, exception:
308
# everything else: pass to client, flush, and quit
309
self._send_response(('error', str(exception)))
312
if self.has_dispatched:
313
if self._finished_reading:
314
# nothing to do.XXX: this routine should be a single state
316
self.excess_buffer += self.in_buffer
319
if self._body_decoder is None:
320
self._body_decoder = LengthPrefixedBodyDecoder()
321
self._body_decoder.accept_bytes(self.in_buffer)
322
self.in_buffer = self._body_decoder.unused_data
323
body_data = self._body_decoder.read_pending_data()
324
self.request.accept_body(body_data)
325
if self._body_decoder.finished_reading:
326
self.request.end_of_body()
327
assert self.request.finished_reading, \
328
"no more body, request not finished"
329
self.sync_with_request(self.request)
330
if self.request.response is not None:
331
self._send_response(self.request.response.args,
332
self.request.response.body)
333
self.excess_buffer = self.in_buffer
336
assert not self.request.finished_reading, \
337
"no response and we have finished reading."
339
def _send_response(self, args, body=None):
340
"""Send a smart server response down the output stream."""
341
self._write_func(_encode_tuple(args))
343
assert isinstance(body, str), 'body must be a str'
344
bytes = self._encode_bulk_data(body)
345
self._write_func(bytes)
347
def sync_with_request(self, request):
348
self._finished_reading = request.finished_reading
350
def next_read_size(self):
351
if self._finished_reading:
353
if self._body_decoder is None:
356
return self._body_decoder.next_read_size()
359
class LengthPrefixedBodyDecoder(object):
360
"""Decodes the length-prefixed bulk data."""
363
self.bytes_left = None
364
self.finished_reading = False
365
self.unused_data = ''
366
self.state_accept = self._state_accept_expecting_length
367
self.state_read = self._state_read_no_data
369
self._trailer_buffer = ''
371
def accept_bytes(self, bytes):
372
"""Decode as much of bytes as possible.
374
If 'bytes' contains too much data it will be appended to
377
finished_reading will be set when no more data is required. Further
378
data will be appended to self.unused_data.
380
# accept_bytes is allowed to change the state
381
current_state = self.state_accept
382
self.state_accept(bytes)
383
while current_state != self.state_accept:
384
current_state = self.state_accept
385
self.state_accept('')
387
def next_read_size(self):
388
if self.bytes_left is not None:
389
# Ideally we want to read all the remainder of the body and the
391
return self.bytes_left + 5
392
elif self.state_accept == self._state_accept_reading_trailer:
393
# Just the trailer left
394
return 5 - len(self._trailer_buffer)
395
elif self.state_accept == self._state_accept_expecting_length:
396
# There's still at least 6 bytes left ('\n' to end the length, plus
400
# Reading excess data. Either way, 1 byte at a time is fine.
403
def read_pending_data(self):
404
"""Return any pending data that has been decoded."""
405
return self.state_read()
407
def _state_accept_expecting_length(self, bytes):
408
self._in_buffer += bytes
409
pos = self._in_buffer.find('\n')
412
self.bytes_left = int(self._in_buffer[:pos])
413
self._in_buffer = self._in_buffer[pos+1:]
414
self.bytes_left -= len(self._in_buffer)
415
self.state_accept = self._state_accept_reading_body
416
self.state_read = self._state_read_in_buffer
418
def _state_accept_reading_body(self, bytes):
419
self._in_buffer += bytes
420
self.bytes_left -= len(bytes)
421
if self.bytes_left <= 0:
423
if self.bytes_left != 0:
424
self._trailer_buffer = self._in_buffer[self.bytes_left:]
425
self._in_buffer = self._in_buffer[:self.bytes_left]
426
self.bytes_left = None
427
self.state_accept = self._state_accept_reading_trailer
429
def _state_accept_reading_trailer(self, bytes):
430
self._trailer_buffer += bytes
431
# TODO: what if the trailer does not match "done\n"? Should this raise
432
# a ProtocolViolation exception?
433
if self._trailer_buffer.startswith('done\n'):
434
self.unused_data = self._trailer_buffer[len('done\n'):]
435
self.state_accept = self._state_accept_reading_unused
436
self.finished_reading = True
438
def _state_accept_reading_unused(self, bytes):
439
self.unused_data += bytes
441
def _state_read_no_data(self):
444
def _state_read_in_buffer(self):
445
result = self._in_buffer
450
class SmartServerStreamMedium(object):
237
451
"""Handles smart commands coming over a stream.
239
453
The stream may be a pipe connected to sshd, or a tcp socket, or an
253
547
:param out_file: Python file to write responses.
254
548
:param backing_transport: Transport for the directory served.
550
SmartServerStreamMedium.__init__(self, backing_transport)
256
551
self._in = in_file
257
552
self._out = out_file
258
self.smart_server = SmartServer(backing_transport)
259
# server can call back to us to get bulk data - this is not really
260
# ideal, they should get it per request instead
261
self.smart_server._recv_body = self._recv_bulk
263
def _recv_tuple(self):
264
"""Read a request from the client and return as a tuple.
266
Returns None at end of file (if the client closed the connection.)
268
return _recv_tuple(self._in)
270
def _send_tuple(self, args):
271
"""Send response header"""
272
return self._write_and_flush(_encode_tuple(args))
274
def _send_error_and_disconnect(self, exception):
275
self._send_tuple(('error', str(exception)))
279
def _serve_one_request(self):
280
"""Read one request from input, process, send back a response.
282
:return: False if the server should terminate, otherwise None.
284
req_args = self._recv_tuple()
286
# client closed connection
287
return False # shutdown server
289
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
290
self._send_tuple(response.args)
291
if response.body is not None:
292
self._send_bulk_data(response.body)
293
except KeyboardInterrupt:
296
# everything else: pass to client, flush, and quit
297
self._send_error_and_disconnect(e)
301
"""Serve requests until the client disconnects."""
302
# Keep a reference to stderr because the sys module's globals get set to
303
# None during interpreter shutdown.
304
from sys import stderr
306
while self._serve_one_request() != False:
309
stderr.write("%s terminating on exception %s\n" % (self, e))
554
def _serve_one_request_unguarded(self, protocol):
556
bytes_to_read = protocol.next_read_size()
557
if bytes_to_read == 0:
558
# Finished serving this request.
561
bytes = self._in.read(bytes_to_read)
563
# Connection has been closed.
567
protocol.accept_bytes(bytes)
569
def terminate_due_to_error(self):
570
# TODO: This should log to a server log file, but no such thing
571
# exists yet. Andrew Bennetts 2006-09-29.
575
def _write_out(self, bytes):
576
self._out.write(bytes)
313
579
class SmartServerResponse(object):
314
"""Response generated by SmartServer."""
580
"""Response generated by SmartServerRequestHandler."""
316
582
def __init__(self, args, body=None):
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
586
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
321
587
# for delivering the data for a request. This could be done with as the
322
588
# StreamServer, though that would create conflation between request and response
323
589
# which may be undesirable.
326
class SmartServer(object):
592
class SmartServerRequestHandler(object):
327
593
"""Protocol logic for smart server.
329
595
This doesn't handle serialization at all, it just processes requests and
330
596
creates responses.
333
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
334
# encoding or decoding logic to allow the wire protocol to vary from the
335
# object protocol: we will want to tweak the wire protocol separate from
336
# the object model, and ideally we will be able to do that without having
337
# a SmartServer subclass for each wire protocol, rather just a Protocol
599
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
600
# not contain encoding or decoding logic to allow the wire protocol to vary
601
# from the object protocol: we will want to tweak the wire protocol separate
602
# from the object model, and ideally we will be able to do that without
603
# having a SmartServerRequestHandler subclass for each wire protocol, rather
604
# just a Protocol subclass.
340
606
# TODO: Better way of representing the body for commands that take it,
341
607
# and allow it to be streamed into the server.
343
609
def __init__(self, backing_transport):
344
610
self._backing_transport = backing_transport
611
self._converted_command = False
612
self.finished_reading = False
613
self._body_bytes = ''
616
def accept_body(self, bytes):
619
This should be overriden for each command that desired body data to
620
handle the right format of that data. I.e. plain bytes, a bundle etc.
622
The deserialisation into that format should be done in the Protocol
623
object. Set self.desired_body_format to the format your method will
626
# default fallback is to accumulate bytes.
627
self._body_bytes += bytes
629
def _end_of_body_handler(self):
630
"""An unimplemented end of body handler."""
631
raise NotImplementedError(self._end_of_body_handler)
346
633
def do_hello(self):
347
634
"""Answer a version request with my version."""
915
1265
def list_dir(self, relpath):
916
resp = self._client._call('list_dir',
917
self._remote_path(relpath))
1266
resp = self._call2('list_dir', self._remote_path(relpath))
918
1267
if resp[0] == 'names':
919
1268
return [name.encode('ascii') for name in resp[1:]]
921
1270
self._translate_error(resp)
923
1272
def iter_files_recursive(self):
924
resp = self._client._call('iter_files_recursive',
925
self._remote_path(''))
1273
resp = self._call2('iter_files_recursive', self._remote_path(''))
926
1274
if resp[0] == 'names':
929
1277
self._translate_error(resp)
932
class SmartStreamClient(SmartProtocolBase):
933
"""Connection to smart server over two streams"""
935
def __init__(self, connect_func):
936
self._connect_func = connect_func
937
self._connected = False
942
def _ensure_connection(self):
943
if not self._connected:
944
self._in, self._out = self._connect_func()
945
self._connected = True
947
def _send_tuple(self, args):
948
self._ensure_connection()
949
return self._write_and_flush(_encode_tuple(args))
951
def _send_bulk_data(self, body):
952
self._ensure_connection()
953
SmartProtocolBase._send_bulk_data(self, body)
955
def _recv_bulk(self):
956
self._ensure_connection()
957
return SmartProtocolBase._recv_bulk(self)
1280
class SmartClientMediumRequest(object):
1281
"""A request on a SmartClientMedium.
1283
Each request allows bytes to be provided to it via accept_bytes, and then
1284
the response bytes to be read via read_bytes.
1287
request.accept_bytes('123')
1288
request.finished_writing()
1289
result = request.read_bytes(3)
1290
request.finished_reading()
1292
It is up to the individual SmartClientMedium whether multiple concurrent
1293
requests can exist. See SmartClientMedium.get_request to obtain instances
1294
of SmartClientMediumRequest, and the concrete Medium you are using for
1295
details on concurrency and pipelining.
1298
def __init__(self, medium):
1299
"""Construct a SmartClientMediumRequest for the medium medium."""
1300
self._medium = medium
1301
# we track state by constants - we may want to use the same
1302
# pattern as BodyReader if it gets more complex.
1303
# valid states are: "writing", "reading", "done"
1304
self._state = "writing"
1306
def accept_bytes(self, bytes):
1307
"""Accept bytes for inclusion in this request.
1309
This method may not be be called after finished_writing() has been
1310
called. It depends upon the Medium whether or not the bytes will be
1311
immediately transmitted. Message based Mediums will tend to buffer the
1312
bytes until finished_writing() is called.
1314
:param bytes: A bytestring.
1316
if self._state != "writing":
1317
raise errors.WritingCompleted(self)
1318
self._accept_bytes(bytes)
1320
def _accept_bytes(self, bytes):
1321
"""Helper for accept_bytes.
1323
Accept_bytes checks the state of the request to determing if bytes
1324
should be accepted. After that it hands off to _accept_bytes to do the
1327
raise NotImplementedError(self._accept_bytes)
1329
def finished_reading(self):
1330
"""Inform the request that all desired data has been read.
1332
This will remove the request from the pipeline for its medium (if the
1333
medium supports pipelining) and any further calls to methods on the
1334
request will raise ReadingCompleted.
1336
if self._state == "writing":
1337
raise errors.WritingNotComplete(self)
1338
if self._state != "reading":
1339
raise errors.ReadingCompleted(self)
1340
self._state = "done"
1341
self._finished_reading()
1343
def _finished_reading(self):
1344
"""Helper for finished_reading.
1346
finished_reading checks the state of the request to determine if
1347
finished_reading is allowed, and if it is hands off to _finished_reading
1348
to perform the action.
1350
raise NotImplementedError(self._finished_reading)
1352
def finished_writing(self):
1353
"""Finish the writing phase of this request.
1355
This will flush all pending data for this request along the medium.
1356
After calling finished_writing, you may not call accept_bytes anymore.
1358
if self._state != "writing":
1359
raise errors.WritingCompleted(self)
1360
self._state = "reading"
1361
self._finished_writing()
1363
def _finished_writing(self):
1364
"""Helper for finished_writing.
1366
finished_writing checks the state of the request to determine if
1367
finished_writing is allowed, and if it is hands off to _finished_writing
1368
to perform the action.
1370
raise NotImplementedError(self._finished_writing)
1372
def read_bytes(self, count):
1373
"""Read bytes from this requests response.
1375
This method will block and wait for count bytes to be read. It may not
1376
be invoked until finished_writing() has been called - this is to ensure
1377
a message-based approach to requests, for compatability with message
1378
based mediums like HTTP.
1380
if self._state == "writing":
1381
raise errors.WritingNotComplete(self)
1382
if self._state != "reading":
1383
raise errors.ReadingCompleted(self)
1384
return self._read_bytes(count)
1386
def _read_bytes(self, count):
1387
"""Helper for read_bytes.
1389
read_bytes checks the state of the request to determing if bytes
1390
should be read. After that it hands off to _read_bytes to do the
1393
raise NotImplementedError(self._read_bytes)
1396
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
1397
"""A SmartClientMediumRequest that works with an SmartClientStreamMedium."""
1399
def __init__(self, medium):
1400
SmartClientMediumRequest.__init__(self, medium)
1401
# check that we are safe concurrency wise. If some streams start
1402
# allowing concurrent requests - i.e. via multiplexing - then this
1403
# assert should be moved to SmartClientStreamMedium.get_request,
1404
# and the setting/unsetting of _current_request likewise moved into
1405
# that class : but its unneeded overhead for now. RBC 20060922
1406
if self._medium._current_request is not None:
1407
raise errors.TooManyConcurrentRequests(self._medium)
1408
self._medium._current_request = self
1410
def _accept_bytes(self, bytes):
1411
"""See SmartClientMediumRequest._accept_bytes.
1413
This forwards to self._medium._accept_bytes because we are operating
1414
on the mediums stream.
1416
self._medium._accept_bytes(bytes)
1418
def _finished_reading(self):
1419
"""See SmartClientMediumRequest._finished_reading.
1421
This clears the _current_request on self._medium to allow a new
1422
request to be created.
1424
assert self._medium._current_request is self
1425
self._medium._current_request = None
1427
def _finished_writing(self):
1428
"""See SmartClientMediumRequest._finished_writing.
1430
This invokes self._medium._flush to ensure all bytes are transmitted.
1432
self._medium._flush()
1434
def _read_bytes(self, count):
1435
"""See SmartClientMediumRequest._read_bytes.
1437
This forwards to self._medium._read_bytes because we are operating
1438
on the mediums stream.
1440
return self._medium._read_bytes(count)
1443
class SmartClientRequestProtocolOne(SmartProtocolBase):
1444
"""The client-side protocol for smart version 1."""
1446
def __init__(self, request):
1447
"""Construct a SmartClientRequestProtocolOne.
1449
:param request: A SmartClientMediumRequest to serialise onto and
1452
self._request = request
1453
self._body_buffer = None
1455
def call(self, *args):
1456
bytes = _encode_tuple(args)
1457
self._request.accept_bytes(bytes)
1458
self._request.finished_writing()
1460
def call_with_body_bytes(self, args, body):
1461
"""Make a remote call of args with body bytes 'body'.
1463
After calling this, call read_response_tuple to find the result out.
1465
bytes = _encode_tuple(args)
1466
self._request.accept_bytes(bytes)
1467
bytes = self._encode_bulk_data(body)
1468
self._request.accept_bytes(bytes)
1469
self._request.finished_writing()
1471
def call_with_body_readv_array(self, args, body):
1472
"""Make a remote call with a readv array.
1474
The body is encoded with one line per readv offset pair. The numbers in
1475
each pair are separated by a comma, and no trailing \n is emitted.
1477
bytes = _encode_tuple(args)
1478
self._request.accept_bytes(bytes)
1479
readv_bytes = self._serialise_offsets(body)
1480
bytes = self._encode_bulk_data(readv_bytes)
1481
self._request.accept_bytes(bytes)
1482
self._request.finished_writing()
1484
def cancel_read_body(self):
1485
"""After expecting a body, a response code may indicate one otherwise.
1487
This method lets the domain client inform the protocol that no body
1488
will be transmitted. This is a terminal method: after calling it the
1489
protocol is not able to be used further.
1491
self._request.finished_reading()
1493
def read_response_tuple(self, expect_body=False):
1494
"""Read a response tuple from the wire.
1496
This should only be called once.
1498
result = self._recv_tuple()
1500
self._request.finished_reading()
1503
def read_body_bytes(self, count=-1):
1504
"""Read bytes from the body, decoding into a byte stream.
1506
We read all bytes at once to ensure we've checked the trailer for
1507
errors, and then feed the buffer back as read_body_bytes is called.
1509
if self._body_buffer is not None:
1510
return self._body_buffer.read(count)
1511
_body_decoder = LengthPrefixedBodyDecoder()
1513
while not _body_decoder.finished_reading:
1514
bytes_wanted = _body_decoder.next_read_size()
1515
bytes = self._request.read_bytes(bytes_wanted)
1516
_body_decoder.accept_bytes(bytes)
1517
self._request.finished_reading()
1518
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1519
# XXX: TODO check the trailer result.
1520
return self._body_buffer.read(count)
959
1522
def _recv_tuple(self):
960
self._ensure_connection()
961
return SmartProtocolBase._recv_tuple(self)
963
def _recv_trailer(self):
964
self._ensure_connection()
965
return SmartProtocolBase._recv_trailer(self)
967
def disconnect(self):
968
"""Close connection to the server"""
973
def _call(self, *args):
974
self._send_tuple(args)
975
return self._recv_tuple()
977
def _call_with_upload(self, method, args, body):
978
"""Call an rpc, supplying bulk upload data.
980
:param method: method name to call
981
:param args: parameter args tuple
982
:param body: upload body as a byte string
984
self._send_tuple((method,) + args)
985
self._send_bulk_data(body)
986
return self._recv_tuple()
1523
"""Receive a tuple from the medium request."""
1525
while not line or line[-1] != '\n':
1526
# TODO: this is inefficient - but tuples are short.
1527
new_char = self._request.read_bytes(1)
1529
assert new_char != '', "end of file reading from server."
1530
return _decode_tuple(line)
988
1532
def query_version(self):
989
1533
"""Return protocol version number of the server."""
990
# XXX: should make sure it's empty
991
self._send_tuple(('hello',))
992
resp = self._recv_tuple()
1535
resp = self.read_response_tuple()
993
1536
if resp == ('ok', '1'):
996
1539
raise errors.SmartProtocolError("bad response %r" % (resp,))
999
class SmartTCPTransport(SmartTransport):
1000
"""Connection to smart server over plain tcp"""
1002
def __init__(self, url, clone_from=None):
1003
super(SmartTCPTransport, self).__init__(url, clone_from)
1005
self._port = int(self._port)
1006
except (ValueError, TypeError), e:
1007
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1010
def _connect_to_server(self):
1542
class SmartClientMedium(object):
1543
"""Smart client is a medium for sending smart protocol requests over."""
1545
def disconnect(self):
1546
"""If this medium maintains a persistent connection, close it.
1548
The default implementation does nothing.
1552
class SmartClientStreamMedium(SmartClientMedium):
1553
"""Stream based medium common class.
1555
SmartClientStreamMediums operate on a stream. All subclasses use a common
1556
SmartClientStreamMediumRequest for their requests, and should implement
1557
_accept_bytes and _read_bytes to allow the request objects to send and
1562
self._current_request = None
1564
def accept_bytes(self, bytes):
1565
self._accept_bytes(bytes)
1568
"""The SmartClientStreamMedium knows how to close the stream when it is
1574
"""Flush the output stream.
1576
This method is used by the SmartClientStreamMediumRequest to ensure that
1577
all data for a request is sent, to avoid long timeouts or deadlocks.
1579
raise NotImplementedError(self._flush)
1581
def get_request(self):
1582
"""See SmartClientMedium.get_request().
1584
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1587
return SmartClientStreamMediumRequest(self)
1589
def read_bytes(self, count):
1590
return self._read_bytes(count)
1593
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1594
"""A client medium using simple pipes.
1596
This client does not manage the pipes: it assumes they will always be open.
1599
def __init__(self, readable_pipe, writeable_pipe):
1600
SmartClientStreamMedium.__init__(self)
1601
self._readable_pipe = readable_pipe
1602
self._writeable_pipe = writeable_pipe
1604
def _accept_bytes(self, bytes):
1605
"""See SmartClientStreamMedium.accept_bytes."""
1606
self._writeable_pipe.write(bytes)
1609
"""See SmartClientStreamMedium._flush()."""
1610
self._writeable_pipe.flush()
1612
def _read_bytes(self, count):
1613
"""See SmartClientStreamMedium._read_bytes."""
1614
return self._readable_pipe.read(count)
1617
class SmartSSHClientMedium(SmartClientStreamMedium):
1618
"""A client medium using SSH."""
1620
def __init__(self, host, port=None, username=None, password=None,
1622
"""Creates a client that will connect on the first use.
1624
:param vendor: An optional override for the ssh vendor to use. See
1625
bzrlib.transport.ssh for details on ssh vendors.
1627
SmartClientStreamMedium.__init__(self)
1628
self._connected = False
1630
self._password = password
1632
self._username = username
1633
self._read_from = None
1634
self._ssh_connection = None
1635
self._vendor = vendor
1636
self._write_to = None
1638
def _accept_bytes(self, bytes):
1639
"""See SmartClientStreamMedium.accept_bytes."""
1640
self._ensure_connection()
1641
self._write_to.write(bytes)
1643
def disconnect(self):
1644
"""See SmartClientMedium.disconnect()."""
1645
if not self._connected:
1647
self._read_from.close()
1648
self._write_to.close()
1649
self._ssh_connection.close()
1650
self._connected = False
1652
def _ensure_connection(self):
1653
"""Connect this medium if not already connected."""
1656
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1657
if self._vendor is None:
1658
vendor = ssh._get_ssh_vendor()
1660
vendor = self._vendor
1661
self._ssh_connection = vendor.connect_ssh(self._username,
1662
self._password, self._host, self._port,
1663
command=[executable, 'serve', '--inet', '--directory=/',
1665
self._read_from, self._write_to = \
1666
self._ssh_connection.get_filelike_channels()
1667
self._connected = True
1670
"""See SmartClientStreamMedium._flush()."""
1671
self._write_to.flush()
1673
def _read_bytes(self, count):
1674
"""See SmartClientStreamMedium.read_bytes."""
1675
if not self._connected:
1676
raise errors.MediumNotConnected(self)
1677
return self._read_from.read(count)
1680
class SmartTCPClientMedium(SmartClientStreamMedium):
1681
"""A client medium using TCP."""
1683
def __init__(self, host, port):
1684
"""Creates a client that will connect on the first use."""
1685
SmartClientStreamMedium.__init__(self)
1686
self._connected = False
1691
def _accept_bytes(self, bytes):
1692
"""See SmartClientMedium.accept_bytes."""
1693
self._ensure_connection()
1694
self._socket.sendall(bytes)
1696
def disconnect(self):
1697
"""See SmartClientMedium.disconnect()."""
1698
if not self._connected:
1700
self._socket.close()
1702
self._connected = False
1704
def _ensure_connection(self):
1705
"""Connect this medium if not already connected."""
1011
1708
self._socket = socket.socket()
1012
1709
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1013
1710
result = self._socket.connect_ex((self._host, int(self._port)))
1015
1712
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1016
1713
(self._host, self._port, os.strerror(result)))
1017
# TODO: May be more efficient to just treat them as sockets
1018
# throughout? But what about pipes to ssh?...
1019
to_server = self._socket.makefile('w')
1020
from_server = self._socket.makefile('r')
1021
return from_server, to_server
1023
def disconnect(self):
1024
super(SmartTCPTransport, self).disconnect()
1025
# XXX: Is closing the socket as well as closing the files really
1027
if self._socket is not None:
1028
self._socket.close()
1714
self._connected = True
1717
"""See SmartClientStreamMedium._flush().
1719
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1720
add a means to do a flush, but that can be done in the future.
1723
def _read_bytes(self, count):
1724
"""See SmartClientMedium.read_bytes."""
1725
if not self._connected:
1726
raise errors.MediumNotConnected(self)
1727
return self._socket.recv(count)
1730
class SmartTCPTransport(SmartTransport):
1731
"""Connection to smart server over plain tcp.
1733
This is essentially just a factory to get 'RemoteTransport(url,
1734
SmartTCPClientMedium).
1737
def __init__(self, url):
1738
_scheme, _username, _password, _host, _port, _path = \
1739
transport.split_url(url)
1742
except (ValueError, TypeError), e:
1743
raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1744
medium = SmartTCPClientMedium(_host, _port)
1745
super(SmartTCPTransport, self).__init__(url, medium=medium)
1031
1748
class SmartSSHTransport(SmartTransport):
1032
"""Connection to smart server over SSH."""
1034
def __init__(self, url, clone_from=None):
1035
# TODO: all this probably belongs in the parent class.
1036
super(SmartSSHTransport, self).__init__(url, clone_from)
1749
"""Connection to smart server over SSH.
1751
This is essentially just a factory to get 'RemoteTransport(url,
1752
SmartSSHClientMedium).
1755
def __init__(self, url):
1756
_scheme, _username, _password, _host, _port, _path = \
1757
transport.split_url(url)
1038
if self._port is not None:
1039
self._port = int(self._port)
1759
if _port is not None:
1040
1761
except (ValueError, TypeError), e:
1041
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1043
def _connect_to_server(self):
1044
from bzrlib.transport import ssh
1045
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1046
vendor = ssh._get_ssh_vendor()
1047
self._ssh_connection = vendor.connect_ssh(self._username,
1048
self._password, self._host, self._port,
1049
command=[executable, 'serve', '--inet', '--directory=/',
1051
return self._ssh_connection.get_filelike_channels()
1053
def disconnect(self):
1054
super(SmartSSHTransport, self).disconnect()
1055
self._ssh_connection.close()
1762
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1764
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1765
super(SmartSSHTransport, self).__init__(url, medium=medium)
1058
1768
def get_test_permutations():
1059
"""Return (transport, server) permutations for testing"""
1769
"""Return (transport, server) permutations for testing."""
1770
### We may need a little more test framework support to construct an
1771
### appropriate RemoteTransport in the future.
1060
1772
return [(SmartTCPTransport, SmartTCPServer_for_testing)]