~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2005-05-13 00:57:32 UTC
  • Revision ID: mbp@sourcefrog.net-20050513005732-26b0a3042cbb57d1
- more notes on tagging

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
import os
19
 
import sys
20
 
import stat
21
 
from cStringIO import StringIO
22
 
 
23
 
import bzrlib
24
 
from bzrlib import (
25
 
    errors,
26
 
    osutils,
27
 
    urlutils,
28
 
    )
29
 
from bzrlib.errors import (ConnectionError,
30
 
                           DependencyNotPresent,
31
 
                           FileExists,
32
 
                           InvalidURLJoin,
33
 
                           NoSuchFile,
34
 
                           PathNotChild,
35
 
                           TransportNotPossible,
36
 
                           ConnectionError,
37
 
                           DependencyNotPresent,
38
 
                           ReadError,
39
 
                           UnsupportedProtocol,
40
 
                           )
41
 
from bzrlib.tests import TestCase, TestCaseInTempDir
42
 
from bzrlib.transport import (_CoalescedOffset,
43
 
                              ConnectedTransport,
44
 
                              _get_protocol_handlers,
45
 
                              _set_protocol_handlers,
46
 
                              _get_transport_modules,
47
 
                              get_transport,
48
 
                              LateReadError,
49
 
                              register_lazy_transport,
50
 
                              register_transport_proto,
51
 
                              _clear_protocol_handlers,
52
 
                              Transport,
53
 
                              )
54
 
from bzrlib.transport.chroot import ChrootServer
55
 
from bzrlib.transport.memory import MemoryTransport
56
 
from bzrlib.transport.local import (LocalTransport,
57
 
                                    EmulatedWin32LocalTransport)
58
 
 
59
 
 
60
 
# TODO: Should possibly split transport-specific tests into their own files.
61
 
 
62
 
 
63
 
class TestTransport(TestCase):
64
 
    """Test the non transport-concrete class functionality."""
65
 
 
66
 
    def test__get_set_protocol_handlers(self):
67
 
        handlers = _get_protocol_handlers()
68
 
        self.assertNotEqual([], handlers.keys( ))
69
 
        try:
70
 
            _clear_protocol_handlers()
71
 
            self.assertEqual([], _get_protocol_handlers().keys())
72
 
        finally:
73
 
            _set_protocol_handlers(handlers)
74
 
 
75
 
    def test_get_transport_modules(self):
76
 
        handlers = _get_protocol_handlers()
77
 
        class SampleHandler(object):
78
 
            """I exist, isnt that enough?"""
79
 
        try:
80
 
            _clear_protocol_handlers()
81
 
            register_transport_proto('foo')
82
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
83
 
            register_transport_proto('bar')
84
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
85
 
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
86
 
                             _get_transport_modules())
87
 
        finally:
88
 
            _set_protocol_handlers(handlers)
89
 
 
90
 
    def test_transport_dependency(self):
91
 
        """Transport with missing dependency causes no error"""
92
 
        saved_handlers = _get_protocol_handlers()
93
 
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
97
 
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
107
 
        finally:
108
 
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
110
 
            
111
 
    def test_transport_fallback(self):
112
 
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
114
 
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
122
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
 
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
125
 
 
126
 
    def test_LateReadError(self):
127
 
        """The LateReadError helper should raise on read()."""
128
 
        a_file = LateReadError('a path')
129
 
        try:
130
 
            a_file.read()
131
 
        except ReadError, error:
132
 
            self.assertEqual('a path', error.path)
133
 
        self.assertRaises(ReadError, a_file.read, 40)
134
 
        a_file.close()
135
 
 
136
 
    def test__combine_paths(self):
137
 
        t = Transport('/')
138
 
        self.assertEqual('/home/sarah/project/foo',
139
 
                         t._combine_paths('/home/sarah', 'project/foo'))
140
 
        self.assertEqual('/etc',
141
 
                         t._combine_paths('/home/sarah', '../../etc'))
