~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: John Arbash Meinel
  • Date: 2010-02-10 17:52:08 UTC
  • mfrom: (5021 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5023.
  • Revision ID: john@arbash-meinel.com-20100210175208-bubuwav4uqigu291
Merge bzr.dev 5021 to resolve NEWS

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
 
    transport as _mod_transport,
 
28
    transport,
29
29
    urlutils,
30
30
    )
31
31
from bzrlib.transport import (
 
32
    chroot,
32
33
    fakenfs,
 
34
    local,
33
35
    memory,
 
36
    pathfilter,
34
37
    readonly,
35
38
    )
36
 
from bzrlib.errors import (DependencyNotPresent,
37
 
                           FileExists,
38
 
                           InvalidURLJoin,
39
 
                           NoSuchFile,
40
 
                           PathNotChild,
41
 
                           ReadError,
42
 
                           UnsupportedProtocol,
43
 
                           )
44
 
from bzrlib.tests import features, TestCase, TestCaseInTempDir
45
 
from bzrlib.transport import (_clear_protocol_handlers,
46
 
                              _CoalescedOffset,
47
 
                              ConnectedTransport,
48
 
                              _get_protocol_handlers,
49
 
                              _set_protocol_handlers,
50
 
                              _get_transport_modules,
51
 
                              get_transport,
52
 
                              LateReadError,
53
 
                              register_lazy_transport,
54
 
                              register_transport_proto,
55
 
                              Transport,
56
 
                              )
57
 
from bzrlib.transport.chroot import ChrootServer
58
 
from bzrlib.transport.local import (LocalTransport,
59
 
                                    EmulatedWin32LocalTransport)
60
 
from bzrlib.transport.pathfilter import PathFilteringServer
 
39
from bzrlib.tests import features
61
40
 
62
41
 
63
42
# TODO: Should possibly split transport-specific tests into their own files.
64
43
 
65
44
 
66
 
class TestTransport(TestCase):
 
45
class TestTransport(tests.TestCase):
67
46
    """Test the non transport-concrete class functionality."""
68
47
 
 
48
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
49
    # of try/finally -- vila 20100205
 
50
 
69
51
    def test__get_set_protocol_handlers(self):
70
 
        handlers = _get_protocol_handlers()
 
52
        handlers = transport._get_protocol_handlers()
71
53
        self.assertNotEqual([], handlers.keys( ))
72
54
        try:
73
 
            _clear_protocol_handlers()
74
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
55
            transport._clear_protocol_handlers()
 
56
            self.assertEqual([], transport._get_protocol_handlers().keys())
75
57
        finally:
76
 
            _set_protocol_handlers(handlers)
 
58
            transport._set_protocol_handlers(handlers)
77
59
 
78
60
    def test_get_transport_modules(self):
79
 
        handlers = _get_protocol_handlers()
 
61
        handlers = transport._get_protocol_handlers()
80
62
        # don't pollute the current handlers
81
 
        _clear_protocol_handlers()
 
63
        transport._clear_protocol_handlers()
82
64
        class SampleHandler(object):
83
65
            """I exist, isnt that enough?"""
84
66
        try:
85
 
            _clear_protocol_handlers()
86
 
            register_transport_proto('foo')
87
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
88
 
                                    'TestTransport.SampleHandler')
89
 
            register_transport_proto('bar')
90
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
91
 
                                    'TestTransport.SampleHandler')
 
67
            transport._clear_protocol_handlers()
 
68
            transport.register_transport_proto('foo')
 
69
            transport.register_lazy_transport('foo',
 
70
                                              'bzrlib.tests.test_transport',
 
71
                                              'TestTransport.SampleHandler')
 
72
            transport.register_transport_proto('bar')
 
73
            transport.register_lazy_transport('bar',
 
74
                                              'bzrlib.tests.test_transport',
 
75
                                              'TestTransport.SampleHandler')
92
76
            self.assertEqual([SampleHandler.__module__,
93
77
                              'bzrlib.transport.chroot',
94
78
                              'bzrlib.transport.pathfilter'],
95
 
                             _get_transport_modules())
 
79
                             transport._get_transport_modules())
96
80
        finally:
97
 
            _set_protocol_handlers(handlers)
 
81
            transport._set_protocol_handlers(handlers)
98
82
 
99
83
    def test_transport_dependency(self):
100
84
        """Transport with missing dependency causes no error"""
101
 
        saved_handlers = _get_protocol_handlers()
 
85
        saved_handlers = transport._get_protocol_handlers()
102
86
        # don't pollute the current handlers
103
 
        _clear_protocol_handlers()
 
87
        transport._clear_protocol_handlers()
104
88
        try:
105
 
            register_transport_proto('foo')
106
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
107
 
                    'BadTransportHandler')
 
89
            transport.register_transport_proto('foo')
 
90
            transport.register_lazy_transport(
 
91
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
108
92
            try:
109
 
                get_transport('foo://fooserver/foo')
110
 
            except UnsupportedProtocol, e:
 
93
                transport.get_transport('foo://fooserver/foo')
 
94
            except errors.UnsupportedProtocol, e:
111
95
                e_str = str(e)
112
96
                self.assertEquals('Unsupported protocol'
113
97
                                  ' for url "foo://fooserver/foo":'
117
101
                self.fail('Did not raise UnsupportedProtocol')
118
102
        finally:
119
103
            # restore original values
120
 
            _set_protocol_handlers(saved_handlers)
 
104
            transport._set_protocol_handlers(saved_handlers)
121
105
 
122
106
    def test_transport_fallback(self):
123
107
        """Transport with missing dependency causes no error"""
124
 
        saved_handlers = _get_protocol_handlers()
 
108
        saved_handlers = transport._get_protocol_handlers()
125
109
        try:
126
 
            _clear_protocol_handlers()
127
 
            register_transport_proto('foo')
128
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
129
 
                    'BackupTransportHandler')
130
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
131
 
                    'BadTransportHandler')
