~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-03 10:06:19 UTC
  • mfrom: (4999.3.2 apport)
  • Revision ID: pqm@pqm.ubuntu.com-20100203100619-f76bo5y5bd5c14wk
(mbp) use apport to send bugs, not just store them

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
19
 
import errno
20
19
import os
21
20
import subprocess
22
21
import sys
26
25
    errors,
27
26
    osutils,
28
27
    tests,
29
 
    transport,
 
28
    transport as _mod_transport,
30
29
    urlutils,
31
30
    )
32
 
from bzrlib.directory_service import directories
33
31
from bzrlib.transport import (
34
 
    chroot,
35
32
    fakenfs,
36
 
    http,
37
 
    local,
38
 
    location_to_url,
39
33
    memory,
40
 
    pathfilter,
41
34
    readonly,
42
35
    )
43
 
import bzrlib.transport.trace
44
 
from bzrlib.tests import (
45
 
    features,
46
 
    test_server,
47
 
    )
 
36
from bzrlib.errors import (DependencyNotPresent,
 
37
                           FileExists,
 
38
                           InvalidURLJoin,
 
39
                           NoSuchFile,
 
40
                           PathNotChild,
 
41
                           ReadError,
 
42
                           UnsupportedProtocol,
 
43
                           )
 
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
45
from bzrlib.transport import (_clear_protocol_handlers,
 
46
                              _CoalescedOffset,
 
47
                              ConnectedTransport,
 
48
                              _get_protocol_handlers,
 
49
                              _set_protocol_handlers,
 
50
                              _get_transport_modules,
 
51
                              get_transport,
 
52
                              LateReadError,
 
53
                              register_lazy_transport,
 
54
                              register_transport_proto,
 
55
                              Transport,
 
56
                              )
 
57
from bzrlib.transport.chroot import ChrootServer
 
58
from bzrlib.transport.local import (LocalTransport,
 
59
                                    EmulatedWin32LocalTransport)
 
60
from bzrlib.transport.pathfilter import PathFilteringServer
48
61
 
49
62
 
50
63
# TODO: Should possibly split transport-specific tests into their own files.
51
64
 
52
65
 
53
 
class TestTransport(tests.TestCase):
 
66
class TestTransport(TestCase):
54
67
    """Test the non transport-concrete class functionality."""
55
68
 
56
69
    def test__get_set_protocol_handlers(self):
57
 
        handlers = transport._get_protocol_handlers()
58
 
        self.assertNotEqual([], handlers.keys())
59
 
        transport._clear_protocol_handlers()
60
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
61
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
70
        handlers = _get_protocol_handlers()
 
71
        self.assertNotEqual([], handlers.keys( ))
 
72
        try:
 
73
            _clear_protocol_handlers()
 
74
            self.assertEqual([], _get_protocol_handlers().keys())
 
75
        finally:
 
76
            _set_protocol_handlers(handlers)
62
77
 
63
78
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
65
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
79
        handlers = _get_protocol_handlers()
66
80
        # don't pollute the current handlers
67
 
        transport._clear_protocol_handlers()
68
 
 
 
81
        _clear_protocol_handlers()
69
82
        class SampleHandler(object):
70
83
            """I exist, isnt that enough?"""
71
 
        transport._clear_protocol_handlers()
72
 
        transport.register_transport_proto('foo')
73
 
        transport.register_lazy_transport('foo',
74
 
                                            'bzrlib.tests.test_transport',
75
 
                                            'TestTransport.SampleHandler')
76
 
        transport.register_transport_proto('bar')
77
 
        transport.register_lazy_transport('bar',
78
 
                                            'bzrlib.tests.test_transport',
79
 
                                            'TestTransport.SampleHandler')
80
 
        self.assertEqual([SampleHandler.__module__,
81
 
                            'bzrlib.transport.chroot',
82
 
                            'bzrlib.transport.pathfilter'],
83
 
                            transport._get_transport_modules())
 
84
        try:
 
85
            _clear_protocol_handlers()
 
86
            register_transport_proto('foo')
 
87
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
88
                                    'TestTransport.SampleHandler')
 
89
            register_transport_proto('bar')
 
90
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
91
                                    'TestTransport.SampleHandler')
 
92
            self.assertEqual([SampleHandler.__module__,
 
93
                              'bzrlib.transport.chroot',
 
94
                              'bzrlib.transport.pathfilter'],
 
95
                             _get_transport_modules())
 
96
        finally:
 
97
            _set_protocol_handlers(handlers)
84
98
 
85
99
    def test_transport_dependency(self):
86
100
        """Transport with missing dependency causes no error"""
87
 
        saved_handlers = transport._get_protocol_handlers()
88
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
101
        saved_handlers = _get_protocol_handlers()
89
102
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
91
 
        transport.register_transport_proto('foo')
92
 
        transport.register_lazy_transport(
93
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
103
        _clear_protocol_handlers()
94
104
        try:
95
 
            transport.get_transport_from_url('foo://fooserver/foo')
96
 
        except errors.UnsupportedProtocol, e:
97
 
            e_str = str(e)
98
 
            self.assertEquals('Unsupported protocol'
99
 
                                ' for url "foo://fooserver/foo":'
100
 
                                ' Unable to import library "some_lib":'
101
 
                                ' testing missing dependency', str(e))
102
 
        else:
103
 
            self.fail('Did not raise UnsupportedProtocol')
 
105
            register_transport_proto('foo')
 
106
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
107
                    'BadTransportHandler')
 
108
            try:
 
109
                get_transport('foo://fooserver/foo')
 
110
            except UnsupportedProtocol, e:
 
111
                e_str = str(e)
 
112
                self.assertEquals('Unsupported protocol'
 
113
                                  ' for url "foo://fooserver/foo":'
 
114
                                  ' Unable to import library "some_lib":'
 
115
                                  ' testing missing dependency', str(e))
 
116
            else:
 
117
                self.fail('Did not raise UnsupportedProtocol')
 
118
        finally:
 
119
            # restore original values
 
120
            _set_protocol_handlers(saved_handlers)
104
121
 
105
122
    def test_transport_fallback(self):
106
123
        """Transport with missing dependency causes no error"""
107
 
        saved_handlers = transport._get_protocol_handlers()
108
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
109
 
        transport._clear_protocol_handlers()
110
 
        transport.register_transport_proto('foo')
111
 
        transport.register_lazy_transport(
112
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
113
 
        transport.register_lazy_transport(
114
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
115
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
116
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
124
        saved_handlers = _get_protocol_handlers()
 
125
        try:
 
126
            _clear_protocol_handlers()
 
127
            register_transport_proto('foo')
 
128
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
129
                    'BackupTransportHandler')
 
130
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
131
                    'BadTransportHandler')
 
132
            t = get_transport('foo://fooserver/foo')
 
133
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
134
        finally:
 
135
            _set_protocol_handlers(saved_handlers)
117
136
 
118
137
    def test_ssh_hints(self):
119
138
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
120
139
        try:
121
 
            transport.get_transport_from_url('ssh://fooserver/foo')
