~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Andrew Bennetts
  • Date: 2009-11-25 07:27:43 UTC
  • mto: This revision was merged to the branch mainline in revision 4825.
  • Revision ID: andrew.bennetts@canonical.com-20091125072743-v6sv4m2mkt9iyslp
Terminate SSHSubprocesses when no refs to them are left, in case .close is never called.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
import sys
22
22
import threading
23
23
 
 
24
import bzrlib
24
25
from bzrlib import (
25
26
    errors,
26
27
    osutils,
27
28
    tests,
28
 
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.transport import (
32
 
    chroot,
33
 
    fakenfs,
34
 
    http,
35
 
    local,
36
 
    memory,
37
 
    pathfilter,
38
 
    readonly,
39
 
    )
40
 
from bzrlib.tests import (
41
 
    features,
42
 
    test_server,
43
 
    )
 
31
from bzrlib.errors import (DependencyNotPresent,
 
32
                           FileExists,
 
33
                           InvalidURLJoin,
 
34
                           NoSuchFile,
 
35
                           PathNotChild,
 
36
                           ReadError,
 
37
                           UnsupportedProtocol,
 
38
                           )
 
39
from bzrlib.tests import ParamikoFeature, TestCase, TestCaseInTempDir
 
40
from bzrlib.transport import (_clear_protocol_handlers,
 
41
                              _CoalescedOffset,
 
42
                              ConnectedTransport,
 
43
                              _get_protocol_handlers,
 
44
                              _set_protocol_handlers,
 
45
                              _get_transport_modules,
 
46
                              get_transport,
 
47
                              LateReadError,
 
48
                              register_lazy_transport,
 
49
                              register_transport_proto,
 
50
                              Transport,
 
51
                              )
 
52
from bzrlib.transport.chroot import ChrootServer
 
53
from bzrlib.transport.memory import MemoryTransport
 
54
from bzrlib.transport.local import (LocalTransport,
 
55
                                    EmulatedWin32LocalTransport)
 
56
from bzrlib.transport.pathfilter import PathFilteringServer
44
57
 
45
58
 
46
59
# TODO: Should possibly split transport-specific tests into their own files.
47
60
 
48
61
 
49
 
class TestTransport(tests.TestCase):
 
62
class TestTransport(TestCase):
50
63
    """Test the non transport-concrete class functionality."""
51
64
 
52
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
53
 
    # of try/finally -- vila 20100205
54
 
 
55
65
    def test__get_set_protocol_handlers(self):
56
 
        handlers = transport._get_protocol_handlers()
 
66
        handlers = _get_protocol_handlers()
57
67
        self.assertNotEqual([], handlers.keys( ))
58
68
        try:
59
 
            transport._clear_protocol_handlers()
60
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
69
            _clear_protocol_handlers()
 
70
            self.assertEqual([], _get_protocol_handlers().keys())
61
71
        finally:
62
 
            transport._set_protocol_handlers(handlers)
 
72
            _set_protocol_handlers(handlers)
63
73
 
64
74
    def test_get_transport_modules(self):
65
 
        handlers = transport._get_protocol_handlers()
 
75
        handlers = _get_protocol_handlers()
66
76
        # don't pollute the current handlers
67
 
        transport._clear_protocol_handlers()
 
77
        _clear_protocol_handlers()
68
78
        class SampleHandler(object):
69
79
            """I exist, isnt that enough?"""
70
80
        try:
71
 
            transport._clear_protocol_handlers()
72
 
            transport.register_transport_proto('foo')
73
 
            transport.register_lazy_transport('foo',
74
 
                                              'bzrlib.tests.test_transport',
75
 
                                              'TestTransport.SampleHandler')
76
 
            transport.register_transport_proto('bar')
77
 
            transport.register_lazy_transport('bar',
78
 
                                              'bzrlib.tests.test_transport',
79
 
                                              'TestTransport.SampleHandler')
 
81
            _clear_protocol_handlers()
 
82
            register_transport_proto('foo')
 
83
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
84
                                    'TestTransport.SampleHandler')
 
85
            register_transport_proto('bar')
 
86
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
87
                                    'TestTransport.SampleHandler')
80
88
            self.assertEqual([SampleHandler.__module__,
81
89
                              'bzrlib.transport.chroot',
82
90
                              'bzrlib.transport.pathfilter'],
83
 
                             transport._get_transport_modules())
 
91
                             _get_transport_modules())
84
92
        finally:
85
 
            transport._set_protocol_handlers(handlers)
 
93
            _set_protocol_handlers(handlers)
86
94
 
87
95
    def test_transport_dependency(self):
88
96
        """Transport with missing dependency causes no error"""
89
 
        saved_handlers = transport._get_protocol_handlers()
 
97
        saved_handlers = _get_protocol_handlers()
90
98
        # don't pollute the current handlers
91
 
        transport._clear_protocol_handlers()
 
99
        _clear_protocol_handlers()
92
100
        try:
93
 
            transport.register_transport_proto('foo')
94
 
            transport.register_lazy_transport(
95
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
101
            register_transport_proto('foo')
 
102
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
103
                    'BadTransportHandler')
96
104
            try:
97
 
                transport.get_transport('foo://fooserver/foo')
98
 
            except errors.UnsupportedProtocol, e:
 
105
                get_transport('foo://fooserver/foo')
 
106
            except UnsupportedProtocol, e:
99
107
                e_str = str(e)
