~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2007-09-19 13:12:32 UTC
  • mto: (2592.3.153 repository)
  • mto: This revision was merged to the branch mainline in revision 2933.
  • Revision ID: mbp@sourcefrog.net-20070919131232-0gtp1q90fxz10ctn
move commit_write_group to RepositoryPackCollection

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
 
21
from cStringIO import StringIO
 
22
 
 
23
import bzrlib
 
24
from bzrlib import (
 
25
    errors,
 
26
    urlutils,
 
27
    )
 
28
from bzrlib.errors import (ConnectionError,
 
29
                           DependencyNotPresent,
 
30
                           FileExists,
 
31
                           InvalidURLJoin,
 
32
                           NoSuchFile,
 
33
                           PathNotChild,
 
34
                           TransportNotPossible,
 
35
                           ConnectionError,
 
36
                           DependencyNotPresent,
 
37
                           ReadError,
 
38
                           UnsupportedProtocol,
 
39
                           )
 
40
from bzrlib.tests import TestCase, TestCaseInTempDir
 
41
from bzrlib.transport import (_CoalescedOffset,
 
42
                              ConnectedTransport,
 
43
                              _get_protocol_handlers,
 
44
                              _set_protocol_handlers,
 
45
                              _get_transport_modules,
 
46
                              get_transport,
 
47
                              LateReadError,
 
48
                              register_lazy_transport,
 
49
                              register_transport_proto,
 
50
                              _clear_protocol_handlers,
 
51
                              Transport,
 
52
                              )
 
53
from bzrlib.transport.chroot import ChrootServer
 
54
from bzrlib.transport.memory import MemoryTransport
 
55
from bzrlib.transport.local import (LocalTransport,
 
56
                                    EmulatedWin32LocalTransport)
 
57
 
 
58
 
 
59
# TODO: Should possibly split transport-specific tests into their own files.
 
60
 
 
61
 
 
62
class TestTransport(TestCase):
 
63
    """Test the non transport-concrete class functionality."""
 
64
 
 
65
    def test__get_set_protocol_handlers(self):
 
66
        handlers = _get_protocol_handlers()
 
67
        self.assertNotEqual([], handlers.keys( ))
 
68
        try:
 
69
            _clear_protocol_handlers()
 
70
            self.assertEqual([], _get_protocol_handlers().keys())
 
71
        finally:
 
72
            _set_protocol_handlers(handlers)
 
73
 
 
74
    def test_get_transport_modules(self):
 
75
        handlers = _get_protocol_handlers()
 
76
        class SampleHandler(object):
 
77
            """I exist, isnt that enough?"""
 
78
        try:
 
79
            _clear_protocol_handlers()
 
80
            register_transport_proto('foo')
 
81
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
82
            register_transport_proto('bar')
 
83
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
84
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
 
85
                             _get_transport_modules())
 
86
        finally:
 
87
            _set_protocol_handlers(handlers)
 
88
 
 
89
    def test_transport_dependency(self):
 
90
        """Transport with missing dependency causes no error"""
 
91
        saved_handlers = _get_protocol_handlers()
 
92
        try:
 
93
            register_transport_proto('foo')
 
94
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
95
                    'BadTransportHandler')
 
96
            try:
 
97
                get_transport('foo://fooserver/foo')
 
98
            except UnsupportedProtocol, e:
 
99
                e_str = str(e)
 
100
                self.assertEquals('Unsupported protocol'
 
101
                                  ' for url "foo://fooserver/foo":'
 
102
                                  ' Unable to import library "some_lib":'
 
103
                                  ' testing missing dependency', str(e))
 
104
            else:
 
105
                self.fail('Did not raise UnsupportedProtocol')
 
106
        finally:
 
107
            # restore original values
 
108
            _set_protocol_handlers(saved_handlers)
 
109
            
 
110
    def test_transport_fallback(self):
 
111
        """Transport with missing dependency causes no error"""
 
112
        saved_handlers = _get_protocol_handlers()
 
113
        try:
 
114
            _clear_protocol_handlers()
 