122
 
        except errors.UnsupportedProtocol, e:
 
140
            get_transport('ssh://fooserver/foo')
 
141
        except UnsupportedProtocol, e:
123
142
            e_str = str(e)
124
143
            self.assertEquals('Unsupported protocol'
125
144
                              ' for url "ssh://fooserver/foo":'
126
 
                              ' bzr supports bzr+ssh to operate over ssh,'
127
 
                              ' use "bzr+ssh://fooserver/foo".',
 
145
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
128
146
                              str(e))
129
147
        else:
130
148
            self.fail('Did not raise UnsupportedProtocol')
131
149
 
132
150
    def test_LateReadError(self):
133
151
        """The LateReadError helper should raise on read()."""
134
 
        a_file = transport.LateReadError('a path')
 
152
        a_file = LateReadError('a path')
135
153
        try:
136
154
            a_file.read()
137
 
        except errors.ReadError, error:
 
155
        except ReadError, error:
138
156
            self.assertEqual('a path', error.path)
139
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
157
        self.assertRaises(ReadError, a_file.read, 40)
140
158
        a_file.close()
141
159
 
 
160
    def test__combine_paths(self):
 
161
        t = Transport('/')
 
162
        self.assertEqual('/home/sarah/project/foo',
 
163
                         t._combine_paths('/home/sarah', 'project/foo'))
 
164
        self.assertEqual('/etc',
 
165
                         t._combine_paths('/home/sarah', '../../etc'))
 
166
        self.assertEqual('/etc',
 
167
                         t._combine_paths('/home/sarah', '../../../etc'))
 
168
        self.assertEqual('/etc',
 
169
                         t._combine_paths('/home/sarah', '/etc'))
 
170
 
142
171
    def test_local_abspath_non_local_transport(self):
143
172
        # the base implementation should throw
144
173
        t = memory.MemoryTransport()
146
175
        self.assertEqual('memory:///t is not a local path.', str(e))
147
176
 
148
177
 
149
 
class TestCoalesceOffsets(tests.TestCase):
 
178
class TestCoalesceOffsets(TestCase):
150
179
 
151
180
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
152
 
        coalesce = transport.Transport._coalesce_offsets
153
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
181
        coalesce = Transport._coalesce_offsets
 
182
        exp = [_CoalescedOffset(*x) for x in expected]
154
183
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
155
184
                            max_size=max_size))
156
185
        self.assertEqual(exp, out)
201
230
 
202
231
    def test_coalesce_fudge(self):
203
232
        self.check([(10, 30, [(0, 10), (20, 10)]),
204
 
                    (100, 10, [(0, 10)]),
 
233
                    (100, 10, [(0, 10),]),
205
234
                   ], [(10, 10), (30, 10), (100, 10)],
206
 
                   fudge=10)
207
 
 
 
235
                   fudge=10
 
236
                  )
208
237
    def test_coalesce_max_size(self):
209
238
        self.check([(10, 20, [(0, 10), (10, 10)]),
210
239
                    (30, 50, [(0, 50)]),
211
240
                    # If one range is above max_size, it gets its own coalesced
212
241
                    # offset
213
 
                    (100, 80, [(0, 80)]),],
 
242
                    (100, 80, [(0, 80),]),],
214
243
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
215
 
                   max_size=50)
 
244
                   max_size=50
 
245
                  )
216
246
 
217
247
    def test_coalesce_no_max_size(self):
218
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
248
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
219
249
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
220
250
                  )
221
251
 
222
252
    def test_coalesce_default_limit(self):
223
253
        # By default we use a 100MB max size.
224
 
        ten_mb = 10 * 1024 * 1024
225
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
254
        ten_mb = 10*1024*1024
 
255
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
226
256
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
227
257
                   [(i*ten_mb, ten_mb) for i in range(11)])
228
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
258
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
259
                   [(i*ten_mb, ten_mb) for i in range(11)],
230
260
                   max_size=1*1024*1024*1024)
231
261
 
232
262
 
233
 
class TestMemoryServer(tests.TestCase):
 
263
class TestMemoryServer(TestCase):
234
264
 
235
265
    def test_create_server(self):
236
266
        server = memory.MemoryServer()
237
267
        server.start_server()
238
268
        url = server.get_url()
239
 
        self.assertTrue(url in transport.transport_list_registry)
240
 
        t = transport.get_transport_from_url(url)
 
269
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
270
        t = _mod_transport.get_transport(url)
241
271
        del t
242
272
        server.stop_server()
243
 
        self.assertFalse(url in transport.transport_list_registry)
 
273
        self.assertFalse(url in _mod_transport.transport_list_registry)
244
274
        self.assertRaises(errors.UnsupportedProtocol,
245
 
                          transport.get_transport, url)
246
 
 
247
 
 
248
 
class TestMemoryTransport(tests.TestCase):
 
275
                          _mod_transport.get_transport, url)
 
276
 
 
277
 
 
278
class TestMemoryTransport(TestCase):
249
279
 
250
280
    def test_get_transport(self):
251
281
        memory.MemoryTransport()
252
282
 
253
283
    def test_clone(self):
254
 
        t = memory.MemoryTransport()
255
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
256
 
        self.assertEqual("memory:///", t.clone("/").base)
 
284
        transport = memory.MemoryTransport()
 
285
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
 
286
        self.assertEqual("memory:///", transport.clone("/").base)
257
287
 
258
288
    def test_abspath(self):
259
 
        t = memory.MemoryTransport()
260
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
289
        transport = memory.MemoryTransport()
 
290
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
261
291
 
262
292
    def test_abspath_of_root(self):
263
 
        t = memory.MemoryTransport()
264
 
        self.assertEqual("memory:///", t.base)
265
 
        self.assertEqual("memory:///", t.abspath('/'))
 
293
        transport = memory.MemoryTransport()
 
294
        self.assertEqual("memory:///", transport.base)
 
295
        self.assertEqual("memory:///", transport.abspath('/'))
266
296
 
267
297
    def test_abspath_of_relpath_starting_at_root(self):
268
 
        t = memory.MemoryTransport()
269
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
298
        transport = memory.MemoryTransport()
 
299
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
270
300
 
271
301
    def test_append_and_get(self):
272
 
        t = memory.MemoryTransport()
273
 
        t.append_bytes('path', 'content')
274
 
        self.assertEqual(t.get('path').read(), 'content')
275
 
        t.append_file('path', StringIO('content'))
276
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
302
        transport = memory.MemoryTransport()
 
303
        transport.append_bytes('path', 'content')
 
304
        self.assertEqual(transport.get('path').read(), 'content')
 
305
        transport.append_file('path', StringIO('content'))
 
306
        self.assertEqual(transport.get('path').read(), 'contentcontent')
277
307
 
278
308
    def test_put_and_get(self):
279
 
        t = memory.MemoryTransport()
280
 
        t.put_file('path', StringIO('content'))
281
 
        self.assertEqual(t.get('path').read(), 'content')
282
 
        t.put_bytes('path', 'content')
283
 
        self.assertEqual(t.get('path').read(), 'content')
 
