~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-07-02 07:06:39 UTC
  • mfrom: (2553.2.14 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20070702070639-um9oyfoc2i6g8umv
(robertc) Reduce duplication in interface based testing by extracting a new class TestScenarioApplier.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
 
21
from cStringIO import StringIO
 
22
 
 
23
import bzrlib
 
24
from bzrlib import (
 
25
    errors,
 
26
    urlutils,
 
27
    )
 
28
from bzrlib.errors import (ConnectionError,
 
29
                           DependencyNotPresent,
 
30
                           FileExists,
 
31
                           InvalidURLJoin,
 
32
                           NoSuchFile,
 
33
                           PathNotChild,
 
34
                           TransportNotPossible,
 
35
                           UnsupportedProtocol,
 
36
                           )
 
37
from bzrlib.tests import TestCase, TestCaseInTempDir
 
38
from bzrlib.transport import (_CoalescedOffset,
 
39
                              _get_protocol_handlers,
 
40
                              _set_protocol_handlers,
 
41
                              _get_transport_modules,
 
42
                              get_transport,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              _clear_protocol_handlers,
 
46
                              Transport,
 
47
                              )
 
48
from bzrlib.transport.chroot import ChrootServer
 
49
from bzrlib.transport.memory import MemoryTransport
 
50
from bzrlib.transport.local import (LocalTransport,
 
51
                                    EmulatedWin32LocalTransport)
 
52
 
 
53
 
 
54
# TODO: Should possibly split transport-specific tests into their own files.
 
55
 
 
56
 
 
57
class TestTransport(TestCase):
 
58
    """Test the non transport-concrete class functionality."""
 
59
 
 
60
    def test__get_set_protocol_handlers(self):
 
61
        handlers = _get_protocol_handlers()
 
62
        self.assertNotEqual([], handlers.keys( ))
 
63
        try:
 
64
            _clear_protocol_handlers()
 
65
            self.assertEqual([], _get_protocol_handlers().keys())
 
66
        finally:
 
67
            _set_protocol_handlers(handlers)
 
68
 
 
69
    def test_get_transport_modules(self):
 
70
        handlers = _get_protocol_handlers()
 
71
        class SampleHandler(object):
 
72
            """I exist, isnt that enough?"""
 
73
        try:
 
74
            _clear_protocol_handlers()
 
75
            register_transport_proto('foo')
 
76
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
77
            register_transport_proto('bar')
 
78
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
 
80
                             _get_transport_modules())
 
81
        finally:
 
82
            _set_protocol_handlers(handlers)
 
83
 
 
84
    def test_transport_dependency(self):
 
85
        """Transport with missing dependency causes no error"""
 
86
        saved_handlers = _get_protocol_handlers()
 
87
        try:
 
88
            register_transport_proto('foo')
 
89
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
90
                    'BadTransportHandler')
 
91
            try:
 
92
                get_transport('foo://fooserver/foo')
 
93
            except UnsupportedProtocol, e:
 
94
                e_str = str(e)
 
95
                self.assertEquals('Unsupported protocol'
 
96
                                  ' for url "foo://fooserver/foo":'
 
97
                                  ' Unable to import library "some_lib":'
 
98
                                  ' testing missing dependency', str(e))
 
99
            else:
 
100
                self.fail('Did not raise UnsupportedProtocol')
 
101
        finally:
 
102
            # restore original values
 
103
            _set_protocol_handlers(saved_handlers)
 
104
            
 
105
    def test_transport_fallback(self):
 
106
        """Transport with missing dependency causes no error"""
 
107
        saved_handlers = _get_protocol_handlers()
 
108
        try:
 
109
            _clear_protocol_handlers()
 
110
            register_transport_proto('foo')
 
111
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
112
                    'BackupTransportHandler')
 
113
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
114
                    'BadTransportHandler')
 
115
            t = get_transport('foo://fooserver/foo')
 
116
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
117
        finally:
 
