~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Patch Queue Manager
  • Date: 2016-04-21 04:10:52 UTC
  • mfrom: (6616.1.1 fix-en-user-guide)
  • Revision ID: pqm@pqm.ubuntu.com-20160421041052-clcye7ns1qcl2n7w
(richard-wilbur) Ensure build of English user guide always uses English text
 even when user's locale specifies a different language. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import errno
 
20
import os
 
21
import subprocess
 
22
import sys
 
23
import threading
19
24
 
20
 
import bzrlib
21
25
from bzrlib import (
22
26
    errors,
23
27
    osutils,
 
28
    tests,
 
29
    transport,
24
30
    urlutils,
25
31
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
32
from bzrlib.directory_service import directories
 
33
from bzrlib.transport import (
 
34
    chroot,
 
35
    fakenfs,
 
36
    http,
 
37
    local,
 
38
    location_to_url,
 
39
    memory,
 
40
    pathfilter,
 
41
    readonly,
 
42
    )
 
43
import bzrlib.transport.trace
 
44
from bzrlib.tests import (
 
45
    features,
 
46
    test_server,
 
47
    )
51
48
 
52
49
 
53
50
# TODO: Should possibly split transport-specific tests into their own files.
54
51
 
55
52
 
56
 
class TestTransport(TestCase):
 
53
class TestTransport(tests.TestCase):
57
54
    """Test the non transport-concrete class functionality."""
58
55
 
59
56
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
62
 
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
65
 
        finally:
66
 
            _set_protocol_handlers(handlers)
 
57
        handlers = transport._get_protocol_handlers()
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
67
62
 
68
63
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
 
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
70
66
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
 
67
        transport._clear_protocol_handlers()
 
68
 
72
69
        class SampleHandler(object):
73
70
            """I exist, isnt that enough?"""
74
 
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
85
 
        finally:
86
 
            _set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
87
84
 
88
85
    def test_transport_dependency(self):
89
86
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
 
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
89
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
 
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
93
94
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
97
 
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
107
 
        finally:
108
 
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
110
 
            
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEqual('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
 
104
 
111
105
    def test_transport_fallback(self):
112
106
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
 
107
        saved_handlers = transport._get_protocol_handlers()
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
117
 
 
118
    def test_ssh_hints(self):
 
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
114
120
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
122
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
 
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
 
122
        except errors.UnsupportedProtocol, e:
 
123
            e_str = str(e)
 
124
            self.assertEqual('Unsupported protocol'
 
125
                              ' for url "ssh://fooserver/foo":'
 
126
                              ' bzr supports bzr+ssh to operate over ssh,'
 
127
                              ' use "bzr+ssh://fooserver/foo".',
 
128
                              str(e))
 
129
        else:
 
130
            self.fail('Did not raise UnsupportedProtocol')
125
131
 
126
132
    def test_LateReadError(self):
127
133
        """The LateReadError helper should raise on read()."""
128
 
        a_file = LateReadError('a path')
 
134
        a_file = transport.LateReadError('a path')
129
135
        try:
130
136
            a_file.read()
131
 
        except ReadError, error:
 
137
        except errors.ReadError, error:
132
138
            self.assertEqual('a path', error.path)
133
 
        self.assertRaises(ReadError, a_file.read, 40)
 
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
134
140
        a_file.close()
135
141
 
136
 
    def test__combine_paths(self):
137
 
        t = Transport('/')
138
 
        self.assertEqual('/home/sarah/project/foo',
139
 
                         t._combine_paths('/home/sarah', 'project/foo'))
140
 
        self.assertEqual('/etc',
141
 
                         t._combine_paths('/home/sarah', '../../etc'))
142
 
        self.assertEqual('/etc',
143
 
                         t._combine_paths('/home/sarah', '../../../etc'))
144
 
        self.assertEqual('/etc',
145
 
                         t._combine_paths('/home/sarah', '/etc'))
146
 
 
147
142
    def test_local_abspath_non_local_transport(self):
148
143
        # the base implementation should throw
149
 
        t = MemoryTransport()
 
144
        t = memory.MemoryTransport()
150
145
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
146
        self.assertEqual('memory:///t is not a local path.', str(e))
152
147
 
153
148
 
154
 
class TestCoalesceOffsets(TestCase):
 
149
class TestCoalesceOffsets(tests.TestCase):
155
150
 
156
151
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
157
 
        coalesce = Transport._coalesce_offsets
158
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
152
        coalesce = transport.Transport._coalesce_offsets
 
153
        exp = [transport._CoalescedOffset(*x) for x in expected]
159
154
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
160
155
                            max_size=max_size))
161
156
        self.assertEqual(exp, out)
180
175
        self.check([(0, 20, [(0, 10), (10, 10)])],
181
176
                   [(0, 10), (10, 10)])
182
177
 
183
 
    # XXX: scary, http.readv() can't handle that --vila20071209
184
178
    def test_coalesce_overlapped(self):
185
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
186
 
                   [(0, 10), (5, 10)])
 
179
        self.assertRaises(ValueError,
 
180
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
181
                        [(0, 10), (5, 10)])
187
182
 
188
183
    def test_coalesce_limit(self):
189
184
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
206
201
 
207
202
    def test_coalesce_fudge(self):
208
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
209
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
210
205
                   ], [(10, 10), (30, 10), (100, 10)],
211
 
                   fudge=10
212
 
                  )
 
206
                   fudge=10)
 
207
 
213
208
    def test_coalesce_max_size(self):
214
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
215
210
                    (30, 50, [(0, 50)]),
216
211
                    # If one range is above max_size, it gets its own coalesced
217
212
                    # offset
218
 
                    (100, 80, [(0, 80),]),],
 
213
                    (100, 80, [(0, 80)]),],
219
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
220
 
                   max_size=50
221
 
                  )
 
215
                   max_size=50)
222
216
 
223
217
    def test_coalesce_no_max_size(self):
224
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
225
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
226
220
                  )
227
221
 
228
 
 
229
 
class TestMemoryTransport(TestCase):
 
222
    def test_coalesce_default_limit(self):
 
223
        # By default we use a 100MB max size.
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
230
                   max_size=1*1024*1024*1024)
 
231
 
 
232
 
 
233
class TestMemoryServer(tests.TestCase):
 
234
 
 
235
    def test_create_server(self):
 
