~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2007-11-29 07:12:42 UTC
  • mto: This revision was merged to the branch mainline in revision 3048.
  • Revision ID: mbp@sourcefrog.net-20071129071242-1tcn2a18547udlvw
Fix up calls to TestCase.build_tree passing a string rather than a list

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
 
 
20
import bzrlib
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
 
 
52
 
 
53
# TODO: Should possibly split transport-specific tests into their own files.
 
54
 
 
55
 
 
56
class TestTransport(TestCase):
    """Test the non transport-concrete class functionality."""

    def test__get_set_protocol_handlers(self):
        """The handler registry can be cleared and then restored."""
        handlers = _get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            _clear_protocol_handlers()
            self.assertEqual([], _get_protocol_handlers().keys())
        finally:
            # always put the real handlers back for the following tests
            _set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        """Modules of lazily registered handlers are reported."""
        handlers = _get_protocol_handlers()

        class SampleHandler(object):
            """I exist, isn't that enough?"""

        try:
            # don't pollute the current handlers
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            register_transport_proto('bar')
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            # NOTE(review): 'bzrlib.transport.chroot' is expected although
            # only 'foo' and 'bar' are registered here; presumably
            # _get_transport_modules() always reports it -- confirm.
            self.assertEqual([SampleHandler.__module__,
                              'bzrlib.transport.chroot'],
                             _get_transport_modules())
        finally:
            _set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """A handler with a missing dependency gives UnsupportedProtocol."""
        saved_handlers = _get_protocol_handlers()
        # don't pollute the current handlers
        _clear_protocol_handlers()
        try:
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                    'BadTransportHandler')
            # assertRaises fails the test when nothing is raised and hands
            # back the exception so its message can be checked.
            e = self.assertRaises(UnsupportedProtocol,
                                  get_transport, 'foo://fooserver/foo')
            self.assertEquals('Unsupported protocol'
                              ' for url "foo://fooserver/foo":'
                              ' Unable to import library "some_lib":'
                              ' testing missing dependency', str(e))
        finally:
            # restore original values
            _set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """Another handler for the protocol is used if one cannot load."""
        saved_handlers = _get_protocol_handlers()
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                    'BackupTransportHandler')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                    'BadTransportHandler')
            t = get_transport('foo://fooserver/foo')
            self.assertTrue(isinstance(t, BackupTransportHandler))
        finally:
            _set_protocol_handlers(saved_handlers)

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = LateReadError('a path')
        # A bare try/except would pass silently if read() did not raise;
        # assertRaises makes the missing exception a test failure.
        error = self.assertRaises(ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(ReadError, a_file.read, 40)
        a_file.close()

    def test__combine_paths(self):
        """_combine_paths joins, normalises '..' and honours absolute paths."""
        t = Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        # going above the root stays at the root
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        """local_abspath on a MemoryTransport raises NotLocalUrl."""
        # the base implementation should throw
        t = MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))
 
