~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-05 04:30:07 UTC
  • mfrom: (4932 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4934.
  • Revision ID: john@arbash-meinel.com-20100105043007-ehgbldqd3q0gtyws
Merge bzr.dev, resolve conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
import sys
22
22
import threading
23
23
 
 
24
import bzrlib
24
25
from bzrlib import (
25
26
    errors,
26
27
    osutils,
27
28
    tests,
28
 
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.directory_service import directories
32
 
from bzrlib.transport import (
33
 
    chroot,
34
 
    fakenfs,
35
 
    http,
36
 
    local,
37
 
    location_to_url,
38
 
    memory,
39
 
    pathfilter,
40
 
    readonly,
41
 
    )
42
 
import bzrlib.transport.trace
43
 
from bzrlib.tests import (
44
 
    features,
45
 
    test_server,
46
 
    )
 
31
from bzrlib.errors import (DependencyNotPresent,
 
32
                           FileExists,
 
33
                           InvalidURLJoin,
 
34
                           NoSuchFile,
 
35
                           PathNotChild,
 
36
                           ReadError,
 
37
                           UnsupportedProtocol,
 
38
                           )
 
39
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
40
from bzrlib.transport import (_clear_protocol_handlers,
 
41
                              _CoalescedOffset,
 
42
                              ConnectedTransport,
 
43
                              _get_protocol_handlers,
 
44
                              _set_protocol_handlers,
 
45
                              _get_transport_modules,
 
46
                              get_transport,
 
47
                              LateReadError,
 
48
                              register_lazy_transport,
 
49
                              register_transport_proto,
 
50
                              Transport,
 
51
                              )
 
52
from bzrlib.transport.chroot import ChrootServer
 
53
from bzrlib.transport.memory import MemoryTransport
 
54
from bzrlib.transport.local import (LocalTransport,
 
55
                                    EmulatedWin32LocalTransport)
 
56
from bzrlib.transport.pathfilter import PathFilteringServer
47
57
 
48
58
 
49
59
# TODO: Should possibly split transport-specific tests into their own files.
50
60
 
51
61
 
52
 
class TestTransport(tests.TestCase):
 
62
class TestTransport(TestCase):
53
63
    """Test the non transport-concrete class functionality."""
54
64
 
55
65
    def test__get_set_protocol_handlers(self):
56
 
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
66
        handlers = _get_protocol_handlers()
 
67
        self.assertNotEqual([], handlers.keys( ))
 
68
        try:
 
69
            _clear_protocol_handlers()
 
70
            self.assertEqual([], _get_protocol_handlers().keys())
 
71
        finally:
 
72
            _set_protocol_handlers(handlers)
61
73
 
62
74
    def test_get_transport_modules(self):
63
 
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
75
        handlers = _get_protocol_handlers()
65
76
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
67
 
 
 
77
        _clear_protocol_handlers()
68
78
        class SampleHandler(object):
69
79
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
80
        try:
 
81
            _clear_protocol_handlers()
 
82
            register_transport_proto('foo')
 
83
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
84
                                    'TestTransport.SampleHandler')
 
85
            register_transport_proto('bar')
 
86
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
87
                                    'TestTransport.SampleHandler')
 
88
            self.assertEqual([SampleHandler.__module__,
 
89
                              'bzrlib.transport.chroot',
 
90
                              'bzrlib.transport.pathfilter'],
 
91
                             _get_transport_modules())
 
92
        finally:
 
93
            _set_protocol_handlers(handlers)
83
94
 
84
95
    def test_transport_dependency(self):
85
96
        """Transport with missing dependency causes no error"""
86
 
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
97
        saved_handlers = _get_protocol_handlers()
88
98
        # don't pollute the current handlers
89
 
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
99
        _clear_protocol_handlers()
93
100
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
101
            register_transport_proto('foo')
 
102
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
103
                    'BadTransportHandler')
 
104
            try:
 
105
                get_transport('foo://fooserver/foo')
 
106
            except UnsupportedProtocol, e:
 
107
                e_str = str(e)
 
108
                self.assertEquals('Unsupported protocol'
 
109
                                  ' for url "foo://fooserver/foo":'
 
110
                                  ' Unable to import library "some_lib":'
 
111
                                  ' testing missing dependency', str(e))
 
112
            else:
 
113
                self.fail('Did not raise UnsupportedProtocol')
 
114
        finally:
 
115
            # restore original values
 
116
            _set_protocol_handlers(saved_handlers)
103
117
 
104
118
    def test_transport_fallback(self):
105
119
        """Transport with missing dependency causes no error"""
106
 
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
120
        saved_handlers = _get_protocol_handlers()
 
121
        try:
 
122
            _clear_protocol_handlers()
 
123
            register_transport_proto('foo')
 
124
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
125
                    'BackupTransportHandler')
 
126
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
127
                    'BadTransportHandler')
 
128
            t = get_transport('foo://fooserver/foo')
 
129
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
130
        finally:
 
131
            _set_protocol_handlers(saved_handlers)
116
132
 
117
133
    def test_ssh_hints(self):
118
134
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
135
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol, e:
 
136
            get_transport('ssh://fooserver/foo')
 
137
        except UnsupportedProtocol, e:
122
138
            e_str = str(e)
123
139
            self.assertEquals('Unsupported protocol'
124
140
                              ' for url "ssh://fooserver/foo":'
125
 
                              ' bzr supports bzr+ssh to operate over ssh,'
126
 
                              ' use "bzr+ssh://fooserver/foo".',
 
141
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
127
142
                              str(e))
128
143
        else:
129
144
            self.fail('Did not raise UnsupportedProtocol')
130
145
 
131
146
    def test_LateReadError(self):
132
147
        """The LateReadError helper should raise on read()."""
133
 
        a_file = transport.LateReadError('a path')
 
148
        a_file = LateReadError('a path')
134
149
        try:
135
150
            a_file.read()
136
 
        except errors.ReadError, error:
 
151
        except ReadError, error:
137
152
            self.assertEqual('a path', error.path)
138
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
153
        self.assertRaises(ReadError, a_file.read, 40)
139
154
        a_file.close()
140
155
 
 
156
    def test__combine_paths(self):
 
157
        t = Transport('/')
 
158
        self.assertEqual('/home/sarah/project/foo',
 
159
                         t._combine_paths('/home/sarah', 'project/foo'))
 
160
        self.assertEqual('/etc',
 
161
                         t._combine_paths('/home/sarah', '../../etc'))
 
162
        self.assertEqual('/etc',
 
163
                         t._combine_paths('/home/sarah', '../../../etc'))
 