118
            _set_protocol_handlers(saved_handlers)
 
119
 
 
120
    def test__combine_paths(self):
 
121
        t = Transport('/')
 
122
        self.assertEqual('/home/sarah/project/foo',
 
123
                         t._combine_paths('/home/sarah', 'project/foo'))
 
124
        self.assertEqual('/etc',
 
125
                         t._combine_paths('/home/sarah', '../../etc'))
 
126
        self.assertEqual('/etc',
 
127
                         t._combine_paths('/home/sarah', '../../../etc'))
 
128
        self.assertEqual('/etc',
 
129
                         t._combine_paths('/home/sarah', '/etc'))
 
130
 
 
131
    def test_local_abspath_non_local_transport(self):
 
132
        # the base implementation should throw
 
133
        t = MemoryTransport()
 
134
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
135
        self.assertEqual('memory:///t is not a local path.', str(e))
 
136
 
 
137
 
 
138
class TestCoalesceOffsets(TestCase):
 
139
    
 
140
    def check(self, expected, offsets, limit=0, fudge=0):
 
141
        coalesce = Transport._coalesce_offsets
 
142
        exp = [_CoalescedOffset(*x) for x in expected]
 
143
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
144
        self.assertEqual(exp, out)
 
145
 
 
146
    def test_coalesce_empty(self):
 
147
        self.check([], [])
 
148
 
 
149
    def test_coalesce_simple(self):
 
150
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
151
 
 
152
    def test_coalesce_unrelated(self):
 
153
        self.check([(0, 10, [(0, 10)]),
 
154
                    (20, 10, [(0, 10)]),
 
155
                   ], [(0, 10), (20, 10)])
 
156
            
 
157
    def test_coalesce_unsorted(self):
 
158
        self.check([(20, 10, [(0, 10)]),
 
159
                    (0, 10, [(0, 10)]),
 
160
                   ], [(20, 10), (0, 10)])
 
161
 
 
162
    def test_coalesce_nearby(self):
 
163
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
164
                   [(0, 10), (10, 10)])
 
165
 
 
166
    def test_coalesce_overlapped(self):
 
167
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
168
                   [(0, 10), (5, 10)])
 
169
 
 
170
    def test_coalesce_limit(self):
 
171
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
172
                              (30, 10), (40, 10)]),
 
173
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
174
                              (30, 10), (40, 10)]),
 
175
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
176
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
177
                       (90, 10), (100, 10)],
 
178
                    limit=5)
 
179
 
 
180
    def test_coalesce_no_limit(self):
 
181
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
182
                               (30, 10), (40, 10), (50, 10),
 
183
                               (60, 10), (70, 10), (80, 10),
 
184
                               (90, 10)]),
 
185
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
186
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
187
                       (90, 10), (100, 10)])
 
188
 
 
189
    def test_coalesce_fudge(self):
 
190
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
191
                    (100, 10, [(0, 10),]),
 
192
                   ], [(10, 10), (30, 10), (100, 10)],
 
193
                   fudge=10
 
194
                  )
 
195
 
 
196
 
 
197
class TestMemoryTransport(TestCase):
 
198
 
 
199
    def test_get_transport(self):
 
200
        MemoryTransport()
 
201
 
 
202
    def test_clone(self):
 
203
        transport = MemoryTransport()
 
204
        self.assertTrue(isinstance(transport, MemoryTransport))
 
205
        self.assertEqual("memory:///", transport.clone("/").base)
 
206
 
 
207
    def test_abspath(self):
 
208
        transport = MemoryTransport()
 
209
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
210
 
 
211
    def test_abspath_of_root(self):
 
212
        transport = MemoryTransport()
 
213
        self.assertEqual("memory:///", transport.base)
 
214
        self.assertEqual("memory:///", transport.abspath('/'))
 
215
 
 
216
    def test_abspath_of_relpath_starting_at_root(self):
 
