~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Aaron Bentley
  • Date: 2007-07-17 13:27:14 UTC
  • mfrom: (2624 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2631.
  • Revision ID: abentley@panoramicfeedback.com-20070717132714-tmzx9khmg9501k51
Merge from bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from cStringIO import StringIO
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
19
18
import os
20
 
import subprocess
21
19
import sys
22
 
import threading
 
20
import stat
 
21
from cStringIO import StringIO
23
22
 
 
23
import bzrlib
24
24
from bzrlib import (
25
25
    errors,
26
 
    osutils,
27
 
    tests,
28
 
    transport,
29
26
    urlutils,
30
27
    )
31
 
from bzrlib.transport import (
32
 
    chroot,
33
 
    fakenfs,
34
 
    local,
35
 
    memory,
36
 
    pathfilter,
37
 
    readonly,
38
 
    )
39
 
from bzrlib.tests import features
 
28
from bzrlib.errors import (ConnectionError,
 
29
                           DependencyNotPresent,
 
30
                           FileExists,
 
31
                           InvalidURLJoin,
 
32
                           NoSuchFile,
 
33
                           PathNotChild,
 
34
                           TransportNotPossible,
 
35
                           ConnectionError,
 
36
                           DependencyNotPresent,
 
37
                           ReadError,
 
38
                           UnsupportedProtocol,
 
39
                           )
 
40
from bzrlib.tests import TestCase, TestCaseInTempDir
 
41
from bzrlib.transport import (_CoalescedOffset,
 
42
                              _get_protocol_handlers,
 
43
                              _set_protocol_handlers,
 
44
                              _get_transport_modules,
 
45
                              get_transport,
 
46
                              LateReadError,
 
47
                              register_lazy_transport,
 
48
                              register_transport_proto,
 
49
                              _clear_protocol_handlers,
 
50
                              Transport,
 
51
                              )
 
52
from bzrlib.transport.chroot import ChrootServer
 
53
from bzrlib.transport.memory import MemoryTransport
 
54
from bzrlib.transport.local import (LocalTransport,
 
55
                                    EmulatedWin32LocalTransport)
40
56
 
41
57
 
42
58
# TODO: Should possibly split transport-specific tests into their own files.
43
59
 
44
60
 
45
 
class TestTransport(tests.TestCase):
 
61
class TestTransport(TestCase):
46
62
    """Test the non transport-concrete class functionality."""
47
63
 
48
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
49
 
    # of try/finally -- vila 20100205
50
 
 
51
64
    def test__get_set_protocol_handlers(self):
52
 
        handlers = transport._get_protocol_handlers()
 
65
        handlers = _get_protocol_handlers()
53
66
        self.assertNotEqual([], handlers.keys( ))
54
67
        try:
55
 
            transport._clear_protocol_handlers()
56
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
68
            _clear_protocol_handlers()
 
69
            self.assertEqual([], _get_protocol_handlers().keys())
57
70
        finally:
58
 
            transport._set_protocol_handlers(handlers)
 
71
            _set_protocol_handlers(handlers)
59
72
 
60
73
    def test_get_transport_modules(self):
61
 
        handlers = transport._get_protocol_handlers()
62
 
        # don't pollute the current handlers
63
 
        transport._clear_protocol_handlers()
 
74
        handlers = _get_protocol_handlers()
64
75
        class SampleHandler(object):
65
76
            """I exist, isnt that enough?"""
66
77
        try:
67
 
            transport._clear_protocol_handlers()
68
 
            transport.register_transport_proto('foo')
69
 
            transport.register_lazy_transport('foo',
70
 
                                              'bzrlib.tests.test_transport',
71
 
                                              'TestTransport.SampleHandler')
72
 
            transport.register_transport_proto('bar')
73
 
            transport.register_lazy_transport('bar',
74
 
                                              'bzrlib.tests.test_transport',
75
 
                                              'TestTransport.SampleHandler')
76
 
            self.assertEqual([SampleHandler.__module__,
77
 
                              'bzrlib.transport.chroot',
78
 
                              'bzrlib.transport.pathfilter'],
79
 
                             transport._get_transport_modules())
 
78
            _clear_protocol_handlers()
 
79
            register_transport_proto('foo')
 
80
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
81
            register_transport_proto('bar')
 
82
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
83
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
 
84
                             _get_transport_modules())
80
85
        finally:
81
 
            transport._set_protocol_handlers(handlers)
 
86
            _set_protocol_handlers(handlers)
82
87
 
83
88
    def test_transport_dependency(self):
84
89
        """Transport with missing dependency causes no error"""
85
 
        saved_handlers = transport._get_protocol_handlers()
86
 
        # don't pollute the current handlers
87
 
        transport._clear_protocol_handlers()
 
90
        saved_handlers = _get_protocol_handlers()
88
91
        try:
89
 
            transport.register_transport_proto('foo')
90
 
            transport.register_lazy_transport(
91
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
92
            register_transport_proto('foo')
 
93
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
94
                    'BadTransportHandler')
92
95
            try:
93
 
                transport.get_transport('foo://fooserver/foo')
94
 
            except errors.UnsupportedProtocol, e:
 
96
                get_transport('foo://fooserver/foo')
 
97
            except UnsupportedProtocol, e:
95
98
                e_str = str(e)
