225
256
for start, length in offsets:
226
257
txt.append('%d,%d' % (start, length))
227
258
return '\n'.join(txt)
229
def _write_and_flush(self, bytes):
230
"""Write bytes to self._out and flush it."""
231
# XXX: this will be inefficient. Just ask Robert.
232
self._out.write(bytes)
236
class SmartStreamServer(SmartProtocolBase):
261
class SmartServerRequestProtocolOne(SmartProtocolBase):
262
"""Server-side encoding and decoding logic for smart version 1."""
264
def __init__(self, backing_transport, write_func):
265
self._backing_transport = backing_transport
266
self.excess_buffer = ''
267
self._finished = False
269
self.has_dispatched = False
271
self._body_decoder = None
272
self._write_func = write_func
274
def accept_bytes(self, bytes):
275
"""Take bytes, and advance the internal state machine appropriately.
277
:param bytes: must be a byte string
279
assert isinstance(bytes, str)
280
self.in_buffer += bytes
281
if not self.has_dispatched:
282
if '\n' not in self.in_buffer:
283
# no command line yet
285
self.has_dispatched = True
287
first_line, self.in_buffer = self.in_buffer.split('\n', 1)
289
req_args = _decode_tuple(first_line)
290
self.request = SmartServerRequestHandler(
291
self._backing_transport)
292
self.request.dispatch_command(req_args[0], req_args[1:])
293
if self.request.finished_reading:
295
self.excess_buffer = self.in_buffer
297
self._send_response(self.request.response.args,
298
self.request.response.body)
299
except KeyboardInterrupt:
301
except Exception, exception:
302
# everything else: pass to client, flush, and quit
303
self._send_response(('error', str(exception)))
306
if self.has_dispatched:
308
# nothing to do. XXX: this routine should be a single state
310
self.excess_buffer += self.in_buffer
313
if self._body_decoder is None:
314
self._body_decoder = LengthPrefixedBodyDecoder()
315
self._body_decoder.accept_bytes(self.in_buffer)
316
self.in_buffer = self._body_decoder.unused_data
317
body_data = self._body_decoder.read_pending_data()
318
self.request.accept_body(body_data)
319
if self._body_decoder.finished_reading:
320
self.request.end_of_body()
321
assert self.request.finished_reading, \
322
"no more body, request not finished"
323
if self.request.response is not None:
324
self._send_response(self.request.response.args,
325
self.request.response.body)
326
self.excess_buffer = self.in_buffer
329
assert not self.request.finished_reading, \
330
"no response and we have finished reading."
332
def _send_response(self, args, body=None):
333
"""Send a smart server response down the output stream."""
334
assert not self._finished, 'response already sent'
335
self._finished = True
336
self._write_func(_encode_tuple(args))
338
assert isinstance(body, str), 'body must be a str'
339
bytes = self._encode_bulk_data(body)
340
self._write_func(bytes)
342
def next_read_size(self):
345
if self._body_decoder is None:
348
return self._body_decoder.next_read_size()
351
class LengthPrefixedBodyDecoder(object):
352
"""Decodes the length-prefixed bulk data."""
355
self.bytes_left = None
356
self.finished_reading = False
357
self.unused_data = ''
358
self.state_accept = self._state_accept_expecting_length
359
self.state_read = self._state_read_no_data
361
self._trailer_buffer = ''
363
def accept_bytes(self, bytes):
364
"""Decode as much of bytes as possible.
366
If 'bytes' contains too much data it will be appended to
369
finished_reading will be set when no more data is required. Further
370
data will be appended to self.unused_data.
372
# accept_bytes is allowed to change the state
373
current_state = self.state_accept
374
self.state_accept(bytes)
375
while current_state != self.state_accept:
376
current_state = self.state_accept
377
self.state_accept('')
379
def next_read_size(self):
380
if self.bytes_left is not None:
381
# Ideally we want to read all the remainder of the body and the
383
return self.bytes_left + 5
384
elif self.state_accept == self._state_accept_reading_trailer:
385
# Just the trailer left
386
return 5 - len(self._trailer_buffer)
387
elif self.state_accept == self._state_accept_expecting_length:
388
# There's still at least 6 bytes left ('\n' to end the length, plus
392
# Reading excess data. Either way, 1 byte at a time is fine.
395
def read_pending_data(self):
    """Return any pending data that has been decoded.

    The work is delegated to whichever callable is currently bound to
    self.state_read; its result is returned unchanged.
    """
    return self.state_read()
