~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Andrew Bennetts
  • Date: 2009-04-02 05:53:12 UTC
  • mto: This revision was merged to the branch mainline in revision 4242.
  • Revision ID: andrew.bennetts@canonical.com-20090402055312-h7mvgumvm7e620mj
Fix nits in spelling and naming.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
19
 
import os
20
 
import subprocess
21
 
import sys
22
 
import threading
23
19
 
 
20
import bzrlib
24
21
from bzrlib import (
25
22
    errors,
26
23
    osutils,
27
 
    tests,
28
 
    transport,
29
24
    urlutils,
30
25
    )
31
 
from bzrlib.directory_service import directories
32
 
from bzrlib.transport import (
33
 
    chroot,
34
 
    fakenfs,
35
 
    http,
36
 
    local,
37
 
    location_to_url,
38
 
    memory,
39
 
    pathfilter,
40
 
    readonly,
41
 
    )
42
 
import bzrlib.transport.trace
43
 
from bzrlib.tests import (
44
 
    features,
45
 
    test_server,
46
 
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
47
51
 
48
52
 
49
53
# TODO: Should possibly split transport-specific tests into their own files.
50
54
 
51
55
 
52
 
class TestTransport(tests.TestCase):
 
56
class TestTransport(TestCase):
53
57
    """Test the non transport-concrete class functionality."""
54
58
 
55
59
    def test__get_set_protocol_handlers(self):
56
 
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        handlers = _get_protocol_handlers()
 
61
        self.assertNotEqual([], handlers.keys( ))
 
62
        try:
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
 
65
        finally:
 
66
            _set_protocol_handlers(handlers)
61
67
 
62
68
    def test_get_transport_modules(self):
63
 
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
69
        handlers = _get_protocol_handlers()
65
70
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
67
 
 
 
71
        _clear_protocol_handlers()
68
72
        class SampleHandler(object):
69
73
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
74
        try:
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
 
84
                             _get_transport_modules())
 
85
        finally:
 
86
            _set_protocol_handlers(handlers)
83
87
 
84
88
    def test_transport_dependency(self):
85
89
        """Transport with missing dependency causes no error"""
86
 
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
90
        saved_handlers = _get_protocol_handlers()
88
91
        # don't pollute the current handlers
89
 
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
92
        _clear_protocol_handlers()
93
93
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
 
94
            register_transport_proto('foo')
 
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
96
                    'BadTransportHandler')
 
97
            try:
 
98
                get_transport('foo://fooserver/foo')
 
99
            except UnsupportedProtocol, e:
 
100
                e_str = str(e)
 
101
                self.assertEquals('Unsupported protocol'
 
102
                                  ' for url "foo://fooserver/foo":'
 
103
                                  ' Unable to import library "some_lib":'
 
104
                                  ' testing missing dependency', str(e))
 
105
            else:
 
106
                self.fail('Did not raise UnsupportedProtocol')
 
107
        finally:
 
108
            # restore original values
 
109
            _set_protocol_handlers(saved_handlers)
103
110
 
104
111
    def test_transport_fallback(self):
105
112
        """Transport with missing dependency causes no error"""
106
 
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
113
        saved_handlers = _get_protocol_handlers()
 
114
        try:
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
 
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
118
                    'BackupTransportHandler')
 
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
120
                    'BadTransportHandler')
 
121
            t = get_transport('foo://fooserver/foo')
 
122
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
123
        finally:
 
124
            _set_protocol_handlers(saved_handlers)
116
125
 
117
126
    def test_ssh_hints(self):
118
127
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
128
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol, e:
 
129
            get_transport('ssh://fooserver/foo')
 
130
        except UnsupportedProtocol, e:
122
131
            e_str = str(e)
123
132
            self.assertEquals('Unsupported protocol'
124
133
                              ' for url "ssh://fooserver/foo":'
125
 
                              ' bzr supports bzr+ssh to operate over ssh,'
126
 
                              ' use "bzr+ssh://fooserver/foo".',
 
134
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
127
135
                              str(e))
128
136
        else:
129
137
            self.fail('Did not raise UnsupportedProtocol')
130
138
 
131
139
    def test_LateReadError(self):
132
140
        """The LateReadError helper should raise on read()."""
133
 
        a_file = transport.LateReadError('a path')
 
141
        a_file = LateReadError('a path')
134
142
        try:
135
143
            a_file.read()
136
 
        except errors.ReadError, error:
 
144
        except ReadError, error:
137
145
            self.assertEqual('a path', error.path)
138
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
146
        self.assertRaises(ReadError, a_file.read, 40)
139
147
        a_file.close()
140
148
 
 
149
    def test__combine_paths(self):
 
150
        t = Transport('/')
 
151
        self.assertEqual('/home/sarah/project/foo',
 
152
                         t._combine_paths('/home/sarah', 'project/foo'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '../../etc'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '/etc'))
 
159
 
141
160
    def test_local_abspath_non_local_transport(self):
142
161
        # the base implementation should throw
143
 
        t = memory.MemoryTransport()
 
162
        t = MemoryTransport()
144
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
145
164
        self.assertEqual('memory:///t is not a local path.', str(e))
146
165
 
147
166
 
148
 
class TestCoalesceOffsets(tests.TestCase):
 
167
class TestCoalesceOffsets(TestCase):
149
168
 
150
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
151
 
        coalesce = transport.Transport._coalesce_offsets
152
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
170
        coalesce = Transport._coalesce_offsets
 
171
        exp = [_CoalescedOffset(*x) for x in expected]
153
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
154
173
                            max_size=max_size))
155
174
        self.assertEqual(exp, out)
200
219
 
201
220
    def test_coalesce_fudge(self):
202
221
        self.check([(10, 30, [(0, 10), (20, 10)]),
203
 
                    (100, 10, [(0, 10)]),
 
222
                    (100, 10, [(0, 10),]),
204
223
                   ], [(10, 10), (30, 10), (100, 10)],
205
 
                   fudge=10)
206
 
 
 
224
                   fudge=10
 
225
                  )
207
226
    def test_coalesce_max_size(self):
