~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import subprocess
 
20
import sys
 
21
import threading
19
22
 
20
 
import bzrlib
21
23
from bzrlib import (
22
24
    errors,
23
25
    osutils,
 
26
    tests,
 
27
    transport,
24
28
    urlutils,
25
29
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
30
from bzrlib.transport import (
 
31
    chroot,
 
32
    fakenfs,
 
33
    http,
 
34
    local,
 
35
    memory,
 
36
    pathfilter,
 
37
    readonly,
 
38
    )
 
39
from bzrlib.tests import (
 
40
    features,
 
41
    test_server,
 
42
    )
51
43
 
52
44
 
53
45
# TODO: Should possibly split transport-specific tests into their own files.
54
46
 
55
47
 
56
 
class TestTransport(TestCase):
 
48
class TestTransport(tests.TestCase):
57
49
    """Test the non transport-concrete class functionality."""
58
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
59
54
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
 
55
        handlers = transport._get_protocol_handlers()
61
56
        self.assertNotEqual([], handlers.keys( ))
62
57
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
65
60
        finally:
66
 
            _set_protocol_handlers(handlers)
 
61
            transport._set_protocol_handlers(handlers)
67
62
 
68
63
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
 
64
        handlers = transport._get_protocol_handlers()
70
65
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
 
66
        transport._clear_protocol_handlers()
72
67
        class SampleHandler(object):
73
68
            """I exist, isnt that enough?"""
74
69
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
82
79
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
 
80
                              'bzrlib.transport.chroot',
 
81
                              'bzrlib.transport.pathfilter'],
 
82
                             transport._get_transport_modules())
85
83
        finally:
86
 
            _set_protocol_handlers(handlers)
 
84
            transport._set_protocol_handlers(handlers)
87
85
 
88
86
    def test_transport_dependency(self):
89
87
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
 
88
        saved_handlers = transport._get_protocol_handlers()
91
89
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
 
90
        transport._clear_protocol_handlers()
93
91
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
97
95
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
100
98
                e_str = str(e)
101
99
                self.assertEquals('Unsupported protocol'
102
100
                                  ' for url "foo://fooserver/foo":'
106
104
                self.fail('Did not raise UnsupportedProtocol')
107
105
        finally:
108
106
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
110
 
            
 
107
            transport._set_protocol_handlers(saved_handlers)
 
108
 
111
109
    def test_transport_fallback(self):
112
110
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
 
111
        saved_handlers = transport._get_protocol_handlers()
114
112
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
122
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
121
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
 
122
            transport._set_protocol_handlers(saved_handlers)
 
123
 
 
124
    def test_ssh_hints(self):
 
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
126
        try:
 
127
            transport.get_transport('ssh://fooserver/foo')
 
128
        except errors.UnsupportedProtocol, e:
 
129
            e_str = str(e)
 
130
            self.assertEquals('Unsupported protocol'
 
131
                              ' for url "ssh://fooserver/foo":'
 
132
                              ' bzr supports bzr+ssh to operate over ssh,'
 
133
                              ' use "bzr+ssh://fooserver/foo".',
 
134
                              str(e))
 
135
        else:
 
136
            self.fail('Did not raise UnsupportedProtocol')
125
137
 
126
138
    def test_LateReadError(self):
127
139
        """The LateReadError helper should raise on read()."""
128
 
        a_file = LateReadError('a path')
 
140
        a_file = transport.LateReadError('a path')
129
141
        try:
130
142
            a_file.read()
131
 
        except ReadError, error:
 
143
        except errors.ReadError, error:
132
144
            self.assertEqual('a path', error.path)
133
 
        self.assertRaises(ReadError, a_file.read, 40)
 
145
        self.assertRaises(errors.ReadError, a_file.read, 40)
134
146
        a_file.close()
135
147
 
136
148
    def test__combine_paths(self):
137
 
        t = Transport('/')
 
149
        t = transport.Transport('/')
138
150
        self.assertEqual('/home/sarah/project/foo',
139
151
                         t._combine_paths('/home/sarah', 'project/foo'))
140
152
        self.assertEqual('/etc',
146
158
 
147
159
    def test_local_abspath_non_local_transport(self):
148
160
        # the base implementation should throw
149
 
        t = MemoryTransport()
 
161
        t = memory.MemoryTransport()
150
162
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
163
        self.assertEqual('memory:///t is not a local path.', str(e))
152
164
 
153
165
 
154
 
class TestCoalesceOffsets(TestCase):
155
 
    
156
 
    def check(self, expected, offsets, limit=0, fudge=0):
157
 
        coalesce = Transport._coalesce_offsets
158
 
        exp = [_CoalescedOffset(*x) for x in expected]
159
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
166
class TestCoalesceOffsets(tests.TestCase):
 
167
 
 
168
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
169
        coalesce = transport.Transport._coalesce_offsets
 
170
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
171
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
172
                            max_size=max_size))