100
108
                self.assertEquals('Unsupported protocol'
101
109
                                  ' for url "foo://fooserver/foo":'
105
113
                self.fail('Did not raise UnsupportedProtocol')
106
114
        finally:
107
115
            # restore original values
108
 
            transport._set_protocol_handlers(saved_handlers)
 
116
            _set_protocol_handlers(saved_handlers)
109
117
 
110
118
    def test_transport_fallback(self):
111
119
        """Transport with missing dependency causes no error"""
112
 
        saved_handlers = transport._get_protocol_handlers()
 
120
        saved_handlers = _get_protocol_handlers()
113
121
        try:
114
 
            transport._clear_protocol_handlers()
115
 
            transport.register_transport_proto('foo')
116
 
            transport.register_lazy_transport(
117
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
 
            transport.register_lazy_transport(
119
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
 
            t = transport.get_transport('foo://fooserver/foo')
 
122
            _clear_protocol_handlers()
 
123
            register_transport_proto('foo')
 
124
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
125
                    'BackupTransportHandler')
 
126
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
127
                    'BadTransportHandler')
 
128
            t = get_transport('foo://fooserver/foo')
121
129
            self.assertTrue(isinstance(t, BackupTransportHandler))
122
130
        finally:
123
 
            transport._set_protocol_handlers(saved_handlers)
 
131
            _set_protocol_handlers(saved_handlers)
124
132
 
125
133
    def test_ssh_hints(self):
126
134
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
135
        try:
128
 
            transport.get_transport('ssh://fooserver/foo')
129
 
        except errors.UnsupportedProtocol, e:
 
136
            get_transport('ssh://fooserver/foo')
 
137
        except UnsupportedProtocol, e:
130
138
            e_str = str(e)
131
139
            self.assertEquals('Unsupported protocol'
132
140
                              ' for url "ssh://fooserver/foo":'
133
 
                              ' bzr supports bzr+ssh to operate over ssh,'
134
 
                              ' use "bzr+ssh://fooserver/foo".',
 
141
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
135
142
                              str(e))
136
143
        else:
137
144
            self.fail('Did not raise UnsupportedProtocol')
138
145
 
139
146
    def test_LateReadError(self):
140
147
        """The LateReadError helper should raise on read()."""
141
 
        a_file = transport.LateReadError('a path')
 
148
        a_file = LateReadError('a path')
142
149
        try:
143
150
            a_file.read()
144
 
        except errors.ReadError, error:
 
151
        except ReadError, error:
145
152
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
153
        self.assertRaises(ReadError, a_file.read, 40)
147
154
        a_file.close()
148
155
 
149
156
    def test__combine_paths(self):
150
 
        t = transport.Transport('/')
 
157
        t = Transport('/')
151
158
        self.assertEqual('/home/sarah/project/foo',
152
159
                         t._combine_paths('/home/sarah', 'project/foo'))
153
160
        self.assertEqual('/etc',
159
166
 
160
167
    def test_local_abspath_non_local_transport(self):
161
168
        # the base implementation should throw
162
 
        t = memory.MemoryTransport()
 
169
        t = MemoryTransport()
163
170
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
171
        self.assertEqual('memory:///t is not a local path.', str(e))
165
172
 
166
173
 
167
 
class TestCoalesceOffsets(tests.TestCase):
 
174
class TestCoalesceOffsets(TestCase):
168
175
 
169
176
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
 
        coalesce = transport.Transport._coalesce_offsets
171
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
177
        coalesce = Transport._coalesce_offsets
 
178
        exp = [_CoalescedOffset(*x) for x in expected]
172
179
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
180
                            max_size=max_size))
174
181
        self.assertEqual(exp, out)
249
256
                   max_size=1*1024*1024*1024)
250
257
 
251
258
 
252
 
class TestMemoryServer(tests.TestCase):
253
 
 
254
 
    def test_create_server(self):
255
 
        server = memory.MemoryServer()
256
 
        server.start_server()
257
 
        url = server.get_url()
258
 
        self.assertTrue(url in transport.transport_list_registry)
259
 
        t = transport.get_transport(url)
260
 
        del t
261
 
        server.stop_server()
262
 
        self.assertFalse(url in transport.transport_list_registry)
263
 
        self.assertRaises(errors.UnsupportedProtocol,
264
 
                          transport.get_transport, url)
265
 
 
266
 
 
267
 
class TestMemoryTransport(tests.TestCase):
 
259
class TestMemoryTransport(TestCase):
268
260
 
269
261
    def test_get_transport(self):
270
 
        memory.MemoryTransport()
 
262
        MemoryTransport()
271
263
 
272
264
    def test_clone(self):
273
 
        t = memory.MemoryTransport()
274
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
275
 
        self.assertEqual("memory:///", t.clone("/").base)
 
265
        transport = MemoryTransport()
 
266
        self.assertTrue(isinstance(transport, MemoryTransport))
 
267
        self.assertEqual("memory:///", transport.clone("/").base)
276
268
 
277
269
    def test_abspath(self):
278
 
        t = memory.MemoryTransport()
279
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
270
        transport = MemoryTransport()
 
271
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
280
272
 
281
273
    def test_abspath_of_root(self):
282
 
        t = memory.MemoryTransport()
283
 
        self.assertEqual("memory:///", t.base)
284
 
        self.assertEqual("memory:///", t.abspath('/'))
 
274
        transport = MemoryTransport()
 
275
        self.assertEqual("memory:///", transport.base)
 
276
        self.assertEqual("memory:///", transport.abspath('/'))
285
277
 
286
278
    def test_abspath_of_relpath_starting_at_root(self):
287
 
        t = memory.MemoryTransport()
288
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
279
        transport = MemoryTransport()
 
280
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
289
281
 
290
282
    def test_append_and_get(self):
291
 
        t = memory.MemoryTransport()
292
 
        t.append_bytes('path', 'content')
293
 
        self.assertEqual(t.get('path').read(), 'content')
294
 
        t.append_file('path', StringIO('content'))
295
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
283
        transport = MemoryTransport()
 
284
        transport.append_bytes('path', 'content')
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.append_file('path', StringIO('content'))
 
287
        self.assertEqual(transport.get('path').read(), 'contentcontent')
296
288
 
297
289
    def test_put_and_get(self):
298
 
        t = memory.MemoryTransport()
299
 
        t.put_file('path', StringIO('content'))
300
 
        self.assertEqual(t.get('path').read(), 'content')
301
 
        t.put_bytes('path', 'content')
302
 
        self.assertEqual(t.get('path').read(), 'content')
 
290
        transport = MemoryTransport()
 
