~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-11 04:02:41 UTC
  • mfrom: (5017.2.2 tariff)
  • Revision ID: pqm@pqm.ubuntu.com-20100211040241-w6n021dz0uus341n
(mbp) add import-tariff tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
 
import bzrlib
21
24
from bzrlib import (
22
25
    errors,
23
26
    osutils,
 
27
    tests,
 
28
    transport,
24
29
    urlutils,
25
30
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
31
from bzrlib.transport import (
 
32
    chroot,
 
33
    fakenfs,
 
34
    local,
 
35
    memory,
 
36
    pathfilter,
 
37
    readonly,
 
38
    )
 
39
from bzrlib.tests import features
51
40
 
52
41
 
53
42
# TODO: Should possibly split transport-specific tests into their own files.
54
43
 
55
44
 
56
 
class TestTransport(TestCase):
 
45
class TestTransport(tests.TestCase):
57
46
    """Test the non transport-concrete class functionality."""
58
47
 
 
48
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
49
    # of try/finally -- vila 20100205
 
50
 
59
51
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
 
52
        handlers = transport._get_protocol_handlers()
61
53
        self.assertNotEqual([], handlers.keys( ))
62
54
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
55
            transport._clear_protocol_handlers()
 
56
            self.assertEqual([], transport._get_protocol_handlers().keys())
65
57
        finally:
66
 
            _set_protocol_handlers(handlers)
 
58
            transport._set_protocol_handlers(handlers)
67
59
 
68
60
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
 
61
        handlers = transport._get_protocol_handlers()
70
62
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
 
63
        transport._clear_protocol_handlers()
72
64
        class SampleHandler(object):
73
65
            """I exist, isnt that enough?"""
74
66
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
 
67
            transport._clear_protocol_handlers()
 
68
            transport.register_transport_proto('foo')
 
69
            transport.register_lazy_transport('foo',
 
70
                                              'bzrlib.tests.test_transport',
 
71
                                              'TestTransport.SampleHandler')
 
72
            transport.register_transport_proto('bar')
 
73
            transport.register_lazy_transport('bar',
 
74
                                              'bzrlib.tests.test_transport',
 
75
                                              'TestTransport.SampleHandler')
82
76
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
 
77
                              'bzrlib.transport.chroot',
 
78
                              'bzrlib.transport.pathfilter'],
 
79
                             transport._get_transport_modules())
85
80
        finally:
86
 
            _set_protocol_handlers(handlers)
 
81
            transport._set_protocol_handlers(handlers)
87
82
 
88
83
    def test_transport_dependency(self):
89
84
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
 
85
        saved_handlers = transport._get_protocol_handlers()
91
86
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
 
87
        transport._clear_protocol_handlers()
93
88
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
 
89
            transport.register_transport_proto('foo')
 