399
def _state_accept_expecting_length(self, bytes):
400
self._in_buffer += bytes
401
pos = self._in_buffer.find('\n')
404
self.bytes_left = int(self._in_buffer[:pos])
405
self._in_buffer = self._in_buffer[pos+1:]
406
self.bytes_left -= len(self._in_buffer)
407
self.state_accept = self._state_accept_reading_body
408
self.state_read = self._state_read_in_buffer
410
def _state_accept_reading_body(self, bytes):
411
self._in_buffer += bytes
412
self.bytes_left -= len(bytes)
413
if self.bytes_left <= 0:
415
if self.bytes_left != 0:
416
self._trailer_buffer = self._in_buffer[self.bytes_left:]
417
self._in_buffer = self._in_buffer[:self.bytes_left]
418
self.bytes_left = None
419
self.state_accept = self._state_accept_reading_trailer
421
def _state_accept_reading_trailer(self, bytes):
422
self._trailer_buffer += bytes
423
# TODO: what if the trailer does not match "done\n"? Should this raise
424
# a ProtocolViolation exception?
425
if self._trailer_buffer.startswith('done\n'):
426
self.unused_data = self._trailer_buffer[len('done\n'):]
427
self.state_accept = self._state_accept_reading_unused
428
self.finished_reading = True
430
def _state_accept_reading_unused(self, bytes):
431
self.unused_data += bytes
433
def _state_read_no_data(self):
436
def _state_read_in_buffer(self):
437
result = self._in_buffer
442
class SmartServerStreamMedium(object):
237
443
"""Handles smart commands coming over a stream.
239
445
The stream may be a pipe connected to sshd, or a tcp socket, or an
253
539
:param out_file: Python file to write responses.
254
540
:param backing_transport: Transport for the directory served.
542
SmartServerStreamMedium.__init__(self, backing_transport)
256
543
self._in = in_file
257
544
self._out = out_file
258
self.smart_server = SmartServer(backing_transport)
259
# server can call back to us to get bulk data - this is not really
260
# ideal, they should get it per request instead
261
self.smart_server._recv_body = self._recv_bulk
263
def _recv_tuple(self):
264
"""Read a request from the client and return as a tuple.
266
Returns None at end of file (if the client closed the connection.)
268
return _recv_tuple(self._in)
270
def _send_tuple(self, args):
271
"""Send response header"""
272
return self._write_and_flush(_encode_tuple(args))
274
def _send_error_and_disconnect(self, exception):
275
self._send_tuple(('error', str(exception)))
279
def _serve_one_request(self):
280
"""Read one request from input, process, send back a response.
282
:return: False if the server should terminate, otherwise None.
284
req_args = self._recv_tuple()
286
# client closed connection
287
return False # shutdown server
289
response = self.smart_server.dispatch_command(req_args[0], req_args[1:])
290
self._send_tuple(response.args)
291
if response.body is not None:
292
self._send_bulk_data(response.body)
293
except KeyboardInterrupt:
296
# everything else: pass to client, flush, and quit
297
self._send_error_and_disconnect(e)
301
"""Serve requests until the client disconnects."""
302
# Keep a reference to stderr because the sys module's globals get set to
303
# None during interpreter shutdown.
304
from sys import stderr
306
while self._serve_one_request() != False:
309
stderr.write("%s terminating on exception %s\n" % (self, e))
546
def _serve_one_request_unguarded(self, protocol):
548
bytes_to_read = protocol.next_read_size()
549
if bytes_to_read == 0:
550
# Finished serving this request.
553
bytes = self._in.read(bytes_to_read)
555
# Connection has been closed.
559
protocol.accept_bytes(bytes)
561
def terminate_due_to_error(self):
562
# TODO: This should log to a server log file, but no such thing
563
# exists yet. Andrew Bennetts 2006-09-29.
567
def _write_out(self, bytes):
568
self._out.write(bytes)
313
571
class SmartServerResponse(object):
314
"""Response generated by SmartServer."""
572
"""Response generated by SmartServerRequestHandler."""
316
574
def __init__(self, args, body=None):
320
# XXX: TODO: Create a SmartServerRequest which will take the responsibility
578
# XXX: TODO: Create a SmartServerRequestHandler which will take the responsibility
321
579
# for delivering the data for a request. This could be done with as the
322
580
# StreamServer, though that would create conflation between request and response
323
581
# which may be undesirable.
326
class SmartServer(object):
584
class SmartServerRequestHandler(object):
327
585
"""Protocol logic for smart server.
329
587
This doesn't handle serialization at all, it just processes requests and
330
588
creates responses.
333
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServer not contain
334
# encoding or decoding logic to allow the wire protocol to vary from the
335
# object protocol: we will want to tweak the wire protocol separate from
336
# the object model, and ideally we will be able to do that without having
337
# a SmartServer subclass for each wire protocol, rather just a Protocol
591
# IMPORTANT FOR IMPLEMENTORS: It is important that SmartServerRequestHandler
592
# not contain encoding or decoding logic to allow the wire protocol to vary
593
# from the object protocol: we will want to tweak the wire protocol separate
594
# from the object model, and ideally we will be able to do that without
595
# having a SmartServerRequestHandler subclass for each wire protocol, rather
596
# just a Protocol subclass.
340
598
# TODO: Better way of representing the body for commands that take it,
341
599
# and allow it to be streamed into the server.
343
601
def __init__(self, backing_transport):
344
602
self._backing_transport = backing_transport
603
self._converted_command = False
604
self.finished_reading = False
605
self._body_bytes = ''
608
def accept_body(self, bytes):
611
This should be overridden for each command that desires body data to
612
handle the right format of that data. I.e. plain bytes, a bundle etc.
614
The deserialisation into that format should be done in the Protocol
615
object. Set self.desired_body_format to the format your method will
618
# default fallback is to accumulate bytes.
619
self._body_bytes += bytes
621
def _end_of_body_handler(self):
622
"""An unimplemented end of body handler."""
623
raise NotImplementedError(self._end_of_body_handler)
346
625
def do_hello(self):
347
626
"""Answer a version request with my version."""
917
1257
def list_dir(self, relpath):
918
resp = self._client._call('list_dir',
919
self._remote_path(relpath))
1258
resp = self._call2('list_dir', self._remote_path(relpath))
920
1259
if resp[0] == 'names':
921
1260
return [name.encode('ascii') for name in resp[1:]]
923
1262
self._translate_error(resp)
925
1264
def iter_files_recursive(self):
926
resp = self._client._call('iter_files_recursive',
927
self._remote_path(''))
1265
resp = self._call2('iter_files_recursive', self._remote_path(''))
928
1266
if resp[0] == 'names':
931
1269
self._translate_error(resp)
934
class SmartStreamClient(SmartProtocolBase):
935
"""Connection to smart server over two streams"""
937
def __init__(self, connect_func):
938
self._connect_func = connect_func
939
self._connected = False
944
def _ensure_connection(self):
945
if not self._connected:
946
self._in, self._out = self._connect_func()
947
self._connected = True
949
def _send_tuple(self, args):
950
self._ensure_connection()
951
return self._write_and_flush(_encode_tuple(args))
953
def _send_bulk_data(self, body):
954
self._ensure_connection()
955
SmartProtocolBase._send_bulk_data(self, body)
957
def _recv_bulk(self):
958
self._ensure_connection()
959
return SmartProtocolBase._recv_bulk(self)
1272
class SmartClientMediumRequest(object):
    """A request on a SmartClientMedium.

    Each request allows bytes to be provided to it via accept_bytes, and then
    the response bytes to be read via read_bytes.

    For instance::

        request.accept_bytes('123')
        request.finished_writing()
        result = request.read_bytes(3)
        request.finished_reading()

    It is up to the individual SmartClientMedium whether multiple concurrent
    requests can exist. See SmartClientMedium.get_request to obtain instances
    of SmartClientMediumRequest, and the concrete Medium you are using for
    details on concurrency and pipelining.

    Subclasses supply the transport-specific work by implementing the
    _accept_bytes, _finished_writing, _read_bytes and _finished_reading
    helpers; the public methods here only enforce the state machine.
    """

    def __init__(self, medium):
        """Construct a SmartClientMediumRequest for the medium medium."""
        self._medium = medium
        # we track state by constants - we may want to use the same
        # pattern as BodyReader if it gets more complex.
        # valid states are: "writing", "reading", "done"
        self._state = "writing"

    def accept_bytes(self, bytes):
        """Accept bytes for inclusion in this request.

        This method may not be called after finished_writing() has been
        called. It depends upon the Medium whether or not the bytes will be
        immediately transmitted. Message based Mediums will tend to buffer the
        bytes until finished_writing() is called.

        :param bytes: A bytestring.
        :raises errors.WritingCompleted: if the request is not in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        self._accept_bytes(bytes)

    def _accept_bytes(self, bytes):
        """Helper for accept_bytes.

        accept_bytes checks the state of the request to determine if bytes
        should be accepted. After that it hands off to _accept_bytes to do
        the actual work.
        """
        raise NotImplementedError(self._accept_bytes)

    def finished_reading(self):
        """Inform the request that all desired data has been read.

        This will remove the request from the pipeline for its medium (if the
        medium supports pipelining) and any further calls to methods on the
        request will raise ReadingCompleted.

        :raises errors.WritingNotComplete: if the request is still in the
            "writing" state.
        :raises errors.ReadingCompleted: if the request is not in the
            "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        # The state is updated before the helper runs.
        self._state = "done"
        self._finished_reading()

    def _finished_reading(self):
        """Helper for finished_reading.

        finished_reading checks the state of the request to determine if
        finished_reading is allowed, and if it is hands off to
        _finished_reading to perform the action.
        """
        raise NotImplementedError(self._finished_reading)

    def finished_writing(self):
        """Finish the writing phase of this request.

        This will flush all pending data for this request along the medium.
        After calling finished_writing, you may not call accept_bytes anymore.

        :raises errors.WritingCompleted: if the request is not in the
            "writing" state.
        """
        if self._state != "writing":
            raise errors.WritingCompleted(self)
        # The state is updated before the helper runs.
        self._state = "reading"
        self._finished_writing()

    def _finished_writing(self):
        """Helper for finished_writing.

        finished_writing checks the state of the request to determine if
        finished_writing is allowed, and if it is hands off to
        _finished_writing to perform the action.
        """
        raise NotImplementedError(self._finished_writing)

    def read_bytes(self, count):
        """Read bytes from this request's response.

        This method will block and wait for count bytes to be read. It may not
        be invoked until finished_writing() has been called - this is to
        ensure a message-based approach to requests, for compatibility with
        message based mediums like HTTP.

        :param count: the number of bytes to read.
        :return: whatever the subclass's _read_bytes returns.
        :raises errors.WritingNotComplete: if the request is still in the
            "writing" state.
        :raises errors.ReadingCompleted: if the request is not in the
            "reading" state.
        """
        if self._state == "writing":
            raise errors.WritingNotComplete(self)
        if self._state != "reading":
            raise errors.ReadingCompleted(self)
        return self._read_bytes(count)

    def _read_bytes(self, count):
        """Helper for read_bytes.

        read_bytes checks the state of the request to determine if bytes
        should be read. After that it hands off to _read_bytes to do the
        actual work.
        """
        raise NotImplementedError(self._read_bytes)
