~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2010-01-29 14:09:05 UTC
  • mto: This revision was merged to the branch mainline in revision 4992.
  • Revision ID: mbp@sourcefrog.net-20100129140905-2uiarb6p8di1ywsr
Correction to url

from review: https://code.edge.launchpad.net/~mbp/bzr/doc/+merge/18250

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
 
    transport,
 
28
    transport as _mod_transport,
29
29
    urlutils,
30
30
    )
31
31
from bzrlib.transport import (
32
 
    chroot,
33
32
    fakenfs,
34
 
    local,
35
33
    memory,
36
 
    pathfilter,
37
34
    readonly,
38
35
    )
39
 
from bzrlib.tests import (
40
 
    features,
41
 
    test_server,
42
 
    )
 
36
from bzrlib.errors import (DependencyNotPresent,
 
37
                           FileExists,
 
38
                           InvalidURLJoin,
 
39
                           NoSuchFile,
 
40
                           PathNotChild,
 
41
                           ReadError,
 
42
                           UnsupportedProtocol,
 
43
                           )
 
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
45
from bzrlib.transport import (_clear_protocol_handlers,
 
46
                              _CoalescedOffset,
 
47
                              ConnectedTransport,
 
48
                              _get_protocol_handlers,
 
49
                              _set_protocol_handlers,
 
50
                              _get_transport_modules,
 
51
                              get_transport,
 
52
                              LateReadError,
 
53
                              register_lazy_transport,
 
54
                              register_transport_proto,
 
55
                              Transport,
 
56
                              )
 
57
from bzrlib.transport.chroot import ChrootServer
 
58
from bzrlib.transport.local import (LocalTransport,
 
59
                                    EmulatedWin32LocalTransport)
 
60
from bzrlib.transport.pathfilter import PathFilteringServer
43
61
 
44
62
 
45
63
# TODO: Should possibly split transport-specific tests into their own files.
46
64
 
47
65
 
48
 
class TestTransport(tests.TestCase):
 
66
class TestTransport(TestCase):
49
67
    """Test the non transport-concrete class functionality."""
50
68
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
69
    def test__get_set_protocol_handlers(self):
55
 
        handlers = transport._get_protocol_handlers()
 
70
        handlers = _get_protocol_handlers()
56
71
        self.assertNotEqual([], handlers.keys( ))
57
72
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
73
            _clear_protocol_handlers()
 
74
            self.assertEqual([], _get_protocol_handlers().keys())
60
75
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
76
            _set_protocol_handlers(handlers)
62
77
 
63
78
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
 
79
        handlers = _get_protocol_handlers()
65
80
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
 
81
        _clear_protocol_handlers()
67
82
        class SampleHandler(object):
68
83
            """I exist, isnt that enough?"""
69
84
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
 
85
            _clear_protocol_handlers()
 
86
            register_transport_proto('foo')
 
87
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
88
                                    'TestTransport.SampleHandler')
 
89
            register_transport_proto('bar')
 
90
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
91
                                    'TestTransport.SampleHandler')
79
92
            self.assertEqual([SampleHandler.__module__,
80
93
                              'bzrlib.transport.chroot',
81
94
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
 
95
                             _get_transport_modules())
83
96
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
97
            _set_protocol_handlers(handlers)
85
98
 
86
99
    def test_transport_dependency(self):
87
100
        """Transport with missing dependency causes no error"""
88
 
        saved_handlers = transport._get_protocol_handlers()
 
101
        saved_handlers = _get_protocol_handlers()
89
102
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
 
103
        _clear_protocol_handlers()
91
104
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
105
            register_transport_proto('foo')
 
106
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
107
                    'BadTransportHandler')
95
108
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
 
109
                get_transport('foo://fooserver/foo')
 
110
            except UnsupportedProtocol, e:
98
111
                e_str = str(e)
99
112
                self.assertEquals('Unsupported protocol'
100
113
                                  ' for url "foo://fooserver/foo":'
104
117
                self.fail('Did not raise UnsupportedProtocol')
105
118
        finally:
106
119
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
120
            _set_protocol_handlers(saved_handlers)
108
121
 
109
122
    def test_transport_fallback(self):
110
123
        """Transport with missing dependency causes no error"""
111
 
        saved_handlers = transport._get_protocol_handlers()
 
124
        saved_handlers = _get_protocol_handlers()
112
125
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
 
126
            _clear_protocol_handlers()
 
127
            register_transport_proto('foo')
 
128
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
129
                    'BackupTransportHandler')
 
130
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
131
                    'BadTransportHandler')
 
132
            t = get_transport('foo://fooserver/foo')
