85
82
one is being cloned from. Attributes such as the medium will
88
:param medium: The medium to use for this RemoteTransport. This must be
89
supplied if _from_transport is None.
85
:param medium: The medium to use for this RemoteTransport. If None,
86
the medium from the _from_transport is shared. If both this
87
and _from_transport are None, a new medium will be built.
88
_from_transport and medium cannot both be specified.
91
90
:param _client: Override the _SmartClient used by this transport. This
92
91
should only be used for testing purposes; normally this is
93
92
determined from the medium.
95
super(RemoteTransport, self).__init__(url,
96
_from_transport=_from_transport)
94
super(RemoteTransport, self).__init__(
95
url, _from_transport=_from_transport)
98
97
# The medium is the connection, except when we need to share it with
99
98
# other objects (RemoteBzrDir, RemoteRepository etc). In these cases
100
99
# what we want to share is really the shared connection.
102
if _from_transport is None:
101
if (_from_transport is not None
102
and isinstance(_from_transport, RemoteTransport)):
103
_client = _from_transport._client
104
elif _from_transport is None:
103
105
# If no _from_transport is specified, we need to initialize the
105
107
credentials = None
106
108
if medium is None:
107
109
medium, credentials = self._build_medium()
108
self._shared_connection= transport._SharedConnection(medium,
110
if 'hpss' in debug.debug_flags:
111
trace.mutter('hpss: Built a new medium: %s',
112
medium.__class__.__name__)
113
self._shared_connection = transport._SharedConnection(medium,
117
# No medium was specified, so share the medium from the
119
medium = self._shared_connection.connection
121
raise AssertionError(
122
"Both _from_transport (%r) and medium (%r) passed to "
123
"RemoteTransport.__init__, but these parameters are mutally "
124
"exclusive." % (_from_transport, medium))
111
126
if _client is None:
112
self._client = client._SmartClient(self.get_shared_medium())
127
self._client = client._SmartClient(medium)
114
129
self._client = _client
123
138
return None, None
140
def _report_activity(self, bytes, direction):
141
"""See Transport._report_activity.
143
Does nothing; the smart medium will report activity triggered by a
125
148
def is_readonly(self):
126
149
"""Smart server transport can do read/write file operations."""
127
resp = self._call2('Transport.is_readonly')
128
if resp == ('yes', ):
130
elif resp == ('no', ):
132
elif (resp == ('error', "Generic bzr smart protocol error: "
133
"bad request 'Transport.is_readonly'") or
134
resp == ('error', "Generic bzr smart protocol error: "
135
"bad request u'Transport.is_readonly'")):
151
resp = self._call2('Transport.is_readonly')
152
except errors.UnknownSmartMethod:
136
153
# XXX: nasty hack: servers before 0.16 don't have a
137
154
# 'Transport.is_readonly' verb, so we do what clients before 0.16
138
155
# did: assume False.
157
if resp == ('yes', ):
159
elif resp == ('no', ):
141
self._translate_error(resp)
142
raise errors.UnexpectedSmartServerResponse(resp)
162
raise errors.UnexpectedSmartServerResponse(resp)
144
164
def get_smart_client(self):
145
165
return self._get_connection()
147
167
def get_smart_medium(self):
148
168
return self._get_connection()
150
def get_shared_medium(self):
151
return self._get_shared_connection()
153
170
def _remote_path(self, relpath):
154
171
"""Returns the Unicode version of the absolute path for relpath."""
155
return self._combine_paths(self._path, relpath)
172
return urlutils.URL._combine_paths(self._parsed_url.path, relpath)
157
174
def _call(self, method, *args):
158
175
resp = self._call2(method, *args)
159
self._translate_error(resp)
176
self._ensure_ok(resp)
161
178
def _call2(self, method, *args):
162
179
"""Call a method on the remote server."""
163
return self._client.call(method, *args)
181
return self._client.call(method, *args)
182
except errors.ErrorFromSmartServer, err:
183
# The first argument, if present, is always a path.
185
context = {'relpath': args[0]}
188
self._translate_error(err, **context)
165
190
def _call_with_body_bytes(self, method, args, body):
166
191
"""Call a method on the remote server with body bytes."""
167
return self._client.call_with_body_bytes(method, args, body)
193
return self._client.call_with_body_bytes(method, args, body)
194
except errors.ErrorFromSmartServer, err:
195
# The first argument, if present, is always a path.
197
context = {'relpath': args[0]}
200
self._translate_error(err, **context)
169
202
def has(self, relpath):
170
203
"""Indicate whether a remote file of the given name exists or not.
177
210
elif resp == ('no', ):
180
self._translate_error(resp)
213
raise errors.UnexpectedSmartServerResponse(resp)
182
215
def get(self, relpath):
183
216
"""Return file-like object reading the contents of a remote file.
185
218
:see: Transport.get_bytes()/get_file()
187
220
return StringIO(self.get_bytes(relpath))
189
222
def get_bytes(self, relpath):
190
223
remote = self._remote_path(relpath)
191
request = self.get_smart_medium().get_request()
192
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
193
smart_protocol.call('get', remote)
194
resp = smart_protocol.read_response_tuple(True)
225
resp, response_handler = self._client.call_expecting_body('get', remote)
226
except errors.ErrorFromSmartServer, err:
227
self._translate_error(err, relpath)
195
228
if resp != ('ok', ):
196
smart_protocol.cancel_read_body()
197
self._translate_error(resp, relpath)
198
return smart_protocol.read_body_bytes()
229
response_handler.cancel_read_body()
230
raise errors.UnexpectedSmartServerResponse(resp)
231
return response_handler.read_body_bytes()
200
233
def _serialise_optional_mode(self, mode):
206
239
def mkdir(self, relpath, mode=None):
207
240
resp = self._call2('mkdir', self._remote_path(relpath),
208
241
self._serialise_optional_mode(mode))
209
self._translate_error(resp)
211
def put_bytes(self, relpath, upload_contents, mode=None):
212
# FIXME: upload_file is probably not safe for non-ascii characters -
213
# should probably just pass all parameters as length-delimited
215
if type(upload_contents) is unicode:
216
# Although not strictly correct, we raise UnicodeEncodeError to be
217
# compatible with other transports.
218
raise UnicodeEncodeError(
219
'undefined', upload_contents, 0, 1,
220
'put_bytes must be given bytes, not unicode.')
221
resp = self._call_with_body_bytes('put',
243
def open_write_stream(self, relpath, mode=None):
244
"""See Transport.open_write_stream."""
245
self.put_bytes(relpath, "", mode)
246
result = transport.AppendBasedFileStream(self, relpath)
247
transport._file_streams[self.abspath(relpath)] = result
250
def put_bytes(self, relpath, raw_bytes, mode=None):
251
if not isinstance(raw_bytes, str):
253
'raw_bytes must be a plain string, not %s' % type(raw_bytes))
254
resp = self._call_with_body_bytes(
222
256
(self._remote_path(relpath), self._serialise_optional_mode(mode)),
224
self._translate_error(resp)
258
self._ensure_ok(resp)
259
return len(raw_bytes)
226
def put_bytes_non_atomic(self, relpath, bytes, mode=None,
261
def put_bytes_non_atomic(self, relpath, raw_bytes, mode=None,
227
262
create_parent_dir=False,
229
264
"""See Transport.put_bytes_non_atomic."""
268
303
if resp[0] == 'appended':
269
304
return int(resp[1])
270
self._translate_error(resp)
305
raise errors.UnexpectedSmartServerResponse(resp)
272
307
def delete(self, relpath):
273
308
resp = self._call2('delete', self._remote_path(relpath))
274
self._translate_error(resp)
309
self._ensure_ok(resp)
276
311
def external_url(self):
277
312
"""See bzrlib.transport.Transport.external_url."""
278
313
# the external path for RemoteTransports is the base
281
def readv(self, relpath, offsets):
316
def recommended_page_size(self):
317
"""Return the recommended page size for this transport."""
320
def _readv(self, relpath, offsets):
285
324
offsets = list(offsets)
287
326
sorted_offsets = sorted(offsets)
288
# turn the list of offsets into a stack
289
offset_stack = iter(offsets)
290
cur_offset_and_size = offset_stack.next()
291
327
coalesced = list(self._coalesce_offsets(sorted_offsets,
292
328
limit=self._max_readv_combine,
293
fudge_factor=self._bytes_to_read_before_seek))
295
request = self.get_smart_medium().get_request()
296
smart_protocol = protocol.SmartClientRequestProtocolOne(request)
297
smart_protocol.call_with_body_readv_array(
298
('readv', self._remote_path(relpath)),
299
[(c.start, c.length) for c in coalesced])
300
resp = smart_protocol.read_response_tuple(True)
302
if resp[0] != 'readv':
303
# This should raise an exception
304
smart_protocol.cancel_read_body()
305
self._translate_error(resp)
308
# FIXME: this should know how many bytes are needed, for clarity.
309
data = smart_protocol.read_body_bytes()
329
fudge_factor=self._bytes_to_read_before_seek,
330
max_size=self._max_readv_bytes))
332
# now that we've coalesced things, avoid making enormous requests
337
if c.length + cur_len > self._max_readv_bytes:
338
requests.append(cur_request)
342
cur_request.append(c)
345
requests.append(cur_request)
346
if 'hpss' in debug.debug_flags:
347
trace.mutter('%s.readv %s offsets => %s coalesced'
348
' => %s requests (%s)',
349
self.__class__.__name__, len(offsets), len(coalesced),
350
len(requests), sum(map(len, requests)))
310
351
# Cache the results, but only until they have been fulfilled
353
# turn the list of offsets into a single stack to iterate
354
offset_stack = iter(offsets)
355
# using a list so it can be modified when passing down and coming back
356
next_offset = [offset_stack.next()]
357
for cur_request in requests:
359
result = self._client.call_with_body_readv_array(
360
('readv', self._remote_path(relpath),),
361
[(c.start, c.length) for c in cur_request])
362
resp, response_handler = result
363
except errors.ErrorFromSmartServer, err:
364
self._translate_error(err, relpath)
366
if resp[0] != 'readv':
367
# This should raise an exception
368
response_handler.cancel_read_body()
369
raise errors.UnexpectedSmartServerResponse(resp)
371
for res in self._handle_response(offset_stack, cur_request,
377
def _handle_response(self, offset_stack, coalesced, response_handler,
378
data_map, next_offset):
379
cur_offset_and_size = next_offset[0]
380
# FIXME: this should know how many bytes are needed, for clarity.
381
data = response_handler.read_body_bytes()
312
383
for c_offset in coalesced:
313
384
if len(data) < c_offset.length:
314
385
raise errors.ShortReadvError(relpath, c_offset.start,
315
386
c_offset.length, actual=len(data))
316
387
for suboffset, subsize in c_offset.ranges:
317
388
key = (c_offset.start+suboffset, subsize)
318
data_map[key] = data[suboffset:suboffset+subsize]
319
data = data[c_offset.length:]
389
this_data = data[data_offset+suboffset:
390
data_offset+suboffset+subsize]
391
# Special case when the data is in-order, rather than packing
392
# into a map and then back out again. Benchmarking shows that
393
# this has 100% hit rate, but leave in the data_map work just
395
# TODO: Could we get away with using buffer() to avoid the
396
# memory copy? Callers would need to realize they may
397
# not have a real string.
398
if key == cur_offset_and_size:
399
yield cur_offset_and_size[0], this_data
400
cur_offset_and_size = next_offset[0] = offset_stack.next()
402
data_map[key] = this_data
403
data_offset += c_offset.length
321
405
# Now that we've read some data, see if we can yield anything back
322
406
while cur_offset_and_size in data_map:
323
407
this_data = data_map.pop(cur_offset_and_size)
324
408
yield cur_offset_and_size[0], this_data
325
cur_offset_and_size = offset_stack.next()
409
cur_offset_and_size = next_offset[0] = offset_stack.next()
327
411
def rename(self, rel_from, rel_to):
328
412
self._call('rename',
337
421
def rmdir(self, relpath):
338
422
resp = self._call('rmdir', self._remote_path(relpath))
340
def _translate_error(self, resp, orig_path=None):
341
"""Raise an exception from a response"""
348
elif what == 'NoSuchFile':
349
if orig_path is not None:
350
error_path = orig_path
353
raise errors.NoSuchFile(error_path)
354
elif what == 'error':
355
raise errors.SmartProtocolError(unicode(resp[1]))
356
elif what == 'FileExists':
357
raise errors.FileExists(resp[1])
358
elif what == 'DirectoryNotEmpty':
359
raise errors.DirectoryNotEmpty(resp[1])
360
elif what == 'ShortReadvError':
361
raise errors.ShortReadvError(resp[1], int(resp[2]),
362
int(resp[3]), int(resp[4]))
363
elif what in ('UnicodeEncodeError', 'UnicodeDecodeError'):
364
encoding = str(resp[1]) # encoding must always be a string
368
reason = str(resp[5]) # reason must always be a string
369
if val.startswith('u:'):
370
val = val[2:].decode('utf-8')
371
elif val.startswith('s:'):
372
val = val[2:].decode('base64')
373
if what == 'UnicodeDecodeError':
374
raise UnicodeDecodeError(encoding, val, start, end, reason)
375
elif what == 'UnicodeEncodeError':
376
raise UnicodeEncodeError(encoding, val, start, end, reason)
377
elif what == "ReadOnlyError":
378
raise errors.TransportNotPossible('readonly transport')
379
elif what == "ReadError":
380
if orig_path is not None:
381
error_path = orig_path
384
raise errors.ReadError(error_path)
386
raise errors.SmartProtocolError('unexpected smart server error: %r' % (resp,))
424
def _ensure_ok(self, resp):
426
raise errors.UnexpectedSmartServerResponse(resp)
428
def _translate_error(self, err, relpath=None):
429
remote._translate_error(err, path=relpath)
388
431
def disconnect(self):
389
self.get_smart_medium().disconnect()
391
def delete_tree(self, relpath):
392
raise errors.TransportNotPossible('readonly transport')
432
m = self.get_smart_medium()
394
436
def stat(self, relpath):
395
437
resp = self._call2('stat', self._remote_path(relpath))
396
438
if resp[0] == 'stat':
397
439
return _SmartStat(int(resp[1]), int(resp[2], 8))
399
self._translate_error(resp)
440
raise errors.UnexpectedSmartServerResponse(resp)
401
442
## def lock_read(self, relpath):
402
443
## """Lock the given file for shared (read) access.
418
459
resp = self._call2('list_dir', self._remote_path(relpath))
419
460
if resp[0] == 'names':
420
461
return [name.encode('ascii') for name in resp[1:]]
422
self._translate_error(resp)
462
raise errors.UnexpectedSmartServerResponse(resp)
424
464
def iter_files_recursive(self):
425
465
resp = self._call2('iter_files_recursive', self._remote_path(''))
426
466
if resp[0] == 'names':
429
self._translate_error(resp)
468
raise errors.UnexpectedSmartServerResponse(resp)
432
471
class RemoteTCPTransport(RemoteTransport):
433
472
"""Connection to smart server over plain tcp.
435
474
This is essentially just a factory to get 'RemoteTransport(url,
436
475
SmartTCPClientMedium)'.
439
478
def _build_medium(self):
440
assert self.base.startswith('bzr://')
441
if self._port is None:
442
self._port = BZR_DEFAULT_PORT
443
return medium.SmartTCPClientMedium(self._host, self._port), None
479
client_medium = medium.SmartTCPClientMedium(
480
self._parsed_url.host, self._parsed_url.port, self.base)
481
return client_medium, None
484
class RemoteTCPTransportV2Only(RemoteTransport):
485
"""Connection to smart server over plain tcp with the client hard-coded to
486
assume protocol v2 and remote server version <= 1.6.
488
This should only be used for testing.
491
def _build_medium(self):
492
client_medium = medium.SmartTCPClientMedium(
493
self._parsed_url.host, self._parsed_url.port, self.base)
494
client_medium._protocol_version = 2
495
client_medium._remember_remote_is_before((1, 6))
496
return client_medium, None
446
499
class RemoteSSHTransport(RemoteTransport):
453
506
def _build_medium(self):
454
assert self.base.startswith('bzr+ssh://')
455
# ssh will prompt the user for a password if needed and if none is
456
# provided but it will not give it back, so no credentials can be
458
return medium.SmartSSHClientMedium(self._host, self._port,
459
self._user, self._password), None
507
location_config = config.LocationConfig(self.base)
508
bzr_remote_path = location_config.get_bzr_remote_path()
509
user = self._parsed_url.user
511
auth = config.AuthenticationConfig()
512
user = auth.get_user('ssh', self._parsed_url.host,
513
self._parsed_url.port)
514
ssh_params = medium.SSHParams(self._parsed_url.host,
515
self._parsed_url.port, user, self._parsed_url.password,
517
client_medium = medium.SmartSSHClientMedium(self.base, ssh_params)
518
return client_medium, (user, self._parsed_url.password)
462
521
class RemoteHTTPTransport(RemoteTransport):
463
522
"""Just a way to connect between a bzr+http:// url and http://.
465
524
This connection operates slightly differently than the RemoteSSHTransport.
466
525
It uses a plain http:// transport underneath, which defines what remote
467
526
.bzr/smart URL we are connected to. From there, all paths that are sent are
509
566
smart requests may be different). This is so that the server doesn't
510
567
have to handle .bzr/smart requests at arbitrary places inside .bzr
511
568
directories, just at the initial URL the user uses.
513
The exception is parent paths (i.e. relative_url of "..").
516
571
abs_url = self.abspath(relative_url)
518
573
abs_url = self.base
519
# We either use the exact same http_transport (for child locations), or
520
# a clone of the underlying http_transport (for parent locations). This
521
# means we share the connection.
522
norm_base = urlutils.normalize_url(self.base)
523
norm_abs_url = urlutils.normalize_url(abs_url)
524
normalized_rel_url = urlutils.relative_url(norm_base, norm_abs_url)
525
if normalized_rel_url == ".." or normalized_rel_url.startswith("../"):
526
http_transport = self._http_transport.clone(normalized_rel_url)
528
http_transport = self._http_transport
529
574
return RemoteHTTPTransport(abs_url,
530
575
_from_transport=self,
531
http_transport=http_transport)
576
http_transport=self._http_transport)
578
def _redirected_to(self, source, target):
579
"""See transport._redirected_to"""
580
redirected = self._http_transport._redirected_to(source, target)
581
if (redirected is not None
582
and isinstance(redirected, type(self._http_transport))):
583
return RemoteHTTPTransport('bzr+' + redirected.external_url(),
584
http_transport=redirected)
586
# Either None or a transport for a different protocol
590
class HintingSSHTransport(transport.Transport):
591
"""Simple transport that handles ssh:// and points out bzr+ssh://."""
593
def __init__(self, url):
594
raise errors.UnsupportedProtocol(url,
595
'bzr supports bzr+ssh to operate over ssh, use "bzr+%s".' % url)
534
598
def get_test_permutations():
535
599
"""Return (transport, server) permutations for testing."""
536
600
### We may need a little more test framework support to construct an
537
601
### appropriate RemoteTransport in the future.
538
from bzrlib.smart import server
539
return [(RemoteTCPTransport, server.SmartTCPServer_for_testing)]
602
from bzrlib.tests import test_server
603
return [(RemoteTCPTransport, test_server.SmartTCPServer_for_testing)]