1388
class SmartClientStreamMediumRequest(SmartClientMediumRequest):
    """A SmartClientMediumRequest that works with a SmartClientStreamMedium."""

    def __init__(self, medium):
        """Register this request as the medium's single current request.

        :param medium: the stream medium this request operates on.
        :raises errors.TooManyConcurrentRequests: if the medium already has
            a current request.
        """
        SmartClientMediumRequest.__init__(self, medium)
        # check that we are safe concurrency wise. If some streams start
        # allowing concurrent requests - i.e. via multiplexing - then this
        # assert should be moved to SmartClientStreamMedium.get_request,
        # and the setting/unsetting of _current_request likewise moved into
        # that class : but it's unneeded overhead for now. RBC 20060922
        if self._medium._current_request is not None:
            raise errors.TooManyConcurrentRequests(self._medium)
        self._medium._current_request = self

    def _accept_bytes(self, bytes):
        """See SmartClientMediumRequest._accept_bytes.

        This forwards to self._medium._accept_bytes because we are operating
        on the medium's stream.
        """
        self._medium._accept_bytes(bytes)

    def _finished_reading(self):
        """See SmartClientMediumRequest._finished_reading.

        This clears the _current_request on self._medium to allow a new
        request to be created.
        """
        assert self._medium._current_request is self
        self._medium._current_request = None

    def _finished_writing(self):
        """See SmartClientMediumRequest._finished_writing.

        This invokes self._medium._flush to ensure all bytes are transmitted.
        """
        self._medium._flush()

    def _read_bytes(self, count):
        """See SmartClientMediumRequest._read_bytes.

        This forwards to self._medium._read_bytes because we are operating
        on the medium's stream.
        """
        return self._medium._read_bytes(count)
