~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2010-01-29 17:54:08 UTC
  • mto: This revision was merged to the branch mainline in revision 5040.
  • Revision ID: jelmer@samba.org-20100129175408-rn80rnkg0qplyni9
RemoveĀ unusedĀ imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
 
    transport,
 
28
    transport as _mod_transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.directory_service import directories
32
31
from bzrlib.transport import (
33
 
    chroot,
34
32
    fakenfs,
35
 
    http,
36
 
    local,
37
 
    location_to_url,
38
33
    memory,
39
 
    pathfilter,
40
34
    readonly,
41
35
    )
42
 
import bzrlib.transport.trace
43
 
from bzrlib.tests import (
44
 
    features,
45
 
    test_server,
46
 
    )
 
36
from bzrlib.errors import (DependencyNotPresent,
 
37
                           FileExists,
 
38
                           InvalidURLJoin,
 
39
                           NoSuchFile,
 
40
                           PathNotChild,
 
41
                           ReadError,
 
42
                           UnsupportedProtocol,
 
43
                           )
 
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
45
from bzrlib.transport import (_clear_protocol_handlers,
 
46
                              _CoalescedOffset,
 
47
                              ConnectedTransport,
 
48
                              _get_protocol_handlers,
 
49
                              _set_protocol_handlers,
 
50
                              _get_transport_modules,
 
51
                              get_transport,
 
52
                              LateReadError,
 
53
                              register_lazy_transport,
 
54
                              register_transport_proto,
 
55
                              Transport,
 
56
                              )
 
57
from bzrlib.transport.chroot import ChrootServer
 
58
from bzrlib.transport.local import (LocalTransport,
 
59
                                    EmulatedWin32LocalTransport)
 
60
from bzrlib.transport.pathfilter import PathFilteringServer
47
61
 
48
62
 
49
63
# TODO: Should possibly split transport-specific tests into their own files.
50
64
 
51
65
 
52
 
class TestTransport(tests.TestCase):
 
66
class TestTransport(TestCase):
53
67
    """Test the non transport-concrete class functionality."""
54
68
 
55
69
    def test__get_set_protocol_handlers(self):
56
 
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
70
        handlers = _get_protocol_handlers()
 
71
        self.assertNotEqual([], handlers.keys( ))
 
72
        try:
 
73
            _clear_protocol_handlers()
 
74
            self.assertEqual([], _get_protocol_handlers().keys())
 
75
        finally:
 
76
            _set_protocol_handlers(handlers)
61
77
 
62
78
    def test_get_transport_modules(self):
63
 
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
79
        handlers = _get_protocol_handlers()
65
80
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
67
 
 
 
81
        _clear_protocol_handlers()
68
82
        class SampleHandler(object):
69
83
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
84
        try:
 
85
            _clear_protocol_handlers()
 
86
            register_transport_proto('foo')
 
87
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
88
                                    'TestTransport.SampleHandler')
 
89
            register_transport_proto('bar')
 
90
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
91
                                    'TestTransport.SampleHandler')
 
92
            self.assertEqual([SampleHandler.__module__,
 
93
                              'bzrlib.transport.chroot',
 
94
                              'bzrlib.transport.pathfilter'],
 
95
                             _get_transport_modules())
 
96
        finally:
 
97
            _set_protocol_handlers(handlers)
83
98
 
84
99
    def test_transport_dependency(self):
85
100
        """Transport with missing dependency causes no error"""
86
 
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
101
        saved_handlers = _get_protocol_handlers()
88
102
        # don't pollute the current handlers
89
 
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
103
        _clear_protocol_handlers()
93
104
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
105
            register_transport_proto('foo')
 
106
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
107
                    'BadTransportHandler')
 
108
            try:
 
109
                get_transport('foo://fooserver/foo')
 
110
            except UnsupportedProtocol, e:
 
111
                e_str = str(e)
 
112
                self.assertEquals('Unsupported protocol'
 
113
                                  ' for url "foo://fooserver/foo":'
 
114
                                  ' Unable to import library "some_lib":'
 
115
                                  ' testing missing dependency', str(e))
 
116
            else:
 
117
                self.fail('Did not raise UnsupportedProtocol')
 
118
        finally:
 
119
            # restore original values
 
120
            _set_protocol_handlers(saved_handlers)
103
121
 
104
122
    def test_transport_fallback(self):
105
123
        """Transport with missing dependency causes no error"""
106
 
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
124
        saved_handlers = _get_protocol_handlers()
 
125
        try:
 
126
            _clear_protocol_handlers()
 
127
            register_transport_proto('foo')
 
128
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
129
                    'BackupTransportHandler')
 
130
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
131
                    'BadTransportHandler')
 
132
            t = get_transport('foo://fooserver/foo')
 
133
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
134
        finally:
 
135
            _set_protocol_handlers(saved_handlers)
116
136
 
117
137
    def test_ssh_hints(self):
118
138
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
139
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol, e:
 
140
            get_transport('ssh://fooserver/foo')
 
141
        except UnsupportedProtocol, e:
122
142
            e_str = str(e)
123
143
            self.assertEquals('Unsupported protocol'
124
144
                              ' for url "ssh://fooserver/foo":'
125
 
                              ' bzr supports bzr+ssh to operate over ssh,'
126
 
                              ' use "bzr+ssh://fooserver/foo".',
 
145
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
127
146
                              str(e))
128
147
        else:
129
148
            self.fail('Did not raise UnsupportedProtocol')
130
149
 
131
150
    def test_LateReadError(self):
132
151
        """The LateReadError helper should raise on read()."""
133
 
        a_file = transport.LateReadError('a path')
 
152
        a_file = LateReadError('a path')
134
153
        try:
135
154
            a_file.read()
136
 
        except errors.ReadError, error:
 
155
        except ReadError, error:
137
156
            self.assertEqual('a path', error.path)
138
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
157
        self.assertRaises(ReadError, a_file.read, 40)
139
158
        a_file.close()
140
159
 
 
160
    def test__combine_paths(self):
 
161
        t = Transport('/')
 
162
        self.assertEqual('/home/sarah/project/foo',
 
163
                         t._combine_paths('/home/sarah', 'project/foo'))
 
164
        self.assertEqual('/etc',
 
165
                         t._combine_paths('/home/sarah', '../../etc'))
 
166
        self.assertEqual('/etc',
 
167
                         t._combine_paths('/home/sarah', '../../../etc'))
 