164
        self.assertEqual('/etc',
 
165
                         t._combine_paths('/home/sarah', '/etc'))
 
166
 
141
167
    def test_local_abspath_non_local_transport(self):
142
168
        # the base implementation should throw
143
 
        t = memory.MemoryTransport()
 
169
        t = MemoryTransport()
144
170
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
145
171
        self.assertEqual('memory:///t is not a local path.', str(e))
146
172
 
147
173
 
148
 
class TestCoalesceOffsets(tests.TestCase):
 
174
class TestCoalesceOffsets(TestCase):
149
175
 
150
176
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
151
 
        coalesce = transport.Transport._coalesce_offsets
152
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
177
        coalesce = Transport._coalesce_offsets
 
178
        exp = [_CoalescedOffset(*x) for x in expected]
153
179
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
154
180
                            max_size=max_size))
155
181
        self.assertEqual(exp, out)
200
226
 
201
227
    def test_coalesce_fudge(self):
202
228
        self.check([(10, 30, [(0, 10), (20, 10)]),
203
 
                    (100, 10, [(0, 10)]),
 
229
                    (100, 10, [(0, 10),]),
204
230
                   ], [(10, 10), (30, 10), (100, 10)],
205
 
                   fudge=10)
206
 
 
 
231
                   fudge=10
 
232
                  )
207
233
    def test_coalesce_max_size(self):
208
234
        self.check([(10, 20, [(0, 10), (10, 10)]),
209
235
                    (30, 50, [(0, 50)]),
210
236
                    # If one range is above max_size, it gets its own coalesced
211
237
                    # offset
212
 
                    (100, 80, [(0, 80)]),],
 
238
                    (100, 80, [(0, 80),]),],
213
239
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
214
 
                   max_size=50)
 
240
                   max_size=50
 
241
                  )
215
242
 
216
243
    def test_coalesce_no_max_size(self):
217
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
244
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
245
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
219
246
                  )
220
247
 
221
248
    def test_coalesce_default_limit(self):
222
249
        # By default we use a 100MB max size.
223
 
        ten_mb = 10 * 1024 * 1024
224
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
250
        ten_mb = 10*1024*1024
 
251
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
225
252
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
226
253
                   [(i*ten_mb, ten_mb) for i in range(11)])
227
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
228
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
254
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
255
                   [(i*ten_mb, ten_mb) for i in range(11)],
229
256
                   max_size=1*1024*1024*1024)
230
257
 
231
258
 
232
 
class TestMemoryServer(tests.TestCase):
233
 
 
234
 
    def test_create_server(self):
235
 
        server = memory.MemoryServer()
236
 
        server.start_server()
237
 
        url = server.get_url()
238
 
        self.assertTrue(url in transport.transport_list_registry)
239
 
        t = transport.get_transport_from_url(url)
240
 
        del t
241
 
        server.stop_server()
242
 
        self.assertFalse(url in transport.transport_list_registry)
243
 
        self.assertRaises(errors.UnsupportedProtocol,
244
 
                          transport.get_transport, url)
245
 
 
246
 
 
247
 
class TestMemoryTransport(tests.TestCase):
 
259
class TestMemoryTransport(TestCase):
248
260
 
249
261
    def test_get_transport(self):
250
 
        memory.MemoryTransport()
 
262
        MemoryTransport()
251
263
 
252
264
    def test_clone(self):
253
 
        t = memory.MemoryTransport()
254
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
255
 
        self.assertEqual("memory:///", t.clone("/").base)
 
265
        transport = MemoryTransport()
 
266
        self.assertTrue(isinstance(transport, MemoryTransport))
 
267
        self.assertEqual("memory:///", transport.clone("/").base)
256
268
 
257
269
    def test_abspath(self):
258
 
        t = memory.MemoryTransport()
259
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
270
        transport = MemoryTransport()
 
271
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
260
272
 
261
273
    def test_abspath_of_root(self):
262
 
        t = memory.MemoryTransport()
263
 
        self.assertEqual("memory:///", t.base)
264
 
        self.assertEqual("memory:///", t.abspath('/'))
 
274
        transport = MemoryTransport()
 
275
        self.assertEqual("memory:///", transport.base)
 
276
        self.assertEqual("memory:///", transport.abspath('/'))
265
277
 
266
278
    def test_abspath_of_relpath_starting_at_root(self):
267
 
        t = memory.MemoryTransport()
268
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
279
        transport = MemoryTransport()
 
280
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
269
281
 
270
282
    def test_append_and_get(self):
271
 
        t = memory.MemoryTransport()
272
 
        t.append_bytes('path', 'content')
273
 
        self.assertEqual(t.get('path').read(), 'content')
274
 
        t.append_file('path', StringIO('content'))
275
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
283
        transport = MemoryTransport()
 
284
        transport.append_bytes('path', 'content')
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.append_file('path', StringIO('content'))
 
287
        self.assertEqual(transport.get('path').read(), 'contentcontent')
276
288
 
277
289
    def test_put_and_get(self):
278
 
        t = memory.MemoryTransport()
279
 
        t.put_file('path', StringIO('content'))
280
 
        self.assertEqual(t.get('path').read(), 'content')
281
 
        t.put_bytes('path', 'content')
282
 
        self.assertEqual(t.get('path').read(), 'content')
 
290
        transport = MemoryTransport()
 
291
        transport.put_file('path', StringIO('content'))
 
292
        self.assertEqual(transport.get('path').read(), 'content')
 
293
        transport.put_bytes('path', 'content')
 
294
        self.assertEqual(transport.get('path').read(), 'content')
283
295
 
284
296
    def test_append_without_dir_fails(self):
285
 
        t = memory.MemoryTransport()
286
 
        self.assertRaises(errors.NoSuchFile,
287
 
                          t.append_bytes, 'dir/path', 'content')
 
297
        transport = MemoryTransport()
 
298
        self.assertRaises(NoSuchFile,
 
299
                          transport.append_bytes, 'dir/path', 'content')
288
300
 
289
301
    def test_put_without_dir_fails(self):
290
 
        t = memory.MemoryTransport()
291
 
        self.assertRaises(errors.NoSuchFile,
292
 
                          t.put_file, 'dir/path', StringIO('content'))
 
302
        transport = MemoryTransport()
 
303
        self.assertRaises(NoSuchFile,
 
304
                          transport.put_file, 'dir/path', StringIO('content'))
293
305
 
294
306
    def test_get_missing(self):
295
 
        transport = memory.MemoryTransport()