1435
class SmartClientRequestProtocolOne(SmartProtocolBase):
1436
"""The client-side protocol for smart version 1."""
1438
def __init__(self, request):
1439
"""Construct a SmartClientRequestProtocolOne.
1441
:param request: A SmartClientMediumRequest to serialise onto and
1444
self._request = request
1445
self._body_buffer = None
1447
def call(self, *args):
    """Send args as an encoded tuple and finish writing the request.

    The arguments are serialised with _encode_tuple, handed to the
    underlying request via accept_bytes, and the request's writing phase is
    then closed with finished_writing().
    """
    bytes = _encode_tuple(args)
    self._request.accept_bytes(bytes)
    self._request.finished_writing()
1452
def call_with_body_bytes(self, args, body):
1453
"""Make a remote call of args with body bytes 'body'.
1455
After calling this, call read_response_tuple to find the result out.
1457
bytes = _encode_tuple(args)
1458
self._request.accept_bytes(bytes)
1459
bytes = self._encode_bulk_data(body)
1460
self._request.accept_bytes(bytes)
1461
self._request.finished_writing()
1463
def call_with_body_readv_array(self, args, body):
1464
"""Make a remote call with a readv array.
1466
The body is encoded with one line per readv offset pair. The numbers in
1467
each pair are separated by a comma, and no trailing \n is emitted.
1469
bytes = _encode_tuple(args)
1470
self._request.accept_bytes(bytes)
1471
readv_bytes = self._serialise_offsets(body)
1472
bytes = self._encode_bulk_data(readv_bytes)
1473
self._request.accept_bytes(bytes)
1474
self._request.finished_writing()
1476
def cancel_read_body(self):
    """After expecting a body, a response code may indicate one otherwise.

    This method lets the domain client inform the protocol that no body
    will be transmitted. This is a terminal method: after calling it the
    protocol is not able to be used further.
    """
    self._request.finished_reading()
1485
def read_response_tuple(self, expect_body=False):
1486
"""Read a response tuple from the wire.
1488
This should only be called once.
1490
result = self._recv_tuple()
1492
self._request.finished_reading()
1495
def read_body_bytes(self, count=-1):
1496
"""Read bytes from the body, decoding into a byte stream.
1498
We read all bytes at once to ensure we've checked the trailer for
1499
errors, and then feed the buffer back as read_body_bytes is called.
1501
if self._body_buffer is not None:
1502
return self._body_buffer.read(count)
1503
_body_decoder = LengthPrefixedBodyDecoder()
1505
while not _body_decoder.finished_reading:
1506
bytes_wanted = _body_decoder.next_read_size()
1507
bytes = self._request.read_bytes(bytes_wanted)
1508
_body_decoder.accept_bytes(bytes)
1509
self._request.finished_reading()
1510
self._body_buffer = StringIO(_body_decoder.read_pending_data())
1511
# XXX: TODO check the trailer result.
1512
return self._body_buffer.read(count)
961
1514
def _recv_tuple(self):
962
self._ensure_connection()
963
return SmartProtocolBase._recv_tuple(self)
965
def _recv_trailer(self):
966
self._ensure_connection()
967
return SmartProtocolBase._recv_trailer(self)
969
def disconnect(self):
970
"""Close connection to the server"""
975
def _call(self, *args):
976
self._send_tuple(args)
977
return self._recv_tuple()
979
def _call_with_upload(self, method, args, body):
980
"""Call an rpc, supplying bulk upload data.
982
:param method: method name to call
983
:param args: parameter args tuple
984
:param body: upload body as a byte string
986
self._send_tuple((method,) + args)
987
self._send_bulk_data(body)
988
return self._recv_tuple()
1515
"""Receive a tuple from the medium request."""
1517
while not line or line[-1] != '\n':
1518
# TODO: this is inefficient - but tuples are short.
1519
new_char = self._request.read_bytes(1)
1521
assert new_char != '', "end of file reading from server."
1522
return _decode_tuple(line)
990
1524
def query_version(self):
991
1525
"""Return protocol version number of the server."""
992
# XXX: should make sure it's empty
993
self._send_tuple(('hello',))
994
resp = self._recv_tuple()
1527
resp = self.read_response_tuple()
995
1528
if resp == ('ok', '1'):
998
1531
raise errors.SmartProtocolError("bad response %r" % (resp,))
1001
class SmartTCPTransport(SmartTransport):
1002
"""Connection to smart server over plain tcp"""
1004
def __init__(self, url, clone_from=None):
1005
super(SmartTCPTransport, self).__init__(url, clone_from)
1007
self._port = int(self._port)
1008
except (ValueError, TypeError), e:
1009
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1012
def _connect_to_server(self):
1534
class SmartClientMedium(object):
1535
"""Smart client is a medium for sending smart protocol requests over."""
1537
def disconnect(self):
1538
"""If this medium maintains a persistent connection, close it.
1540
The default implementation does nothing.
1544
class SmartClientStreamMedium(SmartClientMedium):
1545
"""Stream based medium common class.
1547
SmartClientStreamMediums operate on a stream. All subclasses use a common
1548
SmartClientStreamMediumRequest for their requests, and should implement
1549
_accept_bytes and _read_bytes to allow the request objects to send and
1554
self._current_request = None
1556
def accept_bytes(self, bytes):
    """Forward bytes to this medium's _accept_bytes implementation.

    :param bytes: the byte string to hand on.
    """
    self._accept_bytes(bytes)