132
 
            t = get_transport('foo://fooserver/foo')
 
110
            transport._clear_protocol_handlers()
 
111
            transport.register_transport_proto('foo')
 
112
            transport.register_lazy_transport(
 
113
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
114
            transport.register_lazy_transport(
 
115
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
116
            t = transport.get_transport('foo://fooserver/foo')
133
117
            self.assertTrue(isinstance(t, BackupTransportHandler))
134
118
        finally:
135
 
            _set_protocol_handlers(saved_handlers)
 
119
            transport._set_protocol_handlers(saved_handlers)
136
120
 
137
121
    def test_ssh_hints(self):
138
122
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
139
123
        try:
140
 
            get_transport('ssh://fooserver/foo')
141
 
        except UnsupportedProtocol, e:
 
124
            transport.get_transport('ssh://fooserver/foo')
 
125
        except errors.UnsupportedProtocol, e:
142
126
            e_str = str(e)
143
127
            self.assertEquals('Unsupported protocol'
144
128
                              ' for url "ssh://fooserver/foo":'
145
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
129
                              ' bzr supports bzr+ssh to operate over ssh,'
 
130
                              ' use "bzr+ssh://fooserver/foo".',
146
131
                              str(e))
147
132
        else:
148
133
            self.fail('Did not raise UnsupportedProtocol')
149
134
 
150
135
    def test_LateReadError(self):
151
136
        """The LateReadError helper should raise on read()."""
152
 
        a_file = LateReadError('a path')
 
137
        a_file = transport.LateReadError('a path')
153
138
        try:
154
139
            a_file.read()
155
 
        except ReadError, error:
 
140
        except errors.ReadError, error:
156
141
            self.assertEqual('a path', error.path)
157
 
        self.assertRaises(ReadError, a_file.read, 40)
 
142
        self.assertRaises(errors.ReadError, a_file.read, 40)
158
143
        a_file.close()
159
144
 
160
145
    def test__combine_paths(self):
161
 
        t = Transport('/')
 
146
        t = transport.Transport('/')
162
147
        self.assertEqual('/home/sarah/project/foo',
163
148
                         t._combine_paths('/home/sarah', 'project/foo'))
164
149
        self.assertEqual('/etc',
175
160
        self.assertEqual('memory:///t is not a local path.', str(e))
176
161
 
177
162
 
178
 
class TestCoalesceOffsets(TestCase):
 
163
class TestCoalesceOffsets(tests.TestCase):
179
164
 
180
165
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
181
 
        coalesce = Transport._coalesce_offsets
182
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
166
        coalesce = transport.Transport._coalesce_offsets
 
167
        exp = [transport._CoalescedOffset(*x) for x in expected]
183
168
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
184
169
                            max_size=max_size))
185
170
        self.assertEqual(exp, out)
260
245
                   max_size=1*1024*1024*1024)
261
246
 
262
247
 
263
 
class TestMemoryServer(TestCase):
 
248
class TestMemoryServer(tests.TestCase):
264
249
 
265
250
    def test_create_server(self):
266
251
        server = memory.MemoryServer()
267
252
        server.start_server()
268
253
        url = server.get_url()
269
 
        self.assertTrue(url in _mod_transport.transport_list_registry)
270
 
        t = _mod_transport.get_transport(url)
 
254
        self.assertTrue(url in transport.transport_list_registry)
 
255
        t = transport.get_transport(url)
271
256
        del t
272
257
        server.stop_server()
273
 
        self.assertFalse(url in _mod_transport.transport_list_registry)
 
258
        self.assertFalse(url in transport.transport_list_registry)
274
259
        self.assertRaises(errors.UnsupportedProtocol,
275
 
                          _mod_transport.get_transport, url)
276
 
 
277
 
 
278
 
class TestMemoryTransport(TestCase):
 
260
                          transport.get_transport, url)
 
261
 
 
262
 
 
263
class TestMemoryTransport(tests.TestCase):
279
264
 
280
265
    def test_get_transport(self):
281
266
        memory.MemoryTransport()
282
267
 
283
268
    def test_clone(self):
284
 
        transport = memory.MemoryTransport()
285
 
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
286
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
269
        t = memory.MemoryTransport()
 
270
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
271
        self.assertEqual("memory:///", t.clone("/").base)
287
272
 
288
273
    def test_abspath(self):
289
 
        transport = memory.MemoryTransport()
290
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
274
        t = memory.MemoryTransport()
 
275
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
291
276
 
292
277
    def test_abspath_of_root(self):
293
 
        transport = memory.MemoryTransport()
294
 
        self.assertEqual("memory:///", transport.base)
295
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
278
        t = memory.MemoryTransport()
 
279
        self.assertEqual("memory:///", t.base)
 
280
        self.assertEqual("memory:///", t.abspath('/'))
296
281
 
297
282
    def test_abspath_of_relpath_starting_at_root(self):
298
 
        transport = memory.MemoryTransport()