217
        transport = MemoryTransport()
 
218
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
219
 
 
220
    def test_append_and_get(self):
 
221
        transport = MemoryTransport()
 
222
        transport.append_bytes('path', 'content')
 
223
        self.assertEqual(transport.get('path').read(), 'content')
 
224
        transport.append_file('path', StringIO('content'))
 
225
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
226
 
 
227
    def test_put_and_get(self):
 
228
        transport = MemoryTransport()
 
229
        transport.put_file('path', StringIO('content'))
 
230
        self.assertEqual(transport.get('path').read(), 'content')
 
231
        transport.put_bytes('path', 'content')
 
232
        self.assertEqual(transport.get('path').read(), 'content')
 
233
 
 
234
    def test_append_without_dir_fails(self):
 
235
        transport = MemoryTransport()
 
236
        self.assertRaises(NoSuchFile,
 
237
                          transport.append_bytes, 'dir/path', 'content')
 
238
 
 
239
    def test_put_without_dir_fails(self):
 
240
        transport = MemoryTransport()
 
241
        self.assertRaises(NoSuchFile,
 
242
                          transport.put_file, 'dir/path', StringIO('content'))
 
243
 
 
244
    def test_get_missing(self):
 
245
        transport = MemoryTransport()
 
246
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
247
 
 
248
    def test_has_missing(self):
 
249
        transport = MemoryTransport()
 
250
        self.assertEquals(False, transport.has('foo'))
 
251
 
 
252
    def test_has_present(self):
 
253
        transport = MemoryTransport()
 
254
        transport.append_bytes('foo', 'content')
 
255
        self.assertEquals(True, transport.has('foo'))
 
256
 
 
257
    def test_list_dir(self):
 
258
        transport = MemoryTransport()
 
259
        transport.put_bytes('foo', 'content')
 
260
        transport.mkdir('dir')
 
261
        transport.put_bytes('dir/subfoo', 'content')
 
262
        transport.put_bytes('dirlike', 'content')
 
263
 
 
264
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
265
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
266
 
 
267
    def test_mkdir(self):
 
268
        transport = MemoryTransport()
 
269
        transport.mkdir('dir')
 
270
        transport.append_bytes('dir/path', 'content')
 
271
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
272
 
 
273
    def test_mkdir_missing_parent(self):
 
274
        transport = MemoryTransport()
 
275
        self.assertRaises(NoSuchFile,
 
276
                          transport.mkdir, 'dir/dir')
 
277
 
 
278
    def test_mkdir_twice(self):
 
279
        transport = MemoryTransport()
 
280
        transport.mkdir('dir')
 
281
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
282
 
 
283
    def test_parameters(self):
 
284
        transport = MemoryTransport()
 
285
        self.assertEqual(True, transport.listable())
 
286
        self.assertEqual(False, transport.should_cache())
 
287
        self.assertEqual(False, transport.is_readonly())
 
288
 
 
289
    def test_iter_files_recursive(self):
 
290
        transport = MemoryTransport()
 
291
        transport.mkdir('dir')
 
292
        transport.put_bytes('dir/foo', 'content')
 
293
        transport.put_bytes('dir/bar', 'content')
 
294
        transport.put_bytes('bar', 'content')
 
295
        paths = set(transport.iter_files_recursive())
 
296
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
297
 
 
298
    def test_stat(self):
 
299
        transport = MemoryTransport()
 
300
        transport.put_bytes('foo', 'content')
 
301
        transport.put_bytes('bar', 'phowar')
 
302
        self.assertEqual(7, transport.stat('foo').st_size)
 
303
        self.assertEqual(6, transport.stat('bar').st_size)
 
304
 
 
305
 
 
306
class ChrootDecoratorTransportTest(TestCase):
 
307
    """Chroot decoration specific tests."""
 
308
 
 
309
    def test_abspath(self):
 
310
        # The abspath is always relative to the chroot_url.
 
311
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
312
        server.setUp()
 