208
227
        self.check([(10, 20, [(0, 10), (10, 10)]),
209
228
                    (30, 50, [(0, 50)]),
210
229
                    # If one range is above max_size, it gets its own coalesced
211
230
                    # offset
212
 
                    (100, 80, [(0, 80)]),],
 
231
                    (100, 80, [(0, 80),]),],
213
232
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
214
 
                   max_size=50)
 
233
                   max_size=50
 
234
                  )
215
235
 
216
236
    def test_coalesce_no_max_size(self):
217
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
237
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
219
239
                  )
220
240
 
221
241
    def test_coalesce_default_limit(self):
222
242
        # By default we use a 100MB max size.
223
 
        ten_mb = 10 * 1024 * 1024
224
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
225
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
226
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
227
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
228
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
229
249
                   max_size=1*1024*1024*1024)
230
250
 
231
251
 
232
 
class TestMemoryServer(tests.TestCase):
233
 
 
234
 
    def test_create_server(self):
235
 
        server = memory.MemoryServer()
236
 
        server.start_server()
237
 
        url = server.get_url()
238
 
        self.assertTrue(url in transport.transport_list_registry)
239
 
        t = transport.get_transport_from_url(url)
240
 
        del t
241
 
        server.stop_server()
242
 
        self.assertFalse(url in transport.transport_list_registry)
243
 
        self.assertRaises(errors.UnsupportedProtocol,
244
 
                          transport.get_transport, url)
245
 
 
246
 
 
247
 
class TestMemoryTransport(tests.TestCase):
 
252
class TestMemoryTransport(TestCase):
248
253
 
249
254
    def test_get_transport(self):
250
 
        memory.MemoryTransport()
 
255
        MemoryTransport()
251
256
 
252
257
    def test_clone(self):
253
 
        t = memory.MemoryTransport()
254
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
255
 
        self.assertEqual("memory:///", t.clone("/").base)
 
258
        transport = MemoryTransport()
 
259
        self.assertTrue(isinstance(transport, MemoryTransport))
 
260
        self.assertEqual("memory:///", transport.clone("/").base)
256
261
 
257
262
    def test_abspath(self):
258
 
        t = memory.MemoryTransport()
259
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
263
        transport = MemoryTransport()
 
264
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
260
265
 
261
266
    def test_abspath_of_root(self):
262
 
        t = memory.MemoryTransport()
263
 
        self.assertEqual("memory:///", t.base)
264
 
        self.assertEqual("memory:///", t.abspath('/'))
 
267
        transport = MemoryTransport()
 
268
        self.assertEqual("memory:///", transport.base)
 
269
        self.assertEqual("memory:///", transport.abspath('/'))
265
270
 
266
271
    def test_abspath_of_relpath_starting_at_root(self):
267
 
        t = memory.MemoryTransport()
268
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
272
        transport = MemoryTransport()
 
273
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
269
274
 
270
275
    def test_append_and_get(self):
271
 
        t = memory.MemoryTransport()
272
 
        t.append_bytes('path', 'content')
273
 
        self.assertEqual(t.get('path').read(), 'content')
274
 
        t.append_file('path', StringIO('content'))
275
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
276
        transport = MemoryTransport()
 
277
        transport.append_bytes('path', 'content')
 
278
        self.assertEqual(transport.get('path').read(), 'content')
 
279
        transport.append_file('path', StringIO('content'))
 
280
        self.assertEqual(transport.get('path').read(), 'contentcontent')
276
281
 
277
282
    def test_put_and_get(self):
278
 
        t = memory.MemoryTransport()
279
 
        t.put_file('path', StringIO('content'))
280
 
        self.assertEqual(t.get('path').read(), 'content')
281
 
        t.put_bytes('path', 'content')
282
 
        self.assertEqual(t.get('path').read(), 'content')
 
283
        transport = MemoryTransport()
 
284
        transport.put_file('path', StringIO('content'))
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.put_bytes('path', 'content')
 
287
        self.assertEqual(transport.get('path').read(), 'content')
283
288
 
284
289
    def test_append_without_dir_fails(self):
285
 
        t = memory.MemoryTransport()
286
 
        self.assertRaises(errors.NoSuchFile,
287
 
                          t.append_bytes, 'dir/path', 'content')
 
290
        transport = MemoryTransport()
 
291
        self.assertRaises(NoSuchFile,
 
292
                          transport.append_bytes, 'dir/path', 'content')
288
293
 
289
294
    def test_put_without_dir_fails(self):
290
 
        t = memory.MemoryTransport()
291
 
        self.assertRaises(errors.NoSuchFile,
292
 
                          t.put_file, 'dir/path', StringIO('content'))
 
295
        transport = MemoryTransport()
 
296
        self.assertRaises(NoSuchFile,
 
297
                          transport.put_file, 'dir/path', StringIO('content'))
293
298
 
294
299
    def test_get_missing(self):
295
 
        transport = memory.MemoryTransport()
296
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
300
        transport = MemoryTransport()
 
301
        self.assertRaises(NoSuchFile, transport.get, 'foo')
297
302
 
298
303
    def test_has_missing(self):
299
 
        t = memory.MemoryTransport()
300
 
        self.assertEquals(False, t.has('foo'))
 
304
        transport = MemoryTransport()
 
305
        self.assertEquals(False, transport.has('foo'))
301
306
 
302
307
    def test_has_present(self):
303
 
        t = memory.MemoryTransport()
304
 
        t.append_bytes('foo', 'content')
305
 
        self.assertEquals(True, t.has('foo'))
 
308
        transport = MemoryTransport()
 
309
        transport.append_bytes('foo', 'content')
 
310
        self.assertEquals(True, transport.has('foo'))
306
311
 
307
312
    def test_list_dir(self):
308
 
        t = memory.MemoryTransport()
309
 
        t.put_bytes('foo', 'content')
310
 
        t.mkdir('dir')
311
 
        t.put_bytes('dir/subfoo', 'content')
312
 
        t.put_bytes('dirlike', 'content')
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.mkdir('dir')
 
316
        transport.put_bytes('dir/subfoo', 'content')
 
317
        transport.put_bytes('dirlike', 'content')
313
318
 
