~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Patch Queue Manager
  • Date: 2011-10-14 16:54:26 UTC
  • mfrom: (6216.1.1 remove-this-file)
  • Revision ID: pqm@pqm.ubuntu.com-20111014165426-tjix4e6idryf1r2z
(jelmer) Remove an accidentally committed .THIS file. (Jelmer Vernooij)

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
 
import bzrlib
21
24
from bzrlib import (
22
25
    errors,
23
26
    osutils,
 
27
    tests,
 
28
    transport,
24
29
    urlutils,
25
30
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
31
from bzrlib.directory_service import directories
 
32
from bzrlib.transport import (
 
33
    chroot,
 
34
    fakenfs,
 
35
    http,
 
36
    local,
 
37
    location_to_url,
 
38
    memory,
 
39
    pathfilter,
 
40
    readonly,
 
41
    )
 
42
import bzrlib.transport.trace
 
43
from bzrlib.tests import (
 
44
    features,
 
45
    test_server,
 
46
    )
51
47
 
52
48
 
53
49
# TODO: Should possibly split transport-specific tests into their own files.
54
50
 
55
51
 
56
 
class TestTransport(TestCase):
 
52
class TestTransport(tests.TestCase):
57
53
    """Test the non transport-concrete class functionality."""
58
54
 
59
55
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
62
 
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
65
 
        finally:
66
 
            _set_protocol_handlers(handlers)
 
56
        handlers = transport._get_protocol_handlers()
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
67
61
 
68
62
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
 
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
70
65
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
 
66
        transport._clear_protocol_handlers()
 
67
 
72
68
        class SampleHandler(object):
73
69
            """I exist, isnt that enough?"""
74
 
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
85
 
        finally:
86
 
            _set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
87
83
 
88
84
    def test_transport_dependency(self):
89
85
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
 
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
88
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
 
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
93
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
97
 
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
107
 
        finally:
108
 
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
110
103
 
111
104
    def test_transport_fallback(self):
112
105
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
114
 
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
122
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
 
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
 
106
        saved_handlers = transport._get_protocol_handlers()
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
125
116
 
126
117
    def test_ssh_hints(self):
127
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
119
        try:
129
 
            get_transport('ssh://fooserver/foo')
130
 
        except UnsupportedProtocol, e:
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
 
121
        except errors.UnsupportedProtocol, e:
131
122
            e_str = str(e)
132
123
            self.assertEquals('Unsupported protocol'
133
124
                              ' for url "ssh://fooserver/foo":'
134
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
125
                              ' bzr supports bzr+ssh to operate over ssh,'
 
126
                              ' use "bzr+ssh://fooserver/foo".',
135
127
                              str(e))
136
128
        else:
137
129
            self.fail('Did not raise UnsupportedProtocol')
138
130
 
139
131
    def test_LateReadError(self):
140
132
        """The LateReadError helper should raise on read()."""
141
 
        a_file = LateReadError('a path')
 
133
        a_file = transport.LateReadError('a path')
142
134
        try:
143
135
            a_file.read()
144
 
        except ReadError, error:
 
136
        except errors.ReadError, error:
145
137
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(ReadError, a_file.read, 40)
 
138
        self.assertRaises(errors.ReadError, a_file.read, 40)
147
139
        a_file.close()
148
140
 
149
 
    def test__combine_paths(self):
150
 
        t = Transport('/')
151
 
        self.assertEqual('/home/sarah/project/foo',
152
 
                         t._combine_paths('/home/sarah', 'project/foo'))
153
 
        self.assertEqual('/etc',
154
 
                         t._combine_paths('/home/sarah', '../../etc'))
155
 
        self.assertEqual('/etc',
156
 
                         t._combine_paths('/home/sarah', '../../../etc'))
157
 
        self.assertEqual('/etc',
158
 
                         t._combine_paths('/home/sarah', '/etc'))
159
 
 
160
141
    def test_local_abspath_non_local_transport(self):
161
142
        # the base implementation should throw
162
 
        t = MemoryTransport()
 
143
        t = memory.MemoryTransport()
163
144
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
145
        self.assertEqual('memory:///t is not a local path.', str(e))
165
146
 
166
147
 
167
 
class TestCoalesceOffsets(TestCase):
 
148
class TestCoalesceOffsets(tests.TestCase):
168
149
 
169
150
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
 
        coalesce = Transport._coalesce_offsets
171
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
151
        coalesce = transport.Transport._coalesce_offsets
 
152
        exp = [transport._CoalescedOffset(*x) for x in expected]
172
153
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
154
                            max_size=max_size))
174
155
        self.assertEqual(exp, out)
219
200
 
220
201
    def test_coalesce_fudge(self):
221
202
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
 
203
                    (100, 10, [(0, 10)]),
223
204
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
 
205
                   fudge=10)
 
206
 
226
207
    def test_coalesce_max_size(self):
227
208
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
209
                    (30, 50, [(0, 50)]),
229
210
                    # If one range is above max_size, it gets its own coalesced
230
211
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
 
212
                    (100, 80, [(0, 80)]),],
232
213
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
 
214
                   max_size=50)
235
215
 
236
216
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
217
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
238
218
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
219
                  )
240
220
 
241
221
    def test_coalesce_default_limit(self):