313
        transport = get_transport(server.get_url())
 
314
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
315
 
 
316
        subdir_transport = transport.clone('subdir')
 
317
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
318
        server.tearDown()
 
319
 
 
320
    def test_clone(self):
 
321
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
322
        server.setUp()
 
323
        transport = get_transport(server.get_url())
 
324
        # relpath from root and root path are the same
 
325
        relpath_cloned = transport.clone('foo')
 
326
        abspath_cloned = transport.clone('/foo')
 
327
        self.assertEqual(server, relpath_cloned.server)
 
328
        self.assertEqual(server, abspath_cloned.server)
 
329
        server.tearDown()
 
330
    
 
331
    def test_chroot_url_preserves_chroot(self):
 
332
        """Calling get_transport on a chroot transport's base should produce a
 
333
        transport with exactly the same behaviour as the original chroot
 
334
        transport.
 
335
 
 
336
        This is so that it is not possible to escape a chroot by doing::
 
337
            url = chroot_transport.base
 
338
            parent_url = urlutils.join(url, '..')
 
339
            new_transport = get_transport(parent_url)
 
340
        """
 
341
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
342
        server.setUp()
 
343
        transport = get_transport(server.get_url())
 
344
        new_transport = get_transport(transport.base)
 
345
        self.assertEqual(transport.server, new_transport.server)
 
346
        self.assertEqual(transport.base, new_transport.base)
 
347
        server.tearDown()
 
348
        
 
349
    def test_urljoin_preserves_chroot(self):
 
350
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
351
        URL that escapes the intended chroot.
 
352
 
 
353
        This is so that it is not possible to escape a chroot by doing::
 
354
            url = chroot_transport.base
 
355
            parent_url = urlutils.join(url, '..')
 
356
            new_transport = get_transport(parent_url)
 