168
        self.assertEqual('/etc',
 
169
                         t._combine_paths('/home/sarah', '/etc'))
 
170
 
141
171
    def test_local_abspath_non_local_transport(self):
142
172
        # the base implementation should throw
143
173
        t = memory.MemoryTransport()
145
175
        self.assertEqual('memory:///t is not a local path.', str(e))
146
176
 
147
177
 
148
 
class TestCoalesceOffsets(tests.TestCase):
 
178
class TestCoalesceOffsets(TestCase):
149
179
 
150
180
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
151
 
        coalesce = transport.Transport._coalesce_offsets
152
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
181
        coalesce = Transport._coalesce_offsets
 
182
        exp = [_CoalescedOffset(*x) for x in expected]
153
183
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
154
184
                            max_size=max_size))
155
185
        self.assertEqual(exp, out)
200
230
 
201
231
    def test_coalesce_fudge(self):
202
232
        self.check([(10, 30, [(0, 10), (20, 10)]),
203
 
                    (100, 10, [(0, 10)]),
 
233
                    (100, 10, [(0, 10),]),
204
234
                   ], [(10, 10), (30, 10), (100, 10)],
205
 
                   fudge=10)
206
 
 
 
235
                   fudge=10
 
236
                  )
207
237
    def test_coalesce_max_size(self):
208
238
        self.check([(10, 20, [(0, 10), (10, 10)]),
209
239
                    (30, 50, [(0, 50)]),
210
240
                    # If one range is above max_size, it gets its own coalesced
211
241
                    # offset
212
 
                    (100, 80, [(0, 80)]),],
 
242
                    (100, 80, [(0, 80),]),],
213
243
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
214
 
                   max_size=50)
 
244
                   max_size=50
 
245
                  )
215
246
 
216
247
    def test_coalesce_no_max_size(self):
217
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
248
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
249
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
219
250
                  )
220
251
 
221
252
    def test_coalesce_default_limit(self):
222
253
        # By default we use a 100MB max size.
223
 
        ten_mb = 10 * 1024 * 1024
224
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
254
        ten_mb = 10*1024*1024
 
255
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
225
256
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
226
257
                   [(i*ten_mb, ten_mb) for i in range(11)])
227
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
228
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
258
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
259
                   [(i*ten_mb, ten_mb) for i in range(11)],
229
260
                   max_size=1*1024*1024*1024)
230
261
 
231
262
 
232
 
class TestMemoryServer(tests.TestCase):
 
263
class TestMemoryServer(TestCase):
233
264
 
234
265
    def test_create_server(self):
235
266
        server = memory.MemoryServer()
236
267
        server.start_server()
237
268
        url = server.get_url()
238
 
        self.assertTrue(url in transport.transport_list_registry)
239
 
        t = transport.get_transport_from_url(url)
 
269
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
270
        t = _mod_transport.get_transport(url)
240
271
        del t
241
272
        server.stop_server()
242
 
        self.assertFalse(url in transport.transport_list_registry)
 
273
        self.assertFalse(url in _mod_transport.transport_list_registry)
243
274
        self.assertRaises(errors.UnsupportedProtocol,
244
 
                          transport.get_transport, url)
245
 
 
246
 
 
247
 
class TestMemoryTransport(tests.TestCase):
 
275
                          _mod_transport.get_transport, url)
 
276
 
 
277
 
 
278
class TestMemoryTransport(TestCase):
248
279
 
249
280
    def test_get_transport(self):
250
281
        memory.MemoryTransport()
251
282
 
252
283
    def test_clone(self):
253
 
        t = memory.MemoryTransport()
254
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
255
 
        self.assertEqual("memory:///", t.clone("/").base)
 
284
        transport = memory.MemoryTransport()
 
285
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
 
286
        self.assertEqual("memory:///", transport.clone("/").base)
256
287
 
257
288
    def test_abspath(self):
258
 
        t = memory.MemoryTransport()
259
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
289
        transport = memory.MemoryTransport()
 
290
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
260
291
 
261
292
    def test_abspath_of_root(self):
262
 
        t = memory.MemoryTransport()
263
 
        self.assertEqual("memory:///", t.base)
264
 
        self.assertEqual("memory:///", t.abspath('/'))
 
293
        transport = memory.MemoryTransport()
 
294
        self.assertEqual("memory:///", transport.base)
 
295
        self.assertEqual("memory:///", transport.abspath('/'))
265
296
 
266
297
    def test_abspath_of_relpath_starting_at_root(self):
267
 
        t = memory.MemoryTransport()
268
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
298
        transport = memory.MemoryTransport()
 
299
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
269
300
 
270
301
    def test_append_and_get(self):
271
 
        t = memory.MemoryTransport()
272
 
        t.append_bytes('path', 'content')
273
 
        self.assertEqual(t.get('path').read(), 'content')
274
 
        t.append_file('path', StringIO('content'))
275
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
302
        transport = memory.MemoryTransport()
 
303
        transport.append_bytes('path', 'content')
 
304
        self.assertEqual(transport.get('path').read(), 'content')
 
305
        transport.append_file('path', StringIO('content'))
 
306
        self.assertEqual(transport.get('path').read(), 'contentcontent')
276
307
 
277
308
    def test_put_and_get(self):
278
 
        t = memory.MemoryTransport()
279
 
        t.put_file('path', StringIO('content'))
280
 
        self.assertEqual(t.get('path').read(), 'content')
281
 
        t.put_bytes('path', 'content')
282
 
        self.assertEqual(t.get('path').read(), 'content')
 
309
        transport = memory.MemoryTransport()
 
310
        transport.put_file('path', StringIO('content'))
 
311
        self.assertEqual(transport.get('path').read(), 'content')
 
312
        transport.put_bytes('path', 'content')
 
313
        self.assertEqual(transport.get('path').read(), 'content')
283
314
 
284
315
    def test_append_without_dir_fails(self):
285
 
        t = memory.MemoryTransport()
286
 
        self.assertRaises(errors.NoSuchFile,
287
 
                          t.append_bytes, 'dir/path', 'content')
 
316
        transport = memory.MemoryTransport()
 
317
        self.assertRaises(NoSuchFile,
 
318
                          transport.append_bytes, 'dir/path', 'content')
288
319
 
289
320
    def test_put_without_dir_fails(self):
290
 
        t = memory.MemoryTransport()
291
 
        self.assertRaises(errors.NoSuchFile,
292
 
                          t.put_file, 'dir/path', StringIO('content'))
 
