~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Gordon Tyler
  • Date: 2010-02-02 06:30:43 UTC
  • mto: (5037.3.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5046.
  • Revision ID: gordon@doxxx.net-20100202063043-3ygr1114d25m3f7m
Added cmdline.split function, which replaces commands.shlex_split_unicode.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
 
import bzrlib
21
24
from bzrlib import (
22
25
    errors,
23
26
    osutils,
 
27
    tests,
 
28
    transport as _mod_transport,
24
29
    urlutils,
25
30
    )
 
31
from bzrlib.transport import (
 
32
    fakenfs,
 
33
    memory,
 
34
    readonly,
 
35
    )
26
36
from bzrlib.errors import (DependencyNotPresent,
27
37
                           FileExists,
28
38
                           InvalidURLJoin,
31
41
                           ReadError,
32
42
                           UnsupportedProtocol,
33
43
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
 
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
35
45
from bzrlib.transport import (_clear_protocol_handlers,
36
46
                              _CoalescedOffset,
37
47
                              ConnectedTransport,
45
55
                              Transport,
46
56
                              )
47
57
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
58
from bzrlib.transport.local import (LocalTransport,
50
59
                                    EmulatedWin32LocalTransport)
 
60
from bzrlib.transport.pathfilter import PathFilteringServer
51
61
 
52
62
 
53
63
# TODO: Should possibly split transport-specific tests into their own files.
80
90
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
91
                                    'TestTransport.SampleHandler')
82
92
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
93
                              'bzrlib.transport.chroot',
 
94
                              'bzrlib.transport.pathfilter'],
84
95
                             _get_transport_modules())
85
96
        finally:
86
97
            _set_protocol_handlers(handlers)
159
170
 
160
171
    def test_local_abspath_non_local_transport(self):
161
172
        # the base implementation should throw
162
 
        t = MemoryTransport()
 
173
        t = memory.MemoryTransport()
163
174
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
175
        self.assertEqual('memory:///t is not a local path.', str(e))
165
176
 
249
260
                   max_size=1*1024*1024*1024)
250
261
 
251
262
 
 
263
class TestMemoryServer(TestCase):
 
264
 
 
265
    def test_create_server(self):
 
266
        server = memory.MemoryServer()
 
267
        server.start_server()
 
268
        url = server.get_url()
 
269
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
270
        t = _mod_transport.get_transport(url)
 
271
        del t
 
272
        server.stop_server()
 
273
        self.assertFalse(url in _mod_transport.transport_list_registry)
 
274
        self.assertRaises(errors.UnsupportedProtocol,
 
275
                          _mod_transport.get_transport, url)
 
276
 
 
277
 
252
278
class TestMemoryTransport(TestCase):
253
279
 
254
280
    def test_get_transport(self):
255
 
        MemoryTransport()
 
281
        memory.MemoryTransport()
256
282
 
257
283
    def test_clone(self):
258
 
        transport = MemoryTransport()
259
 
        self.assertTrue(isinstance(transport, MemoryTransport))
 
284
        transport = memory.MemoryTransport()
 
285
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
260
286
        self.assertEqual("memory:///", transport.clone("/").base)
261
287
 
262
288
    def test_abspath(self):
263
 
        transport = MemoryTransport()
 
289
        transport = memory.MemoryTransport()
264
290
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
265
291
 
266
292
    def test_abspath_of_root(self):
267
 
        transport = MemoryTransport()
 
293
        transport = memory.MemoryTransport()
268
294
        self.assertEqual("memory:///", transport.base)
269
295
        self.assertEqual("memory:///", transport.abspath('/'))
270
296
 
271
297
    def test_abspath_of_relpath_starting_at_root(self):
272
 
        transport = MemoryTransport()
 
298
        transport = memory.MemoryTransport()
273
299
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
274
300
 
275
301
    def test_append_and_get(self):
276
 
        transport = MemoryTransport()
 
302
        transport = memory.MemoryTransport()
277
303
        transport.append_bytes('path', 'content')
278
304
        self.assertEqual(transport.get('path').read(), 'content')
279
305
        transport.append_file('path', StringIO('content'))
280
306
        self.assertEqual(transport.get('path').read(), 'contentcontent')
281
307
 
282
308
    def test_put_and_get(self):
283
 
        transport = MemoryTransport()
 
309
        transport = memory.MemoryTransport()
284
310
        transport.put_file('path', StringIO('content'))
285
311
        self.assertEqual(transport.get('path').read(), 'content')
286
312
        transport.put_bytes('path', 'content')
