~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

(jelmer) Support upgrading between the 2a and development-colo formats.
 (Jelmer Vernooij)

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
 
import bzrlib
21
24
from bzrlib import (
22
25
    errors,
23
26
    osutils,
 
27
    tests,
 
28
    transport,
24
29
    urlutils,
25
30
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
31
from bzrlib.directory_service import directories
 
32
from bzrlib.transport import (
 
33
    chroot,
 
34
    fakenfs,
 
35
    http,
 
36
    local,
 
37
    location_to_url,
 
38
    memory,
 
39
    pathfilter,
 
40
    readonly,
 
41
    )
 
42
import bzrlib.transport.trace
 
43
from bzrlib.tests import (
 
44
    features,
 
45
    test_server,
 
46
    )
51
47
 
52
48
 
53
49
# TODO: Should possibly split transport-specific tests into their own files.
54
50
 
55
51
 
56
 
class TestTransport(TestCase):
 
52
class TestTransport(tests.TestCase):
57
53
    """Test the non transport-concrete class functionality."""
58
54
 
59
55
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
62
 
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
65
 
        finally:
66
 
            _set_protocol_handlers(handlers)
 
56
        handlers = transport._get_protocol_handlers()
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
67
61
 
68
62
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
 
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
70
65
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
 
66
        transport._clear_protocol_handlers()
 
67
 
72
68
        class SampleHandler(object):
73
69
            """I exist, isn't that enough?"""
74
 
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
85
 
        finally:
86
 
            _set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
87
83
 
88
84
    def test_transport_dependency(self):
89
85
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
 
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
88
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
 
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
93
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
97
 
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
107
 
        finally:
108
 
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
110
 
            
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
 
103
 
111
104
    def test_transport_fallback(self):
112
105
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
 
106
        saved_handlers = transport._get_protocol_handlers()
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
116
 
 
117
    def test_ssh_hints(self):
 
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
114
119
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
122
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
 
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
 
121
        except errors.UnsupportedProtocol, e:
 
122
            e_str = str(e)
 
123
            self.assertEquals('Unsupported protocol'
 
124
                              ' for url "ssh://fooserver/foo":'
 
125
                              ' bzr supports bzr+ssh to operate over ssh,'
 
126
                              ' use "bzr+ssh://fooserver/foo".',
 
127
                              str(e))
 
128
        else:
 
129
            self.fail('Did not raise UnsupportedProtocol')
125
130
 
126
131
    def test_LateReadError(self):
127
132
        """The LateReadError helper should raise on read()."""
128
 
        a_file = LateReadError('a path')
 
133
        a_file = transport.LateReadError('a path')
129
134
        try:
130
135
            a_file.read()
131
 
        except ReadError, error:
 
136
        except errors.ReadError, error:
132
137
            self.assertEqual('a path', error.path)
133
 
        self.assertRaises(ReadError, a_file.read, 40)
 
138
        self.assertRaises(errors.ReadError, a_file.read, 40)
134
139
        a_file.close()
135
140
 
136
 
    def test__combine_paths(self):
137
 
        t = Transport('/')
138
 
        self.assertEqual('/home/sarah/project/foo',
139
 
                         t._combine_paths('/home/sarah', 'project/foo'))
140
 
        self.assertEqual('/etc',
141
 
                         t._combine_paths('/home/sarah', '../../etc'))
142
 
        self.assertEqual('/etc',
143
 
                         t._combine_paths('/home/sarah', '../../../etc'))
144
 
        self.assertEqual('/etc',
145
 
                         t._combine_paths('/home/sarah', '/etc'))
146
 
 
147
141
    def test_local_abspath_non_local_transport(self):
148
142
        # the base implementation should throw
149
 
        t = MemoryTransport()
 
143
        t = memory.MemoryTransport()
150
144
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
145
        self.assertEqual('memory:///t is not a local path.', str(e))
152
146
 
153
147
 
154
 
class TestCoalesceOffsets(TestCase):
 
148
class TestCoalesceOffsets(tests.TestCase):
155
149
 
156
150
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
157
 
        coalesce = Transport._coalesce_offsets
158
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
151
        coalesce = transport.Transport._coalesce_offsets
 
152
        exp = [transport._CoalescedOffset(*x) for x in expected]
159
153
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
160
154
                            max_size=max_size))
161
155
        self.assertEqual(exp, out)
180
174
        self.check([(0, 20, [(0, 10), (10, 10)])],
181
175
                   [(0, 10), (10, 10)])
182
176
 
183
 
    # XXX: scary, http.readv() can't handle that --vila20071209
184
177
    def test_coalesce_overlapped(self):
185
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
186
 
                   [(0, 10), (5, 10)])
 
178
        self.assertRaises(ValueError,
 
179
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
180
                        [(0, 10), (5, 10)])
187
181
 
188
182
    def test_coalesce_limit(self):
189
183
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
206
200
 
207
201
    def test_coalesce_fudge(self):
208
202
        self.check([(10, 30, [(0, 10), (20, 10)]),
209
 
                    (100, 10, [(0, 10),]),
 
203
                    (100, 10, [(0, 10)]),
210
204
                   ], [(10, 10), (30, 10), (100, 10)],
211
 
                   fudge=10
212
 
                  )
 
205
                   fudge=10)
 
206
 
213
207
    def test_coalesce_max_size(self):
214
208
        self.check([(10, 20, [(0, 10), (10, 10)]),
215
209
                    (30, 50, [(0, 50)]),
216
210
                    # If one range is above max_size, it gets its own coalesced
217
211
                    # offset
218
 
                    (100, 80, [(0, 80),]),],
 
212
                    (100, 80, [(0, 80)]),],
219
213
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
220
 
                   max_size=50
221
 
                  )
 
214
                   max_size=50)
222
215
 
223
216
    def test_coalesce_no_max_size(self):
224
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
217
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
225
218
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
226
219
                  )
227
220
 
228
 
 
229
 
class TestMemoryTransport(TestCase):
 
221
    def test_coalesce_default_limit(self):
 
222
        # By default we use a 100MB max size.
 