242
222
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
223
        ten_mb = 10 * 1024 * 1024
 
224
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
245
225
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
226
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
227
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
228
                   [(i * ten_mb, ten_mb) for i in range(11)],
249
229
                   max_size=1*1024*1024*1024)
250
230
 
251
231
 
252
 
class TestMemoryTransport(TestCase):
 
232
class TestMemoryServer(tests.TestCase):
 
233
 
 
234
    def test_create_server(self):
 
235
        server = memory.MemoryServer()
 
236
        server.start_server()
 
237
        url = server.get_url()
 
238
        self.assertTrue(url in transport.transport_list_registry)
 
239
        t = transport.get_transport_from_url(url)
 
240
        del t
 
241
        server.stop_server()
 
242
        self.assertFalse(url in transport.transport_list_registry)
 
243
        self.assertRaises(errors.UnsupportedProtocol,
 
244
                          transport.get_transport, url)
 
245
 
 
246
 
 
247
class TestMemoryTransport(tests.TestCase):
253
248
 
254
249
    def test_get_transport(self):
255
 
        MemoryTransport()
 
250
        memory.MemoryTransport()
256
251
 
257
252
    def test_clone(self):
258
 
        transport = MemoryTransport()
259
 
        self.assertTrue(isinstance(transport, MemoryTransport))
260
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
253
        t = memory.MemoryTransport()
 
254
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
255
        self.assertEqual("memory:///", t.clone("/").base)
261
256
 
262
257
    def test_abspath(self):
263
 
        transport = MemoryTransport()
264
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
258
        t = memory.MemoryTransport()
 
259
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
265
260
 