357
        """
 
358
        server = ChrootServer(get_transport('memory:///path/'))
 
359
        server.setUp()
 
360
        transport = get_transport(server.get_url())
 
361
        self.assertRaises(
 
362
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
363
        server.tearDown()
 
364
 
 
365
 
 
366
class ChrootServerTest(TestCase):
 
367
 
 
368
    def test_construct(self):
 
369
        backing_transport = MemoryTransport()
 
370
        server = ChrootServer(backing_transport)
 
371
        self.assertEqual(backing_transport, server.backing_transport)
 
372
 
 
373
    def test_setUp(self):
 
374
        backing_transport = MemoryTransport()
 
375
        server = ChrootServer(backing_transport)
 
376
        server.setUp()
 
377
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
378
 
 
379
    def test_tearDown(self):
 
380
        backing_transport = MemoryTransport()
 
381
        server = ChrootServer(backing_transport)
 
382
        server.setUp()
 
383
        server.tearDown()
 
384
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
385
 
 
386
    def test_get_url(self):
 
387
        backing_transport = MemoryTransport()
 
388
        server = ChrootServer(backing_transport)
 
389
        server.setUp()
 
390
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
391
        server.tearDown()
 
392
 
 
393
 
 
394
class ReadonlyDecoratorTransportTest(TestCase):
 
395
    """Readonly decoration specific tests."""
 
396
 
 
397
    def test_local_parameters(self):
 
398
        import bzrlib.transport.readonly as readonly
 
399
        # connect to . in readonly mode
 
400
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
401
        self.assertEqual(True, transport.listable())
 
402
        self.assertEqual(False, transport.should_cache())
 
403
        self.assertEqual(True, transport.is_readonly())
 
404
 
 
405
    def test_http_parameters(self):
 
406
        from bzrlib.tests.HttpServer import HttpServer
 
407
        import bzrlib.transport.readonly as readonly
 
408
        # connect to . via http which is not listable
 
409
        server = HttpServer()
 
410
        server.setUp()
 
411
        try:
 
412
            transport = get_transport('readonly+' + server.get_url())
 
413
            self.failUnless(isinstance(transport,
 
414
                                       readonly.ReadonlyTransportDecorator))
 
415
            self.assertEqual(False, transport.listable())
 
416
            self.assertEqual(True, transport.should_cache())
 
417
            self.assertEqual(True, transport.is_readonly())
 
418
        finally:
 
419
            server.tearDown()
 
420
 
 
421
 
 
422
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
423
    """NFS decorator specific tests."""
 
424
 
 
425
    def get_nfs_transport(self, url):
 
426
        import bzrlib.transport.fakenfs as fakenfs
 
427
        # connect to url with nfs decoration
 
428
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
429
 
 
430
    def test_local_parameters(self):
 
431
        # the listable, should_cache and is_readonly parameters
 
432
        # are not changed by the fakenfs decorator
 
433
        transport = self.get_nfs_transport('.')
 
434
        self.assertEqual(True, transport.listable())
 
435
        self.assertEqual(False, transport.should_cache())
 
436
        self.assertEqual(False, transport.is_readonly())
 
437
 
 
438
    def test_http_parameters(self):
 
439
        # the listable, should_cache and is_readonly parameters
 
440
        # are not changed by the fakenfs decorator
 
441
        from bzrlib.tests.HttpServer import HttpServer
 
442
        # connect to . via http which is not listable
 
443
        server = HttpServer()
 
444
        server.setUp()
 
445
        try:
 
446
            transport = self.get_nfs_transport(server.get_url())
 
447
            self.assertIsInstance(
 
448
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
449
            self.assertEqual(False, transport.listable())
 
450
            self.assertEqual(True, transport.should_cache())
 
451
            self.assertEqual(True, transport.is_readonly())
 
452
        finally:
 
453
            server.tearDown()
 
454
 
 
455
    def test_fakenfs_server_default(self):
 
456
        # a FakeNFSServer() should bring up a local relpath server for itself
 
457
        import bzrlib.transport.fakenfs as fakenfs
 
458
        server = fakenfs.FakeNFSServer()
 
459
        server.setUp()
 
460
        try:
 
461
            # the url should be decorated appropriately
 
462
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
463
            # and we should be able to get a transport for it
 
464
            transport = get_transport(server.get_url())
 
465
            # which must be a FakeNFSTransportDecorator instance.
 
466
            self.assertIsInstance(
 
467
                transport, fakenfs.FakeNFSTransportDecorator)
 
468
        finally:
 
469
            server.tearDown()
 
470
 
 
471
    def test_fakenfs_rename_semantics(self):
 
472
        # a FakeNFS transport must mangle the way rename errors occur to
 
473
        # look like NFS problems.
 
474
        transport = self.get_nfs_transport('.')
 
475
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
476
                        transport=transport)
 
477
        self.assertRaises(errors.ResourceBusy,
 
478
                          transport.rename, 'from', 'to')
 
479
 
 
480
 
 
481
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
482
    """Tests for simulation of VFAT restrictions"""
 
483
 
 
484
    def get_vfat_transport(self, url):
 
485
        """Return vfat-backed transport for test directory"""
 
486
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
487
        return FakeVFATTransportDecorator('vfat+' + url)
 
488
 
 
489
    def test_transport_creation(self):
 
490
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
491
        transport = self.get_vfat_transport('.')
 
492
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
493
 
 
494
    def test_transport_mkdir(self):
 
495
        transport = self.get_vfat_transport('.')
 
496
        transport.mkdir('HELLO')
 
497
        self.assertTrue(transport.has('hello'))
 
498
        self.assertTrue(transport.has('Hello'))
 
499
 
 
500
    def test_forbidden_chars(self):
 
501
        transport = self.get_vfat_transport('.')
 
502
        self.assertRaises(ValueError, transport.has, "<NU>")
 
503
 
 
504
 
 
505
class BadTransportHandler(Transport):
 
506
    def __init__(self, base_url):
 
507
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
508
 
 
509
 
 
510
class BackupTransportHandler(Transport):
 
511
    """Test transport that works as a backup for the BadTransportHandler"""
 
512
    pass
 
513
 
 
514
 
 
515
class TestTransportImplementation(TestCaseInTempDir):
 
516
    """Implementation verification for transports.
 