309
        transport = memory.MemoryTransport()
 
310
        transport.put_file('path', StringIO('content'))
 
311
        self.assertEqual(transport.get('path').read(), 'content')
 
312
        transport.put_bytes('path', 'content')
 
313
        self.assertEqual(transport.get('path').read(), 'content')
284
314
 
285
315
    def test_append_without_dir_fails(self):
286
 
        t = memory.MemoryTransport()
287
 
        self.assertRaises(errors.NoSuchFile,
288
 
                          t.append_bytes, 'dir/path', 'content')
 
316
        transport = memory.MemoryTransport()
 
317
        self.assertRaises(NoSuchFile,
 
318
                          transport.append_bytes, 'dir/path', 'content')
289
319
 
290
320
    def test_put_without_dir_fails(self):
291
 
        t = memory.MemoryTransport()
292
 
        self.assertRaises(errors.NoSuchFile,
293
 
                          t.put_file, 'dir/path', StringIO('content'))
 
321
        transport = memory.MemoryTransport()
 
322
        self.assertRaises(NoSuchFile,
 
323
                          transport.put_file, 'dir/path', StringIO('content'))
294
324
 
295
325
    def test_get_missing(self):
296
326
        transport = memory.MemoryTransport()
297
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
327
        self.assertRaises(NoSuchFile, transport.get, 'foo')
298
328
 
299
329
    def test_has_missing(self):
300
 
        t = memory.MemoryTransport()
301
 
        self.assertEquals(False, t.has('foo'))
 
330
        transport = memory.MemoryTransport()
 
331
        self.assertEquals(False, transport.has('foo'))
302
332
 
303
333
    def test_has_present(self):
304
 
        t = memory.MemoryTransport()
305
 
        t.append_bytes('foo', 'content')
306
 
        self.assertEquals(True, t.has('foo'))
 
334
        transport = memory.MemoryTransport()
 
335
        transport.append_bytes('foo', 'content')
 
336
        self.assertEquals(True, transport.has('foo'))
307
337
 
308
338
    def test_list_dir(self):
309
 
        t = memory.MemoryTransport()
310
 
        t.put_bytes('foo', 'content')
311
 
        t.mkdir('dir')
312
 
        t.put_bytes('dir/subfoo', 'content')
313
 
        t.put_bytes('dirlike', 'content')
 
339
        transport = memory.MemoryTransport()
 
340
        transport.put_bytes('foo', 'content')
 
341
        transport.mkdir('dir')
 
342
        transport.put_bytes('dir/subfoo', 'content')
 
343
        transport.put_bytes('dirlike', 'content')
314
344
 
315
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
316
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
345
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
346
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
317
347
 
318
348
    def test_mkdir(self):
319
 
        t = memory.MemoryTransport()
320
 
        t.mkdir('dir')
321
 
        t.append_bytes('dir/path', 'content')
322
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
349
        transport = memory.MemoryTransport()
 
350
        transport.mkdir('dir')
 
351
        transport.append_bytes('dir/path', 'content')
 
352
        self.assertEqual(transport.get('dir/path').read(), 'content')
323
353
 
324
354
    def test_mkdir_missing_parent(self):
325
 
        t = memory.MemoryTransport()
326
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
355
        transport = memory.MemoryTransport()
 
356
        self.assertRaises(NoSuchFile,
 
357
                          transport.mkdir, 'dir/dir')
327
358
 
328
359
    def test_mkdir_twice(self):
329
 
        t = memory.MemoryTransport()
330
 
        t.mkdir('dir')
331
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
360
        transport = memory.MemoryTransport()
 
361
        transport.mkdir('dir')
 
362
        self.assertRaises(FileExists, transport.mkdir, 'dir')
332
363
 
333
364
    def test_parameters(self):
334
 
        t = memory.MemoryTransport()
335
 
        self.assertEqual(True, t.listable())
336
 
        self.assertEqual(False, t.is_readonly())
 
365
        transport = memory.MemoryTransport()
 
366
        self.assertEqual(True, transport.listable())
 
367
        self.assertEqual(False, transport.is_readonly())
337
368
 
338
369
    def test_iter_files_recursive(self):
339
 
        t = memory.MemoryTransport()
340
 
        t.mkdir('dir')
341
 
        t.put_bytes('dir/foo', 'content')
342
 
        t.put_bytes('dir/bar', 'content')
343
 
        t.put_bytes('bar', 'content')
344
 
        paths = set(t.iter_files_recursive())
 
370
        transport = memory.MemoryTransport()
 
371
        transport.mkdir('dir')
 
372
        transport.put_bytes('dir/foo', 'content')
 
373
        transport.put_bytes('dir/bar', 'content')
 
374
        transport.put_bytes('bar', 'content')
 
375
        paths = set(transport.iter_files_recursive())
345
376
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
346
377
 
347
378
    def test_stat(self):
348
 
        t = memory.MemoryTransport()
349
 
        t.put_bytes('foo', 'content')
350
 
        t.put_bytes('bar', 'phowar')
351
 
        self.assertEqual(7, t.stat('foo').st_size)
352
 
        self.assertEqual(6, t.stat('bar').st_size)
353
 
 
354
 
 
355
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
379
        transport = memory.MemoryTransport()
 
380
        transport.put_bytes('foo', 'content')
 
381
        transport.put_bytes('bar', 'phowar')
 
382
        self.assertEqual(7, transport.stat('foo').st_size)
 
383
        self.assertEqual(6, transport.stat('bar').st_size)
 
384
 
 
385
 
 
386
class ChrootDecoratorTransportTest(TestCase):
356
387
    """Chroot decoration specific tests."""
357
388
 
358
389
    def test_abspath(self):
359
390
        # The abspath is always relative to the chroot_url.