1560
"""The SmartClientStreamMedium knows how to close the stream when it is
1566
"""Flush the output stream.
1568
This method is used by the SmartClientStreamMediumRequest to ensure that
1569
all data for a request is sent, to avoid long timeouts or deadlocks.
1571
raise NotImplementedError(self._flush)
1573
def get_request(self):
1574
"""See SmartClientMedium.get_request().
1576
SmartClientStreamMedium always returns a SmartClientStreamMediumRequest
1579
return SmartClientStreamMediumRequest(self)
1581
def read_bytes(self, count):
    """Return the result of this medium's _read_bytes for count.

    :param count: passed through unchanged to _read_bytes.
    """
    return self._read_bytes(count)
1585
class SmartSimplePipesClientMedium(SmartClientStreamMedium):
1586
"""A client medium using simple pipes.
1588
This client does not manage the pipes: it assumes they will always be open.
1591
def __init__(self, readable_pipe, writeable_pipe):
    """Remember the two pipes this medium reads from and writes to.

    :param readable_pipe: object whose read() supplies response bytes.
    :param writeable_pipe: object whose write()/flush() take request bytes.
    """
    SmartClientStreamMedium.__init__(self)
    self._readable_pipe = readable_pipe
    self._writeable_pipe = writeable_pipe
1596
def _accept_bytes(self, bytes):
1597
"""See SmartClientStreamMedium.accept_bytes."""
1598
self._writeable_pipe.write(bytes)
1601
"""See SmartClientStreamMedium._flush()."""
1602
self._writeable_pipe.flush()
1604
def _read_bytes(self, count):
1605
"""See SmartClientStreamMedium._read_bytes."""
1606
return self._readable_pipe.read(count)
1609
class SmartSSHClientMedium(SmartClientStreamMedium):
1610
"""A client medium using SSH."""
1612
def __init__(self, host, port=None, username=None, password=None,
1614
"""Creates a client that will connect on the first use.
1616
:param vendor: An optional override for the ssh vendor to use. See
1617
bzrlib.transport.ssh for details on ssh vendors.
1619
SmartClientStreamMedium.__init__(self)
1620
self._connected = False
1622
self._password = password
1624
self._username = username
1625
self._read_from = None
1626
self._ssh_connection = None
1627
self._vendor = vendor
1628
self._write_to = None
1630
def _accept_bytes(self, bytes):
1631
"""See SmartClientStreamMedium.accept_bytes."""
1632
self._ensure_connection()
1633
self._write_to.write(bytes)
1635
def disconnect(self):
1636
"""See SmartClientMedium.disconnect()."""
1637
if not self._connected:
1639
self._read_from.close()
1640
self._write_to.close()
1641
self._ssh_connection.close()
1642
self._connected = False
1644
def _ensure_connection(self):
1645
"""Connect this medium if not already connected."""
1648
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1649
if self._vendor is None:
1650
vendor = ssh._get_ssh_vendor()
1652
vendor = self._vendor
1653
self._ssh_connection = vendor.connect_ssh(self._username,
1654
self._password, self._host, self._port,
1655
command=[executable, 'serve', '--inet', '--directory=/',
1657
self._read_from, self._write_to = \
1658
self._ssh_connection.get_filelike_channels()
1659
self._connected = True
1662
"""See SmartClientStreamMedium._flush()."""
1663
self._write_to.flush()
1665
def _read_bytes(self, count):
1666
"""See SmartClientStreamMedium.read_bytes."""
1667
if not self._connected:
1668
raise errors.MediumNotConnected(self)
1669
return self._read_from.read(count)
1672
class SmartTCPClientMedium(SmartClientStreamMedium):
1673
"""A client medium using TCP."""
1675
def __init__(self, host, port):
1676
"""Creates a client that will connect on the first use."""
1677
SmartClientStreamMedium.__init__(self)
1678
self._connected = False
1683
def _accept_bytes(self, bytes):
1684
"""See SmartClientMedium.accept_bytes."""
1685
self._ensure_connection()
1686
self._socket.sendall(bytes)
1688
def disconnect(self):
1689
"""See SmartClientMedium.disconnect()."""
1690
if not self._connected:
1692
self._socket.close()
1694
self._connected = False
1696
def _ensure_connection(self):
1697
"""Connect this medium if not already connected."""
1013
1700
self._socket = socket.socket()
1014
1701
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
1015
1702
result = self._socket.connect_ex((self._host, int(self._port)))
1017
1704
raise errors.ConnectionError("failed to connect to %s:%d: %s" %
1018
1705
(self._host, self._port, os.strerror(result)))
1019
# TODO: May be more efficient to just treat them as sockets
1020
# throughout? But what about pipes to ssh?...
1021
to_server = self._socket.makefile('w')
1022
from_server = self._socket.makefile('r')
1023
return from_server, to_server
1025
def disconnect(self):
1026
super(SmartTCPTransport, self).disconnect()
1027
# XXX: Is closing the socket as well as closing the files really
1029
if self._socket is not None:
1030
self._socket.close()
1033
from bzrlib.transport import sftp, ssh
1034
except errors.ParamikoNotPresent:
1035
# no paramiko, no SSHTransport.
1038
class SmartSSHTransport(SmartTransport):
1039
"""Connection to smart server over SSH."""
1041
def __init__(self, url, clone_from=None):
1042
# TODO: all this probably belongs in the parent class.
1043
super(SmartSSHTransport, self).__init__(url, clone_from)
1045
if self._port is not None:
1046
self._port = int(self._port)
1047
except (ValueError, TypeError), e:
1048
raise errors.InvalidURL(path=url, extra="invalid port %s" % self._port)
1050
def _connect_to_server(self):
1051
executable = os.environ.get('BZR_REMOTE_PATH', 'bzr')
1052
vendor = ssh._get_ssh_vendor()
1053
self._ssh_connection = vendor.connect_ssh(self._username,
1054
self._password, self._host, self._port,
1055
command=[executable, 'serve', '--inet', '--directory=/',
1057
return self._ssh_connection.get_filelike_channels()
1059
def disconnect(self):
1060
super(SmartSSHTransport, self).disconnect()
1061
self._ssh_connection.close()
1706
self._connected = True
1709
"""See SmartClientStreamMedium._flush().
1711
For TCP we do no flushing. We may want to turn off TCP_NODELAY and
1712
add a means to do a flush, but that can be done in the future.
1715
def _read_bytes(self, count):
1716
"""See SmartClientMedium.read_bytes."""
1717
if not self._connected:
1718
raise errors.MediumNotConnected(self)
1719
return self._socket.recv(count)
1722
class SmartTCPTransport(SmartTransport):
1723
"""Connection to smart server over plain tcp.
1725
This is essentially just a factory to get 'RemoteTransport(url,
1726
SmartTCPClientMedium).
1729
def __init__(self, url):
1730
_scheme, _username, _password, _host, _port, _path = \
1731
transport.split_url(url)
1734
except (ValueError, TypeError), e:
1735
raise errors.InvalidURL(path=url, extra="invalid port %s" % _port)
1736
medium = SmartTCPClientMedium(_host, _port)
1737
super(SmartTCPTransport, self).__init__(url, medium=medium)
1740
class SmartSSHTransport(SmartTransport):
1741
"""Connection to smart server over SSH.
1743
This is essentially just a factory to get 'RemoteTransport(url,
1744
SmartSSHClientMedium).
1747
def __init__(self, url):
1748
_scheme, _username, _password, _host, _port, _path = \
1749
transport.split_url(url)
1751
if _port is not None:
1753
except (ValueError, TypeError), e:
1754
raise errors.InvalidURL(path=url, extra="invalid port %s" %
1756
medium = SmartSSHClientMedium(_host, _port, _username, _password)
1757
super(SmartSSHTransport, self).__init__(url, medium=medium)
1760
class SmartHTTPTransport(SmartTransport):
1761
"""Just a way to connect between a bzr+http:// url and http://.
1763
This connection operates slightly differently than the SmartSSHTransport.
1764
It uses a plain http:// transport underneath, which defines what remote
1765
.bzr/smart URL we are connected to. From there, all paths that are sent are
1766
sent as relative paths, this way, the remote side can properly
1767
de-reference them, since it is likely doing rewrite rules to translate an
1768
HTTP path into a local path.
1771
def __init__(self, url, http_transport=None):
1772
assert url.startswith('bzr+http://')
1774
if http_transport is None:
1775
http_url = url[len('bzr+'):]
1776
self._http_transport = transport.get_transport(http_url)
1778
self._http_transport = http_transport
1779
http_medium = self._http_transport.get_smart_medium()
1780
super(SmartHTTPTransport, self).__init__(url, medium=http_medium)
1782
def _remote_path(self, relpath):
1783
"""After connecting HTTP Transport only deals in relative URLs."""
1789
def abspath(self, relpath):
1790
"""Return the full url to the given relative path.
1792
:param relpath: the relative path or path components
1793
:type relpath: str or list
1795
return self._unparse_url(self._combine_paths(self._path, relpath))
1797
def clone(self, relative_url):
1798
"""Make a new SmartHTTPTransport related to me.
1800
This is re-implemented rather than using the default
1801
SmartTransport.clone() because we must be careful about the underlying
1805
abs_url = self.abspath(relative_url)
1808
# By cloning the underlying http_transport, we are able to share the
1810
new_transport = self._http_transport.clone(relative_url)
1811
return SmartHTTPTransport(abs_url, http_transport=new_transport)
1064
1814
def get_test_permutations():
1065
"""Return (transport, server) permutations for testing"""
1815
"""Return (transport, server) permutations for testing."""
1816
### We may need a little more test framework support to construct an
1817
### appropriate RemoteTransport in the future.
1066
1818
return [(SmartTCPTransport, SmartTCPServer_for_testing)]