~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: mbp at sourcefrog
  • Date: 2005-03-24 00:44:18 UTC
  • Revision ID: mbp@sourcefrog.net-20050324004418-b4a050f656c07f5f
show space usage for various stores in the info command

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
import os
19
 
import sys
20
 
import stat
21
 
from cStringIO import StringIO
22
 
 
23
 
import bzrlib
24
 
from bzrlib import (
25
 
    errors,
26
 
    urlutils,
27
 
    )
28
 
from bzrlib.errors import (ConnectionError,
29
 
                           DependencyNotPresent,
30
 
                           FileExists,
31
 
                           InvalidURLJoin,
32
 
                           NoSuchFile,
33
 
                           PathNotChild,
34
 
                           TransportNotPossible,
35
 
                           ConnectionError,
36
 
                           DependencyNotPresent,
37
 
                           ReadError,
38
 
                           UnsupportedProtocol,
39
 
                           )
40
 
from bzrlib.tests import TestCase, TestCaseInTempDir
41
 
from bzrlib.transport import (_CoalescedOffset,
42
 
                              ConnectedTransport,
43
 
                              _get_protocol_handlers,
44
 
                              _set_protocol_handlers,
45
 
                              _get_transport_modules,
46
 
                              get_transport,
47
 
                              LateReadError,
48
 
                              register_lazy_transport,
49
 
                              register_transport_proto,
50
 
                              _clear_protocol_handlers,
51
 
                              Transport,
52
 
                              )
53
 
from bzrlib.transport.chroot import ChrootServer
54
 
from bzrlib.transport.memory import MemoryTransport
55
 
from bzrlib.transport.local import (LocalTransport,
56
 
                                    EmulatedWin32LocalTransport)
57
 
 
58
 
 
59
 
# TODO: Should possibly split transport-specific tests into their own files.
60
 
 
61
 
 
62
 
class TestTransport(TestCase):
    """Test the non transport-concrete class functionality."""

    def test__get_set_protocol_handlers(self):
        """The handler registry can be cleared; it is restored afterwards."""
        handlers = _get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            _clear_protocol_handlers()
            self.assertEqual([], _get_protocol_handlers().keys())
        finally:
            # Always put the saved handlers back so other tests are unaffected.
            _set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        """_get_transport_modules() lists the modules of registered handlers."""
        handlers = _get_protocol_handlers()

        class SampleHandler(object):
            """I exist, isnt that enough?"""

        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            register_transport_proto('bar')
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            # Both protocols share one module, so it is expected only once.
            self.assertEqual([SampleHandler.__module__,
                              'bzrlib.transport.chroot'],
                             _get_transport_modules())
        finally:
            _set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """A handler with a missing dependency gives UnsupportedProtocol."""
        saved_handlers = _get_protocol_handlers()
        try:
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            # assertRaises fails the test if nothing is raised and hands back
            # the exception so that its message can be checked.
            e = self.assertRaises(UnsupportedProtocol,
                                  get_transport, 'foo://fooserver/foo')
            self.assertEquals('Unsupported protocol'
                              ' for url "foo://fooserver/foo":'
                              ' Unable to import library "some_lib":'
                              ' testing missing dependency', str(e))
        finally:
            # restore original values
            _set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """A second registered handler is used when another one fails."""
        saved_handlers = _get_protocol_handlers()
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BackupTransportHandler')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            t = get_transport('foo://fooserver/foo')
            self.assertTrue(isinstance(t, BackupTransportHandler))
        finally:
            _set_protocol_handlers(saved_handlers)

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = LateReadError('a path')
        # Use assertRaises rather than a bare try/except so that the test
        # fails, instead of silently passing, when read() does not raise.
        error = self.assertRaises(ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(ReadError, a_file.read, 40)
        a_file.close()

    def test__combine_paths(self):
        """_combine_paths() joins a relative path onto a base path."""
        t = Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        # Extra '..' segments are expected to stop at the root.
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        # An absolute second argument is expected to replace the base.
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        """local_abspath() on a memory transport raises NotLocalUrl."""
        # the base implementation should throw
        t = MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))