142
 
        self.assertEqual('/etc',
143
 
                         t._combine_paths('/home/sarah', '../../../etc'))
144
 
        self.assertEqual('/etc',
145
 
                         t._combine_paths('/home/sarah', '/etc'))
146
 
 
147
 
    def test_local_abspath_non_local_transport(self):
148
 
        # the base implementation should throw
149
 
        t = MemoryTransport()
150
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
 
        self.assertEqual('memory:///t is not a local path.', str(e))
152
 
 
153
 
 
154
 
class TestCoalesceOffsets(TestCase):
155
 
    
156
 
    def check(self, expected, offsets, limit=0, fudge=0):
157
 
        coalesce = Transport._coalesce_offsets
158
 
        exp = [_CoalescedOffset(*x) for x in expected]
159
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
160
 
        self.assertEqual(exp, out)
161
 
 
162
 
    def test_coalesce_empty(self):
163
 
        self.check([], [])
164
 
 
165
 
    def test_coalesce_simple(self):
166
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
167
 
 
168
 
    def test_coalesce_unrelated(self):
169
 
        self.check([(0, 10, [(0, 10)]),
170
 
                    (20, 10, [(0, 10)]),
171
 
                   ], [(0, 10), (20, 10)])
172
 
            
173
 
    def test_coalesce_unsorted(self):
174
 
        self.check([(20, 10, [(0, 10)]),
175
 
                    (0, 10, [(0, 10)]),
176
 
                   ], [(20, 10), (0, 10)])
177
 
 
178
 
    def test_coalesce_nearby(self):
179
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
180
 
                   [(0, 10), (10, 10)])
181
 
 
182
 
    def test_coalesce_overlapped(self):
183
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
184
 
                   [(0, 10), (5, 10)])
185
 
 
186
 
    def test_coalesce_limit(self):
187
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
188
 
                              (30, 10), (40, 10)]),
189
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
190
 
                              (30, 10), (40, 10)]),
191
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
192
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
193
 
                       (90, 10), (100, 10)],
194
 
                    limit=5)
195
 
 
196
 
    def test_coalesce_no_limit(self):
197
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
198
 
                               (30, 10), (40, 10), (50, 10),
199
 
                               (60, 10), (70, 10), (80, 10),
200
 
                               (90, 10)]),
201
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
202
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
203
 
                       (90, 10), (100, 10)])
204
 
 
205
 
    def test_coalesce_fudge(self):
206
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
207
 
                    (100, 10, [(0, 10),]),
208
 
                   ], [(10, 10), (30, 10), (100, 10)],
209
 
                   fudge=10
210
 
                  )
211
 
 
212
 
 
213
 
class TestMemoryTransport(TestCase):
214
 
 
215
 
    def test_get_transport(self):
216
 
        MemoryTransport()
217
 
 
218
 
    def test_clone(self):
219
 
        transport = MemoryTransport()
220
 
        self.assertTrue(isinstance(transport, MemoryTransport))
221
 
        self.assertEqual("memory:///", transport.clone("/").base)
222
 
 
223
 
    def test_abspath(self):
224
 
        transport = MemoryTransport()
225
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
226
 
 
227
 
    def test_abspath_of_root(self):
228
 
        transport = MemoryTransport()
229
 
        self.assertEqual("memory:///", transport.base)
230
 
        self.assertEqual("memory:///", transport.abspath('/'))
231
 
 
232
 
    def test_abspath_of_relpath_starting_at_root(self):
233
 
        transport = MemoryTransport()
234
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
235
 
 
236
 
    def test_append_and_get(self):
237
 
        transport = MemoryTransport()
238
 
        transport.append_bytes('path', 'content')
239
 
        self.assertEqual(transport.get('path').read(), 'content')
240
 
        transport.append_file('path', StringIO('content'))
241
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
242
 
 
243
 
    def test_put_and_get(self):