266
261
    def test_abspath_of_root(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertEqual("memory:///", transport.base)
269
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
262
        t = memory.MemoryTransport()
 
263
        self.assertEqual("memory:///", t.base)
 
264
        self.assertEqual("memory:///", t.abspath('/'))
270
265
 
271
266
    def test_abspath_of_relpath_starting_at_root(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
267
        t = memory.MemoryTransport()
 
268
        self.assertEqual("memory:///foo", t.abspath('/foo'))
274
269
 
275
270
    def test_append_and_get(self):
276
 
        transport = MemoryTransport()
277
 
        transport.append_bytes('path', 'content')
278
 
        self.assertEqual(transport.get('path').read(), 'content')
279
 
        transport.append_file('path', StringIO('content'))
280
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
271
        t = memory.MemoryTransport()
 
272
        t.append_bytes('path', 'content')
 
273
        self.assertEqual(t.get('path').read(), 'content')
 
274
        t.append_file('path', StringIO('content'))
 
275
        self.assertEqual(t.get('path').read(), 'contentcontent')
281
276
 
282
277
    def test_put_and_get(self):
283
 
        transport = MemoryTransport()
284
 
        transport.put_file('path', StringIO('content'))
285
 
        self.assertEqual(transport.get('path').read(), 'content')
286
 
        transport.put_bytes('path', 'content')
287
 
        self.assertEqual(transport.get('path').read(), 'content')
 
278
        t = memory.MemoryTransport()
 
279
        t.put_file('path', StringIO('content'))
 
280
        self.assertEqual(t.get('path').read(), 'content')
 
281
        t.put_bytes('path', 'content')
 
282
        self.assertEqual(t.get('path').read(), 'content')
288
283
 
289
284
    def test_append_without_dir_fails(self):
290
 
        transport = MemoryTransport()
291
 
        self.assertRaises(NoSuchFile,
292
 
                          transport.append_bytes, 'dir/path', 'content')
 
285
        t = memory.MemoryTransport()
 
286
        self.assertRaises(errors.NoSuchFile,
 
287
                          t.append_bytes, 'dir/path', 'content')
293
288
 
294
289
    def test_put_without_dir_fails(self):
295
 
        transport = MemoryTransport()
296
 
        self.assertRaises(NoSuchFile,
297
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
290
        t = memory.MemoryTransport()
 
291
        self.assertRaises(errors.NoSuchFile,
 
292
                          t.put_file, 'dir/path', StringIO('content'))
298
293
 
299
294
    def test_get_missing(self):
300
 
        transport = MemoryTransport()
301
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
295
        transport = memory.MemoryTransport()
 
296
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
302
297
 
303
298
    def test_has_missing(self):
304
 
        transport = MemoryTransport()
305
 
        self.assertEquals(False, transport.has('foo'))
 
299
        t = memory.MemoryTransport()
 
300
        self.assertEquals(False, t.has('foo'))
306
301
 
307
302
    def test_has_present(self):
308
 
        transport = MemoryTransport()
309
 
        transport.append_bytes('foo', 'content')
310
 
        self.assertEquals(True, transport.has('foo'))
 
303
        t = memory.MemoryTransport()
 
304
        t.append_bytes('foo', 'content')
 
305
        self.assertEquals(True, t.has('foo'))
311
306
 
312
307
    def test_list_dir(self):
313
 
        transport = MemoryTransport()
314
 
        transport.put_bytes('foo', 'content')
315
 
        transport.mkdir('dir')
316
 
        transport.put_bytes('dir/subfoo', 'content')
317
 
        transport.put_bytes('dirlike', 'content')
 
308
        t = memory.MemoryTransport()
 
309
        t.put_bytes('foo', 'content')
 
310
        t.mkdir('dir')
 
311
        t.put_bytes('dir/subfoo', 'content')
 
312
        t.put_bytes('dirlike', 'content')
318
313
 
319
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
320
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
314
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
315
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
321
316
 
322
317
    def test_mkdir(self):
323
 
        transport = MemoryTransport()
324
 
        transport.mkdir('dir')
325
 
        transport.append_bytes('dir/path', 'content')
326
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
318
        t = memory.MemoryTransport()
 
319
        t.mkdir('dir')
 
320
        t.append_bytes('dir/path', 'content')
 
321
        self.assertEqual(t.get('dir/path').read(), 'content')
327
322
 
328
323
    def test_mkdir_missing_parent(self):
329
 
        transport = MemoryTransport()
330
 
        self.assertRaises(NoSuchFile,
331
 
                          transport.mkdir, 'dir/dir')
 
324
        t = memory.MemoryTransport()
 
325
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
332
326
 
333
327
    def test_mkdir_twice(self):
334
 
        transport = MemoryTransport()
335
 
        transport.mkdir('dir')
336
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
328
        t = memory.MemoryTransport()
 
329
        t.mkdir('dir')
 
330
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
337
331
 
338
332
    def test_parameters(self):
339
 
        transport = MemoryTransport()
340
 
        self.assertEqual(True, transport.listable())
341
 
        self.assertEqual(False, transport.is_readonly())
 
333
        t = memory.MemoryTransport()
 
334
        self.assertEqual(True, t.listable())
 
335
        self.assertEqual(False, t.is_readonly())
342
336
 
343
337
    def test_iter_files_recursive(self):
344
 
        transport = MemoryTransport()
345
 
        transport.mkdir('dir')
346
 
        transport.put_bytes('dir/foo', 'content')
347
 
        transport.put_bytes('dir/bar', 'content')
348
 
        transport.put_bytes('bar', 'content')
349
 
        paths = set(transport.iter_files_recursive())
 
338
        t = memory.MemoryTransport()
 
339
        t.mkdir('dir')
 
340
        t.put_bytes('dir/foo', 'content')
 
341
        t.put_bytes('dir/bar', 'content')
 
342
        t.put_bytes('bar', 'content')
 
343
        paths = set(t.iter_files_recursive())
350
344
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
351
345
 
352
346
    def test_stat(self):
353
 
        transport = MemoryTransport()
354
 
        transport.put_bytes('foo', 'content')
355
 
        transport.put_bytes('bar', 'phowar')
356
 
        self.assertEqual(7, transport.stat('foo').st_size)
357
 
        self.assertEqual(6, transport.stat('bar').st_size)
358
 
 
359
 
 
360
 
class ChrootDecoratorTransportTest(TestCase):
 
347
        t = memory.MemoryTransport()
 
348
        t.put_bytes('foo', 'content')
 
349
        t.put_bytes('bar', 'phowar')
 
350
        self.assertEqual(7, t.stat('foo').st_size)
 
351
        self.assertEqual(6, t.stat('bar').st_size)
 
352
 
 
353
 
 
354
class ChrootDecoratorTransportTest(tests.TestCase):
361
355
    """Chroot decoration specific tests."""
362
356
 
363
357
    def test_abspath(self):
364
358
        # The abspath is always relative to the chroot_url.
365
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        server.setUp()
367
 
        transport = get_transport(server.get_url())
368
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
359
        server = chroot.ChrootServer(
 
360
            transport.get_transport_from_url('memory:///foo/bar/'))
 
361
        self.start_server(server)
 
362
        t = transport.get_transport_from_url(server.get_url())
 
363
        self.assertEqual(server.get_url(), t.abspath('/'))
369
364
 
370
 
        subdir_transport = transport.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
        server.tearDown()
 
365
        subdir_t = t.clone('subdir')
 
366
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
373
367
 
374
368
    def test_clone(self):
375
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
376
 
        server.setUp()
377
 
        transport = get_transport(server.get_url())
 
369
        server = chroot.ChrootServer(
 
370
            transport.get_transport_from_url('memory:///foo/bar/'))
 
371
        self.start_server(server)
 
372
        t = transport.get_transport_from_url(server.get_url())
378
373
        # relpath from root and root path are the same
379
 
        relpath_cloned = transport.clone('foo')
380
 
        abspath_cloned = transport.clone('/foo')
 
374
        relpath_cloned = t.clone('foo')
 
375
        abspath_cloned = t.clone('/foo')
381
376
        self.assertEqual(server, relpath_cloned.server)
382
377
        self.assertEqual(server, abspath_cloned.server)
383
 
        server.tearDown()
384
378
 
385
379
    def test_chroot_url_preserves_chroot(self):
386
380
        """Calling get_transport on a chroot transport's base should produce a
390
384
        This is so that it is not possible to escape a chroot by doing::
391
385
            url = chroot_transport.base
392
386
            parent_url = urlutils.join(url, '..')
393
 
            new_transport = get_transport(parent_url)
 
387
            new_t = transport.get_transport_from_url(parent_url)
394
388
        """
395
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
396
 
        server.setUp()
397
 
        transport = get_transport(server.get_url())
398
 
        new_transport = get_transport(transport.base)
399
 
        self.assertEqual(transport.server, new_transport.server)
400
 
        self.assertEqual(transport.base, new_transport.base)
401
 
        server.tearDown()
 
389
        server = chroot.ChrootServer(
 
390
            transport.get_transport_from_url('memory:///path/subpath'))
 
391
        self.start_server(server)
 
392
        t = transport.get_transport_from_url(server.get_url())
 
393
        new_t = transport.get_transport_from_url(t.base)
 
394
        self.assertEqual(t.server, new_t.server)
 
395
        self.assertEqual(t.base, new_t.base)
402
396
 
403
397
    def test_urljoin_preserves_chroot(self):
404
398
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
407
401
        This is so that it is not possible to escape a chroot by doing::
408
402
            url = chroot_transport.base
409
403
            parent_url = urlutils.join(url, '..')
410
 
            new_transport = get_transport(parent_url)
 
404
            new_t = transport.get_transport_from_url(parent_url)
411
405
        """
412
 
        server = ChrootServer(get_transport('memory:///path/'))
413
 
        server.setUp()
414
 
        transport = get_transport(server.get_url())
 
406
        server = chroot.ChrootServer(
 
407
            transport.get_transport_from_url('memory:///path/'))
 
408
        self.start_server(server)
 
409
        t = transport.get_transport_from_url(server.get_url())
415
410
        self.assertRaises(
416
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
417
 
        server.tearDown()
418
 
 
419
 
 
420
 
class ChrootServerTest(TestCase):
 
411
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
412
 
 
413
 
 
414
class TestChrootServer(tests.TestCase):
421
415
 
422
416
    def test_construct(self):
423
 
        backing_transport = MemoryTransport()
424
 
        server = ChrootServer(backing_transport)
 
417
        backing_transport = memory.MemoryTransport()
 
418
        server = chroot.ChrootServer(backing_transport)
425
419
        self.assertEqual(backing_transport, server.backing_transport)
426
420
 
427
421
    def test_setUp(self):
428
 
        backing_transport = MemoryTransport()
429
 
        server = ChrootServer(backing_transport)
430
 
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
422
        backing_transport = memory.MemoryTransport()
 
423
        server = chroot.ChrootServer(backing_transport)
 
424
        server.start_server()
 
425
        self.addCleanup(server.stop_server)
 
426
        self.assertTrue(server.scheme
 
427
                        in transport._get_protocol_handlers().keys())
432
428
 
433
 
    def test_tearDown(self):
434
 
        backing_transport = MemoryTransport()
435
 
        server = ChrootServer(backing_transport)
436
 
        server.setUp()
437
 
        server.tearDown()
438
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
429
    def test_stop_server(self):
 
430
        backing_transport = memory.MemoryTransport()
 
431
        server = chroot.ChrootServer(backing_transport)
 
432
        server.start_server()
 
433
        server.stop_server()
 
434
        self.assertFalse(server.scheme
 
435
                         in transport._get_protocol_handlers().keys())
439
436
 
440
437
    def test_get_url(self):
441
 
        backing_transport = MemoryTransport()
442
 
        server = ChrootServer(backing_transport)
443
 
        server.setUp()
 
438
        backing_transport = memory.MemoryTransport()
 
439
        server = chroot.ChrootServer(backing_transport)
 
440
        server.start_server()
 
441
        self.addCleanup(server.stop_server)
444
442
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
445
 
        server.tearDown()
446
 
 
447
 
 
448
 
class ReadonlyDecoratorTransportTest(TestCase):
 
443
 
 
444
 
 
445
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
446
    """Pathfilter decoration specific tests."""
 
447
 
 
448
    def test_abspath(self):
 
449
        # The abspath is always relative to the base of the backing transport.
 
450
        server = pathfilter.PathFilteringServer(
 
451
            transport.get_transport_from_url('memory:///foo/bar/'),
 
452
            lambda x: x)
 
453
        server.start_server()
 
454
        t = transport.get_transport_from_url(server.get_url())
 
455
        self.assertEqual(server.get_url(), t.abspath('/'))
 
456
 
 
457
        subdir_t = t.clone('subdir')
 
458
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
459
        server.stop_server()
 
460
 
 
461
    def make_pf_transport(self, filter_func=None):
 
462
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
463
 
 
464
        :param filter_func: by default this will be a no-op function.  Use this
 
465
            parameter to override it."""
 
466
        if filter_func is None:
 
467
            filter_func = lambda x: x
 
468
        server = pathfilter.PathFilteringServer(
 
469
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
470
        server.start_server()
 
471
        self.addCleanup(server.stop_server)
 
472
        return transport.get_transport_from_url(server.get_url())
 
473
 
 
474
    def test__filter(self):
 
475
        # _filter (with an identity func as filter_func) always returns
 
476
        # paths relative to the base of the backing transport.
 
477
        t = self.make_pf_transport()
 
478
        self.assertEqual('foo', t._filter('foo'))
 
479
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
480
        self.assertEqual('', t._filter('..'))
 
481
        self.assertEqual('', t._filter('/'))
 
482
        # The base of the pathfiltering transport is taken into account too.
 
483
        t = t.clone('subdir1/subdir2')
 
484
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
485
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
486
        self.assertEqual('subdir1', t._filter('..'))
 
487
        self.assertEqual('', t._filter('/'))
 
488
 
 
489
    def test_filter_invocation(self):
 
490
        filter_log = []
 
491
 
 
492
        def filter(path):
 
493
            filter_log.append(path)
 
494
            return path
 
495
        t = self.make_pf_transport(filter)
 
496
        t.has('abc')
 
497
        self.assertEqual(['abc'], filter_log)
 
498
        del filter_log[:]
 
499
        t.clone('abc').has('xyz')
 
500
        self.assertEqual(['abc/xyz'], filter_log)
 
501
        del filter_log[:]
 
502
        t.has('/abc')
 
503
        self.assertEqual(['abc'], filter_log)
 
504
 
 
505
    def test_clone(self):
 
506
        t = self.make_pf_transport()
 
507
        # relpath from root and root path are the same
 
508
        relpath_cloned = t.clone('foo')
 
509
        abspath_cloned = t.clone('/foo')
 
510
        self.assertEqual(t.server, relpath_cloned.server)
 
511
        self.assertEqual(t.server, abspath_cloned.server)
 
512
 
 
513
    def test_url_preserves_pathfiltering(self):
 
514
        """Calling get_transport on a pathfiltered transport's base should
 
515
        produce a transport with exactly the same behaviour as the original
 
516
        pathfiltered transport.
 
517
 
 
518
        This is so that it is not possible to escape (accidentally or
 
519
        otherwise) the filtering by doing::
 
520
            url = filtered_transport.base
 
521
            parent_url = urlutils.join(url, '..')
 
522
            new_t = transport.get_transport_from_url(parent_url)
 
523
        """
 
524
        t = self.make_pf_transport()
 
525
        new_t = transport.get_transport_from_url(t.base)
 
526
        self.assertEqual(t.server, new_t.server)
 
527
        self.assertEqual(t.base, new_t.base)
 
528
 
 
529
 
 
530
class ReadonlyDecoratorTransportTest(tests.TestCase):
    """Tests specific to the readonly transport decorator."""

    def test_local_parameters(self):
        # Decorate '.' as a readonly transport.
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        from bzrlib.tests.http_server import HttpServer
        # Reach '.' through http, which is not listable.
        http_server = HttpServer()
        self.start_server(http_server)
        decorated_url = 'readonly+' + http_server.get_url()
        decorated = transport.get_transport_from_url(decorated_url)
        self.assertIsInstance(decorated, readonly.ReadonlyTransportDecorator)
        self.assertEqual(False, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())
 
548
 
 
549
 
 
550
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
475
551
    """NFS decorator specific tests."""
476
552
 
477
553
    def get_nfs_transport(self, url):
478
 
        import bzrlib.transport.fakenfs as fakenfs
479
554
        # connect to url with nfs decoration
480
555
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
481
556
 
482
557
    def test_local_parameters(self):
483
558
        # the listable and is_readonly parameters
484
559
        # are not changed by the fakenfs decorator
485
 
        transport = self.get_nfs_transport('.')
486
 
        self.assertEqual(True, transport.listable())
487
 
        self.assertEqual(False, transport.is_readonly())
 
560
        t = self.get_nfs_transport('.')
 
561
        self.assertEqual(True, t.listable())
 
562
        self.assertEqual(False, t.is_readonly())
488
563
 
489
564
    def test_http_parameters(self):
490
565
        # the listable and is_readonly parameters
492
567
        from bzrlib.tests.http_server import HttpServer
493
568
        # connect to '.' via http which is not listable
494
569
        server = HttpServer()
495
 
        server.setUp()
496
 
        try:
497
 
            transport = self.get_nfs_transport(server.get_url())
498
 
            self.assertIsInstance(
499
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
 
            self.assertEqual(False, transport.listable())
501
 
            self.assertEqual(True, transport.is_readonly())
502
 
        finally:
503
 
            server.tearDown()
 
570
        self.start_server(server)
 
571
        t = self.get_nfs_transport(server.get_url())
 
572
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
573
        self.assertEqual(False, t.listable())
 
574
        self.assertEqual(True, t.is_readonly())
504
575
 
505
576
    def test_fakenfs_server_default(self):
506
577
        # a FakeNFSServer() should bring up a local relpath server for itself
507
 
        import bzrlib.transport.fakenfs as fakenfs
508
 
        server = fakenfs.FakeNFSServer()
509
 
        server.setUp()
510
 
        try:
511
 
            # the url should be decorated appropriately
512
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
513
 
            # and we should be able to get a transport for it
514
 
            transport = get_transport(server.get_url())
515
 
            # which must be a FakeNFSTransportDecorator instance.
516
 
            self.assertIsInstance(
517
 
                transport, fakenfs.FakeNFSTransportDecorator)
518
 
        finally:
519
 
            server.tearDown()
 
578
        server = test_server.FakeNFSServer()
 
579
        self.start_server(server)
 
580
        # the url should be decorated appropriately
 
581
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
582
        # and we should be able to get a transport for it
 
583
        t = transport.get_transport_from_url(server.get_url())
 
584
        # which must be a FakeNFSTransportDecorator instance.
 
585
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
520
586
 
521
587
    def test_fakenfs_rename_semantics(self):
522
588
        # a FakeNFS transport must mangle the way rename errors occur to
523
589
        # look like NFS problems.
524
 
        transport = self.get_nfs_transport('.')
 
590
        t = self.get_nfs_transport('.')
525
591
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
526
 
                        transport=transport)
527
 
        self.assertRaises(errors.ResourceBusy,
528
 
                          transport.rename, 'from', 'to')
529
 
 
530
 
 
531
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
592
                        transport=t)
 
593
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
594
 
 
595
 
 
596
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
532
597
    """Tests for simulation of VFAT restrictions"""
533
598
 
534
599
    def get_vfat_transport(self, url):
538
603
 
539
604
    def test_transport_creation(self):
540
605
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
541
 
        transport = self.get_vfat_transport('.')
542
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
606
        t = self.get_vfat_transport('.')
 
607
        self.assertIsInstance(t, FakeVFATTransportDecorator)
543
608
 
544
609
    def test_transport_mkdir(self):
545
 
        transport = self.get_vfat_transport('.')
546
 
        transport.mkdir('HELLO')
547
 
        self.assertTrue(transport.has('hello'))
548
 
        self.assertTrue(transport.has('Hello'))
 
610
        t = self.get_vfat_transport('.')
 
611
        t.mkdir('HELLO')
 
612
        self.assertTrue(t.has('hello'))
 
613
        self.assertTrue(t.has('Hello'))
549
614
 
550
615
    def test_forbidden_chars(self):
551
 
        transport = self.get_vfat_transport('.')
552
 
        self.assertRaises(ValueError, transport.has, "<NU>")
553
 
 
554
 
 
555
 
class BadTransportHandler(Transport):
 
616
        t = self.get_vfat_transport('.')
 
617
        self.assertRaises(ValueError, t.has, "<NU>")
 
618
 
 
619
 
 
620
class BadTransportHandler(transport.Transport):
    """Test transport whose constructor always fails.

    Instantiating it raises DependencyNotPresent for 'some_lib'.
    """

    def __init__(self, base_url):
        reason = 'testing missing dependency'
        raise errors.DependencyNotPresent('some_lib', reason)
 
624
 
 
625
 
 
626
class BackupTransportHandler(transport.Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
563
629
 
564
630
 
565
 
class TestTransportImplementation(TestCaseInTempDir):
 
631
class TestTransportImplementation(tests.TestCaseInTempDir):
566
632
    """Implementation verification for transports.
567
633
 
568
634
    To verify a transport we need a server factory, which is a callable
587
653
    def setUp(self):
588
654
        super(TestTransportImplementation, self).setUp()
589
655
        self._server = self.transport_server()
590
 
        self._server.setUp()
591
 
        self.addCleanup(self._server.tearDown)
 
656
        self.start_server(self._server)
592
657
 
593
658
    def get_transport(self, relpath=None):
594
659
        """Return a connected transport to the local directory.
598
663
        base_url = self._server.get_url()
599
664
        url = self._adjust_url(base_url, relpath)
600
665
        # try getting the transport via the regular interface:
601
 
        t = get_transport(url)
 
666
        t = transport.get_transport_from_url(url)
602
667
        # vila--20070607 if the following are commented out the test suite
603
668
        # still pass. Is this really still needed or was it a forgotten
604
669
        # temporary fix ?
609
674
        return t
610
675
 
611
676
 
612
 
class TestLocalTransports(TestCase):
 
677
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_path."""

    def test_with_path(self):
        # A real directory path gives a local transport rooted there.
        t = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(t, local.LocalTransport)
        expected_url = urlutils.local_path_to_url(self.test_dir)
        self.assertEquals(t.base.rstrip("/"), expected_url)

    def test_with_url(self):
        # Something that looks like a URL scheme is still treated as a
        # plain path below the test directory.
        t = transport.get_transport_from_path("file:")
        self.assertIsInstance(t, local.LocalTransport)
        literal_path = os.path.join(self.test_dir, "file:")
        expected_url = urlutils.local_path_to_url(literal_path)
        self.assertEquals(t.base.rstrip("/"), expected_url)
 
690
 
 
691
 
 
692
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_url."""

    def test_with_path(self):
        # A bare filesystem path is not a URL and must be rejected.
        self.assertRaises(
            errors.InvalidURL,
            transport.get_transport_from_url,
            self.test_dir)

    def test_with_url(self):
        dir_url = urlutils.local_path_to_url(self.test_dir)
        t = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        dir_url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
        t = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base.rstrip("/"), dir_url)
        # A file written directly into the test directory must be visible
        # through the transport.
        file_path = os.path.join(self.test_dir, "afile")
        with open(file_path, 'w') as out:
            out.write("data")
        self.assertTrue(t.has("afile"))
 
712
 
 
713
 
 
714
class TestLocalTransports(tests.TestCase):
    """Tests for obtaining local transports through get_transport."""

    def test_get_transport_from_abspath(self):
        """An absolute path yields a LocalTransport based at its URL."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path yields a LocalTransport based at its URL."""
        t = transport.get_transport('.')
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL yields a LocalTransport with that URL as base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = transport.get_transport(here_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was made of."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
637
739
 
638
740
 
639
 
class TestWin32LocalTransport(TestCase):
 
741
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for write streams opened on the test's transport."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        sentinel = object()
        fdatasync = getattr(os, 'fdatasync', sentinel)
        if fdatasync is sentinel:
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        # Make sure the stream is closed when the test ends, whatever its
        # outcome, instead of leaking the open file.
        self.addCleanup(w.close)
        w.write('foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEqual(f.read(), 'foo')
        self.assertEqual(len(calls), 1, calls)

    def test_missing_directory(self):
        """Opening a stream inside a missing directory raises NoSuchFile."""
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
766
 
 
767
 
 
768
class TestWin32LocalTransport(tests.TestCase):
640
769
 
641
770
    def test_unc_clone_to_root(self):
642
771
        # Win32 UNC path like \\HOST\path
643
772
        # clone to root should stop at least at \\HOST part
644
773
        # not on \\
645
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
774
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
646
775
        for i in xrange(4):
647
776
            t = t.clone('..')
648
777
        self.assertEquals(t.base, 'file://HOST/')
651
780
        self.assertEquals(t.base, 'file://HOST/')
652
781
 
653
782
 
654
 
class TestConnectedTransport(TestCase):
 
783
class TestConnectedTransport(tests.TestCase):
655
784
    """Tests for connected to remote server transports"""
656
785
 
657
786
    def test_parse_url(self):
658
 
        t = ConnectedTransport('http://simple.example.com/home/source')
659
 
        self.assertEquals(t._host, 'simple.example.com')
660
 
        self.assertEquals(t._port, None)
661
 
        self.assertEquals(t._path, '/home/source/')
662
 
        self.failUnless(t._user is None)
663
 
        self.failUnless(t._password is None)
 
787
        t = transport.ConnectedTransport(
 
788
            'http://simple.example.com/home/source')
 
789
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
 
790
        self.assertEquals(t._parsed_url.port, None)
 
791
        self.assertEquals(t._parsed_url.path, '/home/source/')
 
792
        self.assertTrue(t._parsed_url.user is None)
 
793
        self.assertTrue(t._parsed_url.password is None)
664
794
 
665
795
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
666
796
 
667
797
    def test_parse_url_with_at_in_user(self):
668
798
        # Bug 228058
669
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
670
 
        self.assertEquals(t._user, 'user@host.com')
 
799
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
800
        self.assertEquals(t._parsed_url.user, 'user@host.com')
671
801
 
672
802
    def test_parse_quoted_url(self):
673
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
674
 
        self.assertEquals(t._host, 'exAmple.com')
675
 
        self.assertEquals(t._port, 2222)
676
 
        self.assertEquals(t._user, 'robey')
677
 
        self.assertEquals(t._password, 'h@t')
678
 
        self.assertEquals(t._path, '/path/')
 
803
        t = transport.ConnectedTransport(
 
804
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
805
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
 
806
        self.assertEquals(t._parsed_url.port, 2222)
 
807
        self.assertEquals(t._parsed_url.user, 'robey')
 
808
        self.assertEquals(t._parsed_url.password, 'h@t')
 
809
        self.assertEquals(t._parsed_url.path, '/path/')
679
810
 
680
811
        # Base should not keep track of the password
681
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
812
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
682
813
 
683
814
    def test_parse_invalid_url(self):
684
815
        self.assertRaises(errors.InvalidURL,
685
 
                          ConnectedTransport,
 
816
                          transport.ConnectedTransport,
686
817
                          'sftp://lily.org:~janneke/public/bzr/gub')
687
818
 
688
819
    def test_relpath(self):
689
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
820
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
690
821
 
691
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
822
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
823
            'sub')
692
824
        self.assertRaises(errors.PathNotChild, t.relpath,
693
825
                          'http://user@host.com/abs/path/sub')
694
826
        self.assertRaises(errors.PathNotChild, t.relpath,
698
830
        self.assertRaises(errors.PathNotChild, t.relpath,
699
831
                          'sftp://user@host.com:33/abs/path/sub')
700
832
        # Make sure it works when we don't supply a username
701
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
833
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
702
834
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
703
835
 
704
836
        # Make sure it works when parts of the path will be url encoded
705
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
837
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
706
838
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
707
839
 
708
840
    def test_connection_sharing_propagate_credentials(self):
709
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
710
 
        self.assertEquals('user', t._user)
711
 
        self.assertEquals('host.com', t._host)
 
841
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
842
        self.assertEquals('user', t._parsed_url.user)
 
843
        self.assertEquals('host.com', t._parsed_url.host)
712
844
        self.assertIs(None, t._get_connection())
713
 
        self.assertIs(None, t._password)
 
845
        self.assertIs(None, t._parsed_url.password)
714
846
        c = t.clone('subdir')
715
847
        self.assertIs(None, c._get_connection())
716
 
        self.assertIs(None, t._password)
 
848
        self.assertIs(None, t._parsed_url.password)
717
849
 
718
850
        # Simulate the user entering a password
719
851
        password = 'secret'
733
865
        self.assertIs(new_password, c._get_credentials())
734
866
 
735
867
 
736
 
class TestReusedTransports(TestCase):
 
868
class TestReusedTransports(tests.TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        candidates = []
        first = transport.get_transport_from_url(
            'http://foo/', possible_transports=candidates)
        # The freshly created transport is recorded in the supplied list.
        self.assertEqual([first], candidates)
        again = transport.get_transport_from_url(
            'http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = transport.get_transport_from_url('http://foo/path/')
        reused = transport.get_transport_from_url(
            'http://foo/path', possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = transport.get_transport_from_url('http://foo/path')
        reused = transport.get_transport_from_url(
            'http://foo/path/', possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        # A transport for another host must not be handed back.
        foo_t = transport.get_transport_from_url('http://foo/path')
        bar_t = transport.get_transport_from_url(
            'http://bar/path', possible_transports=[foo_t])
        self.assertIsNot(foo_t, bar_t)
760
896
 
761
897
 
762
 
class TestTransportTrace(TestCase):
 
898
class TestTransportTrace(tests.TestCase):
    """Tests for the 'trace+' transport decorator."""

    def test_decorator(self):
        traced = transport.get_transport_from_url('trace+memory://')
        self.assertIsInstance(
            traced, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        traced = transport.get_transport_from_url('trace+memory://')
        cloned = traced.clone('.')
        # The clone is a distinct object sharing the same activity log.
        self.assertTrue(traced is not cloned)
        self.assertTrue(traced._activity is cloned._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        traced = transport.get_transport_from_url('trace+memory:///')
        traced.put_bytes('foo', 'barish')
        traced.get('foo')
        expected_result = [
            # put_bytes records the bytes, not the content to avoid memory
            # pressure.
            ('put_bytes', 'foo', 6, None),
            # get records the file name only.
            ('get', 'foo'),
            ]
        self.assertEqual(expected_result, traced._activity)

    def test_readv(self):
        traced = transport.get_transport_from_url('trace+memory:///')
        traced.put_bytes('foo', 'barish')
        offsets = [(0, 1), (3, 2)]
        list(traced.readv('foo', offsets,
                          adjust_for_latency=True, upper_limit=6))
        expected_result = [
            # put_bytes records the bytes, not the content to avoid memory
            # pressure.
            ('put_bytes', 'foo', 6, None),
            # readv records the supplied offset request
            ('readv', 'foo', [(0, 1), (3, 2)], True, 6),
            ]
        self.assertEqual(expected_result, traced._activity)
 
939
 
 
940
 
 
941
class TestSSHConnections(tests.TestCaseWithTransport):

    def test_bzr_connect_to_bzr_ssh(self):
        """get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # Holds the subprocess first, followed by its relay threads.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes
                # from the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # Relay one byte at a time until the reader reports
                    # end of stream, then close the destination.
                    chunk = read(1)
                    while chunk != '':
                        write(chunk)
                        chunk = read(1)
                    close()

                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for endpoints in file_functions:
                    relay = threading.Thread(
                        target=ferry_bytes, args=endpoints)
                    relay.start()
                    started.append(relay)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        bzr_remote_path = self.get_bzr_path()
        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + bzr_remote_path
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        t = transport.get_transport(url)
        self.permit_url(t.base)
        t.mkdir('foo')

        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
            self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        t._client._medium.disconnect()
        if started:
            # First wait for the subprocess
            started[0].wait()
            # And the rest are threads
            for relay in started[1:]:
                relay.join()
 
1042
 
 
1043
 
 
1044
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        # A long input comes back as exactly 1000 characters, beginning
        # with the text of the first line with its tag removed.
        one_line = "<p>something!\n"
        fake_html = one_line * 1000
        stripped = http.unhtml_roughly(fake_html)
        self.assertEquals(len(stripped), 1000)
        self.assertStartsWith(stripped, " something!")
 
1053
 
 
1054
 
 
1055
class SomeDirectory(object):
    """Dummy directory whose look_up always answers the same URL."""

    def look_up(self, name, url):
        # Both arguments are ignored; the answer is fixed.
        fixed_target = "http://bar"
        return fixed_target
 
1059
 
 
1060
 
 
1061
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return an absolute path and the file URL expected for it."""
        path = osutils.abspath('/foo/bar')
        # On Windows, abspaths start with the drive letter, so we have to
        # add in the extra '/'
        scheme_prefix = 'file://' if path.startswith('/') else 'file:///'
        return path, scheme_prefix + path

    def test_regular_url(self):
        # Something that already is a URL comes back unchanged.
        self.assertEquals("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        # A registered directory service prefix is looked up.
        directories.register("bar:", SomeDirectory, "Dummy directory")
        self.addCleanup(directories.remove, "bar:")
        self.assertEquals("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        non_ascii_url = "http://fo/\xc3\xaf".decode("utf-8")
        self.assertRaises(errors.InvalidURL, location_to_url, non_ascii_url)

    def test_unicode_path(self):
        path, url = self.get_base_location()
        location = path + "\xc3\xaf".decode("utf-8")
        # The non-ascii character is expected utf-8 encoded and quoted.
        expected_url = url + '%C3%AF'
        self.assertEquals(expected_url, location_to_url(location))

    def test_path(self):
        path, url = self.get_base_location()
        self.assertEquals(url, location_to_url(path))

    def test_relative_file_url(self):
        expected_url = urlutils.local_path_to_url(".") + "/bar"
        self.assertEquals(expected_url, location_to_url("file:bar"))

    def test_absolute_file_url(self):
        self.assertEquals("file:///bar", location_to_url("file:/bar"))