363
370
def test_abspath(self):
    # Absolute paths are resolved against the chroot URL, no matter which
    # directory inside the chroot the transport itself is based at.
    chroot = ChrootServer(get_transport('memory:///foo/bar/'))
    self.start_server(chroot)
    at_root = get_transport(chroot.get_url())
    # Asked from the root of the chroot...
    self.assertEqual(chroot.get_url(), at_root.abspath('/'))
    # ...and asked again from a transport cloned into a subdirectory.
    in_subdir = at_root.clone('subdir')
    self.assertEqual(chroot.get_url(), in_subdir.abspath('/'))
374
380
def test_clone(self):
    # Cloning with a relative path and cloning with the same path given
    # from the root must both yield transports attached to this server.
    chroot = ChrootServer(get_transport('memory:///foo/bar/'))
    self.start_server(chroot)
    root = get_transport(chroot.get_url())
    via_relpath = root.clone('foo')
    via_abspath = root.clone('/foo')
    self.assertEqual(chroot, via_relpath.server)
    self.assertEqual(chroot, via_abspath.server)
385
390
def test_chroot_url_preserves_chroot(self):
386
391
"""Calling get_transport on a chroot transport's base should produce a
441
447
backing_transport = MemoryTransport()
442
448
server = ChrootServer(backing_transport)
444
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
451
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
456
class PathFilteringDecoratorTransportTest(TestCase):
457
"""Pathfilter decoration specific tests."""
459
def test_abspath(self):
460
# The abspath is always relative to the base of the backing transport.
461
server = PathFilteringServer(get_transport('memory:///foo/bar/'),
464
transport = get_transport(server.get_url())
465
self.assertEqual(server.get_url(), transport.abspath('/'))
467
subdir_transport = transport.clone('subdir')
468
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
445
469
server.tearDown()
471
def make_pf_transport(self, filter_func=None):
    """Build a PathFilteringTransport on top of a 'memory:///foo/bar/' transport.

    :param filter_func: the path filter handed to the PathFilteringServer.
        When left as None an identity function is used.
    :return: the transport obtained from the new server's URL.
    """
    path_filter = filter_func
    if path_filter is None:
        # No filter requested: pass every path through unchanged.
        path_filter = lambda x: x
    backing = get_transport('memory:///foo/bar/')
    pf_server = PathFilteringServer(backing, path_filter)
    # NOTE(review): no call that starts the server is visible before its
    # tearDown is registered as a cleanup -- confirm against the full file.
    self.addCleanup(pf_server.tearDown)
    return get_transport(pf_server.get_url())
484
def test__filter(self):
485
# _filter (with an identity func as filter_func) always returns
486
# paths relative to the base of the backing transport.
487
transport = self.make_pf_transport()
488
self.assertEqual('foo', transport._filter('foo'))
489
self.assertEqual('foo/bar', transport._filter('foo/bar'))
490
self.assertEqual('', transport._filter('..'))
491
self.assertEqual('', transport._filter('/'))
492
# The base of the pathfiltering transport is taken into account too.
493
transport = transport.clone('subdir1/subdir2')
494
self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
496
'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
497
self.assertEqual('subdir1', transport._filter('..'))
498
self.assertEqual('', transport._filter('/'))
500
def test_filter_invocation(self):
503
filter_log.append(path)
505
transport = self.make_pf_transport(filter)
507
self.assertEqual(['abc'], filter_log)
509
transport.clone('abc').has('xyz')
510
self.assertEqual(['abc/xyz'], filter_log)
512
transport.has('/abc')
513
self.assertEqual(['abc'], filter_log)
515
def test_clone(self):
516
transport = self.make_pf_transport()
517
# relpath from root and root path are the same
518
relpath_cloned = transport.clone('foo')
519
abspath_cloned = transport.clone('/foo')
520
self.assertEqual(transport.server, relpath_cloned.server)
521
self.assertEqual(transport.server, abspath_cloned.server)
523
def test_url_preserves_pathfiltering(self):
    """Calling get_transport on a pathfiltered transport's base should
    produce a transport with exactly the same behaviour as the original
    pathfiltered transport.

    This is so that it is not possible to escape (accidentally or
    otherwise) the filtering by doing::
        url = filtered_transport.base
        parent_url = urlutils.join(url, '..')
        new_transport = get_transport(parent_url)
    """
    transport = self.make_pf_transport()
    # Re-open the transport purely from its URL; it must land on the same
    # server and the same base as the transport it was derived from.
    new_transport = get_transport(transport.base)
    self.assertEqual(transport.server, new_transport.server)
    self.assertEqual(transport.base, new_transport.base)
448
540
class ReadonlyDecoratorTransportTest(TestCase):
449
541
"""Readonly decoration specific tests."""
460
552
import bzrlib.transport.readonly as readonly
461
553
# connect to '.' via http which is not listable
462
554
server = HttpServer()
465
transport = get_transport('readonly+' + server.get_url())
466
self.failUnless(isinstance(transport,
467
readonly.ReadonlyTransportDecorator))
468
self.assertEqual(False, transport.listable())
469
self.assertEqual(True, transport.is_readonly())
555
self.start_server(server)
556
transport = get_transport('readonly+' + server.get_url())
557
self.failUnless(isinstance(transport,
558
readonly.ReadonlyTransportDecorator))
559
self.assertEqual(False, transport.listable())
560
self.assertEqual(True, transport.is_readonly())
474
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
492
581
from bzrlib.tests.http_server import HttpServer
493
582
# connect to '.' via http which is not listable
494
583
server = HttpServer()
497
transport = self.get_nfs_transport(server.get_url())
498
self.assertIsInstance(
499
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
self.assertEqual(False, transport.listable())
501
self.assertEqual(True, transport.is_readonly())
584
self.start_server(server)
585
transport = self.get_nfs_transport(server.get_url())
586
self.assertIsInstance(
587
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
588
self.assertEqual(False, transport.listable())
589
self.assertEqual(True, transport.is_readonly())
505
591
def test_fakenfs_server_default(self):
    # a FakeNFSServer() should bring up a local relpath server for itself
    import bzrlib.transport.fakenfs as fakenfs
    server = fakenfs.FakeNFSServer()
    # Start the server first; every check below runs exactly once, against
    # the started server, instead of also being repeated before start-up.
    self.start_server(server)
    # the url should be decorated appropriately
    self.assertStartsWith(server.get_url(), 'fakenfs+')
    # and we should be able to get a transport for it
    transport = get_transport(server.get_url())
    # which must be a FakeNFSTransportDecorator instance.
    self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
521
603
def test_fakenfs_rename_semantics(self):
522
604
# a FakeNFS transport must mangle the way rename errors occur to
800
881
# readv records the supplied offset request
801
882
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
883
self.assertEqual(expected_result, transport._activity)
886
class TestSSHConnections(tests.TestCaseWithTransport):
888
def test_bzr_connect_to_bzr_ssh(self):
889
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
891
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
893
# This test actually causes a bzr instance to be invoked, which is very
894
# expensive: it should be the only such test in the test suite.
895
# A reasonable evolution for this would be to simply check inside
896
# check_channel_exec_request that the command is appropriate, and then
897
# satisfy requests in-process.
898
self.requireFeature(ParamikoFeature)
899
# SFTPFullAbsoluteServer has a get_url method, and doesn't
900
# override the interface (doesn't change self._vendor).
901
# Note that this does encryption, so can be slow.
902
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
903
from bzrlib.tests.stub_sftp import StubServer
905
# Start an SSH server
906
self.command_executed = []
907
# XXX: This is horrible -- we define a really dumb SSH server that
908
# executes commands, and manage the hooking up of stdin/out/err to the
909
# SSH channel ourselves. Surely this has already been implemented
911
class StubSSHServer(StubServer):
915
def check_channel_exec_request(self, channel, command):
916
self.test.command_executed.append(command)
917
proc = subprocess.Popen(
918
command, shell=True, stdin=subprocess.PIPE,
919
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
921
# XXX: horribly inefficient, not to mention ugly.
922
# Start a thread for each of stdin/out/err, and relay bytes from
923
# the subprocess to channel and vice versa.
924
def ferry_bytes(read, write, close):
933
(channel.recv, proc.stdin.write, proc.stdin.close),
934
(proc.stdout.read, channel.sendall, channel.close),
935
(proc.stderr.read, channel.sendall_stderr, channel.close)]
936
for read, write, close in file_functions:
937
t = threading.Thread(
938
target=ferry_bytes, args=(read, write, close))
943
ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
944
# We *don't* want to override the default SSH vendor: the detected one
946
self.start_server(ssh_server)
947
port = ssh_server._listener.port
949
if sys.platform == 'win32':
950
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
952
bzr_remote_path = self.get_bzr_path()
953
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
955
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
956
# variable is used to tell bzr what command to run on the remote end.
957
path_to_branch = osutils.abspath('.')
958
if sys.platform == 'win32':
959
# On Windows, we export all drives as '/C:/, etc. So we need to
960
# prefix a '/' to get the right path.
961
path_to_branch = '/' + path_to_branch
962
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
963
t = get_transport(url)
964
self.permit_url(t.base)
968
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
969
self.command_executed)