120
133
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
134
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
135
            _set_protocol_handlers(saved_handlers)
123
136
 
124
137
    def test_ssh_hints(self):
125
138
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
139
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
 
140
            get_transport('ssh://fooserver/foo')
 
141
        except UnsupportedProtocol, e:
129
142
            e_str = str(e)
130
143
            self.assertEquals('Unsupported protocol'
131
144
                              ' for url "ssh://fooserver/foo":'
132
 
                              ' bzr supports bzr+ssh to operate over ssh,'
133
 
                              ' use "bzr+ssh://fooserver/foo".',
 
145
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
134
146
                              str(e))
135
147
        else:
136
148
            self.fail('Did not raise UnsupportedProtocol')
137
149
 
138
150
    def test_LateReadError(self):
139
151
        """The LateReadError helper should raise on read()."""
140
 
        a_file = transport.LateReadError('a path')
 
152
        a_file = LateReadError('a path')
141
153
        try:
142
154
            a_file.read()
143
 
        except errors.ReadError, error:
 
155
        except ReadError, error:
144
156
            self.assertEqual('a path', error.path)
145
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
157
        self.assertRaises(ReadError, a_file.read, 40)
146
158
        a_file.close()
147
159
 
148
160
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
 
161
        t = Transport('/')
150
162
        self.assertEqual('/home/sarah/project/foo',
151
163
                         t._combine_paths('/home/sarah', 'project/foo'))
152
164
        self.assertEqual('/etc',
163
175
        self.assertEqual('memory:///t is not a local path.', str(e))
164
176
 
165
177
 
166
 
class TestCoalesceOffsets(tests.TestCase):
 
178
class TestCoalesceOffsets(TestCase):
167
179
 
168
180
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
169
 
        coalesce = transport.Transport._coalesce_offsets
170
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
181
        coalesce = Transport._coalesce_offsets
 
182
        exp = [_CoalescedOffset(*x) for x in expected]
171
183
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
172
184
                            max_size=max_size))
173
185
        self.assertEqual(exp, out)
248
260
                   max_size=1*1024*1024*1024)
249
261
 
250
262
 
251
 
class TestMemoryServer(tests.TestCase):
 
263
class TestMemoryServer(TestCase):
252
264
 
253
265
    def test_create_server(self):
254
266
        server = memory.MemoryServer()
255
267
        server.start_server()
256
268
        url = server.get_url()
257
 
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
269
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
270
        t = _mod_transport.get_transport(url)
259
271
        del t
260
272
        server.stop_server()
261
 
        self.assertFalse(url in transport.transport_list_registry)
 
273
        self.assertFalse(url in _mod_transport.transport_list_registry)
262
274
        self.assertRaises(errors.UnsupportedProtocol,
263
 
                          transport.get_transport, url)
264
 
 
265
 
 
266
 
class TestMemoryTransport(tests.TestCase):
 
275
                          _mod_transport.get_transport, url)
 
276
 
 
277
 
 
278
class TestMemoryTransport(TestCase):
267
279
 
268
280
    def test_get_transport(self):
269
281
        memory.MemoryTransport()
270
282
 
271
283
    def test_clone(self):
272
 
        t = memory.MemoryTransport()
273
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
274
 
        self.assertEqual("memory:///", t.clone("/").base)
 
284
        transport = memory.MemoryTransport()
 
285
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
 
286
        self.assertEqual("memory:///", transport.clone("/").base)
275
287
 
276
288
    def test_abspath(self):
277
 
        t = memory.MemoryTransport()
278
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
289
        transport = memory.MemoryTransport()
 
290
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
279
291
 
280
292
    def test_abspath_of_root(self):
281
 
        t = memory.MemoryTransport()
282
 
        self.assertEqual("memory:///", t.base)
283
 
        self.assertEqual("memory:///", t.abspath('/'))
 
293
        transport = memory.MemoryTransport()
 
294
        self.assertEqual("memory:///", transport.base)
 
295
        self.assertEqual("memory:///", transport.abspath('/'))
284
296
 
285
297
    def test_abspath_of_relpath_starting_at_root(self):
286
 
        t = memory.MemoryTransport()
287
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
298
        transport = memory.MemoryTransport()
 
299
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
288
300
 
289
301
    def test_append_and_get(self):
290
 
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
302
        transport = memory.MemoryTransport()
 
303
        transport.append_bytes('path', 'content')
 
304
        self.assertEqual(transport.get('path').read(), 'content')
 
305
        transport.append_file('path', StringIO('content'))
 
306
        self.assertEqual(transport.get('path').read(), 'contentcontent')