160
173
        self.assertEqual(exp, out)
161
174
 
162
175
    def test_coalesce_empty(self):
169
182
        self.check([(0, 10, [(0, 10)]),
170
183
                    (20, 10, [(0, 10)]),
171
184
                   ], [(0, 10), (20, 10)])
172
 
            
 
185
 
173
186
    def test_coalesce_unsorted(self):
174
187
        self.check([(20, 10, [(0, 10)]),
175
188
                    (0, 10, [(0, 10)]),
180
193
                   [(0, 10), (10, 10)])
181
194
 
182
195
    def test_coalesce_overlapped(self):
183
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
184
 
                   [(0, 10), (5, 10)])
 
196
        self.assertRaises(ValueError,
 
197
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
198
                        [(0, 10), (5, 10)])
185
199
 
186
200
    def test_coalesce_limit(self):
187
201
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
208
222
                   ], [(10, 10), (30, 10), (100, 10)],
209
223
                   fudge=10
210
224
                  )
211
 
 
212
 
 
213
 
class TestMemoryTransport(TestCase):
 
225
    def test_coalesce_max_size(self):
 
226
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
227
                    (30, 50, [(0, 50)]),
 
228
                    # If one range is above max_size, it gets its own coalesced
 
229
                    # offset
 
230
                    (100, 80, [(0, 80),]),],
 
231
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
232
                   max_size=50
 
233
                  )
 
234
 
 
235
    def test_coalesce_no_max_size(self):
 
236
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
237
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
238
                  )
 
239
 
 
240
    def test_coalesce_default_limit(self):
 
241
        # By default we use a 100MB max size.
 
242
        ten_mb = 10*1024*1024
 
243
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
244
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
245
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
246
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
248
                   max_size=1*1024*1024*1024)
 
249
 
 
250
 
 
251
class TestMemoryServer(tests.TestCase):
 
252
 
 
253
    def test_create_server(self):
 
254
        server = memory.MemoryServer()
 
255
        server.start_server()
 
256
        url = server.get_url()
 
257
        self.assertTrue(url in transport.transport_list_registry)
 
258
        t = transport.get_transport(url)
 
259
        del t
 
260
        server.stop_server()
 
261
        self.assertFalse(url in transport.transport_list_registry)
 
262
        self.assertRaises(errors.UnsupportedProtocol,
 
263
                          transport.get_transport, url)
 
264
 
 
265
 
 
266
class TestMemoryTransport(tests.TestCase):
214
267
 
215
268
    def test_get_transport(self):
216
 
        MemoryTransport()
 
269
        memory.MemoryTransport()
217
270
 
218
271
    def test_clone(self):
219
 
        transport = MemoryTransport()
220
 
        self.assertTrue(isinstance(transport, MemoryTransport))
221
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
272
        t = memory.MemoryTransport()
 
273
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
274
        self.assertEqual("memory:///", t.clone("/").base)
222
275
 
223
276
    def test_abspath(self):
224
 
        transport = MemoryTransport()
225
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
277
        t = memory.MemoryTransport()
 
278
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
226
279
 
227
280
    def test_abspath_of_root(self):
228
 
        transport = MemoryTransport()
229
 
        self.assertEqual("memory:///", transport.base)
230
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
281
        t = memory.MemoryTransport()
 
282
        self.assertEqual("memory:///", t.base)
 
283
        self.assertEqual("memory:///", t.abspath('/'))
231
284
 
232
285
    def test_abspath_of_relpath_starting_at_root(self):
233
 
        transport = MemoryTransport()
234
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
286
        t = memory.MemoryTransport()
 
287
        self.assertEqual("memory:///foo", t.abspath('/foo'))
235
288
 
236
289
    def test_append_and_get(self):
237
 
        transport = MemoryTransport()
238
 
        transport.append_bytes('path', 'content')
239
 
        self.assertEqual(transport.get('path').read(), 'content')
240
 
        transport.append_file('path', StringIO('content'))
241
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
290
        t = memory.MemoryTransport()
 
291
        t.append_bytes('path', 'content')
 
292
        self.assertEqual(t.get('path').read(), 'content')
 
293
        t.append_file('path', StringIO('content'))
 
294
        self.assertEqual(t.get('path').read(), 'contentcontent')
242
295
 
243
296
    def test_put_and_get(self):
244
 
        transport = MemoryTransport()
245
 
        transport.put_file('path', StringIO('content'))
246
 
        self.assertEqual(transport.get('path').read(), 'content')
247
 
        transport.put_bytes('path', 'content')