321
        transport = memory.MemoryTransport()
 
322
        self.assertRaises(NoSuchFile,
 
323
                          transport.put_file, 'dir/path', StringIO('content'))
293
324
 
294
325
    def test_get_missing(self):
295
326
        transport = memory.MemoryTransport()
296
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
327
        self.assertRaises(NoSuchFile, transport.get, 'foo')
297
328
 
298
329
    def test_has_missing(self):
299
 
        t = memory.MemoryTransport()
300
 
        self.assertEquals(False, t.has('foo'))
 
330
        transport = memory.MemoryTransport()
 
331
        self.assertEquals(False, transport.has('foo'))
301
332
 
302
333
    def test_has_present(self):
303
 
        t = memory.MemoryTransport()
304
 
        t.append_bytes('foo', 'content')
305
 
        self.assertEquals(True, t.has('foo'))
 
334
        transport = memory.MemoryTransport()
 
335
        transport.append_bytes('foo', 'content')
 
336
        self.assertEquals(True, transport.has('foo'))
306
337
 
307
338
    def test_list_dir(self):
308
 
        t = memory.MemoryTransport()
309
 
        t.put_bytes('foo', 'content')
310
 
        t.mkdir('dir')
311
 
        t.put_bytes('dir/subfoo', 'content')
312
 
        t.put_bytes('dirlike', 'content')
 
339
        transport = memory.MemoryTransport()
 
340
        transport.put_bytes('foo', 'content')
 
341
        transport.mkdir('dir')
 
342
        transport.put_bytes('dir/subfoo', 'content')
 
343
        transport.put_bytes('dirlike', 'content')
313
344
 
314
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
315
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
345
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
346
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
316
347
 
317
348
    def test_mkdir(self):
318
 
        t = memory.MemoryTransport()
319
 
        t.mkdir('dir')
320
 
        t.append_bytes('dir/path', 'content')
321
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
349
        transport = memory.MemoryTransport()
 
350
        transport.mkdir('dir')
 
351
        transport.append_bytes('dir/path', 'content')
 
352
        self.assertEqual(transport.get('dir/path').read(), 'content')
322
353
 
323
354
    def test_mkdir_missing_parent(self):
324
 
        t = memory.MemoryTransport()
325
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
355
        transport = memory.MemoryTransport()
 
356
        self.assertRaises(NoSuchFile,
 
357
                          transport.mkdir, 'dir/dir')
326
358
 
327
359
    def test_mkdir_twice(self):
328
 
        t = memory.MemoryTransport()
329
 
        t.mkdir('dir')
330
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
360
        transport = memory.MemoryTransport()
 
361
        transport.mkdir('dir')
 
362
        self.assertRaises(FileExists, transport.mkdir, 'dir')
331
363
 
332
364
    def test_parameters(self):
333
 
        t = memory.MemoryTransport()
334
 
        self.assertEqual(True, t.listable())
335
 
        self.assertEqual(False, t.is_readonly())
 
365
        transport = memory.MemoryTransport()
 
366
        self.assertEqual(True, transport.listable())
 
367
        self.assertEqual(False, transport.is_readonly())
336
368
 
337
369
    def test_iter_files_recursive(self):
338
 
        t = memory.MemoryTransport()
339
 
        t.mkdir('dir')
340
 
        t.put_bytes('dir/foo', 'content')
341
 
        t.put_bytes('dir/bar', 'content')
342
 
        t.put_bytes('bar', 'content')
343
 
        paths = set(t.iter_files_recursive())
 
370
        transport = memory.MemoryTransport()
 
371
        transport.mkdir('dir')
 
372
        transport.put_bytes('dir/foo', 'content')
 
373
        transport.put_bytes('dir/bar', 'content')
 
374
        transport.put_bytes('bar', 'content')
 
375
        paths = set(transport.iter_files_recursive())
344
376
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
345
377
 