296
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
307
        transport = MemoryTransport()
 
308
        self.assertRaises(NoSuchFile, transport.get, 'foo')
297
309
 
298
310
    def test_has_missing(self):
299
 
        t = memory.MemoryTransport()
300
 
        self.assertEquals(False, t.has('foo'))
 
311
        transport = MemoryTransport()
 
312
        self.assertEquals(False, transport.has('foo'))
301
313
 
302
314
    def test_has_present(self):
303
 
        t = memory.MemoryTransport()
304
 
        t.append_bytes('foo', 'content')
305
 
        self.assertEquals(True, t.has('foo'))
 
315
        transport = MemoryTransport()
 
316
        transport.append_bytes('foo', 'content')
 
317
        self.assertEquals(True, transport.has('foo'))
306
318
 
307
319
    def test_list_dir(self):
308
 
        t = memory.MemoryTransport()
309
 
        t.put_bytes('foo', 'content')
310
 
        t.mkdir('dir')
311
 
        t.put_bytes('dir/subfoo', 'content')
312
 
        t.put_bytes('dirlike', 'content')
 
320
        transport = MemoryTransport()
 
321
        transport.put_bytes('foo', 'content')
 
322
        transport.mkdir('dir')
 
323
        transport.put_bytes('dir/subfoo', 'content')
 
324
        transport.put_bytes('dirlike', 'content')
313
325
 
314
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
315
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
326
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
327
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
316
328
 
317
329
    def test_mkdir(self):
318
 
        t = memory.MemoryTransport()
319
 
        t.mkdir('dir')
320
 
        t.append_bytes('dir/path', 'content')
321
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
330
        transport = MemoryTransport()
 
331
        transport.mkdir('dir')
 
332
        transport.append_bytes('dir/path', 'content')
 
333
        self.assertEqual(transport.get('dir/path').read(), 'content')
322
334
 
323
335
    def test_mkdir_missing_parent(self):
324
 
        t = memory.MemoryTransport()
325
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
336
        transport = MemoryTransport()
 
337
        self.assertRaises(NoSuchFile,
 
338
                          transport.mkdir, 'dir/dir')
326
339
 
327
340
    def test_mkdir_twice(self):
328
 
        t = memory.MemoryTransport()
329
 
        t.mkdir('dir')
330
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
341
        transport = MemoryTransport()
 
342
        transport.mkdir('dir')
 
343
        self.assertRaises(FileExists, transport.mkdir, 'dir')
331
344
 
332
345
    def test_parameters(self):
333
 
        t = memory.MemoryTransport()
334
 
        self.assertEqual(True, t.listable())
335
 
        self.assertEqual(False, t.is_readonly())
 
346
        transport = MemoryTransport()
 
347
        self.assertEqual(True, transport.listable())
 
348
        self.assertEqual(False, transport.is_readonly())
336
349
 
337
350
    def test_iter_files_recursive(self):
338
 
        t = memory.MemoryTransport()
339
 
        t.mkdir('dir')
340
 
        t.put_bytes('dir/foo', 'content')
341
 
        t.put_bytes('dir/bar', 'content')
342
 
        t.put_bytes('bar', 'content')
343
 
        paths = set(t.iter_files_recursive())
 
351
        transport = MemoryTransport()
 
352
        transport.mkdir('dir')
 
353
        transport.put_bytes('dir/foo', 'content')
 
354
        transport.put_bytes('dir/bar', 'content')
 
355
        transport.put_bytes('bar', 'content')
 
356
        paths = set(transport.iter_files_recursive())
344
357
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
345
358
 