96
99
                self.assertEquals('Unsupported protocol'
97
100
                                  ' for url "foo://fooserver/foo":'
101
104
                self.fail('Did not raise UnsupportedProtocol')
102
105
        finally:
103
106
            # restore original values
104
 
            transport._set_protocol_handlers(saved_handlers)
105
 
 
 
107
            _set_protocol_handlers(saved_handlers)
 
108
            
106
109
    def test_transport_fallback(self):
107
110
        """Transport with missing dependency causes no error"""
108
 
        saved_handlers = transport._get_protocol_handlers()
 
111
        saved_handlers = _get_protocol_handlers()
109
112
        try:
110
 
            transport._clear_protocol_handlers()
111
 
            transport.register_transport_proto('foo')
112
 
            transport.register_lazy_transport(
113
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
114
 
            transport.register_lazy_transport(
115
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
116
 
            t = transport.get_transport('foo://fooserver/foo')
 
113
            _clear_protocol_handlers()
 
114
            register_transport_proto('foo')
 
115
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
116
                    'BackupTransportHandler')
 
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
118
                    'BadTransportHandler')
 
119
            t = get_transport('foo://fooserver/foo')
117
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
118
121
        finally:
119
 
            transport._set_protocol_handlers(saved_handlers)
120
 
 
121
 
    def test_ssh_hints(self):
122
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
123
 
        try:
124
 
            transport.get_transport('ssh://fooserver/foo')
125
 
        except errors.UnsupportedProtocol, e:
126
 
            e_str = str(e)
127
 
            self.assertEquals('Unsupported protocol'
128
 
                              ' for url "ssh://fooserver/foo":'
129
 
                              ' bzr supports bzr+ssh to operate over ssh,'
130
 
                              ' use "bzr+ssh://fooserver/foo".',
131
 
                              str(e))
132
 
        else:
133
 
            self.fail('Did not raise UnsupportedProtocol')
 
122
            _set_protocol_handlers(saved_handlers)
134
123
 
135
124
    def test_LateReadError(self):
136
125
        """The LateReadError helper should raise on read()."""
137
 
        a_file = transport.LateReadError('a path')
 
126
        a_file = LateReadError('a path')
138
127
        try:
139
128
            a_file.read()
140
 
        except errors.ReadError, error:
 
129
        except ReadError, error:
141
130
            self.assertEqual('a path', error.path)
142
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
131
        self.assertRaises(ReadError, a_file.read, 40)
143
132
        a_file.close()
144
133
 
145
134
    def test__combine_paths(self):
146
 
        t = transport.Transport('/')
 
135
        t = Transport('/')
147
136
        self.assertEqual('/home/sarah/project/foo',
148
137
                         t._combine_paths('/home/sarah', 'project/foo'))
149
138
        self.assertEqual('/etc',
155
144
 
156
145
    def test_local_abspath_non_local_transport(self):
157
146
        # the base implementation should throw
158
 
        t = memory.MemoryTransport()
 
147
        t = MemoryTransport()
159
148
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
160
149
        self.assertEqual('memory:///t is not a local path.', str(e))
161
150
 
162
151
 
163
 
class TestCoalesceOffsets(tests.TestCase):
164
 
 
165
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
166
 
        coalesce = transport.Transport._coalesce_offsets
167
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
168
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
169
 
                            max_size=max_size))
 
152
class TestCoalesceOffsets(TestCase):
 
153
    
 
154
    def check(self, expected, offsets, limit=0, fudge=0):
 
155
        coalesce = Transport._coalesce_offsets
 
156
        exp = [_CoalescedOffset(*x) for x in expected]
 
157
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
170
158
        self.assertEqual(exp, out)
171
159
 
172
160
    def test_coalesce_empty(self):
179
167
        self.check([(0, 10, [(0, 10)]),
180
168
                    (20, 10, [(0, 10)]),
181
169
                   ], [(0, 10), (20, 10)])
182
 
 
 
170
            
183
171
    def test_coalesce_unsorted(self):
184
172
        self.check([(20, 10, [(0, 10)]),
185
173
                    (0, 10, [(0, 10)]),
190
178
                   [(0, 10), (10, 10)])
191
179
 
192
180
    def test_coalesce_overlapped(self):
193
 
        self.assertRaises(ValueError,
194
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
195
 
                        [(0, 10), (5, 10)])
 
181
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
182
                   [(0, 10), (5, 10)])
196
183
 
197
184
    def test_coalesce_limit(self):
198
185
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
219
206
                   ], [(10, 10), (30, 10), (100, 10)],
220
207
                   fudge=10
221
208
                  )
222
 
    def test_coalesce_max_size(self):
223
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
224
 
                    (30, 50, [(0, 50)]),
225
 
                    # If one range is above max_size, it gets its own coalesced
226
 
                    # offset
227
 
                    (100, 80, [(0, 80),]),],
228
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
229
 
                   max_size=50
230
 
                  )
231
 
 
232
 
    def test_coalesce_no_max_size(self):
233
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
234
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
235
 
                  )
236
 
 
237
 
    def test_coalesce_default_limit(self):
238
 
        # By default we use a 100MB max size.
239
 
        ten_mb = 10*1024*1024
240
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
241
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
242
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
243
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
244
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
245
 
                   max_size=1*1024*1024*1024)
246
 
 
247
 
 
248
 
class TestMemoryServer(tests.TestCase):
249
 
 
250
 
    def test_create_server(self):
251
 
        server = memory.MemoryServer()
252
 
        server.start_server()
253
 
        url = server.get_url()
254
 
        self.assertTrue(url in transport.transport_list_registry)
255
 
        t = transport.get_transport(url)
256
 
        del t
257
 
        server.stop_server()
258
 
        self.assertFalse(url in transport.transport_list_registry)
259
 
        self.assertRaises(errors.UnsupportedProtocol,
260
 
                          transport.get_transport, url)
261
 
 
262
 
 
263
 
class TestMemoryTransport(tests.TestCase):
 
209
 
 
210
 
 
211
class TestMemoryTransport(TestCase):
264
212
 
265
213
    def test_get_transport(self):
266
 
        memory.MemoryTransport()
 
214
        MemoryTransport()
267
215
 
268
216
    def test_clone(self):
269
 
        t = memory.MemoryTransport()
270
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
271
 
        self.assertEqual("memory:///", t.clone("/").base)
 
217
        transport = MemoryTransport()
 
218
        self.assertTrue(isinstance(transport, MemoryTransport))
 
219
        self.assertEqual("memory:///", transport.clone("/").base)
272
220
 
273
221
    def test_abspath(self):
274
 
        t = memory.MemoryTransport()
275
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
222
        transport = MemoryTransport()
 
223
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
276
224
 
277
225
    def test_abspath_of_root(self):
278
 
        t = memory.MemoryTransport()
279
 
        self.assertEqual("memory:///", t.base)
280
 
        self.assertEqual("memory:///", t.abspath('/'))
 
226
        transport = MemoryTransport()
 
227
        self.assertEqual("memory:///", transport.base)
 
228
        self.assertEqual("memory:///", transport.abspath('/'))
281
229
 
282
230
    def test_abspath_of_relpath_starting_at_root(self):
283
 
        t = memory.MemoryTransport()
284
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
231
        transport = MemoryTransport()
 
232
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
285
233
 
286
234
    def test_append_and_get(self):
287
 
        t = memory.MemoryTransport()