314
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
315
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
319
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
320
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
316
321
 
317
322
    def test_mkdir(self):
318
 
        t = memory.MemoryTransport()
319
 
        t.mkdir('dir')
320
 
        t.append_bytes('dir/path', 'content')
321
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
323
        transport = MemoryTransport()
 
324
        transport.mkdir('dir')
 
325
        transport.append_bytes('dir/path', 'content')
 
326
        self.assertEqual(transport.get('dir/path').read(), 'content')
322
327
 
323
328
    def test_mkdir_missing_parent(self):
324
 
        t = memory.MemoryTransport()
325
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
329
        transport = MemoryTransport()
 
330
        self.assertRaises(NoSuchFile,
 
331
                          transport.mkdir, 'dir/dir')
326
332
 
327
333
    def test_mkdir_twice(self):
328
 
        t = memory.MemoryTransport()
329
 
        t.mkdir('dir')
330
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
334
        transport = MemoryTransport()
 
335
        transport.mkdir('dir')
 
336
        self.assertRaises(FileExists, transport.mkdir, 'dir')
331
337
 
332
338
    def test_parameters(self):
333
 
        t = memory.MemoryTransport()
334
 
        self.assertEqual(True, t.listable())
335
 
        self.assertEqual(False, t.is_readonly())
 
339
        transport = MemoryTransport()
 
340
        self.assertEqual(True, transport.listable())
 
341
        self.assertEqual(False, transport.is_readonly())
336
342
 
337
343
    def test_iter_files_recursive(self):
338
 
        t = memory.MemoryTransport()
339
 
        t.mkdir('dir')
340
 
        t.put_bytes('dir/foo', 'content')
341
 
        t.put_bytes('dir/bar', 'content')
342
 
        t.put_bytes('bar', 'content')
343
 
        paths = set(t.iter_files_recursive())
 
344
        transport = MemoryTransport()
 
345
        transport.mkdir('dir')
 
346
        transport.put_bytes('dir/foo', 'content')
 
347
        transport.put_bytes('dir/bar', 'content')
 
348
        transport.put_bytes('bar', 'content')
 
349
        paths = set(transport.iter_files_recursive())
344
350
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
345
351
 