223
        ten_mb = 10 * 1024 * 1024
 
224
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
225
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
226
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
227
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
228
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
229
                   max_size=1*1024*1024*1024)
 
230
 
 
231
 
 
232
class TestMemoryServer(tests.TestCase):
 
233
 
 
234
    def test_create_server(self):
 
235
        server = memory.MemoryServer()
 
236
        server.start_server()
 
237
        url = server.get_url()
 
238
        self.assertTrue(url in transport.transport_list_registry)
 
239
        t = transport.get_transport_from_url(url)
 
240
        del t
 
241
        server.stop_server()
 
242
        self.assertFalse(url in transport.transport_list_registry)
 
243
        self.assertRaises(errors.UnsupportedProtocol,
 
244
                          transport.get_transport, url)
 
245
 
 
246
 
 
247
class TestMemoryTransport(tests.TestCase):
230
248
 
231
249
    def test_get_transport(self):
232
 
        MemoryTransport()
 
250
        memory.MemoryTransport()
233
251
 
234
252
    def test_clone(self):
235
 
        transport = MemoryTransport()
236
 
        self.assertTrue(isinstance(transport, MemoryTransport))
237
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
253
        t = memory.MemoryTransport()
 
254
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
255
        self.assertEqual("memory:///", t.clone("/").base)
238
256
 
239
257
    def test_abspath(self):
240
 
        transport = MemoryTransport()
241
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
258
        t = memory.MemoryTransport()
 
259
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
242
260
 
243
261
    def test_abspath_of_root(self):
244
 
        transport = MemoryTransport()
245
 
        self.assertEqual("memory:///", transport.base)
246
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
262
        t = memory.MemoryTransport()
 
263
        self.assertEqual("memory:///", t.base)
 
264
        self.assertEqual("memory:///", t.abspath('/'))
247
265
 
248
266
    def test_abspath_of_relpath_starting_at_root(self):
249
 
        transport = MemoryTransport()
250
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
267
        t = memory.MemoryTransport()
 
268
        self.assertEqual("memory:///foo", t.abspath('/foo'))
251
269
 
252
270
    def test_append_and_get(self):
253
 
        transport = MemoryTransport()
254
 
        transport.append_bytes('path', 'content')
255
 
        self.assertEqual(transport.get('path').read(), 'content')
256
 
        transport.append_file('path', StringIO('content'))
257
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
271
        t = memory.MemoryTransport()
 
272
        t.append_bytes('path', 'content')
 
273
        self.assertEqual(t.get('path').read(), 'content')
 
274
        t.append_file('path', StringIO('content'))
 
275
        self.assertEqual(t.get('path').read(), 'contentcontent')
258
276
 
259
277
    def test_put_and_get(self):
260
 
        transport = MemoryTransport()
261
 
        transport.put_file('path', StringIO('content'))
262
 
        self.assertEqual(transport.get('path').read(), 'content')
263
 
        transport.put_bytes('path', 'content')
264
 
        self.assertEqual(transport.get('path').read(), 'content')
 
278
        t = memory.MemoryTransport()
 
279
        t.put_file('path', StringIO('content'))
 
280
        self.assertEqual(t.get('path').read(), 'content')
 
281
        t.put_bytes('path', 'content')
 
282
        self.assertEqual(t.get('path').read(), 'content')
265
283
 