346
359
    def test_stat(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.put_bytes('foo', 'content')
349
 
        t.put_bytes('bar', 'phowar')
350
 
        self.assertEqual(7, t.stat('foo').st_size)
351
 
        self.assertEqual(6, t.stat('bar').st_size)
352
 
 
353
 
 
354
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
360
        transport = MemoryTransport()
 
361
        transport.put_bytes('foo', 'content')
 
362
        transport.put_bytes('bar', 'phowar')
 
363
        self.assertEqual(7, transport.stat('foo').st_size)
 
364
        self.assertEqual(6, transport.stat('bar').st_size)
 
365
 
 
366
 
 
367
class ChrootDecoratorTransportTest(TestCase):
355
368
    """Chroot decoration specific tests."""
356
369
 
357
370
    def test_abspath(self):
358
371
        # The abspath is always relative to the chroot_url.
359
 
        server = chroot.ChrootServer(
360
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
372
        server = ChrootServer(get_transport('memory:///foo/bar/'))
361
373
        self.start_server(server)
362
 
        t = transport.get_transport_from_url(server.get_url())
363
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
374
        transport = get_transport(server.get_url())
 
375
        self.assertEqual(server.get_url(), transport.abspath('/'))
364
376
 
365
 
        subdir_t = t.clone('subdir')
366
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
377
        subdir_transport = transport.clone('subdir')
 
378
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
367
379
 
368
380
    def test_clone(self):
369
 
        server = chroot.ChrootServer(
370
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
381
        server = ChrootServer(get_transport('memory:///foo/bar/'))
371
382
        self.start_server(server)
372
 
        t = transport.get_transport_from_url(server.get_url())
 
383
        transport = get_transport(server.get_url())
373
384
        # relpath from root and root path are the same
374
 
        relpath_cloned = t.clone('foo')
375
 
        abspath_cloned = t.clone('/foo')
 
385
        relpath_cloned = transport.clone('foo')
 
386
        abspath_cloned = transport.clone('/foo')
376
387
        self.assertEqual(server, relpath_cloned.server)
377
388
        self.assertEqual(server, abspath_cloned.server)
378
389
 
384
395
        This is so that it is not possible to escape a chroot by doing::
385
396
            url = chroot_transport.base
386
397
            parent_url = urlutils.join(url, '..')
387
 
            new_t = transport.get_transport_from_url(parent_url)
 
398
            new_transport = get_transport(parent_url)
388
399
        """
389
 
        server = chroot.ChrootServer(
390
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
400
        server = ChrootServer(get_transport('memory:///path/subpath'))
391
401
        self.start_server(server)
392
 
        t = transport.get_transport_from_url(server.get_url())
393
 
        new_t = transport.get_transport_from_url(t.base)
394
 
        self.assertEqual(t.server, new_t.server)
395
 
        self.assertEqual(t.base, new_t.base)
 
402
        transport = get_transport(server.get_url())
 
403
        new_transport = get_transport(transport.base)
 
404
        self.assertEqual(transport.server, new_transport.server)
 
405
        self.assertEqual(transport.base, new_transport.base)
396
406
 
397
407
    def test_urljoin_preserves_chroot(self):
398
408
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
401
411
        This is so that it is not possible to escape a chroot by doing::
402
412
            url = chroot_transport.base
403
413
            parent_url = urlutils.join(url, '..')
404
 
            new_t = transport.get_transport_from_url(parent_url)
 
414
            new_transport = get_transport(parent_url)
405
415
        """
406
 
        server = chroot.ChrootServer(
407
 
            transport.get_transport_from_url('memory:///path/'))
 
416
        server = ChrootServer(get_transport('memory:///path/'))
408
417
        self.start_server(server)
409
 
        t = transport.get_transport_from_url(server.get_url())
 
418
        transport = get_transport(server.get_url())
410
419
        self.assertRaises(
411
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
412
 
 
413
 
 
414
 
class TestChrootServer(tests.TestCase):
 
420
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
421
 
 
422
 
 
423
class ChrootServerTest(TestCase):
415
424
 
416
425
    def test_construct(self):
417
 
        backing_transport = memory.MemoryTransport()
418
 
        server = chroot.ChrootServer(backing_transport)
 
426
        backing_transport = MemoryTransport()
 
427
        server = ChrootServer(backing_transport)
419
428
        self.assertEqual(backing_transport, server.backing_transport)
420
429
 
421
430
    def test_setUp(self):
422
 
        backing_transport = memory.MemoryTransport()
423
 
        server = chroot.ChrootServer(backing_transport)
424
 
        server.start_server()
425
 
        self.addCleanup(server.stop_server)
426
 
        self.assertTrue(server.scheme
427
 
                        in transport._get_protocol_handlers().keys())
 
431
        backing_transport = MemoryTransport()
 
432
        server = ChrootServer(backing_transport)
 
433
        server.setUp()
 
434
        try:
 
435
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
436
        finally:
 
437
            server.tearDown()
428
438
 
429
 
    def test_stop_server(self):
430
 
        backing_transport = memory.MemoryTransport()
431
 
        server = chroot.ChrootServer(backing_transport)
432
 
        server.start_server()
433
 
        server.stop_server()
434
 
        self.assertFalse(server.scheme
435
 
                         in transport._get_protocol_handlers().keys())
 
439
    def test_tearDown(self):
 
440
        backing_transport = MemoryTransport()
 
441
        server = ChrootServer(backing_transport)
 
442
        server.setUp()
 
443
        server.tearDown()
 
444
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
436
445
 
437
446
    def test_get_url(self):
438
 
        backing_transport = memory.MemoryTransport()
439
 
        server = chroot.ChrootServer(backing_transport)
440
 
        server.start_server()
441
 
        self.addCleanup(server.stop_server)
442
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
443
 
 
444
 
 
445
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
447
        backing_transport = MemoryTransport()
 
448
        server = ChrootServer(backing_transport)
 
449
        server.setUp()
 
450
        try:
 
451
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
452
        finally:
 
453
            server.tearDown()
 
454
 
 
455
 
 
456
class PathFilteringDecoratorTransportTest(TestCase):
446
457
    """Pathfilter decoration specific tests."""
447
458
 
448
459
    def test_abspath(self):
449
460
        # The abspath is always relative to the base of the backing transport.
450
 
        server = pathfilter.PathFilteringServer(
451
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
461
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
452
462
            lambda x: x)
453
 
        server.start_server()
454
 
        t = transport.get_transport_from_url(server.get_url())
455
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
463
        server.setUp()
 
464
        transport = get_transport(server.get_url())
 
465
        self.assertEqual(server.get_url(), transport.abspath('/'))
456
466
 
457
 
        subdir_t = t.clone('subdir')
458
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
459
 
        server.stop_server()
 
467
        subdir_transport = transport.clone('subdir')
 
468
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
469
        server.tearDown()
460
470
 
461
471
    def make_pf_transport(self, filter_func=None):
462
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
463
 
 
 
473
        
464
474
        :param filter_func: by default this will be a no-op function.  Use this
465
475
            parameter to override it."""
466
476
        if filter_func is None:
467
477
            filter_func = lambda x: x
468
 
        server = pathfilter.PathFilteringServer(
469
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
470
 
        server.start_server()
471
 
        self.addCleanup(server.stop_server)
472
 
        return transport.get_transport_from_url(server.get_url())
 
478
        server = PathFilteringServer(
 
479
            get_transport('memory:///foo/bar/'), filter_func)
 
480
        server.setUp()
 
481
        self.addCleanup(server.tearDown)
 
482
        return get_transport(server.get_url())
473
483
 
474
484
    def test__filter(self):
475
485
        # _filter (with an identity func as filter_func) always returns
476
486
        # paths relative to the base of the backing transport.
477
 
        t = self.make_pf_transport()
478
 
        self.assertEqual('foo', t._filter('foo'))
479
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
480
 
        self.assertEqual('', t._filter('..'))
481
 
        self.assertEqual('', t._filter('/'))
 
487
        transport = self.make_pf_transport()
 
488
        self.assertEqual('foo', transport._filter('foo'))
 
489
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
482
492
        # The base of the pathfiltering transport is taken into account too.
483
 
        t = t.clone('subdir1/subdir2')
484
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
485
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
486
 
        self.assertEqual('subdir1', t._filter('..'))
487
 
        self.assertEqual('', t._filter('/'))
 
493
        transport = transport.clone('subdir1/subdir2')
 
494
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
495
        self.assertEqual(
 
496
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
497
        self.assertEqual('subdir1', transport._filter('..'))
 
498
        self.assertEqual('', transport._filter('/'))
488
499
 
489
500
    def test_filter_invocation(self):
490
501
        filter_log = []
491
 
 
492
502
        def filter(path):
493
503
            filter_log.append(path)
494
504
            return path
495
 
        t = self.make_pf_transport(filter)
496
 
        t.has('abc')
 
505
        transport = self.make_pf_transport(filter)
 
506
        transport.has('abc')
497
507
        self.assertEqual(['abc'], filter_log)
498
508
        del filter_log[:]
499
 
        t.clone('abc').has('xyz')
 
509
        transport.clone('abc').has('xyz')
500
510
        self.assertEqual(['abc/xyz'], filter_log)
501
511
        del filter_log[:]
502
 
        t.has('/abc')
 
512
        transport.has('/abc')
503
513
        self.assertEqual(['abc'], filter_log)
504
514
 
505
515
    def test_clone(self):
506
 
        t = self.make_pf_transport()
 
516
        transport = self.make_pf_transport()
507
517
        # relpath from root and root path are the same
508
 
        relpath_cloned = t.clone('foo')
509
 
        abspath_cloned = t.clone('/foo')
510
 
        self.assertEqual(t.server, relpath_cloned.server)
511
 
        self.assertEqual(t.server, abspath_cloned.server)
 
518
        relpath_cloned = transport.clone('foo')
 
519
        abspath_cloned = transport.clone('/foo')
 
520
        self.assertEqual(transport.server, relpath_cloned.server)
 
521
        self.assertEqual(transport.server, abspath_cloned.server)
512
522
 
513
523
    def test_url_preserves_pathfiltering(self):
514
524
        """Calling get_transport on a pathfiltered transport's base should
519
529
        otherwise) the filtering by doing::
520
530
            url = filtered_transport.base
521
531
            parent_url = urlutils.join(url, '..')
522
 
            new_t = transport.get_transport_from_url(parent_url)
 
532
            new_transport = get_transport(parent_url)
523
533
        """
524
 
        t = self.make_pf_transport()
525
 
        new_t = transport.get_transport_from_url(t.base)
526
 
        self.assertEqual(t.server, new_t.server)
527
 
        self.assertEqual(t.base, new_t.base)
528
 
 
529
 
 
530
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
534
        transport = self.make_pf_transport()
 
535
        new_transport = get_transport(transport.base)
 
536
        self.assertEqual(transport.server, new_transport.server)
 
537
        self.assertEqual(transport.base, new_transport.base)
 
538
 
 
539
 
 
540
class ReadonlyDecoratorTransportTest(TestCase):
531
541
    """Readonly decoration specific tests."""
532
542
 
533
543
    def test_local_parameters(self):
 
544
        import bzrlib.transport.readonly as readonly
534
545
        # connect to . in readonly mode
535
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
536
 
        self.assertEqual(True, t.listable())
537
 
        self.assertEqual(True, t.is_readonly())
 
546
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
547
        self.assertEqual(True, transport.listable())
 
548
        self.assertEqual(True, transport.is_readonly())
538
549
 
539
550
    def test_http_parameters(self):
540
551
        from bzrlib.tests.http_server import HttpServer
 
552
        import bzrlib.transport.readonly as readonly
541
553
        # connect to '.' via http which is not listable
542
554
        server = HttpServer()
543
555
        self.start_server(server)
544
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
545
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
546
 
        self.assertEqual(False, t.listable())
547
 
        self.assertEqual(True, t.is_readonly())
548
 
 
549
 
 
550
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
556
        transport = get_transport('readonly+' + server.get_url())
 
557
        self.failUnless(isinstance(transport,
 
558
                                   readonly.ReadonlyTransportDecorator))
 
559
        self.assertEqual(False, transport.listable())
 
560
        self.assertEqual(True, transport.is_readonly())
 
561
 
 
562
 
 
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
551
564
    """NFS decorator specific tests."""
552
565
 
553
566
    def get_nfs_transport(self, url):
 
567
        import bzrlib.transport.fakenfs as fakenfs
554
568
        # connect to url with nfs decoration
555
569
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
556
570
 
557
571
    def test_local_parameters(self):
558
572
        # the listable and is_readonly parameters
559
573
        # are not changed by the fakenfs decorator
560
 
        t = self.get_nfs_transport('.')
561
 
        self.assertEqual(True, t.listable())
562
 
        self.assertEqual(False, t.is_readonly())
 
574
        transport = self.get_nfs_transport('.')
 
575
        self.assertEqual(True, transport.listable())
 
576
        self.assertEqual(False, transport.is_readonly())
563
577
 
564
578
    def test_http_parameters(self):
565
579
        # the listable and is_readonly parameters
568
582
        # connect to '.' via http which is not listable
569
583
        server = HttpServer()
570
584
        self.start_server(server)
571
 
        t = self.get_nfs_transport(server.get_url())
572
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
573
 
        self.assertEqual(False, t.listable())
574
 
        self.assertEqual(True, t.is_readonly())
 
585
        transport = self.get_nfs_transport(server.get_url())
 
586
        self.assertIsInstance(
 
587
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
588
        self.assertEqual(False, transport.listable())
 
589
        self.assertEqual(True, transport.is_readonly())
575
590
 
576
591
    def test_fakenfs_server_default(self):
577
592
        # a FakeNFSServer() should bring up a local relpath server for itself
578
 
        server = test_server.FakeNFSServer()
 
593
        import bzrlib.transport.fakenfs as fakenfs
 
594
        server = fakenfs.FakeNFSServer()
579
595
        self.start_server(server)
580
596
        # the url should be decorated appropriately
581
597
        self.assertStartsWith(server.get_url(), 'fakenfs+')
582
598
        # and we should be able to get a transport for it
583
 
        t = transport.get_transport_from_url(server.get_url())
 
599
        transport = get_transport(server.get_url())
584
600
        # which must be a FakeNFSTransportDecorator instance.
585
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
601
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
586
602
 
587
603
    def test_fakenfs_rename_semantics(self):
588
604
        # a FakeNFS transport must mangle the way rename errors occur to
589
605
        # look like NFS problems.
590
 
        t = self.get_nfs_transport('.')
 
606
        transport = self.get_nfs_transport('.')
591
607
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
592
 
                        transport=t)
593
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
594
 
 
595
 
 
596
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
608
                        transport=transport)
 
609
        self.assertRaises(errors.ResourceBusy,
 
610
                          transport.rename, 'from', 'to')
 
611
 
 
612
 
 
613
class FakeVFATDecoratorTests(TestCaseInTempDir):
597
614
    """Tests for simulation of VFAT restrictions"""
598
615
 
599
616
    def get_vfat_transport(self, url):
603
620
 
604
621
    def test_transport_creation(self):
605
622
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
606
 
        t = self.get_vfat_transport('.')
607
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
623
        transport = self.get_vfat_transport('.')
 
624
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
608
625
 
609
626
    def test_transport_mkdir(self):
610
 
        t = self.get_vfat_transport('.')
611
 
        t.mkdir('HELLO')
612
 
        self.assertTrue(t.has('hello'))
613
 
        self.assertTrue(t.has('Hello'))
 
627
        transport = self.get_vfat_transport('.')
 
628
        transport.mkdir('HELLO')
 
629
        self.assertTrue(transport.has('hello'))
 
630
        self.assertTrue(transport.has('Hello'))
614
631
 
615
632
    def test_forbidden_chars(self):
616
 
        t = self.get_vfat_transport('.')
617
 
        self.assertRaises(ValueError, t.has, "<NU>")
618
 
 
619
 
 
620
 
class BadTransportHandler(transport.Transport):
 
633
        transport = self.get_vfat_transport('.')
 
634
        self.assertRaises(ValueError, transport.has, "<NU>")
 
635
 
 
636
 
 
637
class BadTransportHandler(Transport):
621
638
    def __init__(self, base_url):
622
 
        raise errors.DependencyNotPresent('some_lib',
623
 
                                          'testing missing dependency')
624
 
 
625
 
 
626
 
class BackupTransportHandler(transport.Transport):
 
639
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
640
 
 
641
 
 
642
class BackupTransportHandler(Transport):
627
643
    """Test transport that works as a backup for the BadTransportHandler"""
628
644
    pass
629
645
 
630
646
 
631
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
647
class TestTransportImplementation(TestCaseInTempDir):
632
648
    """Implementation verification for transports.
633
649
 
634
650
    To verify a transport we need a server factory, which is a callable
663
679
        base_url = self._server.get_url()
664
680
        url = self._adjust_url(base_url, relpath)
665
681
        # try getting the transport via the regular interface:
666
 
        t = transport.get_transport_from_url(url)
 
682
        t = get_transport(url)
667
683
        # vila--20070607 if the following are commented out the test suite
668
684
        # still pass. Is this really still needed or was it a forgotten
669
685
        # temporary fix ?
674
690
        return t
675
691
 
676
692
 
677
 
class TestTransportFromPath(tests.TestCaseInTempDir):
678
 
 
679
 
    def test_with_path(self):
680
 
        t = transport.get_transport_from_path(self.test_dir)
681
 
        self.assertIsInstance(t, local.LocalTransport)
682
 
        self.assertEquals(t.base.rstrip("/"),
683
 
            urlutils.local_path_to_url(self.test_dir))
684
 
 
685
 
    def test_with_url(self):
686
 
        t = transport.get_transport_from_path("file:")
687
 
        self.assertIsInstance(t, local.LocalTransport)
688
 
        self.assertEquals(t.base.rstrip("/"),
689
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
690
 
 
691
 
 
692
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
693
 
 
694
 
    def test_with_path(self):
695
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
696
 
            self.test_dir)
697
 
 
698
 
    def test_with_url(self):
699
 
        url = urlutils.local_path_to_url(self.test_dir)
700
 
        t = transport.get_transport_from_url(url)
701
 
        self.assertIsInstance(t, local.LocalTransport)
702
 
        self.assertEquals(t.base.rstrip("/"), url)
703
 
 
704
 
    def test_with_url_and_segment_parameters(self):
705
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
706
 
        t = transport.get_transport_from_url(url)
707
 
        self.assertIsInstance(t, local.LocalTransport)
708
 
        self.assertEquals(t.base.rstrip("/"), url)
709
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
710
 
            f.write("data")
711
 
        self.assertTrue(t.has("afile"))
712
 
 
713
 
 
714
 
class TestLocalTransports(tests.TestCase):
 
693
class TestLocalTransports(TestCase):
715
694
 
716
695
    def test_get_transport_from_abspath(self):
717
696
        here = osutils.abspath('.')
718
 
        t = transport.get_transport(here)
719
 
        self.assertIsInstance(t, local.LocalTransport)
 
697
        t = get_transport(here)
 
698
        self.assertIsInstance(t, LocalTransport)
720
699
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
721
700
 
722
701
    def test_get_transport_from_relpath(self):
723
702
        here = osutils.abspath('.')
724
 
        t = transport.get_transport('.')
725
 
        self.assertIsInstance(t, local.LocalTransport)
 
703
        t = get_transport('.')
 
704
        self.assertIsInstance(t, LocalTransport)
726
705
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
727
706
 
728
707
    def test_get_transport_from_local_url(self):
729
708
        here = osutils.abspath('.')
730
709
        here_url = urlutils.local_path_to_url(here) + '/'
731
 
        t = transport.get_transport(here_url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
 
710
        t = get_transport(here_url)
 
711
        self.assertIsInstance(t, LocalTransport)
733
712
        self.assertEquals(t.base, here_url)
734
713
 
735
714
    def test_local_abspath(self):
736
715
        here = osutils.abspath('.')
737
 
        t = transport.get_transport(here)
 
716
        t = get_transport(here)
738
717
        self.assertEquals(t.local_abspath(''), here)
739
718
 
740
719
 
741
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
742
 
 
743
 
    def test_local_fdatasync_calls_fdatasync(self):
744
 
        """Check fdatasync on a stream tries to flush the data to the OS.
745
 
        
746
 
        We can't easily observe the external effect but we can at least see
747
 
        it's called.
748
 
        """
749
 
        sentinel = object()
750
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
751
 
        if fdatasync is sentinel:
752
 
            raise tests.TestNotApplicable('fdatasync not supported')
753
 
        t = self.get_transport('.')
754
 
        calls = self.recordCalls(os, 'fdatasync')
755
 
        w = t.open_write_stream('out')
756
 
        w.write('foo')
757
 
        w.fdatasync()
758
 
        with open('out', 'rb') as f:
759
 
            # Should have been flushed.
760
 
            self.assertEquals(f.read(), 'foo')
761
 
        self.assertEquals(len(calls), 1, calls)
762
 
 
763
 
    def test_missing_directory(self):
764
 
        t = self.get_transport('.')
765
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
766
 
 
767
 
 
768
 
class TestWin32LocalTransport(tests.TestCase):
 
720
class TestWin32LocalTransport(TestCase):
769
721
 
770
722
    def test_unc_clone_to_root(self):
771
723
        # Win32 UNC path like \\HOST\path
772
724
        # clone to root should stop at least at \\HOST part
773
725
        # not on \\
774
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
726
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
775
727
        for i in xrange(4):
776
728
            t = t.clone('..')
777
729
        self.assertEquals(t.base, 'file://HOST/')
780
732
        self.assertEquals(t.base, 'file://HOST/')
781
733
 
782
734
 
783
 
class TestConnectedTransport(tests.TestCase):
 
735
class TestConnectedTransport(TestCase):
784
736
    """Tests for connected to remote server transports"""
785
737
 
786
738
    def test_parse_url(self):
787
 
        t = transport.ConnectedTransport(
788
 
            'http://simple.example.com/home/source')
789
 
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
790
 
        self.assertEquals(t._parsed_url.port, None)
791
 
        self.assertEquals(t._parsed_url.path, '/home/source/')
792
 
        self.assertTrue(t._parsed_url.user is None)
793
 
        self.assertTrue(t._parsed_url.password is None)
 
739
        t = ConnectedTransport('http://simple.example.com/home/source')
 
740
        self.assertEquals(t._host, 'simple.example.com')
 
741
        self.assertEquals(t._port, None)
 
742
        self.assertEquals(t._path, '/home/source/')
 
743
        self.failUnless(t._user is None)
 
744
        self.failUnless(t._password is None)
794
745
 
795
746
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
796
747
 
797
748
    def test_parse_url_with_at_in_user(self):
798
749
        # Bug 228058
799
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
800
 
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
750
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
751
        self.assertEquals(t._user, 'user@host.com')
801
752
 
802
753
    def test_parse_quoted_url(self):
803
 
        t = transport.ConnectedTransport(
804
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
805
 
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
806
 
        self.assertEquals(t._parsed_url.port, 2222)
807
 
        self.assertEquals(t._parsed_url.user, 'robey')
808
 
        self.assertEquals(t._parsed_url.password, 'h@t')
809
 
        self.assertEquals(t._parsed_url.path, '/path/')
 
754
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
755
        self.assertEquals(t._host, 'exAmple.com')
 
756
        self.assertEquals(t._port, 2222)
 
757
        self.assertEquals(t._user, 'robey')
 
758
        self.assertEquals(t._password, 'h@t')
 
759
        self.assertEquals(t._path, '/path/')
810
760
 
811
761
        # Base should not keep track of the password
812
 
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
762
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
813
763
 
814
764
    def test_parse_invalid_url(self):
815
765
        self.assertRaises(errors.InvalidURL,
816
 
                          transport.ConnectedTransport,
 
766
                          ConnectedTransport,
817
767
                          'sftp://lily.org:~janneke/public/bzr/gub')
818
768
 
819
769
    def test_relpath(self):
820
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
770
        t = ConnectedTransport('sftp://user@host.com/abs/path')
821
771
 
822
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
823
 
            'sub')
 
772
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
824
773
        self.assertRaises(errors.PathNotChild, t.relpath,
825
774
                          'http://user@host.com/abs/path/sub')
826
775
        self.assertRaises(errors.PathNotChild, t.relpath,
830
779
        self.assertRaises(errors.PathNotChild, t.relpath,
831
780
                          'sftp://user@host.com:33/abs/path/sub')
832
781
        # Make sure it works when we don't supply a username
833
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
782
        t = ConnectedTransport('sftp://host.com/abs/path')
834
783
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
835
784
 
836
785
        # Make sure it works when parts of the path will be url encoded
837
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
786
        t = ConnectedTransport('sftp://host.com/dev/%path')
838
787
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
839
788
 
840
789
    def test_connection_sharing_propagate_credentials(self):
841
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
842
 
        self.assertEquals('user', t._parsed_url.user)
843
 
        self.assertEquals('host.com', t._parsed_url.host)
 
790
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
791
        self.assertEquals('user', t._user)
 
792
        self.assertEquals('host.com', t._host)
844
793
        self.assertIs(None, t._get_connection())
845
 
        self.assertIs(None, t._parsed_url.password)
 
794
        self.assertIs(None, t._password)
846
795
        c = t.clone('subdir')
847
796
        self.assertIs(None, c._get_connection())
848
 
        self.assertIs(None, t._parsed_url.password)
 
797
        self.assertIs(None, t._password)
849
798
 
850
799
        # Simulate the user entering a password
851
800
        password = 'secret'
865
814
        self.assertIs(new_password, c._get_credentials())
866
815
 
867
816
 
868
 
class TestReusedTransports(tests.TestCase):
 
817
class TestReusedTransports(TestCase):
869
818
    """Tests for transport reuse"""
870
819
 
871
820
    def test_reuse_same_transport(self):
872
821
        possible_transports = []
873
 
        t1 = transport.get_transport_from_url('http://foo/',
874
 
                                     possible_transports=possible_transports)
 
822
        t1 = get_transport('http://foo/',
 
823
                           possible_transports=possible_transports)
875
824
        self.assertEqual([t1], possible_transports)
876
 
        t2 = transport.get_transport_from_url('http://foo/',
877
 
                                     possible_transports=[t1])
 
825
        t2 = get_transport('http://foo/', possible_transports=[t1])
878
826
        self.assertIs(t1, t2)
879
827
 
880
828
        # Also check that final '/' are handled correctly
881
 
        t3 = transport.get_transport_from_url('http://foo/path/')
882
 
        t4 = transport.get_transport_from_url('http://foo/path',
883
 
                                     possible_transports=[t3])
 
829
        t3 = get_transport('http://foo/path/')
 
830
        t4 = get_transport('http://foo/path', possible_transports=[t3])
884
831
        self.assertIs(t3, t4)
885
832
 
886
 
        t5 = transport.get_transport_from_url('http://foo/path')
887
 
        t6 = transport.get_transport_from_url('http://foo/path/',
888
 
                                     possible_transports=[t5])
 
833
        t5 = get_transport('http://foo/path')
 
834
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
889
835
        self.assertIs(t5, t6)
890
836
 
891
837
    def test_don_t_reuse_different_transport(self):
892
 
        t1 = transport.get_transport_from_url('http://foo/path')
893
 
        t2 = transport.get_transport_from_url('http://bar/path',
894
 
                                     possible_transports=[t1])
 
838
        t1 = get_transport('http://foo/path')
 
839
        t2 = get_transport('http://bar/path', possible_transports=[t1])
895
840
        self.assertIsNot(t1, t2)
896
841
 
897
842
 
898
 
class TestTransportTrace(tests.TestCase):
 
843
class TestTransportTrace(TestCase):
899
844
 
900
 
    def test_decorator(self):
901
 
        t = transport.get_transport_from_url('trace+memory://')
 
845
    def test_get(self):
 
846
        transport = get_transport('trace+memory://')
902
847
        self.assertIsInstance(
903
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
848
            transport, bzrlib.transport.trace.TransportTraceDecorator)
904
849
 
905
850
    def test_clone_preserves_activity(self):
906
 
        t = transport.get_transport_from_url('trace+memory://')
907
 
        t2 = t.clone('.')
908
 
        self.assertTrue(t is not t2)
909
 
        self.assertTrue(t._activity is t2._activity)
 
851
        transport = get_transport('trace+memory://')
 
852
        transport2 = transport.clone('.')
 
853
        self.assertTrue(transport is not transport2)
 
854
        self.assertTrue(transport._activity is transport2._activity)
910
855
 
911
856
    # the following specific tests are for the operations that have made use of
912
857
    # logging in tests; we could test every single operation but doing that
913
858
    # still won't cause a test failure when the top level Transport API
914
859
    # changes; so there is little return doing that.
915
860
    def test_get(self):
916
 
        t = transport.get_transport_from_url('trace+memory:///')
917
 
        t.put_bytes('foo', 'barish')
918
 
        t.get('foo')
 
861
        transport = get_transport('trace+memory:///')
 
862
        transport.put_bytes('foo', 'barish')
 
863
        transport.get('foo')
919
864
        expected_result = []
920
865
        # put_bytes records the bytes, not the content to avoid memory
921
866
        # pressure.
922
867
        expected_result.append(('put_bytes', 'foo', 6, None))
923
868
        # get records the file name only.
924
869
        expected_result.append(('get', 'foo'))
925
 
        self.assertEqual(expected_result, t._activity)
 
870
        self.assertEqual(expected_result, transport._activity)
926
871
 
927
872
    def test_readv(self):
928
 
        t = transport.get_transport_from_url('trace+memory:///')
929
 
        t.put_bytes('foo', 'barish')
930
 
        list(t.readv('foo', [(0, 1), (3, 2)],
931
 
                     adjust_for_latency=True, upper_limit=6))
 
873
        transport = get_transport('trace+memory:///')
 
874
        transport.put_bytes('foo', 'barish')
 
875
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
876
            upper_limit=6))
932
877
        expected_result = []
933
878
        # put_bytes records the bytes, not the content to avoid memory
934
879
        # pressure.
935
880
        expected_result.append(('put_bytes', 'foo', 6, None))
936
881
        # readv records the supplied offset request
937
882
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
938
 
        self.assertEqual(expected_result, t._activity)
 
883
        self.assertEqual(expected_result, transport._activity)
939
884
 
940
885
 
941
886
class TestSSHConnections(tests.TestCaseWithTransport):
942
887
 
943
888
    def test_bzr_connect_to_bzr_ssh(self):
944
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
889
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
945
890
 
946
891
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
947
892
        """
954
899
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
955
900
        # override the interface (doesn't change self._vendor).
956
901
        # Note that this does encryption, so can be slow.
957
 
        from bzrlib.tests import stub_sftp
 
902
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
903
        from bzrlib.tests.stub_sftp import StubServer
958
904
 
959
905
        # Start an SSH server
960
906
        self.command_executed = []
963
909
        # SSH channel ourselves.  Surely this has already been implemented
964
910
        # elsewhere?
965
911
        started = []
966
 
 
967
 
        class StubSSHServer(stub_sftp.StubServer):
 
912
        class StubSSHServer(StubServer):
968
913
 
969
914
            test = self
970
915
 
975
920
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
976
921
 
977
922
                # XXX: horribly inefficient, not to mention ugly.
978
 
                # Start a thread for each of stdin/out/err, and relay bytes
979
 
                # from the subprocess to channel and vice versa.
 
923
                # Start a thread for each of stdin/out/err, and relay bytes from
 
924
                # the subprocess to channel and vice versa.
980
925
                def ferry_bytes(read, write, close):
981
926
                    while True:
982
927
                        bytes = read(1)
998
943
 
999
944
                return True
1000
945
 
1001
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
946
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
1002
947
        # We *don't* want to override the default SSH vendor: the detected one
1003
948
        # is the one to use.
1004
 
 
1005
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1006
 
        # inherits from SFTPServer which forces the SSH vendor to
1007
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1008
949
        self.start_server(ssh_server)
1009
 
        port = ssh_server.port
 
950
        port = ssh_server._listener.port
1010
951
 
1011
952
        if sys.platform == 'win32':
1012
953
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1013
954
        else:
1014
955
            bzr_remote_path = self.get_bzr_path()
1015
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
956
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1016
957
 
1017
958
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1018
959
        # variable is used to tell bzr what command to run on the remote end.
1022
963
            # prefix a '/' to get the right path.
1023
964
            path_to_branch = '/' + path_to_branch
1024
965
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1025
 
        t = transport.get_transport(url)
 
966
        t = get_transport(url)
1026
967
        self.permit_url(t.base)
1027
968
        t.mkdir('foo')
1028
969
 
1039
980
        # And the rest are threads
1040
981
        for t in started[1:]:
1041
982
            t.join()
1042
 
 
1043
 
 
1044
 
class TestUnhtml(tests.TestCase):
1045
 
 
1046
 
    """Tests for unhtml_roughly"""
1047
 
 
1048
 
    def test_truncation(self):
1049
 
        fake_html = "<p>something!\n" * 1000
1050
 
        result = http.unhtml_roughly(fake_html)
1051
 
        self.assertEquals(len(result), 1000)
1052
 
        self.assertStartsWith(result, " something!")
1053
 
 
1054
 
 
1055
 
class SomeDirectory(object):
1056
 
 
1057
 
    def look_up(self, name, url):
1058
 
        return "http://bar"
1059
 
 
1060
 
 
1061
 
class TestLocationToUrl(tests.TestCase):
1062
 
 
1063
 
    def get_base_location(self):
1064
 
        path = osutils.abspath('/foo/bar')
1065
 
        if path.startswith('/'):
1066
 
            url = 'file://%s' % (path,)
1067
 
        else:
1068
 
            # On Windows, abspaths start with the drive letter, so we have to
1069
 
            # add in the extra '/'
1070
 
            url = 'file:///%s' % (path,)
1071
 
        return path, url
1072
 
 
1073
 
    def test_regular_url(self):
1074
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1075
 
 
1076
 
    def test_directory(self):
1077
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1078
 
        self.addCleanup(directories.remove, "bar:")
1079
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1080
 
 
1081
 
    def test_unicode_url(self):
1082
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1083
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1084
 
 
1085
 
    def test_unicode_path(self):
1086
 
        path, url = self.get_base_location()
1087
 
        location = path + "\xc3\xaf".decode("utf-8")
1088
 
        url += '%C3%AF'
1089
 
        self.assertEquals(url, location_to_url(location))
1090
 
 
1091
 
    def test_path(self):
1092
 
        path, url = self.get_base_location()
1093
 
        self.assertEquals(url, location_to_url(path))
1094
 
 
1095
 
    def test_relative_file_url(self):
1096
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1097
 
            location_to_url("file:bar"))
1098
 
 
1099
 
    def test_absolute_file_url(self):
1100
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))