151
 
 
152
 
 
153
 
class TestCoalesceOffsets(TestCase):
    """Exercise Transport._coalesce_offsets with various offset lists."""

    def check(self, expected, offsets, limit=0, fudge=0):
        """Coalesce ``offsets`` and compare the result with ``expected``.

        :param expected: a list of tuples, each expanded into the positional
            arguments of _CoalescedOffset.
        :param offsets: the list of pairs handed to _coalesce_offsets.
        :param limit: forwarded as the ``limit`` keyword.
        :param fudge: forwarded as the ``fudge_factor`` keyword.
        """
        wanted = [_CoalescedOffset(*args) for args in expected]
        actual = list(Transport._coalesce_offsets(
            offsets, limit=limit, fudge_factor=fudge))
        self.assertEqual(wanted, actual)

    def test_coalesce_empty(self):
        self.check([], [])

    def test_coalesce_simple(self):
        single = (0, 10)
        self.check([(0, 10, [single])], [single])

    def test_coalesce_unrelated(self):
        requested = [(0, 10), (20, 10)]
        coalesced = [(0, 10, [(0, 10)]),
                     (20, 10, [(0, 10)])]
        self.check(coalesced, requested)

    def test_coalesce_unsorted(self):
        requested = [(20, 10), (0, 10)]
        coalesced = [(20, 10, [(0, 10)]),
                     (0, 10, [(0, 10)])]
        self.check(coalesced, requested)

    def test_coalesce_nearby(self):
        requested = [(0, 10), (10, 10)]
        self.check([(0, 20, [(0, 10), (10, 10)])], requested)

    def test_coalesce_overlapped(self):
        requested = [(0, 10), (5, 10)]
        self.check([(0, 15, [(0, 10), (5, 10)])], requested)

    def test_coalesce_limit(self):
        # Ten adjacent 10-byte requests starting at offset 10 ...
        requested = [(start, 10) for start in range(10, 110, 10)]
        # ... are expected back as two groups of five sub-ranges each.
        five = [(start, 10) for start in range(0, 50, 10)]
        self.check([(10, 50, five), (60, 50, five)], requested, limit=5)

    def test_coalesce_no_limit(self):
        # The same ten requests, with the default limit, form one group.
        requested = [(start, 10) for start in range(10, 110, 10)]
        ten = [(start, 10) for start in range(0, 100, 10)]
        self.check([(10, 100, ten)], requested)

    def test_coalesce_fudge(self):
        requested = [(10, 10), (30, 10), (100, 10)]
        coalesced = [(10, 30, [(0, 10), (20, 10)]),
                     (100, 10, [(0, 10)])]
        self.check(coalesced, requested, fudge=10)
210
 
 
211
 
 
212
 