295
307
 
296
308
    def test_put_and_get(self):
297
 
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
309
        transport = memory.MemoryTransport()
 
310
        transport.put_file('path', StringIO('content'))
 
311
        self.assertEqual(transport.get('path').read(), 'content')
 
312
        transport.put_bytes('path', 'content')
 
313
        self.assertEqual(transport.get('path').read(), 'content')
302
314
 
303
315
    def test_append_without_dir_fails(self):
304
 
        t = memory.MemoryTransport()
305
 
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
316
        transport = memory.MemoryTransport()
 
317
        self.assertRaises(NoSuchFile,
 
318
                          transport.append_bytes, 'dir/path', 'content')
307
319
 
308
320
    def test_put_without_dir_fails(self):
309
 
        t = memory.MemoryTransport()
310
 
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
321
        transport = memory.MemoryTransport()
 
322
        self.assertRaises(NoSuchFile,
 
323
                          transport.put_file, 'dir/path', StringIO('content'))
312
324
 
313
325
    def test_get_missing(self):
314
326
        transport = memory.MemoryTransport()
315
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
327
        self.assertRaises(NoSuchFile, transport.get, 'foo')
316
328
 
317
329
    def test_has_missing(self):
318
 
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
330
        transport = memory.MemoryTransport()
 
331
        self.assertEquals(False, transport.has('foo'))
320
332
 
321
333
    def test_has_present(self):
322
 
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
334
        transport = memory.MemoryTransport()
 
335
        transport.append_bytes('foo', 'content')
 
336
        self.assertEquals(True, transport.has('foo'))
325
337
 
326
338
    def test_list_dir(self):
327
 
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
329
 
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
339
        transport = memory.MemoryTransport()
 
340
        transport.put_bytes('foo', 'content')
 
341
        transport.mkdir('dir')
 
342
        transport.put_bytes('dir/subfoo', 'content')
 
343
        transport.put_bytes('dirlike', 'content')
332
344
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
345
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
346
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
335
347
 
336
348
    def test_mkdir(self):
337
 
        t = memory.MemoryTransport()
338
 
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
349
        transport = memory.MemoryTransport()
 
350
        transport.mkdir('dir')
 
351
        transport.append_bytes('dir/path', 'content')
 
352
        self.assertEqual(transport.get('dir/path').read(), 'content')
341
353
 
342
354
    def test_mkdir_missing_parent(self):
343
 
        t = memory.MemoryTransport()
344
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
355
        transport = memory.MemoryTransport()
 
356
        self.assertRaises(NoSuchFile,
 
357
                          transport.mkdir, 'dir/dir')
345
358
 
