~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-01-07 17:02:44 UTC
  • mfrom: (4934.1.14 2.1.0rc1-set-mtime)
  • Revision ID: pqm@pqm.ubuntu.com-20100107170244-3cgdapvuokgf8l42
(jam, gz) (bug #488724) Set the mtime of files touched in a TreeTransform.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
import sys
22
22
import threading
23
23
 
 
24
import bzrlib
24
25
from bzrlib import (
25
26
    errors,
26
27
    osutils,
27
28
    tests,
28
 
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.transport import (
32
 
    chroot,
33
 
    fakenfs,
34
 
    local,
35
 
    memory,
36
 
    pathfilter,
37
 
    readonly,
38
 
    )
39
 
from bzrlib.tests import (
40
 
    features,
41
 
    test_server,
42
 
    )
 
31
from bzrlib.errors import (DependencyNotPresent,
 
32
                           FileExists,
 
33
                           InvalidURLJoin,
 
34
                           NoSuchFile,
 
35
                           PathNotChild,
 
36
                           ReadError,
 
37
                           UnsupportedProtocol,
 
38
                           )
 
39
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
40
from bzrlib.transport import (_clear_protocol_handlers,
 
41
                              _CoalescedOffset,
 
42
                              ConnectedTransport,
 
43
                              _get_protocol_handlers,
 
44
                              _set_protocol_handlers,
 
45
                              _get_transport_modules,
 
46
                              get_transport,
 
47
                              LateReadError,
 
48
                              register_lazy_transport,
 
49
                              register_transport_proto,
 
50
                              Transport,
 
51
                              )
 
52
from bzrlib.transport.chroot import ChrootServer
 
53
from bzrlib.transport.memory import MemoryTransport
 
54
from bzrlib.transport.local import (LocalTransport,
 
55
                                    EmulatedWin32LocalTransport)
 
56
from bzrlib.transport.pathfilter import PathFilteringServer
43
57
 
44
58
 
45
59
# TODO: Should possibly split transport-specific tests into their own files.
46
60
 
47
61
 
48
 
class TestTransport(tests.TestCase):
 
62
class TestTransport(TestCase):
49
63
    """Test the non transport-concrete class functionality."""
50
64
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
65
    def test__get_set_protocol_handlers(self):
55
 
        handlers = transport._get_protocol_handlers()
 
66
        handlers = _get_protocol_handlers()
56
67
        self.assertNotEqual([], handlers.keys( ))
57
68
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
69
            _clear_protocol_handlers()
 
70
            self.assertEqual([], _get_protocol_handlers().keys())
60
71
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
72
            _set_protocol_handlers(handlers)
62
73
 
63
74
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
 
75
        handlers = _get_protocol_handlers()
65
76
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
 
77
        _clear_protocol_handlers()
67
78
        class SampleHandler(object):
68
79
            """I exist, isnt that enough?"""
69
80
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
 
81
            _clear_protocol_handlers()
 
82
            register_transport_proto('foo')
 
83
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
84
                                    'TestTransport.SampleHandler')
 
85
            register_transport_proto('bar')
 
86
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
87
                                    'TestTransport.SampleHandler')
79
88
            self.assertEqual([SampleHandler.__module__,
80
89
                              'bzrlib.transport.chroot',
81
90
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
 
91
                             _get_transport_modules())
83
92
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
93
            _set_protocol_handlers(handlers)
85
94
 
86
95
    def test_transport_dependency(self):
87
96
        """Transport with missing dependency causes no error"""
88
 
        saved_handlers = transport._get_protocol_handlers()
 
97
        saved_handlers = _get_protocol_handlers()
89
98
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
 
99
        _clear_protocol_handlers()
91
100
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
101
            register_transport_proto('foo')
 
102
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
103
                    'BadTransportHandler')
95
104
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
 
105
                get_transport('foo://fooserver/foo')
 
106
            except UnsupportedProtocol, e:
98
107
                e_str = str(e)
99
108
                self.assertEquals('Unsupported protocol'
100
109
                                  ' for url "foo://fooserver/foo":'
104
113
                self.fail('Did not raise UnsupportedProtocol')
105
114
        finally:
106
115
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
116
            _set_protocol_handlers(saved_handlers)
108
117
 
109
118
    def test_transport_fallback(self):
110
119
        """Transport with missing dependency causes no error"""
111
 
        saved_handlers = transport._get_protocol_handlers()
 
120
        saved_handlers = _get_protocol_handlers()
112
121
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
 
122
            _clear_protocol_handlers()
 
123
            register_transport_proto('foo')
 
124
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
125
                    'BackupTransportHandler')
 
126
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
127
                    'BadTransportHandler')
 
128
            t = get_transport('foo://fooserver/foo')