291
        transport.put_file('path', StringIO('content'))
 
292
        self.assertEqual(transport.get('path').read(), 'content')
 
293
        transport.put_bytes('path', 'content')
 
294
        self.assertEqual(transport.get('path').read(), 'content')
303
295
 
304
296
    def test_append_without_dir_fails(self):
305
 
        t = memory.MemoryTransport()
306
 
        self.assertRaises(errors.NoSuchFile,
307
 
                          t.append_bytes, 'dir/path', 'content')
 
297
        transport = MemoryTransport()
 
298
        self.assertRaises(NoSuchFile,
 
299
                          transport.append_bytes, 'dir/path', 'content')
308
300
 
309
301
    def test_put_without_dir_fails(self):
310
 
        t = memory.MemoryTransport()
311
 
        self.assertRaises(errors.NoSuchFile,
312
 
                          t.put_file, 'dir/path', StringIO('content'))
 
302
        transport = MemoryTransport()
 
303
        self.assertRaises(NoSuchFile,
 
304
                          transport.put_file, 'dir/path', StringIO('content'))
313
305
 
314
306
    def test_get_missing(self):
315
 
        transport = memory.MemoryTransport()
316
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
307
        transport = MemoryTransport()
 
308
        self.assertRaises(NoSuchFile, transport.get, 'foo')
317
309
 
318
310
    def test_has_missing(self):
319
 
        t = memory.MemoryTransport()
320
 
        self.assertEquals(False, t.has('foo'))
 
311
        transport = MemoryTransport()
 
312
        self.assertEquals(False, transport.has('foo'))
321
313
 
322
314
    def test_has_present(self):
323
 
        t = memory.MemoryTransport()
324
 
        t.append_bytes('foo', 'content')
325
 
        self.assertEquals(True, t.has('foo'))
 
315
        transport = MemoryTransport()
 
316
        transport.append_bytes('foo', 'content')
 
317
        self.assertEquals(True, transport.has('foo'))
326
318
 
327
319
    def test_list_dir(self):
328
 
        t = memory.MemoryTransport()
329
 
        t.put_bytes('foo', 'content')
330
 
        t.mkdir('dir')
331
 
        t.put_bytes('dir/subfoo', 'content')
332
 
        t.put_bytes('dirlike', 'content')
 
320
        transport = MemoryTransport()
 
321
        transport.put_bytes('foo', 'content')
 
322
        transport.mkdir('dir')
 
323
        transport.put_bytes('dir/subfoo', 'content')
 
324
        transport.put_bytes('dirlike', 'content')
333
325
 
334
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
335
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
326
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
327
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
336
328
 
337
329
    def test_mkdir(self):
338
 
        t = memory.MemoryTransport()
339
 
        t.mkdir('dir')
340
 
        t.append_bytes('dir/path', 'content')
341
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
330
        transport = MemoryTransport()
 
331
        transport.mkdir('dir')
 
332
        transport.append_bytes('dir/path', 'content')
 
333
        self.assertEqual(transport.get('dir/path').read(), 'content')
342
334
 
343
335
    def test_mkdir_missing_parent(self):
344
 
        t = memory.MemoryTransport()
345
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
336
        transport = MemoryTransport()
 
337
        self.assertRaises(NoSuchFile,
 
338
                          transport.mkdir, 'dir/dir')
346
339
 
347
340
    def test_mkdir_twice(self):
348
 
        t = memory.MemoryTransport()
349
 
        t.mkdir('dir')
350
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
341
        transport = MemoryTransport()
 
342
        transport.mkdir('dir')
 
343
        self.assertRaises(FileExists, transport.mkdir, 'dir')
351
344
 
352
345
    def test_parameters(self):
353
 
        t = memory.MemoryTransport()
354
 
        self.assertEqual(True, t.listable())
355
 
        self.assertEqual(False, t.is_readonly())
 
346
        transport = MemoryTransport()
 
347
        self.assertEqual(True, transport.listable())
 
348
        self.assertEqual(False, transport.is_readonly())
356
349
 
357
350
    def test_iter_files_recursive(self):
358
 
        t = memory.MemoryTransport()
359
 
        t.mkdir('dir')
360
 
        t.put_bytes('dir/foo', 'content')
361
 
        t.put_bytes('dir/bar', 'content')
362
 
        t.put_bytes('bar', 'content')
363
 
        paths = set(t.iter_files_recursive())
 
351
        transport = MemoryTransport()
 
352
        transport.mkdir('dir')
 
353
        transport.put_bytes('dir/foo', 'content')
 
354
        transport.put_bytes('dir/bar', 'content')
 
355
        transport.put_bytes('bar', 'content')
 
356
        paths = set(transport.iter_files_recursive())
364
357
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
365
358
 
366
359
    def test_stat(self):
367
 
        t = memory.MemoryTransport()
368
 
        t.put_bytes('foo', 'content')
369
 
        t.put_bytes('bar', 'phowar')
370
 
        self.assertEqual(7, t.stat('foo').st_size)
371
 
        self.assertEqual(6, t.stat('bar').st_size)
372
 
 
373
 
 
374
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
360
        transport = MemoryTransport()
 
361
        transport.put_bytes('foo', 'content')
 
362
        transport.put_bytes('bar', 'phowar')
 
363
        self.assertEqual(7, transport.stat('foo').st_size)
 
364
        self.assertEqual(6, transport.stat('bar').st_size)
 
365
 
 
366
 
 
367
class ChrootDecoratorTransportTest(TestCase):
375
368
    """Chroot decoration specific tests."""
376
369
 
377
370
    def test_abspath(self):
378
371
        # The abspath is always relative to the chroot_url.
379
 
        server = chroot.ChrootServer(
380
 
            transport.get_transport('memory:///foo/bar/'))
 
372
        server = ChrootServer(get_transport('memory:///foo/bar/'))
381
373
        self.start_server(server)
382
 
        t = transport.get_transport(server.get_url())
383
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
374
        transport = get_transport(server.get_url())
 