244
 
        transport = MemoryTransport()
245
 
        transport.put_file('path', StringIO('content'))
246
 
        self.assertEqual(transport.get('path').read(), 'content')
247
 
        transport.put_bytes('path', 'content')
248
 
        self.assertEqual(transport.get('path').read(), 'content')
249
 
 
250
 
    def test_append_without_dir_fails(self):
251
 
        transport = MemoryTransport()
252
 
        self.assertRaises(NoSuchFile,
253
 
                          transport.append_bytes, 'dir/path', 'content')
254
 
 
255
 
    def test_put_without_dir_fails(self):
256
 
        transport = MemoryTransport()
257
 
        self.assertRaises(NoSuchFile,
258
 
                          transport.put_file, 'dir/path', StringIO('content'))
259
 
 
260
 
    def test_get_missing(self):
261
 
        transport = MemoryTransport()
262
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
263
 
 
264
 
    def test_has_missing(self):
265
 
        transport = MemoryTransport()
266
 
        self.assertEquals(False, transport.has('foo'))
267
 
 
268
 
    def test_has_present(self):
269
 
        transport = MemoryTransport()
270
 
        transport.append_bytes('foo', 'content')
271
 
        self.assertEquals(True, transport.has('foo'))
272
 
 
273
 
    def test_list_dir(self):
274
 
        transport = MemoryTransport()
275
 
        transport.put_bytes('foo', 'content')
276
 
        transport.mkdir('dir')
277
 
        transport.put_bytes('dir/subfoo', 'content')
278
 
        transport.put_bytes('dirlike', 'content')
279
 
 
280
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
281
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
282
 
 
283
 
    def test_mkdir(self):
284
 
        transport = MemoryTransport()
285
 
        transport.mkdir('dir')
286
 
        transport.append_bytes('dir/path', 'content')
287
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
288
 
 
289
 
    def test_mkdir_missing_parent(self):
290
 
        transport = MemoryTransport()
291
 
        self.assertRaises(NoSuchFile,
292
 
                          transport.mkdir, 'dir/dir')
293
 
 
294
 
    def test_mkdir_twice(self):
295
 
        transport = MemoryTransport()
296
 
        transport.mkdir('dir')
297
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
298
 
 
299
 
    def test_parameters(self):
300
 
        transport = MemoryTransport()
301
 
        self.assertEqual(True, transport.listable())
302
 
        self.assertEqual(False, transport.is_readonly())
303
 
 
304
 
    def test_iter_files_recursive(self):
305
 
        transport = MemoryTransport()
306
 
        transport.mkdir('dir')
307
 
        transport.put_bytes('dir/foo', 'content')
308
 
        transport.put_bytes('dir/bar', 'content')
309
 
        transport.put_bytes('bar', 'content')
310
 
        paths = set(transport.iter_files_recursive())
311
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
312
 
 
313
 
    def test_stat(self):
314
 
        transport = MemoryTransport()
315
 
        transport.put_bytes('foo', 'content')
316
 
        transport.put_bytes('bar', 'phowar')
317
 
        self.assertEqual(7, transport.stat('foo').st_size)
318
 
        self.assertEqual(6, transport.stat('bar').st_size)
319
 
 
320
 
 
321
 
class ChrootDecoratorTransportTest(TestCase):
322
 
    """Chroot decoration specific tests."""
323
 
 
324
 
    def test_abspath(self):
325
 
        # The abspath is always relative to the chroot_url.
326
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
327
 
        server.setUp()
328
 
        transport = get_transport(server.get_url())
329
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
330
 
 
331
 
        subdir_transport = transport.clone('subdir')
332
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
333
 
        server.tearDown()
334
 
 
335
 
    def test_clone(self):
336
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
337
 
        server.setUp()
338
 
        transport = get_transport(server.get_url())
339
 
        # relpath from root and root path are the same
340
 
        relpath_cloned = transport.clone('foo')
