~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Robert Collins
  • Date: 2009-09-07 03:08:30 UTC
  • mto: This revision was merged to the branch mainline in revision 4690.
  • Revision ID: robertc@robertcollins.net-20090907030830-rf59kt28d550eauj
Milestones language tightning, internal consistency.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
import os
19
 
import sys
20
 
import stat
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
21
18
from cStringIO import StringIO
22
19
 
23
 
from bzrlib.errors import (NoSuchFile, FileExists,
24
 
                           TransportNotPossible, ConnectionError)
25
 
from bzrlib.tests import TestCase
26
 
from bzrlib.transport import (_get_protocol_handlers,
 
20
import bzrlib
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
27
40
                              _get_transport_modules,
28
41
                              get_transport,
 
42
                              LateReadError,
29
43
                              register_lazy_transport,
30
 
                              _set_protocol_handlers,
31
 
                              urlescape,
 
44
                              register_transport_proto,
 
45
                              Transport,
32
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
 
 
52
 
 
53
# TODO: Should possibly split transport-specific tests into their own files.
33
54
 
34
55
 
35
56
class TestTransport(TestCase):
36
57
    """Test the non transport-concrete class functionality."""
37
58
 
38
 
    def test_urlescape(self):
39
 
        self.assertEqual('%25', urlescape('%'))
40
 
 
41
59
    def test__get_set_protocol_handlers(self):
42
60
        handlers = _get_protocol_handlers()
43
 
        self.assertNotEqual({}, handlers)
 
61
        self.assertNotEqual([], handlers.keys( ))
44
62
        try:
45
 
            _set_protocol_handlers({})
46
 
            self.assertEqual({}, _get_protocol_handlers())
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
47
65
        finally:
48
66
            _set_protocol_handlers(handlers)
49
67
 
50
68
    def test_get_transport_modules(self):
51
69
        handlers = _get_protocol_handlers()
 
70
        # don't pollute the current handlers
 
71
        _clear_protocol_handlers()
52
72
        class SampleHandler(object):
53
73
            """I exist, isnt that enough?"""
54
74
        try:
55
 
            my_handlers = {}
56
 
            _set_protocol_handlers(my_handlers)
57
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
58
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
59
 
            self.assertEqual([SampleHandler.__module__],
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
60
84
                             _get_transport_modules())
61
85
        finally:
62
86
            _set_protocol_handlers(handlers)
63
 
            
64
 
 
65
 
class MemoryTransportTest(TestCase):
66
 
    """Memory transport specific tests."""
 
87
 
 
88
    def test_transport_dependency(self):
 
89
        """Transport with missing dependency causes no error"""
 
90
        saved_handlers = _get_protocol_handlers()
 
91
        # don't pollute the current handlers
 
92
        _clear_protocol_handlers()
 
93
        try:
 
94
            register_transport_proto('foo')
 
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
96
                    'BadTransportHandler')
 
97
            try:
 
98
                get_transport('foo://fooserver/foo')
 
99
            except UnsupportedProtocol, e:
 
100
                e_str = str(e)
 
101
                self.assertEquals('Unsupported protocol'
 
102
                                  ' for url "foo://fooserver/foo":'
 
103
                                  ' Unable to import library "some_lib":'
 
104
                                  ' testing missing dependency', str(e))
 
105
            else:
 
106
                self.fail('Did not raise UnsupportedProtocol')
 
107
        finally:
 
108
            # restore original values
 
109
            _set_protocol_handlers(saved_handlers)
 
110
 
 
111
    def test_transport_fallback(self):
 
112
        """Transport with missing dependency causes no error"""
 
113
        saved_handlers = _get_protocol_handlers()
 
114
        try:
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
 
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
118
                    'BackupTransportHandler')
 
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
120
                    'BadTransportHandler')
 
121
            t = get_transport('foo://fooserver/foo')
 
122
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
123
        finally:
 
124
            _set_protocol_handlers(saved_handlers)
 
125
 
 
126
    def test_ssh_hints(self):
 
127
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
128
        try:
 
129
            get_transport('ssh://fooserver/foo')
 
130
        except UnsupportedProtocol, e:
 
131
            e_str = str(e)
 
132
            self.assertEquals('Unsupported protocol'
 
133
                              ' for url "ssh://fooserver/foo":'
 
134
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
135
                              str(e))
 
136
        else:
 
137
            self.fail('Did not raise UnsupportedProtocol')
 