375
        self.assertEqual(server.get_url(), transport.abspath('/'))
384
376
 
385
 
        subdir_t = t.clone('subdir')
386
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
377
        subdir_transport = transport.clone('subdir')
 
378
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
387
379
 
388
380
    def test_clone(self):
389
 
        server = chroot.ChrootServer(
390
 
            transport.get_transport('memory:///foo/bar/'))
 
381
        server = ChrootServer(get_transport('memory:///foo/bar/'))
391
382
        self.start_server(server)
392
 
        t = transport.get_transport(server.get_url())
 
383
        transport = get_transport(server.get_url())
393
384
        # relpath from root and root path are the same
394
 
        relpath_cloned = t.clone('foo')
395
 
        abspath_cloned = t.clone('/foo')
 
385
        relpath_cloned = transport.clone('foo')
 
386
        abspath_cloned = transport.clone('/foo')
396
387
        self.assertEqual(server, relpath_cloned.server)
397
388
        self.assertEqual(server, abspath_cloned.server)
398
389
 
404
395
        This is so that it is not possible to escape a chroot by doing::
405
396
            url = chroot_transport.base
406
397
            parent_url = urlutils.join(url, '..')
407
 
            new_t = transport.get_transport(parent_url)
 
398
            new_transport = get_transport(parent_url)
408
399
        """
409
 
        server = chroot.ChrootServer(
410
 
            transport.get_transport('memory:///path/subpath'))
 
400
        server = ChrootServer(get_transport('memory:///path/subpath'))
411
401
        self.start_server(server)
412
 
        t = transport.get_transport(server.get_url())
413
 
        new_t = transport.get_transport(t.base)
414
 
        self.assertEqual(t.server, new_t.server)
415
 
        self.assertEqual(t.base, new_t.base)
 
402
        transport = get_transport(server.get_url())
 
403
        new_transport = get_transport(transport.base)
 
404
        self.assertEqual(transport.server, new_transport.server)
 
405
        self.assertEqual(transport.base, new_transport.base)
416
406
 
417
407
    def test_urljoin_preserves_chroot(self):
418
408
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
421
411
        This is so that it is not possible to escape a chroot by doing::
422
412
            url = chroot_transport.base
423
413
            parent_url = urlutils.join(url, '..')
424
 
            new_t = transport.get_transport(parent_url)
 
414
            new_transport = get_transport(parent_url)
425
415
        """
426
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
416
        server = ChrootServer(get_transport('memory:///path/'))
427
417
        self.start_server(server)
428
 
        t = transport.get_transport(server.get_url())
 
418
        transport = get_transport(server.get_url())
429
419
        self.assertRaises(
430
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
431
 
 
432
 
 
433
 
class TestChrootServer(tests.TestCase):
 
420
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
421
 
 
422
 
 
423
class ChrootServerTest(TestCase):
434
424
 
435
425
    def test_construct(self):
436
 
        backing_transport = memory.MemoryTransport()
437
 
        server = chroot.ChrootServer(backing_transport)
 
426
        backing_transport = MemoryTransport()
 
427
        server = ChrootServer(backing_transport)
438
428
        self.assertEqual(backing_transport, server.backing_transport)
439
429
 
440
430
    def test_setUp(self):
441
 
        backing_transport = memory.MemoryTransport()
442
 
        server = chroot.ChrootServer(backing_transport)
443
 
        server.start_server()
 
431
        backing_transport = MemoryTransport()
 
432
        server = ChrootServer(backing_transport)
 
433
        server.setUp()
444
434
        try:
445
 
            self.assertTrue(server.scheme
446
 
                            in transport._get_protocol_handlers().keys())
 
435
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
447
436
        finally:
448
 
            server.stop_server()
 
437
            server.tearDown()
449
438
 
450
 
    def test_stop_server(self):
451
 
        backing_transport = memory.MemoryTransport()
452
 
        server = chroot.ChrootServer(backing_transport)
453
 
        server.start_server()
454
 
        server.stop_server()
455
 
        self.assertFalse(server.scheme
456
 
                         in transport._get_protocol_handlers().keys())
 
439
    def test_tearDown(self):
 
440
        backing_transport = MemoryTransport()
 
441
        server = ChrootServer(backing_transport)
 
442
        server.setUp()
 
443
        server.tearDown()
 
444
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
457
445
 
458
446
    def test_get_url(self):
459
 
        backing_transport = memory.MemoryTransport()
460
 
        server = chroot.ChrootServer(backing_transport)
461
 
        server.start_server()
 
447
        backing_transport = MemoryTransport()
 
448
        server = ChrootServer(backing_transport)
 
449
        server.setUp()
462
450
        try:
463
451
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
452
        finally:
465
 
            server.stop_server()
466
 
 
467
 
 
468
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
453
            server.tearDown()
 
454
 
 
455
 
 
456
class PathFilteringDecoratorTransportTest(TestCase):
469
457
    """Pathfilter decoration specific tests."""
470
458
 
471
459
    def test_abspath(self):
472
460
        # The abspath is always relative to the base of the backing transport.
473
 
        server = pathfilter.PathFilteringServer(
474
 
            transport.get_transport('memory:///foo/bar/'),
 
461
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
475
462
            lambda x: x)
476
 
        server.start_server()
477
 
        t = transport.get_transport(server.get_url())
478
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
463
        server.setUp()
 
464
        transport = get_transport(server.get_url())
 
465
        self.assertEqual(server.get_url(), transport.abspath('/'))
479
466
 
480
 
        subdir_t = t.clone('subdir')
481
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
482
 
        server.stop_server()
 
467
        subdir_transport = transport.clone('subdir')
 
468
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
469
        server.tearDown()
483
470
 
484
471
    def make_pf_transport(self, filter_func=None):
485
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
488
475
            parameter to override it."""
489
476
        if filter_func is None:
490
477
            filter_func = lambda x: x
491
 
        server = pathfilter.PathFilteringServer(
492
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
493
 
        server.start_server()
494
 
        self.addCleanup(server.stop_server)
495
 
        return transport.get_transport(server.get_url())
 
478
        server = PathFilteringServer(
 
479
            get_transport('memory:///foo/bar/'), filter_func)
 
480
        server.setUp()
 
481
        self.addCleanup(server.tearDown)
 
482
        return get_transport(server.get_url())
496
483
 
497
484
    def test__filter(self):
498
485
        # _filter (with an identity func as filter_func) always returns
499
486
        # paths relative to the base of the backing transport.
500
 
        t = self.make_pf_transport()
501
 
        self.assertEqual('foo', t._filter('foo'))
502
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
503
 
        self.assertEqual('', t._filter('..'))
504
 
        self.assertEqual('', t._filter('/'))
 
487
        transport = self.make_pf_transport()
 
488
        self.assertEqual('foo', transport._filter('foo'))
 
489
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
505
492
        # The base of the pathfiltering transport is taken into account too.
506
 
        t = t.clone('subdir1/subdir2')
507
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
508
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
509
 
        self.assertEqual('subdir1', t._filter('..'))
510
 
        self.assertEqual('', t._filter('/'))
 
493
        transport = transport.clone('subdir1/subdir2')
 
494
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
495
        self.assertEqual(
 
496
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
497
        self.assertEqual('subdir1', transport._filter('..'))
 
498
        self.assertEqual('', transport._filter('/'))
511
499
 
512
500
    def test_filter_invocation(self):
513
501
        filter_log = []
514
502
        def filter(path):
515
503
            filter_log.append(path)
516
504
            return path
517
 
        t = self.make_pf_transport(filter)
518
 
        t.has('abc')
 
505
        transport = self.make_pf_transport(filter)
 
506
        transport.has('abc')
519
507
        self.assertEqual(['abc'], filter_log)
520
508
        del filter_log[:]
521
 
        t.clone('abc').has('xyz')
 
509
        transport.clone('abc').has('xyz')
522
510
        self.assertEqual(['abc/xyz'], filter_log)
523
511
        del filter_log[:]
524
 
        t.has('/abc')
 
512
        transport.has('/abc')
525
513
        self.assertEqual(['abc'], filter_log)
526
514
 
527
515
    def test_clone(self):
528
 
        t = self.make_pf_transport()
 
516
        transport = self.make_pf_transport()
529
517
        # relpath from root and root path are the same
530
 
        relpath_cloned = t.clone('foo')
531
 
        abspath_cloned = t.clone('/foo')
532
 
        self.assertEqual(t.server, relpath_cloned.server)
533
 
        self.assertEqual(t.server, abspath_cloned.server)
 
518
        relpath_cloned = transport.clone('foo')
 
519
        abspath_cloned = transport.clone('/foo')
 
520
        self.assertEqual(transport.server, relpath_cloned.server)
 
521
        self.assertEqual(transport.server, abspath_cloned.server)
534
522
 
535
523
    def test_url_preserves_pathfiltering(self):
536
524
        """Calling get_transport on a pathfiltered transport's base should
541
529
        otherwise) the filtering by doing::