248
 
        self.assertEqual(transport.get('path').read(), 'content')
 
297
        t = memory.MemoryTransport()
 
298
        t.put_file('path', StringIO('content'))
 
299
        self.assertEqual(t.get('path').read(), 'content')
 
300
        t.put_bytes('path', 'content')
 
301
        self.assertEqual(t.get('path').read(), 'content')
249
302
 
250
303
    def test_append_without_dir_fails(self):
251
 
        transport = MemoryTransport()
252
 
        self.assertRaises(NoSuchFile,
253
 
                          transport.append_bytes, 'dir/path', 'content')
 
304
        t = memory.MemoryTransport()
 
305
        self.assertRaises(errors.NoSuchFile,
 
306
                          t.append_bytes, 'dir/path', 'content')
254
307
 
255
308
    def test_put_without_dir_fails(self):
256
 
        transport = MemoryTransport()
257
 
        self.assertRaises(NoSuchFile,
258
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
309
        t = memory.MemoryTransport()
 
310
        self.assertRaises(errors.NoSuchFile,
 
311
                          t.put_file, 'dir/path', StringIO('content'))
259
312
 
260
313
    def test_get_missing(self):
261
 
        transport = MemoryTransport()
262
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
314
        transport = memory.MemoryTransport()
 
315
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
263
316
 
264
317
    def test_has_missing(self):
265
 
        transport = MemoryTransport()
266
 
        self.assertEquals(False, transport.has('foo'))
 
318
        t = memory.MemoryTransport()
 
319
        self.assertEquals(False, t.has('foo'))
267
320
 
268
321
    def test_has_present(self):
269
 
        transport = MemoryTransport()
270
 
        transport.append_bytes('foo', 'content')
271
 
        self.assertEquals(True, transport.has('foo'))
 
322
        t = memory.MemoryTransport()
 
323
        t.append_bytes('foo', 'content')
 
324
        self.assertEquals(True, t.has('foo'))
272
325
 
273
326
    def test_list_dir(self):
274
 
        transport = MemoryTransport()
275
 
        transport.put_bytes('foo', 'content')
276
 
        transport.mkdir('dir')
277
 
        transport.put_bytes('dir/subfoo', 'content')
278
 
        transport.put_bytes('dirlike', 'content')
 
327
        t = memory.MemoryTransport()
 
328
        t.put_bytes('foo', 'content')
 
329
        t.mkdir('dir')
 
330
        t.put_bytes('dir/subfoo', 'content')
 
331
        t.put_bytes('dirlike', 'content')
279
332
 
280
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
281
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
333
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
334
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
282
335
 
283
336
    def test_mkdir(self):
284
 
        transport = MemoryTransport()
285
 
        transport.mkdir('dir')
286
 
        transport.append_bytes('dir/path', 'content')
287
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
337
        t = memory.MemoryTransport()
 
338
        t.mkdir('dir')
 
339
        t.append_bytes('dir/path', 'content')
 
340
        self.assertEqual(t.get('dir/path').read(), 'content')
288
341
 
289
342
    def test_mkdir_missing_parent(self):
290
 
        transport = MemoryTransport()
291
 
        self.assertRaises(NoSuchFile,
292
 
                          transport.mkdir, 'dir/dir')
 
343
        t = memory.MemoryTransport()
 
344
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
293
345
 
294
346
    def test_mkdir_twice(self):
295
 
        transport = MemoryTransport()
296
 
        transport.mkdir('dir')
297
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
347
        t = memory.MemoryTransport()
 
348
        t.mkdir('dir')
 
349
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
298
350
 
299
351
    def test_parameters(self):
300
 
        transport = MemoryTransport()
301
 
        self.assertEqual(True, transport.listable())
302
 
        self.assertEqual(False, transport.is_readonly())
 
352
        t = memory.MemoryTransport()
 
353
        self.assertEqual(True, t.listable())
 
354
        self.assertEqual(False, t.is_readonly())
303
355
 
304
356
    def test_iter_files_recursive(self):
305
 
        transport = MemoryTransport()
306
 
        transport.mkdir('dir')
307
 
        transport.put_bytes('dir/foo', 'content')
308
 
        transport.put_bytes('dir/bar', 'content')
309
 
        transport.put_bytes('bar', 'content')
310
 
        paths = set(transport.iter_files_recursive())
 
357
        t = memory.MemoryTransport()
 
358
        t.mkdir('dir')
 
359
        t.put_bytes('dir/foo', 'content')
 
360
        t.put_bytes('dir/bar', 'content')
 
361
        t.put_bytes('bar', 'content')
 
362
        paths = set(t.iter_files_recursive())
311
363
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
312
364
 
313
365
    def test_stat(self):
314
 
        transport = MemoryTransport()
315
 
        transport.put_bytes('foo', 'content')
316
 
        transport.put_bytes('bar', 'phowar')
317
 
        self.assertEqual(7, transport.stat('foo').st_size)
318
 
        self.assertEqual(6, transport.stat('bar').st_size)
319
 
 
320
 
 
321
 
class ChrootDecoratorTransportTest(TestCase):
 
366
        t = memory.MemoryTransport()
 
367
        t.put_bytes('foo', 'content')
 
368
        t.put_bytes('bar', 'phowar')
 
369
        self.assertEqual(7, t.stat('foo').st_size)
 
370
        self.assertEqual(6, t.stat('bar').st_size)
 
371
 
 
372
 
 
373
class ChrootDecoratorTransportTest(tests.TestCase):
322
374
    """Chroot decoration specific tests."""
323
375
 
324
376
    def test_abspath(self):
325
377
        # The abspath is always relative to the chroot_url.
326
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
327
 
        server.setUp()
328
 
        transport = get_transport(server.get_url())
329
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
378
        server = chroot.ChrootServer(
 
379
            transport.get_transport('memory:///foo/bar/'))
 
380
        self.start_server(server)
 
381
        t = transport.get_transport(server.get_url())
 
382
        self.assertEqual(server.get_url(), t.abspath('/'))
330
383
 
331
 
        subdir_transport = transport.clone('subdir')
332
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
333
 
        server.tearDown()
 
384
        subdir_t = t.clone('subdir')
 
385
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
334
386
 
335
387
    def test_clone(self):
336
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
337
 
        server.setUp()
338
 
        transport = get_transport(server.get_url())
 
388
        server = chroot.ChrootServer(
 
389
            transport.get_transport('memory:///foo/bar/'))
 
390
        self.start_server(server)
 
391
        t = transport.get_transport(server.get_url())
339
392
        # relpath from root and root path are the same
340
 
        relpath_cloned = transport.clone('foo')
341
 
        abspath_cloned = transport.clone('/foo')
 
393
        relpath_cloned = t.clone('foo')
 
394
        abspath_cloned = t.clone('/foo')
342
395
        self.assertEqual(server, relpath_cloned.server)
343
396
        self.assertEqual(server, abspath_cloned.server)
344
 
        server.tearDown()
345
 
    
 
397
 
346
398
    def test_chroot_url_preserves_chroot(self):
347
399
        """Calling get_transport on a chroot transport's base should produce a
348
400
        transport with exactly the same behaviour as the original chroot
351
403
        This is so that it is not possible to escape a chroot by doing::
352
404
            url = chroot_transport.base
353
405
            parent_url = urlutils.join(url, '..')
354
 
            new_transport = get_transport(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
355
407
        """
356
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
357
 
        server.setUp()
358
 
        transport = get_transport(server.get_url())
359
 
        new_transport = get_transport(transport.base)
360
 
        self.assertEqual(transport.server, new_transport.server)
361
 
        self.assertEqual(transport.base, new_transport.base)
362
 
        server.tearDown()
363
 
        
 
408
        server = chroot.ChrootServer(
 
409
            transport.get_transport('memory:///path/subpath'))
 
410
        self.start_server(server)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
 
413
        self.assertEqual(t.server, new_t.server)
 
414
        self.assertEqual(t.base, new_t.base)
 
415
 
364
416
    def test_urljoin_preserves_chroot(self):
365
417
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
366
418
        URL that escapes the intended chroot.
368
420
        This is so that it is not possible to escape a chroot by doing::
369
421
            url = chroot_transport.base
370
422
            parent_url = urlutils.join(url, '..')
371
 
            new_transport = get_transport(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
372
424
        """
373
 
        server = ChrootServer(get_transport('memory:///path/'))
374
 
        server.setUp()
375
 
        transport = get_transport(server.get_url())
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
426
        self.start_server(server)
 
427
        t = transport.get_transport(server.get_url())
376
428
        self.assertRaises(
377
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
378
 
        server.tearDown()
379
 
 
380
 
 
381
 
class ChrootServerTest(TestCase):
 
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
430
 
 
431
 
 
432
class TestChrootServer(tests.TestCase):
382
433
 
383
434
    def test_construct(self):
384
 
        backing_transport = MemoryTransport()
385
 
        server = ChrootServer(backing_transport)
 
435
        backing_transport = memory.MemoryTransport()
 
436
        server = chroot.ChrootServer(backing_transport)
386
437
        self.assertEqual(backing_transport, server.backing_transport)
387
438
 
388
439
    def test_setUp(self):
389
 
        backing_transport = MemoryTransport()
390
 
        server = ChrootServer(backing_transport)
391
 
        server.setUp()
392
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
440
        backing_transport = memory.MemoryTransport()
 
441
        server = chroot.ChrootServer(backing_transport)
 
442
        server.start_server()
 
443
        try:
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
 
446
        finally:
 
447
            server.stop_server()
393
448
 
394
 
    def test_tearDown(self):
395
 
        backing_transport = MemoryTransport()
396
 
        server = ChrootServer(backing_transport)
397
 
        server.setUp()
398
 
        server.tearDown()
399
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
449
    def test_stop_server(self):
 
450
        backing_transport = memory.MemoryTransport()
 
451
        server = chroot.ChrootServer(backing_transport)
 
452
        server.start_server()
 
453
        server.stop_server()
 
454
        self.assertFalse(server.scheme
 
455
                         in transport._get_protocol_handlers().keys())
400
456
 
401
457
    def test_get_url(self):
402
 
        backing_transport = MemoryTransport()
403
 
        server = ChrootServer(backing_transport)
404
 
        server.setUp()
405
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
406
 
        server.tearDown()
407
 
 
408
 
 
409
 
class ReadonlyDecoratorTransportTest(TestCase):
 
458
        backing_transport = memory.MemoryTransport()
 
459
        server = chroot.ChrootServer(backing_transport)
 
460
        server.start_server()
 
461
        try:
 
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
463
        finally:
 
464
            server.stop_server()
 
465
 
 
466
 
 
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
468
    """Pathfilter decoration specific tests."""
 
469
 
 
470
    def test_abspath(self):
 
471
        # The abspath is always relative to the base of the backing transport.
 
472
        server = pathfilter.PathFilteringServer(
 
473
            transport.get_transport('memory:///foo/bar/'),
 
474
            lambda x: x)
 
475
        server.start_server()
 
476
        t = transport.get_transport(server.get_url())
 
477
        self.assertEqual(server.get_url(), t.abspath('/'))
 
478
 
 
479
        subdir_t = t.clone('subdir')
 
480
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
481
        server.stop_server()
 
482
 
 
483
    def make_pf_transport(self, filter_func=None):
 
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
485
        
 
486
        :param filter_func: by default this will be a no-op function.  Use this
 
487
            parameter to override it."""
 
488
        if filter_func is None:
 
489
            filter_func = lambda x: x
 
490
        server = pathfilter.PathFilteringServer(
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
492
        server.start_server()
 
493
        self.addCleanup(server.stop_server)
 
494
        return transport.get_transport(server.get_url())
 
495
 
 
496
    def test__filter(self):
 
497
        # _filter (with an identity func as filter_func) always returns
 
498
        # paths relative to the base of the backing transport.
 
499
        t = self.make_pf_transport()
 
500
        self.assertEqual('foo', t._filter('foo'))
 
501
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
502
        self.assertEqual('', t._filter('..'))
 
503
        self.assertEqual('', t._filter('/'))
 
504
        # The base of the pathfiltering transport is taken into account too.
 
505
        t = t.clone('subdir1/subdir2')
 
506
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
507
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
508
        self.assertEqual('subdir1', t._filter('..'))
 
509
        self.assertEqual('', t._filter('/'))
 
510
 
 
511
    def test_filter_invocation(self):
 
512
        filter_log = []
 
513
        def filter(path):
 
514
            filter_log.append(path)
 
515
            return path
 
516
        t = self.make_pf_transport(filter)
 
517
        t.has('abc')
 
518
        self.assertEqual(['abc'], filter_log)
 
519
        del filter_log[:]
 
520
        t.clone('abc').has('xyz')
 
521
        self.assertEqual(['abc/xyz'], filter_log)
 
522
        del filter_log[:]
 
523
        t.has('/abc')
 
524
        self.assertEqual(['abc'], filter_log)
 
525
 
 
526
    def test_clone(self):
 
527
        t = self.make_pf_transport()
 
528
        # relpath from root and root path are the same
 
529
        relpath_cloned = t.clone('foo')
 
530
        abspath_cloned = t.clone('/foo')
 
531
        self.assertEqual(t.server, relpath_cloned.server)
 
532
        self.assertEqual(t.server, abspath_cloned.server)
 
533
 
 
534
    def test_url_preserves_pathfiltering(self):
 
535
        """Calling get_transport on a pathfiltered transport's base should
 
536
        produce a transport with exactly the same behaviour as the original
 
537
        pathfiltered transport.
 
538
 
 
539
        This is so that it is not possible to escape (accidentally or
 
540
        otherwise) the filtering by doing::
 
541
            url = filtered_transport.base
 
542
            parent_url = urlutils.join(url, '..')
 
543
            new_t = transport.get_transport(parent_url)
 
544
        """
 
545
        t = self.make_pf_transport()
 
546
        new_t = transport.get_transport(t.base)
 
547
        self.assertEqual(t.server, new_t.server)
 
548
        self.assertEqual(t.base, new_t.base)
 
549
 
 
550
 
 
551
class ReadonlyDecoratorTransportTest(tests.TestCase):
    """Tests specific to the readonly transport decorator."""

    def test_local_parameters(self):
        # Decorate '.' directly with the readonly decorator.
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        from bzrlib.tests.http_server import HttpServer
        # Reach '.' through http, which is not listable.
        http_server = HttpServer()
        self.start_server(http_server)
        decorated_url = 'readonly+' + http_server.get_url()
        decorated = transport.get_transport(decorated_url)
        self.assertIsInstance(decorated, readonly.ReadonlyTransportDecorator)
        self.assertEqual(False, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())
 
569
 
 
570
 
 
571
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
    """Tests specific to the fake NFS transport decorator."""

    def get_nfs_transport(self, url):
        # Wrap the given url with the fakenfs decoration.
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        # The fakenfs decorator leaves the listable and is_readonly
        # parameters unchanged.
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        # The fakenfs decorator leaves the listable and is_readonly
        # parameters unchanged.
        from bzrlib.tests.http_server import HttpServer
        # Reach '.' through http, which is not listable.
        http_server = HttpServer()
        self.start_server(http_server)
        nfs = self.get_nfs_transport(http_server.get_url())
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        self.assertEqual(False, nfs.listable())
        self.assertEqual(True, nfs.is_readonly())

    def test_fakenfs_server_default(self):
        # A FakeNFSServer() should bring up a local relpath server for itself.
        nfs_server = test_server.FakeNFSServer()
        self.start_server(nfs_server)
        # Its url carries the fakenfs decoration ...
        self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
        # ... a transport can be obtained for that url ...
        nfs = transport.get_transport(nfs_server.get_url())
        # ... and that transport is a FakeNFSTransportDecorator instance.
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)

    def test_fakenfs_rename_semantics(self):
        # A FakeNFS transport must mangle the way rename errors occur so
        # that they look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(errors.ResourceBusy, nfs.rename, 'from', 'to')
 
615
 
 
616
 
 
617
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
493
618
    """Tests for simulation of VFAT restrictions"""
494
619
 
495
620
    def get_vfat_transport(self, url):
499
624
 
500
625
    def test_transport_creation(self):
501
626
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
502
 
        transport = self.get_vfat_transport('.')
503
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
627
        t = self.get_vfat_transport('.')
 
628
        self.assertIsInstance(t, FakeVFATTransportDecorator)
504
629
 
505
630
    def test_transport_mkdir(self):
506
 
        transport = self.get_vfat_transport('.')
507
 
        transport.mkdir('HELLO')
508
 
        self.assertTrue(transport.has('hello'))
509
 
        self.assertTrue(transport.has('Hello'))
 
631
        t = self.get_vfat_transport('.')
 
632
        t.mkdir('HELLO')
 
633
        self.assertTrue(t.has('hello'))
 
634
        self.assertTrue(t.has('Hello'))
510
635
 
511
636
    def test_forbidden_chars(self):
512
 
        transport = self.get_vfat_transport('.')
513
 
        self.assertRaises(ValueError, transport.has, "<NU>")
514
 
 
515
 
 
516
 
class BadTransportHandler(Transport):
 
637
        t = self.get_vfat_transport('.')
 
638
        self.assertRaises(ValueError, t.has, "<NU>")
 
639
 
 
640
 
 
641
class BadTransportHandler(transport.Transport):
    """Transport whose constructor always raises DependencyNotPresent."""

    def __init__(self, base_url):
        # Never completes construction: report 'some_lib' as missing.
        missing_library = 'some_lib'
        reason = 'testing missing dependency'
        raise errors.DependencyNotPresent(missing_library, reason)
 
645
 
 
646
 
 
647
class BackupTransportHandler(transport.Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
    # Nothing is overridden; the class only needs to exist under its own name.
524
650
 
525
651
 
526
 
class TestTransportImplementation(TestCaseInTempDir):
 
652
class TestTransportImplementation(tests.TestCaseInTempDir):
527
653
    """Implementation verification for transports.
528
 
    
 
654
 
529
655
    To verify a transport we need a server factory, which is a callable
530
656
    that accepts no parameters and returns an implementation of
531
657
    bzrlib.transport.Server.
532
 
    
 
658
 
533
659
    That Server is then used to construct transport instances and test
534
660
    the transport via loopback activity.
535
661
 
536
 
    Currently this assumes that the Transport object is connected to the 
537
 
    current working directory.  So that whatever is done 
538
 
    through the transport, should show up in the working 
 
662
    Currently this assumes that the Transport object is connected to the
 
663
    current working directory.  So that whatever is done
 
664
    through the transport, should show up in the working
539
665
    directory, and vice-versa. This is a bug, because it's possible to have
540
 
    URL schemes which provide access to something that may not be 
541
 
    result in storage on the local disk, i.e. due to file system limits, or 
 
666
    URL schemes which provide access to something that may not
 
667
    result in storage on the local disk, i.e. due to file system limits, or
542
668
    due to it being a database or some other non-filesystem tool.
543
669
 
544
670
    This also tests to make sure that the functions work with both
545
671
    generators and lists (assuming iter(list) is effectively a generator)
546
672
    """
547
 
    
 
673
 
548
674
    def setUp(self):
549
675
        super(TestTransportImplementation, self).setUp()
550
676
        self._server = self.transport_server()
551
 
        self._server.setUp()
552
 
        self.addCleanup(self._server.tearDown)
 
677
        self.start_server(self._server)
553
678
 
554
679
    def get_transport(self, relpath=None):
555
680
        """Return a connected transport to the local directory.
559
684
        base_url = self._server.get_url()
560
685
        url = self._adjust_url(base_url, relpath)
561
686
        # try getting the transport via the regular interface:
562
 
        t = get_transport(url)
 
687
        t = transport.get_transport(url)
563
688
        # vila--20070607 if the following are commented out the test suite
564
689
        # still pass. Is this really still needed or was it a forgotten
565
690
        # temporary fix ?
570
695
        return t
571
696
 
572
697
 
573
 
class TestLocalTransports(TestCase):
 
698
class TestLocalTransports(tests.TestCase):
    """Tests for get_transport() on local paths and local URLs."""

    def test_get_transport_from_abspath(self):
        # An absolute filesystem path gives a LocalTransport whose base is
        # the URL form of that path with a trailing slash.
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        # A relative path gives a LocalTransport as well; the expected base
        # is computed from the same relative path.
        t = transport.get_transport('.')
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        # A local URL (with trailing slash) is used unchanged as the base.
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = transport.get_transport(here_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        # local_abspath('') maps back to the path the transport was made for.
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
598
723
 
599
724
 
600
 
class TestWin32LocalTransport(TestCase):
 
725
class TestWin32LocalTransport(tests.TestCase):
601
726
 
602
727
    def test_unc_clone_to_root(self):
603
728
        # Win32 UNC path like \\HOST\path
604
729
        # clone to root should stop at least at \\HOST part
605
730
        # not on \\
606
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
731
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
607
732
        for i in xrange(4):
608
733
            t = t.clone('..')
609
734
        self.assertEquals(t.base, 'file://HOST/')
612
737
        self.assertEquals(t.base, 'file://HOST/')
613
738
 
614
739
 
615
 
class TestConnectedTransport(TestCase):
 
740
class TestConnectedTransport(tests.TestCase):
616
741
    """Tests for connected to remote server transports"""
617
742
 
618
743
    def test_parse_url(self):
619
 
        t = ConnectedTransport('http://simple.example.com/home/source')
 
744
        t = transport.ConnectedTransport(
 
745
            'http://simple.example.com/home/source')
620
746
        self.assertEquals(t._host, 'simple.example.com')
621
747
        self.assertEquals(t._port, None)
622
748
        self.assertEquals(t._path, '/home/source/')
623
 
        self.failUnless(t._user is None)
624
 
        self.failUnless(t._password is None)
 
749
        self.assertTrue(t._user is None)
 
750
        self.assertTrue(t._password is None)
625
751
 
626
752
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
627
753
 
 
754
    def test_parse_url_with_at_in_user(self):
 
755
        # Bug 228058
 
756
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
757
        self.assertEquals(t._user, 'user@host.com')
 
758
 
628
759
    def test_parse_quoted_url(self):
629
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
760
        t = transport.ConnectedTransport(
 
761
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
630
762
        self.assertEquals(t._host, 'exAmple.com')
631
763
        self.assertEquals(t._port, 2222)
632
764
        self.assertEquals(t._user, 'robey')
638
770
 
639
771
    def test_parse_invalid_url(self):
640
772
        self.assertRaises(errors.InvalidURL,
641
 
                          ConnectedTransport,
 
773
                          transport.ConnectedTransport,
642
774
                          'sftp://lily.org:~janneke/public/bzr/gub')
643
775
 
644
776
    def test_relpath(self):
645
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
646
778
 
647
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
648
780
        self.assertRaises(errors.PathNotChild, t.relpath,
654
786
        self.assertRaises(errors.PathNotChild, t.relpath,
655
787
                          'sftp://user@host.com:33/abs/path/sub')
656
788
        # Make sure it works when we don't supply a username
657
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
789
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
658
790
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
659
791
 
660
792
        # Make sure it works when parts of the path will be url encoded
661
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
793
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
662
794
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
663
795
 
664
796
    def test_connection_sharing_propagate_credentials(self):
665
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
797
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
666
798
        self.assertEquals('user', t._user)
667
799
        self.assertEquals('host.com', t._host)
668
800
        self.assertIs(None, t._get_connection())
689
821
        self.assertIs(new_password, c._get_credentials())
690
822
 
691
823
 
692
 
class TestReusedTransports(TestCase):
 
824
class TestReusedTransports(tests.TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        candidates = []
        first = transport.get_transport(
            'http://foo/', possible_transports=candidates)
        # The new transport has been recorded in the list that was passed in.
        self.assertEqual([first], candidates)
        again = transport.get_transport(
            'http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = transport.get_transport('http://foo/path/')
        reused = transport.get_transport(
            'http://foo/path', possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = transport.get_transport('http://foo/path')
        reused = transport.get_transport(
            'http://foo/path/', possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        # A transport for another host must not be handed back.
        foo_transport = transport.get_transport('http://foo/path')
        bar_transport = transport.get_transport(
            'http://bar/path', possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
716
852
 
717
853
 
718
 
class TestTransportTrace(TestCase):
 
854
class TestTransportTrace(tests.TestCase):
    """Tests for transports obtained through 'trace+' URLs."""

    def test_decorator(self):
        # This test was previously also called test_get.  The second
        # test_get defined further down replaced it in the class body, so it
        # was never run; it now has a name of its own.
        # The decorator class is imported locally so that the check does not
        # rely on a module-level 'bzrlib' binding.
        from bzrlib.transport import trace
        t = transport.get_transport('trace+memory://')
        self.assertIsInstance(t, trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        # A clone is a distinct object sharing the same _activity list.
        t = transport.get_transport('trace+memory://')
        t2 = t.clone('.')
        self.assertTrue(t is not t2)
        self.assertTrue(t._activity is t2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        t.get('foo')
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, t._activity)

    def test_readv(self):
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        # readv is consumed with list() before _activity is inspected.
        list(t.readv('foo', [(0, 1), (3, 2)],
                     adjust_for_latency=True, upper_limit=6))
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, t._activity)
 
894
 
 
895
 
 
896
class TestSSHConnections(tests.TestCaseWithTransport):

    def test_bzr_connect_to_bzr_ssh(self):
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # A real bzr instance gets invoked by this test, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution would be to check inside
        # check_channel_exec_request that the command is appropriate and
        # then satisfy the requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- a really dumb SSH server is defined here
        # that executes commands, with the hooking up of stdin/out/err to
        # the SSH channel managed by hand.  Surely this has already been
        # implemented elsewhere?
        #
        # 'started' receives the subprocess first and then one entry per
        # relay thread, so that the end of the test can wait for all of them.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                child = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # One thread per stream (stdin/out/err) relays bytes between
                # the subprocess and the channel, one byte at a time.
                def ferry_bytes(read, write, close):
                    # An empty read ends the relay and closes the far side.
                    for chunk in iter(lambda: read(1), ''):
                        write(chunk)
                    close()

                relays = [
                    (channel.recv, child.stdin.write, child.stdin.close),
                    (child.stdout.read, channel.sendall, channel.close),
                    (child.stderr.read, channel.sendall_stderr,
                     channel.close),
                    ]
                started.append(child)
                for relay_args in relays:
                    worker = threading.Thread(
                        target=ferry_bytes, args=relay_args)
                    worker.start()
                    started.append(worker)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        on_win32 = sys.platform == 'win32'

        bzr_remote_path = self.get_bzr_path()
        if on_win32:
            bzr_remote_path = sys.executable + ' ' + bzr_remote_path
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if on_win32:
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        t = transport.get_transport(url)
        self.permit_url(t.base)
        t.mkdir('foo')

        expected_commands = [
            '%s serve --inet --directory=/ --allow-writes' % bzr_remote_path]
        self.assertEqual(expected_commands, self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        t._client._medium.disconnect()
        if started:
            # The first entry is the subprocess ...
            started[0].wait()
            # ... and every other entry is a relay thread.
            for worker in started[1:]:
                worker.join()
 
996
 
 
997
 
 
998
class TestUnhtml(tests.TestCase):
 
999
 
 
1000
    """Tests for unhtml_roughly"""
 
1001
 
 
1002
    def test_truncation(self):
 
1003
        fake_html = "<p>something!\n" * 1000
 
1004
        result = http.unhtml_roughly(fake_html)
 
1005
        self.assertEquals(len(result), 1000)
 
1006
        self.assertStartsWith(result, " something!")