138
 
 
139
    def test_LateReadError(self):
 
140
        """The LateReadError helper should raise on read()."""
 
141
        a_file = LateReadError('a path')
 
142
        try:
 
143
            a_file.read()
 
144
        except ReadError, error:
 
145
            self.assertEqual('a path', error.path)
 
146
        self.assertRaises(ReadError, a_file.read, 40)
 
147
        a_file.close()
 
148
 
 
149
    def test__combine_paths(self):
 
150
        t = Transport('/')
 
151
        self.assertEqual('/home/sarah/project/foo',
 
152
                         t._combine_paths('/home/sarah', 'project/foo'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '../../etc'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '/etc'))
 
159
 
 
160
    def test_local_abspath_non_local_transport(self):
 
161
        # the base implementation should throw
 
162
        t = MemoryTransport()
 
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
164
        self.assertEqual('memory:///t is not a local path.', str(e))
 
165
 
 
166
 
 
167
class TestCoalesceOffsets(TestCase):
 
168
 
 
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
170
        coalesce = Transport._coalesce_offsets
 
171
        exp = [_CoalescedOffset(*x) for x in expected]
 
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
173
                            max_size=max_size))
 
174
        self.assertEqual(exp, out)
 
175
 
 
176
    def test_coalesce_empty(self):
 
177
        self.check([], [])
 
178
 
 
179
    def test_coalesce_simple(self):
 
180
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
181
 
 
182
    def test_coalesce_unrelated(self):
 
183
        self.check([(0, 10, [(0, 10)]),
 
184
                    (20, 10, [(0, 10)]),
 
185
                   ], [(0, 10), (20, 10)])
 
186
 
 
187
    def test_coalesce_unsorted(self):
 
188
        self.check([(20, 10, [(0, 10)]),
 
189
                    (0, 10, [(0, 10)]),
 
190
                   ], [(20, 10), (0, 10)])
 
191
 
 
192
    def test_coalesce_nearby(self):
 
193
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
194
                   [(0, 10), (10, 10)])
 
195
 
 
196
    def test_coalesce_overlapped(self):
 
197
        self.assertRaises(ValueError,
 
198
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
199
                        [(0, 10), (5, 10)])
 
200
 
 
201
    def test_coalesce_limit(self):
 
202
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
203
                              (30, 10), (40, 10)]),
 
204
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
205
                              (30, 10), (40, 10)]),
 
206
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
207
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
208
                       (90, 10), (100, 10)],
 
209
                    limit=5)
 
210
 
 
211
    def test_coalesce_no_limit(self):
 
212
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
213
                               (30, 10), (40, 10), (50, 10),
 
214
                               (60, 10), (70, 10), (80, 10),
 
215
                               (90, 10)]),
 
216
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
217
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
218
                       (90, 10), (100, 10)])
 
219
 
 
220
    def test_coalesce_fudge(self):
 
221
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
222
                    (100, 10, [(0, 10),]),
 
223
                   ], [(10, 10), (30, 10), (100, 10)],
 
224
                   fudge=10
 
225
                  )
 
226
    def test_coalesce_max_size(self):
 
227
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
228
                    (30, 50, [(0, 50)]),
 
229
                    # If one range is above max_size, it gets its own coalesced
 
230
                    # offset
 
231
                    (100, 80, [(0, 80),]),],
 
232
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
233
                   max_size=50
 
234
                  )
 
235
 
 
236
    def test_coalesce_no_max_size(self):
 
237
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
239
                  )
 
240
 
 
241
    def test_coalesce_default_limit(self):
 
242
        # By default we use a 100MB max size.
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
249
                   max_size=1*1024*1024*1024)
 
250
 
 
251
 
 
252
class TestMemoryTransport(TestCase):
 
253
 
 
254
    def test_get_transport(self):
 
255
        MemoryTransport()
 
256
 
 
257
    def test_clone(self):
 
258
        transport = MemoryTransport()
 
259
        self.assertTrue(isinstance(transport, MemoryTransport))
 
260
        self.assertEqual("memory:///", transport.clone("/").base)
 
261
 
 
262
    def test_abspath(self):
 
263
        transport = MemoryTransport()
 
264
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
265
 
 
266
    def test_abspath_of_root(self):
 
267
        transport = MemoryTransport()
 
268
        self.assertEqual("memory:///", transport.base)
 