542
530
            url = filtered_transport.base
543
531
            parent_url = urlutils.join(url, '..')
544
 
            new_t = transport.get_transport(parent_url)
 
532
            new_transport = get_transport(parent_url)
545
533
        """
546
 
        t = self.make_pf_transport()
547
 
        new_t = transport.get_transport(t.base)
548
 
        self.assertEqual(t.server, new_t.server)
549
 
        self.assertEqual(t.base, new_t.base)
550
 
 
551
 
 
552
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
534
        transport = self.make_pf_transport()
 
535
        new_transport = get_transport(transport.base)
 
536
        self.assertEqual(transport.server, new_transport.server)
 
537
        self.assertEqual(transport.base, new_transport.base)
 
538
 
 
539
 
 
540
class ReadonlyDecoratorTransportTest(TestCase):
553
541
    """Readonly decoration specific tests."""
554
542
 
555
543
    def test_local_parameters(self):
 
544
        import bzrlib.transport.readonly as readonly
556
545
        # connect to . in readonly mode
557
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
558
 
        self.assertEqual(True, t.listable())
559
 
        self.assertEqual(True, t.is_readonly())
 
546
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
547
        self.assertEqual(True, transport.listable())
 
548
        self.assertEqual(True, transport.is_readonly())
560
549
 
561
550
    def test_http_parameters(self):
562
551
        from bzrlib.tests.http_server import HttpServer
 
552
        import bzrlib.transport.readonly as readonly
563
553
        # connect to '.' via http which is not listable
564
554
        server = HttpServer()
565
555
        self.start_server(server)
566
 
        t = transport.get_transport('readonly+' + server.get_url())
567
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
568
 
        self.assertEqual(False, t.listable())
569
 
        self.assertEqual(True, t.is_readonly())
570
 
 
571
 
 
572
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
556
        transport = get_transport('readonly+' + server.get_url())
 
557
        self.failUnless(isinstance(transport,
 
558
                                   readonly.ReadonlyTransportDecorator))
 
559
        self.assertEqual(False, transport.listable())
 
560
        self.assertEqual(True, transport.is_readonly())
 
561
 
 
562
 
 
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
573
564
    """NFS decorator specific tests."""
574
565
 
575
566
    def get_nfs_transport(self, url):
 
567
        import bzrlib.transport.fakenfs as fakenfs
576
568
        # connect to url with nfs decoration
577
569
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
578
570
 
579
571
    def test_local_parameters(self):
580
572
        # the listable and is_readonly parameters
581
573
        # are not changed by the fakenfs decorator
582
 
        t = self.get_nfs_transport('.')
583
 
        self.assertEqual(True, t.listable())
584
 
        self.assertEqual(False, t.is_readonly())
 
574
        transport = self.get_nfs_transport('.')
 
575
        self.assertEqual(True, transport.listable())
 
576
        self.assertEqual(False, transport.is_readonly())
585
577
 
586
578
    def test_http_parameters(self):
587
579
        # the listable and is_readonly parameters
590
582
        # connect to '.' via http which is not listable
591
583
        server = HttpServer()
592
584
        self.start_server(server)
593
 
        t = self.get_nfs_transport(server.get_url())
594
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
595
 
        self.assertEqual(False, t.listable())
596
 
        self.assertEqual(True, t.is_readonly())
 
585
        transport = self.get_nfs_transport(server.get_url())
 
586
        self.assertIsInstance(
 
587
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
588
        self.assertEqual(False, transport.listable())
 
589
        self.assertEqual(True, transport.is_readonly())
597
590
 
598
591
    def test_fakenfs_server_default(self):
599
592
        # a FakeNFSServer() should bring up a local relpath server for itself
600
 
        server = test_server.FakeNFSServer()
 
593
        import bzrlib.transport.fakenfs as fakenfs
 
594
        server = fakenfs.FakeNFSServer()
601
595
        self.start_server(server)
602
596
        # the url should be decorated appropriately
603
597
        self.assertStartsWith(server.get_url(), 'fakenfs+')
604
598
        # and we should be able to get a transport for it
605
 
        t = transport.get_transport(server.get_url())
 
599
        transport = get_transport(server.get_url())
606
600
        # which must be a FakeNFSTransportDecorator instance.
607
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
601
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
608
602
 
609
603
    def test_fakenfs_rename_semantics(self):
610
604
        # a FakeNFS transport must mangle the way rename errors occur to
611
605
        # look like NFS problems.
612
 
        t = self.get_nfs_transport('.')
 
606
        transport = self.get_nfs_transport('.')
613
607
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
614
 
                        transport=t)
615
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
616
 
 
617
 
 
618
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
608
                        transport=transport)
 
609
        self.assertRaises(errors.ResourceBusy,
 
610
                          transport.rename, 'from', 'to')
 
611
 
 
612
 
 
613
class FakeVFATDecoratorTests(TestCaseInTempDir):
619
614
    """Tests for simulation of VFAT restrictions"""
620
615
 
621
616
    def get_vfat_transport(self, url):
625
620
 
626
621
    def test_transport_creation(self):
627
622
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
628
 
        t = self.get_vfat_transport('.')
629
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
623
        transport = self.get_vfat_transport('.')
 
624
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
630
625
 
631
626
    def test_transport_mkdir(self):
632
 
        t = self.get_vfat_transport('.')
633
 
        t.mkdir('HELLO')
634
 
        self.assertTrue(t.has('hello'))
635
 
        self.assertTrue(t.has('Hello'))
 
627
        transport = self.get_vfat_transport('.')
 
628
        transport.mkdir('HELLO')
 
629
        self.assertTrue(transport.has('hello'))
 
630
        self.assertTrue(transport.has('Hello'))
636
631
 
637
632
    def test_forbidden_chars(self):
638
 
        t = self.get_vfat_transport('.')
639
 
        self.assertRaises(ValueError, t.has, "<NU>")
640
 
 
641
 
 
642
 
class BadTransportHandler(transport.Transport):
 
633
        transport = self.get_vfat_transport('.')
 
634
        self.assertRaises(ValueError, transport.has, "<NU>")
 
635
 
 
636
 
 
637
class BadTransportHandler(Transport):
643
638
    def __init__(self, base_url):
644
 
        raise errors.DependencyNotPresent('some_lib',
645
 
                                          'testing missing dependency')
646
 
 
647
 
 
648
 
class BackupTransportHandler(transport.Transport):
 
639
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
640
 
 
641
 
 
642
class BackupTransportHandler(Transport):
649
643
    """Test transport that works as a backup for the BadTransportHandler"""
650
644
    pass
651
645
 
652
646
 
653
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
647
class TestTransportImplementation(TestCaseInTempDir):
654
648
    """Implementation verification for transports.