287
313
        self.assertEqual(transport.get('path').read(), 'content')
288
314
 
289
315
    def test_append_without_dir_fails(self):
290
 
        transport = MemoryTransport()
 
316
        transport = memory.MemoryTransport()
291
317
        self.assertRaises(NoSuchFile,
292
318
                          transport.append_bytes, 'dir/path', 'content')
293
319
 
294
320
    def test_put_without_dir_fails(self):
295
 
        transport = MemoryTransport()
 
321
        transport = memory.MemoryTransport()
296
322
        self.assertRaises(NoSuchFile,
297
323
                          transport.put_file, 'dir/path', StringIO('content'))
298
324
 
299
325
    def test_get_missing(self):
300
 
        transport = MemoryTransport()
 
326
        transport = memory.MemoryTransport()
301
327
        self.assertRaises(NoSuchFile, transport.get, 'foo')
302
328
 
303
329
    def test_has_missing(self):
304
 
        transport = MemoryTransport()
 
330
        transport = memory.MemoryTransport()
305
331
        self.assertEquals(False, transport.has('foo'))
306
332
 
307
333
    def test_has_present(self):
308
 
        transport = MemoryTransport()
 
334
        transport = memory.MemoryTransport()
309
335
        transport.append_bytes('foo', 'content')
310
336
        self.assertEquals(True, transport.has('foo'))
311
337
 
312
338
    def test_list_dir(self):
313
 
        transport = MemoryTransport()
 
339
        transport = memory.MemoryTransport()
314
340
        transport.put_bytes('foo', 'content')
315
341
        transport.mkdir('dir')
316
342
        transport.put_bytes('dir/subfoo', 'content')
320
346
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
321
347
 
322
348
    def test_mkdir(self):
323
 
        transport = MemoryTransport()
 
349
        transport = memory.MemoryTransport()
324
350
        transport.mkdir('dir')
325
351
        transport.append_bytes('dir/path', 'content')
326
352
        self.assertEqual(transport.get('dir/path').read(), 'content')
327
353
 
328
354
    def test_mkdir_missing_parent(self):
329
 
        transport = MemoryTransport()
 
355
        transport = memory.MemoryTransport()
330
356
        self.assertRaises(NoSuchFile,
331
357
                          transport.mkdir, 'dir/dir')
332
358
 
333
359
    def test_mkdir_twice(self):
334
 
        transport = MemoryTransport()
 
360
        transport = memory.MemoryTransport()
335
361
        transport.mkdir('dir')
336
362
        self.assertRaises(FileExists, transport.mkdir, 'dir')
337
363
 
338
364
    def test_parameters(self):
339
 
        transport = MemoryTransport()
 
365
        transport = memory.MemoryTransport()
340
366
        self.assertEqual(True, transport.listable())
341
367
        self.assertEqual(False, transport.is_readonly())
342
368
 
343
369
    def test_iter_files_recursive(self):
344
 
        transport = MemoryTransport()
 
370
        transport = memory.MemoryTransport()
345
371
        transport.mkdir('dir')
346
372
        transport.put_bytes('dir/foo', 'content')
347
373
        transport.put_bytes('dir/bar', 'content')
350
376
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
351
377
 
352
378
    def test_stat(self):
353
 
        transport = MemoryTransport()
 
379
        transport = memory.MemoryTransport()
354
380
        transport.put_bytes('foo', 'content')
355
381
        transport.put_bytes('bar', 'phowar')
356
382
        self.assertEqual(7, transport.stat('foo').st_size)
363
389
    def test_abspath(self):
364
390
        # The abspath is always relative to the chroot_url.
365
391
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        server.setUp()
 
392
        self.start_server(server)
367
393
        transport = get_transport(server.get_url())
368
394
        self.assertEqual(server.get_url(), transport.abspath('/'))
369
395
 
370
396
        subdir_transport = transport.clone('subdir')
371
397
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
        server.tearDown()
373
398
 
374
399
    def test_clone(self):
375
400
        server = ChrootServer(get_transport('memory:///foo/bar/'))
376
 
        server.setUp()
 
401
        self.start_server(server)
377
402
        transport = get_transport(server.get_url())
378
403
        # relpath from root and root path are the same
379
404
        relpath_cloned = transport.clone('foo')
380
405
        abspath_cloned = transport.clone('/foo')
381
406
        self.assertEqual(server, relpath_cloned.server)
382
407
        self.assertEqual(server, abspath_cloned.server)
383
 
        server.tearDown()
384
408
 
385
409
    def test_chroot_url_preserves_chroot(self):