269
        self.assertEqual("memory:///", transport.abspath('/'))
 
270
 
 
271
    def test_abspath_of_relpath_starting_at_root(self):
 
272
        transport = MemoryTransport()
 
273
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
274
 
 
275
    def test_append_and_get(self):
 
276
        transport = MemoryTransport()
 
277
        transport.append_bytes('path', 'content')
 
278
        self.assertEqual(transport.get('path').read(), 'content')
 
279
        transport.append_file('path', StringIO('content'))
 
280
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
281
 
 
282
    def test_put_and_get(self):
 
283
        transport = MemoryTransport()
 
284
        transport.put_file('path', StringIO('content'))
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.put_bytes('path', 'content')
 
287
        self.assertEqual(transport.get('path').read(), 'content')
 
288
 
 
289
    def test_append_without_dir_fails(self):
 
290
        transport = MemoryTransport()
 
291
        self.assertRaises(NoSuchFile,
 
292
                          transport.append_bytes, 'dir/path', 'content')
 
293
 
 
294
    def test_put_without_dir_fails(self):
 
295
        transport = MemoryTransport()
 
296
        self.assertRaises(NoSuchFile,
 
297
                          transport.put_file, 'dir/path', StringIO('content'))
 
298
 
 
299
    def test_get_missing(self):
 
300
        transport = MemoryTransport()
 
301
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
302
 
 
303
    def test_has_missing(self):
 
304
        transport = MemoryTransport()
 
305
        self.assertEquals(False, transport.has('foo'))
 
306
 
 
307
    def test_has_present(self):
 
308
        transport = MemoryTransport()
 
309
        transport.append_bytes('foo', 'content')
 
310
        self.assertEquals(True, transport.has('foo'))
 
311
 
 
312
    def test_list_dir(self):
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.mkdir('dir')
 
316
        transport.put_bytes('dir/subfoo', 'content')
 
317
        transport.put_bytes('dirlike', 'content')
 
318
 
 
319
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
320
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
321
 
 
322
    def test_mkdir(self):
 
323
        transport = MemoryTransport()
 
324
        transport.mkdir('dir')
 
325
        transport.append_bytes('dir/path', 'content')
 
326
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
327
 
 
328
    def test_mkdir_missing_parent(self):
 
329
        transport = MemoryTransport()
 
330
        self.assertRaises(NoSuchFile,
 
331
                          transport.mkdir, 'dir/dir')
 
332
 
 
333
    def test_mkdir_twice(self):
 
334
        transport = MemoryTransport()
 
335
        transport.mkdir('dir')
 
336
        self.assertRaises(FileExists, transport.mkdir, 'dir')
67
337
 
68
338
    def test_parameters(self):
69
 
        import bzrlib.transport.memory as memory
70
 
        transport = memory.MemoryTransport()
 
339
        transport = MemoryTransport()
71
340
        self.assertEqual(True, transport.listable())
72
 
        self.assertEqual(False, transport.should_cache())
73
341
        self.assertEqual(False, transport.is_readonly())
74
 
        
75
 
        
 
342
 
 
343
    def test_iter_files_recursive(self):
 
344
        transport = MemoryTransport()
 
345
        transport.mkdir('dir')
 
346
        transport.put_bytes('dir/foo', 'content')
 
347
        transport.put_bytes('dir/bar', 'content')
 
348
        transport.put_bytes('bar', 'content')
 
349
        paths = set(transport.iter_files_recursive())
 
350
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
351
 
 
352
    def test_stat(self):
 
353
        transport = MemoryTransport()
 
354
        transport.put_bytes('foo', 'content')
 
355
        transport.put_bytes('bar', 'phowar')
 
356
        self.assertEqual(7, transport.stat('foo').st_size)
 
357
        self.assertEqual(6, transport.stat('bar').st_size)
 
358
 
 
359
 
 
360
class ChrootDecoratorTransportTest(TestCase):
 
361
    """Chroot decoration specific tests."""
 
362
 
 
363
    def test_abspath(self):
 
364
        # The abspath is always relative to the chroot_url.
 
365
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
366
        self.start_server(server)
 
367
        transport = get_transport(server.get_url())
 
368
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
369
 
 
370
        subdir_transport = transport.clone('subdir')
 
371
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
372
 
 
373
    def test_clone(self):
 
374
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
375
        self.start_server(server)
 
376
        transport = get_transport(server.get_url())
 
377
        # relpath from root and root path are the same
 
