68
62
RemoteTCPTransport, etc.
65
# When making a readv request, cap it at requesting 5MB of data
66
_max_readv_bytes = 5*1024*1024
71
68
# IMPORTANT FOR IMPLEMENTORS: RemoteTransport MUST NOT be given encoding
72
69
# responsibilities: Put those on SmartClient or similar. This is vital for
73
70
# the ability to support multiple versions of the smart protocol over time:
74
# RemoteTransport is an adapter from the Transport object model to the
71
# RemoteTransport is an adapter from the Transport object model to the
75
72
# SmartClient model, not an encoder.
77
def __init__(self, url, clone_from=None, medium=None, _client=None):
74
# FIXME: the medium parameter should be private, only the tests require
75
# it. It may be even clearer to define a TestRemoteTransport that handles
76
# the specific cases of providing a _client and/or a _medium, and leave
77
# RemoteTransport as an abstract class.
78
def __init__(self, url, _from_transport=None, medium=None, _client=None):
80
:param clone_from: Another RemoteTransport instance that this one is
81
being cloned from. Attributes such as credentials and the medium
83
:param medium: The medium to use for this RemoteTransport. This must be
84
supplied if clone_from is None.
81
:param _from_transport: Another RemoteTransport instance that this
82
one is being cloned from. Attributes such as the medium will
85
:param medium: The medium to use for this RemoteTransport. If None,
86
the medium from the _from_transport is shared. If both this
87
and _from_transport are None, a new medium will be built.
88
_from_transport and medium cannot both be specified.
85
90
:param _client: Override the _SmartClient used by this transport. This
86
91
should only be used for testing purposes; normally this is
87
92
determined from the medium.
89
### Technically super() here is faulty because Transport's __init__
90
### fails to take 2 parameters, and if super were to choose a silly
91
### initialisation order things would blow up.
92
if not url.endswith('/'):
94
super(RemoteTransport, self).__init__(url)
95
self._scheme, self._username, self._password, self._host, self._port, self._path = \
96
transport.split_url(url)
97
if clone_from is None:
94
super(RemoteTransport, self).__init__(
95
url, _from_transport=_from_transport)
97
# The medium is the connection, except when we need to share it with
98
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
99
# what we want to share is really the shared connection.
101
if (_from_transport is not None
102
and isinstance(_from_transport, RemoteTransport)):
103
_client = _from_transport._client
104
elif _from_transport is None:
105
# If no _from_transport is specified, we need to initialize the
109
medium, credentials = self._build_medium()
110
if 'hpss' in debug.debug_flags:
111
trace.mutter('hpss: Built a new medium: %s',
112
medium.__class__.__name__)
113
self._shared_connection = transport._SharedConnection(medium,
117
# No medium was specified, so share the medium from the
119
medium = self._shared_connection.connection
100
# credentials may be stripped from the base in some circumstances
101
# as yet to be clearly defined or documented, so copy them.
102
self._username = clone_from._username
103
# reuse same connection
104
self._medium = clone_from._medium
105
assert self._medium is not None
121
raise AssertionError(
122
"Both _from_transport (%r) and medium (%r) passed to "
123
"RemoteTransport.__init__, but these parameters are mutally "
124
"exclusive." % (_from_transport, medium))
106
126
if _client is None:
107
self._client = client._SmartClient(self._medium)
127
self._client = client._SmartClient(medium)
109
129
self._client = _client
111
def abspath(self, relpath):
112
"""Return the full url to the given relative path.
114
@param relpath: the relative path or path components
115
@type relpath: str or list
117
return self._unparse_url(self._remote_path(relpath))
119
def clone(self, relative_url):
120
"""Make a new RemoteTransport related to me, sharing the same connection.
122
This essentially opens a handle on a different remote directory.
124
if relative_url is None:
125
return RemoteTransport(self.base, self)
127
return RemoteTransport(self.abspath(relative_url), self)
131
def _build_medium(self):
132
"""Create the medium if _from_transport does not provide one.
134
The medium is analogous to the connection for ConnectedTransport: it
135
allows connection sharing.
140
def _report_activity(self, bytes, direction):
141
"""See Transport._report_activity.
143
Does nothing; the smart medium will report activity triggered by a
129
148
def is_readonly(self):
130
149
"""Smart server transport can do read/write file operations."""
131
resp = self._call2('Transport.is_readonly')
132
if resp == ('yes', ):
134
elif resp == ('no', ):
136
elif (resp == ('error', "Generic bzr smart protocol error: "
137
"bad request 'Transport.is_readonly'") or
138
resp == ('error', "Generic bzr smart protocol error: "
139
"bad request u'Transport.is_readonly'")):
151
resp = self._call2('Transport.is_readonly')
152
except errors.UnknownSmartMethod:
140
153
# XXX: nasty hack: servers before 0.16 don't have a
141
154
# 'Transport.is_readonly' verb, so we do what clients before 0.16
142
155
# did: assume False.
157
if resp == ('yes', ):
159
elif resp == ('no', ):
145
self._translate_error(resp)
146
raise errors.UnexpectedSmartServerResponse(resp)
162
raise errors.UnexpectedSmartServerResponse(resp)
148
164
def get_smart_client(self):
165
return self._get_connection()
151
167
def get_smart_medium(self):
154
def _unparse_url(self, path):
155
"""Return URL for a path.
157
:see: SFTPUrlHandling._unparse_url
159
# TODO: Eventually it should be possible to unify this with
160
# SFTPUrlHandling._unparse_url?
163
path = urllib.quote(path)
164
netloc = urllib.quote(self._host)
165
if self._username is not None:
166
netloc = '%s@%s' % (urllib.quote(self._username), netloc)
167
if self._port is not None:
168
netloc = '%s:%d' % (netloc, self._port)
169
return urlparse.urlunparse((self._scheme, netloc, path, '', '', ''))
168
return self._get_connection()
171
170
def _remote_path(self, relpath):
172
171
"""Returns the Unicode version of the absolute path for relpath."""
173
return self._combine_paths(self._path, relpath)
172
return urlutils.URL._combine_paths(self._parsed_url.path, relpath)
175
174
def _call(self, method, *args):
176
175
resp = self._call2(method, *args)
177
self._translate_error(resp)
176
self._ensure_ok(resp)
179
178
def _call2(self, method, *args):
180
179
"""Call a method on the remote server."""
181
return self._client.call(method, *args)
181
return self._client.call(method, *args)
182
except errors.ErrorFromSmartServer, err:
183
# The first argument, if present, is always a path.
185
context = {'relpath': args[0]}
188
self._translate_error(err, **context)
183
190
def _call_with_body_bytes(self, method, args, body):
184
191
"""Call a method on the remote server with body bytes."""
185
return self._client.call_with_body_bytes(method, args, body)
193
return self._client.call_with_body_bytes(method, args, body)
194
except errors.ErrorFromSmartServer, err:
195
# The first argument, if present, is always a path.
197
context = {'relpath': args[0]}
200
self._translate_error(err, **context)
187
202
def has(self, relpath):
188
203
"""Indicate whether a remote file of the given name exists or not.
286
308
if resp[0] == 'appended':
287
309
return int(resp[1])
288
self._translate_error(resp)
310
raise errors.UnexpectedSmartServerResponse(resp)
290
312
def delete(self, relpath):
291
313
resp = self._call2('delete', self._remote_path(relpath))
292
self._translate_error(resp)
294
def readv(self, relpath, offsets):
314
self._ensure_ok(resp)
316
def external_url(self):
317
"""See bzrlib.transport.Transport.external_url."""
318
# the external path for RemoteTransports is the base
321
def recommended_page_size(self):
322
"""Return the recommended page size for this transport."""
325
def _readv(self, relpath, offsets):
298
329
offsets = list(offsets)
300
331
sorted_offsets = sorted(offsets)
301
# turn the list of offsets into a stack
302
offset_stack = iter(offsets)
303
cur_offset_and_size = offset_stack.next()
304
332
coalesced = list(self._coalesce_offsets(sorted_offsets,
305
333
limit=self._max_readv_combine,
306
fudge_factor=self._bytes_to_read_before_seek))
308
request = self._medium.get_request()
309
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
310
smart_protocol.call_with_body_readv_array(
311
('readv', self._remote_path(relpath)),
312
[(c.start, c.length) for c in coalesced])
313
resp = smart_protocol.read_response_tuple(True)
315
if resp[0] != 'readv':
316
# This should raise an exception
317
smart_protocol.cancel_read_body()
318
self._translate_error(resp)
321
# FIXME: this should know how many bytes are needed, for clarity.
322
data = smart_protocol.read_body_bytes()
334
fudge_factor=self._bytes_to_read_before_seek,
335
max_size=self._max_readv_bytes))
337
# now that we've coalesced things, avoid making enormous requests
342
if c.length + cur_len > self._max_readv_bytes:
343
requests.append(cur_request)
347
cur_request.append(c)
350
requests.append(cur_request)
351
if 'hpss' in debug.debug_flags:
352
trace.mutter('%s.readv %s offsets => %s coalesced'
353
' => %s requests (%s)',
354
self.__class__.__name__, len(offsets), len(coalesced),
355
len(requests), sum(map(len, requests)))
323
356
# Cache the results, but only until they have been fulfilled
358
# turn the list of offsets into a single stack to iterate
359
offset_stack = iter(offsets)
360
# using a list so it can be modified when passing down and coming back
361
next_offset = [offset_stack.next()]
362
for cur_request in requests:
364
result = self._client.call_with_body_readv_array(
365
('readv', self._remote_path(relpath),),
366
[(c.start, c.length) for c in cur_request])
367
resp, response_handler = result
368
except errors.ErrorFromSmartServer, err:
369
self._translate_error(err, relpath)
371
if resp[0] != 'readv':
372
# This should raise an exception
373
response_handler.cancel_read_body()
374
raise errors.UnexpectedSmartServerResponse(resp)
376
for res in self._handle_response(offset_stack, cur_request,
382
def _handle_response(self, offset_stack, coalesced, response_handler,
383
data_map, next_offset):
384
cur_offset_and_size = next_offset[0]
385
# FIXME: this should know how many bytes are needed, for clarity.
386
data = response_handler.read_body_bytes()
325
388
for c_offset in coalesced:
326
389
if len(data) < c_offset.length:
327
390
raise errors.ShortReadvError(relpath, c_offset.start,
328
391
c_offset.length, actual=len(data))
329
392
for suboffset, subsize in c_offset.ranges:
330
393
key = (c_offset.start+suboffset, subsize)
331
data_map[key] = data[suboffset:suboffset+subsize]
332
data = data[c_offset.length:]
394
this_data = data[data_offset+suboffset:
395
data_offset+suboffset+subsize]
396
# Special case when the data is in-order, rather than packing
397
# into a map and then back out again. Benchmarking shows that
398
# this has 100% hit rate, but leave in the data_map work just
400
# TODO: Could we get away with using buffer() to avoid the
401
# memory copy? Callers would need to realize they may
402
# not have a real string.
403
if key == cur_offset_and_size:
404
yield cur_offset_and_size[0], this_data
405
cur_offset_and_size = next_offset[0] = offset_stack.next()
407
data_map[key] = this_data
408
data_offset += c_offset.length
334
410
# Now that we've read some data, see if we can yield anything back
335
411
while cur_offset_and_size in data_map:
336
412
this_data = data_map.pop(cur_offset_and_size)
337
413
yield cur_offset_and_size[0], this_data
338
cur_offset_and_size = offset_stack.next()
414
cur_offset_and_size = next_offset[0] = offset_stack.next()
340
416
def rename(self, rel_from, rel_to):
341
417
self._call('rename',
350
426
def rmdir(self, relpath):
351
427
resp = self._call('rmdir', self._remote_path(relpath))
353
def _translate_error(self, resp, orig_path=None):
354
"""Raise an exception from a response"""
361
elif what == 'NoSuchFile':
362
if orig_path is not None:
363
error_path = orig_path
366
raise errors.NoSuchFile(error_path)
367
elif what == 'error':
368
raise errors.SmartProtocolError(unicode(resp[1]))
369
elif what == 'FileExists':
370
raise errors.FileExists(resp[1])
371
elif what == 'DirectoryNotEmpty':
372
raise errors.DirectoryNotEmpty(resp[1])
373
elif what == 'ShortReadvError':
374
raise errors.ShortReadvError(resp[1], int(resp[2]),
375
int(resp[3]), int(resp[4]))
376
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
377
encoding = str(resp[1]) # encoding must always be a string
381
reason = str(resp[5]) # reason must always be a string
382
if val.startswith('u:'):
383
val = val[2:].decode('utf-8')
384
elif val.startswith('s:'):
385
val = val[2:].decode('base64')
386
if what == 'UnicodeDecodeError':
387
raise UnicodeDecodeError(encoding, val, start, end, reason)
388
elif what == 'UnicodeEncodeError':
389
raise UnicodeEncodeError(encoding, val, start, end, reason)
390
elif what == "ReadOnlyError":
391
raise errors.TransportNotPossible('readonly transport')
392
elif what == "ReadError":
393
if orig_path is not None:
394
error_path = orig_path
397
raise errors.ReadError(error_path)
399
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
429
def _ensure_ok(self, resp):
431
raise errors.UnexpectedSmartServerResponse(resp)
433
def _translate_error(self, err, relpath=None):
434
remote._translate_error(err, path=relpath)
401
436
def disconnect(self):
402
self._medium.disconnect()
404
def delete_tree(self, relpath):
405
raise errors.TransportNotPossible('readonly transport')
437
m = self.get_smart_medium()
407
441
def stat(self, relpath):
408
442
resp = self._call2('stat', self._remote_path(relpath))
409
443
if resp[0] == 'stat':
410
444
return _SmartStat(int(resp[1]), int(resp[2], 8))
412
self._translate_error(resp)
445
raise errors.UnexpectedSmartServerResponse(resp)
414
447
## def lock_read(self, relpath):
415
448
## """Lock the given file for shared (read) access.
431
464
resp = self._call2('list_dir', self._remote_path(relpath))
432
465
if resp[0] == 'names':
433
466
return [name.encode('ascii') for name in resp[1:]]
435
self._translate_error(resp)
467
raise errors.UnexpectedSmartServerResponse(resp)
437
469
def iter_files_recursive(self):
438
470
resp = self._call2('iter_files_recursive', self._remote_path(''))
439
471
if resp[0] == 'names':
442
self._translate_error(resp)
473
raise errors.UnexpectedSmartServerResponse(resp)
445
476
class RemoteTCPTransport(RemoteTransport):
446
477
"""Connection to smart server over plain tcp.
448
479
This is essentially just a factory to get 'RemoteTransport(url,
449
480
SmartTCPClientMedium).
452
def __init__(self, url):
453
_scheme, _username, _password, _host, _port, _path = \
454
transport.split_url(url)
456
_port = BZR_DEFAULT_PORT
460
except (ValueError, TypeError), e:
461
raise errors.InvalidURL(
462
path=url, extra="invalid port %s" % _port)
463
client_medium = medium.SmartTCPClientMedium(_host, _port)
464
super(RemoteTCPTransport, self).__init__(url, medium=client_medium)
483
def _build_medium(self):
484
client_medium = medium.SmartTCPClientMedium(
485
self._parsed_url.host, self._parsed_url.port, self.base)
486
return client_medium, None
489
class RemoteTCPTransportV2Only(RemoteTransport):
490
"""Connection to smart server over plain tcp with the client hard-coded to
491
assume protocol v2 and remote server version <= 1.6.
493
This should only be used for testing.
496
def _build_medium(self):
497
client_medium = medium.SmartTCPClientMedium(
498
self._parsed_url.host, self._parsed_url.port, self.base)
499
client_medium._protocol_version = 2
500
client_medium._remember_remote_is_before((1, 6))
501
return client_medium, None
467
504
class RemoteSSHTransport(RemoteTransport):
496
534
HTTP path into a local path.
499
def __init__(self, url, http_transport=None):
500
assert url.startswith('bzr+http://')
537
def __init__(self, base, _from_transport=None, http_transport=None):
502
538
if http_transport is None:
503
http_url = url[len('bzr+'):]
504
self._http_transport = transport.get_transport(http_url)
539
# FIXME: the password may be lost here because it appears in the
540
# url only for an initial construction (when the url came from the
542
http_url = base[len('bzr+'):]
543
self._http_transport = transport.get_transport_from_url(http_url)
506
545
self._http_transport = http_transport
507
http_medium = self._http_transport.get_smart_medium()
508
super(RemoteHTTPTransport, self).__init__(url, medium=http_medium)
546
super(RemoteHTTPTransport, self).__init__(
547
base, _from_transport=_from_transport)
549
def _build_medium(self):
550
# We let http_transport take care of the credentials
551
return self._http_transport.get_smart_medium(), None
510
553
def _remote_path(self, relpath):
511
"""After connecting HTTP Transport only deals in relative URLs."""
554
"""After connecting, HTTP Transport only deals in relative URLs."""
512
555
# Adjust the relpath based on which URL this smart transport is
514
base = urlutils.normalize_url(self._http_transport.base)
557
http_base = urlutils.normalize_url(self.get_smart_medium().base)
515
558
url = urlutils.join(self.base[len('bzr+'):], relpath)
516
559
url = urlutils.normalize_url(url)
517
return urlutils.relative_url(base, url)
519
def abspath(self, relpath):
520
"""Return the full url to the given relative path.
522
:param relpath: the relative path or path components
523
:type relpath: str or list
525
return self._unparse_url(self._combine_paths(self._path, relpath))
560
return urlutils.relative_url(http_base, url)
527
562
def clone(self, relative_url):
528
563
"""Make a new RemoteHTTPTransport related to me.
536
571
smart requests may be different). This is so that the server doesn't
537
572
have to handle .bzr/smart requests at arbitrary places inside .bzr
538
573
directories, just at the initial URL the user uses.
540
The exception is parent paths (i.e. relative_url of "..").
543
576
abs_url = self.abspath(relative_url)
545
578
abs_url = self.base
546
# We either use the exact same http_transport (for child locations), or
547
# a clone of the underlying http_transport (for parent locations). This
548
# means we share the connection.
549
norm_base = urlutils.normalize_url(self.base)
550
norm_abs_url = urlutils.normalize_url(abs_url)
551
normalized_rel_url = urlutils.relative_url(norm_base, norm_abs_url)
552
if normalized_rel_url == ".." or normalized_rel_url.startswith("../"):
553
http_transport = self._http_transport.clone(normalized_rel_url)
579
return RemoteHTTPTransport(abs_url,
580
_from_transport=self,
581
http_transport=self._http_transport)
583
def _redirected_to(self, source, target):
584
"""See transport._redirected_to"""
585
redirected = self._http_transport._redirected_to(source, target)
586
if (redirected is not None
587
and isinstance(redirected, type(self._http_transport))):
588
return RemoteHTTPTransport('bzr+' + redirected.external_url(),
589
http_transport=redirected)
555
http_transport = self._http_transport
556
return RemoteHTTPTransport(abs_url, http_transport=http_transport)
591
# Either None or a transport for a different protocol
595
class HintingSSHTransport(transport.Transport):
596
"""Simple transport that handles ssh:// and points out bzr+ssh://."""
598
def __init__(self, url):
599
raise errors.UnsupportedProtocol(url,
600
'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
559
603
def get_test_permutations():
560
604
"""Return (transport, server) permutations for testing."""
561
605
### We may need a little more test framework support to construct an
562
606
### appropriate RemoteTransport in the future.
563
from bzrlib.smart import server
564
return [(RemoteTCPTransport, server.SmartTCPServer_for_testing)]
607
from bzrlib.tests import test_server
608
return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]