115
            register_transport_proto('foo')
 
116
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
117
                    'BackupTransportHandler')
 
118
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
119
                    'BadTransportHandler')
 
120
            t = get_transport('foo://fooserver/foo')
 
121
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
122
        finally:
 
123
            _set_protocol_handlers(saved_handlers)
 
124
 
 
125
    def test_LateReadError(self):
 
126
        """The LateReadError helper should raise on read()."""
 
127
        a_file = LateReadError('a path')
 
128
        try:
 
129
            a_file.read()
 
130
        except ReadError, error:
 
131
            self.assertEqual('a path', error.path)
 
132
        self.assertRaises(ReadError, a_file.read, 40)
 
133
        a_file.close()
 
134
 
 
135
    def test__combine_paths(self):
 
136
        t = Transport('/')
 
137
        self.assertEqual('/home/sarah/project/foo',
 
138
                         t._combine_paths('/home/sarah', 'project/foo'))
 
139
        self.assertEqual('/etc',
 
140
                         t._combine_paths('/home/sarah', '../../etc'))
 
141
        self.assertEqual('/etc',
 
142
                         t._combine_paths('/home/sarah', '../../../etc'))
 
143
        self.assertEqual('/etc',
 
144
                         t._combine_paths('/home/sarah', '/etc'))
 
145
 
 
146
    def test_local_abspath_non_local_transport(self):
 
147
        # the base implementation should throw
 
148
        t = MemoryTransport()
 
149
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
150
        self.assertEqual('memory:///t is not a local path.', str(e))
 
151
 
 
152
 
 
153
class TestCoalesceOffsets(TestCase):
 
154
    
 
155
    def check(self, expected, offsets, limit=0, fudge=0):
 
156
        coalesce = Transport._coalesce_offsets
 
157
        exp = [_CoalescedOffset(*x) for x in expected]
 
158
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
159
        self.assertEqual(exp, out)
 
160
 
 
161
    def test_coalesce_empty(self):
 
162
        self.check([], [])
 
163
 
 
164
    def test_coalesce_simple(self):
 
165
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
166
 
 
167
    def test_coalesce_unrelated(self):
 
168
        self.check([(0, 10, [(0, 10)]),
 
169
                    (20, 10, [(0, 10)]),
 
170
                   ], [(0, 10), (20, 10)])
 
171
            
 
172
    def test_coalesce_unsorted(self):
 
173
        self.check([(20, 10, [(0, 10)]),
 
174
                    (0, 10, [(0, 10)]),
 
175
                   ], [(20, 10), (0, 10)])
 
176
 
 
177
    def test_coalesce_nearby(self):
 
178
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
179
                   [(0, 10), (10, 10)])
 
180
 
 
181
    def test_coalesce_overlapped(self):
 
182
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
183
                   [(0, 10), (5, 10)])
 
184
 
 
185
    def test_coalesce_limit(self):
 
186
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
187
                              (30, 10), (40, 10)]),
 
188
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
189
                              (30, 10), (40, 10)]),
 
190
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
191
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
192
                       (90, 10), (100, 10)],
 
193
                    limit=5)
 
194
 
 
195
    def test_coalesce_no_limit(self):
 
196
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
197
                               (30, 10), (40, 10), (50, 10),
 
198
                               (60, 10), (70, 10), (80, 10),
 
199
                               (90, 10)]),
 
200
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
201
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
202
                       (90, 10), (100, 10)])
 
203
 
 
204
    def test_coalesce_fudge(self):
 
205
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
206
                    (100, 10, [(0, 10),]),
 
207
                   ], [(10, 10), (30, 10), (100, 10)],
 
208
                   fudge=10
 
209
                  )
 
210
 
 
211
 
 
212
class TestMemoryTransport(TestCase):
 
213
 
 
214
    def test_get_transport(self):
 
215
        MemoryTransport()
 
216
 
 
217
    def test_clone(self):
 
218
        transport = MemoryTransport()
 
219
        self.assertTrue(isinstance(transport, MemoryTransport))
 
220
        self.assertEqual("memory:///", transport.clone("/").base)
 