266
284
    def test_append_without_dir_fails(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertRaises(NoSuchFile,
269
 
                          transport.append_bytes, 'dir/path', 'content')
 
285
        t = memory.MemoryTransport()
 
286
        self.assertRaises(errors.NoSuchFile,
 
287
                          t.append_bytes, 'dir/path', 'content')
270
288
 
271
289
    def test_put_without_dir_fails(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertRaises(NoSuchFile,
274
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
290
        t = memory.MemoryTransport()
 
291
        self.assertRaises(errors.NoSuchFile,
 
292
                          t.put_file, 'dir/path', StringIO('content'))
275
293
 
276
294
    def test_get_missing(self):
277
 
        transport = MemoryTransport()
278
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
295
        transport = memory.MemoryTransport()
 
296
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
279
297
 
280
298
    def test_has_missing(self):
281
 
        transport = MemoryTransport()
282
 
        self.assertEquals(False, transport.has('foo'))
 
299
        t = memory.MemoryTransport()
 
300
        self.assertEquals(False, t.has('foo'))
283
301
 
284
302
    def test_has_present(self):
285
 
        transport = MemoryTransport()
286
 
        transport.append_bytes('foo', 'content')
287
 
        self.assertEquals(True, transport.has('foo'))
 
303
        t = memory.MemoryTransport()
 
304
        t.append_bytes('foo', 'content')
 
305
        self.assertEquals(True, t.has('foo'))
288
306
 
289
307
    def test_list_dir(self):
290
 
        transport = MemoryTransport()
291
 
        transport.put_bytes('foo', 'content')
292
 
        transport.mkdir('dir')
293
 
        transport.put_bytes('dir/subfoo', 'content')
294
 
        transport.put_bytes('dirlike', 'content')
 
308
        t = memory.MemoryTransport()
 
309
        t.put_bytes('foo', 'content')
 
310
        t.mkdir('dir')
 
311
        t.put_bytes('dir/subfoo', 'content')
 
312
        t.put_bytes('dirlike', 'content')
295
313
 
296
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
297
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
314
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
315
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
298
316
 
299
317
    def test_mkdir(self):
300
 
        transport = MemoryTransport()
301
 
        transport.mkdir('dir')
302
 
        transport.append_bytes('dir/path', 'content')
303
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
318
        t = memory.MemoryTransport()
 
319
        t.mkdir('dir')
 
320
        t.append_bytes('dir/path', 'content')
 
321
        self.assertEqual(t.get('dir/path').read(), 'content')
304
322
 
305
323
    def test_mkdir_missing_parent(self):
306
 
        transport = MemoryTransport()
307
 
        self.assertRaises(NoSuchFile,
308
 
                          transport.mkdir, 'dir/dir')
 
324
        t = memory.MemoryTransport()
 
325
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
309
326
 
310
327
    def test_mkdir_twice(self):
311
 
        transport = MemoryTransport()
312
 
        transport.mkdir('dir')
313
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
328
        t = memory.MemoryTransport()
 
329
        t.mkdir('dir')
 
330
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
314
331
 
315
332
    def test_parameters(self):
316
 
        transport = MemoryTransport()
317
 
        self.assertEqual(True, transport.listable())
318
 
        self.assertEqual(False, transport.is_readonly())
 
333
        t = memory.MemoryTransport()
 
334
        self.assertEqual(True, t.listable())
 
335
        self.assertEqual(False, t.is_readonly())
319
336
 
320
337
    def test_iter_files_recursive(self):
321
 
        transport = MemoryTransport()
322
 
        transport.mkdir('dir')
323
 
        transport.put_bytes('dir/foo', 'content')
324
 
        transport.put_bytes('dir/bar', 'content')
325
 
        transport.put_bytes('bar', 'content')
326
 
        paths = set(transport.iter_files_recursive())
 
338
        t = memory.MemoryTransport()
 
339
        t.mkdir('dir')
 
340
        t.put_bytes('dir/foo', 'content')
 
341
        t.put_bytes('dir/bar', 'content')
 
342
        t.put_bytes('bar', 'content')
 
343
        paths = set(t.iter_files_recursive())
327
344
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
328
345
 
329
346
    def test_stat(self):
330
 
        transport = MemoryTransport()
331
 
        transport.put_bytes('foo', 'content')
332
 
        transport.put_bytes('bar', 'phowar')
333
 
        self.assertEqual(7, transport.stat('foo').st_size)
334
 
        self.assertEqual(6, transport.stat('bar').st_size)
335
 
 
336
 
 
337
 
class ChrootDecoratorTransportTest(TestCase):
 
347
        t = memory.MemoryTransport()
 
348
        t.put_bytes('foo', 'content')
 
349
        t.put_bytes('bar', 'phowar')
 
350
        self.assertEqual(7, t.stat('foo').st_size)
 
351
        self.assertEqual(6, t.stat('bar').st_size)
 
352
 
 
353
 
 
354
class ChrootDecoratorTransportTest(tests.TestCase):
338
355
    """Chroot decoration specific tests."""
339
356
 
340
357
    def test_abspath(self):
341
358
        # The abspath is always relative to the chroot_url.
342
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
343
 
        server.setUp()
344
 
        transport = get_transport(server.get_url())
345
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
359
        server = chroot.ChrootServer(
 
360
            transport.get_transport_from_url('memory:///foo/bar/'))
 
361
        self.start_server(server)
 
362
        t = transport.get_transport_from_url(server.get_url())
 
363
        self.assertEqual(server.get_url(), t.abspath('/'))
346
364
 
347
 
        subdir_transport = transport.clone('subdir')
348
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
349
 
        server.tearDown()
 
365
        subdir_t = t.clone('subdir')
 
366
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
350
367
 
351
368
    def test_clone(self):
352
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
353
 
        server.setUp()
354
 
        transport = get_transport(server.get_url())
 
369
        server = chroot.ChrootServer(
 
370
            transport.get_transport_from_url('memory:///foo/bar/'))
 
371
        self.start_server(server)
 
372
        t = transport.get_transport_from_url(server.get_url())
355
373
        # relpath from root and root path are the same
356
 
        relpath_cloned = transport.clone('foo')
357
 
        abspath_cloned = transport.clone('/foo')
 
374
        relpath_cloned = t.clone('foo')
 
375
        abspath_cloned = t.clone('/foo')
358
376
        self.assertEqual(server, relpath_cloned.server)
359
377
        self.assertEqual(server, abspath_cloned.server)
360
 
        server.tearDown()
361
 
    
 
378
 
362
379
    def test_chroot_url_preserves_chroot(self):
363
380
        """Calling get_transport on a chroot transport's base should produce a
364
381
        transport with exactly the same behaviour as the original chroot
367
384
        This is so that it is not possible to escape a chroot by doing::
368
385
            url = chroot_transport.base
369
386
            parent_url = urlutils.join(url, '..')
370
 
            new_transport = get_transport(parent_url)
 
387
            new_t = transport.get_transport_from_url(parent_url)
371
388
        """
372
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
373
 
        server.setUp()
374
 
        transport = get_transport(server.get_url())
375
 
        new_transport = get_transport(transport.base)
376
 
        self.assertEqual(transport.server, new_transport.server)
377
 
        self.assertEqual(transport.base, new_transport.base)
378
 
        server.tearDown()
379
 
        
 
389
        server = chroot.ChrootServer(
 
390
            transport.get_transport_from_url('memory:///path/subpath'))
 
391
        self.start_server(server)
 
392
        t = transport.get_transport_from_url(server.get_url())
 
393
        new_t = transport.get_transport_from_url(t.base)
 
394
        self.assertEqual(t.server, new_t.server)
 
395
        self.assertEqual(t.base, new_t.base)
 
396
 
380
397
    def test_urljoin_preserves_chroot(self):
381
398
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
382
399
        URL that escapes the intended chroot.
384
401
        This is so that it is not possible to escape a chroot by doing::
385
402
            url = chroot_transport.base
386
403
            parent_url = urlutils.join(url, '..')
387
 
            new_transport = get_transport(parent_url)
 
404
            new_t = transport.get_transport_from_url(parent_url)
388
405
        """
389
 
        server = ChrootServer(get_transport('memory:///path/'))
390
 
        server.setUp()
391
 
        transport = get_transport(server.get_url())
 
406
        server = chroot.ChrootServer(
 
407
            transport.get_transport_from_url('memory:///path/'))
 
408
        self.start_server(server)
 
409
        t = transport.get_transport_from_url(server.get_url())
392
410
        self.assertRaises(
393
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
394
 
        server.tearDown()
395
 
 
396
 
 
397
 
class ChrootServerTest(TestCase):
 
411
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
412
 
 
413
 
 
414
class TestChrootServer(tests.TestCase):
398
415
 
399
416
    def test_construct(self):
400
 
        backing_transport = MemoryTransport()
401
 
        server = ChrootServer(backing_transport)
 
417
        backing_transport = memory.MemoryTransport()
 
418
        server = chroot.ChrootServer(backing_transport)
402
419
        self.assertEqual(backing_transport, server.backing_transport)
403
420
 
404
421
    def test_setUp(self):
405
 
        backing_transport = MemoryTransport()
406
 
        server = ChrootServer(backing_transport)
407
 
        server.setUp()
408
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
422
        backing_transport = memory.MemoryTransport()
 
423
        server = chroot.ChrootServer(backing_transport)
 
424
        server.start_server()
 
425
        self.addCleanup(server.stop_server)
 
426
        self.assertTrue(server.scheme
 
427
                        in transport._get_protocol_handlers().keys())
409
428
 
410
 
    def test_tearDown(self):
411
 
        backing_transport = MemoryTransport()
412
 
        server = ChrootServer(backing_transport)
413
 
        server.setUp()
414
 
        server.tearDown()
415
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
429
    def test_stop_server(self):
 
430
        backing_transport = memory.MemoryTransport()
 
431
        server = chroot.ChrootServer(backing_transport)
 
432
        server.start_server()
 
433
        server.stop_server()
 
434
        self.assertFalse(server.scheme
 
435
                         in transport._get_protocol_handlers().keys())
416
436
 
417
437
    def test_get_url(self):
418
 
        backing_transport = MemoryTransport()
419
 
        server = ChrootServer(backing_transport)
420
 
        server.setUp()
 
438
        backing_transport = memory.MemoryTransport()
 
439
        server = chroot.ChrootServer(backing_transport)
 
440
        server.start_server()
 
441
        self.addCleanup(server.stop_server)
421
442
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
422
 
        server.tearDown()
423
 
 
424
 
 
425
 
class ReadonlyDecoratorTransportTest(TestCase):
 
443
 
 
444
 
 
445
class PathFilteringDecoratorTransportTest(tests.TestCase):
    """Pathfilter decoration specific tests."""

    def test_abspath(self):
        """abspath('/') is the server URL, for the root and for a clone."""
        # The abspath is always relative to the base of the backing transport.
        server = pathfilter.PathFilteringServer(
            transport.get_transport_from_url('memory:///foo/bar/'),
            lambda x: x)
        server.start_server()
        # Register the shutdown right away (as make_pf_transport does) so a
        # failing assertion below cannot leave the server running.
        self.addCleanup(server.stop_server)
        t = transport.get_transport_from_url(server.get_url())
        self.assertEqual(server.get_url(), t.abspath('/'))

        subdir_t = t.clone('subdir')
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))

    def make_pf_transport(self, filter_func=None):
        """Make a PathFilteringTransport backed by a MemoryTransport.

        :param filter_func: by default this will be a no-op function.  Use this
            parameter to override it.
        :return: a transport obtained from the URL of a freshly started
            PathFilteringServer; the server is stopped at test cleanup.
        """
        if filter_func is None:
            filter_func = lambda x: x
        server = pathfilter.PathFilteringServer(
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
        server.start_server()
        self.addCleanup(server.stop_server)
        return transport.get_transport_from_url(server.get_url())

    def test__filter(self):
        """_filter maps any path to one relative to the backing base."""
        # _filter (with an identity func as filter_func) always returns
        # paths relative to the base of the backing transport.
        t = self.make_pf_transport()
        self.assertEqual('foo', t._filter('foo'))
        self.assertEqual('foo/bar', t._filter('foo/bar'))
        self.assertEqual('', t._filter('..'))
        self.assertEqual('', t._filter('/'))
        # The base of the pathfiltering transport is taken into account too.
        t = t.clone('subdir1/subdir2')
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
        self.assertEqual('subdir1', t._filter('..'))
        self.assertEqual('', t._filter('/'))

    def test_filter_invocation(self):
        """The filter function receives the path relative to the root."""
        filter_log = []

        # Named so that it does not shadow the ``filter`` builtin.
        def recording_filter(path):
            filter_log.append(path)
            return path

        t = self.make_pf_transport(recording_filter)
        t.has('abc')
        self.assertEqual(['abc'], filter_log)
        del filter_log[:]
        t.clone('abc').has('xyz')
        self.assertEqual(['abc/xyz'], filter_log)
        del filter_log[:]
        t.has('/abc')
        self.assertEqual(['abc'], filter_log)

    def test_clone(self):
        """Clones made by relative or absolute path keep the same server."""
        t = self.make_pf_transport()
        # relpath from root and root path are the same
        relpath_cloned = t.clone('foo')
        abspath_cloned = t.clone('/foo')
        self.assertEqual(t.server, relpath_cloned.server)
        self.assertEqual(t.server, abspath_cloned.server)

    def test_url_preserves_pathfiltering(self):
        """Calling get_transport on a pathfiltered transport's base should
        produce a transport with exactly the same behaviour as the original
        pathfiltered transport.

        This is so that it is not possible to escape (accidentally or
        otherwise) the filtering by doing::

            url = filtered_transport.base
            parent_url = urlutils.join(url, '..')
            new_t = transport.get_transport_from_url(parent_url)
        """
        t = self.make_pf_transport()
        new_t = transport.get_transport_from_url(t.base)
        self.assertEqual(t.server, new_t.server)
        self.assertEqual(t.base, new_t.base)
 
528
 
 
529
 
 
530
class ReadonlyDecoratorTransportTest(tests.TestCase):
426
531
    """Readonly decoration specific tests."""
427
532
 
428
533
    def test_local_parameters(self):
429
 
        import bzrlib.transport.readonly as readonly
430
534
        # connect to . in readonly mode
431
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
432
 
        self.assertEqual(True, transport.listable())
433
 
        self.assertEqual(True, transport.is_readonly())
 
535
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
536
        self.assertEqual(True, t.listable())
 
537
        self.assertEqual(True, t.is_readonly())
434
538
 
435
539
    def test_http_parameters(self):
436
540
        from bzrlib.tests.http_server import HttpServer
437
 
        import bzrlib.transport.readonly as readonly
438
541
        # connect to '.' via http which is not listable
439
542
        server = HttpServer()
440
 
        server.setUp()
441
 
        try:
442
 
            transport = get_transport('readonly+' + server.get_url())
443
 
            self.failUnless(isinstance(transport,
444
 
                                       readonly.ReadonlyTransportDecorator))
445
 
            self.assertEqual(False, transport.listable())
446
 
            self.assertEqual(True, transport.is_readonly())
447
 
        finally:
448
 
            server.tearDown()
449
 
 
450
 
 
451
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
543
        self.start_server(server)
 
544
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
545
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
546
        self.assertEqual(False, t.listable())
 
547
        self.assertEqual(True, t.is_readonly())
 
548
 
 
549
 
 
550
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
452
551
    """NFS decorator specific tests."""
453
552
 
454
553
    def get_nfs_transport(self, url):
455
 
        import bzrlib.transport.fakenfs as fakenfs
456
554
        # connect to url with nfs decoration
457
555
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
458
556
 
459
557
    def test_local_parameters(self):
460
558
        # the listable and is_readonly parameters
461
559
        # are not changed by the fakenfs decorator
462
 
        transport = self.get_nfs_transport('.')
463
 
        self.assertEqual(True, transport.listable())
464
 
        self.assertEqual(False, transport.is_readonly())
 
560
        t = self.get_nfs_transport('.')
 
561
        self.assertEqual(True, t.listable())
 
562
        self.assertEqual(False, t.is_readonly())
465
563
 
466
564
    def test_http_parameters(self):
467
565
        # the listable and is_readonly parameters
469
567
        from bzrlib.tests.http_server import HttpServer
470
568
        # connect to '.' via http which is not listable
471
569
        server = HttpServer()
472
 
        server.setUp()
473
 
        try:
474
 
            transport = self.get_nfs_transport(server.get_url())
475
 
            self.assertIsInstance(
476
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
477
 
            self.assertEqual(False, transport.listable())
478
 
            self.assertEqual(True, transport.is_readonly())
479
 
        finally:
480
 
            server.tearDown()
 
570
        self.start_server(server)
 
571
        t = self.get_nfs_transport(server.get_url())
 
572
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
573
        self.assertEqual(False, t.listable())
 
574
        self.assertEqual(True, t.is_readonly())
481
575
 
482
576
    def test_fakenfs_server_default(self):
483
577
        # a FakeNFSServer() should bring up a local relpath server for itself
484
 
        import bzrlib.transport.fakenfs as fakenfs
485
 
        server = fakenfs.FakeNFSServer()
486
 
        server.setUp()
487
 
        try:
488
 
            # the url should be decorated appropriately
489
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
490
 
            # and we should be able to get a transport for it
491
 
            transport = get_transport(server.get_url())
492
 
            # which must be a FakeNFSTransportDecorator instance.
493
 
            self.assertIsInstance(
494
 
                transport, fakenfs.FakeNFSTransportDecorator)
495
 
        finally:
496
 
            server.tearDown()
 
578
        server = test_server.FakeNFSServer()
 
579
        self.start_server(server)
 
580
        # the url should be decorated appropriately
 
581
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
582
        # and we should be able to get a transport for it
 
583
        t = transport.get_transport_from_url(server.get_url())
 
584
        # which must be a FakeNFSTransportDecorator instance.
 
585
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
497
586
 
498
587
    def test_fakenfs_rename_semantics(self):
499
588
        # a FakeNFS transport must mangle the way rename errors occur to
500
589
        # look like NFS problems.
501
 
        transport = self.get_nfs_transport('.')
 
590
        t = self.get_nfs_transport('.')
502
591
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
503
 
                        transport=transport)
504
 
        self.assertRaises(errors.ResourceBusy,
505
 
                          transport.rename, 'from', 'to')
506
 
 
507
 
 
508
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
592
                        transport=t)
 
593
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
594
 
 
595
 
 
596
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
509
597
    """Tests for simulation of VFAT restrictions"""
510
598
 
511
599
    def get_vfat_transport(self, url):
515
603
 
516
604
    def test_transport_creation(self):
517
605
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
518
 
        transport = self.get_vfat_transport('.')
519
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
606
        t = self.get_vfat_transport('.')
 
607
        self.assertIsInstance(t, FakeVFATTransportDecorator)
520
608
 
521
609
    def test_transport_mkdir(self):
522
 
        transport = self.get_vfat_transport('.')
523
 
        transport.mkdir('HELLO')
524
 
        self.assertTrue(transport.has('hello'))
525
 
        self.assertTrue(transport.has('Hello'))
 
610
        t = self.get_vfat_transport('.')
 
611
        t.mkdir('HELLO')
 
612
        self.assertTrue(t.has('hello'))
 
613
        self.assertTrue(t.has('Hello'))
526
614
 
527
615
    def test_forbidden_chars(self):
528
 
        transport = self.get_vfat_transport('.')
529
 
        self.assertRaises(ValueError, transport.has, "<NU>")
530
 
 
531
 
 
532
 
class BadTransportHandler(Transport):
 
616
        t = self.get_vfat_transport('.')
 
617
        self.assertRaises(ValueError, t.has, "<NU>")
 
618
 
 
619
 
 
620
class BadTransportHandler(transport.Transport):
533
621
    def __init__(self, base_url):
534
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
535
 
 
536
 
 
537
 
class BackupTransportHandler(Transport):
 
622
        raise errors.DependencyNotPresent('some_lib',
 
623
                                          'testing missing dependency')
 
624
 
 
625
 
 
626
class BackupTransportHandler(transport.Transport):
538
627
    """Test transport that works as a backup for the BadTransportHandler"""
539
628
    pass
540
629
 
541
630
 
542
 
class TestTransportImplementation(TestCaseInTempDir):
 
631
class TestTransportImplementation(tests.TestCaseInTempDir):
543
632
    """Implementation verification for transports.
544
 
    
 
633
 
545
634
    To verify a transport we need a server factory, which is a callable
546
635
    that accepts no parameters and returns an implementation of
547
636
    bzrlib.transport.Server.
548
 
    
 
637
 
549
638
    That Server is then used to construct transport instances and test
550
639
    the transport via loopback activity.
551
640
 
552
 
    Currently this assumes that the Transport object is connected to the 
553
 
    current working directory.  So that whatever is done 
554
 
    through the transport, should show up in the working 
 
641
    Currently this assumes that the Transport object is connected to the
 
642
    current working directory.  So that whatever is done
 
643
    through the transport, should show up in the working
555
644
    directory, and vice-versa. This is a bug, because its possible to have
556
 
    URL schemes which provide access to something that may not be 
557
 
    result in storage on the local disk, i.e. due to file system limits, or 
 
645
    URL schemes which provide access to something that may not be
 
646
    result in storage on the local disk, i.e. due to file system limits, or
558
647
    due to it being a database or some other non-filesystem tool.
559
648
 
560
649
    This also tests to make sure that the functions work with both
561
650
    generators and lists (assuming iter(list) is effectively a generator)
562
651
    """
563
 
    
 
652
 
564
653
    def setUp(self):
565
654
        super(TestTransportImplementation, self).setUp()
566
655
        self._server = self.transport_server()
567
 
        self._server.setUp()
568
 
        self.addCleanup(self._server.tearDown)
 
656
        self.start_server(self._server)
569
657
 
570
658
    def get_transport(self, relpath=None):
571
659
        """Return a connected transport to the local directory.
575
663
        base_url = self._server.get_url()
576
664
        url = self._adjust_url(base_url, relpath)
577
665
        # try getting the transport via the regular interface:
578
 
        t = get_transport(url)
 
666
        t = transport.get_transport_from_url(url)
579
667
        # vila--20070607 if the following are commented out the test suite
580
668
        # still pass. Is this really still needed or was it a forgotten
581
669
        # temporary fix ?
586
674
        return t
587
675
 
588
676
 
589
 
class TestLocalTransports(TestCase):
 
677
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_path."""

    def test_with_path(self):
        """A filesystem path gives a LocalTransport based at that path."""
        t = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(t, local.LocalTransport)
        expected_url = urlutils.local_path_to_url(self.test_dir)
        self.assertEquals(t.base.rstrip("/"), expected_url)

    def test_with_url(self):
        """A URL-looking string is still handled as a path.

        The expected base is the URL of a "file:" entry below the test
        directory, not the root of the file: scheme.
        """
        t = transport.get_transport_from_path("file:")
        self.assertIsInstance(t, local.LocalTransport)
        literal_path = os.path.join(self.test_dir, "file:")
        expected_url = urlutils.local_path_to_url(literal_path)
        self.assertEquals(t.base.rstrip("/"), expected_url)
 
690
 
 
691
 
 
692
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_url."""

    def test_with_path(self):
        """A bare filesystem path is rejected with InvalidURL."""
        self.assertRaises(
            errors.InvalidURL,
            transport.get_transport_from_url, self.test_dir)

    def test_with_url(self):
        """A file URL gives a LocalTransport whose base is that URL."""
        dir_url = urlutils.local_path_to_url(self.test_dir)
        t = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        """Segment parameters stay in the base but not in file lookups."""
        dir_url = urlutils.local_path_to_url(self.test_dir)
        url = dir_url + ",branch=foo"
        t = transport.get_transport_from_url(url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base.rstrip("/"), url)
        # The file is created in the plain directory, yet must be visible
        # through the transport whose URL carries ",branch=foo".
        target = os.path.join(self.test_dir, "afile")
        with open(target, 'w') as f:
            f.write("data")
        self.assertTrue(t.has("afile"))
 
712
 
 
713
 
 
714
class TestLocalTransports(tests.TestCase):
590
715
 
591
716
    def test_get_transport_from_abspath(self):
592
717
        here = osutils.abspath('.')
593
 
        t = get_transport(here)
594
 
        self.assertIsInstance(t, LocalTransport)
 
718
        t = transport.get_transport(here)
 
719
        self.assertIsInstance(t, local.LocalTransport)
595
720
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
596
721
 
597
722
    def test_get_transport_from_relpath(self):
598
723
        here = osutils.abspath('.')
599
 
        t = get_transport('.')
600
 
        self.assertIsInstance(t, LocalTransport)
 
724
        t = transport.get_transport('.')
 
725
        self.assertIsInstance(t, local.LocalTransport)
601
726
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
602
727
 
603
728
    def test_get_transport_from_local_url(self):
604
729
        here = osutils.abspath('.')
605
730
        here_url = urlutils.local_path_to_url(here) + '/'
606
 
        t = get_transport(here_url)
607
 
        self.assertIsInstance(t, LocalTransport)
 
731
        t = transport.get_transport(here_url)
 
732
        self.assertIsInstance(t, local.LocalTransport)
608
733
        self.assertEquals(t.base, here_url)
609
734
 
610
735
    def test_local_abspath(self):
611
736
        here = osutils.abspath('.')
612
 
        t = get_transport(here)
 
737
        t = transport.get_transport(here)
613
738
        self.assertEquals(t.local_abspath(''), here)
614
739
 
615
740
 
616
 
class TestWin32LocalTransport(TestCase):
 
741
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for write streams opened on the test's transport."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        # Close the stream at cleanup so the file handle is released even
        # when an assertion below fails.
        self.addCleanup(w.close)
        w.write('foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEquals(f.read(), 'foo')
        self.assertEquals(len(calls), 1, calls)

    def test_missing_directory(self):
        """Opening a stream below a missing directory raises NoSuchFile."""
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
766
 
 
767
 
 
768
class TestWin32LocalTransport(tests.TestCase):
617
769
 
618
770
    def test_unc_clone_to_root(self):
619
771
        # Win32 UNC path like \\HOST\path
620
772
        # clone to root should stop at least at \\HOST part
621
773
        # not on \\
622
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
774
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
623
775
        for i in xrange(4):
624
776
            t = t.clone('..')
625
777
        self.assertEquals(t.base, 'file://HOST/')
628
780
        self.assertEquals(t.base, 'file://HOST/')
629
781
 
630
782
 
631
 
class TestConnectedTransport(TestCase):
 
783
class TestConnectedTransport(tests.TestCase):
632
784
    """Tests for connected to remote server transports"""
633
785
 
634
786
    def test_parse_url(self):
635
 
        t = ConnectedTransport('http://simple.example.com/home/source')
636
 
        self.assertEquals(t._host, 'simple.example.com')
637
 
        self.assertEquals(t._port, None)
638
 
        self.assertEquals(t._path, '/home/source/')
639
 
        self.failUnless(t._user is None)
640
 
        self.failUnless(t._password is None)
 
787
        t = transport.ConnectedTransport(
 
788
            'http://simple.example.com/home/source')
 
789
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
790
        self.assertEquals(t._parsed_url.port, None)
 
791
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
792
        self.assertTrue(t._parsed_url.user is None)
 
793
        self.assertTrue(t._parsed_url.password is None)
641
794
 
642
795
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
643
796
 
 
797
    def test_parse_url_with_at_in_user(self):
 
798
        # Bug 228058
 
799
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
800
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
801
 
644
802
    def test_parse_quoted_url(self):
645
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
646
 
        self.assertEquals(t._host, 'exAmple.com')
647
 
        self.assertEquals(t._port, 2222)
648
 
        self.assertEquals(t._user, 'robey')
649
 
        self.assertEquals(t._password, 'h@t')
650
 
        self.assertEquals(t._path, '/path/')
 
803
        t = transport.ConnectedTransport(
 
804
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
805
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
806
        self.assertEquals(t._parsed_url.port, 2222)
 
807
        self.assertEquals(t._parsed_url.user, 'robey')
 
808
        self.assertEquals(t._parsed_url.password, 'h@t')
 
809
        self.assertEquals(t._parsed_url.path, '/path/')
651
810
 
652
811
        # Base should not keep track of the password
653
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
812
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
654
813
 
655
814
    def test_parse_invalid_url(self):
656
815
        self.assertRaises(errors.InvalidURL,
657
 
                          ConnectedTransport,
 
816
                          transport.ConnectedTransport,
658
817
                          'sftp://lily.org:~janneke/public/bzr/gub')
659
818
 
660
819
    def test_relpath(self):
661
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
820
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
662
821
 
663
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
822
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
823
            'sub')
664
824
        self.assertRaises(errors.PathNotChild, t.relpath,
665
825
                          'http://user@host.com/abs/path/sub')
666
826
        self.assertRaises(errors.PathNotChild, t.relpath,
670
830
        self.assertRaises(errors.PathNotChild, t.relpath,
671
831
                          'sftp://user@host.com:33/abs/path/sub')
672
832
        # Make sure it works when we don't supply a username
673
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
833
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
674
834
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
675
835
 
676
836
        # Make sure it works when parts of the path will be url encoded
677
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
837
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
678
838
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
679
839
 
680
840
    def test_connection_sharing_propagate_credentials(self):
681
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
682
 
        self.assertEquals('user', t._user)
683
 
        self.assertEquals('host.com', t._host)
 
841
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
842
        self.assertEquals('user', t._parsed_url.user)
 
843
        self.assertEquals('host.com', t._parsed_url.host)
684
844
        self.assertIs(None, t._get_connection())
685
 
        self.assertIs(None, t._password)
 
845
        self.assertIs(None, t._parsed_url.password)
686
846
        c = t.clone('subdir')
687
847
        self.assertIs(None, c._get_connection())
688
 
        self.assertIs(None, t._password)
 
848
        self.assertIs(None, t._parsed_url.password)
689
849
 
690
850
        # Simulate the user entering a password
691
851
        password = 'secret'
705
865
        self.assertIs(new_password, c._get_credentials())
706
866
 
707
867
 
708
 
class TestReusedTransports(TestCase):
 
868
class TestReusedTransports(tests.TestCase):
709
869
    """Tests for transport reuse"""
710
870
 
711
871
    def test_reuse_same_transport(self):
712
872
        possible_transports = []
713
 
        t1 = get_transport('http://foo/',
714
 
                           possible_transports=possible_transports)
 
873
        t1 = transport.get_transport_from_url('http://foo/',
 
874
                                     possible_transports=possible_transports)
715
875
        self.assertEqual([t1], possible_transports)
716
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
876
        t2 = transport.get_transport_from_url('http://foo/',
 
877
                                     possible_transports=[t1])
717
878
        self.assertIs(t1, t2)
718
879
 
719
880
        # Also check that final '/' are handled correctly
720
 
        t3 = get_transport('http://foo/path/')
721
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
881
        t3 = transport.get_transport_from_url('http://foo/path/')
 
882
        t4 = transport.get_transport_from_url('http://foo/path',
 
883
                                     possible_transports=[t3])
722
884
        self.assertIs(t3, t4)
723
885
 
724
 
        t5 = get_transport('http://foo/path')
725
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
886
        t5 = transport.get_transport_from_url('http://foo/path')
 
887
        t6 = transport.get_transport_from_url('http://foo/path/',
 
888
                                     possible_transports=[t5])
726
889
        self.assertIs(t5, t6)
727
890
 
728
891
    def test_don_t_reuse_different_transport(self):
729
 
        t1 = get_transport('http://foo/path')
730
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
892
        t1 = transport.get_transport_from_url('http://foo/path')
 
893
        t2 = transport.get_transport_from_url('http://bar/path',
 
894
                                     possible_transports=[t1])
731
895
        self.assertIsNot(t1, t2)
732
896
 
733
897
 
734
 
class TestTransportTrace(TestCase):
 
898
class TestTransportTrace(tests.TestCase):
735
899
 
736
 
    def test_get(self):
737
 
        transport = get_transport('trace+memory://')
 
900
    def test_decorator(self):
 
901
        t = transport.get_transport_from_url('trace+memory://')
738
902
        self.assertIsInstance(
739
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
903
            t, bzrlib.transport.trace.TransportTraceDecorator)
740
904
 
741
905
    def test_clone_preserves_activity(self):
742
 
        transport = get_transport('trace+memory://')
743
 
        transport2 = transport.clone('.')
744
 
        self.assertTrue(transport is not transport2)
745
 
        self.assertTrue(transport._activity is transport2._activity)
 
906
        t = transport.get_transport_from_url('trace+memory://')
 
907
        t2 = t.clone('.')
 
908
        self.assertTrue(t is not t2)
 
909
        self.assertTrue(t._activity is t2._activity)
746
910
 
747
911
    # the following specific tests are for the operations that have made use of
748
912
    # logging in tests; we could test every single operation but doing that
749
913
    # still won't cause a test failure when the top level Transport API
750
914
    # changes; so there is little return doing that.
751
915
    def test_get(self):
752
 
        transport = get_transport('trace+memory:///')
753
 
        transport.put_bytes('foo', 'barish')
754
 
        transport.get('foo')
 
916
        t = transport.get_transport_from_url('trace+memory:///')
 
917
        t.put_bytes('foo', 'barish')
 
918
        t.get('foo')
755
919
        expected_result = []
756
920
        # put_bytes records the bytes, not the content to avoid memory
757
921
        # pressure.
758
922
        expected_result.append(('put_bytes', 'foo', 6, None))
759
923
        # get records the file name only.
760
924
        expected_result.append(('get', 'foo'))
761
 
        self.assertEqual(expected_result, transport._activity)
 
925
        self.assertEqual(expected_result, t._activity)
762
926
 
763
927
    def test_readv(self):
764
 
        transport = get_transport('trace+memory:///')
765
 
        transport.put_bytes('foo', 'barish')
766
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
767
 
            upper_limit=6))
 
928
        t = transport.get_transport_from_url('trace+memory:///')
 
929
        t.put_bytes('foo', 'barish')
 
930
        list(t.readv('foo', [(0, 1), (3, 2)],
 
931
                     adjust_for_latency=True, upper_limit=6))
768
932
        expected_result = []
769
933
        # put_bytes records the bytes, not the content to avoid memory
770
934
        # pressure.
771
935
        expected_result.append(('put_bytes', 'foo', 6, None))
772
936
        # readv records the supplied offset request
773
937
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
774
 
        self.assertEqual(expected_result, transport._activity)
 
938
        self.assertEqual(expected_result, t._activity)
 
939
 
 
940
 
 
941
class TestSSHConnections(tests.TestCaseWithTransport):
    """Run a bzr+ssh:// connection against an in-test stub SSH server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # Holds the subprocess first, then the relay threads, so the test can
        # wait for all of them once it is done.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes
                # from the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # Relay one byte at a time until read() returns ''.
                    for chunk in iter(lambda: read(1), ''):
                        write(chunk)
                    close()

                relays = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for relay in relays:
                    worker = threading.Thread(target=ferry_bytes, args=relay)
                    worker.start()
                    started.append(worker)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        bzr_remote_path = self.get_bzr_path()
        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + bzr_remote_path
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        remote = transport.get_transport(url)
        self.permit_url(remote.base)
        remote.mkdir('foo')

        expected_commands = [
            '%s serve --inet --directory=/ --allow-writes' % bzr_remote_path]
        self.assertEqual(expected_commands, self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        remote._client._medium.disconnect()
        if not started:
            return
        # First wait for the subprocess
        started[0].wait()
        # And the rest are threads
        for worker in started[1:]:
            worker.join()
 
1042
 
 
1043
 
 
1044
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        """Check the length and prefix of unhtml_roughly's output.

        The input is 1000 repetitions of a 14-character "<p>something!\\n"
        line; the result is expected to be exactly 1000 characters long and
        to begin with " something!".
        """
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        # assertEqual rather than the deprecated assertEquals alias, with the
        # expected value first like the other assertions in this module.
        self.assertEqual(1000, len(result))
        self.assertStartsWith(result, " something!")
 
1053
 
 
1054
 
 
1055
class SomeDirectory(object):
    """Stub directory whose look_up() always yields the same URL."""

    def look_up(self, name, url):
        """Return "http://bar", ignoring both name and url."""
        fixed_url = "http://bar"
        return fixed_url
 
1059
 
 
1060
 
 
1061
class TestLocationToUrl(tests.TestCase):
 
1062
 
 
1063
    def get_base_location(self):
 
1064
        path = osutils.abspath('/foo/bar')
 
1065
        if path.startswith('/'):
 
1066
            url = 'file://%s' % (path,)
 
1067
        else:
 
1068
            # On Windows, abspaths start with the drive letter, so we have to
 
1069
            # add in the extra '/'
 
1070
            url = 'file:///%s' % (path,)
 
1071
        return path, url
 
1072
 
 
1073
    def test_regular_url(self):
 
1074
        self.assertEquals("file://foo", location_to_url("file://foo"))
 
1075
 
 
1076
    def test_directory(self):
 
1077
        directories.register("bar:", SomeDirectory, "Dummy directory")
 
1078
        self.addCleanup(directories.remove, "bar:")
 
1079
        self.assertEquals("http://bar", location_to_url("bar:"))
 
1080
 
 
1081
    def test_unicode_url(self):
 
1082
        self.assertRaises(errors.InvalidURL, location_to_url,
 
1083
            "http://fo/\xc3\xaf".decode("utf-8"))
 
1084
 
 
1085
    def test_unicode_path(self):
 
1086
        path, url = self.get_base_location()
 
1087
        location = path + "\xc3\xaf".decode("utf-8")
 
1088
        url += '%C3%AF'
 
1089
        self.assertEquals(url, location_to_url(location))
 
1090
 
 
1091
    def test_path(self):
 
1092
        path, url = self.get_base_location()
 
1093
        self.assertEquals(url, location_to_url(path))
 
1094
 
 
1095
    def test_relative_file_url(self):
 
1096
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
 
1097
            location_to_url("file:bar"))
 
1098
 
 
1099
    def test_absolute_file_url(self):
 
1100
        self.assertEquals("file:///bar", location_to_url("file:/bar"))