346
359
    def test_mkdir_twice(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.mkdir('dir')
349
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
360
        transport = memory.MemoryTransport()
 
361
        transport.mkdir('dir')
 
362
        self.assertRaises(FileExists, transport.mkdir, 'dir')
350
363
 
351
364
    def test_parameters(self):
352
 
        t = memory.MemoryTransport()
353
 
        self.assertEqual(True, t.listable())
354
 
        self.assertEqual(False, t.is_readonly())
 
365
        transport = memory.MemoryTransport()
 
366
        self.assertEqual(True, transport.listable())
 
367
        self.assertEqual(False, transport.is_readonly())
355
368
 
356
369
    def test_iter_files_recursive(self):
357
 
        t = memory.MemoryTransport()
358
 
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
362
 
        paths = set(t.iter_files_recursive())
 
370
        transport = memory.MemoryTransport()
 
371
        transport.mkdir('dir')
 
372
        transport.put_bytes('dir/foo', 'content')
 
373
        transport.put_bytes('dir/bar', 'content')
 
374
        transport.put_bytes('bar', 'content')
 
375
        paths = set(transport.iter_files_recursive())
363
376
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
364
377
 
365
378
    def test_stat(self):
366
 
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
369
 
        self.assertEqual(7, t.stat('foo').st_size)
370
 
        self.assertEqual(6, t.stat('bar').st_size)
371
 
 
372
 
 
373
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
379
        transport = memory.MemoryTransport()
 
380
        transport.put_bytes('foo', 'content')
 
381
        transport.put_bytes('bar', 'phowar')
 
382
        self.assertEqual(7, transport.stat('foo').st_size)
 
383
        self.assertEqual(6, transport.stat('bar').st_size)
 
384
 
 
385
 
 
386
class ChrootDecoratorTransportTest(TestCase):
374
387
    """Chroot decoration specific tests."""
375
388
 
376
389
    def test_abspath(self):
377
390
        # The abspath is always relative to the chroot_url.
378
 
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
391
        server = ChrootServer(get_transport('memory:///foo/bar/'))
380
392
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
382
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
393
        transport = get_transport(server.get_url())
 
394
        self.assertEqual(server.get_url(), transport.abspath('/'))
383
395
 
384
 
        subdir_t = t.clone('subdir')
385
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
396
        subdir_transport = transport.clone('subdir')
 
397
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
386
398
 
387
399
    def test_clone(self):
388
 
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
400
        server = ChrootServer(get_transport('memory:///foo/bar/'))
390
401
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
402
        transport = get_transport(server.get_url())
392
403
        # relpath from root and root path are the same
393
 
        relpath_cloned = t.clone('foo')
394
 
        abspath_cloned = t.clone('/foo')
 
404
        relpath_cloned = transport.clone('foo')
 
405
        abspath_cloned = transport.clone('/foo')
395
406
        self.assertEqual(server, relpath_cloned.server)
396
407
        self.assertEqual(server, abspath_cloned.server)
397
408
 
403
414
        This is so that it is not possible to escape a chroot by doing::
404
415
            url = chroot_transport.base
405
416
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
417
            new_transport = get_transport(parent_url)
407
418
        """
408
 
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
419
        server = ChrootServer(get_transport('memory:///path/subpath'))
410
420
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
413
 
        self.assertEqual(t.server, new_t.server)
414
 
        self.assertEqual(t.base, new_t.base)
 
421
        transport = get_transport(server.get_url())
 
422
        new_transport = get_transport(transport.base)
 
423
        self.assertEqual(transport.server, new_transport.server)
 
424
        self.assertEqual(transport.base, new_transport.base)
415
425
 
416
426
    def test_urljoin_preserves_chroot(self):
417
427
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
420
430
        This is so that it is not possible to escape a chroot by doing::
421
431
            url = chroot_transport.base
422
432
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
433
            new_transport = get_transport(parent_url)
424
434
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
435
        server = ChrootServer(get_transport('memory:///path/'))
426
436
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
437
        transport = get_transport(server.get_url())
428
438
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
 
 
431
 
 
432
 
class TestChrootServer(tests.TestCase):
 
439
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
440
 
 
441
 
 
442
class ChrootServerTest(TestCase):
433
443
 
434
444
    def test_construct(self):
435
445
        backing_transport = memory.MemoryTransport()
436
 
        server = chroot.ChrootServer(backing_transport)
 
446
        server = ChrootServer(backing_transport)
437
447
        self.assertEqual(backing_transport, server.backing_transport)
438
448
 
439
449
    def test_setUp(self):
440
450
        backing_transport = memory.MemoryTransport()
441
 
        server = chroot.ChrootServer(backing_transport)
 
451
        server = ChrootServer(backing_transport)
442
452
        server.start_server()
443
453
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
 
454
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
446
455
        finally:
447
456
            server.stop_server()
448
457
 
449
458
    def test_stop_server(self):
450
459
        backing_transport = memory.MemoryTransport()
451
 
        server = chroot.ChrootServer(backing_transport)
 
460
        server = ChrootServer(backing_transport)
452
461
        server.start_server()
453
462
        server.stop_server()
454
 
        self.assertFalse(server.scheme
455
 
                         in transport._get_protocol_handlers().keys())
 
463
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
456
464
 
457
465
    def test_get_url(self):
458
466
        backing_transport = memory.MemoryTransport()
459
 
        server = chroot.ChrootServer(backing_transport)
 
467
        server = ChrootServer(backing_transport)
460
468
        server.start_server()
461
469
        try:
462
470
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
472
            server.stop_server()
465
473
 
466
474
 
467
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
475
class PathFilteringDecoratorTransportTest(TestCase):
468
476
    """Pathfilter decoration specific tests."""
469
477
 
470
478
    def test_abspath(self):
471
479
        # The abspath is always relative to the base of the backing transport.
472
 
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
480
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
474
481
            lambda x: x)
475
482
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
477
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
483
        transport = get_transport(server.get_url())
 
484
        self.assertEqual(server.get_url(), transport.abspath('/'))
478
485
 
479
 
        subdir_t = t.clone('subdir')
480
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
486
        subdir_transport = transport.clone('subdir')
 
487
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
481
488
        server.stop_server()
482
489
 
483
490
    def make_pf_transport(self, filter_func=None):
487
494
            parameter to override it."""
488
495
        if filter_func is None:
489
496
            filter_func = lambda x: x
490
 
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
497
        server = PathFilteringServer(
 
498
            get_transport('memory:///foo/bar/'), filter_func)
492
499
        server.start_server()
493
500
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
501
        return get_transport(server.get_url())
495
502
 
496
503
    def test__filter(self):
497
504
        # _filter (with an identity func as filter_func) always returns
498
505
        # paths relative to the base of the backing transport.
499
 
        t = self.make_pf_transport()
500
 
        self.assertEqual('foo', t._filter('foo'))
501
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
502
 
        self.assertEqual('', t._filter('..'))
503
 
        self.assertEqual('', t._filter('/'))
 
506
        transport = self.make_pf_transport()
 
507
        self.assertEqual('foo', transport._filter('foo'))
 
508
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
509
        self.assertEqual('', transport._filter('..'))
 
510
        self.assertEqual('', transport._filter('/'))
504
511
        # The base of the pathfiltering transport is taken into account too.
505
 
        t = t.clone('subdir1/subdir2')
506
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
507
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
508
 
        self.assertEqual('subdir1', t._filter('..'))
509
 
        self.assertEqual('', t._filter('/'))
 
512
        transport = transport.clone('subdir1/subdir2')
 
513
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
514
        self.assertEqual(
 
515
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
516
        self.assertEqual('subdir1', transport._filter('..'))
 
517
        self.assertEqual('', transport._filter('/'))
510
518
 
511
519
    def test_filter_invocation(self):
512
520
        filter_log = []
513
521
        def filter(path):
514
522
            filter_log.append(path)
515
523
            return path
516
 
        t = self.make_pf_transport(filter)
517
 
        t.has('abc')
 
524
        transport = self.make_pf_transport(filter)
 
525
        transport.has('abc')
518
526
        self.assertEqual(['abc'], filter_log)
519
527
        del filter_log[:]
520
 
        t.clone('abc').has('xyz')
 
528
        transport.clone('abc').has('xyz')
521
529
        self.assertEqual(['abc/xyz'], filter_log)
522
530
        del filter_log[:]
523
 
        t.has('/abc')
 
531
        transport.has('/abc')
524
532
        self.assertEqual(['abc'], filter_log)
525
533
 
526
534
    def test_clone(self):
527
 
        t = self.make_pf_transport()
 
535
        transport = self.make_pf_transport()
528
536
        # relpath from root and root path are the same
529
 
        relpath_cloned = t.clone('foo')
530
 
        abspath_cloned = t.clone('/foo')
531
 
        self.assertEqual(t.server, relpath_cloned.server)
532
 
        self.assertEqual(t.server, abspath_cloned.server)
 
537
        relpath_cloned = transport.clone('foo')
 
538
        abspath_cloned = transport.clone('/foo')
 
539
        self.assertEqual(transport.server, relpath_cloned.server)
 
540
        self.assertEqual(transport.server, abspath_cloned.server)
533
541
 
534
542
    def test_url_preserves_pathfiltering(self):
535
543
        """Calling get_transport on a pathfiltered transport's base should
540
548
        otherwise) the filtering by doing::
541
549
            url = filtered_transport.base
542
550
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
551
            new_transport = get_transport(parent_url)
544
552
        """
545
 
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
547
 
        self.assertEqual(t.server, new_t.server)
548
 
        self.assertEqual(t.base, new_t.base)
549
 
 
550
 
 
551
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
553
        transport = self.make_pf_transport()
 
554
        new_transport = get_transport(transport.base)
 
555
        self.assertEqual(transport.server, new_transport.server)
 
556
        self.assertEqual(transport.base, new_transport.base)
 
557
 
 
558
 
 
559
class ReadonlyDecoratorTransportTest(TestCase):
552
560
    """Readonly decoration specific tests."""
553
561
 
554
562
    def test_local_parameters(self):
555
563
        # connect to . in readonly mode
556
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
557
 
        self.assertEqual(True, t.listable())
558
 
        self.assertEqual(True, t.is_readonly())
 
564
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
565
        self.assertEqual(True, transport.listable())
 
566
        self.assertEqual(True, transport.is_readonly())
559
567
 
560
568
    def test_http_parameters(self):
561
569
        from bzrlib.tests.http_server import HttpServer
562
570
        # connect to '.' via http which is not listable
563
571
        server = HttpServer()
564
572
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
567
 
        self.assertEqual(False, t.listable())
568
 
        self.assertEqual(True, t.is_readonly())
569
 
 
570
 
 
571
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
 
578
 
 
579
 
 
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
572
581
    """NFS decorator specific tests."""
573
582
 
574
583
    def get_nfs_transport(self, url):
578
587
    def test_local_parameters(self):
579
588
        # the listable and is_readonly parameters
580
589
        # are not changed by the fakenfs decorator
581
 
        t = self.get_nfs_transport('.')
582
 
        self.assertEqual(True, t.listable())
583
 
        self.assertEqual(False, t.is_readonly())
 
590
        transport = self.get_nfs_transport('.')
 
591
        self.assertEqual(True, transport.listable())
 
592
        self.assertEqual(False, transport.is_readonly())
584
593
 
585
594
    def test_http_parameters(self):
586
595
        # the listable and is_readonly parameters
589
598
        # connect to '.' via http which is not listable
590
599
        server = HttpServer()
591
600
        self.start_server(server)
592
 
        t = self.get_nfs_transport(server.get_url())
593
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
594
 
        self.assertEqual(False, t.listable())
595
 
        self.assertEqual(True, t.is_readonly())
 
601
        transport = self.get_nfs_transport(server.get_url())
 
602
        self.assertIsInstance(
 
603
            transport, fakenfs.FakeNFSTransportDecorator)
 
604
        self.assertEqual(False, transport.listable())
 
605
        self.assertEqual(True, transport.is_readonly())
596
606
 
597
607
    def test_fakenfs_server_default(self):
598
608
        # a FakeNFSServer() should bring up a local relpath server for itself
599
 
        server = test_server.FakeNFSServer()
 
609
        server = fakenfs.FakeNFSServer()
600
610
        self.start_server(server)
601
611
        # the url should be decorated appropriately
602
612
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
613
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
614
        transport = get_transport(server.get_url())
605
615
        # which must be a FakeNFSTransportDecorator instance.
606
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
616
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
607
617
 
608
618
    def test_fakenfs_rename_semantics(self):
609
619
        # a FakeNFS transport must mangle the way rename errors occur to
610
620
        # look like NFS problems.
611
 
        t = self.get_nfs_transport('.')
 
621
        transport = self.get_nfs_transport('.')
612
622
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
613
 
                        transport=t)
614
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
615
 
 
616
 
 
617
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
623
                        transport=transport)
 