221
 
 
222
    def test_abspath(self):
 
223
        transport = MemoryTransport()
 
224
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
225
 
 
226
    def test_abspath_of_root(self):
 
227
        transport = MemoryTransport()
 
228
        self.assertEqual("memory:///", transport.base)
 
229
        self.assertEqual("memory:///", transport.abspath('/'))
 
230
 
 
231
    def test_abspath_of_relpath_starting_at_root(self):
 
232
        transport = MemoryTransport()
 
233
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
234
 
 
235
    def test_append_and_get(self):
 
236
        transport = MemoryTransport()
 
237
        transport.append_bytes('path', 'content')
 
238
        self.assertEqual(transport.get('path').read(), 'content')
 
239
        transport.append_file('path', StringIO('content'))
 
240
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
241
 
 
242
    def test_put_and_get(self):
 
243
        transport = MemoryTransport()
 
244
        transport.put_file('path', StringIO('content'))
 
245
        self.assertEqual(transport.get('path').read(), 'content')
 
246
        transport.put_bytes('path', 'content')
 
247
        self.assertEqual(transport.get('path').read(), 'content')
 
248
 
 
249
    def test_append_without_dir_fails(self):
 
250
        transport = MemoryTransport()
 
251
        self.assertRaises(NoSuchFile,
 
252
                          transport.append_bytes, 'dir/path', 'content')
 
253
 
 
254
    def test_put_without_dir_fails(self):
 
255
        transport = MemoryTransport()
 
256
        self.assertRaises(NoSuchFile,
 
257
                          transport.put_file, 'dir/path', StringIO('content'))
 
258
 
 
259
    def test_get_missing(self):
 
260
        transport = MemoryTransport()
 
261
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
262
 
 
263
    def test_has_missing(self):
 
264
        transport = MemoryTransport()
 
265
        self.assertEquals(False, transport.has('foo'))
 
266
 
 
267
    def test_has_present(self):
 
268
        transport = MemoryTransport()
 
269
        transport.append_bytes('foo', 'content')
 
270
        self.assertEquals(True, transport.has('foo'))
 
271
 
 
272
    def test_list_dir(self):
 
273
        transport = MemoryTransport()
 
274
        transport.put_bytes('foo', 'content')
 
275
        transport.mkdir('dir')
 
276
        transport.put_bytes('dir/subfoo', 'content')
 
277
        transport.put_bytes('dirlike', 'content')
 
278
 
 
279
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
280
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
281
 
 
282
    def test_mkdir(self):
 
283
        transport = MemoryTransport()
 
284
        transport.mkdir('dir')
 
285
        transport.append_bytes('dir/path', 'content')
 
286
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
287
 
 
288
    def test_mkdir_missing_parent(self):
 
289
        transport = MemoryTransport()
 
290
        self.assertRaises(NoSuchFile,
 
291
                          transport.mkdir, 'dir/dir')
 
292
 
 
293
    def test_mkdir_twice(self):
 
294
        transport = MemoryTransport()
 
295
        transport.mkdir('dir')
 
296
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
297
 
 
298
    def test_parameters(self):
 
299
        transport = MemoryTransport()
 
300
        self.assertEqual(True, transport.listable())
 
301
        self.assertEqual(False, transport.is_readonly())
 
302
 
 
303
    def test_iter_files_recursive(self):
 
304
        transport = MemoryTransport()
 
305
        transport.mkdir('dir')
 
306
        transport.put_bytes('dir/foo', 'content')
 
307
        transport.put_bytes('dir/bar', 'content')
 
308
        transport.put_bytes('bar', 'content')
 
309
        paths = set(transport.iter_files_recursive())
 
310
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
311
 
 
312
    def test_stat(self):
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.put_bytes('bar', 'phowar')
 
316
        self.assertEqual(7, transport.stat('foo').st_size)
 
317
        self.assertEqual(6, transport.stat('bar').st_size)
 
318
 
 
319
 
 
320
class ChrootDecoratorTransportTest(TestCase):
 
321
    """Chroot decoration specific tests."""
 