655
649
 
656
650
    To verify a transport we need a server factory, which is a callable
685
679
        base_url = self._server.get_url()
686
680
        url = self._adjust_url(base_url, relpath)
687
681
        # try getting the transport via the regular interface:
688
 
        t = transport.get_transport(url)
 
682
        t = get_transport(url)
689
683
        # vila--20070607 if the following are commented out the test suite
690
684
        # still pass. Is this really still needed or was it a forgotten
691
685
        # temporary fix ?
696
690
        return t
697
691
 
698
692
 
699
 
class TestLocalTransports(tests.TestCase):
 
693
class TestLocalTransports(TestCase):
700
694
 
701
695
    def test_get_transport_from_abspath(self):
702
696
        here = osutils.abspath('.')
703
 
        t = transport.get_transport(here)
704
 
        self.assertIsInstance(t, local.LocalTransport)
 
697
        t = get_transport(here)
 
698
        self.assertIsInstance(t, LocalTransport)
705
699
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
706
700
 
707
701
    def test_get_transport_from_relpath(self):
708
702
        here = osutils.abspath('.')
709
 
        t = transport.get_transport('.')
710
 
        self.assertIsInstance(t, local.LocalTransport)
 
703
        t = get_transport('.')
 
704
        self.assertIsInstance(t, LocalTransport)
711
705
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
712
706
 
713
707
    def test_get_transport_from_local_url(self):
714
708
        here = osutils.abspath('.')
715
709
        here_url = urlutils.local_path_to_url(here) + '/'
716
 
        t = transport.get_transport(here_url)
717
 
        self.assertIsInstance(t, local.LocalTransport)
 
710
        t = get_transport(here_url)
 
711
        self.assertIsInstance(t, LocalTransport)
718
712
        self.assertEquals(t.base, here_url)
719
713
 
720
714
    def test_local_abspath(self):
721
715
        here = osutils.abspath('.')
722
 
        t = transport.get_transport(here)
 
716
        t = get_transport(here)
723
717
        self.assertEquals(t.local_abspath(''), here)
724
718
 
725
719
 
726
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
727
 
 
728
 
    def test_local_fdatasync_calls_fdatasync(self):
729
 
        """Check fdatasync on a stream tries to flush the data to the OS.
730
 
        
731
 
        We can't easily observe the external effect but we can at least see
732
 
        it's called.
733
 
        """