386
410
        """Calling get_transport on a chroot transport's base should produce a
393
417
            new_transport = get_transport(parent_url)
394
418
        """
395
419
        server = ChrootServer(get_transport('memory:///path/subpath'))
396
 
        server.setUp()
 
420
        self.start_server(server)
397
421
        transport = get_transport(server.get_url())
398
422
        new_transport = get_transport(transport.base)
399
423
        self.assertEqual(transport.server, new_transport.server)
400
424
        self.assertEqual(transport.base, new_transport.base)
401
 
        server.tearDown()
402
425
 
403
426
    def test_urljoin_preserves_chroot(self):
404
427
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
410
433
            new_transport = get_transport(parent_url)
411
434
        """
412
435
        server = ChrootServer(get_transport('memory:///path/'))
413
 
        server.setUp()
 
436
        self.start_server(server)
414
437
        transport = get_transport(server.get_url())
415
438
        self.assertRaises(
416
439
            InvalidURLJoin, urlutils.join, transport.base, '..')
417
 
        server.tearDown()
418
440
 
419
441
 
420
442
class ChrootServerTest(TestCase):
421
443
 
422
444
    def test_construct(self):
423
 
        backing_transport = MemoryTransport()
 
445
        backing_transport = memory.MemoryTransport()
424
446
        server = ChrootServer(backing_transport)
425
447
        self.assertEqual(backing_transport, server.backing_transport)
426
448
 
427
449
    def test_setUp(self):
428
 
        backing_transport = MemoryTransport()
 
450
        backing_transport = memory.MemoryTransport()
429
451
        server = ChrootServer(backing_transport)
430
 
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
452
        server.start_server()
 
453
        try:
 
454
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
455
        finally:
 
456
            server.stop_server()
432
457
 
433
 
    def test_tearDown(self):
434
 
        backing_transport = MemoryTransport()
 
458
    def test_stop_server(self):
 
459
        backing_transport = memory.MemoryTransport()
435
460
        server = ChrootServer(backing_transport)
436
 
        server.setUp()
437
 
        server.tearDown()
 
461
        server.start_server()
 
462
        server.stop_server()
438
463
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
439
464
 
440
465
    def test_get_url(self):
441
 
        backing_transport = MemoryTransport()
 
466
        backing_transport = memory.MemoryTransport()
442
467
        server = ChrootServer(backing_transport)
443
 
        server.setUp()
444
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
445
 
        server.tearDown()
 
468
        server.start_server()
 
469
        try:
 
470
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
471
        finally:
 
472
            server.stop_server()
 
473
 
 
474
 
 
475
class PathFilteringDecoratorTransportTest(TestCase):
 
476
    """Pathfilter decoration specific tests."""
 
477
 
 
478
    def test_abspath(self):
 
479
        # The abspath is always relative to the base of the backing transport.
 
480
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
481
            lambda x: x)
 
482
        server.start_server()
 
483
        transport = get_transport(server.get_url())
 
484
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
485
 
 
486
        subdir_transport = transport.clone('subdir')
 
487
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
488
        server.stop_server()
 
489
 
 
490
    def make_pf_transport(self, filter_func=None):
 
491
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
492
        
 
493
        :param filter_func: by default this will be a no-op function.  Use this
 
494
            parameter to override it."""
 
495
        if filter_func is None:
 
496
            filter_func = lambda x: x
 
497
        server = PathFilteringServer(
 
498
            get_transport('memory:///foo/bar/'), filter_func)
 
499
        server.start_server()
 
500
        self.addCleanup(server.stop_server)
 
501
        return get_transport(server.get_url())
 
502
 
 
503
    def test__filter(self):
 
504
        # _filter (with an identity func as filter_func) always returns
 
505
        # paths relative to the base of the backing transport.
 
506
        transport = self.make_pf_transport()
 
507
        self.assertEqual('foo', transport._filter('foo'))
 
508
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
509
        self.assertEqual('', transport._filter('..'))
 
510
        self.assertEqual('', transport._filter('/'))
 
511
        # The base of the pathfiltering transport is taken into account too.
 
512
        transport = transport.clone('subdir1/subdir2')
 