322
 
 
323
    def test_abspath(self):
 
324
        # The abspath is always relative to the chroot_url.
 
325
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
326
        server.setUp()
 
327
        transport = get_transport(server.get_url())
 
328
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
329
 
 
330
        subdir_transport = transport.clone('subdir')
 
331
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
332
        server.tearDown()
 
333
 
 
334
    def test_clone(self):
 
335
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
336
        server.setUp()
 
337
        transport = get_transport(server.get_url())
 
338
        # relpath from root and root path are the same
 
339
        relpath_cloned = transport.clone('foo')
 
340
        abspath_cloned = transport.clone('/foo')
 
341
        self.assertEqual(server, relpath_cloned.server)
 
342
        self.assertEqual(server, abspath_cloned.server)
 
343
        server.tearDown()
 
344
    
 
345
    def test_chroot_url_preserves_chroot(self):
 
346
        """Calling get_transport on a chroot transport's base should produce a
 
347
        transport with exactly the same behaviour as the original chroot
 
348
        transport.
 
349
 
 
350
        This is so that it is not possible to escape a chroot by doing::
 
351
            url = chroot_transport.base
 
352
            parent_url = urlutils.join(url, '..')
 
353
            new_transport = get_transport(parent_url)
 
354
        """
 
355
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
356
        server.setUp()
 
357
        transport = get_transport(server.get_url())
 
358
        new_transport = get_transport(transport.base)
 
359
        self.assertEqual(transport.server, new_transport.server)
 
360
        self.assertEqual(transport.base, new_transport.base)
 
361
        server.tearDown()
 
362
        
 
363
    def test_urljoin_preserves_chroot(self):
 
364
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
365
        URL that escapes the intended chroot.
 
366
 
 
367
        This is so that it is not possible to escape a chroot by doing::
 
368
            url = chroot_transport.base
 
369
            parent_url = urlutils.join(url, '..')
 
370
            new_transport = get_transport(parent_url)
 