299
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
283
        t = memory.MemoryTransport()
 
284
        self.assertEqual("memory:///foo", t.abspath('/foo'))
300
285
 
301
286
    def test_append_and_get(self):
302
 
        transport = memory.MemoryTransport()
303
 
        transport.append_bytes('path', 'content')
304
 
        self.assertEqual(transport.get('path').read(), 'content')
305
 
        transport.append_file('path', StringIO('content'))
306
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
287
        t = memory.MemoryTransport()
 
288
        t.append_bytes('path', 'content')
 
289
        self.assertEqual(t.get('path').read(), 'content')
 
290
        t.append_file('path', StringIO('content'))
 
291
        self.assertEqual(t.get('path').read(), 'contentcontent')
307
292
 
308
293
    def test_put_and_get(self):
309
 
        transport = memory.MemoryTransport()
310
 
        transport.put_file('path', StringIO('content'))
311
 
        self.assertEqual(transport.get('path').read(), 'content')
312
 
        transport.put_bytes('path', 'content')
313
 
        self.assertEqual(transport.get('path').read(), 'content')
 
294
        t = memory.MemoryTransport()
 
295
        t.put_file('path', StringIO('content'))
 
296
        self.assertEqual(t.get('path').read(), 'content')
 
297
        t.put_bytes('path', 'content')
 
298
        self.assertEqual(t.get('path').read(), 'content')
314
299
 
315
300
    def test_append_without_dir_fails(self):
316
 
        transport = memory.MemoryTransport()
317
 
        self.assertRaises(NoSuchFile,
318
 
                          transport.append_bytes, 'dir/path', 'content')
 
301
        t = memory.MemoryTransport()
 
302
        self.assertRaises(errors.NoSuchFile,
 
303
                          t.append_bytes, 'dir/path', 'content')
319
304
 
320
305
    def test_put_without_dir_fails(self):
321
 
        transport = memory.MemoryTransport()