734
 
        t = self.get_transport('.')
735
 
        calls = self.recordCalls(os, 'fdatasync')
736
 
        w = t.open_write_stream('out')
737
 
        w.write('foo')
738
 
        w.fdatasync()
739
 
        with open('out', 'rb') as f:
740
 
            # Should have been flushed.
741
 
            self.assertEquals(f.read(), 'foo')
742
 
        self.assertEquals(len(calls), 1, calls)
743
 
 
744
 
 
745
 
class TestWin32LocalTransport(tests.TestCase):
 
720
class TestWin32LocalTransport(TestCase):
746
721
 
747
722
    def test_unc_clone_to_root(self):
748
723
        # Win32 UNC path like \\HOST\path
749
724
        # clone to root should stop at least at \\HOST part
750
725
        # not on \\
751
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
726
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
752
727
        for i in xrange(4):
753
728
            t = t.clone('..')
754
729
        self.assertEquals(t.base, 'file://HOST/')
757
732
        self.assertEquals(t.base, 'file://HOST/')
758
733
 
759
734
 
760
 
class TestConnectedTransport(tests.TestCase):
 
735
class TestConnectedTransport(TestCase):
761
736
    """Tests for connected to remote server transports"""
762
737
 
763
738
    def test_parse_url(self):
764
 
        t = transport.ConnectedTransport(
765
 
            'http://simple.example.com/home/source')
 
739
        t = ConnectedTransport('http://simple.example.com/home/source')
766
740
        self.assertEquals(t._host, 'simple.example.com')
767
741
        self.assertEquals(t._port, None)
768
742
        self.assertEquals(t._path, '/home/source/')
769
 
        self.assertTrue(t._user is None)
770
 
        self.assertTrue(t._password is None)
 
743
        self.failUnless(t._user is None)
 
744
        self.failUnless(t._password is None)
771
745
 
772
746
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
773
747
 
774
748
    def test_parse_url_with_at_in_user(self):
775
749
        # Bug 228058
776
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
750
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
777
751
        self.assertEquals(t._user, 'user@host.com')
778
752
 
779
753
    def test_parse_quoted_url(self):
780
 
        t = transport.ConnectedTransport(
781
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
754
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
782
755
        self.assertEquals(t._host, 'exAmple.com')
783
756
        self.assertEquals(t._port, 2222)
784
757
        self.assertEquals(t._user, 'robey')
790
763
 
791
764
    def test_parse_invalid_url(self):
792
765
        self.assertRaises(errors.InvalidURL,
793
 
                          transport.ConnectedTransport,
 
766
                          ConnectedTransport,
794
767
                          'sftp://lily.org:~janneke/public/bzr/gub')
795
768
 
796
769
    def test_relpath(self):
797
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
770
        t = ConnectedTransport('sftp://user@host.com/abs/path')
798
771
 
799
772
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
800
773
        self.assertRaises(errors.PathNotChild, t.relpath,
806
779
        self.assertRaises(errors.PathNotChild, t.relpath,
807
780
                          'sftp://user@host.com:33/abs/path/sub')
808
781
        # Make sure it works when we don't supply a username
809
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
782
        t = ConnectedTransport('sftp://host.com/abs/path')
810
783
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
811
784
 
812
785
        # Make sure it works when parts of the path will be url encoded
813
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
786
        t = ConnectedTransport('sftp://host.com/dev/%path')
814
787
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
815
788
 
816
789
    def test_connection_sharing_propagate_credentials(self):
817
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
790
        t = ConnectedTransport('ftp://user@host.com/abs/path')
818
791
        self.assertEquals('user', t._user)
819
792
        self.assertEquals('host.com', t._host)
820
793
        self.assertIs(None, t._get_connection())
841
814
        self.assertIs(new_password, c._get_credentials())
842
815
 
843
816
 
844
 
class TestReusedTransports(tests.TestCase):
 
817
class TestReusedTransports(TestCase):
845
818
    """Tests for transport reuse"""
846
819
 
847
820
    def test_reuse_same_transport(self):
848
821
        possible_transports = []
849
 
        t1 = transport.get_transport('http://foo/',
850
 
                                     possible_transports=possible_transports)
 
822
        t1 = get_transport('http://foo/',
 
823
                           possible_transports=possible_transports)
851
824
        self.assertEqual([t1], possible_transports)
852
 
        t2 = transport.get_transport('http://foo/',
853
 
                                     possible_transports=[t1])
 
825
        t2 = get_transport('http://foo/', possible_transports=[t1])
854
826
        self.assertIs(t1, t2)
855
827
 
856
828
        # Also check that final '/' are handled correctly
857
 
        t3 = transport.get_transport('http://foo/path/')
858
 
        t4 = transport.get_transport('http://foo/path',
859
 
                                     possible_transports=[t3])
 
829
        t3 = get_transport('http://foo/path/')
 
830
        t4 = get_transport('http://foo/path', possible_transports=[t3])
860
831
        self.assertIs(t3, t4)
861
832
 
862
 
        t5 = transport.get_transport('http://foo/path')
863
 
        t6 = transport.get_transport('http://foo/path/',
864
 
                                     possible_transports=[t5])
 
833
        t5 = get_transport('http://foo/path')
 
834
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
865
835
        self.assertIs(t5, t6)
866
836
 
867
837
    def test_don_t_reuse_different_transport(self):
868
 
        t1 = transport.get_transport('http://foo/path')
869
 
        t2 = transport.get_transport('http://bar/path',
870
 
                                     possible_transports=[t1])
 
838
        t1 = get_transport('http://foo/path')
 
839
        t2 = get_transport('http://bar/path', possible_transports=[t1])
871
840
        self.assertIsNot(t1, t2)
872
841
 
873
842
 
874
 
class TestTransportTrace(tests.TestCase):
 
843
class TestTransportTrace(TestCase):
875
844
 
876
845
    def test_get(self):
877
 
        t = transport.get_transport('trace+memory://')
878
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
846
        transport = get_transport('trace+memory://')
 
847
        self.assertIsInstance(
 
848
            transport, bzrlib.transport.trace.TransportTraceDecorator)
879
849
 
880
850
    def test_clone_preserves_activity(self):
881
 
        t = transport.get_transport('trace+memory://')
882
 
        t2 = t.clone('.')
883
 
        self.assertTrue(t is not t2)
884
 
        self.assertTrue(t._activity is t2._activity)
 
851
        transport = get_transport('trace+memory://')
 
852
        transport2 = transport.clone('.')
 
853
        self.assertTrue(transport is not transport2)
 
854
        self.assertTrue(transport._activity is transport2._activity)
885
855
 
886
856
    # the following specific tests are for the operations that have made use of
887
857
    # logging in tests; we could test every single operation but doing that
888
858
    # still won't cause a test failure when the top level Transport API
889
859
    # changes; so there is little return doing that.
890
860
    def test_get(self):
891
 
        t = transport.get_transport('trace+memory:///')
892
 
        t.put_bytes('foo', 'barish')
893
 
        t.get('foo')
 
861
        transport = get_transport('trace+memory:///')
 
862
        transport.put_bytes('foo', 'barish')
 
863
        transport.get('foo')
894
864
        expected_result = []
895
865
        # put_bytes records the bytes, not the content to avoid memory
896
866
        # pressure.
897
867
        expected_result.append(('put_bytes', 'foo', 6, None))
898
868
        # get records the file name only.
899
869
        expected_result.append(('get', 'foo'))
900
 
        self.assertEqual(expected_result, t._activity)
 
870
        self.assertEqual(expected_result, transport._activity)
901
871
 
902
872
    def test_readv(self):
903
 
        t = transport.get_transport('trace+memory:///')
904
 
        t.put_bytes('foo', 'barish')
905
 
        list(t.readv('foo', [(0, 1), (3, 2)],
906
 
                     adjust_for_latency=True, upper_limit=6))
 
873
        transport = get_transport('trace+memory:///')
 
874
        transport.put_bytes('foo', 'barish')
 
875
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
876
            upper_limit=6))