288
 
        t.append_bytes('path', 'content')
289
 
        self.assertEqual(t.get('path').read(), 'content')
290
 
        t.append_file('path', StringIO('content'))
291
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
235
        transport = MemoryTransport()
 
236
        transport.append_bytes('path', 'content')
 
237
        self.assertEqual(transport.get('path').read(), 'content')
 
238
        transport.append_file('path', StringIO('content'))
 
239
        self.assertEqual(transport.get('path').read(), 'contentcontent')
292
240
 
293
241
    def test_put_and_get(self):
294
 
        t = memory.MemoryTransport()
295
 
        t.put_file('path', StringIO('content'))
296
 
        self.assertEqual(t.get('path').read(), 'content')
297
 
        t.put_bytes('path', 'content')
298
 
        self.assertEqual(t.get('path').read(), 'content')
 
242
        transport = MemoryTransport()
 
243
        transport.put_file('path', StringIO('content'))
 
244
        self.assertEqual(transport.get('path').read(), 'content')
 
245
        transport.put_bytes('path', 'content')
 
246
        self.assertEqual(transport.get('path').read(), 'content')
299
247
 
300
248
    def test_append_without_dir_fails(self):
301
 
        t = memory.MemoryTransport()
302
 
        self.assertRaises(errors.NoSuchFile,
303
 
                          t.append_bytes, 'dir/path', 'content')
 
249
        transport = MemoryTransport()
 
250
        self.assertRaises(NoSuchFile,
 
251
                          transport.append_bytes, 'dir/path', 'content')
304
252
 
305
253
    def test_put_without_dir_fails(self):
306
 
        t = memory.MemoryTransport()
307
 
        self.assertRaises(errors.NoSuchFile,
308
 
                          t.put_file, 'dir/path', StringIO('content'))
 
254
        transport = MemoryTransport()
 
255
        self.assertRaises(NoSuchFile,
 
256
                          transport.put_file, 'dir/path', StringIO('content'))
309
257
 
310
258
    def test_get_missing(self):
311
 
        transport = memory.MemoryTransport()
312
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
259
        transport = MemoryTransport()
 
260
        self.assertRaises(NoSuchFile, transport.get, 'foo')
313
261
 
314
262
    def test_has_missing(self):
315
 
        t = memory.MemoryTransport()
316
 
        self.assertEquals(False, t.has('foo'))
 
263
        transport = MemoryTransport()
 
264
        self.assertEquals(False, transport.has('foo'))
317
265
 
318
266
    def test_has_present(self):
319
 
        t = memory.MemoryTransport()
320
 
        t.append_bytes('foo', 'content')
321
 
        self.assertEquals(True, t.has('foo'))
 
267
        transport = MemoryTransport()
 
268
        transport.append_bytes('foo', 'content')
 
269
        self.assertEquals(True, transport.has('foo'))
322
270
 
323
271
    def test_list_dir(self):
324
 
        t = memory.MemoryTransport()
325
 
        t.put_bytes('foo', 'content')
326
 
        t.mkdir('dir')
327
 
        t.put_bytes('dir/subfoo', 'content')
328
 
        t.put_bytes('dirlike', 'content')
 
272
        transport = MemoryTransport()
 
273
        transport.put_bytes('foo', 'content')
 
274
        transport.mkdir('dir')
 
275
        transport.put_bytes('dir/subfoo', 'content')
 
276
        transport.put_bytes('dirlike', 'content')
329
277
 
330
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
331
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
278
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
279
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
332
280
 
333
281
    def test_mkdir(self):
334
 
        t = memory.MemoryTransport()
335
 
        t.mkdir('dir')
336
 
        t.append_bytes('dir/path', 'content')
337
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
282
        transport = MemoryTransport()
 
283
        transport.mkdir('dir')
 
284
        transport.append_bytes('dir/path', 'content')
 
285
        self.assertEqual(transport.get('dir/path').read(), 'content')
338
286
 
339
287
    def test_mkdir_missing_parent(self):
340
 
        t = memory.MemoryTransport()
341
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
288
        transport = MemoryTransport()
 
289
        self.assertRaises(NoSuchFile,
 
290
                          transport.mkdir, 'dir/dir')
342
291
 
343
292
    def test_mkdir_twice(self):
344
 
        t = memory.MemoryTransport()
345
 
        t.mkdir('dir')
346
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
293
        transport = MemoryTransport()
 
294
        transport.mkdir('dir')
 
295
        self.assertRaises(FileExists, transport.mkdir, 'dir')
347
296
 
348
297
    def test_parameters(self):
349
 
        t = memory.MemoryTransport()
350
 
        self.assertEqual(True, t.listable())
351
 
        self.assertEqual(False, t.is_readonly())
 
298
        transport = MemoryTransport()
 
299
        self.assertEqual(True, transport.listable())
 
300
        self.assertEqual(False, transport.should_cache())
 
301
        self.assertEqual(False, transport.is_readonly())
352
302
 
353
303
    def test_iter_files_recursive(self):
354
 
        t = memory.MemoryTransport()
355
 
        t.mkdir('dir')
356
 
        t.put_bytes('dir/foo', 'content')
357
 
        t.put_bytes('dir/bar', 'content')
358
 
        t.put_bytes('bar', 'content')
359
 
        paths = set(t.iter_files_recursive())
 
304
        transport = MemoryTransport()
 
305
        transport.mkdir('dir')
 
306
        transport.put_bytes('dir/foo', 'content')
 
307
        transport.put_bytes('dir/bar', 'content')
 
308
        transport.put_bytes('bar', 'content')
 
309
        paths = set(transport.iter_files_recursive())
360
310
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
361
311
 
362
312
    def test_stat(self):
363
 
        t = memory.MemoryTransport()
364
 
        t.put_bytes('foo', 'content')
365
 
        t.put_bytes('bar', 'phowar')
366
 
        self.assertEqual(7, t.stat('foo').st_size)
367
 
        self.assertEqual(6, t.stat('bar').st_size)
368
 
 
369
 
 
370
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.put_bytes('bar', 'phowar')
 
316
        self.assertEqual(7, transport.stat('foo').st_size)
 
317
        self.assertEqual(6, transport.stat('bar').st_size)
 
318
 
 
319
 
 
320
class ChrootDecoratorTransportTest(TestCase):
371
321
    """Chroot decoration specific tests."""