371
        """
 
372
        server = ChrootServer(get_transport('memory:///path/'))
 
373
        server.setUp()
 
374
        transport = get_transport(server.get_url())
 
375
        self.assertRaises(
 
376
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
377
        server.tearDown()
 
378
 
 
379
 
 
380
class ChrootServerTest(TestCase):
 
381
 
 
382
    def test_construct(self):
 
383
        backing_transport = MemoryTransport()
 
384
        server = ChrootServer(backing_transport)
 
385
        self.assertEqual(backing_transport, server.backing_transport)
 
386
 
 
387
    def test_setUp(self):
 
388
        backing_transport = MemoryTransport()
 
389
        server = ChrootServer(backing_transport)
 
390
        server.setUp()
 
391
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
392
 
 
393
    def test_tearDown(self):
 
394
        backing_transport = MemoryTransport()
 
395
        server = ChrootServer(backing_transport)
 
396
        server.setUp()
 
397
        server.tearDown()
 
398
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
399
 
 
400
    def test_get_url(self):
 
401
        backing_transport = MemoryTransport()
 
402
        server = ChrootServer(backing_transport)
 
403
        server.setUp()
 
404
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
405
        server.tearDown()
 
406
 
 
407
 
 
408
class ReadonlyDecoratorTransportTest(TestCase):
 
409
    """Readonly decoration specific tests."""
 
410
 
 
411
    def test_local_parameters(self):
 
412
        import bzrlib.transport.readonly as readonly
 
413
        # connect to . in readonly mode
 
414
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
415
        self.assertEqual(True, transport.listable())
 
416
        self.assertEqual(True, transport.is_readonly())
 
417
 
 
418
    def test_http_parameters(self):
 
419
        from bzrlib.tests.HttpServer import HttpServer
 
420
        import bzrlib.transport.readonly as readonly
 
421
        # connect to . via http which is not listable
 
422
        server = HttpServer()
 
423
        server.setUp()
 
424
        try:
 
425
            transport = get_transport('readonly+' + server.get_url())
 
426
            self.failUnless(isinstance(transport,
 
427
                                       readonly.ReadonlyTransportDecorator))
 
428
            self.assertEqual(False, transport.listable())
 
429
            self.assertEqual(True, transport.is_readonly())
 
430
        finally:
 
431
            server.tearDown()
 
432
 
 
433
 
 
434
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
435
    """NFS decorator specific tests."""
 
436
 
 
437
    def get_nfs_transport(self, url):
 
438
        import bzrlib.transport.fakenfs as fakenfs
 
439
        # connect to url with nfs decoration
 
440
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
441
 
 
442
    def test_local_parameters(self):
 
443
        # the listable and is_readonly parameters
 
444
        # are not changed by the fakenfs decorator
 
445
        transport = self.get_nfs_transport('.')
 
446
        self.assertEqual(True, transport.listable())
 
447
        self.assertEqual(False, transport.is_readonly())
 
448
 
 
449
    def test_http_parameters(self):
 
450
        # the listable and is_readonly parameters
 
451
        # are not changed by the fakenfs decorator
 
452
        from bzrlib.tests.HttpServer import HttpServer
 
453
        # connect to . via http which is not listable
 
454
        server = HttpServer()
 
455
        server.setUp()
 
456
        try:
 
457
            transport = self.get_nfs_transport(server.get_url())
 
458
            self.assertIsInstance(
 
459
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
460
            self.assertEqual(False, transport.listable())
 
461
            self.assertEqual(True, transport.is_readonly())
 
462
        finally:
 
463
            server.tearDown()
 
464
 
 
465
    def test_fakenfs_server_default(self):
 
466
        # a FakeNFSServer() should bring up a local relpath server for itself
 
467
        import bzrlib.transport.fakenfs as fakenfs
 
468
        server = fakenfs.FakeNFSServer()
 
469
        server.setUp()
 
470
        try:
 
471
            # the url should be decorated appropriately
 
472
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
473
            # and we should be able to get a transport for it
 
474
            transport = get_transport(server.get_url())
 
475
            # which must be a FakeNFSTransportDecorator instance.
 
476
            self.assertIsInstance(
 
477
                transport, fakenfs.FakeNFSTransportDecorator)
 
478
        finally:
 
479
            server.tearDown()
 
480
 
 
481
    def test_fakenfs_rename_semantics(self):
 
482
        # a FakeNFS transport must mangle the way rename errors occur to
 
483
        # look like NFS problems.
 
484
        transport = self.get_nfs_transport('.')
 
485
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
486
                        transport=transport)
 
487
        self.assertRaises(errors.ResourceBusy,
 
488
                          transport.rename, 'from', 'to')
 
489
 
 
490
 
 
491
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
492
    """Tests for simulation of VFAT restrictions"""
 
493
 
 
494
    def get_vfat_transport(self, url):
 
495
        """Return vfat-backed transport for test directory"""
 
496
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
497
        return FakeVFATTransportDecorator('vfat+' + url)
 
498
 
 
499
    def test_transport_creation(self):
 
500
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
501
        transport = self.get_vfat_transport('.')
 
502
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
503
 
 
504
    def test_transport_mkdir(self):
 
505
        transport = self.get_vfat_transport('.')
 
506
        transport.mkdir('HELLO')
 
507
        self.assertTrue(transport.has('hello'))
 
508
        self.assertTrue(transport.has('Hello'))
 
509
 
 
510
    def test_forbidden_chars(self):
 
511
        transport = self.get_vfat_transport('.')
 
512
        self.assertRaises(ValueError, transport.has, "<NU>")
 
513
 
 
514
 
 
515
class BadTransportHandler(Transport):
 
516
    def __init__(self, base_url):
 
517
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
518
 
 
519
 
 
520
class BackupTransportHandler(Transport):
 
521
    """Test transport that works as a backup for the BadTransportHandler"""
 
522
    pass
 
523
 
 
524
 
 
525
class TestTransportImplementation(TestCaseInTempDir):
 
526
    """Implementation verification for transports.
 
527
    
 
528
    To verify a transport we need a server factory, which is a callable
 