236
        server = memory.MemoryServer()
 
237
        server.start_server()
 
238
        url = server.get_url()
 
239
        self.assertTrue(url in transport.transport_list_registry)
 
240
        t = transport.get_transport_from_url(url)
 
241
        del t
 
242
        server.stop_server()
 
243
        self.assertFalse(url in transport.transport_list_registry)
 
244
        self.assertRaises(errors.UnsupportedProtocol,
 
245
                          transport.get_transport, url)
 
246
 
 
247
 
 
248
class TestMemoryTransport(tests.TestCase):
230
249
 
231
250
    def test_get_transport(self):
232
 
        MemoryTransport()
 
251
        memory.MemoryTransport()
233
252
 
234
253
    def test_clone(self):
235
 
        transport = MemoryTransport()
236
 
        self.assertTrue(isinstance(transport, MemoryTransport))
237
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
254
        t = memory.MemoryTransport()
 
255
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
256
        self.assertEqual("memory:///", t.clone("/").base)
238
257
 
239
258
    def test_abspath(self):
240
 
        transport = MemoryTransport()
241
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
259
        t = memory.MemoryTransport()
 
260
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
242
261
 
243
262
    def test_abspath_of_root(self):
244
 
        transport = MemoryTransport()
245
 
        self.assertEqual("memory:///", transport.base)
246
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
263
        t = memory.MemoryTransport()
 
264
        self.assertEqual("memory:///", t.base)
 
265
        self.assertEqual("memory:///", t.abspath('/'))
247
266
 
248
267
    def test_abspath_of_relpath_starting_at_root(self):
249
 
        transport = MemoryTransport()
250
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
268
        t = memory.MemoryTransport()
 
269
        self.assertEqual("memory:///foo", t.abspath('/foo'))
251
270
 
252
271
    def test_append_and_get(self):
253
 
        transport = MemoryTransport()
254
 
        transport.append_bytes('path', 'content')
255
 
        self.assertEqual(transport.get('path').read(), 'content')
256
 
        transport.append_file('path', StringIO('content'))
257
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
272
        t = memory.MemoryTransport()
 
273
        t.append_bytes('path', 'content')
 
274
        self.assertEqual(t.get('path').read(), 'content')
 
275
        t.append_file('path', StringIO('content'))
 
276
        self.assertEqual(t.get('path').read(), 'contentcontent')
258
277
 
259
278
    def test_put_and_get(self):
260
 
        transport = MemoryTransport()
261
 
        transport.put_file('path', StringIO('content'))
262
 
        self.assertEqual(transport.get('path').read(), 'content')
263
 
        transport.put_bytes('path', 'content')
264
 
        self.assertEqual(transport.get('path').read(), 'content')
 
279
        t = memory.MemoryTransport()
 
280
        t.put_file('path', StringIO('content'))
 
281
        self.assertEqual(t.get('path').read(), 'content')
 
282
        t.put_bytes('path', 'content')
 
283
        self.assertEqual(t.get('path').read(), 'content')
265
284
 