322
 
        self.assertRaises(NoSuchFile,
323
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
306
        t = memory.MemoryTransport()
 
307
        self.assertRaises(errors.NoSuchFile,
 
308
                          t.put_file, 'dir/path', StringIO('content'))
324
309
 
325
310
    def test_get_missing(self):
326
311
        transport = memory.MemoryTransport()
327
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
312
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
328
313
 
329
314
    def test_has_missing(self):
330
 
        transport = memory.MemoryTransport()
331
 
        self.assertEquals(False, transport.has('foo'))
 
315
        t = memory.MemoryTransport()
 
316
        self.assertEquals(False, t.has('foo'))
332
317
 
333
318
    def test_has_present(self):
334
 
        transport = memory.MemoryTransport()
335
 
        transport.append_bytes('foo', 'content')
336
 
        self.assertEquals(True, transport.has('foo'))
 
319
        t = memory.MemoryTransport()
 
320
        t.append_bytes('foo', 'content')
 
321
        self.assertEquals(True, t.has('foo'))
337
322
 
338
323
    def test_list_dir(self):
339
 
        transport = memory.MemoryTransport()
340
 
        transport.put_bytes('foo', 'content')
341
 
        transport.mkdir('dir')
342
 
        transport.put_bytes('dir/subfoo', 'content')
343
 
        transport.put_bytes('dirlike', 'content')
 
324
        t = memory.MemoryTransport()
 
325
        t.put_bytes('foo', 'content')
 
326
        t.mkdir('dir')
 
327
        t.put_bytes('dir/subfoo', 'content')
 
328
        t.put_bytes('dirlike', 'content')
344
329
 
345
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
346
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
330
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
331
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
347
332
 
348
333
    def test_mkdir(self):
349
 
        transport = memory.MemoryTransport()
350
 
        transport.mkdir('dir')
351
 
        transport.append_bytes('dir/path', 'content')
352
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
334
        t = memory.MemoryTransport()
 
335
        t.mkdir('dir')
 
336
        t.append_bytes('dir/path', 'content')
 
337
        self.assertEqual(t.get('dir/path').read(), 'content')
353
338
 
354
339
    def test_mkdir_missing_parent(self):
355
 
        transport = memory.MemoryTransport()
356
 
        self.assertRaises(NoSuchFile,
357
 
                          transport.mkdir, 'dir/dir')
 
340
        t = memory.MemoryTransport()
 
341
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
358
342
 
359
343
    def test_mkdir_twice(self):
360
 
        transport = memory.MemoryTransport()
361
 
        transport.mkdir('dir')
362
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
344
        t = memory.MemoryTransport()
 
345
        t.mkdir('dir')
 
346
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
363
347
 
364
348
    def test_parameters(self):
365
 
        transport = memory.MemoryTransport()
366
 
        self.assertEqual(True, transport.listable())
367
 
        self.assertEqual(False, transport.is_readonly())
 
349
        t = memory.MemoryTransport()
 
350
        self.assertEqual(True, t.listable())
 
351
        self.assertEqual(False, t.is_readonly())
368
352
 
369
353
    def test_iter_files_recursive(self):
370
 
        transport = memory.MemoryTransport()
371
 
        transport.mkdir('dir')
372
 
        transport.put_bytes('dir/foo', 'content')
373
 
        transport.put_bytes('dir/bar', 'content')
374
 
        transport.put_bytes('bar', 'content')
375
 
        paths = set(transport.iter_files_recursive())
 
354
        t = memory.MemoryTransport()
 
355
        t.mkdir('dir')
 
356
        t.put_bytes('dir/foo', 'content')
 
357
        t.put_bytes('dir/bar', 'content')
 
358
        t.put_bytes('bar', 'content')
 
359
        paths = set(t.iter_files_recursive())
376
360
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
377
361
 
378
362
    def test_stat(self):
379
 
        transport = memory.MemoryTransport()
380
 
        transport.put_bytes('foo', 'content')
381
 
        transport.put_bytes('bar', 'phowar')
382
 
        self.assertEqual(7, transport.stat('foo').st_size)
383
 
        self.assertEqual(6, transport.stat('bar').st_size)
384
 
 
385
 
 
386
 
class ChrootDecoratorTransportTest(TestCase):
 
363
        t = memory.MemoryTransport()
 
364
        t.put_bytes('foo', 'content')
 
365
        t.put_bytes('bar', 'phowar')
 
366
        self.assertEqual(7, t.stat('foo').st_size)
 
367
        self.assertEqual(6, t.stat('bar').st_size)
 
368
 
 
369
 
 
370
class ChrootDecoratorTransportTest(tests.TestCase):
387
371
    """Chroot decoration specific tests."""
388
372
 
389
373
    def test_abspath(self):
390
374
        # The abspath is always relative to the chroot_url.
391
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
375
        server = chroot.ChrootServer(
 
376
            transport.get_transport('memory:///foo/bar/'))
392
377
        self.start_server(server)
393
 
        transport = get_transport(server.get_url())
394
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
378
        t = transport.get_transport(server.get_url())
 
379
        self.assertEqual(server.get_url(), t.abspath('/'))
395
380
 
396
 
        subdir_transport = transport.clone('subdir')
397
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
381
        subdir_t = t.clone('subdir')
 
382
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
398
383
 
399
384
    def test_clone(self):
400
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
385
        server = chroot.ChrootServer(
 
386
            transport.get_transport('memory:///foo/bar/'))
401
387
        self.start_server(server)
402
 
        transport = get_transport(server.get_url())
 
388
        t = transport.get_transport(server.get_url())
403
389
        # relpath from root and root path are the same
404
 
        relpath_cloned = transport.clone('foo')
405
 
        abspath_cloned = transport.clone('/foo')
 
390
        relpath_cloned = t.clone('foo')
 
391
        abspath_cloned = t.clone('/foo')
406
392
        self.assertEqual(server, relpath_cloned.server)
407
393
        self.assertEqual(server, abspath_cloned.server)
408
394
 
414
400
        This is so that it is not possible to escape a chroot by doing::
415
401
            url = chroot_transport.base
416
402
            parent_url = urlutils.join(url, '..')
417
 
            new_transport = get_transport(parent_url)
 
403
            new_t = transport.get_transport(parent_url)
418
404
        """
419
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
405
        server = chroot.ChrootServer(
 
406
            transport.get_transport('memory:///path/subpath'))
420
407
        self.start_server(server)
421
 
        transport = get_transport(server.get_url())
422
 
        new_transport = get_transport(transport.base)
423
 
        self.assertEqual(transport.server, new_transport.server)
424
 
        self.assertEqual(transport.base, new_transport.base)
 
408
        t = transport.get_transport(server.get_url())
 
409
        new_t = transport.get_transport(t.base)
 
410
        self.assertEqual(t.server, new_t.server)
 
411
        self.assertEqual(t.base, new_t.base)
425
412
 
426
413
    def test_urljoin_preserves_chroot(self):
427
414
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
430
417
        This is so that it is not possible to escape a chroot by doing::
431
418
            url = chroot_transport.base
432
419
            parent_url = urlutils.join(url, '..')
433
 
            new_transport = get_transport(parent_url)
 
420
            new_t = transport.get_transport(parent_url)
434
421
        """
435
 
        server = ChrootServer(get_transport('memory:///path/'))
 
422
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
436
423
        self.start_server(server)
437
 
        transport = get_transport(server.get_url())
 
424
        t = transport.get_transport(server.get_url())
438
425
        self.assertRaises(
439
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
440
 
 
441
 
 
442
 
class ChrootServerTest(TestCase):
 
426
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
427
 
 
428
 
 
429
class TestChrootServer(tests.TestCase):
443
430
 
444
431
    def test_construct(self):
445
432
        backing_transport = memory.MemoryTransport()
446
 
        server = ChrootServer(backing_transport)
 
433
        server = chroot.ChrootServer(backing_transport)
447
434
        self.assertEqual(backing_transport, server.backing_transport)
448
435
 
449
436
    def test_setUp(self):
450
437
        backing_transport = memory.MemoryTransport()
451
 
        server = ChrootServer(backing_transport)
 
438
        server = chroot.ChrootServer(backing_transport)
452
439
        server.start_server()
453
440
        try:
454
 
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
441
            self.assertTrue(server.scheme
 
442
                            in transport._get_protocol_handlers().keys())
455
443
        finally:
456
444
            server.stop_server()
457
445
 
458
446
    def test_stop_server(self):
459
447
        backing_transport = memory.MemoryTransport()
460
 
        server = ChrootServer(backing_transport)
 
448
        server = chroot.ChrootServer(backing_transport)
461
449
        server.start_server()
462
450
        server.stop_server()
463
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
451
        self.assertFalse(server.scheme
 
452
                         in transport._get_protocol_handlers().keys())
464
453
 
465
454
    def test_get_url(self):
466
455
        backing_transport = memory.MemoryTransport()
467
 
        server = ChrootServer(backing_transport)
 
456
        server = chroot.ChrootServer(backing_transport)
468
457
        server.start_server()
469
458
        try:
470
459
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
472
461
            server.stop_server()
473
462
 
474
463
 
475
 
class PathFilteringDecoratorTransportTest(TestCase):
 
464
class PathFilteringDecoratorTransportTest(tests.TestCase):
476
465
    """Pathfilter decoration specific tests."""
477
466
 
478
467
    def test_abspath(self):
479
468
        # The abspath is always relative to the base of the backing transport.
480
 
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
469
        server = pathfilter.PathFilteringServer(
 
470
            transport.get_transport('memory:///foo/bar/'),
481
471
            lambda x: x)
482
472
        server.start_server()
483
 
        transport = get_transport(server.get_url())
484
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
473
        t = transport.get_transport(server.get_url())
 
474
        self.assertEqual(server.get_url(), t.abspath('/'))
485
475
 
486
 
        subdir_transport = transport.clone('subdir')
487
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
476
        subdir_t = t.clone('subdir')
 
477
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
488
478
        server.stop_server()
489
479
 
490
480
    def make_pf_transport(self, filter_func=None):
494
484
            parameter to override it."""
495
485
        if filter_func is None:
496
486
            filter_func = lambda x: x
497
 
        server = PathFilteringServer(
498
 
            get_transport('memory:///foo/bar/'), filter_func)
 
487
        server = pathfilter.PathFilteringServer(
 
488
            transport.get_transport('memory:///foo/bar/'), filter_func)
499
489
        server.start_server()
500
490
        self.addCleanup(server.stop_server)
501
 
        return get_transport(server.get_url())
 
491
        return transport.get_transport(server.get_url())
502
492
 
503
493
    def test__filter(self):
504
494
        # _filter (with an identity func as filter_func) always returns
505
495
        # paths relative to the base of the backing transport.
506
 
        transport = self.make_pf_transport()
507
 
        self.assertEqual('foo', transport._filter('foo'))
508
 
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
509
 
        self.assertEqual('', transport._filter('..'))
510
 
        self.assertEqual('', transport._filter('/'))
 
496
        t = self.make_pf_transport()
 
497
        self.assertEqual('foo', t._filter('foo'))
 
498
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
499
        self.assertEqual('', t._filter('..'))
 
500
        self.assertEqual('', t._filter('/'))
511
501
        # The base of the pathfiltering transport is taken into account too.
512
 
        transport = transport.clone('subdir1/subdir2')
513
 
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
514
 
        self.assertEqual(
515
 
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
516
 
        self.assertEqual('subdir1', transport._filter('..'))
517
 
        self.assertEqual('', transport._filter('/'))
 
502
        t = t.clone('subdir1/subdir2')
 
503
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
504
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
505
        self.assertEqual('subdir1', t._filter('..'))
 
506
        self.assertEqual('', t._filter('/'))
518
507
 
519
508
    def test_filter_invocation(self):
520
509
        filter_log = []
521
510
        def filter(path):
522
511
            filter_log.append(path)
523
512
            return path
524
 
        transport = self.make_pf_transport(filter)
525
 
        transport.has('abc')
 
513
        t = self.make_pf_transport(filter)
 
514
        t.has('abc')
526
515
        self.assertEqual(['abc'], filter_log)
527
516
        del filter_log[:]
528
 
        transport.clone('abc').has('xyz')
 
517
        t.clone('abc').has('xyz')
529
518
        self.assertEqual(['abc/xyz'], filter_log)
530
519
        del filter_log[:]
531
 
        transport.has('/abc')
 
520
        t.has('/abc')
532
521
        self.assertEqual(['abc'], filter_log)
533
522
 
534
523
    def test_clone(self):
535
 
        transport = self.make_pf_transport()
 
524
        t = self.make_pf_transport()
536
525
        # relpath from root and root path are the same
537
 
        relpath_cloned = transport.clone('foo')
538
 
        abspath_cloned = transport.clone('/foo')
539
 
        self.assertEqual(transport.server, relpath_cloned.server)
540
 
        self.assertEqual(transport.server, abspath_cloned.server)
 
526
        relpath_cloned = t.clone('foo')
 
527
        abspath_cloned = t.clone('/foo')
 
528
        self.assertEqual(t.server, relpath_cloned.server)
 
529
        self.assertEqual(t.server, abspath_cloned.server)
541
530
 
542
531
    def test_url_preserves_pathfiltering(self):
543
532
        """Calling get_transport on a pathfiltered transport's base should
548
537
        otherwise) the filtering by doing::
549
538
            url = filtered_transport.base
550
539
            parent_url = urlutils.join(url, '..')
551
 
            new_transport = get_transport(parent_url)
 
540
            new_t = transport.get_transport(parent_url)
552
541
        """
553
 
        transport = self.make_pf_transport()
554
 
        new_transport = get_transport(transport.base)
555
 
        self.assertEqual(transport.server, new_transport.server)
556
 
        self.assertEqual(transport.base, new_transport.base)
557
 
 
558
 
 
559
 
class ReadonlyDecoratorTransportTest(TestCase):
 
542
        t = self.make_pf_transport()
 
543
        new_t = transport.get_transport(t.base)
 
544
        self.assertEqual(t.server, new_t.server)
 
545
        self.assertEqual(t.base, new_t.base)
 
546
 
 
547
 
 
548
class ReadonlyDecoratorTransportTest(tests.TestCase):
560
549
    """Readonly decoration specific tests."""
561
550
 
562
551
    def test_local_parameters(self):
563
552
        # connect to . in readonly mode
564
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
565
 
        self.assertEqual(True, transport.listable())
566
 
        self.assertEqual(True, transport.is_readonly())
 
553
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
554
        self.assertEqual(True, t.listable())
 
555
        self.assertEqual(True, t.is_readonly())
567
556
 
568
557
    def test_http_parameters(self):
569
558
        from bzrlib.tests.http_server import HttpServer
570
559
        # connect to '.' via http which is not listable
571
560
        server = HttpServer()
572
561
        self.start_server(server)
573
 
        transport = get_transport('readonly+' + server.get_url())
574
 
        self.failUnless(isinstance(transport,
575
 
                                   readonly.ReadonlyTransportDecorator))
576
 
        self.assertEqual(False, transport.listable())
577
 
        self.assertEqual(True, transport.is_readonly())
578
 
 
579
 
 
580
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
562
        t = transport.get_transport('readonly+' + server.get_url())
 
563
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
564
        self.assertEqual(False, t.listable())
 
565
        self.assertEqual(True, t.is_readonly())
 
566
 
 
567
 
 
568
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
581
569
    """NFS decorator specific tests."""
582
570
 
583
571
    def get_nfs_transport(self, url):
587
575
    def test_local_parameters(self):
588
576
        # the listable and is_readonly parameters
589
577
        # are not changed by the fakenfs decorator
590
 
        transport = self.get_nfs_transport('.')
591
 
        self.assertEqual(True, transport.listable())
592
 
        self.assertEqual(False, transport.is_readonly())
 
578
        t = self.get_nfs_transport('.')
 
579
        self.assertEqual(True, t.listable())
 
580
        self.assertEqual(False, t.is_readonly())
593
581
 
594
582
    def test_http_parameters(self):
595
583
        # the listable and is_readonly parameters
598
586
        # connect to '.' via http which is not listable
599
587
        server = HttpServer()
600
588
        self.start_server(server)
601
 
        transport = self.get_nfs_transport(server.get_url())
602
 
        self.assertIsInstance(
603
 
            transport, fakenfs.FakeNFSTransportDecorator)
604
 
        self.assertEqual(False, transport.listable())
605
 
        self.assertEqual(True, transport.is_readonly())
 
589
        t = self.get_nfs_transport(server.get_url())
 
590
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
591
        self.assertEqual(False, t.listable())
 
592
        self.assertEqual(True, t.is_readonly())
606
593
 
607
594
    def test_fakenfs_server_default(self):
608
595
        # a FakeNFSServer() should bring up a local relpath server for itself
611
598
        # the url should be decorated appropriately
612
599
        self.assertStartsWith(server.get_url(), 'fakenfs+')
613
600
        # and we should be able to get a transport for it
614
 
        transport = get_transport(server.get_url())
 
601
        t = transport.get_transport(server.get_url())
615
602
        # which must be a FakeNFSTransportDecorator instance.
616
 
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
603
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
617
604
 
618
605
    def test_fakenfs_rename_semantics(self):
619
606
        # a FakeNFS transport must mangle the way rename errors occur to
620
607
        # look like NFS problems.
621
 
        transport = self.get_nfs_transport('.')
 
608
        t = self.get_nfs_transport('.')
622
609
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
623
 
                        transport=transport)
624
 
        self.assertRaises(errors.ResourceBusy,
625
 
                          transport.rename, 'from', 'to')
626
 
 
627
 
 
628
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
610
                        transport=t)
 
611
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
612
 
 
613
 
 
614
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
629
615
    """Tests for simulation of VFAT restrictions"""
630
616
 
631
617
    def get_vfat_transport(self, url):
635
621
 
636
622
    def test_transport_creation(self):
637
623
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
638
 
        transport = self.get_vfat_transport('.')
639
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
624
        t = self.get_vfat_transport('.')
 
625
        self.assertIsInstance(t, FakeVFATTransportDecorator)
640
626
 
641
627
    def test_transport_mkdir(self):
642
 
        transport = self.get_vfat_transport('.')
643
 
        transport.mkdir('HELLO')
644
 
        self.assertTrue(transport.has('hello'))
645
 
        self.assertTrue(transport.has('Hello'))
 
628
        t = self.get_vfat_transport('.')
 
629
        t.mkdir('HELLO')
 
630
        self.assertTrue(t.has('hello'))
 
631
        self.assertTrue(t.has('Hello'))
646
632
 
647
633
    def test_forbidden_chars(self):
648
 
        transport = self.get_vfat_transport('.')
649
 
        self.assertRaises(ValueError, transport.has, "<NU>")
650
 
 
651
 
 
652
 
class BadTransportHandler(Transport):
 
634
        t = self.get_vfat_transport('.')
 
635
        self.assertRaises(ValueError, t.has, "<NU>")
 
636
 
 
637
 
 
638
class BadTransportHandler(transport.Transport):
653
639
    def __init__(self, base_url):
654
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
655
 
 
656
 
 
657
 
class BackupTransportHandler(Transport):
 
640
        raise errors.DependencyNotPresent('some_lib',
 
641
                                          'testing missing dependency')
 
642
 
 
643
 
 
644
class BackupTransportHandler(transport.Transport):
658
645
    """Test transport that works as a backup for the BadTransportHandler"""
659
646
    pass
660
647
 
661
648
 
662
 
class TestTransportImplementation(TestCaseInTempDir):
 
649
class TestTransportImplementation(tests.TestCaseInTempDir):
663
650
    """Implementation verification for transports.
664
651
 
665
652
    To verify a transport we need a server factory, which is a callable
694
681
        base_url = self._server.get_url()
695
682
        url = self._adjust_url(base_url, relpath)
696
683
        # try getting the transport via the regular interface:
697
 
        t = get_transport(url)
 
684
        t = transport.get_transport(url)
698
685
        # vila--20070607 if the following are commented out the test suite
699
686
        # still pass. Is this really still needed or was it a forgotten
700
687
        # temporary fix ?
705
692
        return t
706
693
 
707
694
 
708
 
class TestLocalTransports(TestCase):
 
695
class TestLocalTransports(tests.TestCase):
709
696
 
710
697
    def test_get_transport_from_abspath(self):
711
698
        here = osutils.abspath('.')
712
 
        t = get_transport(here)
713
 
        self.assertIsInstance(t, LocalTransport)
 
699
        t = transport.get_transport(here)
 
700
        self.assertIsInstance(t, local.LocalTransport)
714
701
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
715
702
 
716
703
    def test_get_transport_from_relpath(self):
717
704
        here = osutils.abspath('.')
718
 
        t = get_transport('.')
719
 
        self.assertIsInstance(t, LocalTransport)
 
705
        t = transport.get_transport('.')
 
706
        self.assertIsInstance(t, local.LocalTransport)
720
707
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
721
708
 
722
709
    def test_get_transport_from_local_url(self):
723
710
        here = osutils.abspath('.')
724
711
        here_url = urlutils.local_path_to_url(here) + '/'
725
 
        t = get_transport(here_url)
726
 
        self.assertIsInstance(t, LocalTransport)
 
712
        t = transport.get_transport(here_url)
 
713
        self.assertIsInstance(t, local.LocalTransport)
727
714
        self.assertEquals(t.base, here_url)
728
715
 
729
716
    def test_local_abspath(self):
730
717
        here = osutils.abspath('.')
731
 
        t = get_transport(here)
 
718
        t = transport.get_transport(here)
732
719
        self.assertEquals(t.local_abspath(''), here)
733
720
 
734
721
 
735
 
class TestWin32LocalTransport(TestCase):
 
722
class TestWin32LocalTransport(tests.TestCase):
736
723
 
737
724
    def test_unc_clone_to_root(self):
738
725
        # Win32 UNC path like \\HOST\path
739
726
        # clone to root should stop at least at \\HOST part
740
727
        # not on \\
741
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
728
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
742
729
        for i in xrange(4):
743
730
            t = t.clone('..')
744
731
        self.assertEquals(t.base, 'file://HOST/')
747
734
        self.assertEquals(t.base, 'file://HOST/')
748
735
 
749
736
 
750
 
class TestConnectedTransport(TestCase):
 
737
class TestConnectedTransport(tests.TestCase):
751
738
    """Tests for connected to remote server transports"""
752
739
 
753
740
    def test_parse_url(self):
754
 
        t = ConnectedTransport('http://simple.example.com/home/source')
 
741
        t = transport.ConnectedTransport(
 
742
            'http://simple.example.com/home/source')
755
743
        self.assertEquals(t._host, 'simple.example.com')
756
744
        self.assertEquals(t._port, None)
757
745
        self.assertEquals(t._path, '/home/source/')
762
750
 
763
751
    def test_parse_url_with_at_in_user(self):
764
752
        # Bug 228058
765
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
753
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
766
754
        self.assertEquals(t._user, 'user@host.com')
767
755
 
768
756
    def test_parse_quoted_url(self):
769
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
757
        t = transport.ConnectedTransport(
 
758
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
770
759
        self.assertEquals(t._host, 'exAmple.com')
771
760
        self.assertEquals(t._port, 2222)
772
761
        self.assertEquals(t._user, 'robey')
778
767
 
779
768
    def test_parse_invalid_url(self):
780
769
        self.assertRaises(errors.InvalidURL,
781
 
                          ConnectedTransport,
 
770
                          transport.ConnectedTransport,
782
771
                          'sftp://lily.org:~janneke/public/bzr/gub')
783
772
 
784
773
    def test_relpath(self):
785
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
774
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
786
775
 
787
776
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
788
777
        self.assertRaises(errors.PathNotChild, t.relpath,
794
783
        self.assertRaises(errors.PathNotChild, t.relpath,
795
784
                          'sftp://user@host.com:33/abs/path/sub')
796
785
        # Make sure it works when we don't supply a username
797
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
786
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
798
787
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
799
788
 
800
789
        # Make sure it works when parts of the path will be url encoded
801
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
790
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
802
791
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
803
792
 
804
793
    def test_connection_sharing_propagate_credentials(self):
805
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
794
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
806
795
        self.assertEquals('user', t._user)
807
796
        self.assertEquals('host.com', t._host)
808
797
        self.assertIs(None, t._get_connection())
829
818
        self.assertIs(new_password, c._get_credentials())
830
819
 
831
820
 
832
 
class TestReusedTransports(TestCase):
 
821
class TestReusedTransports(tests.TestCase):
833
822
    """Tests for transport reuse"""
834
823
 
835
824
    def test_reuse_same_transport(self):
836
825
        possible_transports = []
837
 
        t1 = get_transport('http://foo/',
838
 
                           possible_transports=possible_transports)
 
826
        t1 = transport.get_transport('http://foo/',
 
827
                                     possible_transports=possible_transports)
839
828
        self.assertEqual([t1], possible_transports)
840
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
829
        t2 = transport.get_transport('http://foo/',
 
830
                                     possible_transports=[t1])
841
831
        self.assertIs(t1, t2)
842
832
 
843
833
        # Also check that final '/' are handled correctly
844
 
        t3 = get_transport('http://foo/path/')
845
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
834
        t3 = transport.get_transport('http://foo/path/')
 
835
        t4 = transport.get_transport('http://foo/path',
 
836
                                     possible_transports=[t3])
846
837
        self.assertIs(t3, t4)
847
838
 
848
 
        t5 = get_transport('http://foo/path')
849
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
839
        t5 = transport.get_transport('http://foo/path')
 
840
        t6 = transport.get_transport('http://foo/path/',
 
841
                                     possible_transports=[t5])
850
842
        self.assertIs(t5, t6)
851
843
 
852
844
    def test_don_t_reuse_different_transport(self):
853
 
        t1 = get_transport('http://foo/path')
854
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
845
        t1 = transport.get_transport('http://foo/path')
 
846
        t2 = transport.get_transport('http://bar/path',
 
847
                                     possible_transports=[t1])
855
848
        self.assertIsNot(t1, t2)
856
849
 
857
850
 
858
 
class TestTransportTrace(TestCase):
 
851
class TestTransportTrace(tests.TestCase):
859
852
 
860
853
    def test_get(self):
861
 
        transport = get_transport('trace+memory://')
862
 
        self.assertIsInstance(
863
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
854
        t = transport.get_transport('trace+memory://')
 
855
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
864
856
 
865
857
    def test_clone_preserves_activity(self):
866
 
        transport = get_transport('trace+memory://')
867
 
        transport2 = transport.clone('.')
868
 
        self.assertTrue(transport is not transport2)
869
 
        self.assertTrue(transport._activity is transport2._activity)
 
858
        t = transport.get_transport('trace+memory://')
 
859
        t2 = t.clone('.')
 
860
        self.assertTrue(t is not t2)
 
861
        self.assertTrue(t._activity is t2._activity)
870
862
 
871
863
    # the following specific tests are for the operations that have made use of
872
864
    # logging in tests; we could test every single operation but doing that
873
865
    # still won't cause a test failure when the top level Transport API
874
866
    # changes; so there is little return doing that.
875
867
    def test_get(self):
876
 
        transport = get_transport('trace+memory:///')
877
 
        transport.put_bytes('foo', 'barish')
878
 
        transport.get('foo')
 
868
        t = transport.get_transport('trace+memory:///')
 
869
        t.put_bytes('foo', 'barish')
 
870
        t.get('foo')
879
871
        expected_result = []
880
872
        # put_bytes records the bytes, not the content to avoid memory
881
873
        # pressure.
882
874
        expected_result.append(('put_bytes', 'foo', 6, None))
883
875
        # get records the file name only.
884
876
        expected_result.append(('get', 'foo'))
885
 
        self.assertEqual(expected_result, transport._activity)
 
877
        self.assertEqual(expected_result, t._activity)
886
878
 
887
879
    def test_readv(self):
888
 
        transport = get_transport('trace+memory:///')
889
 
        transport.put_bytes('foo', 'barish')
890
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
891
 
            upper_limit=6))
 
880
        t = transport.get_transport('trace+memory:///')
 
881
        t.put_bytes('foo', 'barish')
 
882
        list(t.readv('foo', [(0, 1), (3, 2)],
 
883
                     adjust_for_latency=True, upper_limit=6))
892
884
        expected_result = []
893
885
        # put_bytes records the bytes, not the content to avoid memory
894
886
        # pressure.
895
887
        expected_result.append(('put_bytes', 'foo', 6, None))
896
888
        # readv records the supplied offset request
897
889
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
898
 
        self.assertEqual(expected_result, transport._activity)
 
890
        self.assertEqual(expected_result, t._activity)
899
891
 
900
892
 
901
893
class TestSSHConnections(tests.TestCaseWithTransport):
914
906
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
915
907
        # override the interface (doesn't change self._vendor).
916
908
        # Note that this does encryption, so can be slow.
917
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
918
 
        from bzrlib.tests.stub_sftp import StubServer
 
909
        from bzrlib.tests import stub_sftp
919
910
 
920
911
        # Start an SSH server
921
912
        self.command_executed = []
924
915
        # SSH channel ourselves.  Surely this has already been implemented
925
916
        # elsewhere?
926
917
        started = []
927
 
        class StubSSHServer(StubServer):
 
918
        class StubSSHServer(stub_sftp.StubServer):
928
919
 
929
920
            test = self
930
921
 
958
949
 
959
950
                return True
960
951
 
961
 
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
952
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
962
953
        # We *don't* want to override the default SSH vendor: the detected one
963
954
        # is the one to use.
964
955
        self.start_server(ssh_server)
978
969
            # prefix a '/' to get the right path.
979
970
            path_to_branch = '/' + path_to_branch
980
971
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
981
 
        t = get_transport(url)
 
972
        t = transport.get_transport(url)
982
973
        self.permit_url(t.base)
983
974
        t.mkdir('foo')
984
975