120
129
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
130
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
131
            _set_protocol_handlers(saved_handlers)
123
132
 
124
133
    def test_ssh_hints(self):
125
134
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
135
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
 
136
            get_transport('ssh://fooserver/foo')
 
137
        except UnsupportedProtocol, e:
129
138
            e_str = str(e)
130
139
            self.assertEquals('Unsupported protocol'
131
140
                              ' for url "ssh://fooserver/foo":'
132
 
                              ' bzr supports bzr+ssh to operate over ssh,'
133
 
                              ' use "bzr+ssh://fooserver/foo".',
 
141
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
134
142
                              str(e))
135
143
        else:
136
144
            self.fail('Did not raise UnsupportedProtocol')
137
145
 
138
146
    def test_LateReadError(self):
139
147
        """The LateReadError helper should raise on read()."""
140
 
        a_file = transport.LateReadError('a path')
 
148
        a_file = LateReadError('a path')
141
149
        try:
142
150
            a_file.read()
143
 
        except errors.ReadError, error:
 
151
        except ReadError, error:
144
152
            self.assertEqual('a path', error.path)
145
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
153
        self.assertRaises(ReadError, a_file.read, 40)
146
154
        a_file.close()
147
155
 
148
156
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
 
157
        t = Transport('/')
150
158
        self.assertEqual('/home/sarah/project/foo',
151
159
                         t._combine_paths('/home/sarah', 'project/foo'))
152
160
        self.assertEqual('/etc',
158
166
 
159
167
    def test_local_abspath_non_local_transport(self):
160
168
        # the base implementation should throw
161
 
        t = memory.MemoryTransport()
 
169
        t = MemoryTransport()
162
170
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
163
171
        self.assertEqual('memory:///t is not a local path.', str(e))
164
172
 
165
173
 
166
 
class TestCoalesceOffsets(tests.TestCase):
 
174
class TestCoalesceOffsets(TestCase):
167
175
 
168
176
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
169
 
        coalesce = transport.Transport._coalesce_offsets
170
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
177
        coalesce = Transport._coalesce_offsets
 
178
        exp = [_CoalescedOffset(*x) for x in expected]
171
179
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
172
180
                            max_size=max_size))
173
181
        self.assertEqual(exp, out)
248
256
                   max_size=1*1024*1024*1024)
249
257
 
250
258
 
251
 
class TestMemoryServer(tests.TestCase):
252
 
 
253
 
    def test_create_server(self):
254
 
        server = memory.MemoryServer()
255
 
        server.start_server()
256
 
        url = server.get_url()
257
 
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
259
 
        del t
260
 
        server.stop_server()
261
 
        self.assertFalse(url in transport.transport_list_registry)
262
 
        self.assertRaises(errors.UnsupportedProtocol,
263
 
                          transport.get_transport, url)
264
 
 
265
 
 
266
 
class TestMemoryTransport(tests.TestCase):
 
259
class TestMemoryTransport(TestCase):
267
260
 
268
261
    def test_get_transport(self):
269
 
        memory.MemoryTransport()
 
262
        MemoryTransport()
270
263
 
271
264
    def test_clone(self):
272
 
        t = memory.MemoryTransport()
273
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
274
 
        self.assertEqual("memory:///", t.clone("/").base)
 
265
        transport = MemoryTransport()
 
266
        self.assertTrue(isinstance(transport, MemoryTransport))
 
267
        self.assertEqual("memory:///", transport.clone("/").base)
275
268
 
276
269
    def test_abspath(self):
277
 
        t = memory.MemoryTransport()
278
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
270
        transport = MemoryTransport()
 
271
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
279
272
 
280
273
    def test_abspath_of_root(self):
281
 
        t = memory.MemoryTransport()
282
 
        self.assertEqual("memory:///", t.base)
283
 
        self.assertEqual("memory:///", t.abspath('/'))
 
274
        transport = MemoryTransport()
 
275
        self.assertEqual("memory:///", transport.base)
 
276
        self.assertEqual("memory:///", transport.abspath('/'))
284
277
 
285
278
    def test_abspath_of_relpath_starting_at_root(self):
286
 
        t = memory.MemoryTransport()
287
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
279
        transport = MemoryTransport()
 
280
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
288
281
 
289
282
    def test_append_and_get(self):
290
 
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
283
        transport = MemoryTransport()
 
284
        transport.append_bytes('path', 'content')
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.append_file('path', StringIO('content'))
 
287
        self.assertEqual(transport.get('path').read(), 'contentcontent')
295
288
 
296
289
    def test_put_and_get(self):
297
 
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
290
        transport = MemoryTransport()
 
291
        transport.put_file('path', StringIO('content'))
 
292
        self.assertEqual(transport.get('path').read(), 'content')
 