341
 
        abspath_cloned = transport.clone('/foo')
342
 
        self.assertEqual(server, relpath_cloned.server)
343
 
        self.assertEqual(server, abspath_cloned.server)
344
 
        server.tearDown()
345
 
    
346
 
    def test_chroot_url_preserves_chroot(self):
347
 
        """Calling get_transport on a chroot transport's base should produce a
348
 
        transport with exactly the same behaviour as the original chroot
349
 
        transport.
350
 
 
351
 
        This is so that it is not possible to escape a chroot by doing::
352
 
            url = chroot_transport.base
353
 
            parent_url = urlutils.join(url, '..')
354
 
            new_transport = get_transport(parent_url)
355
 
        """
356
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
357
 
        server.setUp()
358
 
        transport = get_transport(server.get_url())
359
 
        new_transport = get_transport(transport.base)
360
 
        self.assertEqual(transport.server, new_transport.server)
361
 
        self.assertEqual(transport.base, new_transport.base)
362
 
        server.tearDown()
363
 
        
364
 
    def test_urljoin_preserves_chroot(self):
365
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
366
 
        URL that escapes the intended chroot.
367
 
 
368
 
        This is so that it is not possible to escape a chroot by doing::
369
 
            url = chroot_transport.base
370
 
            parent_url = urlutils.join(url, '..')
371
 
            new_transport = get_transport(parent_url)