517
    
 
518
    To verify a transport we need a server factory, which is a callable
 
519
    that accepts no parameters and returns an implementation of
 
520
    bzrlib.transport.Server.
 
521
    
 
522
    That Server is then used to construct transport instances and test
 
523
    the transport via loopback activity.
 
524
 
 
525
    Currently this assumes that the Transport object is connected to the 
 
526
    current working directory.  So that whatever is done 
 
527
    through the transport, should show up in the working 
 
528
    directory, and vice-versa. This is a bug, because its possible to have
 
529
    URL schemes which provide access to something that may not be 
 
530
    result in storage on the local disk, i.e. due to file system limits, or 
 
531
    due to it being a database or some other non-filesystem tool.
 
532
 
 
533
    This also tests to make sure that the functions work with both
 
534
    generators and lists (assuming iter(list) is effectively a generator)
 
535
    """
 
536
    
 
537
    def setUp(self):
 
538
        super(TestTransportImplementation, self).setUp()
 
539
        self._server = self.transport_server()
 
540
        self._server.setUp()
 
541
        self.addCleanup(self._server.tearDown)
 
542
 
 
543
    def get_transport(self):
 
544
        """Return a connected transport to the local directory."""
 
545
        base_url = self._server.get_url()
 
546
        # try getting the transport via the regular interface:
 
547
        t = get_transport(base_url)
 
548
        if not isinstance(t, self.transport_class):
 
549
            # we did not get the correct transport class type. Override the
 
550
            # regular connection behaviour by direct construction.
 
551
            t = self.transport_class(base_url)
 
552
        return t
 
553
 
 
554
 
 
555
class TestLocalTransports(TestCase):
 
556
 
 
557
    def test_get_transport_from_abspath(self):
 
558
        here = os.path.abspath('.')
 
559
        t = get_transport(here)
 
560
        self.assertIsInstance(t, LocalTransport)
 
561
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
562
 
 
563
    def test_get_transport_from_relpath(self):
 
564
        here = os.path.abspath('.')
 
565
        t = get_transport('.')
 
566
        self.assertIsInstance(t, LocalTransport)
 
567
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
568
 
 
569
    def test_get_transport_from_local_url(self):
 
570
        here = os.path.abspath('.')
 
571
        here_url = urlutils.local_path_to_url(here) + '/'
 
572
        t = get_transport(here_url)
 
573
        self.assertIsInstance(t, LocalTransport)
 
574
        self.assertEquals(t.base, here_url)
 
575
 
 
576
    def test_local_abspath(self):
 
577
        here = os.path.abspath('.')
 
578
        t = get_transport(here)
 
579
        self.assertEquals(t.local_abspath(''), here)
 
580
 
 
581
 
 
582
class TestWin32LocalTransport(TestCase):
 
583
 
 
584
    def test_unc_clone_to_root(self):
 
585
        # Win32 UNC path like \\HOST\path
 
586
        # clone to root should stop at least at \\HOST part
 
587
        # not on \\
 
588
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
589
        for i in xrange(4):
 
590
            t = t.clone('..')
 
591
        self.assertEquals(t.base, 'file://HOST/')
 
592
        # make sure we reach the root
 
593
        t = t.clone('..')
 
594
        self.assertEquals(t.base, 'file://HOST/')
 
595
 
 
596
 
 
597
def get_test_permutations():
 
598
    """Return transport permutations to be used in testing.
 
599
 
 
600
    This module registers some transports, but they're only for testing
 
601
    registration.  We don't really want to run all the transport tests against
 
602
    them.
 
603
    """
 
604
    return []