529
    that accepts no parameters and returns an implementation of
 
530
    bzrlib.transport.Server.
 
531
    
 
532
    That Server is then used to construct transport instances and test
 
533
    the transport via loopback activity.
 
534
 
 
535
    Currently this assumes that the Transport object is connected to the 
 
536
    current working directory.  So that whatever is done 
 
537
    through the transport, should show up in the working 
 
538
    directory, and vice-versa. This is a bug, because its possible to have
 
539
    URL schemes which provide access to something that may not be 
 
540
    result in storage on the local disk, i.e. due to file system limits, or 
 
541
    due to it being a database or some other non-filesystem tool.
 
542
 
 
543
    This also tests to make sure that the functions work with both
 
544
    generators and lists (assuming iter(list) is effectively a generator)
 
545
    """
 
546
    
 
547
    def setUp(self):
 
548
        super(TestTransportImplementation, self).setUp()
 
549
        self._server = self.transport_server()
 
550
        self._server.setUp()
 
551
        self.addCleanup(self._server.tearDown)
 
552
 
 
553
    def get_transport(self, relpath=None):
 
554
        """Return a connected transport to the local directory.
 
555
 
 
556
        :param relpath: a path relative to the base url.
 
557
        """
 
558
        base_url = self._server.get_url()
 
559
        url = self._adjust_url(base_url, relpath)
 
560
        # try getting the transport via the regular interface:
 
561
        t = get_transport(url)
 
562
        # vila--20070607 if the following are commented out the test suite
 
563
        # still pass. Is this really still needed or was it a forgotten
 
564
        # temporary fix ?
 
565
        if not isinstance(t, self.transport_class):
 
566
            # we did not get the correct transport class type. Override the
 
567
            # regular connection behaviour by direct construction.
 
568
            t = self.transport_class(url)
 
569
        return t
 
570
 
 
571
 
 
572
class TestLocalTransports(TestCase):
 
573
 
 
574
    def test_get_transport_from_abspath(self):
 
575
        here = os.path.abspath('.')
 
576
        t = get_transport(here)
 
577
        self.assertIsInstance(t, LocalTransport)
 
578
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
579
 
 
580
    def test_get_transport_from_relpath(self):
 
581
        here = os.path.abspath('.')
 
582
        t = get_transport('.')
 
583
        self.assertIsInstance(t, LocalTransport)
 
584
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
585
 
 
586
    def test_get_transport_from_local_url(self):
 
587
        here = os.path.abspath('.')
 
588
        here_url = urlutils.local_path_to_url(here) + '/'
 
589
        t = get_transport(here_url)
 
590
        self.assertIsInstance(t, LocalTransport)
 
591
        self.assertEquals(t.base, here_url)
 
592
 
 
593
    def test_local_abspath(self):
 
594
        here = os.path.abspath('.')
 
595
        t = get_transport(here)
 
596
        self.assertEquals(t.local_abspath(''), here)
 
597
 
 
598
 
 
599
class TestWin32LocalTransport(TestCase):
 
600
 
 
601
    def test_unc_clone_to_root(self):
 
602
        # Win32 UNC path like \\HOST\path
 
603
        # clone to root should stop at least at \\HOST part
 
604
        # not on \\
 
605
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
606
        for i in xrange(4):
 
607
            t = t.clone('..')
 
608
        self.assertEquals(t.base, 'file://HOST/')
 
609
        # make sure we reach the root
 
610
        t = t.clone('..')
 
611
        self.assertEquals(t.base, 'file://HOST/')
 
612
 
 
613
 
 
614
class TestConnectedTransport(TestCase):
 
615
    """Tests for connected to remote server transports"""
 
616
 
 
617
    def test_parse_url(self):
 
618
        t = ConnectedTransport('sftp://simple.example.com/home/source')
 
619
        self.assertEquals(t._host, 'simple.example.com')
 
620
        self.assertEquals(t._port, None)
 
621
        self.assertEquals(t._path, '/home/source/')
 
622
        self.failUnless(t._user is None)
 
623
        self.failUnless(t._password is None)
 
624
 
 
625
        self.assertEquals(t.base, 'sftp://simple.example.com/home/source/')
 
626
 
 
627
    def test_parse_quoted_url(self):
 
628
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
629
        self.assertEquals(t._host, 'exAmple.com')
 
630
        self.assertEquals(t._port, 2222)
 
631
        self.assertEquals(t._user, 'robey')
 
632
        self.assertEquals(t._password, 'h@t')
 
633
        self.assertEquals(t._path, '/path/')
 
634
 
 
635
        # Base should not keep track of the password
 
636
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
637
 
 
638
    def test_parse_invalid_url(self):
 
639
        self.assertRaises(errors.InvalidURL,
 
640
                          ConnectedTransport,
 
641
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
642
 
 
643
    def test_relpath(self):
 
644
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
645
 
 
646
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
647
        self.assertRaises(errors.PathNotChild, t.relpath,
 
648
                          'http://user@host.com/abs/path/sub')
 
649
        self.assertRaises(errors.PathNotChild, t.relpath,
 
650
                          'sftp://user2@host.com/abs/path/sub')
 
651
        self.assertRaises(errors.PathNotChild, t.relpath,
 
652
                          'sftp://user@otherhost.com/abs/path/sub')
 
653
        self.assertRaises(errors.PathNotChild, t.relpath,
 
654
                          'sftp://user@host.com:33/abs/path/sub')
 
655
        # Make sure it works when we don't supply a username
 
656
        t = ConnectedTransport('sftp://host.com/abs/path')
 
657
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
658
 
 
659
        # Make sure it works when parts of the path will be url encoded
 
660
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
661
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
662
 
 
663
    def test_connection_sharing_propagate_credentials(self):
 
664
        t = ConnectedTransport('foo://user@host.com/abs/path')
 
665
        self.assertIs(None, t._get_connection())
 
666
        self.assertIs(None, t._password)
 
667
        c = t.clone('subdir')
 
668
        self.assertEquals(None, c._get_connection())
 
669
        self.assertIs(None, t._password)
 
670
 
 
671
        # Simulate the user entering a password
 
672
        password = 'secret'
 
673
        connection = object()
 
674
        t._set_connection(connection, password)
 
675
        self.assertIs(connection, t._get_connection())
 
676
        self.assertIs(password, t._get_credentials())
 
677
        self.assertIs(connection, c._get_connection())
 
678
        self.assertIs(password, c._get_credentials())
 
679
 
 
680
        # credentials can be updated
 
681
        new_password = 'even more secret'
 
682
        c._update_credentials(new_password)
 
683
        self.assertIs(connection, t._get_connection())
 
684
        self.assertIs(new_password, t._get_credentials())
 
685
        self.assertIs(connection, c._get_connection())
 
686
        self.assertIs(new_password, c._get_credentials())
 
687
 
 
688
 
 
689
class TestReusedTransports(TestCase):
 
690
    """Tests for transport reuse"""
 
691
 
 
692
    def test_reuse_same_transport(self):
 
693
        possible_transports = []
 
694
        t1 = get_transport('http://foo/',
 
695
                           possible_transports=possible_transports)
 
696
        self.assertEqual([t1], possible_transports)
 
697
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
698
        self.assertIs(t1, t2)
 
699
 
 
700
        # Also check that final '/' are handled correctly
 
701
        t3 = get_transport('http://foo/path/')
 
702
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
703
        self.assertIs(t3, t4)
 
704
 
 
705
        t5 = get_transport('http://foo/path')
 
706
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
707
        self.assertIs(t5, t6)
 
708
 
 
709
    def test_don_t_reuse_different_transport(self):
 
710
        t1 = get_transport('http://foo/path')
 
711
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
712
        self.assertIsNot(t1, t2)
 
713
 
 
714
 
 
715
def get_test_permutations():
 
716
    """Return transport permutations to be used in testing.
 
717
 
 
718
    This module registers some transports, but they're only for testing
 
719
    registration.  We don't really want to run all the transport tests against
 
720
    them.
 
721
    """
 
722
    return []