372
 
        """
373
 
        server = ChrootServer(get_transport('memory:///path/'))
374
 
        server.setUp()
375
 
        transport = get_transport(server.get_url())
376
 
        self.assertRaises(
377
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
378
 
        server.tearDown()
379
 
 
380
 
 
381
 
class ChrootServerTest(TestCase):
382
 
 
383
 
    def test_construct(self):
384
 
        backing_transport = MemoryTransport()
385
 
        server = ChrootServer(backing_transport)
386
 
        self.assertEqual(backing_transport, server.backing_transport)
387
 
 
388
 
    def test_setUp(self):
389
 
        backing_transport = MemoryTransport()
390
 
        server = ChrootServer(backing_transport)
391
 
        server.setUp()
392
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
393
 
 
394
 
    def test_tearDown(self):
395
 
        backing_transport = MemoryTransport()
396
 
        server = ChrootServer(backing_transport)
397
 
        server.setUp()
398
 
        server.tearDown()
399
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
400
 
 
401
 
    def test_get_url(self):
402
 
        backing_transport = MemoryTransport()
403
 
        server = ChrootServer(backing_transport)
404
 
        server.setUp()
405
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
406
 
        server.tearDown()
407
 
 
408
 
 
409
 
class ReadonlyDecoratorTransportTest(TestCase):
410
 
    """Readonly decoration specific tests."""
411
 
 
412
 
    def test_local_parameters(self):
413
 
        import bzrlib.transport.readonly as readonly
414
 
        # connect to . in readonly mode
415
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
416
 
        self.assertEqual(True, transport.listable())
417
 
        self.assertEqual(True, transport.is_readonly())
418
 
 
419
 
    def test_http_parameters(self):
420
 
        from bzrlib.tests.HttpServer import HttpServer
421
 
        import bzrlib.transport.readonly as readonly
422
 
        # connect to . via http which is not listable
423
 
        server = HttpServer()
424
 
        server.setUp()
425
 
        try:
426
 
            transport = get_transport('readonly+' + server.get_url())
427
 
            self.failUnless(isinstance(transport,
428
 
                                       readonly.ReadonlyTransportDecorator))
429
 
            self.assertEqual(False, transport.listable())
430
 
            self.assertEqual(True, transport.is_readonly())
431
 
        finally:
432
 
            server.tearDown()
433
 
 
434
 
 
435
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
436
 
    """NFS decorator specific tests."""
437
 
 
438
 
    def get_nfs_transport(self, url):
439
 
        import bzrlib.transport.fakenfs as fakenfs
440
 
        # connect to url with nfs decoration
441
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
442
 
 
443
 
    def test_local_parameters(self):
444
 
        # the listable and is_readonly parameters
445
 
        # are not changed by the fakenfs decorator
446
 
        transport = self.get_nfs_transport('.')
447
 
        self.assertEqual(True, transport.listable())
448
 
        self.assertEqual(False, transport.is_readonly())
449
 
 
450
 
    def test_http_parameters(self):
451
 
        # the listable and is_readonly parameters
452
 
        # are not changed by the fakenfs decorator
453
 
        from bzrlib.tests.HttpServer import HttpServer
454
 
        # connect to . via http which is not listable
455
 
        server = HttpServer()
456
 
        server.setUp()
457
 
        try:
458
 
            transport = self.get_nfs_transport(server.get_url())
459
 
            self.assertIsInstance(
460
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
461
 
            self.assertEqual(False, transport.listable())
462
 
            self.assertEqual(True, transport.is_readonly())
463
 
        finally:
464
 
            server.tearDown()
465
 
 
466
 
    def test_fakenfs_server_default(self):
467
 
        # a FakeNFSServer() should bring up a local relpath server for itself
468
 
        import bzrlib.transport.fakenfs as fakenfs
469
 
        server = fakenfs.FakeNFSServer()
470
 
        server.setUp()
471
 
        try:
472
 
            # the url should be decorated appropriately
473
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
474
 
            # and we should be able to get a transport for it
475
 
            transport = get_transport(server.get_url())
476
 
            # which must be a FakeNFSTransportDecorator instance.
477
 
            self.assertIsInstance(
478
 
                transport, fakenfs.FakeNFSTransportDecorator)
479
 
        finally:
480
 
            server.tearDown()
481
 
 
482
 
    def test_fakenfs_rename_semantics(self):
483
 
        # a FakeNFS transport must mangle the way rename errors occur to
484
 
        # look like NFS problems.
485
 
        transport = self.get_nfs_transport('.')
486
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
487
 
                        transport=transport)
488
 
        self.assertRaises(errors.ResourceBusy,
489
 
                          transport.rename, 'from', 'to')
490
 
 
491
 
 
492
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
493
 
    """Tests for simulation of VFAT restrictions"""
494
 
 
495
 
    def get_vfat_transport(self, url):
496
 
        """Return vfat-backed transport for test directory"""
497
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
498
 
        return FakeVFATTransportDecorator('vfat+' + url)
499
 
 
500
 
    def test_transport_creation(self):
501
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
502
 
        transport = self.get_vfat_transport('.')
503
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
504
 
 
505
 
    def test_transport_mkdir(self):
506
 
        transport = self.get_vfat_transport('.')
507
 
        transport.mkdir('HELLO')
508
 
        self.assertTrue(transport.has('hello'))
509
 
        self.assertTrue(transport.has('Hello'))
510
 
 
511
 
    def test_forbidden_chars(self):
512
 
        transport = self.get_vfat_transport('.')
513
 
        self.assertRaises(ValueError, transport.has, "<NU>")
514
 
 
515
 
 
516
 
class BadTransportHandler(Transport):
517
 
    def __init__(self, base_url):
518
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
519
 
 
520
 
 
521
 
class BackupTransportHandler(Transport):
522
 
    """Test transport that works as a backup for the BadTransportHandler"""
523
 
    pass
524
 
 
525
 
 
526
 
class TestTransportImplementation(TestCaseInTempDir):
527
 
    """Implementation verification for transports.
528
 
    
529
 
    To verify a transport we need a server factory, which is a callable
530
 
    that accepts no parameters and returns an implementation of
531
 
    bzrlib.transport.Server.
532
 
    
533
 
    That Server is then used to construct transport instances and test
534
 
    the transport via loopback activity.
535
 
 
536
 
    Currently this assumes that the Transport object is connected to the 
537
 
    current working directory.  So that whatever is done 