372
322
 
373
323
    def test_abspath(self):
374
324
        # The abspath is always relative to the chroot_url.
375
 
        server = chroot.ChrootServer(
376
 
            transport.get_transport('memory:///foo/bar/'))
377
 
        self.start_server(server)
378
 
        t = transport.get_transport(server.get_url())
379
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
325
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
326
        server.setUp()
 
327
        transport = get_transport(server.get_url())
 
328
        self.assertEqual(server.get_url(), transport.abspath('/'))
380
329
 
381
 
        subdir_t = t.clone('subdir')
382
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
330
        subdir_transport = transport.clone('subdir')
 
331
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
332
        server.tearDown()
383
333
 
384
334
    def test_clone(self):
385
 
        server = chroot.ChrootServer(
386
 
            transport.get_transport('memory:///foo/bar/'))
387
 
        self.start_server(server)
388
 
        t = transport.get_transport(server.get_url())
 
335
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
336
        server.setUp()
 
337
        transport = get_transport(server.get_url())
389
338
        # relpath from root and root path are the same
390
 
        relpath_cloned = t.clone('foo')
391
 
        abspath_cloned = t.clone('/foo')
 
339
        relpath_cloned = transport.clone('foo')
 
340
        abspath_cloned = transport.clone('/foo')
392
341
        self.assertEqual(server, relpath_cloned.server)
393
342
        self.assertEqual(server, abspath_cloned.server)
394
 
 
 
343
        server.tearDown()
 
344
    
395
345
    def test_chroot_url_preserves_chroot(self):
396
346
        """Calling get_transport on a chroot transport's base should produce a
397
347
        transport with exactly the same behaviour as the original chroot
400
350
        This is so that it is not possible to escape a chroot by doing::
401
351
            url = chroot_transport.base
402
352
            parent_url = urlutils.join(url, '..')
403
 
            new_t = transport.get_transport(parent_url)
 
353
            new_transport = get_transport(parent_url)
404
354
        """
405
 
        server = chroot.ChrootServer(
406
 
            transport.get_transport('memory:///path/subpath'))
407
 
        self.start_server(server)
408
 
        t = transport.get_transport(server.get_url())
409
 
        new_t = transport.get_transport(t.base)
410
 
        self.assertEqual(t.server, new_t.server)
411
 
        self.assertEqual(t.base, new_t.base)
412
 
 
 
355
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
356
        server.setUp()
 
357
        transport = get_transport(server.get_url())
 
358
        new_transport = get_transport(transport.base)
 
359
        self.assertEqual(transport.server, new_transport.server)
 
360
        self.assertEqual(transport.base, new_transport.base)
 
361
        server.tearDown()
 
362
        
413
363
    def test_urljoin_preserves_chroot(self):
414
364
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
415
365
        URL that escapes the intended chroot.
417
367
        This is so that it is not possible to escape a chroot by doing::
418
368
            url = chroot_transport.base
419
369
            parent_url = urlutils.join(url, '..')
420
 
            new_t = transport.get_transport(parent_url)
 
370
            new_transport = get_transport(parent_url)
421
371
        """
422
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
423
 
        self.start_server(server)
424
 
        t = transport.get_transport(server.get_url())
 
372
        server = ChrootServer(get_transport('memory:///path/'))
 
373
        server.setUp()
 
374
        transport = get_transport(server.get_url())
425
375
        self.assertRaises(
426
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
427
 
 
428
 
 
429
 
class TestChrootServer(tests.TestCase):
 
376
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
377
        server.tearDown()
 
378
 
 
379
 
 
380
class ChrootServerTest(TestCase):
430
381
 
431
382
    def test_construct(self):
432
 
        backing_transport = memory.MemoryTransport()
433
 
        server = chroot.ChrootServer(backing_transport)
 
383
        backing_transport = MemoryTransport()
 
384
        server = ChrootServer(backing_transport)
434
385
        self.assertEqual(backing_transport, server.backing_transport)
435
386
 
436
387
    def test_setUp(self):
437
 
        backing_transport = memory.MemoryTransport()
438
 
        server = chroot.ChrootServer(backing_transport)
439
 
        server.start_server()
440
 
        try:
441
 
            self.assertTrue(server.scheme
442
 
                            in transport._get_protocol_handlers().keys())
443
 
        finally:
444
 
            server.stop_server()
 
388
        backing_transport = MemoryTransport()
 
389
        server = ChrootServer(backing_transport)
 
390
        server.setUp()
 
391
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
445
392
 
446
 
    def test_stop_server(self):
447
 
        backing_transport = memory.MemoryTransport()
448
 
        server = chroot.ChrootServer(backing_transport)
449
 
        server.start_server()
450
 
        server.stop_server()
451
 
        self.assertFalse(server.scheme
452
 
                         in transport._get_protocol_handlers().keys())
 
393
    def test_tearDown(self):
 
394
        backing_transport = MemoryTransport()
 
395
        server = ChrootServer(backing_transport)
 
396
        server.setUp()
 
397
        server.tearDown()
 
398
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
453
399
 
454
400
    def test_get_url(self):
455
 
        backing_transport = memory.MemoryTransport()
456
 
        server = chroot.ChrootServer(backing_transport)
457
 
        server.start_server()
458
 
        try:
459
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
460
 
        finally:
461
 
            server.stop_server()
462
 
 
463
 
 
464
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
465
 
    """Pathfilter decoration specific tests."""
466
 
 
467
 
    def test_abspath(self):
468
 
        # The abspath is always relative to the base of the backing transport.
469
 
        server = pathfilter.PathFilteringServer(
470
 
            transport.get_transport('memory:///foo/bar/'),
471
 
            lambda x: x)
472
 
        server.start_server()
473
 
        t = transport.get_transport(server.get_url())
474
 
        self.assertEqual(server.get_url(), t.abspath('/'))
475
 
 
476
 
        subdir_t = t.clone('subdir')
477
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
478
 
        server.stop_server()
479
 
 
480
 
    def make_pf_transport(self, filter_func=None):
481
 
        """Make a PathFilteringTransport backed by a MemoryTransport.
482
 
        
483
 
        :param filter_func: by default this will be a no-op function.  Use this