360
 
        server = chroot.ChrootServer(
361
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
391
        server = ChrootServer(get_transport('memory:///foo/bar/'))
362
392
        self.start_server(server)
363
 
        t = transport.get_transport_from_url(server.get_url())
364
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
393
        transport = get_transport(server.get_url())
 
394
        self.assertEqual(server.get_url(), transport.abspath('/'))
365
395
 
366
 
        subdir_t = t.clone('subdir')
367
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
396
        subdir_transport = transport.clone('subdir')
 
397
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
368
398
 
369
399
    def test_clone(self):
370
 
        server = chroot.ChrootServer(
371
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
400
        server = ChrootServer(get_transport('memory:///foo/bar/'))
372
401
        self.start_server(server)
373
 
        t = transport.get_transport_from_url(server.get_url())
 
402
        transport = get_transport(server.get_url())
374
403
        # relpath from root and root path are the same
375
 
        relpath_cloned = t.clone('foo')
376
 
        abspath_cloned = t.clone('/foo')
 
404
        relpath_cloned = transport.clone('foo')
 
405
        abspath_cloned = transport.clone('/foo')
377
406
        self.assertEqual(server, relpath_cloned.server)
378
407
        self.assertEqual(server, abspath_cloned.server)
379
408
 
385
414
        This is so that it is not possible to escape a chroot by doing::
386
415
            url = chroot_transport.base
387
416
            parent_url = urlutils.join(url, '..')
388
 
            new_t = transport.get_transport_from_url(parent_url)
 
417
            new_transport = get_transport(parent_url)
389
418
        """
390
 
        server = chroot.ChrootServer(
391
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
419
        server = ChrootServer(get_transport('memory:///path/subpath'))
392
420
        self.start_server(server)
393
 
        t = transport.get_transport_from_url(server.get_url())
394
 
        new_t = transport.get_transport_from_url(t.base)
395
 
        self.assertEqual(t.server, new_t.server)
396
 
        self.assertEqual(t.base, new_t.base)
 
421
        transport = get_transport(server.get_url())
 
422
        new_transport = get_transport(transport.base)
 
423
        self.assertEqual(transport.server, new_transport.server)
 
424
        self.assertEqual(transport.base, new_transport.base)
397
425
 
398
426
    def test_urljoin_preserves_chroot(self):
399
427
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
402
430
        This is so that it is not possible to escape a chroot by doing::
403
431
            url = chroot_transport.base
404
432
            parent_url = urlutils.join(url, '..')
405
 
            new_t = transport.get_transport_from_url(parent_url)
 
433
            new_transport = get_transport(parent_url)
406
434
        """
407
 
        server = chroot.ChrootServer(
408
 
            transport.get_transport_from_url('memory:///path/'))
 
435
        server = ChrootServer(get_transport('memory:///path/'))
409
436
        self.start_server(server)
410
 
        t = transport.get_transport_from_url(server.get_url())
 
437
        transport = get_transport(server.get_url())
411
438
        self.assertRaises(
412
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
413
 
 
414
 
 
415
 
class TestChrootServer(tests.TestCase):
 
439
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
440
 
 
441
 
 
442
class ChrootServerTest(TestCase):
416
443
 
417
444
    def test_construct(self):
418
445
        backing_transport = memory.MemoryTransport()
419
 
        server = chroot.ChrootServer(backing_transport)
 
446
        server = ChrootServer(backing_transport)
420
447
        self.assertEqual(backing_transport, server.backing_transport)
421
448
 
422
449
    def test_setUp(self):
423
450
        backing_transport = memory.MemoryTransport()
424
 
        server = chroot.ChrootServer(backing_transport)
 
451
        server = ChrootServer(backing_transport)
425
452
        server.start_server()
426
 
        self.addCleanup(server.stop_server)
427
 
        self.assertTrue(server.scheme
428
 
                        in transport._get_protocol_handlers().keys())
 
453
        try:
 
454
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
455
        finally:
 
456
            server.stop_server()
429
457
 
430
458
    def test_stop_server(self):
431
459
        backing_transport = memory.MemoryTransport()
432
 
        server = chroot.ChrootServer(backing_transport)
 
460
        server = ChrootServer(backing_transport)
433
461
        server.start_server()
434
462
        server.stop_server()
435
 
        self.assertFalse(server.scheme
436
 
                         in transport._get_protocol_handlers().keys())
 
463
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
437
464
 
438
465
    def test_get_url(self):
439
466
        backing_transport = memory.MemoryTransport()
440
 
        server = chroot.ChrootServer(backing_transport)
 
467
        server = ChrootServer(backing_transport)
441
468
        server.start_server()
442
 
        self.addCleanup(server.stop_server)
443
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
444
 
 
445
 
 
446
 
class TestHooks(tests.TestCase):
447
 
    """Basic tests for transport hooks"""
448
 
 
449
 
    def _get_connected_transport(self):
450
 
        return transport.ConnectedTransport("bogus:nowhere")
451
 
 
452
 
    def test_transporthooks_initialisation(self):
453
 
        """Check all expected transport hook points are set up"""
454
 
        hookpoint = transport.TransportHooks()
455
 
        self.assertTrue("post_connect" in hookpoint,
456
 
            "post_connect not in %s" % (hookpoint,))
457
 
 
458
 
    def test_post_connect(self):
459
 
        """Ensure the post_connect hook is called when _set_transport is"""
460
 
        calls = []
461
 
        transport.Transport.hooks.install_named_hook("post_connect",
462
 
            calls.append, None)
463
 
        t = self._get_connected_transport()
464
 
        self.assertLength(0, calls)
465
 
        t._set_connection("connection", "auth")
466
 
        self.assertEqual(calls, [t])
467
 
 
468
 
 
469
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
469
        try:
 
470
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
471
        finally:
 
472
            server.stop_server()
 
473
 
 
474
 
 
475
class PathFilteringDecoratorTransportTest(TestCase):
470
476
    """Pathfilter decoration specific tests."""
471
477
 
472
478
    def test_abspath(self):
473
479
        # The abspath is always relative to the base of the backing transport.
474
 
        server = pathfilter.PathFilteringServer(
475
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
480
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
476
481
            lambda x: x)
477
482
        server.start_server()
478
 
        t = transport.get_transport_from_url(server.get_url())
479
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
483
        transport = get_transport(server.get_url())
 
484
        self.assertEqual(server.get_url(), transport.abspath('/'))
480
485
 
481
 
        subdir_t = t.clone('subdir')
482
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
486
        subdir_transport = transport.clone('subdir')
 
487
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
483
488
        server.stop_server()
484
489
 
485
490
    def make_pf_transport(self, filter_func=None):
486
491
        """Make a PathFilteringTransport backed by a MemoryTransport.
487
 
 
 
492
        
488
493
        :param filter_func: by default this will be a no-op function.  Use this
489
494
            parameter to override it."""
490
495
        if filter_func is None:
491
496
            filter_func = lambda x: x
492
 
        server = pathfilter.PathFilteringServer(
493
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
497
        server = PathFilteringServer(
 
498
            get_transport('memory:///foo/bar/'), filter_func)
494
499
        server.start_server()
495
500
        self.addCleanup(server.stop_server)
496
 
        return transport.get_transport_from_url(server.get_url())
 
501
        return get_transport(server.get_url())
497
502
 
498
503
    def test__filter(self):
499
504
        # _filter (with an identity func as filter_func) always returns
500
505
        # paths relative to the base of the backing transport.
501
 
        t = self.make_pf_transport()
502
 
        self.assertEqual('foo', t._filter('foo'))
503
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
504
 
        self.assertEqual('', t._filter('..'))
505
 
        self.assertEqual('', t._filter('/'))
 
506
        transport = self.make_pf_transport()
 
507
        self.assertEqual('foo', transport._filter('foo'))
 
508
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
509
        self.assertEqual('', transport._filter('..'))
 
510
        self.assertEqual('', transport._filter('/'))
506
511
        # The base of the pathfiltering transport is taken into account too.
507
 
        t = t.clone('subdir1/subdir2')
508
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
509
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
510
 
        self.assertEqual('subdir1', t._filter('..'))
511
 
        self.assertEqual('', t._filter('/'))
 
512
        transport = transport.clone('subdir1/subdir2')
 
513
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
514
        self.assertEqual(
 
515
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
516
        self.assertEqual('subdir1', transport._filter('..'))
 
517
        self.assertEqual('', transport._filter('/'))
512
518
 
513
519
    def test_filter_invocation(self):
514
520
        filter_log = []
515
 
 
516
521
        def filter(path):
517
522
            filter_log.append(path)
518
523
            return path
519
 
        t = self.make_pf_transport(filter)
520
 
        t.has('abc')
 
524
        transport = self.make_pf_transport(filter)
 
525
        transport.has('abc')
521
526
        self.assertEqual(['abc'], filter_log)
522
527
        del filter_log[:]
523
 
        t.clone('abc').has('xyz')
 
528
        transport.clone('abc').has('xyz')
524
529
        self.assertEqual(['abc/xyz'], filter_log)
525
530
        del filter_log[:]
526
 
        t.has('/abc')
 
531
        transport.has('/abc')
527
532
        self.assertEqual(['abc'], filter_log)
528
533
 
529
534
    def test_clone(self):
530
 
        t = self.make_pf_transport()
 
535
        transport = self.make_pf_transport()
531
536
        # relpath from root and root path are the same
532
 
        relpath_cloned = t.clone('foo')
533
 
        abspath_cloned = t.clone('/foo')
534
 
        self.assertEqual(t.server, relpath_cloned.server)
535
 
        self.assertEqual(t.server, abspath_cloned.server)
 
537
        relpath_cloned = transport.clone('foo')
 
538
        abspath_cloned = transport.clone('/foo')
 
539
        self.assertEqual(transport.server, relpath_cloned.server)
 
540
        self.assertEqual(transport.server, abspath_cloned.server)
536
541
 
537
542
    def test_url_preserves_pathfiltering(self):
538
543
        """Calling get_transport on a pathfiltered transport's base should
543
548
        otherwise) the filtering by doing::
544
549
            url = filtered_transport.base
545
550
            parent_url = urlutils.join(url, '..')
546
 
            new_t = transport.get_transport_from_url(parent_url)
 
551
            new_transport = get_transport(parent_url)
547
552
        """
548
 
        t = self.make_pf_transport()
549
 
        new_t = transport.get_transport_from_url(t.base)
550
 
        self.assertEqual(t.server, new_t.server)
551
 
        self.assertEqual(t.base, new_t.base)
552
 
 
553
 
 
554
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
553
        transport = self.make_pf_transport()
 
554
        new_transport = get_transport(transport.base)
 
555
        self.assertEqual(transport.server, new_transport.server)
 
556
        self.assertEqual(transport.base, new_transport.base)
 
557
 
 
558
 
 
559
class ReadonlyDecoratorTransportTest(TestCase):
555
560
    """Readonly decoration specific tests."""
556
561
 
557
562
    def test_local_parameters(self):
558
563
        # connect to . in readonly mode
559
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
560
 
        self.assertEqual(True, t.listable())
561
 
        self.assertEqual(True, t.is_readonly())
 
564
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
565
        self.assertEqual(True, transport.listable())
 
566
        self.assertEqual(True, transport.is_readonly())
562
567
 
563
568
    def test_http_parameters(self):
564
569
        from bzrlib.tests.http_server import HttpServer
565
570
        # connect to '.' via http which is not listable
566
571
        server = HttpServer()
567
572
        self.start_server(server)
568
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
569
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
570
 
        self.assertEqual(False, t.listable())
571
 
        self.assertEqual(True, t.is_readonly())
572
 
 
573
 
 
574
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
 
578
 
 
579
 
 
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
575
581
    """NFS decorator specific tests."""
576
582
 
577
583
    def get_nfs_transport(self, url):
581
587
    def test_local_parameters(self):
582
588
        # the listable and is_readonly parameters
583
589
        # are not changed by the fakenfs decorator
584
 
        t = self.get_nfs_transport('.')
585
 
        self.assertEqual(True, t.listable())
586
 
        self.assertEqual(False, t.is_readonly())
 
590
        transport = self.get_nfs_transport('.')
 
591
        self.assertEqual(True, transport.listable())
 
592
        self.assertEqual(False, transport.is_readonly())
587
593
 
588
594
    def test_http_parameters(self):
589
595
        # the listable and is_readonly parameters
592
598
        # connect to '.' via http which is not listable
593
599
        server = HttpServer()
594
600
        self.start_server(server)
595
 
        t = self.get_nfs_transport(server.get_url())
596
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
597
 
        self.assertEqual(False, t.listable())
598
 
        self.assertEqual(True, t.is_readonly())
 
601
        transport = self.get_nfs_transport(server.get_url())
 
602
        self.assertIsInstance(
 
603
            transport, fakenfs.FakeNFSTransportDecorator)
 
604
        self.assertEqual(False, transport.listable())
 
605
        self.assertEqual(True, transport.is_readonly())
599
606
 
600
607
    def test_fakenfs_server_default(self):
601
608
        # a FakeNFSServer() should bring up a local relpath server for itself
602
 
        server = test_server.FakeNFSServer()
 
609
        server = fakenfs.FakeNFSServer()
603
610
        self.start_server(server)
604
611
        # the url should be decorated appropriately
605
612
        self.assertStartsWith(server.get_url(), 'fakenfs+')
606
613
        # and we should be able to get a transport for it
607
 
        t = transport.get_transport_from_url(server.get_url())
 
614
        transport = get_transport(server.get_url())
608
615
        # which must be a FakeNFSTransportDecorator instance.
609
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
616
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
610
617
 
611
618
    def test_fakenfs_rename_semantics(self):
612
619
        # a FakeNFS transport must mangle the way rename errors occur to
613
620
        # look like NFS problems.
614
 
        t = self.get_nfs_transport('.')
 
621
        transport = self.get_nfs_transport('.')
615
622
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
616
 
                        transport=t)
617
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
618
 
 
619
 
 
620
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
623
                        transport=transport)
 
624
        self.assertRaises(errors.ResourceBusy,
 
625
                          transport.rename, 'from', 'to')
 
626
 
 
627
 
 
628
class FakeVFATDecoratorTests(TestCaseInTempDir):
621
629
    """Tests for simulation of VFAT restrictions"""
622
630
 
623
631
    def get_vfat_transport(self, url):
627
635
 
628
636
    def test_transport_creation(self):
629
637
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
630
 
        t = self.get_vfat_transport('.')
631
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
638
        transport = self.get_vfat_transport('.')
 
639
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
632
640
 
633
641
    def test_transport_mkdir(self):
634
 
        t = self.get_vfat_transport('.')
635
 
        t.mkdir('HELLO')
636
 
        self.assertTrue(t.has('hello'))
637
 
        self.assertTrue(t.has('Hello'))
 
642
        transport = self.get_vfat_transport('.')
 
643
        transport.mkdir('HELLO')
 
644
        self.assertTrue(transport.has('hello'))
 
645
        self.assertTrue(transport.has('Hello'))
638
646
 
639
647
    def test_forbidden_chars(self):
640
 
        t = self.get_vfat_transport('.')
641
 
        self.assertRaises(ValueError, t.has, "<NU>")
642
 
 
643
 
 
644
 
class BadTransportHandler(transport.Transport):
 
648
        transport = self.get_vfat_transport('.')
 
649
        self.assertRaises(ValueError, transport.has, "<NU>")
 
650
 
 
651
 
 
652
class BadTransportHandler(Transport):
645
653
    def __init__(self, base_url):
646
 
        raise errors.DependencyNotPresent('some_lib',
647
 
                                          'testing missing dependency')
648
 
 
649
 
 
650
 
class BackupTransportHandler(transport.Transport):
 
654
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
655
 
 
656
 
 
657
class BackupTransportHandler(Transport):
651
658
    """Test transport that works as a backup for the BadTransportHandler"""
652
659
    pass
653
660
 
654
661
 
655
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
662
class TestTransportImplementation(TestCaseInTempDir):
656
663
    """Implementation verification for transports.
657
664
 
658
665
    To verify a transport we need a server factory, which is a callable
687
694
        base_url = self._server.get_url()
688
695
        url = self._adjust_url(base_url, relpath)
689
696
        # try getting the transport via the regular interface:
690
 
        t = transport.get_transport_from_url(url)
 
697
        t = get_transport(url)
691
698
        # vila--20070607 if the following are commented out the test suite
692
699
        # still pass. Is this really still needed or was it a forgotten
693
700
        # temporary fix ?
698
705
        return t
699
706
 
700
707
 
701
 
class TestTransportFromPath(tests.TestCaseInTempDir):
702
 
 
703
 
    def test_with_path(self):
704
 
        t = transport.get_transport_from_path(self.test_dir)
705
 
        self.assertIsInstance(t, local.LocalTransport)
706
 
        self.assertEquals(t.base.rstrip("/"),
707
 
            urlutils.local_path_to_url(self.test_dir))
708
 
 
709
 
    def test_with_url(self):
710
 
        t = transport.get_transport_from_path("file:")
711
 
        self.assertIsInstance(t, local.LocalTransport)
712
 
        self.assertEquals(t.base.rstrip("/"),
713
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
714
 
 
715
 
 
716
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
717
 
 
718
 
    def test_with_path(self):
719
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
720
 
            self.test_dir)
721
 
 
722
 
    def test_with_url(self):
723
 
        url = urlutils.local_path_to_url(self.test_dir)
724
 
        t = transport.get_transport_from_url(url)
725
 
        self.assertIsInstance(t, local.LocalTransport)
726
 
        self.assertEquals(t.base.rstrip("/"), url)
727
 
 
728
 
    def test_with_url_and_segment_parameters(self):
729
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
730
 
        t = transport.get_transport_from_url(url)
731
 
        self.assertIsInstance(t, local.LocalTransport)
732
 
        self.assertEquals(t.base.rstrip("/"), url)
733
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
734
 
            f.write("data")
735
 
        self.assertTrue(t.has("afile"))
736
 
 
737
 
 
738
 
class TestLocalTransports(tests.TestCase):
 
708
class TestLocalTransports(TestCase):
739
709
 
740
710
    def test_get_transport_from_abspath(self):
741
711
        here = osutils.abspath('.')
742
 
        t = transport.get_transport(here)
743
 
        self.assertIsInstance(t, local.LocalTransport)
 
712
        t = get_transport(here)
 
713
        self.assertIsInstance(t, LocalTransport)
744
714
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
745
715
 
746
716
    def test_get_transport_from_relpath(self):
747
717
        here = osutils.abspath('.')
748
 
        t = transport.get_transport('.')
749
 
        self.assertIsInstance(t, local.LocalTransport)
 
718
        t = get_transport('.')
 
719
        self.assertIsInstance(t, LocalTransport)
750
720
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
751
721
 
752
722
    def test_get_transport_from_local_url(self):
753
723
        here = osutils.abspath('.')
754
724
        here_url = urlutils.local_path_to_url(here) + '/'
755
 
        t = transport.get_transport(here_url)
756
 
        self.assertIsInstance(t, local.LocalTransport)
 
725
        t = get_transport(here_url)
 
726
        self.assertIsInstance(t, LocalTransport)
757
727
        self.assertEquals(t.base, here_url)
758
728
 
759
729
    def test_local_abspath(self):
760
730
        here = osutils.abspath('.')
761
 
        t = transport.get_transport(here)
 
731
        t = get_transport(here)
762
732
        self.assertEquals(t.local_abspath(''), here)
763
733
 
764
734
 
765
 
class TestLocalTransportMutation(tests.TestCaseInTempDir):
766
 
 
767
 
    def test_local_transport_mkdir(self):
768
 
        here = osutils.abspath('.')
769
 
        t = transport.get_transport(here)
770
 
        t.mkdir('test')
771
 
        self.assertTrue(os.path.exists('test'))
772
 
 
773
 
    def test_local_transport_mkdir_permission_denied(self):
774
 
        # See https://bugs.launchpad.net/bzr/+bug/606537
775
 
        here = osutils.abspath('.')
776
 
        t = transport.get_transport(here)
777
 
        def fake_chmod(path, mode):
778
 
            e = OSError('permission denied')
779
 
            e.errno = errno.EPERM
780
 
            raise e
781
 
        self.overrideAttr(os, 'chmod', fake_chmod)
782
 
        t.mkdir('test')
783
 
        t.mkdir('test2', mode=0707)
784
 
        self.assertTrue(os.path.exists('test'))
785
 
        self.assertTrue(os.path.exists('test2'))
786
 
 
787
 
 
788
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
789
 
 
790
 
    def test_local_fdatasync_calls_fdatasync(self):
791
 
        """Check fdatasync on a stream tries to flush the data to the OS.
792
 
        
793
 
        We can't easily observe the external effect but we can at least see
794
 
        it's called.
795
 
        """
796
 
        sentinel = object()
797
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
798
 
        if fdatasync is sentinel:
799
 
            raise tests.TestNotApplicable('fdatasync not supported')
800
 
        t = self.get_transport('.')
801
 
        calls = self.recordCalls(os, 'fdatasync')
802
 
        w = t.open_write_stream('out')
803
 
        w.write('foo')
804
 
        w.fdatasync()
805
 
        with open('out', 'rb') as f:
806
 
            # Should have been flushed.
807
 
            self.assertEquals(f.read(), 'foo')
808
 
        self.assertEquals(len(calls), 1, calls)
809
 
 
810
 
    def test_missing_directory(self):
811
 
        t = self.get_transport('.')
812
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
813
 
 
814
 
 
815
 
class TestWin32LocalTransport(tests.TestCase):
 
735
class TestWin32LocalTransport(TestCase):
816
736
 
817
737
    def test_unc_clone_to_root(self):
818
738
        # Win32 UNC path like \\HOST\path
819
739
        # clone to root should stop at least at \\HOST part
820
740
        # not on \\
821
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
741
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
822
742
        for i in xrange(4):
823
743
            t = t.clone('..')
824
744
        self.assertEquals(t.base, 'file://HOST/')
827
747
        self.assertEquals(t.base, 'file://HOST/')
828
748
 
829
749
 
830
 
class TestConnectedTransport(tests.TestCase):
 
750
class TestConnectedTransport(TestCase):
831
751
    """Tests for connected to remote server transports"""
832
752
 
833
753
    def test_parse_url(self):
834
 
        t = transport.ConnectedTransport(
835
 
            'http://simple.example.com/home/source')
836
 
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
837
 
        self.assertEquals(t._parsed_url.port, None)
838
 
        self.assertEquals(t._parsed_url.path, '/home/source/')
839
 
        self.assertTrue(t._parsed_url.user is None)
840
 
        self.assertTrue(t._parsed_url.password is None)
 
754
        t = ConnectedTransport('http://simple.example.com/home/source')
 
755
        self.assertEquals(t._host, 'simple.example.com')
 
756
        self.assertEquals(t._port, None)
 
757
        self.assertEquals(t._path, '/home/source/')
 
758
        self.failUnless(t._user is None)
 
759
        self.failUnless(t._password is None)
841
760
 
842
761
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
843
762
 
844
763
    def test_parse_url_with_at_in_user(self):
845
764
        # Bug 228058
846
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
847
 
        self.assertEquals(t._parsed_url.user, 'user@host.com')
 
765
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
766
        self.assertEquals(t._user, 'user@host.com')
848
767
 
849
768
    def test_parse_quoted_url(self):
850
 
        t = transport.ConnectedTransport(
851
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
852
 
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
853
 
        self.assertEquals(t._parsed_url.port, 2222)
854
 
        self.assertEquals(t._parsed_url.user, 'robey')
855
 
        self.assertEquals(t._parsed_url.password, 'h@t')
856
 
        self.assertEquals(t._parsed_url.path, '/path/')
 
769
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
770
        self.assertEquals(t._host, 'exAmple.com')
 
771
        self.assertEquals(t._port, 2222)
 
772
        self.assertEquals(t._user, 'robey')
 
773
        self.assertEquals(t._password, 'h@t')
 
774
        self.assertEquals(t._path, '/path/')
857
775
 
858
776
        # Base should not keep track of the password
859
 
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
777
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
860
778
 
861
779
    def test_parse_invalid_url(self):
862
780
        self.assertRaises(errors.InvalidURL,
863
 
                          transport.ConnectedTransport,
 
781
                          ConnectedTransport,
864
782
                          'sftp://lily.org:~janneke/public/bzr/gub')
865
783
 
866
784
    def test_relpath(self):
867
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
785
        t = ConnectedTransport('sftp://user@host.com/abs/path')
868
786
 
869
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
870
 
            'sub')
 
787
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
871
788
        self.assertRaises(errors.PathNotChild, t.relpath,
872
789
                          'http://user@host.com/abs/path/sub')
873
790
        self.assertRaises(errors.PathNotChild, t.relpath,
877
794
        self.assertRaises(errors.PathNotChild, t.relpath,
878
795
                          'sftp://user@host.com:33/abs/path/sub')
879
796
        # Make sure it works when we don't supply a username
880
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
797
        t = ConnectedTransport('sftp://host.com/abs/path')
881
798
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
882
799
 
883
800
        # Make sure it works when parts of the path will be url encoded
884
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
801
        t = ConnectedTransport('sftp://host.com/dev/%path')
885
802
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
886
803
 
887
804
    def test_connection_sharing_propagate_credentials(self):
888
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
889
 
        self.assertEquals('user', t._parsed_url.user)
890
 
        self.assertEquals('host.com', t._parsed_url.host)
 
805
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
806
        self.assertEquals('user', t._user)
 
807
        self.assertEquals('host.com', t._host)
891
808
        self.assertIs(None, t._get_connection())
892
 
        self.assertIs(None, t._parsed_url.password)
 
809
        self.assertIs(None, t._password)
893
810
        c = t.clone('subdir')
894
811
        self.assertIs(None, c._get_connection())
895
 
        self.assertIs(None, t._parsed_url.password)
 
812
        self.assertIs(None, t._password)
896
813
 
897
814
        # Simulate the user entering a password
898
815
        password = 'secret'
912
829
        self.assertIs(new_password, c._get_credentials())
913
830
 
914
831
 
915
 
class TestReusedTransports(tests.TestCase):
 
832
class TestReusedTransports(TestCase):
916
833
    """Tests for transport reuse"""
917
834
 
918
835
    def test_reuse_same_transport(self):
919
836
        possible_transports = []
920
 
        t1 = transport.get_transport_from_url('http://foo/',
921
 
                                     possible_transports=possible_transports)
 
837
        t1 = get_transport('http://foo/',
 
838
                           possible_transports=possible_transports)
922
839
        self.assertEqual([t1], possible_transports)
923
 
        t2 = transport.get_transport_from_url('http://foo/',
924
 
                                     possible_transports=[t1])
 
840
        t2 = get_transport('http://foo/', possible_transports=[t1])
925
841
        self.assertIs(t1, t2)
926
842
 
927
843
        # Also check that final '/' are handled correctly
928
 
        t3 = transport.get_transport_from_url('http://foo/path/')
929
 
        t4 = transport.get_transport_from_url('http://foo/path',
930
 
                                     possible_transports=[t3])
 
844
        t3 = get_transport('http://foo/path/')
 
845
        t4 = get_transport('http://foo/path', possible_transports=[t3])
931
846
        self.assertIs(t3, t4)
932
847
 
933
 
        t5 = transport.get_transport_from_url('http://foo/path')
934
 
        t6 = transport.get_transport_from_url('http://foo/path/',
935
 
                                     possible_transports=[t5])
 
848
        t5 = get_transport('http://foo/path')
 
849
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
936
850
        self.assertIs(t5, t6)
937
851
 
938
852
    def test_don_t_reuse_different_transport(self):
939
 
        t1 = transport.get_transport_from_url('http://foo/path')
940
 
        t2 = transport.get_transport_from_url('http://bar/path',
941
 
                                     possible_transports=[t1])
 
853
        t1 = get_transport('http://foo/path')
 
854
        t2 = get_transport('http://bar/path', possible_transports=[t1])
942
855
        self.assertIsNot(t1, t2)
943
856
 
944
857
 
945
 
class TestTransportTrace(tests.TestCase):
 
858
class TestTransportTrace(TestCase):
946
859
 
947
 
    def test_decorator(self):
948
 
        t = transport.get_transport_from_url('trace+memory://')
 
860
    def test_get(self):
 
861
        transport = get_transport('trace+memory://')
949
862
        self.assertIsInstance(
950
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
863
            transport, bzrlib.transport.trace.TransportTraceDecorator)
951
864
 
952
865
    def test_clone_preserves_activity(self):
953
 
        t = transport.get_transport_from_url('trace+memory://')
954
 
        t2 = t.clone('.')
955
 
        self.assertTrue(t is not t2)
956
 
        self.assertTrue(t._activity is t2._activity)
 
866
        transport = get_transport('trace+memory://')
 
867
        transport2 = transport.clone('.')
 
868
        self.assertTrue(transport is not transport2)
 
869
        self.assertTrue(transport._activity is transport2._activity)
957
870
 
958
871
    # the following specific tests are for the operations that have made use of
959
872
    # logging in tests; we could test every single operation but doing that
960
873
    # still won't cause a test failure when the top level Transport API
961
874
    # changes; so there is little return doing that.
962
875
    def test_get(self):
963
 
        t = transport.get_transport_from_url('trace+memory:///')
964
 
        t.put_bytes('foo', 'barish')
965
 
        t.get('foo')
 
876
        transport = get_transport('trace+memory:///')
 
877
        transport.put_bytes('foo', 'barish')
 
878
        transport.get('foo')
966
879
        expected_result = []
967
880
        # put_bytes records the bytes, not the content to avoid memory
968
881
        # pressure.
969
882
        expected_result.append(('put_bytes', 'foo', 6, None))
970
883
        # get records the file name only.
971
884
        expected_result.append(('get', 'foo'))
972
 
        self.assertEqual(expected_result, t._activity)
 
885
        self.assertEqual(expected_result, transport._activity)
973
886
 
974
887
    def test_readv(self):
975
 
        t = transport.get_transport_from_url('trace+memory:///')
976
 
        t.put_bytes('foo', 'barish')
977
 
        list(t.readv('foo', [(0, 1), (3, 2)],
978
 
                     adjust_for_latency=True, upper_limit=6))
 
888
        transport = get_transport('trace+memory:///')
 
889
        transport.put_bytes('foo', 'barish')
 
890
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
891
            upper_limit=6))
979
892
        expected_result = []
980
893
        # put_bytes records the bytes, not the content to avoid memory
981
894
        # pressure.
982
895
        expected_result.append(('put_bytes', 'foo', 6, None))
983
896
        # readv records the supplied offset request
984
897
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
985
 
        self.assertEqual(expected_result, t._activity)
 
898
        self.assertEqual(expected_result, transport._activity)
986
899
 
987
900
 
988
901
class TestSSHConnections(tests.TestCaseWithTransport):
989
902
 
990
903
    def test_bzr_connect_to_bzr_ssh(self):
991
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
904
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
992
905
 
993
906
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
994
907
        """
1001
914
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
1002
915
        # override the interface (doesn't change self._vendor).
1003
916
        # Note that this does encryption, so can be slow.
1004
 
        from bzrlib.tests import stub_sftp
 
917
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
918
        from bzrlib.tests.stub_sftp import StubServer
1005
919
 
1006
920
        # Start an SSH server
1007
921
        self.command_executed = []
1010
924
        # SSH channel ourselves.  Surely this has already been implemented
1011
925
        # elsewhere?
1012
926
        started = []
1013
 
 
1014
 
        class StubSSHServer(stub_sftp.StubServer):
 
927
        class StubSSHServer(StubServer):
1015
928
 
1016
929
            test = self
1017
930
 
1022
935
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1023
936
 
1024
937
                # XXX: horribly inefficient, not to mention ugly.
1025
 
                # Start a thread for each of stdin/out/err, and relay bytes
1026
 
                # from the subprocess to channel and vice versa.
 
938
                # Start a thread for each of stdin/out/err, and relay bytes from
 
939
                # the subprocess to channel and vice versa.
1027
940
                def ferry_bytes(read, write, close):
1028
941
                    while True:
1029
942
                        bytes = read(1)
1045
958
 
1046
959
                return True
1047
960
 
1048
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
961
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
1049
962
        # We *don't* want to override the default SSH vendor: the detected one
1050
963
        # is the one to use.
1051
 
 
1052
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1053
 
        # inherits from SFTPServer which forces the SSH vendor to
1054
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1055
964
        self.start_server(ssh_server)
1056
 
        port = ssh_server.port
 
965
        port = ssh_server._listener.port
1057
966
 
1058
967
        if sys.platform == 'win32':
1059
968
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1060
969
        else:
1061
970
            bzr_remote_path = self.get_bzr_path()
1062
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
971
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1063
972
 
1064
973
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1065
974
        # variable is used to tell bzr what command to run on the remote end.
1069
978
            # prefix a '/' to get the right path.
1070
979
            path_to_branch = '/' + path_to_branch
1071
980
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1072
 
        t = transport.get_transport(url)
 
981
        t = get_transport(url)
1073
982
        self.permit_url(t.base)
1074
983
        t.mkdir('foo')
1075
984
 
1086
995
        # And the rest are threads
1087
996
        for t in started[1:]:
1088
997
            t.join()
1089
 
 
1090
 
 
1091
 
class TestUnhtml(tests.TestCase):
1092
 
 
1093
 
    """Tests for unhtml_roughly"""
1094
 
 
1095
 
    def test_truncation(self):
1096
 
        fake_html = "<p>something!\n" * 1000
1097
 
        result = http.unhtml_roughly(fake_html)
1098
 
        self.assertEquals(len(result), 1000)
1099
 
        self.assertStartsWith(result, " something!")
1100
 
 
1101
 
 
1102
 
class SomeDirectory(object):
1103
 
 
1104
 
    def look_up(self, name, url):
1105
 
        return "http://bar"
1106
 
 
1107
 
 
1108
 
class TestLocationToUrl(tests.TestCase):
1109
 
 
1110
 
    def get_base_location(self):
1111
 
        path = osutils.abspath('/foo/bar')
1112
 
        if path.startswith('/'):
1113
 
            url = 'file://%s' % (path,)
1114
 
        else:
1115
 
            # On Windows, abspaths start with the drive letter, so we have to
1116
 
            # add in the extra '/'
1117
 
            url = 'file:///%s' % (path,)
1118
 
        return path, url
1119
 
 
1120
 
    def test_regular_url(self):
1121
 
        self.assertEquals("file://foo", location_to_url("file://foo"))
1122
 
 
1123
 
    def test_directory(self):
1124
 
        directories.register("bar:", SomeDirectory, "Dummy directory")
1125
 
        self.addCleanup(directories.remove, "bar:")
1126
 
        self.assertEquals("http://bar", location_to_url("bar:"))
1127
 
 
1128
 
    def test_unicode_url(self):
1129
 
        self.assertRaises(errors.InvalidURL, location_to_url,
1130
 
            "http://fo/\xc3\xaf".decode("utf-8"))
1131
 
 
1132
 
    def test_unicode_path(self):
1133
 
        path, url = self.get_base_location()
1134
 
        location = path + "\xc3\xaf".decode("utf-8")
1135
 
        url += '%C3%AF'
1136
 
        self.assertEquals(url, location_to_url(location))
1137
 
 
1138
 
    def test_path(self):
1139
 
        path, url = self.get_base_location()
1140
 
        self.assertEquals(url, location_to_url(path))
1141
 
 
1142
 
    def test_relative_file_url(self):
1143
 
        self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1144
 
            location_to_url("file:bar"))
1145
 
 
1146
 
    def test_absolute_file_url(self):
1147
 
        self.assertEquals("file:///bar", location_to_url("file:/bar"))