538
 
    through the transport, should show up in the working 
539
 
    directory, and vice-versa. This is a bug, because its possible to have
540
 
    URL schemes which provide access to something that may not be 
541
 
    result in storage on the local disk, i.e. due to file system limits, or 
542
 
    due to it being a database or some other non-filesystem tool.
543
 
 
544
 
    This also tests to make sure that the functions work with both
545
 
    generators and lists (assuming iter(list) is effectively a generator)
546
 
    """
547
 
    
548
 
    def setUp(self):
549
 
        super(TestTransportImplementation, self).setUp()
550
 
        self._server = self.transport_server()
551
 
        self._server.setUp()
552
 
        self.addCleanup(self._server.tearDown)
553
 
 
554
 
    def get_transport(self, relpath=None):
555
 
        """Return a connected transport to the local directory.
556
 
 
557
 
        :param relpath: a path relative to the base url.
558
 
        """
559
 
        base_url = self._server.get_url()
560
 
        url = self._adjust_url(base_url, relpath)
561
 
        # try getting the transport via the regular interface:
562
 
        t = get_transport(url)
563
 
        # vila--20070607 if the following are commented out the test suite
564
 
        # still pass. Is this really still needed or was it a forgotten
565
 
        # temporary fix ?
566
 
        if not isinstance(t, self.transport_class):
567
 
            # we did not get the correct transport class type. Override the
568
 
            # regular connection behaviour by direct construction.
569
 
            t = self.transport_class(url)
570
 
        return t
571
 
 
572
 
 
573
 
class TestLocalTransports(TestCase):
574
 
 
575
 
    def test_get_transport_from_abspath(self):
576
 
        here = osutils.abspath('.')
577
 
        t = get_transport(here)
578
 
        self.assertIsInstance(t, LocalTransport)
579
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
580
 
 
581
 
    def test_get_transport_from_relpath(self):
582
 
        here = osutils.abspath('.')
583
 
        t = get_transport('.')
584
 
        self.assertIsInstance(t, LocalTransport)
585
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
586
 
 
587
 
    def test_get_transport_from_local_url(self):
588
 
        here = osutils.abspath('.')
589
 
        here_url = urlutils.local_path_to_url(here) + '/'
590
 
        t = get_transport(here_url)
591
 
        self.assertIsInstance(t, LocalTransport)
592
 
        self.assertEquals(t.base, here_url)
593
 
 
594
 
    def test_local_abspath(self):
595
 
        here = osutils.abspath('.')
596
 
        t = get_transport(here)
597
 
        self.assertEquals(t.local_abspath(''), here)
598
 
 
599
 
 
600
 
class TestWin32LocalTransport(TestCase):
601
 
 
602
 
    def test_unc_clone_to_root(self):
603
 
        # Win32 UNC path like \\HOST\path
604
 
        # clone to root should stop at least at \\HOST part
605
 
        # not on \\
606
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
607
 
        for i in xrange(4):
608
 
            t = t.clone('..')
609
 
        self.assertEquals(t.base, 'file://HOST/')
610
 
        # make sure we reach the root
611
 
        t = t.clone('..')
612
 
        self.assertEquals(t.base, 'file://HOST/')
613
 
 
614
 
 
615
 
class TestConnectedTransport(TestCase):
616
 
    """Tests for connected to remote server transports"""
617
 
 
618
 
    def test_parse_url(self):
619
 
        t = ConnectedTransport('sftp://simple.example.com/home/source')
620
 
        self.assertEquals(t._host, 'simple.example.com')
621
 
        self.assertEquals(t._port, None)
622
 
        self.assertEquals(t._path, '/home/source/')
623
 
        self.failUnless(t._user is None)
624
 
        self.failUnless(t._password is None)
625
 
 
626
 
        self.assertEquals(t.base, 'sftp://simple.example.com/home/source/')
627
 
 
628
 
    def test_parse_quoted_url(self):
629
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
630
 
        self.assertEquals(t._host, 'exAmple.com')
631
 
        self.assertEquals(t._port, 2222)
632
 
        self.assertEquals(t._user, 'robey')
633
 
        self.assertEquals(t._password, 'h@t')
634
 
        self.assertEquals(t._path, '/path/')
635
 
 
636
 
        # Base should not keep track of the password
637
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
638
 
 
639
 
    def test_parse_invalid_url(self):
640
 
        self.assertRaises(errors.InvalidURL,
641
 
                          ConnectedTransport,
642
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
643
 
 
644
 
    def test_relpath(self):
645
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
646
 
 
647
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
648
 
        self.assertRaises(errors.PathNotChild, t.relpath,
649
 
                          'http://user@host.com/abs/path/sub')
650
 
        self.assertRaises(errors.PathNotChild, t.relpath,
651
 
                          'sftp://user2@host.com/abs/path/sub')
652
 
        self.assertRaises(errors.PathNotChild, t.relpath,
653
 
                          'sftp://user@otherhost.com/abs/path/sub')
654
 
        self.assertRaises(errors.PathNotChild, t.relpath,
655
 
                          'sftp://user@host.com:33/abs/path/sub')
656
 
        # Make sure it works when we don't supply a username
657
 
        t = ConnectedTransport('sftp://host.com/abs/path')
658
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
659
 
 
660
 
        # Make sure it works when parts of the path will be url encoded
661
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
662
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
663
 
 
664
 
    def test_connection_sharing_propagate_credentials(self):
665
 
        t = ConnectedTransport('foo://user@host.com/abs/path')
666
 
        self.assertIs(None, t._get_connection())
667
 
        self.assertIs(None, t._password)
668
 
        c = t.clone('subdir')
669
 
        self.assertEquals(None, c._get_connection())
670
 
        self.assertIs(None, t._password)
671
 
 
672
 
        # Simulate the user entering a password
673
 
        password = 'secret'
674
 
        connection = object()
675
 
        t._set_connection(connection, password)
676
 
        self.assertIs(connection, t._get_connection())
677
 
        self.assertIs(password, t._get_credentials())
678
 
        self.assertIs(connection, c._get_connection())
679
 
        self.assertIs(password, c._get_credentials())
680
 
 
681
 
        # credentials can be updated
682
 
        new_password = 'even more secret'
683
 
        c._update_credentials(new_password)
684
 
        self.assertIs(connection, t._get_connection())
685
 
        self.assertIs(new_password, t._get_credentials())
686
 
        self.assertIs(connection, c._get_connection())
687
 
        self.assertIs(new_password, c._get_credentials())
688
 
 
689
 
 
690
 
class TestReusedTransports(TestCase):
691
 
    """Tests for transport reuse"""
692
 
 
693
 
    def test_reuse_same_transport(self):
694
 
        possible_transports = []
695
 
        t1 = get_transport('http://foo/',
696
 
                           possible_transports=possible_transports)
697
 
        self.assertEqual([t1], possible_transports)
698
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
699
 
        self.assertIs(t1, t2)
700
 
 
701
 
        # Also check that final '/' are handled correctly
702
 
        t3 = get_transport('http://foo/path/')
703
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
704
 
        self.assertIs(t3, t4)
705
 
 
706
 
        t5 = get_transport('http://foo/path')
707
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
708
 
        self.assertIs(t5, t6)
709
 
 
710
 
    def test_don_t_reuse_different_transport(self):
711
 
        t1 = get_transport('http://foo/path')
712
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
713
 
        self.assertIsNot(t1, t2)
714
 
 
715
 
 
716
 
def get_test_permutations():
717
 
    """Return transport permutations to be used in testing.
718
 
 
719
 
    This module registers some transports, but they're only for testing
720
 
    registration.  We don't really want to run all the transport tests against
721
 
    them.
722
 
    """
723
 
    return []