484
 
            parameter to override it."""
485
 
        if filter_func is None:
486
 
            filter_func = lambda x: x
487
 
        server = pathfilter.PathFilteringServer(
488
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
489
 
        server.start_server()
490
 
        self.addCleanup(server.stop_server)
491
 
        return transport.get_transport(server.get_url())
492
 
 
493
 
    def test__filter(self):
494
 
        # _filter (with an identity func as filter_func) always returns
495
 
        # paths relative to the base of the backing transport.
496
 
        t = self.make_pf_transport()
497
 
        self.assertEqual('foo', t._filter('foo'))
498
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
499
 
        self.assertEqual('', t._filter('..'))
500
 
        self.assertEqual('', t._filter('/'))
501
 
        # The base of the pathfiltering transport is taken into account too.
502
 
        t = t.clone('subdir1/subdir2')
503
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
504
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
505
 
        self.assertEqual('subdir1', t._filter('..'))
506
 
        self.assertEqual('', t._filter('/'))
507
 
 
508
 
    def test_filter_invocation(self):
509
 
        filter_log = []
510
 
        def filter(path):
511
 
            filter_log.append(path)
512
 
            return path
513
 
        t = self.make_pf_transport(filter)
514
 
        t.has('abc')
515
 
        self.assertEqual(['abc'], filter_log)
516
 
        del filter_log[:]
517
 
        t.clone('abc').has('xyz')
518
 
        self.assertEqual(['abc/xyz'], filter_log)
519
 
        del filter_log[:]
520
 
        t.has('/abc')
521
 
        self.assertEqual(['abc'], filter_log)
522
 
 
523
 
    def test_clone(self):
524
 
        t = self.make_pf_transport()
525
 
        # relpath from root and root path are the same
526
 
        relpath_cloned = t.clone('foo')
527
 
        abspath_cloned = t.clone('/foo')
528
 
        self.assertEqual(t.server, relpath_cloned.server)
529
 
        self.assertEqual(t.server, abspath_cloned.server)
530
 
 
531
 
    def test_url_preserves_pathfiltering(self):
532
 
        """Calling get_transport on a pathfiltered transport's base should
533
 
        produce a transport with exactly the same behaviour as the original
534
 
        pathfiltered transport.
535
 
 
536
 
        This is so that it is not possible to escape (accidentally or
537
 
        otherwise) the filtering by doing::
538
 
            url = filtered_transport.base
539
 
            parent_url = urlutils.join(url, '..')
540
 
            new_t = transport.get_transport(parent_url)
541
 
        """
542
 
        t = self.make_pf_transport()
543
 
        new_t = transport.get_transport(t.base)
544
 
        self.assertEqual(t.server, new_t.server)
545
 
        self.assertEqual(t.base, new_t.base)
546
 
 
547
 
 
548
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
401
        backing_transport = MemoryTransport()
 
402
        server = ChrootServer(backing_transport)
 
403
        server.setUp()
 
404
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
405
        server.tearDown()
 
406
 
 
407
 
 
408
class ReadonlyDecoratorTransportTest(TestCase):
549
409
    """Readonly decoration specific tests."""
550
410
 
551
411
    def test_local_parameters(self):
 
412
        import bzrlib.transport.readonly as readonly
552
413
        # connect to . in readonly mode
553
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
554
 
        self.assertEqual(True, t.listable())
555
 
        self.assertEqual(True, t.is_readonly())
 
414
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
415
        self.assertEqual(True, transport.listable())
 
416
        self.assertEqual(False, transport.should_cache())
 
417
        self.assertEqual(True, transport.is_readonly())
556
418
 
557
419
    def test_http_parameters(self):
558
 
        from bzrlib.tests.http_server import HttpServer
559
 
        # connect to '.' via http which is not listable
 
420
        from bzrlib.tests.HttpServer import HttpServer
 
421
        import bzrlib.transport.readonly as readonly
 
422
        # connect to . via http which is not listable
560
423
        server = HttpServer()
561
 
        self.start_server(server)
562
 
        t = transport.get_transport('readonly+' + server.get_url())
563
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
564
 
        self.assertEqual(False, t.listable())
565
 
        self.assertEqual(True, t.is_readonly())
566
 
 
567
 
 
568
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
424
        server.setUp()
 
425
        try:
 
426
            transport = get_transport('readonly+' + server.get_url())
 
427
            self.failUnless(isinstance(transport,
 
428
                                       readonly.ReadonlyTransportDecorator))
 
429
            self.assertEqual(False, transport.listable())
 
430
            self.assertEqual(True, transport.should_cache())
 
431
            self.assertEqual(True, transport.is_readonly())
 
432
        finally:
 
433
            server.tearDown()
 
434
 
 
435
 
 
436
class FakeNFSDecoratorTests(TestCaseInTempDir):
569
437
    """NFS decorator specific tests."""
570
438
 
571
439
    def get_nfs_transport(self, url):
 
440
        import bzrlib.transport.fakenfs as fakenfs
572
441
        # connect to url with nfs decoration
573
442
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
574
443
 
575
444
    def test_local_parameters(self):
576
 
        # the listable and is_readonly parameters
 
445
        # the listable, should_cache and is_readonly parameters
577
446
        # are not changed by the fakenfs decorator
578
 
        t = self.get_nfs_transport('.')
579
 
        self.assertEqual(True, t.listable())
580
 
        self.assertEqual(False, t.is_readonly())
 
447
        transport = self.get_nfs_transport('.')
 
448
        self.assertEqual(True, transport.listable())
 
449
        self.assertEqual(False, transport.should_cache())
 
450
        self.assertEqual(False, transport.is_readonly())
581
451
 
582
452
    def test_http_parameters(self):
583
 
        # the listable and is_readonly parameters
 
453
        # the listable, should_cache and is_readonly parameters
584
454
        # are not changed by the fakenfs decorator
585
 
        from bzrlib.tests.http_server import HttpServer
586
 
        # connect to '.' via http which is not listable
 
455
        from bzrlib.tests.HttpServer import HttpServer
 
456
        # connect to . via http which is not listable
587
457
        server = HttpServer()
588
 
        self.start_server(server)
589
 
        t = self.get_nfs_transport(server.get_url())
590
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
591
 
        self.assertEqual(False, t.listable())
592
 
        self.assertEqual(True, t.is_readonly())
 
458
        server.setUp()
 
459
        try:
 
460
            transport = self.get_nfs_transport(server.get_url())
 
461
            self.assertIsInstance(
 
462
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
463
            self.assertEqual(False, transport.listable())
 
464
            self.assertEqual(True, transport.should_cache())
 
465
            self.assertEqual(True, transport.is_readonly())
 
466
        finally:
 
467
            server.tearDown()
593
468
 
594
469
    def test_fakenfs_server_default(self):
595
470
        # a FakeNFSServer() should bring up a local relpath server for itself
 
471
        import bzrlib.transport.fakenfs as fakenfs
596
472
        server = fakenfs.FakeNFSServer()
597
 
        self.start_server(server)
598
 
        # the url should be decorated appropriately
599
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
600
 
        # and we should be able to get a transport for it
601
 
        t = transport.get_transport(server.get_url())
602
 
        # which must be a FakeNFSTransportDecorator instance.
603
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
473
        server.setUp()
 
474
        try:
 
475
            # the url should be decorated appropriately
 
476
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
477
            # and we should be able to get a transport for it
 
478
            transport = get_transport(server.get_url())
 
479
            # which must be a FakeNFSTransportDecorator instance.
 
480
            self.assertIsInstance(
 
481
                transport, fakenfs.FakeNFSTransportDecorator)
 
482
        finally:
 
483
            server.tearDown()
604
484
 
605
485
    def test_fakenfs_rename_semantics(self):
606
486
        # a FakeNFS transport must mangle the way rename errors occur to
607
487
        # look like NFS problems.
608
 
        t = self.get_nfs_transport('.')
 
488
        transport = self.get_nfs_transport('.')
609
489
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
610
 
                        transport=t)
611
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
612
 
 
613
 
 
614
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
490
                        transport=transport)
 
491
        self.assertRaises(errors.ResourceBusy,
 
492
                          transport.rename, 'from', 'to')
 
493
 
 
494
 
 
495
class FakeVFATDecoratorTests(TestCaseInTempDir):
615
496
    """Tests for simulation of VFAT restrictions"""
616
497
 
617
498
    def get_vfat_transport(self, url):
621
502
 
622
503
    def test_transport_creation(self):
623
504
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
624
 
        t = self.get_vfat_transport('.')
625
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
505
        transport = self.get_vfat_transport('.')
 
506
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
626
507
 
627
508
    def test_transport_mkdir(self):
628
 
        t = self.get_vfat_transport('.')
629
 
        t.mkdir('HELLO')
630
 
        self.assertTrue(t.has('hello'))
631
 
        self.assertTrue(t.has('Hello'))
 
509
        transport = self.get_vfat_transport('.')
 
510
        transport.mkdir('HELLO')
 
511
        self.assertTrue(transport.has('hello'))
 
512
        self.assertTrue(transport.has('Hello'))
632
513
 
633
514
    def test_forbidden_chars(self):
634
 
        t = self.get_vfat_transport('.')
635
 
        self.assertRaises(ValueError, t.has, "<NU>")
636
 
 
637
 
 
638
 
class BadTransportHandler(transport.Transport):
 
515
        transport = self.get_vfat_transport('.')
 
516
        self.assertRaises(ValueError, transport.has, "<NU>")
 
517
 
 
518
 
 
519
class BadTransportHandler(Transport):
639
520
    def __init__(self, base_url):
640
 
        raise errors.DependencyNotPresent('some_lib',
641
 
                                          'testing missing dependency')
642
 
 
643
 
 
644
 
class BackupTransportHandler(transport.Transport):
 
521
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
522
 
 
523
 
 
524
class BackupTransportHandler(Transport):
645
525
    """Test transport that works as a backup for the BadTransportHandler"""
646
526
    pass
647
527
 
648
528
 
649
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
529
class TestTransportImplementation(TestCaseInTempDir):
650
530
    """Implementation verification for transports.