378
        relpath_cloned = transport.clone('foo')
 
379
        abspath_cloned = transport.clone('/foo')
 
380
        self.assertEqual(server, relpath_cloned.server)
 
381
        self.assertEqual(server, abspath_cloned.server)
 
382
 
 
383
    def test_chroot_url_preserves_chroot(self):
 
384
        """Calling get_transport on a chroot transport's base should produce a
 
385
        transport with exactly the same behaviour as the original chroot
 
386
        transport.
 
387
 
 
388
        This is so that it is not possible to escape a chroot by doing::
 
389
            url = chroot_transport.base
 
390
            parent_url = urlutils.join(url, '..')
 
391
            new_transport = get_transport(parent_url)
 
392
        """
 
393
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
394
        self.start_server(server)
 
395
        transport = get_transport(server.get_url())
 
396
        new_transport = get_transport(transport.base)
 
397
        self.assertEqual(transport.server, new_transport.server)
 
398
        self.assertEqual(transport.base, new_transport.base)
 
399
 
 
400
    def test_urljoin_preserves_chroot(self):
 
401
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
402
        URL that escapes the intended chroot.
 
403
 
 
404
        This is so that it is not possible to escape a chroot by doing::
 
405
            url = chroot_transport.base
 
406
            parent_url = urlutils.join(url, '..')
 
407
            new_transport = get_transport(parent_url)
 