90
            transport.register_lazy_transport(
 
91
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
97
92
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
 
93
                transport.get_transport('foo://fooserver/foo')
 
94
            except errors.UnsupportedProtocol, e:
100
95
                e_str = str(e)
101
96
                self.assertEquals('Unsupported protocol'
102
97
                                  ' for url "foo://fooserver/foo":'
106
101
                self.fail('Did not raise UnsupportedProtocol')
107
102
        finally:
108
103
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
 
104
            transport._set_protocol_handlers(saved_handlers)
110
105
 
111
106
    def test_transport_fallback(self):
112
107
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
 
108
        saved_handlers = transport._get_protocol_handlers()
114
109
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
 
110
            transport._clear_protocol_handlers()
 
111
            transport.register_transport_proto('foo')
 
112
            transport.register_lazy_transport(
 
113
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
114
            transport.register_lazy_transport(
 
115
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
116
            t = transport.get_transport('foo://fooserver/foo')
122
117
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
118
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
 
119
            transport._set_protocol_handlers(saved_handlers)
125
120
 
126
121
    def test_ssh_hints(self):
127
122
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
123
        try:
129
 
            get_transport('ssh://fooserver/foo')
130
 
        except UnsupportedProtocol, e:
 
124
            transport.get_transport('ssh://fooserver/foo')
 
125
        except errors.UnsupportedProtocol, e:
131
126
            e_str = str(e)
132
127
            self.assertEquals('Unsupported protocol'
133
128
                              ' for url "ssh://fooserver/foo":'
134
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
129
                              ' bzr supports bzr+ssh to operate over ssh,'
 
130
                              ' use "bzr+ssh://fooserver/foo".',
135
131
                              str(e))
136
132
        else:
137
133
            self.fail('Did not raise UnsupportedProtocol')
138
134
 
139
135
    def test_LateReadError(self):
140
136
        """The LateReadError helper should raise on read()."""
141
 
        a_file = LateReadError('a path')
 
137
        a_file = transport.LateReadError('a path')
142
138
        try:
143
139
            a_file.read()
144
 
        except ReadError, error:
 
140
        except errors.ReadError, error:
145
141
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(ReadError, a_file.read, 40)
 
142
        self.assertRaises(errors.ReadError, a_file.read, 40)
147
143
        a_file.close()
148
144
 
149
145
    def test__combine_paths(self):
150
 
        t = Transport('/')
 
146
        t = transport.Transport('/')
151
147
        self.assertEqual('/home/sarah/project/foo',
152
148
                         t._combine_paths('/home/sarah', 'project/foo'))
153
149
        self.assertEqual('/etc',
159
155
 
160
156
    def test_local_abspath_non_local_transport(self):
161
157
        # the base implementation should throw
162
 
        t = MemoryTransport()
 
158
        t = memory.MemoryTransport()
163
159
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
160
        self.assertEqual('memory:///t is not a local path.', str(e))
165
161
 
166
162
 
167
 
class TestCoalesceOffsets(TestCase):
 
163
class TestCoalesceOffsets(tests.TestCase):
168
164
 
169
165
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
 
        coalesce = Transport._coalesce_offsets
171
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
166
        coalesce = transport.Transport._coalesce_offsets
 
167
        exp = [transport._CoalescedOffset(*x) for x in expected]
172
168
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
169
                            max_size=max_size))
174
170
        self.assertEqual(exp, out)
249
245
                   max_size=1*1024*1024*1024)
250
246
 
251
247
 
252
 
class TestMemoryTransport(TestCase):
 
248
class TestMemoryServer(tests.TestCase):
 
249
 
 
250
    def test_create_server(self):
 
251
        server = memory.MemoryServer()
 
252
        server.start_server()
 
253
        url = server.get_url()
 
254
        self.assertTrue(url in transport.transport_list_registry)
 
255
        t = transport.get_transport(url)
 
256
        del t
 
257
        server.stop_server()
 
258
        self.assertFalse(url in transport.transport_list_registry)
 
259
        self.assertRaises(errors.UnsupportedProtocol,
 
260
                          transport.get_transport, url)
 
261
 
 
262
 
 
263
class TestMemoryTransport(tests.TestCase):
253
264
 
254
265
    def test_get_transport(self):
255
 
        MemoryTransport()
 
266
        memory.MemoryTransport()
256
267
 
257
268
    def test_clone(self):
258
 
        transport = MemoryTransport()
259
 
        self.assertTrue(isinstance(transport, MemoryTransport))
260
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
269
        t = memory.MemoryTransport()
 
270
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
271
        self.assertEqual("memory:///", t.clone("/").base)
261
272
 
262
273
    def test_abspath(self):
263
 
        transport = MemoryTransport()
264
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
274
        t = memory.MemoryTransport()
 
275
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
265
276
 
266
277
    def test_abspath_of_root(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertEqual("memory:///", transport.base)
269
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
278
        t = memory.MemoryTransport()
 
279
        self.assertEqual("memory:///", t.base)
 
280
        self.assertEqual("memory:///", t.abspath('/'))
270
281
 
271
282
    def test_abspath_of_relpath_starting_at_root(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
283
        t = memory.MemoryTransport()
 
284
        self.assertEqual("memory:///foo", t.abspath('/foo'))
274
285
 
275
286
    def test_append_and_get(self):
276
 
        transport = MemoryTransport()
277
 
        transport.append_bytes('path', 'content')
278
 
        self.assertEqual(transport.get('path').read(), 'content')
279
 
        transport.append_file('path', StringIO('content'))
280
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
287
        t = memory.MemoryTransport()
 
288
        t.append_bytes('path', 'content')
 
289
        self.assertEqual(t.get('path').read(), 'content')
 
290
        t.append_file('path', StringIO('content'))
 
291
        self.assertEqual(t.get('path').read(), 'contentcontent')
281
292
 
282
293
    def test_put_and_get(self):
283
 
        transport = MemoryTransport()
284
 
        transport.put_file('path', StringIO('content'))
285
 
        self.assertEqual(transport.get('path').read(), 'content')
286
 
        transport.put_bytes('path', 'content')
287
 
        self.assertEqual(transport.get('path').read(), 'content')
 
294
        t = memory.MemoryTransport()
 
295
        t.put_file('path', StringIO('content'))
 
296
        self.assertEqual(t.get('path').read(), 'content')
 
297
        t.put_bytes('path', 'content')
 
298
        self.assertEqual(t.get('path').read(), 'content')
288
299
 
289
300
    def test_append_without_dir_fails(self):
290
 
        transport = MemoryTransport()
291
 
        self.assertRaises(NoSuchFile,
292
 
                          transport.append_bytes, 'dir/path', 'content')
 
301
        t = memory.MemoryTransport()
 
302
        self.assertRaises(errors.NoSuchFile,
 
303
                          t.append_bytes, 'dir/path', 'content')
293
304
 
294
305
    def test_put_without_dir_fails(self):
295
 
        transport = MemoryTransport()
296
 
        self.assertRaises(NoSuchFile,
297
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
306
        t = memory.MemoryTransport()
 
307
        self.assertRaises(errors.NoSuchFile,
 
308
                          t.put_file, 'dir/path', StringIO('content'))
298
309
 
299
310
    def test_get_missing(self):
300
 
        transport = MemoryTransport()
301
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
311
        transport = memory.MemoryTransport()
 
312
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
302
313
 
303
314
    def test_has_missing(self):
304
 
        transport = MemoryTransport()
305
 
        self.assertEquals(False, transport.has('foo'))
 
315
        t = memory.MemoryTransport()
 
316
        self.assertEquals(False, t.has('foo'))
306
317
 
307
318
    def test_has_present(self):
308
 
        transport = MemoryTransport()
309
 
        transport.append_bytes('foo', 'content')
310
 
        self.assertEquals(True, transport.has('foo'))
 
319
        t = memory.MemoryTransport()
 
320
        t.append_bytes('foo', 'content')
 
321
        self.assertEquals(True, t.has('foo'))
311
322
 
312
323
    def test_list_dir(self):
313
 
        transport = MemoryTransport()
314
 
        transport.put_bytes('foo', 'content')
315
 
        transport.mkdir('dir')
316
 
        transport.put_bytes('dir/subfoo', 'content')
317
 
        transport.put_bytes('dirlike', 'content')
 
324
        t = memory.MemoryTransport()
 
325
        t.put_bytes('foo', 'content')
 
326
        t.mkdir('dir')
 
327
        t.put_bytes('dir/subfoo', 'content')
 
328
        t.put_bytes('dirlike', 'content')
318
329
 
319
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
320
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
330
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
331
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
321
332
 
322
333
    def test_mkdir(self):
323
 
        transport = MemoryTransport()
324
 
        transport.mkdir('dir')
325
 
        transport.append_bytes('dir/path', 'content')
326
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
334
        t = memory.MemoryTransport()
 
335
        t.mkdir('dir')
 
336
        t.append_bytes('dir/path', 'content')
 
337
        self.assertEqual(t.get('dir/path').read(), 'content')
327
338
 
328
339
    def test_mkdir_missing_parent(self):
329
 
        transport = MemoryTransport()
330
 
        self.assertRaises(NoSuchFile,
331
 
                          transport.mkdir, 'dir/dir')
 
340
        t = memory.MemoryTransport()
 
341
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
332
342
 
333
343
    def test_mkdir_twice(self):
334
 
        transport = MemoryTransport()
335
 
        transport.mkdir('dir')
336
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
344
        t = memory.MemoryTransport()
 
345
        t.mkdir('dir')
 
346
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
337
347
 
338
348
    def test_parameters(self):
339
 
        transport = MemoryTransport()
340
 
        self.assertEqual(True, transport.listable())
341
 
        self.assertEqual(False, transport.is_readonly())
 
349
        t = memory.MemoryTransport()
 
350
        self.assertEqual(True, t.listable())
 
351
        self.assertEqual(False, t.is_readonly())
342
352
 
343
353
    def test_iter_files_recursive(self):
344
 
        transport = MemoryTransport()
345
 
        transport.mkdir('dir')
346
 
        transport.put_bytes('dir/foo', 'content')
347
 
        transport.put_bytes('dir/bar', 'content')
348
 
        transport.put_bytes('bar', 'content')
349
 
        paths = set(transport.iter_files_recursive())
 
354
        t = memory.MemoryTransport()
 
355
        t.mkdir('dir')
 
356
        t.put_bytes('dir/foo', 'content')
 
357
        t.put_bytes('dir/bar', 'content')
 
358
        t.put_bytes('bar', 'content')
 
359
        paths = set(t.iter_files_recursive())
350
360
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
351
361
 
352
362
    def test_stat(self):
353
 
        transport = MemoryTransport()
354
 
        transport.put_bytes('foo', 'content')
355
 
        transport.put_bytes('bar', 'phowar')
356
 
        self.assertEqual(7, transport.stat('foo').st_size)
357
 
        self.assertEqual(6, transport.stat('bar').st_size)
358
 
 
359
 
 
360
 
class ChrootDecoratorTransportTest(TestCase):
 
363
        t = memory.MemoryTransport()
 
364
        t.put_bytes('foo', 'content')
 
365
        t.put_bytes('bar', 'phowar')
 
366
        self.assertEqual(7, t.stat('foo').st_size)
 
367
        self.assertEqual(6, t.stat('bar').st_size)
 
368
 
 
369
 
 
370
class ChrootDecoratorTransportTest(tests.TestCase):
361
371
    """Chroot decoration specific tests."""
362
372
 
363
373
    def test_abspath(self):
364
374
        # The abspath is always relative to the chroot_url.
365
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        server.setUp()
367
 
        transport = get_transport(server.get_url())
368
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
375
        server = chroot.ChrootServer(
 
376
            transport.get_transport('memory:///foo/bar/'))
 
377
        self.start_server(server)
 
378
        t = transport.get_transport(server.get_url())
 
379
        self.assertEqual(server.get_url(), t.abspath('/'))
369
380
 
370
 
        subdir_transport = transport.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
        server.tearDown()
 
381
        subdir_t = t.clone('subdir')
 
382
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
373
383
 
374
384
    def test_clone(self):
375
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
376
 
        server.setUp()
377
 
        transport = get_transport(server.get_url())
 
385
        server = chroot.ChrootServer(
 
386
            transport.get_transport('memory:///foo/bar/'))
 
387
        self.start_server(server)
 
388
        t = transport.get_transport(server.get_url())
378
389
        # relpath from root and root path are the same
379
 
        relpath_cloned = transport.clone('foo')
380
 
        abspath_cloned = transport.clone('/foo')
 
390
        relpath_cloned = t.clone('foo')
 
391
        abspath_cloned = t.clone('/foo')
381
392
        self.assertEqual(server, relpath_cloned.server)
382
393
        self.assertEqual(server, abspath_cloned.server)
383
 
        server.tearDown()
384
394
 
385
395
    def test_chroot_url_preserves_chroot(self):
386
396
        """Calling get_transport on a chroot transport's base should produce a
390
400
        This is so that it is not possible to escape a chroot by doing::
391
401
            url = chroot_transport.base
392
402
            parent_url = urlutils.join(url, '..')
393
 
            new_transport = get_transport(parent_url)
 
403
            new_t = transport.get_transport(parent_url)
394
404
        """
395
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
396
 
        server.setUp()
397
 
        transport = get_transport(server.get_url())
398
 
        new_transport = get_transport(transport.base)
399
 
        self.assertEqual(transport.server, new_transport.server)
400
 
        self.assertEqual(transport.base, new_transport.base)
401
 
        server.tearDown()
 
405
        server = chroot.ChrootServer(
 
406
            transport.get_transport('memory:///path/subpath'))
 
407
        self.start_server(server)
 
408
        t = transport.get_transport(server.get_url())
 
409
        new_t = transport.get_transport(t.base)
 
410
        self.assertEqual(t.server, new_t.server)
 
411
        self.assertEqual(t.base, new_t.base)
402
412
 
403
413
    def test_urljoin_preserves_chroot(self):
404
414
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
407
417
        This is so that it is not possible to escape a chroot by doing::
408
418
            url = chroot_transport.base
409
419
            parent_url = urlutils.join(url, '..')
410
 
            new_transport = get_transport(parent_url)
 
420
            new_t = transport.get_transport(parent_url)
411
421
        """
412
 
        server = ChrootServer(get_transport('memory:///path/'))
413
 
        server.setUp()
414
 
        transport = get_transport(server.get_url())
 
422
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
423
        self.start_server(server)
 
424
        t = transport.get_transport(server.get_url())
415
425
        self.assertRaises(
416
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
417
 
        server.tearDown()
418
 
 
419
 
 
420
 
class ChrootServerTest(TestCase):
 
426
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
427
 
 
428
 
 
429
class TestChrootServer(tests.TestCase):
421
430
 
422
431
    def test_construct(self):
423
 
        backing_transport = MemoryTransport()
424
 
        server = ChrootServer(backing_transport)
 
432
        backing_transport = memory.MemoryTransport()
 
433
        server = chroot.ChrootServer(backing_transport)
425
434
        self.assertEqual(backing_transport, server.backing_transport)
426
435
 
427
436
    def test_setUp(self):
428
 
        backing_transport = MemoryTransport()
429
 
        server = ChrootServer(backing_transport)
430
 
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
437
        backing_transport = memory.MemoryTransport()
 
438
        server = chroot.ChrootServer(backing_transport)
 
439
        server.start_server()
 
440
        try:
 
441
            self.assertTrue(server.scheme
 
442
                            in transport._get_protocol_handlers().keys())
 
443
        finally:
 
444
            server.stop_server()
432
445
 
433
 
    def test_tearDown(self):
434
 
        backing_transport = MemoryTransport()
435
 
        server = ChrootServer(backing_transport)
436
 
        server.setUp()
437
 
        server.tearDown()
438
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
446
    def test_stop_server(self):
 
447
        backing_transport = memory.MemoryTransport()
 
448
        server = chroot.ChrootServer(backing_transport)
 
449
        server.start_server()
 
450
        server.stop_server()
 
451
        self.assertFalse(server.scheme
 
452
                         in transport._get_protocol_handlers().keys())
439
453
 
440
454
    def test_get_url(self):
441
 
        backing_transport = MemoryTransport()
442
 
        server = ChrootServer(backing_transport)
443
 
        server.setUp()
444
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
445
 
        server.tearDown()
446
 
 
447
 
 
448
 
class ReadonlyDecoratorTransportTest(TestCase):
 
455
        backing_transport = memory.MemoryTransport()
 
456
        server = chroot.ChrootServer(backing_transport)
 
457
        server.start_server()
 
458
        try:
 
459
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
460
        finally:
 
461
            server.stop_server()
 
462
 
 
463
 
 
464
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
465
    """Pathfilter decoration specific tests."""
 
466
 
 
467
    def test_abspath(self):
 
468
        # The abspath is always relative to the base of the backing transport.
 
469
        server = pathfilter.PathFilteringServer(
 
470
            transport.get_transport('memory:///foo/bar/'),
 
471
            lambda x: x)
 
472
        server.start_server()
 
473
        t = transport.get_transport(server.get_url())
 
474
        self.assertEqual(server.get_url(), t.abspath('/'))
 
475
 
 
476
        subdir_t = t.clone('subdir')
 
477
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
478
        server.stop_server()
 
479
 
 
480
    def make_pf_transport(self, filter_func=None):
 
481
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
482
        
 
483
        :param filter_func: by default this will be a no-op function.  Use this
 
484
            parameter to override it."""
 
485
        if filter_func is None:
 
486
            filter_func = lambda x: x
 
487
        server = pathfilter.PathFilteringServer(
 
488
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
489
        server.start_server()
 
490
        self.addCleanup(server.stop_server)
 
491
        return transport.get_transport(server.get_url())
 
492
 
 
493
    def test__filter(self):
 
494
        # _filter (with an identity func as filter_func) always returns
 
495
        # paths relative to the base of the backing transport.
 
496
        t = self.make_pf_transport()
 
497
        self.assertEqual('foo', t._filter('foo'))
 
498
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
499
        self.assertEqual('', t._filter('..'))
 
500
        self.assertEqual('', t._filter('/'))
 
501
        # The base of the pathfiltering transport is taken into account too.
 
502
        t = t.clone('subdir1/subdir2')
 
503
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
504
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
505
        self.assertEqual('subdir1', t._filter('..'))
 
506
        self.assertEqual('', t._filter('/'))
 
507
 
 
508
    def test_filter_invocation(self):
 
509
        filter_log = []
 
510
        def filter(path):
 
511
            filter_log.append(path)
 
512
            return path
 
513
        t = self.make_pf_transport(filter)
 
514
        t.has('abc')
 
515
        self.assertEqual(['abc'], filter_log)
 
516
        del filter_log[:]
 
517
        t.clone('abc').has('xyz')
 
518
        self.assertEqual(['abc/xyz'], filter_log)
 
519
        del filter_log[:]
 
520
        t.has('/abc')
 
521
        self.assertEqual(['abc'], filter_log)
 
522
 
 
523
    def test_clone(self):
 
524
        t = self.make_pf_transport()
 
525
        # relpath from root and root path are the same
 
526
        relpath_cloned = t.clone('foo')
 
527
        abspath_cloned = t.clone('/foo')
 
528
        self.assertEqual(t.server, relpath_cloned.server)
 
529
        self.assertEqual(t.server, abspath_cloned.server)
 
530
 
 
531
    def test_url_preserves_pathfiltering(self):
 
532
        """Calling get_transport on a pathfiltered transport's base should
 
533
        produce a transport with exactly the same behaviour as the original
 
534
        pathfiltered transport.
 
535
 
 
536
        This is so that it is not possible to escape (accidentally or
 
537
        otherwise) the filtering by doing::
 
538
            url = filtered_transport.base
 
539
            parent_url = urlutils.join(url, '..')
 
540
            new_t = transport.get_transport(parent_url)
 
541
        """
 
542
        t = self.make_pf_transport()
 
543
        new_t = transport.get_transport(t.base)
 
544
        self.assertEqual(t.server, new_t.server)
 
545
        self.assertEqual(t.base, new_t.base)
 
546
 
 
547
 
 
548
class ReadonlyDecoratorTransportTest(tests.TestCase):
449
549
    """Readonly decoration specific tests."""
450
550
 
451
551
    def test_local_parameters(self):
452
 
        import bzrlib.transport.readonly as readonly
453
552
        # connect to . in readonly mode
454
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
455
 
        self.assertEqual(True, transport.listable())
456
 
        self.assertEqual(True, transport.is_readonly())
 
553
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
554
        self.assertEqual(True, t.listable())
 
555
        self.assertEqual(True, t.is_readonly())
457
556
 
458
557
    def test_http_parameters(self):
459
558
        from bzrlib.tests.http_server import HttpServer
460
 
        import bzrlib.transport.readonly as readonly
461
559
        # connect to '.' via http which is not listable
462
560
        server = HttpServer()
463
 
        server.setUp()
464
 
        try:
465
 
            transport = get_transport('readonly+' + server.get_url())
466
 
            self.failUnless(isinstance(transport,
467
 
                                       readonly.ReadonlyTransportDecorator))
468
 
            self.assertEqual(False, transport.listable())
469
 
            self.assertEqual(True, transport.is_readonly())
470
 
        finally:
471
 
            server.tearDown()
472
 
 
473
 
 
474
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
561
        self.start_server(server)
 
562
        t = transport.get_transport('readonly+' + server.get_url())
 
563
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
564
        self.assertEqual(False, t.listable())
 
565
        self.assertEqual(True, t.is_readonly())
 
566
 
 
567
 
 
568
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
475
569
    """NFS decorator specific tests."""
476
570
 
477
571
    def get_nfs_transport(self, url):
478
 
        import bzrlib.transport.fakenfs as fakenfs
479
572
        # connect to url with nfs decoration
480
573
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
481
574
 
482
575
    def test_local_parameters(self):
483
576
        # the listable and is_readonly parameters
484
577
        # are not changed by the fakenfs decorator
485
 
        transport = self.get_nfs_transport('.')
486
 
        self.assertEqual(True, transport.listable())
487
 
        self.assertEqual(False, transport.is_readonly())
 
578
        t = self.get_nfs_transport('.')
 
579
        self.assertEqual(True, t.listable())
 
580
        self.assertEqual(False, t.is_readonly())
488
581
 
489
582
    def test_http_parameters(self):
490
583
        # the listable and is_readonly parameters
492
585
        from bzrlib.tests.http_server import HttpServer
493
586
        # connect to '.' via http which is not listable
494
587
        server = HttpServer()
495
 
        server.setUp()
496
 
        try:
497
 
            transport = self.get_nfs_transport(server.get_url())
498
 
            self.assertIsInstance(
499
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
 
            self.assertEqual(False, transport.listable())
501
 
            self.assertEqual(True, transport.is_readonly())
502
 
        finally:
503
 
            server.tearDown()
 
588
        self.start_server(server)
 
589
        t = self.get_nfs_transport(server.get_url())
 
590
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
591
        self.assertEqual(False, t.listable())
 
592
        self.assertEqual(True, t.is_readonly())
504
593
 
505
594
    def test_fakenfs_server_default(self):
506
595
        # a FakeNFSServer() should bring up a local relpath server for itself
507
 
        import bzrlib.transport.fakenfs as fakenfs
508
596
        server = fakenfs.FakeNFSServer()
509
 
        server.setUp()
510
 
        try:
511
 
            # the url should be decorated appropriately
512
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
513
 
            # and we should be able to get a transport for it
514
 
            transport = get_transport(server.get_url())
515
 
            # which must be a FakeNFSTransportDecorator instance.
516
 
            self.assertIsInstance(
517
 
                transport, fakenfs.FakeNFSTransportDecorator)
518
 
        finally:
519
 
            server.tearDown()
 
597
        self.start_server(server)
 
598
        # the url should be decorated appropriately
 
599
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
600
        # and we should be able to get a transport for it
 
601
        t = transport.get_transport(server.get_url())
 
602
        # which must be a FakeNFSTransportDecorator instance.
 
603
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
520
604
 
521
605
    def test_fakenfs_rename_semantics(self):
522
606
        # a FakeNFS transport must mangle the way rename errors occur to
523
607
        # look like NFS problems.
524
 
        transport = self.get_nfs_transport('.')
 
608
        t = self.get_nfs_transport('.')
525
609
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
526
 
                        transport=transport)
527
 
        self.assertRaises(errors.ResourceBusy,
528
 
                          transport.rename, 'from', 'to')
529
 
 
530
 
 
531
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
610
                        transport=t)
 
611
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
612
 
 
613
 
 
614
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
532
615
    """Tests for simulation of VFAT restrictions"""
533
616
 
534
617
    def get_vfat_transport(self, url):
538
621
 
539
622
    def test_transport_creation(self):
540
623
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
541
 
        transport = self.get_vfat_transport('.')
542
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
624
        t = self.get_vfat_transport('.')
 
625
        self.assertIsInstance(t, FakeVFATTransportDecorator)
543
626
 
544
627
    def test_transport_mkdir(self):
545
 
        transport = self.get_vfat_transport('.')
546
 
        transport.mkdir('HELLO')
547
 
        self.assertTrue(transport.has('hello'))
548
 
        self.assertTrue(transport.has('Hello'))
 
628
        t = self.get_vfat_transport('.')
 
629
        t.mkdir('HELLO')
 
630
        self.assertTrue(t.has('hello'))
 
631
        self.assertTrue(t.has('Hello'))
549
632
 
550
633
    def test_forbidden_chars(self):
551
 
        transport = self.get_vfat_transport('.')
552
 
        self.assertRaises(ValueError, transport.has, "<NU>")
553
 
 
554
 
 
555
 
class BadTransportHandler(Transport):
 
634
        t = self.get_vfat_transport('.')
 
635
        self.assertRaises(ValueError, t.has, "<NU>")
 
636
 
 
637
 
 
638
class BadTransportHandler(transport.Transport):
556
639
    def __init__(self, base_url):
557
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
558
 
 
559
 
 
560
 
class BackupTransportHandler(Transport):
 
640
        raise errors.DependencyNotPresent('some_lib',
 
641
                                          'testing missing dependency')
 
642
 
 
643
 
 
644
class BackupTransportHandler(transport.Transport):
561
645
    """Test transport that works as a backup for the BadTransportHandler"""
562
646
    pass
563
647
 
564
648
 
565
 
class TestTransportImplementation(TestCaseInTempDir):
 
649
class TestTransportImplementation(tests.TestCaseInTempDir):
566
650
    """Implementation verification for transports.
567
651
 
568
652
    To verify a transport we need a server factory, which is a callable
587
671
    def setUp(self):
588
672
        super(TestTransportImplementation, self).setUp()
589
673
        self._server = self.transport_server()
590
 
        self._server.setUp()
591
 
        self.addCleanup(self._server.tearDown)
 
674
        self.start_server(self._server)
592
675
 
593
676
    def get_transport(self, relpath=None):
594
677
        """Return a connected transport to the local directory.
598
681
        base_url = self._server.get_url()
599
682
        url = self._adjust_url(base_url, relpath)
600
683
        # try getting the transport via the regular interface:
601
 
        t = get_transport(url)
 
684
        t = transport.get_transport(url)
602
685
        # vila--20070607 if the following are commented out the test suite
603
686
        # still pass. Is this really still needed or was it a forgotten
604
687
        # temporary fix ?
609
692
        return t
610
693
 
611
694
 
612
 
class TestLocalTransports(TestCase):
 
695
class TestLocalTransports(tests.TestCase):
613
696
 
614
697
    def test_get_transport_from_abspath(self):
615
698
        here = osutils.abspath('.')
616
 
        t = get_transport(here)
617
 
        self.assertIsInstance(t, LocalTransport)
 
699
        t = transport.get_transport(here)
 
700
        self.assertIsInstance(t, local.LocalTransport)
618
701
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
619
702
 
620
703
    def test_get_transport_from_relpath(self):
621
704
        here = osutils.abspath('.')
622
 
        t = get_transport('.')
623
 
        self.assertIsInstance(t, LocalTransport)
 
705
        t = transport.get_transport('.')
 
706
        self.assertIsInstance(t, local.LocalTransport)
624
707
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
625
708
 
626
709
    def test_get_transport_from_local_url(self):
627
710
        here = osutils.abspath('.')
628
711
        here_url = urlutils.local_path_to_url(here) + '/'
629
 
        t = get_transport(here_url)
630
 
        self.assertIsInstance(t, LocalTransport)
 
712
        t = transport.get_transport(here_url)
 
713
        self.assertIsInstance(t, local.LocalTransport)
631
714
        self.assertEquals(t.base, here_url)
632
715
 
633
716
    def test_local_abspath(self):
634
717
        here = osutils.abspath('.')
635
 
        t = get_transport(here)
 
718
        t = transport.get_transport(here)
636
719
        self.assertEquals(t.local_abspath(''), here)
637
720
 
638
721
 
639
 
class TestWin32LocalTransport(TestCase):
 
722
class TestWin32LocalTransport(tests.TestCase):
640
723
 
641
724
    def test_unc_clone_to_root(self):
642
725
        # Win32 UNC path like \\HOST\path
643
726
        # clone to root should stop at least at \\HOST part
644
727
        # not on \\
645
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
728
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
646
729
        for i in xrange(4):
647
730
            t = t.clone('..')
648
731
        self.assertEquals(t.base, 'file://HOST/')
651
734
        self.assertEquals(t.base, 'file://HOST/')
652
735
 
653
736
 
654
 
class TestConnectedTransport(TestCase):
 
737
class TestConnectedTransport(tests.TestCase):
655
738
    """Tests for connected to remote server transports"""
656
739
 
657
740
    def test_parse_url(self):
658
 
        t = ConnectedTransport('http://simple.example.com/home/source')
 
741
        t = transport.ConnectedTransport(
 
742
            'http://simple.example.com/home/source')
659
743
        self.assertEquals(t._host, 'simple.example.com')
660
744
        self.assertEquals(t._port, None)
661
745
        self.assertEquals(t._path, '/home/source/')
666
750
 
667
751
    def test_parse_url_with_at_in_user(self):
668
752
        # Bug 228058
669
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
753
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
670
754
        self.assertEquals(t._user, 'user@host.com')
671
755
 
672
756
    def test_parse_quoted_url(self):
673
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
757
        t = transport.ConnectedTransport(
 
758
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
674
759
        self.assertEquals(t._host, 'exAmple.com')
675
760
        self.assertEquals(t._port, 2222)
676
761
        self.assertEquals(t._user, 'robey')
682
767
 
683
768
    def test_parse_invalid_url(self):
684
769
        self.assertRaises(errors.InvalidURL,
685
 
                          ConnectedTransport,
 
770
                          transport.ConnectedTransport,
686
771
                          'sftp://lily.org:~janneke/public/bzr/gub')
687
772
 
688
773
    def test_relpath(self):
689
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
774
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
690
775
 
691
776
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
692
777
        self.assertRaises(errors.PathNotChild, t.relpath,
698
783
        self.assertRaises(errors.PathNotChild, t.relpath,
699
784
                          'sftp://user@host.com:33/abs/path/sub')
700
785
        # Make sure it works when we don't supply a username
701
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
786
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
702
787
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
703
788
 
704
789
        # Make sure it works when parts of the path will be url encoded
705
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
790
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
706
791
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
707
792
 
708
793
    def test_connection_sharing_propagate_credentials(self):
709
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
794
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
710
795
        self.assertEquals('user', t._user)
711
796
        self.assertEquals('host.com', t._host)
712
797
        self.assertIs(None, t._get_connection())
733
818
        self.assertIs(new_password, c._get_credentials())
734
819
 
735
820
 
736
 
class TestReusedTransports(TestCase):
 
821
class TestReusedTransports(tests.TestCase):
737
822
    """Tests for transport reuse"""
738
823
 
739
824
    def test_reuse_same_transport(self):
740
825
        possible_transports = []
741
 
        t1 = get_transport('http://foo/',
742
 
                           possible_transports=possible_transports)
 
826
        t1 = transport.get_transport('http://foo/',
 
827
                                     possible_transports=possible_transports)
743
828
        self.assertEqual([t1], possible_transports)
744
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
829
        t2 = transport.get_transport('http://foo/',
 
830
                                     possible_transports=[t1])
745
831
        self.assertIs(t1, t2)
746
832
 
747
833
        # Also check that final '/' are handled correctly
748
 
        t3 = get_transport('http://foo/path/')
749
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
834
        t3 = transport.get_transport('http://foo/path/')
 
835
        t4 = transport.get_transport('http://foo/path',
 
836
                                     possible_transports=[t3])
750
837
        self.assertIs(t3, t4)
751
838
 
752
 
        t5 = get_transport('http://foo/path')
753
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
839
        t5 = transport.get_transport('http://foo/path')
 
840
        t6 = transport.get_transport('http://foo/path/',
 
841
                                     possible_transports=[t5])
754
842
        self.assertIs(t5, t6)
755
843
 
756
844
    def test_don_t_reuse_different_transport(self):
757
 
        t1 = get_transport('http://foo/path')
758
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
845
        t1 = transport.get_transport('http://foo/path')
 
846
        t2 = transport.get_transport('http://bar/path',
 
847
                                     possible_transports=[t1])
759
848
        self.assertIsNot(t1, t2)
760
849
 
761
850
 
762
 
class TestTransportTrace(TestCase):
 
851
class TestTransportTrace(tests.TestCase):
763
852
 
764
853
    def test_get(self):
765
 
        transport = get_transport('trace+memory://')
766
 
        self.assertIsInstance(
767
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
854
        t = transport.get_transport('trace+memory://')
 
855
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
768
856
 
769
857
    def test_clone_preserves_activity(self):
770
 
        transport = get_transport('trace+memory://')
771
 
        transport2 = transport.clone('.')
772
 
        self.assertTrue(transport is not transport2)
773
 
        self.assertTrue(transport._activity is transport2._activity)
 
858
        t = transport.get_transport('trace+memory://')
 
859
        t2 = t.clone('.')
 
860
        self.assertTrue(t is not t2)
 
861
        self.assertTrue(t._activity is t2._activity)
774
862
 
775
863
    # the following specific tests are for the operations that have made use of
776
864
    # logging in tests; we could test every single operation but doing that
777
865
    # still won't cause a test failure when the top level Transport API
778
866
    # changes; so there is little return doing that.
779
867
    def test_get(self):
780
 
        transport = get_transport('trace+memory:///')
781
 
        transport.put_bytes('foo', 'barish')
782
 
        transport.get('foo')
 
868
        t = transport.get_transport('trace+memory:///')
 
869
        t.put_bytes('foo', 'barish')
 
870
        t.get('foo')
783
871
        expected_result = []
784
872
        # put_bytes records the bytes, not the content to avoid memory
785
873
        # pressure.
786
874
        expected_result.append(('put_bytes', 'foo', 6, None))
787
875
        # get records the file name only.
788
876
        expected_result.append(('get', 'foo'))
789
 
        self.assertEqual(expected_result, transport._activity)
 
877
        self.assertEqual(expected_result, t._activity)
790
878
 
791
879
    def test_readv(self):
792
 
        transport = get_transport('trace+memory:///')
793
 
        transport.put_bytes('foo', 'barish')
794
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
795
 
            upper_limit=6))
 
880
        t = transport.get_transport('trace+memory:///')
 
881
        t.put_bytes('foo', 'barish')
 
882
        list(t.readv('foo', [(0, 1), (3, 2)],
 
883
                     adjust_for_latency=True, upper_limit=6))
796
884
        expected_result = []
797
885
        # put_bytes records the bytes, not the content to avoid memory
798
886
        # pressure.
799
887
        expected_result.append(('put_bytes', 'foo', 6, None))
800
888
        # readv records the supplied offset request
801
889
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
 
        self.assertEqual(expected_result, transport._activity)
 
890
        self.assertEqual(expected_result, t._activity)
 
891
 
 
892
 
 
893
class TestSSHConnections(tests.TestCaseWithTransport):
 
894
 
 
895
    def test_bzr_connect_to_bzr_ssh(self):
 
896
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
897
 
 
898
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
899
        """
 
900
        # This test actually causes a bzr instance to be invoked, which is very
 
901
        # expensive: it should be the only such test in the test suite.
 
902
        # A reasonable evolution for this would be to simply check inside
 
903
        # check_channel_exec_request that the command is appropriate, and then
 
904
        # satisfy requests in-process.
 
905
        self.requireFeature(features.paramiko)
 
906
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
907
        # override the interface (doesn't change self._vendor).
 
908
        # Note that this does encryption, so can be slow.
 
909
        from bzrlib.tests import stub_sftp
 
910
 
 
911
        # Start an SSH server
 
912
        self.command_executed = []
 
913
        # XXX: This is horrible -- we define a really dumb SSH server that
 
914
        # executes commands, and manage the hooking up of stdin/out/err to the
 
915
        # SSH channel ourselves.  Surely this has already been implemented
 
916
        # elsewhere?
 
917
        started = []
 
918
        class StubSSHServer(stub_sftp.StubServer):
 
919
 
 
920
            test = self
 
921
 
 
922
            def check_channel_exec_request(self, channel, command):
 
923
                self.test.command_executed.append(command)
 
924
                proc = subprocess.Popen(
 
925
                    command, shell=True, stdin=subprocess.PIPE,
 
926
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
927
 
 
928
                # XXX: horribly inefficient, not to mention ugly.
 
929
                # Start a thread for each of stdin/out/err, and relay bytes from
 
930
                # the subprocess to channel and vice versa.
 
931
                def ferry_bytes(read, write, close):
 
932
                    while True:
 
933
                        bytes = read(1)
 
934
                        if bytes == '':
 
935
                            close()
 
936
                            break
 
937
                        write(bytes)
 
938
 
 
939
                file_functions = [
 
940
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
941
                    (proc.stdout.read, channel.sendall, channel.close),
 
942
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
943
                started.append(proc)
 
944
                for read, write, close in file_functions:
 
945
                    t = threading.Thread(
 
946
                        target=ferry_bytes, args=(read, write, close))
 
947
                    t.start()
 
948
                    started.append(t)
 
949
 
 
950
                return True
 
951
 
 
952
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
953
        # We *don't* want to override the default SSH vendor: the detected one
 
954
        # is the one to use.
 
955
        self.start_server(ssh_server)
 
956
        port = ssh_server._listener.port
 
957
 
 
958
        if sys.platform == 'win32':
 
959
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
960
        else:
 
961
            bzr_remote_path = self.get_bzr_path()
 
962
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
963
 
 
964
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
965
        # variable is used to tell bzr what command to run on the remote end.
 
966
        path_to_branch = osutils.abspath('.')
 
967
        if sys.platform == 'win32':
 
968
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
969
            # prefix a '/' to get the right path.
 
970
            path_to_branch = '/' + path_to_branch
 
971
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
972
        t = transport.get_transport(url)
 
973
        self.permit_url(t.base)
 
974
        t.mkdir('foo')
 
975
 
 
976
        self.assertEqual(
 
977
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
978
            self.command_executed)
 
979
        # Make sure to disconnect, so that the remote process can stop, and we
 
980
        # can cleanup. Then pause the test until everything is shutdown
 
981
        t._client._medium.disconnect()
 
982
        if not started:
 
983
            return
 
984
        # First wait for the subprocess
 
985
        started[0].wait()
 
986
        # And the rest are threads
 
987
        for t in started[1:]:
 
988
            t.join()