266
285
    def test_append_without_dir_fails(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertRaises(NoSuchFile,
269
 
                          transport.append_bytes, 'dir/path', 'content')
 
286
        t = memory.MemoryTransport()
 
287
        self.assertRaises(errors.NoSuchFile,
 
288
                          t.append_bytes, 'dir/path', 'content')
270
289
 
271
290
    def test_put_without_dir_fails(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertRaises(NoSuchFile,
274
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
291
        t = memory.MemoryTransport()
 
292
        self.assertRaises(errors.NoSuchFile,
 
293
                          t.put_file, 'dir/path', StringIO('content'))
275
294
 
276
295
    def test_get_missing(self):
277
 
        transport = MemoryTransport()
278
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
296
        transport = memory.MemoryTransport()
 
297
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
279
298
 
280
299
    def test_has_missing(self):
281
 
        transport = MemoryTransport()
282
 
        self.assertEquals(False, transport.has('foo'))
 
300
        t = memory.MemoryTransport()
 
301
        self.assertEqual(False, t.has('foo'))
283
302
 
284
303
    def test_has_present(self):
285
 
        transport = MemoryTransport()
286
 
        transport.append_bytes('foo', 'content')
287
 
        self.assertEquals(True, transport.has('foo'))
 
304
        t = memory.MemoryTransport()
 
305
        t.append_bytes('foo', 'content')
 
306
        self.assertEqual(True, t.has('foo'))
288
307
 
289
308
    def test_list_dir(self):
290
 
        transport = MemoryTransport()
291
 
        transport.put_bytes('foo', 'content')
292
 
        transport.mkdir('dir')
293
 
        transport.put_bytes('dir/subfoo', 'content')
294
 
        transport.put_bytes('dirlike', 'content')
 
309
        t = memory.MemoryTransport()
 
310
        t.put_bytes('foo', 'content')
 
311
        t.mkdir('dir')
 
312
        t.put_bytes('dir/subfoo', 'content')
 
313
        t.put_bytes('dirlike', 'content')
295
314
 
296
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
297
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
315
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
316
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
298
317
 
299
318
    def test_mkdir(self):
300
 
        transport = MemoryTransport()
301
 
        transport.mkdir('dir')
302
 
        transport.append_bytes('dir/path', 'content')
303
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
319
        t = memory.MemoryTransport()
 
320
        t.mkdir('dir')
 
321
        t.append_bytes('dir/path', 'content')
 
322
        self.assertEqual(t.get('dir/path').read(), 'content')
304
323
 
305
324
    def test_mkdir_missing_parent(self):
306
 
        transport = MemoryTransport()
307
 
        self.assertRaises(NoSuchFile,
308
 
                          transport.mkdir, 'dir/dir')
 
325
        t = memory.MemoryTransport()
 
326
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
309
327
 
310
328
    def test_mkdir_twice(self):
311
 
        transport = MemoryTransport()
312
 
        transport.mkdir('dir')
313
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
329
        t = memory.MemoryTransport()
 
330
        t.mkdir('dir')
 
331
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
314
332
 
315
333
    def test_parameters(self):
316
 
        transport = MemoryTransport()
317
 
        self.assertEqual(True, transport.listable())
318
 
        self.assertEqual(False, transport.is_readonly())
 
334
        t = memory.MemoryTransport()
 
335
        self.assertEqual(True, t.listable())
 
336
        self.assertEqual(False, t.is_readonly())
319
337
 
320
338
    def test_iter_files_recursive(self):
321
 
        transport = MemoryTransport()
322
 
        transport.mkdir('dir')
323
 
        transport.put_bytes('dir/foo', 'content')
324
 
        transport.put_bytes('dir/bar', 'content')
325
 
        transport.put_bytes('bar', 'content')
326
 
        paths = set(transport.iter_files_recursive())
 
339
        t = memory.MemoryTransport()
 
340
        t.mkdir('dir')
 
341
        t.put_bytes('dir/foo', 'content')
 
342
        t.put_bytes('dir/bar', 'content')
 
343
        t.put_bytes('bar', 'content')
 
344
        paths = set(t.iter_files_recursive())
327
345
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
328
346
 
329
347
    def test_stat(self):
330
 
        transport = MemoryTransport()
331
 
        transport.put_bytes('foo', 'content')
332
 
        transport.put_bytes('bar', 'phowar')
333
 
        self.assertEqual(7, transport.stat('foo').st_size)
334
 
        self.assertEqual(6, transport.stat('bar').st_size)
335
 
 
336
 
 
337
 
class ChrootDecoratorTransportTest(TestCase):
 
348
        t = memory.MemoryTransport()
 
349
        t.put_bytes('foo', 'content')
 
350
        t.put_bytes('bar', 'phowar')
 
351
        self.assertEqual(7, t.stat('foo').st_size)
 
352
        self.assertEqual(6, t.stat('bar').st_size)
 
353
 
 
354
 
 
355
class ChrootDecoratorTransportTest(tests.TestCase):
338
356
    """Chroot decoration specific tests."""
339
357
 
340
358
    def test_abspath(self):
341
359
        # The abspath is always relative to the chroot_url.
342
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
343
 
        server.setUp()
344
 
        transport = get_transport(server.get_url())
345
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
360
        server = chroot.ChrootServer(
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
 
362
        self.start_server(server)
 
363
        t = transport.get_transport_from_url(server.get_url())
 
364
        self.assertEqual(server.get_url(), t.abspath('/'))
346
365
 
347
 
        subdir_transport = transport.clone('subdir')
348
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
349
 
        server.tearDown()
 
366
        subdir_t = t.clone('subdir')
 
367
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
350
368
 
351
369
    def test_clone(self):
352
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
353
 
        server.setUp()
354
 
        transport = get_transport(server.get_url())
 
370
        server = chroot.ChrootServer(
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
 
372
        self.start_server(server)
 
373
        t = transport.get_transport_from_url(server.get_url())
355
374
        # relpath from root and root path are the same
356
 
        relpath_cloned = transport.clone('foo')
357
 
        abspath_cloned = transport.clone('/foo')
 
375
        relpath_cloned = t.clone('foo')
 
376
        abspath_cloned = t.clone('/foo')
358
377
        self.assertEqual(server, relpath_cloned.server)
359
378
        self.assertEqual(server, abspath_cloned.server)
360
 
        server.tearDown()
361
 
    
 
379
 
362
380
    def test_chroot_url_preserves_chroot(self):
363
381
        """Calling get_transport on a chroot transport's base should produce a
364
382
        transport with exactly the same behaviour as the original chroot
367
385
        This is so that it is not possible to escape a chroot by doing::
368
386
            url = chroot_transport.base
369
387
            parent_url = urlutils.join(url, '..')
370
 
            new_transport = get_transport(parent_url)
 
388
            new_t = transport.get_transport_from_url(parent_url)
371
389
        """
372
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
373
 
        server.setUp()
374
 
        transport = get_transport(server.get_url())
375
 
        new_transport = get_transport(transport.base)
376
 
        self.assertEqual(transport.server, new_transport.server)
377
 
        self.assertEqual(transport.base, new_transport.base)
378
 
        server.tearDown()
379
 
        
 
390
        server = chroot.ChrootServer(
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
 
392
        self.start_server(server)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
 
395
        self.assertEqual(t.server, new_t.server)
 
396
        self.assertEqual(t.base, new_t.base)
 
397
 
380
398
    def test_urljoin_preserves_chroot(self):
381
399
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
382
400
        URL that escapes the intended chroot.
384
402
        This is so that it is not possible to escape a chroot by doing::
385
403
            url = chroot_transport.base
386
404
            parent_url = urlutils.join(url, '..')
387
 
            new_transport = get_transport(parent_url)
 
405
            new_t = transport.get_transport_from_url(parent_url)
388
406
        """
389
 
        server = ChrootServer(get_transport('memory:///path/'))
390
 
        server.setUp()
391
 
        transport = get_transport(server.get_url())
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
 
409
        self.start_server(server)
 
410
        t = transport.get_transport_from_url(server.get_url())
392
411
        self.assertRaises(
393
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
394
 
        server.tearDown()
395
 
 
396
 
 
397
 
class ChrootServerTest(TestCase):
 
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
413
 
 
414
 
 
415
class TestChrootServer(tests.TestCase):
398
416
 
399
417
    def test_construct(self):
400
 
        backing_transport = MemoryTransport()
401
 
        server = ChrootServer(backing_transport)
 
418
        backing_transport = memory.MemoryTransport()
 
419
        server = chroot.ChrootServer(backing_transport)
402
420
        self.assertEqual(backing_transport, server.backing_transport)
403
421
 
404
422
    def test_setUp(self):
405
 
        backing_transport = MemoryTransport()
406
 
        server = ChrootServer(backing_transport)
407
 
        server.setUp()
408
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
423
        backing_transport = memory.MemoryTransport()
 
424
        server = chroot.ChrootServer(backing_transport)
 
425
        server.start_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
409
429
 
410
 
    def test_tearDown(self):
411
 
        backing_transport = MemoryTransport()
412
 
        server = ChrootServer(backing_transport)
413
 
        server.setUp()
414
 
        server.tearDown()
415
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
430
    def test_stop_server(self):
 
431
        backing_transport = memory.MemoryTransport()
 
432
        server = chroot.ChrootServer(backing_transport)
 
433
        server.start_server()
 
434
        server.stop_server()
 
435
        self.assertFalse(server.scheme
 
436
                         in transport._get_protocol_handlers().keys())
416
437
 
417
438
    def test_get_url(self):
418
 
        backing_transport = MemoryTransport()
419
 
        server = ChrootServer(backing_transport)
420
 
        server.setUp()
 
439
        backing_transport = memory.MemoryTransport()
 
440
        server = chroot.ChrootServer(backing_transport)
 
441
        server.start_server()
 
442
        self.addCleanup(server.stop_server)
421
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
422
 
        server.tearDown()
423
 
 
424
 
 
425
 
class ReadonlyDecoratorTransportTest(TestCase):
 
444
 
 
445
 
 
446
class TestHooks(tests.TestCase):
 
447
    """Basic tests for transport hooks"""
 
448
 
 
449
    def _get_connected_transport(self):
 
450
        return transport.ConnectedTransport("bogus:nowhere")
 
451
 
 
452
    def test_transporthooks_initialisation(self):
 
453
        """Check all expected transport hook points are set up"""
 
454
        hookpoint = transport.TransportHooks()
 
455
        self.assertTrue("post_connect" in hookpoint,
 
456
            "post_connect not in %s" % (hookpoint,))
 
457
 
 
458
    def test_post_connect(self):
 
459
        """Ensure the post_connect hook is called when _set_transport is"""
 
460
        calls = []
 
461
        transport.Transport.hooks.install_named_hook("post_connect",
 
462
            calls.append, None)
 
463
        t = self._get_connected_transport()
 
464
        self.assertLength(0, calls)
 
465
        t._set_connection("connection", "auth")
 
466
        self.assertEqual(calls, [t])
 
467
 
 
468
 
 
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
    """Pathfilter decoration specific tests."""

    def test_abspath(self):
        # The abspath is always relative to the base of the backing transport.
        server = pathfilter.PathFilteringServer(
            transport.get_transport_from_url('memory:///foo/bar/'),
            lambda x: x)
        server.start_server()
        # Register the shutdown as a cleanup (as make_pf_transport does) so
        # that a failing assertion below cannot leave the server running.
        self.addCleanup(server.stop_server)
        t = transport.get_transport_from_url(server.get_url())
        self.assertEqual(server.get_url(), t.abspath('/'))

        subdir_t = t.clone('subdir')
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))

    def make_pf_transport(self, filter_func=None):
        """Make a PathFilteringTransport backed by a MemoryTransport.

        The server is started here and stopped automatically at test cleanup.

        :param filter_func: by default this will be a no-op function.  Use this
            parameter to override it.
        :return: a transport obtained from the started server's URL.
        """
        if filter_func is None:
            filter_func = lambda x: x
        server = pathfilter.PathFilteringServer(
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
        server.start_server()
        self.addCleanup(server.stop_server)
        return transport.get_transport_from_url(server.get_url())

    def test__filter(self):
        # _filter (with an identity func as filter_func) always returns
        # paths relative to the base of the backing transport.
        t = self.make_pf_transport()
        self.assertEqual('foo', t._filter('foo'))
        self.assertEqual('foo/bar', t._filter('foo/bar'))
        self.assertEqual('', t._filter('..'))
        self.assertEqual('', t._filter('/'))
        # The base of the pathfiltering transport is taken into account too.
        t = t.clone('subdir1/subdir2')
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
        self.assertEqual('subdir1', t._filter('..'))
        self.assertEqual('', t._filter('/'))

    def test_filter_invocation(self):
        """The filter function is called with the path of each request."""
        filter_log = []

        # Named to avoid shadowing the ``filter`` builtin.
        def logging_filter(path):
            filter_log.append(path)
            return path
        t = self.make_pf_transport(logging_filter)
        t.has('abc')
        self.assertEqual(['abc'], filter_log)
        del filter_log[:]
        t.clone('abc').has('xyz')
        self.assertEqual(['abc/xyz'], filter_log)
        del filter_log[:]
        t.has('/abc')
        self.assertEqual(['abc'], filter_log)

    def test_clone(self):
        """Clones made by relative or absolute path keep the same server."""
        t = self.make_pf_transport()
        # relpath from root and root path are the same
        relpath_cloned = t.clone('foo')
        abspath_cloned = t.clone('/foo')
        self.assertEqual(t.server, relpath_cloned.server)
        self.assertEqual(t.server, abspath_cloned.server)

    def test_url_preserves_pathfiltering(self):
        """Calling get_transport on a pathfiltered transport's base should
        produce a transport with exactly the same behaviour as the original
        pathfiltered transport.

        This is so that it is not possible to escape (accidentally or
        otherwise) the filtering by doing::
            url = filtered_transport.base
            parent_url = urlutils.join(url, '..')
            new_t = transport.get_transport_from_url(parent_url)
        """
        t = self.make_pf_transport()
        new_t = transport.get_transport_from_url(t.base)
        self.assertEqual(t.server, new_t.server)
        self.assertEqual(t.base, new_t.base)
 
552
 
 
553
 
 
554
class ReadonlyDecoratorTransportTest(tests.TestCase):
426
555
    """Readonly decoration specific tests."""
427
556
 
428
557
    def test_local_parameters(self):
429
 
        import bzrlib.transport.readonly as readonly
430
558
        # connect to . in readonly mode
431
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
432
 
        self.assertEqual(True, transport.listable())
433
 
        self.assertEqual(True, transport.is_readonly())
 
559
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
560
        self.assertEqual(True, t.listable())
 
561
        self.assertEqual(True, t.is_readonly())
434
562
 
435
563
    def test_http_parameters(self):
436
 
        from bzrlib.tests.HttpServer import HttpServer
437
 
        import bzrlib.transport.readonly as readonly
438
 
        # connect to . via http which is not listable
 
564
        from bzrlib.tests.http_server import HttpServer
 
565
        # connect to '.' via http which is not listable
439
566
        server = HttpServer()
440
 
        server.setUp()
441
 
        try:
442
 
            transport = get_transport('readonly+' + server.get_url())
443
 
            self.failUnless(isinstance(transport,
444
 
                                       readonly.ReadonlyTransportDecorator))
445
 
            self.assertEqual(False, transport.listable())
446
 
            self.assertEqual(True, transport.is_readonly())
447
 
        finally:
448
 
            server.tearDown()
449
 
 
450
 
 
451
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
567
        self.start_server(server)
 
568
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
569
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
570
        self.assertEqual(False, t.listable())
 
571
        self.assertEqual(True, t.is_readonly())
 
572
 
 
573
 
 
574
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
452
575
    """NFS decorator specific tests."""
453
576
 
454
577
    def get_nfs_transport(self, url):
455
 
        import bzrlib.transport.fakenfs as fakenfs
456
578
        # connect to url with nfs decoration
457
579
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
458
580
 
459
581
    def test_local_parameters(self):
460
582
        # the listable and is_readonly parameters
461
583
        # are not changed by the fakenfs decorator
462
 
        transport = self.get_nfs_transport('.')
463
 
        self.assertEqual(True, transport.listable())
464
 
        self.assertEqual(False, transport.is_readonly())
 
584
        t = self.get_nfs_transport('.')
 
585
        self.assertEqual(True, t.listable())
 
586
        self.assertEqual(False, t.is_readonly())
465
587
 
466
588
    def test_http_parameters(self):
467
589
        # the listable and is_readonly parameters
468
590
        # are not changed by the fakenfs decorator
469
 
        from bzrlib.tests.HttpServer import HttpServer
470
 
        # connect to . via http which is not listable
 
591
        from bzrlib.tests.http_server import HttpServer
 
592
        # connect to '.' via http which is not listable
471
593
        server = HttpServer()
472
 
        server.setUp()
473
 
        try:
474
 
            transport = self.get_nfs_transport(server.get_url())
475
 
            self.assertIsInstance(
476
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
477
 
            self.assertEqual(False, transport.listable())
478
 
            self.assertEqual(True, transport.is_readonly())
479
 
        finally:
480
 
            server.tearDown()
 
594
        self.start_server(server)
 
595
        t = self.get_nfs_transport(server.get_url())
 
596
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
597
        self.assertEqual(False, t.listable())
 
598
        self.assertEqual(True, t.is_readonly())
481
599
 
482
600
    def test_fakenfs_server_default(self):
483
601
        # a FakeNFSServer() should bring up a local relpath server for itself
484
 
        import bzrlib.transport.fakenfs as fakenfs
485
 
        server = fakenfs.FakeNFSServer()
486
 
        server.setUp()
487
 
        try:
488
 
            # the url should be decorated appropriately
489
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
490
 
            # and we should be able to get a transport for it
491
 
            transport = get_transport(server.get_url())
492
 
            # which must be a FakeNFSTransportDecorator instance.
493
 
            self.assertIsInstance(
494
 
                transport, fakenfs.FakeNFSTransportDecorator)
495
 
        finally:
496
 
            server.tearDown()
 
602
        server = test_server.FakeNFSServer()
 
603
        self.start_server(server)
 
604
        # the url should be decorated appropriately
 
605
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
606
        # and we should be able to get a transport for it
 
607
        t = transport.get_transport_from_url(server.get_url())
 
608
        # which must be a FakeNFSTransportDecorator instance.
 
609
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
497
610
 
498
611
    def test_fakenfs_rename_semantics(self):
499
612
        # a FakeNFS transport must mangle the way rename errors occur to
500
613
        # look like NFS problems.
501
 
        transport = self.get_nfs_transport('.')
 
614
        t = self.get_nfs_transport('.')
502
615
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
503
 
                        transport=transport)
504
 
        self.assertRaises(errors.ResourceBusy,
505
 
                          transport.rename, 'from', 'to')
506
 
 
507
 
 
508
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
616
                        transport=t)
 
617
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
618
 
 
619
 
 
620
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
509
621
    """Tests for simulation of VFAT restrictions"""
510
622
 
511
623
    def get_vfat_transport(self, url):
515
627
 
516
628
    def test_transport_creation(self):
517
629
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
518
 
        transport = self.get_vfat_transport('.')
519
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
630
        t = self.get_vfat_transport('.')
 
631
        self.assertIsInstance(t, FakeVFATTransportDecorator)
520
632
 
521
633
    def test_transport_mkdir(self):
522
 
        transport = self.get_vfat_transport('.')
523
 
        transport.mkdir('HELLO')
524
 
        self.assertTrue(transport.has('hello'))
525
 
        self.assertTrue(transport.has('Hello'))
 
634
        t = self.get_vfat_transport('.')
 
635
        t.mkdir('HELLO')
 
636
        self.assertTrue(t.has('hello'))
 
637
        self.assertTrue(t.has('Hello'))
526
638
 
527
639
    def test_forbidden_chars(self):
528
 
        transport = self.get_vfat_transport('.')
529
 
        self.assertRaises(ValueError, transport.has, "<NU>")
530
 
 
531
 
 
532
 
class BadTransportHandler(Transport):
 
640
        t = self.get_vfat_transport('.')
 
641
        self.assertRaises(ValueError, t.has, "<NU>")
 
642
 
 
643
 
 
644
class BadTransportHandler(transport.Transport):
533
645
    def __init__(self, base_url):
534
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
535
 
 
536
 
 
537
 
class BackupTransportHandler(Transport):
 
646
        raise errors.DependencyNotPresent('some_lib',
 
647
                                          'testing missing dependency')
 
648
 
 
649
 
 
650
class BackupTransportHandler(transport.Transport):
538
651
    """Test transport that works as a backup for the BadTransportHandler"""
539
652
    pass
540
653
 
541
654
 
542
 
class TestTransportImplementation(TestCaseInTempDir):
 
655
class TestTransportImplementation(tests.TestCaseInTempDir):
543
656
    """Implementation verification for transports.
544
 
    
 
657
 
545
658
    To verify a transport we need a server factory, which is a callable
546
659
    that accepts no parameters and returns an implementation of
547
660
    bzrlib.transport.Server.
548
 
    
 
661
 
549
662
    That Server is then used to construct transport instances and test
550
663
    the transport via loopback activity.
551
664
 
552
 
    Currently this assumes that the Transport object is connected to the 
553
 
    current working directory.  So that whatever is done 
554
 
    through the transport, should show up in the working 
 
665
    Currently this assumes that the Transport object is connected to the
 
666
    current working directory.  So that whatever is done
 
667
    through the transport, should show up in the working
555
668
    directory, and vice-versa. This is a bug, because its possible to have
556
 
    URL schemes which provide access to something that may not be 
557
 
    result in storage on the local disk, i.e. due to file system limits, or 
 
669
    URL schemes which provide access to something that may not be
 
670
    result in storage on the local disk, i.e. due to file system limits, or
558
671
    due to it being a database or some other non-filesystem tool.
559
672
 
560
673
    This also tests to make sure that the functions work with both
561
674
    generators and lists (assuming iter(list) is effectively a generator)
562
675
    """
563
 
    
 
676
 
564
677
    def setUp(self):
565
678
        super(TestTransportImplementation, self).setUp()
566
679
        self._server = self.transport_server()
567
 
        self._server.setUp()
568
 
        self.addCleanup(self._server.tearDown)
 
680
        self.start_server(self._server)
569
681
 
570
682
    def get_transport(self, relpath=None):
571
683
        """Return a connected transport to the local directory.
575
687
        base_url = self._server.get_url()
576
688
        url = self._adjust_url(base_url, relpath)
577
689
        # try getting the transport via the regular interface:
578
 
        t = get_transport(url)
 
690
        t = transport.get_transport_from_url(url)
579
691
        # vila--20070607 if the following are commented out the test suite
580
692
        # still pass. Is this really still needed or was it a forgotten
581
693
        # temporary fix ?
586
698
        return t
587
699
 
588
700
 
589
 
class TestLocalTransports(TestCase):
 
701
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Checks on transport.get_transport_from_path."""

    def test_with_path(self):
        """A directory path yields a LocalTransport based at that path."""
        local_t = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(local_t, local.LocalTransport)
        actual_base = local_t.base.rstrip("/")
        expected_base = urlutils.local_path_to_url(self.test_dir)
        self.assertEqual(actual_base, expected_base)

    def test_with_url(self):
        """A URL-looking argument ("file:") is still handled as a path."""
        local_t = transport.get_transport_from_path("file:")
        self.assertIsInstance(local_t, local.LocalTransport)
        actual_base = local_t.base.rstrip("/")
        # The expected base is the URL of a "file:" entry under the test dir.
        expected_path = os.path.join(self.test_dir, "file:")
        self.assertEqual(actual_base,
            urlutils.local_path_to_url(expected_path))
 
714
 
 
715
 
 
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Checks on transport.get_transport_from_url."""

    def test_with_path(self):
        """A bare filesystem path is rejected with InvalidURL."""
        self.assertRaises(
            errors.InvalidURL,
            transport.get_transport_from_url,
            self.test_dir)

    def test_with_url(self):
        """A local file URL yields a LocalTransport based at that URL."""
        dir_url = urlutils.local_path_to_url(self.test_dir)
        local_t = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(local_t, local.LocalTransport)
        self.assertEqual(local_t.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        """Segment parameters stay in the base and file access still works."""
        dir_url = urlutils.local_path_to_url(self.test_dir)
        url_with_params = dir_url + ",branch=foo"
        local_t = transport.get_transport_from_url(url_with_params)
        self.assertIsInstance(local_t, local.LocalTransport)
        self.assertEqual(local_t.base.rstrip("/"), url_with_params)
        # Create a file directly on disk, then look for it via the transport.
        target = os.path.join(self.test_dir, "afile")
        with open(target, 'w') as out:
            out.write("data")
        self.assertTrue(local_t.has("afile"))
 
736
 
 
737
 
 
738
class TestLocalTransports(tests.TestCase):
590
739
 
591
740
    def test_get_transport_from_abspath(self):
592
741
        here = osutils.abspath('.')
593
 
        t = get_transport(here)
594
 
        self.assertIsInstance(t, LocalTransport)
595
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
742
        t = transport.get_transport(here)
 
743
        self.assertIsInstance(t, local.LocalTransport)
 
744
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
596
745
 
597
746
    def test_get_transport_from_relpath(self):
598
747
        here = osutils.abspath('.')
599
 
        t = get_transport('.')
600
 
        self.assertIsInstance(t, LocalTransport)
601
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
748
        t = transport.get_transport('.')
 
749
        self.assertIsInstance(t, local.LocalTransport)
 
750
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
602
751
 
603
752
    def test_get_transport_from_local_url(self):
604
753
        here = osutils.abspath('.')
605
754
        here_url = urlutils.local_path_to_url(here) + '/'
606
 
        t = get_transport(here_url)
607
 
        self.assertIsInstance(t, LocalTransport)
608
 
        self.assertEquals(t.base, here_url)
 
755
        t = transport.get_transport(here_url)
 
756
        self.assertIsInstance(t, local.LocalTransport)
 
757
        self.assertEqual(t.base, here_url)
609
758
 
610
759
    def test_local_abspath(self):
611
760
        here = osutils.abspath('.')
612
 
        t = get_transport(here)
613
 
        self.assertEquals(t.local_abspath(''), here)
614
 
 
615
 
 
616
 
class TestWin32LocalTransport(TestCase):
 
761
        t = transport.get_transport(here)
 
762
        self.assertEqual(t.local_abspath(''), here)
 
763
 
 
764
 
 
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
766
 
 
767
    def test_local_transport_mkdir(self):
 
768
        here = osutils.abspath('.')
 
769
        t = transport.get_transport(here)
 
770
        t.mkdir('test')
 
771
        self.assertTrue(os.path.exists('test'))
 
772
 
 
773
    def test_local_transport_mkdir_permission_denied(self):
 
774
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
775
        here = osutils.abspath('.')
 
776
        t = transport.get_transport(here)
 
777
        def fake_chmod(path, mode):
 
778
            e = OSError('permission denied')
 
779
            e.errno = errno.EPERM
 
780
            raise e
 
781
        self.overrideAttr(os, 'chmod', fake_chmod)
 
782
        t.mkdir('test')
 
783
        t.mkdir('test2', mode=0707)
 
784
        self.assertTrue(os.path.exists('test'))
 
785
        self.assertTrue(os.path.exists('test2'))
 
786
 
 
787
 
 
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for write streams opened on the test's transport."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        # Close the stream at cleanup so the open file handle is not leaked;
        # it has to stay open while the flushed content is read back below.
        self.addCleanup(w.close)
        w.write('foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEqual(f.read(), 'foo')
        self.assertEqual(len(calls), 1, calls)

    def test_missing_directory(self):
        """Opening a write stream in a missing directory raises NoSuchFile."""
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
813
 
 
814
 
 
815
class TestWin32LocalTransport(tests.TestCase):
617
816
 
618
817
    def test_unc_clone_to_root(self):
 
818
        self.requireFeature(features.win32_feature)
619
819
        # Win32 UNC path like \\HOST\path
620
820
        # clone to root should stop at least at \\HOST part
621
821
        # not on \\
622
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
822
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
623
823
        for i in xrange(4):
624
824
            t = t.clone('..')
625
 
        self.assertEquals(t.base, 'file://HOST/')
 
825
        self.assertEqual(t.base, 'file://HOST/')
626
826
        # make sure we reach the root
627
827
        t = t.clone('..')
628
 
        self.assertEquals(t.base, 'file://HOST/')
629
 
 
630
 
 
631
 
class TestConnectedTransport(TestCase):
 
828
        self.assertEqual(t.base, 'file://HOST/')
 
829
 
 
830
 
 
831
class TestConnectedTransport(tests.TestCase):
632
832
    """Tests for connected to remote server transports"""
633
833
 
634
834
    def test_parse_url(self):
635
 
        t = ConnectedTransport('http://simple.example.com/home/source')
636
 
        self.assertEquals(t._host, 'simple.example.com')
637
 
        self.assertEquals(t._port, None)
638
 
        self.assertEquals(t._path, '/home/source/')
639
 
        self.failUnless(t._user is None)
640
 
        self.failUnless(t._password is None)
641
 
 
642
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
835
        t = transport.ConnectedTransport(
 
836
            'http://simple.example.com/home/source')
 
837
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
 
838
        self.assertEqual(t._parsed_url.port, None)
 
839
        self.assertEqual(t._parsed_url.path, '/home/source/')
 
840
        self.assertTrue(t._parsed_url.user is None)
 
841
        self.assertTrue(t._parsed_url.password is None)
 
842
 
 
843
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
 
844
 
 
845
    def test_parse_url_with_at_in_user(self):
 
846
        # Bug 228058
 
847
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
848
        self.assertEqual(t._parsed_url.user, 'user@host.com')
643
849
 
644
850
    def test_parse_quoted_url(self):
645
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
646
 
        self.assertEquals(t._host, 'exAmple.com')
647
 
        self.assertEquals(t._port, 2222)
648
 
        self.assertEquals(t._user, 'robey')
649
 
        self.assertEquals(t._password, 'h@t')
650
 
        self.assertEquals(t._path, '/path/')
 
851
        t = transport.ConnectedTransport(
 
852
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
853
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
 
854
        self.assertEqual(t._parsed_url.port, 2222)
 
855
        self.assertEqual(t._parsed_url.user, 'robey')
 
856
        self.assertEqual(t._parsed_url.password, 'h@t')
 
857
        self.assertEqual(t._parsed_url.path, '/path/')
651
858
 
652
859
        # Base should not keep track of the password
653
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
860
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
654
861
 
655
862
    def test_parse_invalid_url(self):
656
863
        self.assertRaises(errors.InvalidURL,
657
 
                          ConnectedTransport,
 
864
                          transport.ConnectedTransport,
658
865
                          'sftp://lily.org:~janneke/public/bzr/gub')
659
866
 
660
867
    def test_relpath(self):
661
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
868
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
662
869
 
663
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
870
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
 
871
            'sub')
664
872
        self.assertRaises(errors.PathNotChild, t.relpath,
665
873
                          'http://user@host.com/abs/path/sub')
666
874
        self.assertRaises(errors.PathNotChild, t.relpath,
670
878
        self.assertRaises(errors.PathNotChild, t.relpath,
671
879
                          'sftp://user@host.com:33/abs/path/sub')
672
880
        # Make sure it works when we don't supply a username
673
 
        t = ConnectedTransport('sftp://host.com/abs/path')
674
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
881
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
882
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
675
883
 
676
884
        # Make sure it works when parts of the path will be url encoded
677
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
678
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
885
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
886
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
679
887
 
680
888
    def test_connection_sharing_propagate_credentials(self):
681
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
682
 
        self.assertEquals('user', t._user)
683
 
        self.assertEquals('host.com', t._host)
 
889
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
890
        self.assertEqual('user', t._parsed_url.user)
 
891
        self.assertEqual('host.com', t._parsed_url.host)
684
892
        self.assertIs(None, t._get_connection())
685
 
        self.assertIs(None, t._password)
 
893
        self.assertIs(None, t._parsed_url.password)
686
894
        c = t.clone('subdir')
687
895
        self.assertIs(None, c._get_connection())
688
 
        self.assertIs(None, t._password)
 
896
        self.assertIs(None, t._parsed_url.password)
689
897
 
690
898
        # Simulate the user entering a password
691
899
        password = 'secret'
705
913
        self.assertIs(new_password, c._get_credentials())
706
914
 
707
915
 
708
 
class TestReusedTransports(TestCase):
 
916
class TestReusedTransports(tests.TestCase):
709
917
    """Tests for transport reuse"""
710
918
 
711
919
    def test_reuse_same_transport(self):
712
920
        possible_transports = []
713
 
        t1 = get_transport('http://foo/',
714
 
                           possible_transports=possible_transports)
 
921
        t1 = transport.get_transport_from_url('http://foo/',
 
922
                                     possible_transports=possible_transports)
715
923
        self.assertEqual([t1], possible_transports)
716
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
924
        t2 = transport.get_transport_from_url('http://foo/',
 
925
                                     possible_transports=[t1])
717
926
        self.assertIs(t1, t2)
718
927
 
719
928
        # Also check that final '/' are handled correctly
720
 
        t3 = get_transport('http://foo/path/')
721
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
929
        t3 = transport.get_transport_from_url('http://foo/path/')
 
930
        t4 = transport.get_transport_from_url('http://foo/path',
 
931
                                     possible_transports=[t3])
722
932
        self.assertIs(t3, t4)
723
933
 
724
 
        t5 = get_transport('http://foo/path')
725
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
934
        t5 = transport.get_transport_from_url('http://foo/path')
 
935
        t6 = transport.get_transport_from_url('http://foo/path/',
 
936
                                     possible_transports=[t5])
726
937
        self.assertIs(t5, t6)
727
938
 
728
939
    def test_don_t_reuse_different_transport(self):
729
 
        t1 = get_transport('http://foo/path')
730
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
940
        t1 = transport.get_transport_from_url('http://foo/path')
 
941
        t2 = transport.get_transport_from_url('http://bar/path',
 
942
                                     possible_transports=[t1])
731
943
        self.assertIsNot(t1, t2)
732
944
 
733
945
 
734
 
class TestTransportTrace(TestCase):
 
946
class TestTransportTrace(tests.TestCase):
735
947
 
736
 
    def test_get(self):
737
 
        transport = get_transport('trace+memory://')
 
948
    def test_decorator(self):
 
949
        t = transport.get_transport_from_url('trace+memory://')
738
950
        self.assertIsInstance(
739
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
951
            t, bzrlib.transport.trace.TransportTraceDecorator)
740
952
 
741
953
    def test_clone_preserves_activity(self):
742
 
        transport = get_transport('trace+memory://')
743
 
        transport2 = transport.clone('.')
744
 
        self.assertTrue(transport is not transport2)
745
 
        self.assertTrue(transport._activity is transport2._activity)
 
954
        t = transport.get_transport_from_url('trace+memory://')
 
955
        t2 = t.clone('.')
 
956
        self.assertTrue(t is not t2)
 
957
        self.assertTrue(t._activity is t2._activity)
746
958
 
747
959
    # the following specific tests are for the operations that have made use of
748
960
    # logging in tests; we could test every single operation but doing that
749
961
    # still won't cause a test failure when the top level Transport API
750
962
    # changes; so there is little return doing that.
751
963
    def test_get(self):
752
 
        transport = get_transport('trace+memory:///')
753
 
        transport.put_bytes('foo', 'barish')
754
 
        transport.get('foo')
 
964
        t = transport.get_transport_from_url('trace+memory:///')
 
965
        t.put_bytes('foo', 'barish')
 
966
        t.get('foo')
755
967
        expected_result = []
756
968
        # put_bytes records the bytes, not the content to avoid memory
757
969
        # pressure.
758
970
        expected_result.append(('put_bytes', 'foo', 6, None))
759
971
        # get records the file name only.
760
972
        expected_result.append(('get', 'foo'))
761
 
        self.assertEqual(expected_result, transport._activity)
 
973
        self.assertEqual(expected_result, t._activity)
762
974
 
763
975
    def test_readv(self):
764
 
        transport = get_transport('trace+memory:///')
765
 
        transport.put_bytes('foo', 'barish')
766
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
767
 
            upper_limit=6))
 
976
        t = transport.get_transport_from_url('trace+memory:///')
 
977
        t.put_bytes('foo', 'barish')
 
978
        list(t.readv('foo', [(0, 1), (3, 2)],
 
979
                     adjust_for_latency=True, upper_limit=6))
768
980
        expected_result = []
769
981
        # put_bytes records the bytes, not the content to avoid memory
770
982
        # pressure.
771
983
        expected_result.append(('put_bytes', 'foo', 6, None))
772
984
        # readv records the supplied offset request
773
985
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
774
 
        self.assertEqual(expected_result, transport._activity)
 
986
        self.assertEqual(expected_result, t._activity)
 
987
 
 
988
 
 
989
class TestSSHConnections(tests.TestCaseWithTransport):
    """Check that a bzr+ssh:// URL runs a bzr smart server over SSH."""

    def test_bzr_connect_to_bzr_ssh(self):
        """get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        #
        # For each exec request the server appends the Popen object and then
        # its relay threads to this list; the shutdown code at the end of the
        # test treats item 0 as the subprocess and the rest as threads.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            # Gives the server a way back to the running test so it can record
            # the commands it was asked to execute.
            test = self

            def check_channel_exec_request(self, channel, command):
                """Run `command` in a shell and wire its pipes to `channel`.

                Records the command on the test, starts the subprocess and
                one relay thread per stream, and always returns True.
                """
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes
                # from the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    """Copy one byte at a time until `read` hits EOF."""
                    while True:
                        chunk = read(1)
                        if not chunk:
                            # An empty read means EOF: close the sink and stop.
                            close()
                            break
                        write(chunk)

                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for read, write, close in file_functions:
                    relay = threading.Thread(
                        target=ferry_bytes, args=(read, write, close))
                    relay.start()
                    started.append(relay)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
        else:
            bzr_remote_path = self.get_bzr_path()
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        remote = transport.get_transport(url)
        self.permit_url(remote.base)
        remote.mkdir('foo')

        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
            self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        remote._client._medium.disconnect()
        if not started:
            return
        # First wait for the subprocess
        started[0].wait()
        # And the rest are threads
        for relay in started[1:]:
            relay.join()
 
1090
 
 
1091
 
 
1092
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        # The input is 1000 copies of a 14-character line (14000 characters);
        # the result is expected to be exactly 1000 characters long and to
        # start with the text of the first line, the "<p>" tag having been
        # replaced by a single space.
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        self.assertEqual(len(result), 1000)
        self.assertStartsWith(result, " something!")
 
1101
 
 
1102
 
 
1103
class SomeDirectory(object):
    """Stub directory service that resolves every lookup to one fixed URL."""

    def look_up(self, name, url):
        """Return "http://bar" regardless of `name` and `url`."""
        return "http://bar"
 
1107
 
 
1108
 
 
1109
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return a (path, url) pair for the absolute form of '/foo/bar'.

        `path` is what osutils.abspath() gives for '/foo/bar' and `url` is
        the file:// URL the tests expect for that path.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        # Something that is already a URL comes back unchanged.
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        # A location with a registered directory prefix resolves to whatever
        # that directory's look_up() returns.
        directories.register("bar:", SomeDirectory, "Dummy directory")
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        # A URL containing a non-ASCII unicode character is rejected.
        self.assertRaises(errors.InvalidURL, location_to_url,
            "http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        # A non-ASCII character in a local path shows up in the URL as its
        # percent-escaped UTF-8 bytes.
        path, url = self.get_base_location()
        location = path + "\xc3\xaf".decode("utf-8")
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        # A plain absolute path becomes the matching file:// URL.
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        # "file:bar" is resolved against the URL of the current directory.
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
            location_to_url("file:bar"))

    def test_absolute_file_url(self):
        # "file:/bar" is expanded to the three-slash form.
        self.assertEqual("file:///bar", location_to_url("file:/bar"))