408
        """
 
409
        server = ChrootServer(get_transport('memory:///path/'))
 
410
        self.start_server(server)
 
411
        transport = get_transport(server.get_url())
 
412
        self.assertRaises(
 
413
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
414
 
 
415
 
 
416
class ChrootServerTest(TestCase):
 
417
 
 
418
    def test_construct(self):
 
419
        backing_transport = MemoryTransport()
 
420
        server = ChrootServer(backing_transport)
 
421
        self.assertEqual(backing_transport, server.backing_transport)
 
422
 
 
423
    def test_setUp(self):
 
424
        backing_transport = MemoryTransport()
 
425
        server = ChrootServer(backing_transport)
 
426
        server.setUp()
 
427
        try:
 
428
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
429
        finally:
 
430
            server.tearDown()
 
431
 
 
432
    def test_tearDown(self):
 
433
        backing_transport = MemoryTransport()
 
434
        server = ChrootServer(backing_transport)
 
435
        server.setUp()
 
436
        server.tearDown()
 
437
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
438
 
 
439
    def test_get_url(self):
 
440
        backing_transport = MemoryTransport()
 
441
        server = ChrootServer(backing_transport)
 
442
        server.setUp()
 
443
        try:
 
444
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
445
        finally:
 
446
            server.tearDown()
 
447
 
 
448
 
76
449
class ReadonlyDecoratorTransportTest(TestCase):
77
450
    """Readonly decoration specific tests."""
78
451
 
81
454
        # connect to . in readonly mode
82
455
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
83
456
        self.assertEqual(True, transport.listable())
84
 
        self.assertEqual(False, transport.should_cache())
85
457
        self.assertEqual(True, transport.is_readonly())
86
458
 
87
459
    def test_http_parameters(self):
 
460
        from bzrlib.tests.http_server import HttpServer
88
461
        import bzrlib.transport.readonly as readonly
89
 
        from bzrlib.transport.http import HttpServer
90
 
        # connect to . via http which is not listable
91
 
        server = HttpServer()
92
 
        server.setUp()
93
 
        try:
94
 
            transport = get_transport('readonly+' + server.get_url())
95
 
            self.failUnless(isinstance(transport,
96
 
                                       readonly.ReadonlyTransportDecorator))
97
 
            self.assertEqual(False, transport.listable())
98
 
            self.assertEqual(True, transport.should_cache())
99
 
            self.assertEqual(True, transport.is_readonly())
100
 
        finally:
101
 
            server.tearDown()
 
462
        # connect to '.' via http which is not listable
 
463
        server = HttpServer()
 
464
        self.start_server(server)
 
465
        transport = get_transport('readonly+' + server.get_url())
 
466
        self.failUnless(isinstance(transport,
 
467
                                   readonly.ReadonlyTransportDecorator))
 
468
        self.assertEqual(False, transport.listable())
 
469
        self.assertEqual(True, transport.is_readonly())
 
470
 
 
471
 
 
472
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
473
    """NFS decorator specific tests."""
 
474
 
 
475
    def get_nfs_transport(self, url):
 
476
        import bzrlib.transport.fakenfs as fakenfs
 
477
        # connect to url with nfs decoration
 
478
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
479
 
 
480
    def test_local_parameters(self):
 
481
        # the listable and is_readonly parameters
 
482
        # are not changed by the fakenfs decorator
 
483
        transport = self.get_nfs_transport('.')
 
484
        self.assertEqual(True, transport.listable())
 
485
        self.assertEqual(False, transport.is_readonly())
 
486
 
 
487
    def test_http_parameters(self):
 
488
        # the listable and is_readonly parameters
 
489
        # are not changed by the fakenfs decorator
 
490
        from bzrlib.tests.http_server import HttpServer
 
491
        # connect to '.' via http which is not listable
 
492
        server = HttpServer()
 
493
        self.start_server(server)
 
494
        transport = self.get_nfs_transport(server.get_url())
 
495
        self.assertIsInstance(
 
496
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
497
        self.assertEqual(False, transport.listable())
 
498
        self.assertEqual(True, transport.is_readonly())
 
499
 
 
500
    def test_fakenfs_server_default(self):
 
501
        # a FakeNFSServer() should bring up a local relpath server for itself
 
502
        import bzrlib.transport.fakenfs as fakenfs
 
503
        server = fakenfs.FakeNFSServer()
 
504
        self.start_server(server)
 
505
        # the url should be decorated appropriately
 
506
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
507
        # and we should be able to get a transport for it
 
508
        transport = get_transport(server.get_url())
 
509
        # which must be a FakeNFSTransportDecorator instance.
 
510
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
511
 
 
512
    def test_fakenfs_rename_semantics(self):
 
513
        # a FakeNFS transport must mangle the way rename errors occur to
 
514
        # look like NFS problems.
 
515
        transport = self.get_nfs_transport('.')
 
516
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
517
                        transport=transport)
 
518
        self.assertRaises(errors.ResourceBusy,
 
519
                          transport.rename, 'from', 'to')
 
520
 
 
521
 
 
522
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
523
    """Tests for simulation of VFAT restrictions"""
 
524
 
 
525
    def get_vfat_transport(self, url):
 
526
        """Return vfat-backed transport for test directory"""
 
527
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
528
        return FakeVFATTransportDecorator('vfat+' + url)
 
529
 
 
530
    def test_transport_creation(self):
 
531
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
532
        transport = self.get_vfat_transport('.')
 
533
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
534
 
 
535
    def test_transport_mkdir(self):
 
536
        transport = self.get_vfat_transport('.')
 
537
        transport.mkdir('HELLO')
 
538
        self.assertTrue(transport.has('hello'))
 
539
        self.assertTrue(transport.has('Hello'))
 
540
 
 
541
    def test_forbidden_chars(self):
 
542
        transport = self.get_vfat_transport('.')
 
543
        self.assertRaises(ValueError, transport.has, "<NU>")
 
544
 
 
545
 
 
546
class BadTransportHandler(Transport):
 
547
    def __init__(self, base_url):
 
548
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
549
 
 
550
 
 
551
class BackupTransportHandler(Transport):
 
552
    """Test transport that works as a backup for the BadTransportHandler"""
 
553
    pass
 
554
 
 
555
 
 
556
class TestTransportImplementation(TestCaseInTempDir):
 
557
    """Implementation verification for transports.
 
558
 
 
559
    To verify a transport we need a server factory, which is a callable
 
560
    that accepts no parameters and returns an implementation of
 
561
    bzrlib.transport.Server.
 
562
 
 
563
    That Server is then used to construct transport instances and test
 
564
    the transport via loopback activity.
 
565
 
 
566
    Currently this assumes that the Transport object is connected to the
 
567
    current working directory.  So that whatever is done
 
568
    through the transport, should show up in the working
 
569
    directory, and vice-versa. This is a bug, because its possible to have
 
570
    URL schemes which provide access to something that may not be
 
571
    result in storage on the local disk, i.e. due to file system limits, or
 
572
    due to it being a database or some other non-filesystem tool.
 
573
 
 
574
    This also tests to make sure that the functions work with both
 
575
    generators and lists (assuming iter(list) is effectively a generator)
 