624
        self.assertRaises(errors.ResourceBusy,
 
625
                          transport.rename, 'from', 'to')
 
626
 
 
627
 
 
628
class FakeVFATDecoratorTests(TestCaseInTempDir):
618
629
    """Tests for simulation of VFAT restrictions"""
619
630
 
620
631
    def get_vfat_transport(self, url):
624
635
 
625
636
    def test_transport_creation(self):
626
637
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
627
 
        t = self.get_vfat_transport('.')
628
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
638
        transport = self.get_vfat_transport('.')
 
639
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
629
640
 
630
641
    def test_transport_mkdir(self):
631
 
        t = self.get_vfat_transport('.')
632
 
        t.mkdir('HELLO')
633
 
        self.assertTrue(t.has('hello'))
634
 
        self.assertTrue(t.has('Hello'))
 
642
        transport = self.get_vfat_transport('.')
 
643
        transport.mkdir('HELLO')
 
644
        self.assertTrue(transport.has('hello'))
 
645
        self.assertTrue(transport.has('Hello'))
635
646
 
636
647
    def test_forbidden_chars(self):
637
 
        t = self.get_vfat_transport('.')
638
 
        self.assertRaises(ValueError, t.has, "<NU>")
639
 
 
640
 
 
641
 
class BadTransportHandler(transport.Transport):
 