907
877
        expected_result = []
908
878
        # put_bytes records the bytes, not the content to avoid memory
909
879
        # pressure.
910
880
        expected_result.append(('put_bytes', 'foo', 6, None))
911
881
        # readv records the supplied offset request
912
882
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
913
 
        self.assertEqual(expected_result, t._activity)
 
883
        self.assertEqual(expected_result, transport._activity)
914
884
 
915
885
 
916
886
class TestSSHConnections(tests.TestCaseWithTransport):
925
895
        # A reasonable evolution for this would be to simply check inside
926
896
        # check_channel_exec_request that the command is appropriate, and then
927
897
        # satisfy requests in-process.
928
 
        self.requireFeature(features.paramiko)
 
898
        self.requireFeature(ParamikoFeature)
929
899
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
930
900
        # override the interface (doesn't change self._vendor).
931
901
        # Note that this does encryption, so can be slow.
932
 
        from bzrlib.tests import stub_sftp
 
902
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
903
        from bzrlib.tests.stub_sftp import StubServer
933
904
 
934
905
        # Start an SSH server
935
906
        self.command_executed = []
937
908
        # executes commands, and manage the hooking up of stdin/out/err to the
938
909
        # SSH channel ourselves.  Surely this has already been implemented
939
910
        # elsewhere?
940
 
        started = []
941
 
        class StubSSHServer(stub_sftp.StubServer):
 
911
        class StubSSHServer(StubServer):
942
912
 
943
913
            test = self
944
914
 
963
933
                    (channel.recv, proc.stdin.write, proc.stdin.close),
964
934
                    (proc.stdout.read, channel.sendall, channel.close),
965
935
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
966
 
                started.append(proc)
967
936
                for read, write, close in file_functions:
968
937
                    t = threading.Thread(
969
938
                        target=ferry_bytes, args=(read, write, close))
970
939
                    t.start()
971
 
                    started.append(t)
972
940
 
973
941
                return True
974
942
 
975
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
943
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
976
944
        # We *don't* want to override the default SSH vendor: the detected one
977
945
        # is the one to use.
978
 
 
979
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
980
 
        # inherits from SFTPServer which forces the SSH vendor to
981
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
982
946
        self.start_server(ssh_server)
983
 
        port = ssh_server.port
 
947
        port = ssh_server._listener.port
984
948
 
985
949
        if sys.platform == 'win32':
986
950
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
987
951
        else:
988
952
            bzr_remote_path = self.get_bzr_path()
989
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
953
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
990
954
 
991
955
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
992
956
        # variable is used to tell bzr what command to run on the remote end.
996
960
            # prefix a '/' to get the right path.
997
961
            path_to_branch = '/' + path_to_branch
998
962
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
999
 
        t = transport.get_transport(url)
 
963
        t = get_transport(url)
1000
964
        self.permit_url(t.base)
1001
965
        t.mkdir('foo')
1002
966
 
1003
967
        self.assertEqual(
1004
968
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1005
969
            self.command_executed)
1006
 
        # Make sure to disconnect, so that the remote process can stop, and we
1007
 
        # can cleanup. Then pause the test until everything is shutdown
1008
 
        t._client._medium.disconnect()
1009
 
        if not started:
1010
 
            return
1011
 
        # First wait for the subprocess
1012
 
        started[0].wait()
1013
 
        # And the rest are threads
1014
 
        for t in started[1:]:
1015
 
            t.join()
1016
 
 
1017
 
 
1018
 
class TestUnhtml(tests.TestCase):
1019
 
 
1020
 
    """Tests for unhtml_roughly"""
1021
 
 
1022
 
    def test_truncation(self):
1023
 
        fake_html = "<p>something!\n" * 1000
1024
 
        result = http.unhtml_roughly(fake_html)
1025
 
        self.assertEquals(len(result), 1000)
1026
 
        self.assertStartsWith(result, " something!")