576
    """
 
577
 
 
578
    def setUp(self):
 
579
        super(TestTransportImplementation, self).setUp()
 
580
        self._server = self.transport_server()
 
581
        self.start_server(self._server)
 
582
 
 
583
    def get_transport(self, relpath=None):
 
584
        """Return a connected transport to the local directory.
 
585
 
 
586
        :param relpath: a path relative to the base url.
 
587
        """
 
588
        base_url = self._server.get_url()
 
589
        url = self._adjust_url(base_url, relpath)
 
590
        # try getting the transport via the regular interface:
 
591
        t = get_transport(url)
 
592
        # vila--20070607 if the following are commented out the test suite
 
593
        # still pass. Is this really still needed or was it a forgotten
 
594
        # temporary fix ?
 
595
        if not isinstance(t, self.transport_class):
 
596
            # we did not get the correct transport class type. Override the
 
597
            # regular connection behaviour by direct construction.
 
598
            t = self.transport_class(url)
 
599
        return t
 
600
 
 
601
 
 
602
class TestLocalTransports(TestCase):
 
603
 
 
604
    def test_get_transport_from_abspath(self):
 
605
        here = osutils.abspath('.')
 
606
        t = get_transport(here)
 
607
        self.assertIsInstance(t, LocalTransport)
 
608
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
609
 
 
610
    def test_get_transport_from_relpath(self):
 
611
        here = osutils.abspath('.')
 
612
        t = get_transport('.')
 
613
        self.assertIsInstance(t, LocalTransport)
 
614
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
615
 
 
616
    def test_get_transport_from_local_url(self):
 
617
        here = osutils.abspath('.')
 
618
        here_url = urlutils.local_path_to_url(here) + '/'
 
619
        t = get_transport(here_url)
 
620
        self.assertIsInstance(t, LocalTransport)
 
621
        self.assertEquals(t.base, here_url)
 
622
 
 
623
    def test_local_abspath(self):
 
624
        here = osutils.abspath('.')
 
625
        t = get_transport(here)
 
626
        self.assertEquals(t.local_abspath(''), here)
 
627
 
 
628
 
 
629
class TestWin32LocalTransport(TestCase):
 
630
 
 
631
    def test_unc_clone_to_root(self):
 
632
        # Win32 UNC path like \\HOST\path
 
633
        # clone to root should stop at least at \\HOST part
 
634
        # not on \\
 
635
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
636
        for i in xrange(4):
 
637
            t = t.clone('..')
 
638
        self.assertEquals(t.base, 'file://HOST/')
 
639
        # make sure we reach the root
 
640
        t = t.clone('..')
 
641
        self.assertEquals(t.base, 'file://HOST/')
 
642
 
 
643
 
 
644
class TestConnectedTransport(TestCase):
 
645
    """Tests for connected to remote server transports"""
 
646
 
 
647
    def test_parse_url(self):
 
648
        t = ConnectedTransport('http://simple.example.com/home/source')
 
649
        self.assertEquals(t._host, 'simple.example.com')
 
650
        self.assertEquals(t._port, None)
 
651
        self.assertEquals(t._path, '/home/source/')
 
652
        self.failUnless(t._user is None)
 
653
        self.failUnless(t._password is None)
 
654
 
 
655
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
656
 
 
657
    def test_parse_url_with_at_in_user(self):
 
658
        # Bug 228058
 
659
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
660
        self.assertEquals(t._user, 'user@host.com')
 
661
 
 
662
    def test_parse_quoted_url(self):
 
663
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
664
        self.assertEquals(t._host, 'exAmple.com')
 
665
        self.assertEquals(t._port, 2222)
 
666
        self.assertEquals(t._user, 'robey')
 
667
        self.assertEquals(t._password, 'h@t')
 
668
        self.assertEquals(t._path, '/path/')
 
669
 
 
670
        # Base should not keep track of the password
 
671
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
672
 
 
673
    def test_parse_invalid_url(self):
 
674
        self.assertRaises(errors.InvalidURL,
 
675
                          ConnectedTransport,
 
676
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
677
 
 
678
    def test_relpath(self):
 
679
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
680
 
 
681
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
682
        self.assertRaises(errors.PathNotChild, t.relpath,
 
683
                          'http://user@host.com/abs/path/sub')
 
684
        self.assertRaises(errors.PathNotChild, t.relpath,
 
685
                          'sftp://user2@host.com/abs/path/sub')
 
686
        self.assertRaises(errors.PathNotChild, t.relpath,
 
687
                          'sftp://user@otherhost.com/abs/path/sub')
 
688
        self.assertRaises(errors.PathNotChild, t.relpath,
 
689
                          'sftp://user@host.com:33/abs/path/sub')
 
690
        # Make sure it works when we don't supply a username
 
691
        t = ConnectedTransport('sftp://host.com/abs/path')
 
692
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
693
 
 
694
        # Make sure it works when parts of the path will be url encoded
 
695
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
696
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
697
 
 
698
    def test_connection_sharing_propagate_credentials(self):
 
699
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
700
        self.assertEquals('user', t._user)
 
701
        self.assertEquals('host.com', t._host)
 
702
        self.assertIs(None, t._get_connection())
 
703
        self.assertIs(None, t._password)
 
704
        c = t.clone('subdir')
 
705
        self.assertIs(None, c._get_connection())
 
706
        self.assertIs(None, t._password)
 
707
 
 
708
        # Simulate the user entering a password
 
709
        password = 'secret'
 
710
        connection = object()
 
711
        t._set_connection(connection, password)
 
712
        self.assertIs(connection, t._get_connection())
 
713
        self.assertIs(password, t._get_credentials())
 
714
        self.assertIs(connection, c._get_connection())
 
715
        self.assertIs(password, c._get_credentials())
 
716
 
 
717
        # credentials can be updated
 
718
        new_password = 'even more secret'
 
719
        c._update_credentials(new_password)
 
720
        self.assertIs(connection, t._get_connection())
 
721
        self.assertIs(new_password, t._get_credentials())
 
722
        self.assertIs(connection, c._get_connection())
 
723
        self.assertIs(new_password, c._get_credentials())
 
724
 
 
725
 
 
726
class TestReusedTransports(TestCase):
 
727
    """Tests for transport reuse"""
 
728
 
 
729
    def test_reuse_same_transport(self):
 
730
        possible_transports = []
 
731
        t1 = get_transport('http://foo/',
 
732
                           possible_transports=possible_transports)
 
733
        self.assertEqual([t1], possible_transports)
 
734
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
735
        self.assertIs(t1, t2)
 
736
 
 
737
        # Also check that final '/' are handled correctly
 
738
        t3 = get_transport('http://foo/path/')
 
739
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
740
        self.assertIs(t3, t4)
 
741
 
 
742
        t5 = get_transport('http://foo/path')
 
743
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
744
        self.assertIs(t5, t6)
 
745
 
 
746
    def test_don_t_reuse_different_transport(self):
 
747
        t1 = get_transport('http://foo/path')
 
748
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
749
        self.assertIsNot(t1, t2)
 
750
 
 
751
 
 
752
class TestTransportTrace(TestCase):
 
753
 
 
754
    def test_get(self):
 
755
        transport = get_transport('trace+memory://')
 
756
        self.assertIsInstance(
 
757
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
758
 
 
759
    def test_clone_preserves_activity(self):
 
760
        transport = get_transport('trace+memory://')
 
761
        transport2 = transport.clone('.')
 
762
        self.assertTrue(transport is not transport2)
 
763
        self.assertTrue(transport._activity is transport2._activity)
 
764
 
 
765
    # the following specific tests are for the operations that have made use of
 
766
    # logging in tests; we could test every single operation but doing that
 
767
    # still won't cause a test failure when the top level Transport API
 
768
    # changes; so there is little return doing that.
 
769
    def test_get(self):
 
770
        transport = get_transport('trace+memory:///')
 
771
        transport.put_bytes('foo', 'barish')
 
772
        transport.get('foo')
 
773
        expected_result = []
 
774
        # put_bytes records the bytes, not the content to avoid memory
 
775
        # pressure.
 
776
        expected_result.append(('put_bytes', 'foo', 6, None))
 
777
        # get records the file name only.
 
778
        expected_result.append(('get', 'foo'))
 
779
        self.assertEqual(expected_result, transport._activity)
 
780
 
 
781
    def test_readv(self):
 
782
        transport = get_transport('trace+memory:///')
 
783
        transport.put_bytes('foo', 'barish')
 
784
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
785
            upper_limit=6))
 
786
        expected_result = []
 
787
        # put_bytes records the bytes, not the content to avoid memory
 
788
        # pressure.
 
789
        expected_result.append(('put_bytes', 'foo', 6, None))
 
790
        # readv records the supplied offset request
 
791
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
792
        self.assertEqual(expected_result, transport._activity)