648
        transport = self.get_vfat_transport('.')
 
649
        self.assertRaises(ValueError, transport.has, "<NU>")
 
650
 
 
651
 
 
652
class BadTransportHandler(Transport):
642
653
    def __init__(self, base_url):
643
 
        raise errors.DependencyNotPresent('some_lib',
644
 
                                          'testing missing dependency')
645
 
 
646
 
 
647
 
class BackupTransportHandler(transport.Transport):
 
654
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
655
 
 
656
 
 
657
class BackupTransportHandler(Transport):
648
658
    """Test transport that works as a backup for the BadTransportHandler"""
649
659
    pass
650
660
 
651
661
 
652
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
662
class TestTransportImplementation(TestCaseInTempDir):
653
663
    """Implementation verification for transports.
654
664
 
655
665
    To verify a transport we need a server factory, which is a callable
684
694
        base_url = self._server.get_url()
685
695
        url = self._adjust_url(base_url, relpath)
686
696
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
697
        t = get_transport(url)
688
698
        # vila--20070607 if the following are commented out the test suite
689
699
        # still pass. Is this really still needed or was it a forgotten
690
700
        # temporary fix ?
695
705
        return t
696
706
 
697
707
 
698
 
class TestLocalTransports(tests.TestCase):
 
708
class TestLocalTransports(TestCase):
699
709
 
700
710
    def test_get_transport_from_abspath(self):
701
711
        here = osutils.abspath('.')
702
 
        t = transport.get_transport(here)
703
 
        self.assertIsInstance(t, local.LocalTransport)
 
712
        t = get_transport(here)
 
713
        self.assertIsInstance(t, LocalTransport)
704
714
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
705
715
 
706
716
    def test_get_transport_from_relpath(self):
707
717
        here = osutils.abspath('.')
708
 
        t = transport.get_transport('.')
709
 
        self.assertIsInstance(t, local.LocalTransport)
 
718
        t = get_transport('.')
 
719
        self.assertIsInstance(t, LocalTransport)
710
720
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
711
721
 
712
722
    def test_get_transport_from_local_url(self):
713
723
        here = osutils.abspath('.')
714
724
        here_url = urlutils.local_path_to_url(here) + '/'
715
 
        t = transport.get_transport(here_url)
716
 
        self.assertIsInstance(t, local.LocalTransport)
 
725
        t = get_transport(here_url)
 
726
        self.assertIsInstance(t, LocalTransport)
717
727
        self.assertEquals(t.base, here_url)
718
728
 
719
729
    def test_local_abspath(self):
720
730
        here = osutils.abspath('.')
721
 
        t = transport.get_transport(here)
 
731
        t = get_transport(here)
722
732
        self.assertEquals(t.local_abspath(''), here)
723
733
 
724
734
 
725
 
class TestWin32LocalTransport(tests.TestCase):
 
735
class TestWin32LocalTransport(TestCase):
726
736
 
727
737
    def test_unc_clone_to_root(self):
728
738
        # Win32 UNC path like \\HOST\path
729
739
        # clone to root should stop at least at \\HOST part
730
740
        # not on \\
731
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
741
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
732
742
        for i in xrange(4):
733
743
            t = t.clone('..')
734
744
        self.assertEquals(t.base, 'file://HOST/')
737
747
        self.assertEquals(t.base, 'file://HOST/')
738
748
 
739
749
 
740
 
class TestConnectedTransport(tests.TestCase):
 
750
class TestConnectedTransport(TestCase):
741
751
    """Tests for connected to remote server transports"""
742
752
 
743
753
    def test_parse_url(self):
744
 
        t = transport.ConnectedTransport(
745
 
            'http://simple.example.com/home/source')
 
754
        t = ConnectedTransport('http://simple.example.com/home/source')
746
755
        self.assertEquals(t._host, 'simple.example.com')
747
756
        self.assertEquals(t._port, None)
748
757
        self.assertEquals(t._path, '/home/source/')
753
762
 
754
763
    def test_parse_url_with_at_in_user(self):
755
764
        # Bug 228058
756
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
765
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
757
766
        self.assertEquals(t._user, 'user@host.com')
758
767
 
759
768
    def test_parse_quoted_url(self):
760
 
        t = transport.ConnectedTransport(
761
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
769
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
770
        self.assertEquals(t._host, 'exAmple.com')
763
771
        self.assertEquals(t._port, 2222)
764
772
        self.assertEquals(t._user, 'robey')
770
778
 
771
779
    def test_parse_invalid_url(self):
772
780
        self.assertRaises(errors.InvalidURL,
773
 
                          transport.ConnectedTransport,
 
781
                          ConnectedTransport,
774
782
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
783
 
776
784
    def test_relpath(self):
777
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
785
        t = ConnectedTransport('sftp://user@host.com/abs/path')
778
786
 
779
787
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
780
788
        self.assertRaises(errors.PathNotChild, t.relpath,
786
794
        self.assertRaises(errors.PathNotChild, t.relpath,
787
795
                          'sftp://user@host.com:33/abs/path/sub')
788
796
        # Make sure it works when we don't supply a username
789
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
797
        t = ConnectedTransport('sftp://host.com/abs/path')
790
798
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
799
 
792
800
        # Make sure it works when parts of the path will be url encoded
793
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
801
        t = ConnectedTransport('sftp://host.com/dev/%path')
794
802
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
803
 
796
804
    def test_connection_sharing_propagate_credentials(self):
797
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
805
        t = ConnectedTransport('ftp://user@host.com/abs/path')
798
806
        self.assertEquals('user', t._user)
799
807
        self.assertEquals('host.com', t._host)
800
808
        self.assertIs(None, t._get_connection())
821
829
        self.assertIs(new_password, c._get_credentials())
822
830
 
823
831
 
824
 
class TestReusedTransports(tests.TestCase):
 
832
class TestReusedTransports(TestCase):
825
833
    """Tests for transport reuse"""
826
834
 
827
835
    def test_reuse_same_transport(self):
828
836
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=possible_transports)
 
837
        t1 = get_transport('http://foo/',
 
838
                           possible_transports=possible_transports)
831
839
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
833
 
                                     possible_transports=[t1])
 
840
        t2 = get_transport('http://foo/', possible_transports=[t1])
834
841
        self.assertIs(t1, t2)
835
842
 
836
843
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
839
 
                                     possible_transports=[t3])
 
844
        t3 = get_transport('http://foo/path/')
 
845
        t4 = get_transport('http://foo/path', possible_transports=[t3])
840
846
        self.assertIs(t3, t4)
841
847
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
844
 
                                     possible_transports=[t5])
 
848
        t5 = get_transport('http://foo/path')
 
849
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
845
850
        self.assertIs(t5, t6)
846
851
 
847
852
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
850
 
                                     possible_transports=[t1])
 
853
        t1 = get_transport('http://foo/path')
 
854
        t2 = get_transport('http://bar/path', possible_transports=[t1])
851
855
        self.assertIsNot(t1, t2)
852
856
 
853
857
 
854
 
class TestTransportTrace(tests.TestCase):
 
858
class TestTransportTrace(TestCase):
855
859
 
856
860
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
861
        transport = get_transport('trace+memory://')
 
862
        self.assertIsInstance(
 
863
            transport, bzrlib.transport.trace.TransportTraceDecorator)
859
864
 
860
865
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
862
 
        t2 = t.clone('.')
863
 
        self.assertTrue(t is not t2)
864
 
        self.assertTrue(t._activity is t2._activity)
 
866
        transport = get_transport('trace+memory://')
 
867
        transport2 = transport.clone('.')
 
868
        self.assertTrue(transport is not transport2)
 
869
        self.assertTrue(transport._activity is transport2._activity)
865
870
 
866
871
    # the following specific tests are for the operations that have made use of
867
872
    # logging in tests; we could test every single operation but doing that
868
873
    # still won't cause a test failure when the top level Transport API
869
874
    # changes; so there is little return doing that.
870
875
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
873
 
        t.get('foo')
 
876
        transport = get_transport('trace+memory:///')
 
877
        transport.put_bytes('foo', 'barish')
 
878
        transport.get('foo')
874
879
        expected_result = []
875
880
        # put_bytes records the bytes, not the content to avoid memory
876
881
        # pressure.
877
882
        expected_result.append(('put_bytes', 'foo', 6, None))
878
883
        # get records the file name only.
879
884
        expected_result.append(('get', 'foo'))
880
 
        self.assertEqual(expected_result, t._activity)
 
885
        self.assertEqual(expected_result, transport._activity)
881
886
 
882
887
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
885
 
        list(t.readv('foo', [(0, 1), (3, 2)],
886
 
                     adjust_for_latency=True, upper_limit=6))
 
888
        transport = get_transport('trace+memory:///')
 
889
        transport.put_bytes('foo', 'barish')
 
890
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
891
            upper_limit=6))
887
892
        expected_result = []
888
893
        # put_bytes records the bytes, not the content to avoid memory
889
894
        # pressure.
890
895
        expected_result.append(('put_bytes', 'foo', 6, None))
891
896
        # readv records the supplied offset request
892
897
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
893
 
        self.assertEqual(expected_result, t._activity)
 
898
        self.assertEqual(expected_result, transport._activity)
894
899
 
895
900
 
896
901
class TestSSHConnections(tests.TestCaseWithTransport):
909
914
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
915
        # override the interface (doesn't change self._vendor).
911
916
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
917
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
918
        from bzrlib.tests.stub_sftp import StubServer
913
919
 
914
920
        # Start an SSH server
915
921
        self.command_executed = []
918
924
        # SSH channel ourselves.  Surely this has already been implemented
919
925
        # elsewhere?
920
926
        started = []
921
 
        class StubSSHServer(stub_sftp.StubServer):
 
927
        class StubSSHServer(StubServer):
922
928
 
923
929
            test = self
924
930
 
952
958
 
953
959
                return True
954
960
 
955
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
961
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
956
962
        # We *don't* want to override the default SSH vendor: the detected one
957
963
        # is the one to use.
958
964
        self.start_server(ssh_server)
972
978
            # prefix a '/' to get the right path.
973
979
            path_to_branch = '/' + path_to_branch
974
980
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
975
 
        t = transport.get_transport(url)
 
981
        t = get_transport(url)
976
982
        self.permit_url(t.base)
977
983
        t.mkdir('foo')
978
984