152
 
 
153
 
 
154
class TestCoalesceOffsets(TestCase):
    """Tests for Transport._coalesce_offsets."""

    def check(self, expected, offsets, limit=0, fudge=0):
        """Coalesce ``offsets`` and compare the result with ``expected``.

        :param expected: tuples that are expanded as the positional
            arguments of the expected _CoalescedOffset objects.
        :param offsets: the offsets handed to Transport._coalesce_offsets.
        :param limit: forwarded as the ``limit`` keyword argument.
        :param fudge: forwarded as the ``fudge_factor`` keyword argument.
        """
        wanted = [_CoalescedOffset(*args) for args in expected]
        produced = Transport._coalesce_offsets(
            offsets, limit=limit, fudge_factor=fudge)
        self.assertEqual(wanted, list(produced))

    def test_coalesce_empty(self):
        """No offsets in, no coalesced offsets out."""
        self.check([], [])

    def test_coalesce_simple(self):
        """A single offset is returned as a single range."""
        requested = [(0, 10)]
        self.check([(0, 10, [(0, 10)])], requested)

    def test_coalesce_unrelated(self):
        """Offsets separated by a gap are kept apart."""
        requested = [(0, 10), (20, 10)]
        wanted = [
            (0, 10, [(0, 10)]),
            (20, 10, [(0, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_unsorted(self):
        """Out-of-order offsets are reported in the order requested."""
        requested = [(20, 10), (0, 10)]
        wanted = [
            (20, 10, [(0, 10)]),
            (0, 10, [(0, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_nearby(self):
        """Adjacent offsets are merged into one range."""
        requested = [(0, 10), (10, 10)]
        self.check([(0, 20, [(0, 10), (10, 10)])], requested)

    def test_coalesce_overlapped(self):
        """Overlapping offsets are merged into one range."""
        requested = [(0, 10), (5, 10)]
        self.check([(0, 15, [(0, 10), (5, 10)])], requested)

    def test_coalesce_limit(self):
        """With limit=5 ten adjacent offsets become two ranges of five."""
        requested = [(start, 10) for start in range(10, 110, 10)]
        wanted = [
            (10, 50, [(rel, 10) for rel in range(0, 50, 10)]),
            (60, 50, [(rel, 10) for rel in range(0, 50, 10)]),
            ]
        self.check(wanted, requested, limit=5)

    def test_coalesce_no_limit(self):
        """Without a limit ten adjacent offsets become a single range."""
        requested = [(start, 10) for start in range(10, 110, 10)]
        wanted = [
            (10, 100, [(rel, 10) for rel in range(0, 100, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_fudge(self):
        """A gap no larger than the fudge factor is bridged."""
        requested = [(10, 10), (30, 10), (100, 10)]
        wanted = [
            (10, 30, [(0, 10), (20, 10)]),
            (100, 10, [(0, 10)]),
            ]
        self.check(wanted, requested, fudge=10)
 
211
 
 
212
 
 
213
class TestMemoryTransport(TestCase):
    """Tests for the in-memory transport."""

    def test_get_transport(self):
        """A MemoryTransport can be constructed without arguments."""
        MemoryTransport()

    def test_clone(self):
        """Cloning to '/' yields the memory root url."""
        t = MemoryTransport()
        self.assertTrue(isinstance(t, MemoryTransport))
        cloned = t.clone("/")
        self.assertEqual("memory:///", cloned.base)

    def test_abspath(self):
        """Relative paths are resolved against the memory root."""
        t = MemoryTransport()
        self.assertEqual("memory:///relpath", t.abspath('relpath'))

    def test_abspath_of_root(self):
        """The abspath of '/' equals the transport base."""
        t = MemoryTransport()
        self.assertEqual("memory:///", t.base)
        self.assertEqual("memory:///", t.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        """A leading slash does not produce a doubled slash."""
        t = MemoryTransport()
        self.assertEqual("memory:///foo", t.abspath('/foo'))

    def test_append_and_get(self):
        """append_bytes and append_file both extend the stored content."""
        t = MemoryTransport()
        t.append_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')
        t.append_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'contentcontent')

    def test_put_and_get(self):
        """Content stored with put_file/put_bytes is returned by get."""
        t = MemoryTransport()
        t.put_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'content')
        t.put_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        """Appending below a missing directory raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        """Putting below a missing directory raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          t.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        """Getting a missing file raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.get, 'foo')

    def test_has_missing(self):
        """has() is False for a path that was never written."""
        t = MemoryTransport()
        self.assertEquals(False, t.has('foo'))

    def test_has_present(self):
        """has() is True once a file has been written."""
        t = MemoryTransport()
        t.append_bytes('foo', 'content')
        self.assertEquals(True, t.has('foo'))

    def test_list_dir(self):
        """list_dir reports only the direct children of a directory."""
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.mkdir('dir')
        t.put_bytes('dir/subfoo', 'content')
        # a file whose name merely starts with 'dir' is a sibling of 'dir'
        t.put_bytes('dirlike', 'content')

        top_level = sorted(t.list_dir('.'))
        self.assertEquals(['dir', 'dirlike', 'foo'], top_level)
        in_dir = sorted(t.list_dir('dir'))
        self.assertEquals(['subfoo'], in_dir)

    def test_mkdir(self):
        """Files can be written below a created directory."""
        t = MemoryTransport()
        t.mkdir('dir')
        t.append_bytes('dir/path', 'content')
        self.assertEqual(t.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        """mkdir below a missing parent raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        """Creating an existing directory raises FileExists."""
        t = MemoryTransport()
        t.mkdir('dir')
        self.assertRaises(FileExists, t.mkdir, 'dir')

    def test_parameters(self):
        """The memory transport is listable and writable."""
        t = MemoryTransport()
        self.assertEqual(True, t.listable())
        self.assertEqual(False, t.is_readonly())

    def test_iter_files_recursive(self):
        """iter_files_recursive yields files from all directory levels."""
        t = MemoryTransport()
        t.mkdir('dir')
        for relpath in ('dir/foo', 'dir/bar', 'bar'):
            t.put_bytes(relpath, 'content')
        found = set(t.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)

    def test_stat(self):
        """stat().st_size is the number of bytes stored."""
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.put_bytes('bar', 'phowar')
        self.assertEqual(7, t.stat('foo').st_size)
        self.assertEqual(6, t.stat('bar').st_size)
 
319
 
 
320
 
 
321
class ChrootDecoratorTransportTest(TestCase):
    """Chroot decoration specific tests.

    Every test starts a ChrootServer and stops it in a ``finally`` clause,
    so a failing assertion does not leave the server set up for the tests
    that follow.
    """

    def test_abspath(self):
        """abspath('/') is the chroot url, also for a cloned subdir."""
        # The abspath is always relative to the chroot_url.
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertEqual(server.get_url(), transport.abspath('/'))

            subdir_transport = transport.clone('subdir')
            self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
        finally:
            server.tearDown()

    def test_clone(self):
        """Clones made by relative and absolute path keep the same server."""
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            # relpath from root and root path are the same
            relpath_cloned = transport.clone('foo')
            abspath_cloned = transport.clone('/foo')
            self.assertEqual(server, relpath_cloned.server)
            self.assertEqual(server, abspath_cloned.server)
        finally:
            server.tearDown()

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/subpath'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            new_transport = get_transport(transport.base)
            self.assertEqual(transport.server, new_transport.server)
            self.assertEqual(transport.base, new_transport.base)
        finally:
            server.tearDown()

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertRaises(
                InvalidURLJoin, urlutils.join, transport.base, '..')
        finally:
            server.tearDown()
 
379
 
 
380
 
 
381
class ChrootServerTest(TestCase):
    """Tests for the ChrootServer life cycle."""

    def test_construct(self):
        """The server remembers the transport it was constructed with."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        self.assertEqual(backing_transport, server.backing_transport)

    def test_setUp(self):
        """setUp registers the server's scheme as a protocol handler."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
        finally:
            # do not leave the scheme registered for the following tests
            server.tearDown()

    def test_tearDown(self):
        """tearDown removes the server's scheme from the handlers."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        server.tearDown()
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        """The url is 'chroot-<id of the server>:///'."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
        finally:
            server.tearDown()
 
407
 
 
408
 
 
409
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated local transport is listable and readonly."""
        import bzrlib.transport.readonly as readonly
        # connect to . in readonly mode
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, transport.listable())
        self.assertEqual(True, transport.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http transport is not listable."""
        from bzrlib.tests.HttpServer import HttpServer
        import bzrlib.transport.readonly as readonly
        # connect to . via http which is not listable
        server = HttpServer()
        server.setUp()
        try:
            transport = get_transport('readonly+' + server.get_url())
            # assertIsInstance, as used by the other decorator tests in this
            # file, instead of failUnless(isinstance(...)).
            self.assertIsInstance(transport,
                                  readonly.ReadonlyTransportDecorator)
            self.assertEqual(False, transport.listable())
            self.assertEqual(True, transport.is_readonly())
        finally:
            server.tearDown()
 
433
 
 
434
 
 
435
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return a FakeNFSTransportDecorator for 'fakenfs+' + url."""
        import bzrlib.transport.fakenfs as fakenfs
        # connect to url with nfs decoration
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)

    def test_local_parameters(self):
        """Decorating a local transport keeps it listable and writable."""
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        transport = self.get_nfs_transport('.')
        self.assertEqual(True, transport.listable())
        self.assertEqual(False, transport.is_readonly())

    def test_http_parameters(self):
        """Decorating an http transport keeps it unlistable and readonly."""
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.HttpServer import HttpServer
        # Import the module here rather than reaching it through
        # bzrlib.transport.fakenfs, which only resolves as a side effect of
        # the import done inside get_nfs_transport().
        import bzrlib.transport.fakenfs as fakenfs
        # connect to . via http which is not listable
        server = HttpServer()
        server.setUp()
        try:
            transport = self.get_nfs_transport(server.get_url())
            self.assertIsInstance(
                transport, fakenfs.FakeNFSTransportDecorator)
            self.assertEqual(False, transport.listable())
            self.assertEqual(True, transport.is_readonly())
        finally:
            server.tearDown()

    def test_fakenfs_server_default(self):
        """A default FakeNFSServer serves a 'fakenfs+' decorated url."""
        # a FakeNFSServer() should bring up a local relpath server for itself
        import bzrlib.transport.fakenfs as fakenfs
        server = fakenfs.FakeNFSServer()
        server.setUp()
        try:
            # the url should be decorated appropriately
            self.assertStartsWith(server.get_url(), 'fakenfs+')
            # and we should be able to get a transport for it
            transport = get_transport(server.get_url())
            # which must be a FakeNFSTransportDecorator instance.
            self.assertIsInstance(
                transport, fakenfs.FakeNFSTransportDecorator)
        finally:
            server.tearDown()

    def test_fakenfs_rename_semantics(self):
        """Renaming onto a non-empty directory raises ResourceBusy."""
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        transport = self.get_nfs_transport('.')
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
                        transport=transport)
        self.assertRaises(errors.ResourceBusy,
                          transport.rename, 'from', 'to')
 
490
 
 
491
 
 
492
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        import bzrlib.transport.fakevfat as fakevfat
        decorated_url = 'vfat+' + url
        return fakevfat.FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        """The helper really returns a FakeVFATTransportDecorator."""
        import bzrlib.transport.fakevfat as fakevfat
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, fakevfat.FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory is found again whatever the case of its name."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        """Names containing '<' or '>' are rejected with ValueError."""
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
 
514
 
 
515
 
 
516
class BadTransportHandler(Transport):
    """Test transport that cannot be constructed.

    Constructing it always raises DependencyNotPresent for the library
    'some_lib'; the tests above use it to stand in for a transport whose
    dependency is missing.
    """

    def __init__(self, base_url):
        # base_url is deliberately ignored: construction never succeeds.
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
519
 
 
520
 
 
521
class BackupTransportHandler(Transport):
    """Test transport that works as a backup for the BadTransportHandler

    It adds nothing to Transport; only its type matters to the test that
    checks which handler was selected.
    """
 
524
 
 
525
 
 
526
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport shows up in the working directory, and vice-versa. This is a
    bug, because it is possible to have URL schemes which provide access to
    something that does not result in storage on the local disk, e.g. due
    to file system limits, or due to it being a database or some other
    non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)
    """

    def setUp(self):
        """Start the server under test and schedule its shutdown."""
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        server.setUp()
        self.addCleanup(server.tearDown)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        server_url = self._server.get_url()
        target_url = self._adjust_url(server_url, relpath)
        # try getting the transport via the regular interface:
        candidate = get_transport(target_url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(candidate, self.transport_class):
            return candidate
        # we did not get the correct transport class type. Override the
        # regular connection behaviour by direct construction.
        return self.transport_class(target_url)
 
571
 
 
572
 
 
573
class TestLocalTransports(TestCase):
    """get_transport must give a LocalTransport for local locations."""

    def test_get_transport_from_abspath(self):
        """An absolute filesystem path gives a LocalTransport."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative filesystem path gives a LocalTransport."""
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local url gives a LocalTransport with that url as base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') is the path the transport was opened on."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertEquals(t.local_abspath(''), here)
 
598
 
 
599
 
 
600
class TestWin32LocalTransport(TestCase):
    """Tests for the emulated win32 local transport."""

    def test_unc_clone_to_root(self):
        """Cloning '..' from a UNC url never goes above file://HOST/."""
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        unc_root = 'file://HOST/'
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # one step up for each of the four path segments
        for step in xrange(4):
            t = t.clone('..')
        self.assertEquals(t.base, unc_root)
        # make sure we reach the root
        t = t.clone('..')
        self.assertEquals(t.base, unc_root)
 
613
 
 
614
 
 
615
class TestConnectedTransport(TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A plain url is split into host and path; the rest is None."""
        transport = ConnectedTransport('http://simple.example.com/home/source')
        self.assertEquals(transport._host, 'simple.example.com')
        self.assertEquals(transport._port, None)
        self.assertEquals(transport._path, '/home/source/')
        self.failUnless(transport._user is None)
        self.failUnless(transport._password is None)

        self.assertEquals(transport.base,
                          'http://simple.example.com/home/source/')

    def test_parse_quoted_url(self):
        """Percent-escapes in user, password and host are decoded."""
        transport = ConnectedTransport(
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEquals(transport._host, 'exAmple.com')
        self.assertEquals(transport._port, 2222)
        self.assertEquals(transport._user, 'robey')
        self.assertEquals(transport._password, 'h@t')
        self.assertEquals(transport._path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(transport.base,
                          'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        """A url with a non-numeric port part raises InvalidURL."""
        bad_url = 'sftp://lily.org:~janneke/public/bzr/gub'
        self.assertRaises(errors.InvalidURL, ConnectedTransport, bad_url)

    def test_relpath(self):
        """relpath only accepts urls below the transport's own base."""
        transport = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(
            transport.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # a different scheme, user, host or port is never a child
        foreign_urls = [
            'http://user@host.com/abs/path/sub',
            'sftp://user2@host.com/abs/path/sub',
            'sftp://user@otherhost.com/abs/path/sub',
            'sftp://user@host.com:33/abs/path/sub',
            ]
        for foreign_url in foreign_urls:
            self.assertRaises(errors.PathNotChild,
                              transport.relpath, foreign_url)
        # Make sure it works when we don't supply a username
        transport = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(
            transport.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        transport = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(
            transport.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A transport and its clone share connection and credentials."""
        original = ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEquals('user', original._user)
        self.assertEquals('host.com', original._host)
        self.assertIs(None, original._get_connection())
        self.assertIs(None, original._password)
        clone = original.clone('subdir')
        self.assertIs(None, clone._get_connection())
        # NOTE(review): this re-checks the original transport's password;
        # the clone's _password may have been intended -- confirm.
        self.assertIs(None, original._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        original._set_connection(connection, password)
        self.assertIs(connection, original._get_connection())
        self.assertIs(password, original._get_credentials())
        self.assertIs(connection, clone._get_connection())
        self.assertIs(password, clone._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        clone._update_credentials(new_password)
        self.assertIs(connection, original._get_connection())
        self.assertIs(new_password, original._get_credentials())
        self.assertIs(connection, clone._get_connection())
        self.assertIs(new_password, clone._get_credentials())
 
690
 
 
691
 
 
692
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """A transport for the same url is taken from possible_transports."""
        candidates = []
        first = get_transport('http://foo/', possible_transports=candidates)
        # the new transport has been recorded in the list we passed in
        self.assertEqual([first], candidates)
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = get_transport('http://foo/path/')
        reused = get_transport('http://foo/path',
                               possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = get_transport('http://foo/path')
        reused = get_transport('http://foo/path/',
                               possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
 
716
 
 
717
 
 
718
class TestTransportTrace(TestCase):
    """Tests for the 'trace+' transport decorator."""

    def test_decorator(self):
        """A 'trace+' url gives a TransportTraceDecorator."""
        # This test used to be called test_get, like the activity test
        # below; the later definition replaced it in the class namespace,
        # so it never ran.
        transport = get_transport('trace+memory://')
        self.assertIsInstance(
            transport, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a new object sharing the original's activity list."""
        transport = get_transport('trace+memory://')
        transport2 = transport.clone('.')
        self.assertTrue(transport is not transport2)
        self.assertTrue(transport._activity is transport2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes and get are both recorded in _activity."""
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        transport.get('foo')
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, transport._activity)

    def test_readv(self):
        """readv is recorded with the offsets and options it was given."""
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # consume the generator so the read actually takes place
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
            upper_limit=6))
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, transport._activity)