293
        transport.put_bytes('path', 'content')
 
294
        self.assertEqual(transport.get('path').read(), 'content')
302
295
 
303
296
    def test_append_without_dir_fails(self):
304
 
        t = memory.MemoryTransport()
305
 
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
297
        transport = MemoryTransport()
 
298
        self.assertRaises(NoSuchFile,
 
299
                          transport.append_bytes, 'dir/path', 'content')
307
300
 
308
301
    def test_put_without_dir_fails(self):
309
 
        t = memory.MemoryTransport()
310
 
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
302
        transport = MemoryTransport()
 
303
        self.assertRaises(NoSuchFile,
 
304
                          transport.put_file, 'dir/path', StringIO('content'))
312
305
 
313
306
    def test_get_missing(self):
314
 
        transport = memory.MemoryTransport()
315
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
307
        transport = MemoryTransport()
 
308
        self.assertRaises(NoSuchFile, transport.get, 'foo')
316
309
 
317
310
    def test_has_missing(self):
318
 
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
311
        transport = MemoryTransport()
 
312
        self.assertEquals(False, transport.has('foo'))
320
313
 
321
314
    def test_has_present(self):
322
 
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
315
        transport = MemoryTransport()
 
316
        transport.append_bytes('foo', 'content')
 
317
        self.assertEquals(True, transport.has('foo'))
325
318
 
326
319
    def test_list_dir(self):
327
 
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
329
 
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
320
        transport = MemoryTransport()
 
321
        transport.put_bytes('foo', 'content')
 
322
        transport.mkdir('dir')
 
323
        transport.put_bytes('dir/subfoo', 'content')
 
324
        transport.put_bytes('dirlike', 'content')
332
325
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
326
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
327
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
335
328
 
336
329
    def test_mkdir(self):
337
 
        t = memory.MemoryTransport()
338
 
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
330
        transport = MemoryTransport()
 
331
        transport.mkdir('dir')
 
332
        transport.append_bytes('dir/path', 'content')
 
333
        self.assertEqual(transport.get('dir/path').read(), 'content')
341
334
 
342
335
    def test_mkdir_missing_parent(self):
343
 
        t = memory.MemoryTransport()
344
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
336
        transport = MemoryTransport()
 
337
        self.assertRaises(NoSuchFile,
 
338
                          transport.mkdir, 'dir/dir')
345
339
 