651
 
 
 
531
    
652
532
    To verify a transport we need a server factory, which is a callable
653
533
    that accepts no parameters and returns an implementation of
654
534
    bzrlib.transport.Server.
655
 
 
 
535
    
656
536
    That Server is then used to construct transport instances and test
657
537
    the transport via loopback activity.
658
538
 
659
 
    Currently this assumes that the Transport object is connected to the
660
 
    current working directory.  So that whatever is done
661
 
    through the transport, should show up in the working
 
539
    Currently this assumes that the Transport object is connected to the 
 
540
    current working directory.  So that whatever is done 
 
541
    through the transport, should show up in the working 
662
542
    directory, and vice-versa. This is a bug, because its possible to have
663
 
    URL schemes which provide access to something that may not be
664
 
    result in storage on the local disk, i.e. due to file system limits, or
 
543
    URL schemes which provide access to something that may not be 
 
544
    result in storage on the local disk, i.e. due to file system limits, or 
665
545
    due to it being a database or some other non-filesystem tool.
666
546
 
667
547
    This also tests to make sure that the functions work with both
668
548
    generators and lists (assuming iter(list) is effectively a generator)
669
549
    """
670
 
 
 
550
    
671
551
    def setUp(self):
672
552
        super(TestTransportImplementation, self).setUp()
673
553
        self._server = self.transport_server()
674
 
        self.start_server(self._server)
 
554
        self._server.setUp()
 
555
        self.addCleanup(self._server.tearDown)
675
556
 
676
557
    def get_transport(self, relpath=None):
677
558
        """Return a connected transport to the local directory.
681
562
        base_url = self._server.get_url()
682
563
        url = self._adjust_url(base_url, relpath)
683
564
        # try getting the transport via the regular interface:
684
 
        t = transport.get_transport(url)
685
 
        # vila--20070607 if the following are commented out the test suite
686
 
        # still pass. Is this really still needed or was it a forgotten
687
 
        # temporary fix ?
 
565
        t = get_transport(url)
688
566
        if not isinstance(t, self.transport_class):
689
567
            # we did not get the correct transport class type. Override the
690
568
            # regular connection behaviour by direct construction.
692
570
        return t
693
571
 
694
572
 
695
 
class TestLocalTransports(tests.TestCase):
 
573
class TestLocalTransports(TestCase):
696
574
 
697
575
    def test_get_transport_from_abspath(self):
698
 
        here = osutils.abspath('.')
699
 
        t = transport.get_transport(here)
700
 
        self.assertIsInstance(t, local.LocalTransport)
 
576
        here = os.path.abspath('.')
 