class TestMemoryTransport(TestCase):
    """Tests specific to MemoryTransport."""

    def test_get_transport(self):
        # Construction alone must not fail.
        MemoryTransport()

    def test_clone(self):
        t = MemoryTransport()
        self.assertTrue(isinstance(t, MemoryTransport))
        cloned = t.clone("/")
        self.assertEqual("memory:///", cloned.base)

    def test_abspath(self):
        t = MemoryTransport()
        self.assertEqual("memory:///relpath", t.abspath('relpath'))

    def test_abspath_of_root(self):
        t = MemoryTransport()
        self.assertEqual("memory:///", t.base)
        self.assertEqual("memory:///", t.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        t = MemoryTransport()
        self.assertEqual("memory:///foo", t.abspath('/foo'))

    def test_append_and_get(self):
        t = MemoryTransport()
        t.append_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')
        # Appending a file object extends what is already stored.
        t.append_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'contentcontent')

    def test_put_and_get(self):
        t = MemoryTransport()
        t.put_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'content')
        # Putting again replaces rather than appends.
        t.put_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          t.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.get, 'foo')

    def test_has_missing(self):
        t = MemoryTransport()
        self.assertEquals(False, t.has('foo'))

    def test_has_present(self):
        t = MemoryTransport()
        t.append_bytes('foo', 'content')
        self.assertEquals(True, t.has('foo'))

    def test_list_dir(self):
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.mkdir('dir')
        t.put_bytes('dir/subfoo', 'content')
        # 'dirlike' shares a prefix with 'dir' but is a top-level file.
        t.put_bytes('dirlike', 'content')

        top_level = sorted(t.list_dir('.'))
        self.assertEquals(['dir', 'dirlike', 'foo'], top_level)
        in_dir = sorted(t.list_dir('dir'))
        self.assertEquals(['subfoo'], in_dir)

    def test_mkdir(self):
        t = MemoryTransport()
        t.mkdir('dir')
        t.append_bytes('dir/path', 'content')
        self.assertEqual(t.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        t = MemoryTransport()
        t.mkdir('dir')
        self.assertRaises(FileExists, t.mkdir, 'dir')

    def test_parameters(self):
        t = MemoryTransport()
        self.assertEqual(True, t.listable())
        self.assertEqual(False, t.is_readonly())

    def test_iter_files_recursive(self):
        t = MemoryTransport()
        t.mkdir('dir')
        for name in ('dir/foo', 'dir/bar', 'bar'):
            t.put_bytes(name, 'content')
        found = set(t.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)

    def test_stat(self):
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.put_bytes('bar', 'phowar')
        # st_size is expected to be the length of the stored bytes.
        self.assertEqual(7, t.stat('foo').st_size)
        self.assertEqual(6, t.stat('bar').st_size)
318
 
 
319
 
 
320
 
class ChrootDecoratorTransportTest(TestCase):
    """Chroot decoration specific tests.

    Every test tears its ChrootServer down in a ``finally`` clause so that a
    failing assertion cannot skip the tearDown() call.
    """

    def test_abspath(self):
        """abspath('/') is the server URL, for the root and for a clone."""
        # The abspath is always relative to the chroot_url.
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertEqual(server.get_url(), transport.abspath('/'))

            subdir_transport = transport.clone('subdir')
            self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
        finally:
            server.tearDown()

    def test_clone(self):
        """Clones made by relative and absolute path share the server."""
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            # relpath from root and root path are the same
            relpath_cloned = transport.clone('foo')
            abspath_cloned = transport.clone('/foo')
            self.assertEqual(server, relpath_cloned.server)
            self.assertEqual(server, abspath_cloned.server)
        finally:
            server.tearDown()

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/subpath'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            new_transport = get_transport(transport.base)
            self.assertEqual(transport.server, new_transport.server)
            self.assertEqual(transport.base, new_transport.base)
        finally:
            server.tearDown()

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertRaises(
                InvalidURLJoin, urlutils.join, transport.base, '..')
        finally:
            server.tearDown()
378
 
 
379
 
 
380
 
class ChrootServerTest(TestCase):
    """Tests for the ChrootServer life cycle."""

    def test_construct(self):
        """The server remembers the transport it was constructed with."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        self.assertEqual(backing_transport, server.backing_transport)

    def test_setUp(self):
        """setUp() registers the server's scheme as a protocol handler."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
        finally:
            # Tear the server down again; test_tearDown shows that this
            # removes the scheme from the protocol handlers.
            server.tearDown()

    def test_tearDown(self):
        """tearDown() removes the server's scheme again."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        server.tearDown()
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        """The server URL embeds the id() of the server object."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
        finally:
            server.tearDown()
406
 
 
407
 
 
408
 
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated '.' is listable and read-only."""
        import bzrlib.transport.readonly as readonly
        # connect to . in readonly mode
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http URL is read-only and not listable."""
        from bzrlib.tests.HttpServer import HttpServer
        import bzrlib.transport.readonly as readonly
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            url = 'readonly+' + http_server.get_url()
            decorated = get_transport(url)
            is_decorator = isinstance(decorated,
                                      readonly.ReadonlyTransportDecorator)
            self.failUnless(is_decorator)
            self.assertEqual(False, decorated.listable())
            self.assertEqual(True, decorated.is_readonly())
        finally:
            http_server.tearDown()
432
 
 
433
 
 
434
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return a FakeNFSTransportDecorator wrapping ``url``.

        :param url: the URL to decorate; 'fakenfs+' is prepended to it.
        """
        import bzrlib.transport.fakenfs as fakenfs
        # connect to url with nfs decoration
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)

    def test_local_parameters(self):
        """A decorated '.' stays listable and writable."""
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        transport = self.get_nfs_transport('.')
        self.assertEqual(True, transport.listable())
        self.assertEqual(False, transport.is_readonly())

    def test_http_parameters(self):
        """A decorated http URL stays unlistable and read-only."""
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.HttpServer import HttpServer
        # Import the module explicitly, as the sibling tests do, instead of
        # relying on get_nfs_transport() having loaded it as a side effect.
        import bzrlib.transport.fakenfs as fakenfs
        # connect to . via http which is not listable
        server = HttpServer()
        server.setUp()
        try:
            transport = self.get_nfs_transport(server.get_url())
            self.assertIsInstance(
                transport, fakenfs.FakeNFSTransportDecorator)
            self.assertEqual(False, transport.listable())
            self.assertEqual(True, transport.is_readonly())
        finally:
            server.tearDown()

    def test_fakenfs_server_default(self):
        """A default FakeNFSServer serves a 'fakenfs+' decorated URL."""
        # a FakeNFSServer() should bring up a local relpath server for itself
        import bzrlib.transport.fakenfs as fakenfs
        server = fakenfs.FakeNFSServer()
        server.setUp()
        try:
            # the url should be decorated appropriately
            self.assertStartsWith(server.get_url(), 'fakenfs+')
            # and we should be able to get a transport for it
            transport = get_transport(server.get_url())
            # which must be a FakeNFSTransportDecorator instance.
            self.assertIsInstance(
                transport, fakenfs.FakeNFSTransportDecorator)
        finally:
            server.tearDown()

    def test_fakenfs_rename_semantics(self):
        """Renaming onto a non-empty directory raises ResourceBusy."""
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        transport = self.get_nfs_transport('.')
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
                        transport=transport)
        self.assertRaises(errors.ResourceBusy,
                          transport.rename, 'from', 'to')
489
 
 
490
 
 
491
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        """The helper returns a FakeVFATTransportDecorator instance."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory made as 'HELLO' is found under other casings."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        """A name containing '<' and '>' is rejected with ValueError."""
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
513
 
 
514
 
 
515
 
class BadTransportHandler(Transport):
    """Test transport whose construction always fails.

    Creating an instance raises DependencyNotPresent for 'some_lib'.
    """

    def __init__(self, base_url):
        missing = DependencyNotPresent('some_lib',
                                       'testing missing dependency')
        raise missing
518
 
 
519
 
 
520
 
class BackupTransportHandler(Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
523
 
 
524
 
 
525
 
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport should show up in the working directory, and vice-versa.
    This is a bug, because it's possible to have URL schemes which provide
    access to something that may not result in storage on the local disk,
    i.e. due to file system limits, or due to it being a database or some
    other non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)
    """

    def setUp(self):
        """Create and start the server; it is torn down on cleanup."""
        super(TestTransportImplementation, self).setUp()
        # NOTE(review): transport_server is not defined in this class as
        # shown; presumably it is supplied from elsewhere - confirm.
        server = self.transport_server()
        self._server = server
        server.setUp()
        self.addCleanup(server.tearDown)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        url = self._adjust_url(self._server.get_url(), relpath)
        # try getting the transport via the regular interface:
        t = get_transport(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(t, self.transport_class):
            return t
        # we did not get the correct transport class type. Override the
        # regular connection behaviour by direct construction.
        return self.transport_class(url)
570
 
 
571
 
 
572
 
class TestLocalTransports(TestCase):
    """Tests for obtaining a LocalTransport from paths and file URLs."""

    def test_get_transport_from_abspath(self):
        """An absolute path gives a LocalTransport based at its URL."""
        here = os.path.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path gives a LocalTransport based at its URL."""
        # The expectation is built from '.' itself, so no absolute path is
        # computed here.
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL gives a LocalTransport with that URL as base."""
        here = os.path.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was made from."""
        here = os.path.abspath('.')
        t = get_transport(here)
        self.assertEquals(t.local_abspath(''), here)
597
 
 
598
 
 
599
 
class TestWin32LocalTransport(TestCase):
    """Tests for EmulatedWin32LocalTransport."""

    def test_unc_clone_to_root(self):
        """Cloning upwards from a UNC path stops at the host part."""
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        transport = EmulatedWin32LocalTransport(
            'file://HOST/path/to/some/dir/')
        # One step up for each of the four path segments.
        for _ in xrange(4):
            transport = transport.clone('..')
        host_root = 'file://HOST/'
        self.assertEquals(transport.base, host_root)
        # make sure we reach the root
        transport = transport.clone('..')
        self.assertEquals(transport.base, host_root)
612
 
 
613
 
 
614
 
class TestConnectedTransport(TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A plain URL is split into host and path; the rest is None."""
        transport = ConnectedTransport(
            'sftp://simple.example.com/home/source')
        self.assertEquals(transport._host, 'simple.example.com')
        self.assertEquals(transport._port, None)
        self.assertEquals(transport._path, '/home/source/')
        self.failUnless(transport._user is None)
        self.failUnless(transport._password is None)

        self.assertEquals(transport.base,
                          'sftp://simple.example.com/home/source/')

    def test_parse_quoted_url(self):
        """Percent-escapes in the URL parts are decoded."""
        transport = ConnectedTransport(
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEquals(transport._host, 'exAmple.com')
        self.assertEquals(transport._port, 2222)
        self.assertEquals(transport._user, 'robey')
        self.assertEquals(transport._password, 'h@t')
        self.assertEquals(transport._path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(transport.base,
                          'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        """A URL with a malformed port part raises InvalidURL."""
        bad_url = 'sftp://lily.org:~janneke/public/bzr/gub'
        self.assertRaises(errors.InvalidURL, ConnectedTransport, bad_url)

    def test_relpath(self):
        """relpath() only accepts URLs below the transport's own base."""
        transport = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(
            transport.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # A different scheme, user, host or port is not a child.
        foreign_urls = (
            'http://user@host.com/abs/path/sub',
            'sftp://user2@host.com/abs/path/sub',
            'sftp://user@otherhost.com/abs/path/sub',
            'sftp://user@host.com:33/abs/path/sub',
            )
        for foreign in foreign_urls:
            self.assertRaises(errors.PathNotChild,
                              transport.relpath, foreign)
        # Make sure it works when we don't supply a username
        transport = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(
            transport.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        transport = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(
            transport.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A transport and its clone share connection and credentials."""
        original = ConnectedTransport('foo://user@host.com/abs/path')
        self.assertIs(None, original._get_connection())
        self.assertIs(None, original._password)
        cloned = original.clone('subdir')
        self.assertEquals(None, cloned._get_connection())
        # NOTE(review): this re-checks the original's password; possibly the
        # clone's _password was intended - confirm.
        self.assertIs(None, original._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        original._set_connection(connection, password)
        for each in (original, cloned):
            self.assertIs(connection, each._get_connection())
            self.assertIs(password, each._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        cloned._update_credentials(new_password)
        for each in (original, cloned):
            self.assertIs(connection, each._get_connection())
            self.assertIs(new_password, each._get_credentials())
687
 
 
688
 
 
689
 
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """A candidate in possible_transports is reused for the same URL."""
        first = get_transport('http://foo/')
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = get_transport('http://foo/path/')
        reused = get_transport('http://foo/path',
                               possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = get_transport('http://foo/path')
        reused = get_transport('http://foo/path/',
                               possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A candidate for another host is not reused."""
        candidate = get_transport('http://foo/path')
        other = get_transport('http://bar/path',
                              possible_transports=[candidate])
        self.assertIsNot(candidate, other)
710
 
 
711
 
 
712
 
def get_test_permutations():
    """Return transport permutations to be used in testing.

    The transports registered by this module exist only to exercise the
    registration machinery, so the full transport test suite should not be
    run against them; an empty list is returned.
    """
    no_permutations = []
    return no_permutations