346
340
    def test_mkdir_twice(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.mkdir('dir')
349
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
341
        transport = MemoryTransport()
 
342
        transport.mkdir('dir')
 
343
        self.assertRaises(FileExists, transport.mkdir, 'dir')
350
344
 
351
345
    def test_parameters(self):
352
 
        t = memory.MemoryTransport()
353
 
        self.assertEqual(True, t.listable())
354
 
        self.assertEqual(False, t.is_readonly())
 
346
        transport = MemoryTransport()
 
347
        self.assertEqual(True, transport.listable())
 
348
        self.assertEqual(False, transport.is_readonly())
355
349
 
356
350
    def test_iter_files_recursive(self):
357
 
        t = memory.MemoryTransport()
358
 
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
362
 
        paths = set(t.iter_files_recursive())
 
351
        transport = MemoryTransport()
 
352
        transport.mkdir('dir')
 
353
        transport.put_bytes('dir/foo', 'content')
 
354
        transport.put_bytes('dir/bar', 'content')
 
355
        transport.put_bytes('bar', 'content')
 
356
        paths = set(transport.iter_files_recursive())
363
357
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
364
358
 
365
359
    def test_stat(self):
366
 
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
369
 
        self.assertEqual(7, t.stat('foo').st_size)
370
 
        self.assertEqual(6, t.stat('bar').st_size)
371
 
 
372
 
 
373
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
360
        transport = MemoryTransport()
 
361
        transport.put_bytes('foo', 'content')
 
362
        transport.put_bytes('bar', 'phowar')
 
363
        self.assertEqual(7, transport.stat('foo').st_size)
 
364
        self.assertEqual(6, transport.stat('bar').st_size)
 
365
 
 
366
 
 
367
class ChrootDecoratorTransportTest(TestCase):
374
368
    """Chroot decoration specific tests."""
375
369
 
376
370
    def test_abspath(self):
377
371
        # The abspath is always relative to the chroot_url.
378
 
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
372
        server = ChrootServer(get_transport('memory:///foo/bar/'))
380
373
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
382
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
374
        transport = get_transport(server.get_url())
 
375
        self.assertEqual(server.get_url(), transport.abspath('/'))
383
376
 
384
 
        subdir_t = t.clone('subdir')
385
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
377
        subdir_transport = transport.clone('subdir')
 
378
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
386
379
 
387
380
    def test_clone(self):
388
 
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
381
        server = ChrootServer(get_transport('memory:///foo/bar/'))
390
382
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
383
        transport = get_transport(server.get_url())
392
384
        # relpath from root and root path are the same
393
 
        relpath_cloned = t.clone('foo')
394
 
        abspath_cloned = t.clone('/foo')
 
385
        relpath_cloned = transport.clone('foo')
 
386
        abspath_cloned = transport.clone('/foo')
395
387
        self.assertEqual(server, relpath_cloned.server)
396
388
        self.assertEqual(server, abspath_cloned.server)
397
389
 
403
395
        This is so that it is not possible to escape a chroot by doing::
404
396
            url = chroot_transport.base
405
397
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
398
            new_transport = get_transport(parent_url)
407
399
        """
408
 
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
400
        server = ChrootServer(get_transport('memory:///path/subpath'))
410
401
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
413
 
        self.assertEqual(t.server, new_t.server)
414
 
        self.assertEqual(t.base, new_t.base)
 
402
        transport = get_transport(server.get_url())
 
403
        new_transport = get_transport(transport.base)
 
404
        self.assertEqual(transport.server, new_transport.server)
 
405
        self.assertEqual(transport.base, new_transport.base)
415
406
 
416
407
    def test_urljoin_preserves_chroot(self):
417
408
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
420
411
        This is so that it is not possible to escape a chroot by doing::
421
412
            url = chroot_transport.base
422
413
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
414
            new_transport = get_transport(parent_url)
424
415
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
416
        server = ChrootServer(get_transport('memory:///path/'))
426
417
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
418
        transport = get_transport(server.get_url())
428
419
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
 
 
431
 
 
432
 
class TestChrootServer(tests.TestCase):
 
420
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
421
 
 
422
 
 
423
class ChrootServerTest(TestCase):
433
424
 
434
425
    def test_construct(self):
435
 
        backing_transport = memory.MemoryTransport()
436
 
        server = chroot.ChrootServer(backing_transport)
 
426
        backing_transport = MemoryTransport()
 
427
        server = ChrootServer(backing_transport)
437
428
        self.assertEqual(backing_transport, server.backing_transport)
438
429
 
439
430
    def test_setUp(self):
440
 
        backing_transport = memory.MemoryTransport()
441
 
        server = chroot.ChrootServer(backing_transport)
442
 
        server.start_server()
 
431
        backing_transport = MemoryTransport()
 
432
        server = ChrootServer(backing_transport)
 
433
        server.setUp()
443
434
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
 
435
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
446
436
        finally:
447
 
            server.stop_server()
 
437
            server.tearDown()
448
438
 
449
 
    def test_stop_server(self):
450
 
        backing_transport = memory.MemoryTransport()
451
 
        server = chroot.ChrootServer(backing_transport)
452
 
        server.start_server()
453
 
        server.stop_server()
454
 
        self.assertFalse(server.scheme
455
 
                         in transport._get_protocol_handlers().keys())
 
439
    def test_tearDown(self):
 
440
        backing_transport = MemoryTransport()
 
441
        server = ChrootServer(backing_transport)
 
442
        server.setUp()
 
443
        server.tearDown()
 
444
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
456
445
 
457
446
    def test_get_url(self):
458
 
        backing_transport = memory.MemoryTransport()
459
 
        server = chroot.ChrootServer(backing_transport)
460
 
        server.start_server()
 
447
        backing_transport = MemoryTransport()
 
448
        server = ChrootServer(backing_transport)
 
449
        server.setUp()
461
450
        try:
462
451
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
452
        finally:
464
 
            server.stop_server()
465
 
 
466
 
 
467
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
453
            server.tearDown()
 
454
 
 
455
 
 
456
class PathFilteringDecoratorTransportTest(TestCase):
468
457
    """Pathfilter decoration specific tests."""
469
458
 
470
459
    def test_abspath(self):
471
460
        # The abspath is always relative to the base of the backing transport.
472
 
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
461
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
474
462
            lambda x: x)
475
 
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
477
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
463
        server.setUp()
 
464
        transport = get_transport(server.get_url())
 
465
        self.assertEqual(server.get_url(), transport.abspath('/'))
478
466
 
479
 
        subdir_t = t.clone('subdir')
480
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
481
 
        server.stop_server()
 
467
        subdir_transport = transport.clone('subdir')
 
468
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
469
        server.tearDown()
482
470
 
483
471
    def make_pf_transport(self, filter_func=None):
484
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
487
475
            parameter to override it."""
488
476
        if filter_func is None:
489
477
            filter_func = lambda x: x
490
 
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
492
 
        server.start_server()
493
 
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
478
        server = PathFilteringServer(
 
479
            get_transport('memory:///foo/bar/'), filter_func)
 
480
        server.setUp()
 
481
        self.addCleanup(server.tearDown)
 
482
        return get_transport(server.get_url())
495
483
 
496
484
    def test__filter(self):
497
485
        # _filter (with an identity func as filter_func) always returns
498
486
        # paths relative to the base of the backing transport.
499
 
        t = self.make_pf_transport()
500
 
        self.assertEqual('foo', t._filter('foo'))
501
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
502
 
        self.assertEqual('', t._filter('..'))
503
 
        self.assertEqual('', t._filter('/'))
 
487
        transport = self.make_pf_transport()
 
488
        self.assertEqual('foo', transport._filter('foo'))
 
489
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
504
492
        # The base of the pathfiltering transport is taken into account too.
505
 
        t = t.clone('subdir1/subdir2')
506
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
507
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
508
 
        self.assertEqual('subdir1', t._filter('..'))
509
 
        self.assertEqual('', t._filter('/'))
 
493
        transport = transport.clone('subdir1/subdir2')
 
494
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
495
        self.assertEqual(
 
496
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
497
        self.assertEqual('subdir1', transport._filter('..'))
 
498
        self.assertEqual('', transport._filter('/'))
510
499
 
511
500
    def test_filter_invocation(self):
512
501
        filter_log = []
513
502
        def filter(path):
514
503
            filter_log.append(path)
515
504
            return path
516
 
        t = self.make_pf_transport(filter)
517
 
        t.has('abc')
 
505
        transport = self.make_pf_transport(filter)
 
506
        transport.has('abc')
518
507
        self.assertEqual(['abc'], filter_log)
519
508
        del filter_log[:]
520
 
        t.clone('abc').has('xyz')
 
509
        transport.clone('abc').has('xyz')
521
510
        self.assertEqual(['abc/xyz'], filter_log)
522
511
        del filter_log[:]
523
 
        t.has('/abc')
 
512
        transport.has('/abc')
524
513
        self.assertEqual(['abc'], filter_log)
525
514
 
526
515
    def test_clone(self):
527
 
        t = self.make_pf_transport()
 
516
        transport = self.make_pf_transport()
528
517
        # relpath from root and root path are the same
529
 
        relpath_cloned = t.clone('foo')
530
 
        abspath_cloned = t.clone('/foo')
531
 
        self.assertEqual(t.server, relpath_cloned.server)
532
 
        self.assertEqual(t.server, abspath_cloned.server)
 
518
        relpath_cloned = transport.clone('foo')
 
519
        abspath_cloned = transport.clone('/foo')
 
520
        self.assertEqual(transport.server, relpath_cloned.server)
 
521
        self.assertEqual(transport.server, abspath_cloned.server)
533
522
 
534
523
    def test_url_preserves_pathfiltering(self):
535
524
        """Calling get_transport on a pathfiltered transport's base should
540
529
        otherwise) the filtering by doing::
541
530
            url = filtered_transport.base
542
531
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
532
            new_transport = get_transport(parent_url)
544
533
        """
545
 
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
547
 
        self.assertEqual(t.server, new_t.server)
548
 
        self.assertEqual(t.base, new_t.base)
549
 
 
550
 
 
551
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
534
        transport = self.make_pf_transport()
 
535
        new_transport = get_transport(transport.base)
 
536
        self.assertEqual(transport.server, new_transport.server)
 
537
        self.assertEqual(transport.base, new_transport.base)
 
538
 
 
539
 
 
540
class ReadonlyDecoratorTransportTest(TestCase):
552
541
    """Readonly decoration specific tests."""
553
542
 
554
543
    def test_local_parameters(self):
 
544
        import bzrlib.transport.readonly as readonly
555
545
        # connect to . in readonly mode
556
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
557
 
        self.assertEqual(True, t.listable())
558
 
        self.assertEqual(True, t.is_readonly())
 
546
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
547
        self.assertEqual(True, transport.listable())
 
548
        self.assertEqual(True, transport.is_readonly())
559
549
 
560
550
    def test_http_parameters(self):
561
551
        from bzrlib.tests.http_server import HttpServer
 
552
        import bzrlib.transport.readonly as readonly
562
553
        # connect to '.' via http which is not listable
563
554
        server = HttpServer()
564
555
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
567
 
        self.assertEqual(False, t.listable())
568
 
        self.assertEqual(True, t.is_readonly())
569
 
 
570
 
 
571
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
556
        transport = get_transport('readonly+' + server.get_url())
 
557
        self.failUnless(isinstance(transport,
 
558
                                   readonly.ReadonlyTransportDecorator))
 
559
        self.assertEqual(False, transport.listable())
 
560
        self.assertEqual(True, transport.is_readonly())
 
561
 
 
562
 
 
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
572
564
    """NFS decorator specific tests."""
573
565
 
574
566
    def get_nfs_transport(self, url):
 
567
        import bzrlib.transport.fakenfs as fakenfs
575
568
        # connect to url with nfs decoration
576
569
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
577
570
 
578
571
    def test_local_parameters(self):
579
572
        # the listable and is_readonly parameters
580
573
        # are not changed by the fakenfs decorator
581
 
        t = self.get_nfs_transport('.')
582
 
        self.assertEqual(True, t.listable())
583
 
        self.assertEqual(False, t.is_readonly())
 
574
        transport = self.get_nfs_transport('.')
 
575
        self.assertEqual(True, transport.listable())
 
576
        self.assertEqual(False, transport.is_readonly())
584
577
 
585
578
    def test_http_parameters(self):
586
579
        # the listable and is_readonly parameters
589
582
        # connect to '.' via http which is not listable
590
583
        server = HttpServer()
591
584
        self.start_server(server)
592
 
        t = self.get_nfs_transport(server.get_url())
593
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
594
 
        self.assertEqual(False, t.listable())
595
 
        self.assertEqual(True, t.is_readonly())
 
585
        transport = self.get_nfs_transport(server.get_url())
 
586
        self.assertIsInstance(
 
587
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
588
        self.assertEqual(False, transport.listable())
 
589
        self.assertEqual(True, transport.is_readonly())
596
590
 
597
591
    def test_fakenfs_server_default(self):
598
592
        # a FakeNFSServer() should bring up a local relpath server for itself
599
 
        server = test_server.FakeNFSServer()
 
593
        import bzrlib.transport.fakenfs as fakenfs
 
594
        server = fakenfs.FakeNFSServer()
600
595
        self.start_server(server)
601
596
        # the url should be decorated appropriately
602
597
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
598
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
599
        transport = get_transport(server.get_url())
605
600
        # which must be a FakeNFSTransportDecorator instance.
606
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
601
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
607
602
 
608
603
    def test_fakenfs_rename_semantics(self):
609
604
        # a FakeNFS transport must mangle the way rename errors occur to
610
605
        # look like NFS problems.
611
 
        t = self.get_nfs_transport('.')
 
606
        transport = self.get_nfs_transport('.')
612
607
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
613
 
                        transport=t)
614
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
615
 
 
616
 
 
617
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
608
                        transport=transport)
 
609
        self.assertRaises(errors.ResourceBusy,
 
610
                          transport.rename, 'from', 'to')
 
611
 
 
612
 
 
613
class FakeVFATDecoratorTests(TestCaseInTempDir):
618
614
    """Tests for simulation of VFAT restrictions"""
619
615
 
620
616
    def get_vfat_transport(self, url):
624
620
 
625
621
    def test_transport_creation(self):
626
622
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
627
 
        t = self.get_vfat_transport('.')
628
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
623
        transport = self.get_vfat_transport('.')
 
624
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
629
625
 
630
626
    def test_transport_mkdir(self):
631
 
        t = self.get_vfat_transport('.')
632
 
        t.mkdir('HELLO')
633
 
        self.assertTrue(t.has('hello'))
634
 
        self.assertTrue(t.has('Hello'))
 
627
        transport = self.get_vfat_transport('.')
 
628
        transport.mkdir('HELLO')
 
629
        self.assertTrue(transport.has('hello'))
 
630
        self.assertTrue(transport.has('Hello'))
635
631
 
636
632
    def test_forbidden_chars(self):
637
 
        t = self.get_vfat_transport('.')
638
 
        self.assertRaises(ValueError, t.has, "<NU>")
639
 
 
640
 
 
641
 
class BadTransportHandler(transport.Transport):
 
633
        transport = self.get_vfat_transport('.')
 
634
        self.assertRaises(ValueError, transport.has, "<NU>")
 
635
 
 
636
 
 
637
class BadTransportHandler(Transport):
642
638
    def __init__(self, base_url):
643
 
        raise errors.DependencyNotPresent('some_lib',
644
 
                                          'testing missing dependency')
645
 
 
646
 
 
647
 
class BackupTransportHandler(transport.Transport):
 
639
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
640
 
 
641
 
 
642
class BackupTransportHandler(Transport):
648
643
    """Test transport that works as a backup for the BadTransportHandler"""
649
644
    pass
650
645
 
651
646
 
652
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
647
class TestTransportImplementation(TestCaseInTempDir):
653
648
    """Implementation verification for transports.
654
649
 
655
650
    To verify a transport we need a server factory, which is a callable
684
679
        base_url = self._server.get_url()
685
680
        url = self._adjust_url(base_url, relpath)
686
681
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
682
        t = get_transport(url)
688
683
        # vila--20070607 if the following are commented out the test suite
689
684
        # still pass. Is this really still needed or was it a forgotten
690
685
        # temporary fix ?
695
690
        return t
696
691
 
697
692
 
698
 
class TestLocalTransports(tests.TestCase):
 
693
class TestLocalTransports(TestCase):
699
694
 
700
695
    def test_get_transport_from_abspath(self):
701
696
        here = osutils.abspath('.')
702
 
        t = transport.get_transport(here)
703
 
        self.assertIsInstance(t, local.LocalTransport)
 
697
        t = get_transport(here)
 
698
        self.assertIsInstance(t, LocalTransport)
704
699
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
705
700
 
706
701
    def test_get_transport_from_relpath(self):
707
702
        here = osutils.abspath('.')
708
 
        t = transport.get_transport('.')
709
 
        self.assertIsInstance(t, local.LocalTransport)
 
703
        t = get_transport('.')
 
704
        self.assertIsInstance(t, LocalTransport)
710
705
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
711
706
 
712
707
    def test_get_transport_from_local_url(self):
713
708
        here = osutils.abspath('.')
714
709
        here_url = urlutils.local_path_to_url(here) + '/'
715
 
        t = transport.get_transport(here_url)
716
 
        self.assertIsInstance(t, local.LocalTransport)
 
710
        t = get_transport(here_url)
 
711
        self.assertIsInstance(t, LocalTransport)
717
712
        self.assertEquals(t.base, here_url)
718
713
 
719
714
    def test_local_abspath(self):
720
715
        here = osutils.abspath('.')
721
 
        t = transport.get_transport(here)
 
716
        t = get_transport(here)
722
717
        self.assertEquals(t.local_abspath(''), here)
723
718
 
724
719
 
725
 
class TestWin32LocalTransport(tests.TestCase):
 
720
class TestWin32LocalTransport(TestCase):
726
721
 
727
722
    def test_unc_clone_to_root(self):
728
723
        # Win32 UNC path like \\HOST\path
729
724
        # clone to root should stop at least at \\HOST part
730
725
        # not on \\
731
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
726
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
732
727
        for i in xrange(4):
733
728
            t = t.clone('..')
734
729
        self.assertEquals(t.base, 'file://HOST/')
737
732
        self.assertEquals(t.base, 'file://HOST/')
738
733
 
739
734
 
740
 
class TestConnectedTransport(tests.TestCase):
 
735
class TestConnectedTransport(TestCase):
741
736
    """Tests for connected to remote server transports"""
742
737
 
743
738
    def test_parse_url(self):
744
 
        t = transport.ConnectedTransport(
745
 
            'http://simple.example.com/home/source')
 
739
        t = ConnectedTransport('http://simple.example.com/home/source')
746
740
        self.assertEquals(t._host, 'simple.example.com')
747
741
        self.assertEquals(t._port, None)
748
742
        self.assertEquals(t._path, '/home/source/')
753
747
 
754
748
    def test_parse_url_with_at_in_user(self):
755
749
        # Bug 228058
756
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
750
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
757
751
        self.assertEquals(t._user, 'user@host.com')
758
752
 
759
753
    def test_parse_quoted_url(self):
760
 
        t = transport.ConnectedTransport(
761
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
754
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
755
        self.assertEquals(t._host, 'exAmple.com')
763
756
        self.assertEquals(t._port, 2222)
764
757
        self.assertEquals(t._user, 'robey')
770
763
 
771
764
    def test_parse_invalid_url(self):
772
765
        self.assertRaises(errors.InvalidURL,
773
 
                          transport.ConnectedTransport,
 
766
                          ConnectedTransport,
774
767
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
768
 
776
769
    def test_relpath(self):
777
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
770
        t = ConnectedTransport('sftp://user@host.com/abs/path')
778
771
 
779
772
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
780
773
        self.assertRaises(errors.PathNotChild, t.relpath,
786
779
        self.assertRaises(errors.PathNotChild, t.relpath,
787
780
                          'sftp://user@host.com:33/abs/path/sub')
788
781
        # Make sure it works when we don't supply a username
789
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
782
        t = ConnectedTransport('sftp://host.com/abs/path')
790
783
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
784
 
792
785
        # Make sure it works when parts of the path will be url encoded
793
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
786
        t = ConnectedTransport('sftp://host.com/dev/%path')
794
787
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
788
 
796
789
    def test_connection_sharing_propagate_credentials(self):
797
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
790
        t = ConnectedTransport('ftp://user@host.com/abs/path')
798
791
        self.assertEquals('user', t._user)
799
792
        self.assertEquals('host.com', t._host)
800
793
        self.assertIs(None, t._get_connection())
821
814
        self.assertIs(new_password, c._get_credentials())
822
815
 
823
816
 
824
 
class TestReusedTransports(tests.TestCase):
 
817
class TestReusedTransports(TestCase):
825
818
    """Tests for transport reuse"""
826
819
 
827
820
    def test_reuse_same_transport(self):
828
821
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=possible_transports)
 
822
        t1 = get_transport('http://foo/',
 
823
                           possible_transports=possible_transports)
831
824
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
833
 
                                     possible_transports=[t1])
 
825
        t2 = get_transport('http://foo/', possible_transports=[t1])
834
826
        self.assertIs(t1, t2)
835
827
 
836
828
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
839
 
                                     possible_transports=[t3])
 
829
        t3 = get_transport('http://foo/path/')
 
830
        t4 = get_transport('http://foo/path', possible_transports=[t3])
840
831
        self.assertIs(t3, t4)
841
832
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
844
 
                                     possible_transports=[t5])
 
833
        t5 = get_transport('http://foo/path')
 
834
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
845
835
        self.assertIs(t5, t6)
846
836
 
847
837
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
850
 
                                     possible_transports=[t1])
 
838
        t1 = get_transport('http://foo/path')
 
839
        t2 = get_transport('http://bar/path', possible_transports=[t1])
851
840
        self.assertIsNot(t1, t2)
852
841
 
853
842
 
854
 
class TestTransportTrace(tests.TestCase):
 
843
class TestTransportTrace(TestCase):
855
844
 
856
845
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
846
        transport = get_transport('trace+memory://')
 
847
        self.assertIsInstance(
 
848
            transport, bzrlib.transport.trace.TransportTraceDecorator)
859
849
 
860
850
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
862
 
        t2 = t.clone('.')
863
 
        self.assertTrue(t is not t2)
864
 
        self.assertTrue(t._activity is t2._activity)
 
851
        transport = get_transport('trace+memory://')
 
852
        transport2 = transport.clone('.')
 
853
        self.assertTrue(transport is not transport2)
 
854
        self.assertTrue(transport._activity is transport2._activity)
865
855
 
866
856
    # the following specific tests are for the operations that have made use of
867
857
    # logging in tests; we could test every single operation but doing that
868
858
    # still won't cause a test failure when the top level Transport API
869
859
    # changes; so there is little return doing that.
870
860
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
873
 
        t.get('foo')
 
861
        transport = get_transport('trace+memory:///')
 
862
        transport.put_bytes('foo', 'barish')
 
863
        transport.get('foo')
874
864
        expected_result = []
875
865
        # put_bytes records the bytes, not the content to avoid memory
876
866
        # pressure.
877
867
        expected_result.append(('put_bytes', 'foo', 6, None))
878
868
        # get records the file name only.
879
869
        expected_result.append(('get', 'foo'))
880
 
        self.assertEqual(expected_result, t._activity)
 
870
        self.assertEqual(expected_result, transport._activity)
881
871
 
882
872
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
885
 
        list(t.readv('foo', [(0, 1), (3, 2)],
886
 
                     adjust_for_latency=True, upper_limit=6))
 
873
        transport = get_transport('trace+memory:///')
 
874
        transport.put_bytes('foo', 'barish')
 
875
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
876
            upper_limit=6))
887
877
        expected_result = []
888
878
        # put_bytes records the bytes, not the content to avoid memory
889
879
        # pressure.
890
880
        expected_result.append(('put_bytes', 'foo', 6, None))
891
881
        # readv records the supplied offset request
892
882
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
893
 
        self.assertEqual(expected_result, t._activity)
 
883
        self.assertEqual(expected_result, transport._activity)
894
884
 
895
885
 
896
886
class TestSSHConnections(tests.TestCaseWithTransport):
909
899
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
900
        # override the interface (doesn't change self._vendor).
911
901
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
902
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
903
        from bzrlib.tests.stub_sftp import StubServer
913
904
 
914
905
        # Start an SSH server
915
906
        self.command_executed = []
918
909
        # SSH channel ourselves.  Surely this has already been implemented
919
910
        # elsewhere?
920
911
        started = []
921
 
        class StubSSHServer(stub_sftp.StubServer):
 
912
        class StubSSHServer(StubServer):
922
913
 
923
914
            test = self
924
915
 
952
943
 
953
944
                return True
954
945
 
955
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
946
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
956
947
        # We *don't* want to override the default SSH vendor: the detected one
957
948
        # is the one to use.
958
949
        self.start_server(ssh_server)
972
963
            # prefix a '/' to get the right path.
973
964
            path_to_branch = '/' + path_to_branch
974
965
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
975
 
        t = transport.get_transport(url)
 
966
        t = get_transport(url)
976
967
        self.permit_url(t.base)
977
968
        t.mkdir('foo')
978
969