577
        t = get_transport(here)
 
578
        self.assertIsInstance(t, LocalTransport)
701
579
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
702
580
 
703
581
    def test_get_transport_from_relpath(self):
704
 
        here = osutils.abspath('.')
705
 
        t = transport.get_transport('.')
706
 
        self.assertIsInstance(t, local.LocalTransport)
 
582
        here = os.path.abspath('.')
 
583
        t = get_transport('.')
 
584
        self.assertIsInstance(t, LocalTransport)
707
585
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
708
586
 
709
587
    def test_get_transport_from_local_url(self):
710
 
        here = osutils.abspath('.')
 
588
        here = os.path.abspath('.')
711
589
        here_url = urlutils.local_path_to_url(here) + '/'
712
 
        t = transport.get_transport(here_url)
713
 
        self.assertIsInstance(t, local.LocalTransport)
 
590
        t = get_transport(here_url)
 
591
        self.assertIsInstance(t, LocalTransport)
714
592
        self.assertEquals(t.base, here_url)
715
593
 
716
594
    def test_local_abspath(self):
717
 
        here = osutils.abspath('.')
718
 
        t = transport.get_transport(here)
 
595
        here = os.path.abspath('.')
 
596
        t = get_transport(here)
719
597
        self.assertEquals(t.local_abspath(''), here)
720
598
 
721
599
 
722
 
class TestWin32LocalTransport(tests.TestCase):
 
600
class TestWin32LocalTransport(TestCase):
723
601
 
724
602
    def test_unc_clone_to_root(self):
725
603
        # Win32 UNC path like \\HOST\path
726
604
        # clone to root should stop at least at \\HOST part
727
605
        # not on \\
728
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
606
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
729
607
        for i in xrange(4):
730
608
            t = t.clone('..')
731
609
        self.assertEquals(t.base, 'file://HOST/')
734
612
        self.assertEquals(t.base, 'file://HOST/')
735
613
 
736
614
 
737
 
class TestConnectedTransport(tests.TestCase):
738
 
    """Tests for connected to remote server transports"""
739
 
 
740
 
    def test_parse_url(self):
741
 
        t = transport.ConnectedTransport(
742
 
            'http://simple.example.com/home/source')
743
 
        self.assertEquals(t._host, 'simple.example.com')
744
 
        self.assertEquals(t._port, None)
745
 
        self.assertEquals(t._path, '/home/source/')
746
 
        self.failUnless(t._user is None)
747
 
        self.failUnless(t._password is None)
748
 
 
749
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
750
 
 
751
 
    def test_parse_url_with_at_in_user(self):
752
 
        # Bug 228058
753
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
754
 
        self.assertEquals(t._user, 'user@host.com')
755
 
 
756
 
    def test_parse_quoted_url(self):