346
378
    def test_stat(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.put_bytes('foo', 'content')
349
 
        t.put_bytes('bar', 'phowar')
350
 
        self.assertEqual(7, t.stat('foo').st_size)
351
 
        self.assertEqual(6, t.stat('bar').st_size)
352
 
 
353
 
 
354
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
379
        transport = memory.MemoryTransport()
 
380
        transport.put_bytes('foo', 'content')
 
381
        transport.put_bytes('bar', 'phowar')
 
382
        self.assertEqual(7, transport.stat('foo').st_size)
 
383
        self.assertEqual(6, transport.stat('bar').st_size)
 
384
 
 
385
 
 
386
class ChrootDecoratorTransportTest(TestCase):
355
387
    """Chroot decoration specific tests."""
356
388
 
357
389
    def test_abspath(self):
358
390
        # The abspath is always relative to the chroot_url.
359
 
        server = chroot.ChrootServer(
360
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
391
        server = ChrootServer(get_transport('memory:///foo/bar/'))
361
392
        self.start_server(server)
362
 
        t = transport.get_transport_from_url(server.get_url())
363
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
393
        transport = get_transport(server.get_url())
 
394
        self.assertEqual(server.get_url(), transport.abspath('/'))
364
395
 
365
 
        subdir_t = t.clone('subdir')
366
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
396
        subdir_transport = transport.clone('subdir')
 
397
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
367
398
 
368
399
    def test_clone(self):
369
 
        server = chroot.ChrootServer(
370
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
400
        server = ChrootServer(get_transport('memory:///foo/bar/'))
371
401
        self.start_server(server)
372
 
        t = transport.get_transport_from_url(server.get_url())
 
402
        transport = get_transport(server.get_url())
373
403
        # relpath from root and root path are the same
374
 
        relpath_cloned = t.clone('foo')
375
 
        abspath_cloned = t.clone('/foo')
 
404
        relpath_cloned = transport.clone('foo')
 
405
        abspath_cloned = transport.clone('/foo')
376
406
        self.assertEqual(server, relpath_cloned.server)
377
407
        self.assertEqual(server, abspath_cloned.server)
378
408
 
384
414
        This is so that it is not possible to escape a chroot by doing::
385
415
            url = chroot_transport.base
386
416
            parent_url = urlutils.join(url, '..')
387
 
            new_t = transport.get_transport_from_url(parent_url)
 
417
            new_transport = get_transport(parent_url)
388
418
        """
389
 
        server = chroot.ChrootServer(
390
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
419
        server = ChrootServer(get_transport('memory:///path/subpath'))
391
420
        self.start_server(server)
392
 
        t = transport.get_transport_from_url(server.get_url())
393
 
        new_t = transport.get_transport_from_url(t.base)
394
 
        self.assertEqual(t.server, new_t.server)
395
 
        self.assertEqual(t.base, new_t.base)
 
421
        transport = get_transport(server.get_url())
 
422
        new_transport = get_transport(transport.base)
 
423
        self.assertEqual(transport.server, new_transport.server)
 
424
        self.assertEqual(transport.base, new_transport.base)
396
425
 
397
426
    def test_urljoin_preserves_chroot(self):
398
427
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
401
430
        This is so that it is not possible to escape a chroot by doing::
402
431
            url = chroot_transport.base
403
432
            parent_url = urlutils.join(url, '..')
404
 
            new_t = transport.get_transport_from_url(parent_url)
 
433
            new_transport = get_transport(parent_url)
405
434
        """
406
 
        server = chroot.ChrootServer(
407
 
            transport.get_transport_from_url('memory:///path/'))
 
435
        server = ChrootServer(get_transport('memory:///path/'))
408
436
        self.start_server(server)
409
 
        t = transport.get_transport_from_url(server.get_url())
 
437
        transport = get_transport(server.get_url())
410
438
        self.assertRaises(
411
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
412
 
 
413
 
 
414
 
class TestChrootServer(tests.TestCase):
 
439
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
440
 
 
441
 
 
442
class ChrootServerTest(TestCase):
415
443
 
416
444
    def test_construct(self):
417
445
        backing_transport = memory.MemoryTransport()
418
 
        server = chroot.ChrootServer(backing_transport)
 
446
        server = ChrootServer(backing_transport)
419
447
        self.assertEqual(backing_transport, server.backing_transport)
420
448
 
421
449
    def test_setUp(self):
422
450
        backing_transport = memory.MemoryTransport()
423
 
        server = chroot.ChrootServer(backing_transport)
 
451
        server = ChrootServer(backing_transport)
424
452
        server.start_server()
425
 
        self.addCleanup(server.stop_server)
426
 
        self.assertTrue(server.scheme
427
 
                        in transport._get_protocol_handlers().keys())
 
453
        try:
 
454
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
455
        finally:
 
456
            server.stop_server()
428
457
 
429
458
    def test_stop_server(self):
430
459
        backing_transport = memory.MemoryTransport()
431
 
        server = chroot.ChrootServer(backing_transport)
 
460
        server = ChrootServer(backing_transport)
432
461
        server.start_server()
433
462
        server.stop_server()
434
 
        self.assertFalse(server.scheme
435
 
                         in transport._get_protocol_handlers().keys())
 
463
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
436
464
 
437
465
    def test_get_url(self):
438
466
        backing_transport = memory.MemoryTransport()
439
 
        server = chroot.ChrootServer(backing_transport)
 
467
        server = ChrootServer(backing_transport)
440
468
        server.start_server()
441
 
        self.addCleanup(server.stop_server)
442
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
443
 
 
444
 
 
445
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
469
        try:
 
470
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
471
        finally:
 
472
            server.stop_server()
 
473
 
 
474
 
 
475
class PathFilteringDecoratorTransportTest(TestCase):
446
476
    """Pathfilter decoration specific tests."""
447
477
 
448
478
    def test_abspath(self):
449
479
        # The abspath is always relative to the base of the backing transport.
450
 
        server = pathfilter.PathFilteringServer(
451
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
480
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
452
481
            lambda x: x)
453
482
        server.start_server()
454
 
        t = transport.get_transport_from_url(server.get_url())
455
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
483
        transport = get_transport(server.get_url())
 
484
        self.assertEqual(server.get_url(), transport.abspath('/'))
456
485
 
457
 
        subdir_t = t.clone('subdir')
458
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
486
        subdir_transport = transport.clone('subdir')
 
487
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
459
488
        server.stop_server()
460
489
 
461
490
    def make_pf_transport(self, filter_func=None):
462
491
        """Make a PathFilteringTransport backed by a MemoryTransport.
463
 
 
 
492
        
464
493
        :param filter_func: by default this will be a no-op function.  Use this
465
494
            parameter to override it."""
466
495
        if filter_func is None:
467
496
            filter_func = lambda x: x
468
 
        server = pathfilter.PathFilteringServer(
469
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
497
        server = PathFilteringServer(
 
498
            get_transport('memory:///foo/bar/'), filter_func)
470
499
        server.start_server()
471
500
        self.addCleanup(server.stop_server)
472
 
        return transport.get_transport_from_url(server.get_url())
 
501
        return get_transport(server.get_url())
473
502
 
474
503
    def test__filter(self):
475
504
        # _filter (with an identity func as filter_func) always returns
476
505
        # paths relative to the base of the backing transport.
477
 
        t = self.make_pf_transport()
478
 
        self.assertEqual('foo', t._filter('foo'))
479
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
480
 
        self.assertEqual('', t._filter('..'))
481
 
        self.assertEqual('', t._filter('/'))
 
506
        transport = self.make_pf_transport()
 
507
        self.assertEqual('foo', transport._filter('foo'))
 
508
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
509
        self.assertEqual('', transport._filter('..'))
 
510
        self.assertEqual('', transport._filter('/'))
482
511
        # The base of the pathfiltering transport is taken into account too.
483
 
        t = t.clone('subdir1/subdir2')
484
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
485
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
486
 
        self.assertEqual('subdir1', t._filter('..'))
487
 
        self.assertEqual('', t._filter('/'))
 
512
        transport = transport.clone('subdir1/subdir2')
 
513
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
514
        self.assertEqual(
 
515
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
516
        self.assertEqual('subdir1', transport._filter('..'))
 
517
        self.assertEqual('', transport._filter('/'))
488
518
 
489
519
    def test_filter_invocation(self):
490
520
        filter_log = []
491
 
 
492
521
        def filter(path):
493
522
            filter_log.append(path)
494
523
            return path
495
 
        t = self.make_pf_transport(filter)
496
 
        t.has('abc')
 
524
        transport = self.make_pf_transport(filter)
 
525
        transport.has('abc')
497
526
        self.assertEqual(['abc'], filter_log)
498
527
        del filter_log[:]
499
 
        t.clone('abc').has('xyz')
 
528
        transport.clone('abc').has('xyz')
500
529
        self.assertEqual(['abc/xyz'], filter_log)
501
530
        del filter_log[:]
502
 
        t.has('/abc')
 
531
        transport.has('/abc')
503
532
        self.assertEqual(['abc'], filter_log)
504
533
 
505
534
    def test_clone(self):
506
 
        t = self.make_pf_transport()
 
535
        transport = self.make_pf_transport()
507
536
        # relpath from root and root path are the same
508
 
        relpath_cloned = t.clone('foo')
509
 
        abspath_cloned = t.clone('/foo')
510
 
        self.assertEqual(t.server, relpath_cloned.server)
511
 
        self.assertEqual(t.server, abspath_cloned.server)
 
537
        relpath_cloned = transport.clone('foo')
 
538
        abspath_cloned = transport.clone('/foo')
 
539
        self.assertEqual(transport.server, relpath_cloned.server)
 
540
        self.assertEqual(transport.server, abspath_cloned.server)
512
541
 
513
542
    def test_url_preserves_pathfiltering(self):
514
543
        """Calling get_transport on a pathfiltered transport's base should
519
548
        otherwise) the filtering by doing::
520
549
            url = filtered_transport.base
521
550
            parent_url = urlutils.join(url, '..')
522
 
            new_t = transport.get_transport_from_url(parent_url)
 
551
            new_transport = get_transport(parent_url)
523
552
        """
524
 
        t = self.make_pf_transport()
525
 
        new_t = transport.get_transport_from_url(t.base)
526
 
        self.assertEqual(t.server, new_t.server)
527
 
        self.assertEqual(t.base, new_t.base)
528
 
 
529
 
 
530
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
553
        transport = self.make_pf_transport()
 
554
        new_transport = get_transport(transport.base)
 
555
        self.assertEqual(transport.server, new_transport.server)
 
556
        self.assertEqual(transport.base, new_transport.base)
 
557
 
 
558
 
 
559
class ReadonlyDecoratorTransportTest(TestCase):
531
560
    """Readonly decoration specific tests."""
532
561
 
533
562
    def test_local_parameters(self):
534
563
        # connect to . in readonly mode
535
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
536
 
        self.assertEqual(True, t.listable())
537
 
        self.assertEqual(True, t.is_readonly())
 
564
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
565
        self.assertEqual(True, transport.listable())
 
566
        self.assertEqual(True, transport.is_readonly())
538
567
 
539
568
    def test_http_parameters(self):
540
569
        from bzrlib.tests.http_server import HttpServer
541
570
        # connect to '.' via http which is not listable
542
571
        server = HttpServer()
543
572
        self.start_server(server)
544
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
545
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
546
 
        self.assertEqual(False, t.listable())
547
 
        self.assertEqual(True, t.is_readonly())
548
 
 
549
 
 
550
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
 
578
 
 
579
 
 
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
551
581
    """NFS decorator specific tests."""
552
582
 
553
583
    def get_nfs_transport(self, url):
557
587
    def test_local_parameters(self):
558
588
        # the listable and is_readonly parameters
559
589
        # are not changed by the fakenfs decorator
560
 
        t = self.get_nfs_transport('.')
561
 
        self.assertEqual(True, t.listable())
562
 
        self.assertEqual(False, t.is_readonly())
 
590
        transport = self.get_nfs_transport('.')
 
591
        self.assertEqual(True, transport.listable())
 
592
        self.assertEqual(False, transport.is_readonly())
563
593
 
564
594
    def test_http_parameters(self):
565
595
        # the listable and is_readonly parameters
568
598
        # connect to '.' via http which is not listable
569
599
        server = HttpServer()
570
600
        self.start_server(server)
571
 
        t = self.get_nfs_transport(server.get_url())
572
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
573
 
        self.assertEqual(False, t.listable())
574
 
        self.assertEqual(True, t.is_readonly())
 
601
        transport = self.get_nfs_transport(server.get_url())
 
602
        self.assertIsInstance(
 
603
            transport, fakenfs.FakeNFSTransportDecorator)
 
604
        self.assertEqual(False, transport.listable())
 
605
        self.assertEqual(True, transport.is_readonly())
575
606
 
576
607
    def test_fakenfs_server_default(self):
577
608
        # a FakeNFSServer() should bring up a local relpath server for itself
578
 
        server = test_server.FakeNFSServer()
 
609
        server = fakenfs.FakeNFSServer()
579
610
        self.start_server(server)
580
611
        # the url should be decorated appropriately
581
612
        self.assertStartsWith(server.get_url(), 'fakenfs+')
582
613
        # and we should be able to get a transport for it
583
 
        t = transport.get_transport_from_url(server.get_url())
 
614
        transport = get_transport(server.get_url())
584
615
        # which must be a FakeNFSTransportDecorator instance.
585
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
616
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
586
617
 
587
618
    def test_fakenfs_rename_semantics(self):
588
619
        # a FakeNFS transport must mangle the way rename errors occur to
589
620
        # look like NFS problems.
590
 
        t = self.get_nfs_transport('.')
 
621
        transport = self.get_nfs_transport('.')
591
622
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
592
 
                        transport=t)
593
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
594
 
 
595
 
 
596
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
623
                        transport=transport)
 
624
        self.assertRaises(errors.ResourceBusy,
 
625
                          transport.rename, 'from', 'to')
 
626
 
 
627
 
 
628
class FakeVFATDecoratorTests(TestCaseInTempDir):
597
629
    """Tests for simulation of VFAT restrictions"""
598
630
 
599
631
    def get_vfat_transport(self, url):
603
635
 
604
636
    def test_transport_creation(self):
605
637
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
606
 
        t = self.get_vfat_transport('.')
607
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
638
        transport = self.get_vfat_transport('.')
 
639
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
608
640
 
609
641
    def test_transport_mkdir(self):
610
 
        t = self.get_vfat_transport('.')
611
 
        t.mkdir('HELLO')
612
 
        self.assertTrue(t.has('hello'))
613
 
        self.assertTrue(t.has('Hello'))
 
642
        transport = self.get_vfat_transport('.')
 
643
        transport.mkdir('HELLO')
 
644
        self.assertTrue(transport.has('hello'))
 
645
        self.assertTrue(transport.has('Hello'))
614
646
 
615
647
    def test_forbidden_chars(self):
616
 
        t = self.get_vfat_transport('.')
617
 
        self.assertRaises(ValueError, t.has, "<NU>")
618
 
 
619
 
 
620
 
class BadTransportHandler(transport.Transport):
 
648
        transport = self.get_vfat_transport('.')
 
649
        self.assertRaises(ValueError, transport.has, "<NU>")
 
650
 
 
651
 
 
652
class BadTransportHandler(Transport):
621
653
    def __init__(self, base_url):
622
 
        raise errors.DependencyNotPresent('some_lib',
623
 
                                          'testing missing dependency')
624
 
 
625
 
 
626
 
class BackupTransportHandler(transport.Transport):
 
654
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
655
 
 
656
 
 
657
class BackupTransportHandler(Transport):
627
658
    """Test transport that works as a backup for the BadTransportHandler"""
628
659
    pass
629
660
 
630
661
 
631
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
662
class TestTransportImplementation(TestCaseInTempDir):
632
663
    """Implementation verification for transports.
633
664
 
634
665
    To verify a transport we need a server factory, which is a callable
663
694
        base_url = self._server.get_url()
664
695
        url = self._adjust_url(base_url, relpath)
665
696
        # try getting the transport via the regular interface:
666
 
        t = transport.get_transport_from_url(url)
 
697
        t = get_transport(url)
667
698
        # vila--20070607 if the following are commented out the test suite
668
699
        # still pass. Is this really still needed or was it a forgotten
669
700
        # temporary fix ?
674
705
        return t
675
706
 
676
707
 
677
 
class TestTransportFromPath(tests.TestCaseInTempDir):
678
 
 
679
 
    def test_with_path(self):
680
 
        t = transport.get_transport_from_path(self.test_dir)
681
 
        self.assertIsInstance(t, local.LocalTransport)
682
 
        self.assertEquals(t.base.rstrip("/"),
683
 
            urlutils.local_path_to_url(self.test_dir))
684
 
 
685
 
    def test_with_url(self):
686
 
        t = transport.get_transport_from_path("file:")
687
 
        self.assertIsInstance(t, local.LocalTransport)
688
 
        self.assertEquals(t.base.rstrip("/"),
689
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
690
 
 
691
 
 
692
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
693
 
 
694
 
    def test_with_path(self):
695
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
696
 
            self.test_dir)
697
 
 
698
 
    def test_with_url(self):
699
 
        url = urlutils.local_path_to_url(self.test_dir)
700
 
        t = transport.get_transport_from_url(url)
701
 
        self.assertIsInstance(t, local.LocalTransport)
702
 
        self.assertEquals(t.base.rstrip("/"), url)
703
 
 
704
 
    def test_with_url_and_segment_parameters(self):
705
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
706
 
        t = transport.get_transport_from_url(url)
707
 
        self.assertIsInstance(t, local.LocalTransport)
708
 
        self.assertEquals(t.base.rstrip("/"), url)
709
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
710
 
            f.write("data")
711
 
        self.assertTrue(t.has("afile"))
712
 
 
713
 
 
714
 
class TestLocalTransports(tests.TestCase):
 
708
class TestLocalTransports(TestCase):
715
709
 
716
710
    def test_get_transport_from_abspath(self):
717
711
        here = osutils.abspath('.')
718
 
        t = transport.get_transport(here)
719
 
        self.assertIsInstance(t, local.LocalTransport)
 
712
        t = get_transport(here)
 
713
        self.assertIsInstance(t, LocalTransport)
720
714
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
721
715
 
722
716
    def test_get_transport_from_relpath(self):
723
717
        here = osutils.abspath('.')
724
 
        t = transport.get_transport('.')
725
 
        self.assertIsInstance(t, local.LocalTransport)
 
718
        t = get_transport('.')
 
719
        self.assertIsInstance(t, LocalTransport)
726
720
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
727
721
 
728
722
    def test_get_transport_from_local_url(self):
729
723
        here = osutils.abspath('.')
730
724
        here_url = urlutils.local_path_to_url(here) + '/'
731
 
        t = transport.get_transport(here_url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
 
725
        t = get_transport(here_url)
 
726
        self.assertIsInstance(t, LocalTransport)
733
727
        self.assertEquals(t.base, here_url)
734
728
 
735
729
    def test_local_abspath(self):
736
730
        here = osutils.abspath('.')
737
 
        t = transport.get_transport(here)
 
731
        t = get_transport(here)
738
732
        self.assertEquals(t.local_abspath(''), here)
739
733
 
740
734
 
741
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
742
 
 
743
 
    def test_local_fdatasync_calls_fdatasync(self):
744
 
        """Check fdatasync on a stream tries to flush the data to the OS.
745
 
        
746
 
        We can't easily observe the external effect but we can at least see
747
 
        it's called.
748
 
        """
749
 
        sentinel = object()
750
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
751
 
        if fdatasync is sentinel:
752
 
            raise tests.TestNotApplicable('fdatasync not supported')
753
 
        t = self.get_transport('.')
754
 
        calls = self.recordCalls(os, 'fdatasync')
755
 
        w = t.open_write_stream('out')
756
 
        w.write('foo')
757
 
        w.fdatasync()
758
 
        with open('out', 'rb') as f:
759
 
            # Should have been flushed.
760
 
            self.assertEquals(f.read(), 'foo')
761
 
        self.assertEquals(len(calls), 1, calls)
762
 
 
763
 
 
764
 
class TestWin32LocalTransport(tests.TestCase):
 
735
class TestWin32LocalTransport(TestCase):
765
736
 
766
737
    def test_unc_clone_to_root(self):
767
738
        # Win32 UNC path like \\HOST\path
768
739
        # clone to root should stop at least at \\HOST part
769
740
        # not on \\
770
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
741
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
771
742
        for i in xrange(4):
772
743
            t = t.clone('..')
773
744
        self.assertEquals(t.base, 'file://HOST/')
776
747
        self.assertEquals(t.base, 'file://HOST/')
777
748
 
778
749
 
779
 
class TestConnectedTransport(tests.TestCase):
 
750
class TestConnectedTransport(TestCase):
780
751
    """Tests for connected to remote server transports"""
781
752
 
782
753
    def test_parse_url(self):
783
 
        t = transport.ConnectedTransport(
784
 
            'http://simple.example.com/home/source')
785
 
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
786
 
        self.assertEquals(t._parsed_url.port, None)
787
 
        self.assertEquals(t._parsed_url.path, '/home/source/')
788
 
        self.assertTrue(t._parsed_url.user is None)
789
 
        self.assertTrue(t._parsed_url.password is None)
 
754
        t = ConnectedTransport('http://simple.example.com/home/source')
 
755
        self.assertEquals(t._host, 'simple.example.com')
 
756
        self.assertEquals(t._port, None)
 
757
        self.assertEquals(t._path, '/home/source/')
 
758
        self.failUnless(t._user is None)
 
759
        self.failUnless(t._password is None)
790
760
 
791
761
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
792
762
 
793
763
    def test_parse_url_with_at_in_user(self):
794
764
        # Bug 228058
795
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
796
 
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
765
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
766
        self.assertEquals(t._user, 'user@host.com')
797
767
 
798
768
    def test_parse_quoted_url(self):
799
 
        t = transport.ConnectedTransport(
800
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
801
 
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
802
 
        self.assertEquals(t._parsed_url.port, 2222)
803
 
        self.assertEquals(t._parsed_url.user, 'robey')
804
 
        self.assertEquals(t._parsed_url.password, 'h@t')
805
 
        self.assertEquals(t._parsed_url.path, '/path/')
 
769
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
770
        self.assertEquals(t._host, 'exAmple.com')
 
771
        self.assertEquals(t._port, 2222)
 
772
        self.assertEquals(t._user, 'robey')
 
773
        self.assertEquals(t._password, 'h@t')
 
774
        self.assertEquals(t._path, '/path/')
806
775
 
807
776
        # Base should not keep track of the password
808
 
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
777
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
809
778
 
810
779
    def test_parse_invalid_url(self):
811
780
        self.assertRaises(errors.InvalidURL,
812
 
                          transport.ConnectedTransport,
 
781
                          ConnectedTransport,
813
782
                          'sftp://lily.org:~janneke/public/bzr/gub')
814
783
 
815
784
    def test_relpath(self):
816
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
785
        t = ConnectedTransport('sftp://user@host.com/abs/path')
817
786
 
818
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
819
 
            'sub')
 
787
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
820
788
        self.assertRaises(errors.PathNotChild, t.relpath,
821
789
                          'http://user@host.com/abs/path/sub')
822
790
        self.assertRaises(errors.PathNotChild, t.relpath,
826
794
        self.assertRaises(errors.PathNotChild, t.relpath,
827
795
                          'sftp://user@host.com:33/abs/path/sub')
828
796
        # Make sure it works when we don't supply a username
829
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
797
        t = ConnectedTransport('sftp://host.com/abs/path')
830
798
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
831
799
 
832
800
        # Make sure it works when parts of the path will be url encoded
833
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
801
        t = ConnectedTransport('sftp://host.com/dev/%path')
834
802
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
835
803
 
836
804
    def test_connection_sharing_propagate_credentials(self):
837
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
838
 
        self.assertEquals('user', t._parsed_url.user)
839
 
        self.assertEquals('host.com', t._parsed_url.host)
 
805
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
806
        self.assertEquals('user', t._user)
 
807
        self.assertEquals('host.com', t._host)
840
808
        self.assertIs(None, t._get_connection())
841
 
        self.assertIs(None, t._parsed_url.password)
 
809
        self.assertIs(None, t._password)
842
810
        c = t.clone('subdir')
843
811
        self.assertIs(None, c._get_connection())
844
 
        self.assertIs(None, t._parsed_url.password)
 
812
        self.assertIs(None, t._password)
845
813
 
846
814
        # Simulate the user entering a password
847
815
        password = 'secret'
861
829
        self.assertIs(new_password, c._get_credentials())
862
830
 
863
831
 
864
 
class TestReusedTransports(tests.TestCase):
 
832
class TestReusedTransports(TestCase):
865
833
    """Tests for transport reuse"""
866
834
 
867
835
    def test_reuse_same_transport(self):
868
836
        possible_transports = []
869
 
        t1 = transport.get_transport_from_url('http://foo/',
870
 
                                     possible_transports=possible_transports)
 
837
        t1 = get_transport('http://foo/',
 
838
                           possible_transports=possible_transports)
871
839
        self.assertEqual([t1], possible_transports)
872
 
        t2 = transport.get_transport_from_url('http://foo/',
873
 
                                     possible_transports=[t1])
 
840
        t2 = get_transport('http://foo/', possible_transports=[t1])
874
841
        self.assertIs(t1, t2)
875
842
 
876
843
        # Also check that final '/' are handled correctly
877
 
        t3 = transport.get_transport_from_url('http://foo/path/')
878
 
        t4 = transport.get_transport_from_url('http://foo/path',
879
 
                                     possible_transports=[t3])
 
844
        t3 = get_transport('http://foo/path/')
 
845
        t4 = get_transport('http://foo/path', possible_transports=[t3])
880
846
        self.assertIs(t3, t4)
881
847
 
882
 
        t5 = transport.get_transport_from_url('http://foo/path')
883
 
        t6 = transport.get_transport_from_url('http://foo/path/',
884
 
                                     possible_transports=[t5])
 
848
        t5 = get_transport('http://foo/path')
 
849
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
885
850
        self.assertIs(t5, t6)
886
851
 
887
852
    def test_don_t_reuse_different_transport(self):
888
 
        t1 = transport.get_transport_from_url('http://foo/path')
889
 
        t2 = transport.get_transport_from_url('http://bar/path',
890
 
                                     possible_transports=[t1])
 
853
        t1 = get_transport('http://foo/path')
 
854
        t2 = get_transport('http://bar/path', possible_transports=[t1])
891
855
        self.assertIsNot(t1, t2)
892
856
 
893
857
 
894
 
class TestTransportTrace(tests.TestCase):
 
858
class TestTransportTrace(TestCase):
895
859
 
896
 
    def test_decorator(self):
897
 
        t = transport.get_transport_from_url('trace+memory://')
 
860
    def test_get(self):
 
861
        transport = get_transport('trace+memory://')
898
862
        self.assertIsInstance(
899
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
863
            transport, bzrlib.transport.trace.TransportTraceDecorator)
900
864
 
901
865
    def test_clone_preserves_activity(self):
902
 
        t = transport.get_transport_from_url('trace+memory://')
903
 
        t2 = t.clone('.')
904
 
        self.assertTrue(t is not t2)
905
 
        self.assertTrue(t._activity is t2._activity)
 
866
        transport = get_transport('trace+memory://')
 
867
        transport2 = transport.clone('.')
 
868
        self.assertTrue(transport is not transport2)
 
869
        self.assertTrue(transport._activity is transport2._activity)
906
870
 
907
871
    # the following specific tests are for the operations that have made use of
908
872
    # logging in tests; we could test every single operation but doing that
909
873
    # still won't cause a test failure when the top level Transport API
910
874
    # changes; so there is little return doing that.
911
875
    def test_get(self):
912
 
        t = transport.get_transport_from_url('trace+memory:///')
913
 
        t.put_bytes('foo', 'barish')
914
 
        t.get('foo')
 
876
        transport = get_transport('trace+memory:///')
 
877
        transport.put_bytes('foo', 'barish')
 
878
        transport.get('foo')
915
879
        expected_result = []
916
880
        # put_bytes records the bytes, not the content to avoid memory
917
881
        # pressure.
918
882
        expected_result.append(('put_bytes', 'foo', 6, None))
919
883
        # get records the file name only.
920
884
        expected_result.append(('get', 'foo'))
921
 
        self.assertEqual(expected_result, t._activity)
 
885
        self.assertEqual(expected_result, transport._activity)
922
886
 
923
887
    def test_readv(self):
924
 
        t = transport.get_transport_from_url('trace+memory:///')
925
 
        t.put_bytes('foo', 'barish')
926
 
        list(t.readv('foo', [(0, 1), (3, 2)],
927
 
                     adjust_for_latency=True, upper_limit=6))
 
888
        transport = get_transport('trace+memory:///')
 
889
        transport.put_bytes('foo', 'barish')
 
890
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
891
            upper_limit=6))
928
892
        expected_result = []
929
893
        # put_bytes records the bytes, not the content to avoid memory
930
894
        # pressure.
931
895
        expected_result.append(('put_bytes', 'foo', 6, None))
932
896
        # readv records the supplied offset request
933
897
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
934
 
        self.assertEqual(expected_result, t._activity)
 
898
        self.assertEqual(expected_result, transport._activity)
935
899
 
936
900
 
937
901
class TestSSHConnections(tests.TestCaseWithTransport):
938
902
 
939
903
    def test_bzr_connect_to_bzr_ssh(self):
940
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
904
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
941
905
 
942
906
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
943
907
        """
950
914
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
951
915
        # override the interface (doesn't change self._vendor).
952
916
        # Note that this does encryption, so can be slow.
953
 
        from bzrlib.tests import stub_sftp
 
917
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
918
        from bzrlib.tests.stub_sftp import StubServer
954
919
 
955
920
        # Start an SSH server
956
921
        self.command_executed = []
959
924
        # SSH channel ourselves.  Surely this has already been implemented
960
925
        # elsewhere?
961
926
        started = []
962
 
 
963
 
        class StubSSHServer(stub_sftp.StubServer):
 
927
        class StubSSHServer(StubServer):
964
928
 
965
929
            test = self
966
930
 
971
935
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
972
936
 
973
937
                # XXX: horribly inefficient, not to mention ugly.
974
 
                # Start a thread for each of stdin/out/err, and relay bytes
975
 
                # from the subprocess to channel and vice versa.
 
938
                # Start a thread for each of stdin/out/err, and relay bytes from
 
939
                # the subprocess to channel and vice versa.
976
940
                def ferry_bytes(read, write, close):
977
941
                    while True:
978
942
                        bytes = read(1)
994
958
 
995
959
                return True
996
960
 
997
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
961
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
998
962
        # We *don't* want to override the default SSH vendor: the detected one
999
963
        # is the one to use.
1000
 
 
1001
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1002
 
        # inherits from SFTPServer which forces the SSH vendor to
1003
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1004
964
        self.start_server(ssh_server)
1005
 
        port = ssh_server.port
 
965
        port = ssh_server._listener.port
1006
966
 
1007
967
        if sys.platform == 'win32':
1008
968
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1009
969
        else:
1010
970
            bzr_remote_path = self.get_bzr_path()
1011
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
971
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1012
972
 
1013
973
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1014
974
        # variable is used to tell bzr what command to run on the remote end.
1018
978
            # prefix a '/' to get the right path.
1019
979
            path_to_branch = '/' + path_to_branch
1020
980
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1021
 
        t = transport.get_transport(url)
 
981
        t = get_transport(url)
1022
982
        self.permit_url(t.base)
1023
983
        t.mkdir('foo')
1024
984
 
1035
995
        # And the rest are threads
1036
996
        for t in started[1:]:
1037
997
            t.join()
1038
 
 
1039
 
 
1040
 
class TestUnhtml(tests.TestCase):
1041
 
 
1042
 
    """Tests for unhtml_roughly"""
1043
 
 
1044
 
    def test_truncation(self):
1045
 
        fake_html = "<p>something!\n" * 1000
1046
 
        result = http.unhtml_roughly(fake_html)
1047
 
        self.assertEquals(len(result), 1000)
1048
 
        self.assertStartsWith(result, " something!")
1049
 
 
1050
 
 
1051
 
class SomeDirectory(object):
1052
 
 
1053
 
    def look_up(self, name, url):
1054
 
        return "http://bar"
1055
 
 
1056
 
 
1057
 
class TestLocationToUrl(tests.TestCase):
1058
 
 
1059
 
    def get_base_location(self):
1060
 
        path = osutils.abspath('/foo/bar')
1061
 
        if path.startswith('/'):
1062
 
            url = 'file://%s' % (path,)
1063
 
        else:
1064
 
            # On Windows, abspaths start with the drive letter, so we have to
1065
 
            # add in the extra '/'
1066
 
            url = 'file:///%s' % (path,)
1067
 
        return path, url
1068
 
 
1069
 
    def test_regular_url(self):
1070
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1071
 
 
1072
 
    def test_directory(self):
1073
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1074
 
        self.addCleanup(directories.remove, "bar:")
1075
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1076
 
 
1077
 
    def test_unicode_url(self):
1078
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1079
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1080
 
 
1081
 
    def test_unicode_path(self):
1082
 
        path, url = self.get_base_location()
1083
 
        location = path + "\xc3\xaf".decode("utf-8")
1084
 
        url += '%C3%AF'
1085
 
        self.assertEquals(url, location_to_url(location))
1086
 
 
1087
 
    def test_path(self):
1088
 
        path, url = self.get_base_location()
1089
 
        self.assertEquals(url, location_to_url(path))
1090
 
 
1091
 
    def test_relative_file_url(self):
1092
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1093
 
            location_to_url("file:bar"))
1094
 
 
1095
 
    def test_absolute_file_url(self):
1096
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))