346
352
    def test_stat(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.put_bytes('foo', 'content')
349
 
        t.put_bytes('bar', 'phowar')
350
 
        self.assertEqual(7, t.stat('foo').st_size)
351
 
        self.assertEqual(6, t.stat('bar').st_size)
352
 
 
353
 
 
354
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
353
        transport = MemoryTransport()
 
354
        transport.put_bytes('foo', 'content')
 
355
        transport.put_bytes('bar', 'phowar')
 
356
        self.assertEqual(7, transport.stat('foo').st_size)
 
357
        self.assertEqual(6, transport.stat('bar').st_size)
 
358
 
 
359
 
 
360
class ChrootDecoratorTransportTest(TestCase):
355
361
    """Chroot decoration specific tests."""
356
362
 
357
363
    def test_abspath(self):
358
364
        # The abspath is always relative to the chroot_url.
359
 
        server = chroot.ChrootServer(
360
 
            transport.get_transport_from_url('memory:///foo/bar/'))
361
 
        self.start_server(server)
362
 
        t = transport.get_transport_from_url(server.get_url())
363
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
365
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
366
        server.setUp()
 
367
        transport = get_transport(server.get_url())
 
368
        self.assertEqual(server.get_url(), transport.abspath('/'))
364
369
 
365
 
        subdir_t = t.clone('subdir')
366
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
370
        subdir_transport = transport.clone('subdir')
 
371
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
372
        server.tearDown()
367
373
 
368
374
    def test_clone(self):
369
 
        server = chroot.ChrootServer(
370
 
            transport.get_transport_from_url('memory:///foo/bar/'))
371
 
        self.start_server(server)
372
 
        t = transport.get_transport_from_url(server.get_url())
 
375
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
376
        server.setUp()
 
377
        transport = get_transport(server.get_url())
373
378
        # relpath from root and root path are the same
374
 
        relpath_cloned = t.clone('foo')
375
 
        abspath_cloned = t.clone('/foo')
 
379
        relpath_cloned = transport.clone('foo')
 
380
        abspath_cloned = transport.clone('/foo')
376
381
        self.assertEqual(server, relpath_cloned.server)
377
382
        self.assertEqual(server, abspath_cloned.server)
 
383
        server.tearDown()
378
384
 
379
385
    def test_chroot_url_preserves_chroot(self):
380
386
        """Calling get_transport on a chroot transport's base should produce a
384
390
        This is so that it is not possible to escape a chroot by doing::
385
391
            url = chroot_transport.base
386
392
            parent_url = urlutils.join(url, '..')
387
 
            new_t = transport.get_transport_from_url(parent_url)
 
393
            new_transport = get_transport(parent_url)
388
394
        """
389
 
        server = chroot.ChrootServer(
390
 
            transport.get_transport_from_url('memory:///path/subpath'))
391
 
        self.start_server(server)
392
 
        t = transport.get_transport_from_url(server.get_url())
393
 
        new_t = transport.get_transport_from_url(t.base)
394
 
        self.assertEqual(t.server, new_t.server)
395
 
        self.assertEqual(t.base, new_t.base)
 
395
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
396
        server.setUp()
 
397
        transport = get_transport(server.get_url())
 
398
        new_transport = get_transport(transport.base)
 
399
        self.assertEqual(transport.server, new_transport.server)
 
400
        self.assertEqual(transport.base, new_transport.base)
 
401
        server.tearDown()
396
402
 
397
403
    def test_urljoin_preserves_chroot(self):
398
404
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
401
407
        This is so that it is not possible to escape a chroot by doing::
402
408
            url = chroot_transport.base
403
409
            parent_url = urlutils.join(url, '..')
404
 
            new_t = transport.get_transport_from_url(parent_url)
 
410
            new_transport = get_transport(parent_url)
405
411
        """
406
 
        server = chroot.ChrootServer(
407
 
            transport.get_transport_from_url('memory:///path/'))
408
 
        self.start_server(server)
409
 
        t = transport.get_transport_from_url(server.get_url())
 
412
        server = ChrootServer(get_transport('memory:///path/'))
 
413
        server.setUp()
 
414
        transport = get_transport(server.get_url())
410
415
        self.assertRaises(
411
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
412
 
 
413
 
 
414
 
class TestChrootServer(tests.TestCase):
 
416
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
417
        server.tearDown()
 
418
 
 
419
 
 
420
class ChrootServerTest(TestCase):
415
421
 
416
422
    def test_construct(self):
417
 
        backing_transport = memory.MemoryTransport()
418
 
        server = chroot.ChrootServer(backing_transport)
 
423
        backing_transport = MemoryTransport()
 
424
        server = ChrootServer(backing_transport)
419
425
        self.assertEqual(backing_transport, server.backing_transport)
420
426
 
421
427
    def test_setUp(self):
422
 
        backing_transport = memory.MemoryTransport()
423
 
        server = chroot.ChrootServer(backing_transport)
424
 
        server.start_server()
425
 
        self.addCleanup(server.stop_server)
426
 
        self.assertTrue(server.scheme
427
 
                        in transport._get_protocol_handlers().keys())
 
428
        backing_transport = MemoryTransport()
 
429
        server = ChrootServer(backing_transport)
 
430
        server.setUp()
 
431
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
428
432
 
429
 
    def test_stop_server(self):
430
 
        backing_transport = memory.MemoryTransport()
431
 
        server = chroot.ChrootServer(backing_transport)
432
 
        server.start_server()
433
 
        server.stop_server()
434
 
        self.assertFalse(server.scheme
435
 
                         in transport._get_protocol_handlers().keys())
 
433
    def test_tearDown(self):
 
434
        backing_transport = MemoryTransport()
 
435
        server = ChrootServer(backing_transport)
 
436
        server.setUp()
 
437
        server.tearDown()
 
438
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
436
439
 
437
440
    def test_get_url(self):
438
 
        backing_transport = memory.MemoryTransport()
439
 
        server = chroot.ChrootServer(backing_transport)
440
 
        server.start_server()
441
 
        self.addCleanup(server.stop_server)
 
441
        backing_transport = MemoryTransport()
 
442
        server = ChrootServer(backing_transport)
 
443
        server.setUp()
442
444
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
443
 
 
444
 
 
445
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
446
 
    """Pathfilter decoration specific tests."""
447
 
 
448
 
    def test_abspath(self):
449
 
        # The abspath is always relative to the base of the backing transport.
450
 
        server = pathfilter.PathFilteringServer(
451
 
            transport.get_transport_from_url('memory:///foo/bar/'),
452
 
            lambda x: x)
453
 
        server.start_server()
454
 
        t = transport.get_transport_from_url(server.get_url())
455
 
        self.assertEqual(server.get_url(), t.abspath('/'))
456
 
 
457
 
        subdir_t = t.clone('subdir')
458
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
459
 
        server.stop_server()
460
 
 
461
 
    def make_pf_transport(self, filter_func=None):
462
 
        """Make a PathFilteringTransport backed by a MemoryTransport.
463
 
 
464
 
        :param filter_func: by default this will be a no-op function.  Use this
465
 
            parameter to override it."""
466
 
        if filter_func is None:
467
 
            filter_func = lambda x: x
468
 
        server = pathfilter.PathFilteringServer(
469
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
470
 
        server.start_server()
471
 
        self.addCleanup(server.stop_server)
472
 
        return transport.get_transport_from_url(server.get_url())
473
 
 
474
 
    def test__filter(self):
475
 
        # _filter (with an identity func as filter_func) always returns
476
 
        # paths relative to the base of the backing transport.
477
 
        t = self.make_pf_transport()
478
 
        self.assertEqual('foo', t._filter('foo'))
479
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
480
 
        self.assertEqual('', t._filter('..'))
481
 
        self.assertEqual('', t._filter('/'))
482
 
        # The base of the pathfiltering transport is taken into account too.
483
 
        t = t.clone('subdir1/subdir2')
484
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
485
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
486
 
        self.assertEqual('subdir1', t._filter('..'))
487
 
        self.assertEqual('', t._filter('/'))
488
 
 
489
 
    def test_filter_invocation(self):
490
 
        filter_log = []
491
 
 
492
 
        def filter(path):
493
 
            filter_log.append(path)
494
 
            return path
495
 
        t = self.make_pf_transport(filter)
496
 
        t.has('abc')
497
 
        self.assertEqual(['abc'], filter_log)
498
 
        del filter_log[:]
499
 
        t.clone('abc').has('xyz')
500
 
        self.assertEqual(['abc/xyz'], filter_log)
501
 
        del filter_log[:]
502
 
        t.has('/abc')
503
 
        self.assertEqual(['abc'], filter_log)
504
 
 
505
 
    def test_clone(self):
506
 
        t = self.make_pf_transport()
507
 
        # relpath from root and root path are the same
508
 
        relpath_cloned = t.clone('foo')
509
 
        abspath_cloned = t.clone('/foo')
510
 
        self.assertEqual(t.server, relpath_cloned.server)
511
 
        self.assertEqual(t.server, abspath_cloned.server)
512
 
 
513
 
    def test_url_preserves_pathfiltering(self):
514
 
        """Calling get_transport on a pathfiltered transport's base should
515
 
        produce a transport with exactly the same behaviour as the original
516
 
        pathfiltered transport.
517
 
 
518
 
        This is so that it is not possible to escape (accidentally or
519
 
        otherwise) the filtering by doing::
520
 
            url = filtered_transport.base
521
 
            parent_url = urlutils.join(url, '..')
522
 
            new_t = transport.get_transport_from_url(parent_url)
523
 
        """
524
 
        t = self.make_pf_transport()
525
 
        new_t = transport.get_transport_from_url(t.base)
526
 
        self.assertEqual(t.server, new_t.server)
527
 
        self.assertEqual(t.base, new_t.base)
528
 
 
529
 
 
530
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
445
        server.tearDown()
 
446
 
 
447
 
 
448
class ReadonlyDecoratorTransportTest(TestCase):
531
449
    """Readonly decoration specific tests."""
532
450
 
533
451
    def test_local_parameters(self):
 
452
        import bzrlib.transport.readonly as readonly
534
453
        # connect to . in readonly mode
535
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
536
 
        self.assertEqual(True, t.listable())
537
 
        self.assertEqual(True, t.is_readonly())
 
454
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
455
        self.assertEqual(True, transport.listable())
 
456
        self.assertEqual(True, transport.is_readonly())
538
457
 
539
458
    def test_http_parameters(self):
540
459
        from bzrlib.tests.http_server import HttpServer
 
460
        import bzrlib.transport.readonly as readonly
541
461
        # connect to '.' via http which is not listable
542
462
        server = HttpServer()
543
 
        self.start_server(server)
544
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
545
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
546
 
        self.assertEqual(False, t.listable())
547
 
        self.assertEqual(True, t.is_readonly())
548
 
 
549
 
 
550
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
463
        server.setUp()
 
464
        try:
 
465
            transport = get_transport('readonly+' + server.get_url())
 
466
            self.failUnless(isinstance(transport,
 
467
                                       readonly.ReadonlyTransportDecorator))
 
468
            self.assertEqual(False, transport.listable())
 
469
            self.assertEqual(True, transport.is_readonly())
 
470
        finally:
 
471
            server.tearDown()
 
472
 
 
473
 
 
474
class FakeNFSDecoratorTests(TestCaseInTempDir):
551
475
    """NFS decorator specific tests."""
552
476
 
553
477
    def get_nfs_transport(self, url):
 
478
        import bzrlib.transport.fakenfs as fakenfs
554
479
        # connect to url with nfs decoration
555
480
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
556
481
 
557
482
    def test_local_parameters(self):
558
483
        # the listable and is_readonly parameters
559
484
        # are not changed by the fakenfs decorator
560
 
        t = self.get_nfs_transport('.')
561
 
        self.assertEqual(True, t.listable())
562
 
        self.assertEqual(False, t.is_readonly())
 
485
        transport = self.get_nfs_transport('.')
 
486
        self.assertEqual(True, transport.listable())
 
487
        self.assertEqual(False, transport.is_readonly())
563
488
 
564
489
    def test_http_parameters(self):
565
490
        # the listable and is_readonly parameters
567
492
        from bzrlib.tests.http_server import HttpServer
568
493
        # connect to '.' via http which is not listable
569
494
        server = HttpServer()
570
 
        self.start_server(server)
571
 
        t = self.get_nfs_transport(server.get_url())
572
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
573
 
        self.assertEqual(False, t.listable())
574
 
        self.assertEqual(True, t.is_readonly())
 
495
        server.setUp()
 
496
        try:
 
497
            transport = self.get_nfs_transport(server.get_url())
 
498
            self.assertIsInstance(
 
499
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
500
            self.assertEqual(False, transport.listable())
 
501
            self.assertEqual(True, transport.is_readonly())
 
502
        finally:
 
503
            server.tearDown()
575
504
 
576
505
    def test_fakenfs_server_default(self):
577
506
        # a FakeNFSServer() should bring up a local relpath server for itself
578
 
        server = test_server.FakeNFSServer()
579
 
        self.start_server(server)
580
 
        # the url should be decorated appropriately
581
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
582
 
        # and we should be able to get a transport for it
583
 
        t = transport.get_transport_from_url(server.get_url())
584
 
        # which must be a FakeNFSTransportDecorator instance.
585
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
507
        import bzrlib.transport.fakenfs as fakenfs
 
508
        server = fakenfs.FakeNFSServer()
 
509
        server.setUp()
 
510
        try:
 
511
            # the url should be decorated appropriately
 
512
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
513
            # and we should be able to get a transport for it
 
514
            transport = get_transport(server.get_url())
 
515
            # which must be a FakeNFSTransportDecorator instance.
 
516
            self.assertIsInstance(
 
517
                transport, fakenfs.FakeNFSTransportDecorator)
 
518
        finally:
 
519
            server.tearDown()
586
520
 
587
521
    def test_fakenfs_rename_semantics(self):
588
522
        # a FakeNFS transport must mangle the way rename errors occur to
589
523
        # look like NFS problems.
590
 
        t = self.get_nfs_transport('.')
 
524
        transport = self.get_nfs_transport('.')
591
525
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
592
 
                        transport=t)
593
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
594
 
 
595
 
 
596
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
526
                        transport=transport)
 
527
        self.assertRaises(errors.ResourceBusy,
 
528
                          transport.rename, 'from', 'to')
 
529
 
 
530
 
 
531
class FakeVFATDecoratorTests(TestCaseInTempDir):
597
532
    """Tests for simulation of VFAT restrictions"""
598
533
 
599
534
    def get_vfat_transport(self, url):
603
538
 
604
539
    def test_transport_creation(self):
605
540
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
606
 
        t = self.get_vfat_transport('.')
607
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
541
        transport = self.get_vfat_transport('.')
 
542
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
608
543
 
609
544
    def test_transport_mkdir(self):
610
 
        t = self.get_vfat_transport('.')
611
 
        t.mkdir('HELLO')
612
 
        self.assertTrue(t.has('hello'))
613
 
        self.assertTrue(t.has('Hello'))
 
545
        transport = self.get_vfat_transport('.')
 
546
        transport.mkdir('HELLO')
 
547
        self.assertTrue(transport.has('hello'))
 
548
        self.assertTrue(transport.has('Hello'))
614
549
 
615
550
    def test_forbidden_chars(self):
616
 
        t = self.get_vfat_transport('.')
617
 
        self.assertRaises(ValueError, t.has, "<NU>")
618
 
 
619
 
 
620
 
class BadTransportHandler(transport.Transport):
 
551
        transport = self.get_vfat_transport('.')
 
552
        self.assertRaises(ValueError, transport.has, "<NU>")
 
553
 
 
554
 
 
555
class BadTransportHandler(Transport):
621
556
    def __init__(self, base_url):
622
 
        raise errors.DependencyNotPresent('some_lib',
623
 
                                          'testing missing dependency')
624
 
 
625
 
 
626
 
class BackupTransportHandler(transport.Transport):
 
557
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
558
 
 
559
 
 
560
class BackupTransportHandler(Transport):
627
561
    """Test transport that works as a backup for the BadTransportHandler"""
628
562
    pass
629
563
 
630
564
 
631
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
565
class TestTransportImplementation(TestCaseInTempDir):
632
566
    """Implementation verification for transports.
633
567
 
634
568
    To verify a transport we need a server factory, which is a callable
653
587
    def setUp(self):
654
588
        super(TestTransportImplementation, self).setUp()
655
589
        self._server = self.transport_server()
656
 
        self.start_server(self._server)
 
590
        self._server.setUp()
 
591
        self.addCleanup(self._server.tearDown)
657
592
 
658
593
    def get_transport(self, relpath=None):
659
594
        """Return a connected transport to the local directory.
663
598
        base_url = self._server.get_url()
664
599
        url = self._adjust_url(base_url, relpath)
665
600
        # try getting the transport via the regular interface:
666
 
        t = transport.get_transport_from_url(url)
 
601
        t = get_transport(url)
667
602
        # vila--20070607 if the following are commented out the test suite
668
603
        # still pass. Is this really still needed or was it a forgotten
669
604
        # temporary fix ?
674
609
        return t
675
610
 
676
611
 
677
 
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_path."""

    def test_with_path(self):
        # A real directory path yields a LocalTransport based on its URL.
        opened = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(opened, local.LocalTransport)
        actual_base = opened.base.rstrip("/")
        expected_base = urlutils.local_path_to_url(self.test_dir)
        self.assertEquals(actual_base, expected_base)

    def test_with_url(self):
        # Something that looks like a URL scheme is still treated as a path
        # (a directory literally named "file:" under the test dir).
        opened = transport.get_transport_from_path("file:")
        self.assertIsInstance(opened, local.LocalTransport)
        actual_base = opened.base.rstrip("/")
        literal_path = os.path.join(self.test_dir, "file:")
        expected_base = urlutils.local_path_to_url(literal_path)
        self.assertEquals(actual_base, expected_base)
690
 
 
691
 
 
692
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_url."""

    def test_with_path(self):
        # A plain filesystem path is not a URL and must be rejected.
        self.assertRaises(
            errors.InvalidURL,
            transport.get_transport_from_url, self.test_dir)

    def test_with_url(self):
        dir_url = urlutils.local_path_to_url(self.test_dir)
        opened = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(opened, local.LocalTransport)
        self.assertEquals(opened.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        dir_url = urlutils.local_path_to_url(self.test_dir)
        url_with_params = dir_url + ",branch=foo"
        opened = transport.get_transport_from_url(url_with_params)
        self.assertIsInstance(opened, local.LocalTransport)
        self.assertEquals(opened.base.rstrip("/"), url_with_params)
        # A file created directly in the test dir must be visible through
        # the transport even though its URL carries segment parameters.
        data_path = os.path.join(self.test_dir, "afile")
        with open(data_path, 'w') as out:
            out.write("data")
        self.assertTrue(opened.has("afile"))
712
 
 
713
 
 
714
 
class TestLocalTransports(tests.TestCase):
 
612
class TestLocalTransports(TestCase):
715
613
 
716
614
    def test_get_transport_from_abspath(self):
717
615
        here = osutils.abspath('.')
718
 
        t = transport.get_transport(here)
719
 
        self.assertIsInstance(t, local.LocalTransport)
 
616
        t = get_transport(here)
 
617
        self.assertIsInstance(t, LocalTransport)
720
618
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
721
619
 
722
620
    def test_get_transport_from_relpath(self):
723
621
        here = osutils.abspath('.')
724
 
        t = transport.get_transport('.')
725
 
        self.assertIsInstance(t, local.LocalTransport)
 
622
        t = get_transport('.')
 
623
        self.assertIsInstance(t, LocalTransport)
726
624
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
727
625
 
728
626
    def test_get_transport_from_local_url(self):
729
627
        here = osutils.abspath('.')
730
628
        here_url = urlutils.local_path_to_url(here) + '/'
731
 
        t = transport.get_transport(here_url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
 
629
        t = get_transport(here_url)
 
630
        self.assertIsInstance(t, LocalTransport)
733
631
        self.assertEquals(t.base, here_url)
734
632
 
735
633
    def test_local_abspath(self):
736
634
        here = osutils.abspath('.')
737
 
        t = transport.get_transport(here)
 
635
        t = get_transport(here)
738
636
        self.assertEquals(t.local_abspath(''), here)
739
637
 
740
638
 
741
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for write streams opened through the test's transport."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.

        :raises tests.TestNotApplicable: when the os module has no fdatasync.
        """
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        # recordCalls gives back the list that is checked at the end of the
        # test to see how many times os.fdatasync was invoked.
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        # Always release the stream, even when an assertion below fails.
        # The cleanup runs after the test body, so it cannot influence the
        # call count asserted at the end.
        self.addCleanup(w.close)
        w.write('foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEquals(f.read(), 'foo')
        self.assertEquals(len(calls), 1, calls)
762
 
 
763
 
 
764
 
class TestWin32LocalTransport(tests.TestCase):
 
639
class TestWin32LocalTransport(TestCase):
765
640
 
766
641
    def test_unc_clone_to_root(self):
767
642
        # Win32 UNC path like \\HOST\path
768
643
        # clone to root should stop at least at \\HOST part
769
644
        # not on \\
770
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
645
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
771
646
        for i in xrange(4):
772
647
            t = t.clone('..')
773
648
        self.assertEquals(t.base, 'file://HOST/')
776
651
        self.assertEquals(t.base, 'file://HOST/')
777
652
 
778
653
 
779
 
class TestConnectedTransport(tests.TestCase):
 
654
class TestConnectedTransport(TestCase):
780
655
    """Tests for connected to remote server transports"""
781
656
 
782
657
    def test_parse_url(self):
783
 
        t = transport.ConnectedTransport(
784
 
            'http://simple.example.com/home/source')
785
 
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
786
 
        self.assertEquals(t._parsed_url.port, None)
787
 
        self.assertEquals(t._parsed_url.path, '/home/source/')
788
 
        self.assertTrue(t._parsed_url.user is None)
789
 
        self.assertTrue(t._parsed_url.password is None)
 
658
        t = ConnectedTransport('http://simple.example.com/home/source')
 
659
        self.assertEquals(t._host, 'simple.example.com')
 
660
        self.assertEquals(t._port, None)
 
661
        self.assertEquals(t._path, '/home/source/')
 
662
        self.failUnless(t._user is None)
 
663
        self.failUnless(t._password is None)
790
664
 
791
665
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
792
666
 
793
667
    def test_parse_url_with_at_in_user(self):
794
668
        # Bug 228058
795
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
796
 
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
669
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
670
        self.assertEquals(t._user, 'user@host.com')
797
671
 
798
672
    def test_parse_quoted_url(self):
799
 
        t = transport.ConnectedTransport(
800
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
801
 
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
802
 
        self.assertEquals(t._parsed_url.port, 2222)
803
 
        self.assertEquals(t._parsed_url.user, 'robey')
804
 
        self.assertEquals(t._parsed_url.password, 'h@t')
805
 
        self.assertEquals(t._parsed_url.path, '/path/')
 
673
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
674
        self.assertEquals(t._host, 'exAmple.com')
 
675
        self.assertEquals(t._port, 2222)
 
676
        self.assertEquals(t._user, 'robey')
 
677
        self.assertEquals(t._password, 'h@t')
 
678
        self.assertEquals(t._path, '/path/')
806
679
 
807
680
        # Base should not keep track of the password
808
 
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
681
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
809
682
 
810
683
    def test_parse_invalid_url(self):
811
684
        self.assertRaises(errors.InvalidURL,
812
 
                          transport.ConnectedTransport,
 
685
                          ConnectedTransport,
813
686
                          'sftp://lily.org:~janneke/public/bzr/gub')
814
687
 
815
688
    def test_relpath(self):
816
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
689
        t = ConnectedTransport('sftp://user@host.com/abs/path')
817
690
 
818
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
819
 
            'sub')
 
691
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
820
692
        self.assertRaises(errors.PathNotChild, t.relpath,
821
693
                          'http://user@host.com/abs/path/sub')
822
694
        self.assertRaises(errors.PathNotChild, t.relpath,
826
698
        self.assertRaises(errors.PathNotChild, t.relpath,
827
699
                          'sftp://user@host.com:33/abs/path/sub')
828
700
        # Make sure it works when we don't supply a username
829
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
701
        t = ConnectedTransport('sftp://host.com/abs/path')
830
702
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
831
703
 
832
704
        # Make sure it works when parts of the path will be url encoded
833
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
705
        t = ConnectedTransport('sftp://host.com/dev/%path')
834
706
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
835
707
 
836
708
    def test_connection_sharing_propagate_credentials(self):
837
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
838
 
        self.assertEquals('user', t._parsed_url.user)
839
 
        self.assertEquals('host.com', t._parsed_url.host)
 
709
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
710
        self.assertEquals('user', t._user)
 
711
        self.assertEquals('host.com', t._host)
840
712
        self.assertIs(None, t._get_connection())
841
 
        self.assertIs(None, t._parsed_url.password)
 
713
        self.assertIs(None, t._password)
842
714
        c = t.clone('subdir')
843
715
        self.assertIs(None, c._get_connection())
844
 
        self.assertIs(None, t._parsed_url.password)
 
716
        self.assertIs(None, t._password)
845
717
 
846
718
        # Simulate the user entering a password
847
719
        password = 'secret'
861
733
        self.assertIs(new_password, c._get_credentials())
862
734
 
863
735
 
864
 
class TestReusedTransports(tests.TestCase):
 
736
class TestReusedTransports(TestCase):
865
737
    """Tests for transport reuse"""
866
738
 
867
739
    def test_reuse_same_transport(self):
868
740
        possible_transports = []
869
 
        t1 = transport.get_transport_from_url('http://foo/',
870
 
                                     possible_transports=possible_transports)
 
741
        t1 = get_transport('http://foo/',
 
742
                           possible_transports=possible_transports)
871
743
        self.assertEqual([t1], possible_transports)
872
 
        t2 = transport.get_transport_from_url('http://foo/',
873
 
                                     possible_transports=[t1])
 
744
        t2 = get_transport('http://foo/', possible_transports=[t1])
874
745
        self.assertIs(t1, t2)
875
746
 
876
747
        # Also check that final '/' are handled correctly
877
 
        t3 = transport.get_transport_from_url('http://foo/path/')
878
 
        t4 = transport.get_transport_from_url('http://foo/path',
879
 
                                     possible_transports=[t3])
 
748
        t3 = get_transport('http://foo/path/')
 
749
        t4 = get_transport('http://foo/path', possible_transports=[t3])
880
750
        self.assertIs(t3, t4)
881
751
 
882
 
        t5 = transport.get_transport_from_url('http://foo/path')
883
 
        t6 = transport.get_transport_from_url('http://foo/path/',
884
 
                                     possible_transports=[t5])
 
752
        t5 = get_transport('http://foo/path')
 
753
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
885
754
        self.assertIs(t5, t6)
886
755
 
887
756
    def test_don_t_reuse_different_transport(self):
888
 
        t1 = transport.get_transport_from_url('http://foo/path')
889
 
        t2 = transport.get_transport_from_url('http://bar/path',
890
 
                                     possible_transports=[t1])
 
757
        t1 = get_transport('http://foo/path')
 
758
        t2 = get_transport('http://bar/path', possible_transports=[t1])
891
759
        self.assertIsNot(t1, t2)
892
760
 
893
761
 
894
 
class TestTransportTrace(tests.TestCase):
 
762
class TestTransportTrace(TestCase):
895
763
 
896
 
    def test_decorator(self):
897
 
        t = transport.get_transport_from_url('trace+memory://')
 
764
    def test_get(self):
 
765
        transport = get_transport('trace+memory://')
898
766
        self.assertIsInstance(
899
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
767
            transport, bzrlib.transport.trace.TransportTraceDecorator)
900
768
 
901
769
    def test_clone_preserves_activity(self):
902
 
        t = transport.get_transport_from_url('trace+memory://')
903
 
        t2 = t.clone('.')
904
 
        self.assertTrue(t is not t2)
905
 
        self.assertTrue(t._activity is t2._activity)
 
770
        transport = get_transport('trace+memory://')
 
771
        transport2 = transport.clone('.')
 
772
        self.assertTrue(transport is not transport2)
 
773
        self.assertTrue(transport._activity is transport2._activity)
906
774
 
907
775
    # the following specific tests are for the operations that have made use of
908
776
    # logging in tests; we could test every single operation but doing that
909
777
    # still won't cause a test failure when the top level Transport API
910
778
    # changes; so there is little return doing that.
911
779
    def test_get(self):
912
 
        t = transport.get_transport_from_url('trace+memory:///')
913
 
        t.put_bytes('foo', 'barish')
914
 
        t.get('foo')
 
780
        transport = get_transport('trace+memory:///')
 
781
        transport.put_bytes('foo', 'barish')
 
782
        transport.get('foo')
915
783
        expected_result = []
916
784
        # put_bytes records the bytes, not the content to avoid memory
917
785
        # pressure.
918
786
        expected_result.append(('put_bytes', 'foo', 6, None))
919
787
        # get records the file name only.
920
788
        expected_result.append(('get', 'foo'))
921
 
        self.assertEqual(expected_result, t._activity)
 
789
        self.assertEqual(expected_result, transport._activity)
922
790
 
923
791
    def test_readv(self):
924
 
        t = transport.get_transport_from_url('trace+memory:///')
925
 
        t.put_bytes('foo', 'barish')
926
 
        list(t.readv('foo', [(0, 1), (3, 2)],
927
 
                     adjust_for_latency=True, upper_limit=6))
 
792
        transport = get_transport('trace+memory:///')
 
793
        transport.put_bytes('foo', 'barish')
 
794
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
795
            upper_limit=6))
928
796
        expected_result = []
929
797
        # put_bytes records the bytes, not the content to avoid memory
930
798
        # pressure.
931
799
        expected_result.append(('put_bytes', 'foo', 6, None))
932
800
        # readv records the supplied offset request
933
801
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
934
 
        self.assertEqual(expected_result, t._activity)
935
 
 
936
 
 
937
 
class TestSSHConnections(tests.TestCaseWithTransport):
    """End-to-end check of opening a bzr+ssh:// URL against a stub server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # Holds the spawned subprocess first, followed by the relay threads;
        # the end of the test relies on that ordering.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            # Lets the server reach the enclosing test case to record the
            # commands it is asked to execute.
            test = self

            def check_channel_exec_request(self, channel, command):
                """Run `command` in a shell and relay its pipes to `channel`.

                Records the command on the test case, and returns True.
                """
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes
                # from the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # Copy one byte at a time until read() returns the empty
                    # string, then close the destination side.
                    while True:
                        # NOTE(review): this local shadows the `bytes` builtin.
                        bytes = read(1)
                        if bytes == '':
                            close()
                            break
                        write(bytes)

                # One (read, write, close) triple per relayed stream:
                # channel -> stdin, stdout -> channel, stderr -> channel.
                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for read, write, close in file_functions:
                    t = threading.Thread(
                        target=ferry_bytes, args=(read, write, close))
                    t.start()
                    started.append(t)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        # On win32 the bzr script is run through the current Python
        # interpreter rather than executed directly.
        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
        else:
            bzr_remote_path = self.get_bzr_path()
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        t = transport.get_transport(url)
        self.permit_url(t.base)
        t.mkdir('foo')

        # Exactly one command should have been executed by the stub server:
        # the smart server invocation built from BZR_REMOTE_PATH.
        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
            self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        t._client._medium.disconnect()
        if not started:
            return
        # First wait for the subprocess
        started[0].wait()
        # And the rest are threads
        # (the loop variable rebinds `t`; the transport is not used again).
        for t in started[1:]:
            t.join()
1038
 
 
1039
 
 
1040
 
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        # Feed in far more text than the expected output length and check
        # both the length of the result and how it starts.
        repeated_markup = "<p>something!\n" * 1000
        stripped = http.unhtml_roughly(repeated_markup)
        self.assertEquals(len(stripped), 1000)
        self.assertStartsWith(stripped, " something!")
1049
 
 
1050
 
 
1051
 
class SomeDirectory(object):
    """Dummy directory service whose lookups all give the same URL."""

    def look_up(self, name, url):
        """Return 'http://bar' regardless of `name` and `url`."""
        fixed_target = "http://bar"
        return fixed_target
1055
 
 
1056
 
 
1057
 
class TestLocationToUrl(tests.TestCase):
1058
 
 
1059
 
    def get_base_location(self):
1060
 
        path = osutils.abspath('/foo/bar')
1061
 
        if path.startswith('/'):
1062
 
            url = 'file://%s' % (path,)
1063
 
        else:
1064
 
            # On Windows, abspaths start with the drive letter, so we have to
1065
 
            # add in the extra '/'
1066
 
            url = 'file:///%s' % (path,)
1067
 
        return path, url
1068
 
 
1069
 
    def test_regular_url(self):
1070
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1071
 
 
1072
 
    def test_directory(self):
1073
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1074
 
        self.addCleanup(directories.remove, "bar:")
1075
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1076
 
 
1077
 
    def test_unicode_url(self):
1078
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1079
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1080
 
 
1081
 
    def test_unicode_path(self):
1082
 
        path, url = self.get_base_location()
1083
 
        location = path + "\xc3\xaf".decode("utf-8")
1084
 
        url += '%C3%AF'
1085
 
        self.assertEquals(url, location_to_url(location))
1086
 
 
1087
 
    def test_path(self):
1088
 
        path, url = self.get_base_location()
1089
 
        self.assertEquals(url, location_to_url(path))
1090
 
 
1091
 
    def test_relative_file_url(self):
1092
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1093
 
            location_to_url("file:bar"))
1094
 
 
1095
 
    def test_absolute_file_url(self):
1096
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))
 
802
        self.assertEqual(expected_result, transport._activity)