757
 
        t = transport.ConnectedTransport(
758
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
759
 
        self.assertEquals(t._host, 'exAmple.com')
760
 
        self.assertEquals(t._port, 2222)
761
 
        self.assertEquals(t._user, 'robey')
762
 
        self.assertEquals(t._password, 'h@t')
763
 
        self.assertEquals(t._path, '/path/')
764
 
 
765
 
        # Base should not keep track of the password
766
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
767
 
 
768
 
    def test_parse_invalid_url(self):
769
 
        self.assertRaises(errors.InvalidURL,
770
 
                          transport.ConnectedTransport,
771
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
772
 
 
773
 
    def test_relpath(self):
774
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
775
 
 
776
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
777
 
        self.assertRaises(errors.PathNotChild, t.relpath,
778
 
                          'http://user@host.com/abs/path/sub')
779
 
        self.assertRaises(errors.PathNotChild, t.relpath,
780
 
                          'sftp://user2@host.com/abs/path/sub')
781
 
        self.assertRaises(errors.PathNotChild, t.relpath,
782
 
                          'sftp://user@otherhost.com/abs/path/sub')
783
 
        self.assertRaises(errors.PathNotChild, t.relpath,
784
 
                          'sftp://user@host.com:33/abs/path/sub')
785
 
        # Make sure it works when we don't supply a username
786
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
787
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
788
 
 
789
 
        # Make sure it works when parts of the path will be url encoded
790
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
791
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
792
 
 
793
 
    def test_connection_sharing_propagate_credentials(self):
794
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
795
 
        self.assertEquals('user', t._user)
796
 
        self.assertEquals('host.com', t._host)
797
 
        self.assertIs(None, t._get_connection())
798
 
        self.assertIs(None, t._password)
799
 
        c = t.clone('subdir')
800
 
        self.assertIs(None, c._get_connection())
801
 
        self.assertIs(None, t._password)
802
 
 
803
 
        # Simulate the user entering a password
804
 
        password = 'secret'
805
 
        connection = object()
806
 
        t._set_connection(connection, password)
807
 
        self.assertIs(connection, t._get_connection())
808
 
        self.assertIs(password, t._get_credentials())
809
 
        self.assertIs(connection, c._get_connection())
810
 
        self.assertIs(password, c._get_credentials())
811
 
 
812
 
        # credentials can be updated
813
 
        new_password = 'even more secret'
814
 
        c._update_credentials(new_password)
815
 
        self.assertIs(connection, t._get_connection())
816
 
        self.assertIs(new_password, t._get_credentials())
817
 
        self.assertIs(connection, c._get_connection())
818
 
        self.assertIs(new_password, c._get_credentials())
819
 
 
820
 
 
821
 
class TestReusedTransports(tests.TestCase):
822
 
    """Tests for transport reuse"""
823
 
 
824
 
    def test_reuse_same_transport(self):
825
 
        possible_transports = []
826
 
        t1 = transport.get_transport('http://foo/',
827
 
                                     possible_transports=possible_transports)
828
 
        self.assertEqual([t1], possible_transports)
829
 
        t2 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=[t1])
831
 
        self.assertIs(t1, t2)
832
 
 
833
 
        # Also check that final '/' are handled correctly
834
 
        t3 = transport.get_transport('http://foo/path/')
835
 
        t4 = transport.get_transport('http://foo/path',
836
 
                                     possible_transports=[t3])
837
 
        self.assertIs(t3, t4)
838
 
 
839
 
        t5 = transport.get_transport('http://foo/path')
840
 
        t6 = transport.get_transport('http://foo/path/',
841
 
                                     possible_transports=[t5])
842
 
        self.assertIs(t5, t6)
843
 
 
844
 
    def test_don_t_reuse_different_transport(self):
845
 
        t1 = transport.get_transport('http://foo/path')
846
 
        t2 = transport.get_transport('http://bar/path',
847
 
                                     possible_transports=[t1])
848
 
        self.assertIsNot(t1, t2)
849
 
 
850
 
 
851
 
class TestTransportTrace(tests.TestCase):
852
 
 
853
 
    def test_get(self):
854
 
        t = transport.get_transport('trace+memory://')
855
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
856
 
 
857
 
    def test_clone_preserves_activity(self):
858
 
        t = transport.get_transport('trace+memory://')
859
 
        t2 = t.clone('.')
860
 
        self.assertTrue(t is not t2)
861
 
        self.assertTrue(t._activity is t2._activity)
862
 
 
863
 
    # the following specific tests are for the operations that have made use of
864
 
    # logging in tests; we could test every single operation but doing that
865
 
    # still won't cause a test failure when the top level Transport API
866
 
    # changes; so there is little return doing that.
867
 
    def test_get(self):
868
 
        t = transport.get_transport('trace+memory:///')
869
 
        t.put_bytes('foo', 'barish')
870
 
        t.get('foo')
871
 
        expected_result = []
872
 
        # put_bytes records the bytes, not the content to avoid memory
873
 
        # pressure.
874
 
        expected_result.append(('put_bytes', 'foo', 6, None))
875
 
        # get records the file name only.
876
 
        expected_result.append(('get', 'foo'))
877
 
        self.assertEqual(expected_result, t._activity)
878
 
 
879
 
    def test_readv(self):
880
 
        t = transport.get_transport('trace+memory:///')
881
 
        t.put_bytes('foo', 'barish')
882
 
        list(t.readv('foo', [(0, 1), (3, 2)],
883
 
                     adjust_for_latency=True, upper_limit=6))
884
 
        expected_result = []
885
 
        # put_bytes records the bytes, not the content to avoid memory
886
 
        # pressure.
887
 
        expected_result.append(('put_bytes', 'foo', 6, None))
888
 
        # readv records the supplied offset request
889
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
890
 
        self.assertEqual(expected_result, t._activity)
891
 
 
892
 
 
893
 
class TestSSHConnections(tests.TestCaseWithTransport):
894
 
 
895
 
    def test_bzr_connect_to_bzr_ssh(self):
896
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
897
 
 
898
 
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
899
 
        """
900
 
        # This test actually causes a bzr instance to be invoked, which is very
901
 
        # expensive: it should be the only such test in the test suite.
902
 
        # A reasonable evolution for this would be to simply check inside
903
 
        # check_channel_exec_request that the command is appropriate, and then
904
 
        # satisfy requests in-process.
905
 
        self.requireFeature(features.paramiko)
906
 
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
907
 
        # override the interface (doesn't change self._vendor).
908
 
        # Note that this does encryption, so can be slow.
909
 
        from bzrlib.tests import stub_sftp
910
 
 
911
 
        # Start an SSH server
912
 
        self.command_executed = []
913
 
        # XXX: This is horrible -- we define a really dumb SSH server that
914
 
        # executes commands, and manage the hooking up of stdin/out/err to the
915
 
        # SSH channel ourselves.  Surely this has already been implemented
916
 
        # elsewhere?
917
 
        started = []
918
 
        class StubSSHServer(stub_sftp.StubServer):
919
 
 
920
 
            test = self
921
 
 
922
 
            def check_channel_exec_request(self, channel, command):
923
 
                self.test.command_executed.append(command)
924
 
                proc = subprocess.Popen(
925
 
                    command, shell=True, stdin=subprocess.PIPE,
926
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
927
 
 
928
 
                # XXX: horribly inefficient, not to mention ugly.
929
 
                # Start a thread for each of stdin/out/err, and relay bytes from
930
 
                # the subprocess to channel and vice versa.
931
 
                def ferry_bytes(read, write, close):
932
 
                    while True:
933
 
                        bytes = read(1)
934
 
                        if bytes == '':
935
 
                            close()
936
 
                            break
937
 
                        write(bytes)
938
 
 
939
 
                file_functions = [
940
 
                    (channel.recv, proc.stdin.write, proc.stdin.close),
941
 
                    (proc.stdout.read, channel.sendall, channel.close),
942
 
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
943
 
                started.append(proc)
944
 
                for read, write, close in file_functions:
945
 
                    t = threading.Thread(
946
 
                        target=ferry_bytes, args=(read, write, close))
947
 
                    t.start()
948
 
                    started.append(t)
949
 
 
950
 
                return True
951
 
 
952
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
953
 
        # We *don't* want to override the default SSH vendor: the detected one
954
 
        # is the one to use.
955
 
        self.start_server(ssh_server)
956
 
        port = ssh_server._listener.port
957
 
 
958
 
        if sys.platform == 'win32':
959
 
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
960
 
        else:
961
 
            bzr_remote_path = self.get_bzr_path()
962
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
963
 
 
964
 
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
965
 
        # variable is used to tell bzr what command to run on the remote end.
966
 
        path_to_branch = osutils.abspath('.')
967
 
        if sys.platform == 'win32':
968
 
            # On Windows, we export all drives as '/C:/, etc. So we need to
969
 
            # prefix a '/' to get the right path.
970
 
            path_to_branch = '/' + path_to_branch
971
 
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
972
 
        t = transport.get_transport(url)
973
 
        self.permit_url(t.base)
974
 
        t.mkdir('foo')
975
 
 
976
 
        self.assertEqual(
977
 
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
978
 
            self.command_executed)
979
 
        # Make sure to disconnect, so that the remote process can stop, and we
980
 
        # can cleanup. Then pause the test until everything is shutdown
981
 
        t._client._medium.disconnect()
982
 
        if not started:
983
 
            return
984
 
        # First wait for the subprocess
985
 
        started[0].wait()
986
 
        # And the rest are threads
987
 
        for t in started[1:]:
988
 
            t.join()
 
615
def get_test_permutations():
 
616
    """Return transport permutations to be used in testing.
 
617
 
 
618
    This module registers some transports, but they're only for testing
 
619
    registration.  We don't really want to run all the transport tests against
 
620
    them.
 
621
    """
 
622
    return []