513
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
514
        self.assertEqual(
 
515
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
516
        self.assertEqual('subdir1', transport._filter('..'))
 
517
        self.assertEqual('', transport._filter('/'))
 
518
 
 
519
    def test_filter_invocation(self):
 
520
        filter_log = []
 
521
        def filter(path):
 
522
            filter_log.append(path)
 
523
            return path
 
524
        transport = self.make_pf_transport(filter)
 
525
        transport.has('abc')
 
526
        self.assertEqual(['abc'], filter_log)
 
527
        del filter_log[:]
 
528
        transport.clone('abc').has('xyz')
 
529
        self.assertEqual(['abc/xyz'], filter_log)
 
530
        del filter_log[:]
 
531
        transport.has('/abc')
 
532
        self.assertEqual(['abc'], filter_log)
 
533
 
 
534
    def test_clone(self):
 
535
        transport = self.make_pf_transport()
 
536
        # relpath from root and root path are the same
 
537
        relpath_cloned = transport.clone('foo')
 
538
        abspath_cloned = transport.clone('/foo')
 
539
        self.assertEqual(transport.server, relpath_cloned.server)
 
540
        self.assertEqual(transport.server, abspath_cloned.server)
 
541
 
 
542
    def test_url_preserves_pathfiltering(self):
 
543
        """Calling get_transport on a pathfiltered transport's base should
 
544
        produce a transport with exactly the same behaviour as the original
 
545
        pathfiltered transport.
 
546
 
 
547
        This is so that it is not possible to escape (accidentally or
 
548
        otherwise) the filtering by doing::
 
549
            url = filtered_transport.base
 
550
            parent_url = urlutils.join(url, '..')
 
551
            new_transport = get_transport(parent_url)
 
552
        """
 
553
        transport = self.make_pf_transport()
 
554
        new_transport = get_transport(transport.base)
 
555
        self.assertEqual(transport.server, new_transport.server)
 
556
        self.assertEqual(transport.base, new_transport.base)
446
557
 
447
558
 
448
559
class ReadonlyDecoratorTransportTest(TestCase):
449
560
    """Readonly decoration specific tests."""
450
561
 
451
562
    def test_local_parameters(self):
452
 
        import bzrlib.transport.readonly as readonly
453
563
        # connect to . in readonly mode
454
564
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
455
565
        self.assertEqual(True, transport.listable())
457
567
 
458
568
    def test_http_parameters(self):
459
569
        from bzrlib.tests.http_server import HttpServer
460
 
        import bzrlib.transport.readonly as readonly
461
570
        # connect to '.' via http which is not listable
462
571
        server = HttpServer()
463
 
        server.setUp()
464
 
        try:
465
 
            transport = get_transport('readonly+' + server.get_url())
466
 
            self.failUnless(isinstance(transport,
467
 
                                       readonly.ReadonlyTransportDecorator))
468
 
            self.assertEqual(False, transport.listable())
469
 
            self.assertEqual(True, transport.is_readonly())
470
 
        finally:
471
 
            server.tearDown()
 
572
        self.start_server(server)
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
472
578
 
473
579
 
474
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
475
581
    """NFS decorator specific tests."""
476
582
 
477
583
    def get_nfs_transport(self, url):
478
 
        import bzrlib.transport.fakenfs as fakenfs
479
584
        # connect to url with nfs decoration
480
585
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
481
586
 
492
597
        from bzrlib.tests.http_server import HttpServer
493
598
        # connect to '.' via http which is not listable
494
599
        server = HttpServer()
495
 
        server.setUp()
496
 
        try:
497
 
            transport = self.get_nfs_transport(server.get_url())
498
 
            self.assertIsInstance(
499
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
 
            self.assertEqual(False, transport.listable())
501
 
            self.assertEqual(True, transport.is_readonly())
502
 
        finally:
503
 
            server.tearDown()
 
600
        self.start_server(server)
 
601
        transport = self.get_nfs_transport(server.get_url())
 
602
        self.assertIsInstance(
 
603
            transport, fakenfs.FakeNFSTransportDecorator)
 
604
        self.assertEqual(False, transport.listable())
 
605
        self.assertEqual(True, transport.is_readonly())
504
606
 
505
607
    def test_fakenfs_server_default(self):
506
608
        # a FakeNFSServer() should bring up a local relpath server for itself
507
 
        import bzrlib.transport.fakenfs as fakenfs
508
609
        server = fakenfs.FakeNFSServer()
509
 
        server.setUp()
510
 
        try:
511
 
            # the url should be decorated appropriately
512
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
513
 
            # and we should be able to get a transport for it
514
 
            transport = get_transport(server.get_url())
515
 
            # which must be a FakeNFSTransportDecorator instance.
516
 
            self.assertIsInstance(
517
 
                transport, fakenfs.FakeNFSTransportDecorator)
518
 
        finally:
519
 
            server.tearDown()
 
610
        self.start_server(server)
 
611
        # the url should be decorated appropriately
 
612
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
613
        # and we should be able to get a transport for it
 
614
        transport = get_transport(server.get_url())
 
615
        # which must be a FakeNFSTransportDecorator instance.
 
616
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
520
617
 
521
618
    def test_fakenfs_rename_semantics(self):
522
619
        # a FakeNFS transport must mangle the way rename errors occur to
587
684
    def setUp(self):
588
685
        super(TestTransportImplementation, self).setUp()
589
686
        self._server = self.transport_server()
590
 
        self._server.setUp()
591
 
        self.addCleanup(self._server.tearDown)
 
687
        self.start_server(self._server)
592
688
 
593
689
    def get_transport(self, relpath=None):
594
690
        """Return a connected transport to the local directory.
800
896
        # readv records the supplied offset request
801
897
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
898
        self.assertEqual(expected_result, transport._activity)
 
899
 
 
900
 
 
901
class TestSSHConnections(tests.TestCaseWithTransport):
 
902
 
 
903
    def test_bzr_connect_to_bzr_ssh(self):
 
904
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
905
 
 
906
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
907
        """
 
908
        # This test actually causes a bzr instance to be invoked, which is very
 
909
        # expensive: it should be the only such test in the test suite.
 
910
        # A reasonable evolution for this would be to simply check inside
 
911
        # check_channel_exec_request that the command is appropriate, and then
 
912
        # satisfy requests in-process.
 
913
        self.requireFeature(features.paramiko)
 
914
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
915
        # override the interface (doesn't change self._vendor).
 
916
        # Note that this does encryption, so can be slow.
 
917
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
918
        from bzrlib.tests.stub_sftp import StubServer
 
919
 
 
920
        # Start an SSH server
 
921
        self.command_executed = []
 
922
        # XXX: This is horrible -- we define a really dumb SSH server that
 
923
        # executes commands, and manage the hooking up of stdin/out/err to the
 
924
        # SSH channel ourselves.  Surely this has already been implemented
 
925
        # elsewhere?
 
926
        started = []
 
927
        class StubSSHServer(StubServer):
 
928
 
 
929
            test = self
 
930
 
 
931
            def check_channel_exec_request(self, channel, command):
 
932
                self.test.command_executed.append(command)
 
933
                proc = subprocess.Popen(
 
934
                    command, shell=True, stdin=subprocess.PIPE,
 
935
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
936
 
 
937
                # XXX: horribly inefficient, not to mention ugly.
 
938
                # Start a thread for each of stdin/out/err, and relay bytes from
 
939
                # the subprocess to channel and vice versa.
 
940
                def ferry_bytes(read, write, close):
 
941
                    while True:
 
942
                        bytes = read(1)
 
943
                        if bytes == '':
 
944
                            close()
 
945
                            break
 
946
                        write(bytes)
 
947
 
 
948
                file_functions = [
 
949
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
950
                    (proc.stdout.read, channel.sendall, channel.close),
 
951
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
952
                started.append(proc)
 
953
                for read, write, close in file_functions:
 
954
                    t = threading.Thread(
 
955
                        target=ferry_bytes, args=(read, write, close))
 
956
                    t.start()
 
957
                    started.append(t)
 
958
 
 
959
                return True
 
960
 
 
961
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
962
        # We *don't* want to override the default SSH vendor: the detected one
 
963
        # is the one to use.
 
964
        self.start_server(ssh_server)
 
965
        port = ssh_server._listener.port
 
966
 
 
967
        if sys.platform == 'win32':
 
968
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
969
        else:
 
970
            bzr_remote_path = self.get_bzr_path()
 
971
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
972
 
 
973
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
974
        # variable is used to tell bzr what command to run on the remote end.
 
975
        path_to_branch = osutils.abspath('.')
 
976
        if sys.platform == 'win32':
 
977
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
978
            # prefix a '/' to get the right path.
 
979
            path_to_branch = '/' + path_to_branch
 
980
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
981
        t = get_transport(url)
 
982
        self.permit_url(t.base)
 
983
        t.mkdir('foo')
 
984
 
 
985
        self.assertEqual(
 
986
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
987
            self.command_executed)
 
988
        # Make sure to disconnect, so that the remote process can stop, and we
 
989
        # can cleanup. Then pause the test until everything is shutdown
 
990
        t._client._medium.disconnect()
 
991
        if not started:
 
992
            return
 
993
        # First wait for the subprocess
 
994
        started[0].wait()
 
995
        # And the rest are threads
 
996
        for t in started[1:]:
 
997
            t.join()