~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Aaron Bentley
  • Date: 2007-01-16 13:12:54 UTC
  • mto: (2230.3.47 branch6)
  • mto: This revision was merged to the branch mainline in revision 2290.
  • Revision ID: aaron.bentley@utoronto.ca-20070116131254-sjruli93timappd4
work in progress bind stuff

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
18
21
from cStringIO import StringIO
19
22
 
20
23
import bzrlib
21
 
from bzrlib import (
22
 
    errors,
23
 
    osutils,
24
 
    urlutils,
25
 
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
 
24
from bzrlib import urlutils
 
25
from bzrlib.errors import (NoSuchFile, FileExists,
 
26
                           TransportNotPossible,
 
27
                           ConnectionError,
 
28
                           DependencyNotPresent,
 
29
                           UnsupportedProtocol,
30
30
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
31
                           )
34
32
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
 
33
from bzrlib.transport import (_CoalescedOffset,
38
34
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
35
                              _get_transport_modules,
41
36
                              get_transport,
42
 
                              LateReadError,
43
37
                              register_lazy_transport,
44
 
                              register_transport_proto,
 
38
                              _set_protocol_handlers,
45
39
                              Transport,
46
40
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
41
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
42
from bzrlib.transport.local import LocalTransport
51
43
 
52
44
 
53
45
# TODO: Should possibly split transport-specific tests into their own files.
58
50
 
59
51
    def test__get_set_protocol_handlers(self):
60
52
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
 
53
        self.assertNotEqual({}, handlers)
62
54
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
55
            _set_protocol_handlers({})
 
56
            self.assertEqual({}, _get_protocol_handlers())
65
57
        finally:
66
58
            _set_protocol_handlers(handlers)
67
59
 
68
60
    def test_get_transport_modules(self):
69
61
        handlers = _get_protocol_handlers()
70
 
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
72
62
        class SampleHandler(object):
73
63
            """I exist, isnt that enough?"""
74
64
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
65
            my_handlers = {}
 
66
            _set_protocol_handlers(my_handlers)
 
67
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
68
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
69
            self.assertEqual([SampleHandler.__module__],
84
70
                             _get_transport_modules())
85
71
        finally:
86
72
            _set_protocol_handlers(handlers)
88
74
    def test_transport_dependency(self):
89
75
        """Transport with missing dependency causes no error"""
90
76
        saved_handlers = _get_protocol_handlers()
91
 
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
93
77
        try:
94
 
            register_transport_proto('foo')
95
78
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
79
                    'BadTransportHandler')
97
80
            try:
107
90
        finally:
108
91
            # restore original values
109
92
            _set_protocol_handlers(saved_handlers)
110
 
 
 
93
            
111
94
    def test_transport_fallback(self):
112
95
        """Transport with missing dependency causes no error"""
113
96
        saved_handlers = _get_protocol_handlers()
114
97
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
 
98
            _set_protocol_handlers({})
117
99
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
100
                    'BackupTransportHandler')
119
101
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
123
105
        finally:
124
106
            _set_protocol_handlers(saved_handlers)
125
107
 
126
 
    def test_ssh_hints(self):
127
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
 
        try:
129
 
            get_transport('ssh://fooserver/foo')
130
 
        except UnsupportedProtocol, e:
131
 
            e_str = str(e)
132
 
            self.assertEquals('Unsupported protocol'
133
 
                              ' for url "ssh://fooserver/foo":'
134
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
135
 
                              str(e))
136
 
        else:
137
 
            self.fail('Did not raise UnsupportedProtocol')
138
 
 
139
 
    def test_LateReadError(self):
140
 
        """The LateReadError helper should raise on read()."""
141
 
        a_file = LateReadError('a path')
142
 
        try:
143
 
            a_file.read()
144
 
        except ReadError, error:
145
 
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(ReadError, a_file.read, 40)
147
 
        a_file.close()
148
 
 
149
108
    def test__combine_paths(self):
150
109
        t = Transport('/')
151
110
        self.assertEqual('/home/sarah/project/foo',
157
116
        self.assertEqual('/etc',
158
117
                         t._combine_paths('/home/sarah', '/etc'))
159
118
 
160
 
    def test_local_abspath_non_local_transport(self):
161
 
        # the base implementation should throw
162
 
        t = MemoryTransport()
163
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
 
        self.assertEqual('memory:///t is not a local path.', str(e))
165
 
 
166
119
 
167
120
class TestCoalesceOffsets(TestCase):
168
 
 
169
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
121
    
 
122
    def check(self, expected, offsets, limit=0, fudge=0):
170
123
        coalesce = Transport._coalesce_offsets
171
124
        exp = [_CoalescedOffset(*x) for x in expected]
172
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
 
                            max_size=max_size))
 
125
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
174
126
        self.assertEqual(exp, out)
175
127
 
176
128
    def test_coalesce_empty(self):
183
135
        self.check([(0, 10, [(0, 10)]),
184
136
                    (20, 10, [(0, 10)]),
185
137
                   ], [(0, 10), (20, 10)])
186
 
 
 
138
            
187
139
    def test_coalesce_unsorted(self):
188
140
        self.check([(20, 10, [(0, 10)]),
189
141
                    (0, 10, [(0, 10)]),
194
146
                   [(0, 10), (10, 10)])
195
147
 
196
148
    def test_coalesce_overlapped(self):
197
 
        self.assertRaises(ValueError,
198
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
199
 
                        [(0, 10), (5, 10)])
 
149
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
150
                   [(0, 10), (5, 10)])
200
151
 
201
152
    def test_coalesce_limit(self):
202
153
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
223
174
                   ], [(10, 10), (30, 10), (100, 10)],
224
175
                   fudge=10
225
176
                  )
226
 
    def test_coalesce_max_size(self):
227
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
 
                    (30, 50, [(0, 50)]),
229
 
                    # If one range is above max_size, it gets its own coalesced
230
 
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
232
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
235
 
 
236
 
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
 
                  )
240
 
 
241
 
    def test_coalesce_default_limit(self):
242
 
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
249
 
                   max_size=1*1024*1024*1024)
250
177
 
251
178
 
252
179
class TestMemoryTransport(TestCase):
338
265
    def test_parameters(self):
339
266
        transport = MemoryTransport()
340
267
        self.assertEqual(True, transport.listable())
 
268
        self.assertEqual(False, transport.should_cache())
341
269
        self.assertEqual(False, transport.is_readonly())
342
270
 
343
271
    def test_iter_files_recursive(self):
360
288
class ChrootDecoratorTransportTest(TestCase):
361
289
    """Chroot decoration specific tests."""
362
290
 
363
 
    def test_abspath(self):
364
 
        # The abspath is always relative to the chroot_url.
365
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        server.setUp()
367
 
        transport = get_transport(server.get_url())
368
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
369
 
 
370
 
        subdir_transport = transport.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
        server.tearDown()
373
 
 
374
 
    def test_clone(self):
375
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
376
 
        server.setUp()
377
 
        transport = get_transport(server.get_url())
378
 
        # relpath from root and root path are the same
379
 
        relpath_cloned = transport.clone('foo')
380
 
        abspath_cloned = transport.clone('/foo')
381
 
        self.assertEqual(server, relpath_cloned.server)
382
 
        self.assertEqual(server, abspath_cloned.server)
383
 
        server.tearDown()
384
 
 
385
 
    def test_chroot_url_preserves_chroot(self):
386
 
        """Calling get_transport on a chroot transport's base should produce a
387
 
        transport with exactly the same behaviour as the original chroot
388
 
        transport.
389
 
 
390
 
        This is so that it is not possible to escape a chroot by doing::
391
 
            url = chroot_transport.base
392
 
            parent_url = urlutils.join(url, '..')
393
 
            new_transport = get_transport(parent_url)
394
 
        """
395
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
396
 
        server.setUp()
397
 
        transport = get_transport(server.get_url())
398
 
        new_transport = get_transport(transport.base)
399
 
        self.assertEqual(transport.server, new_transport.server)
400
 
        self.assertEqual(transport.base, new_transport.base)
401
 
        server.tearDown()
402
 
 
403
 
    def test_urljoin_preserves_chroot(self):
404
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
405
 
        URL that escapes the intended chroot.
406
 
 
407
 
        This is so that it is not possible to escape a chroot by doing::
408
 
            url = chroot_transport.base
409
 
            parent_url = urlutils.join(url, '..')
410
 
            new_transport = get_transport(parent_url)
411
 
        """
412
 
        server = ChrootServer(get_transport('memory:///path/'))
413
 
        server.setUp()
414
 
        transport = get_transport(server.get_url())
415
 
        self.assertRaises(
416
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
417
 
        server.tearDown()
418
 
 
419
 
 
420
 
class ChrootServerTest(TestCase):
421
 
 
422
291
    def test_construct(self):
423
 
        backing_transport = MemoryTransport()
424
 
        server = ChrootServer(backing_transport)
425
 
        self.assertEqual(backing_transport, server.backing_transport)
426
 
 
427
 
    def test_setUp(self):
428
 
        backing_transport = MemoryTransport()
429
 
        server = ChrootServer(backing_transport)
430
 
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
432
 
 
433
 
    def test_tearDown(self):
434
 
        backing_transport = MemoryTransport()
435
 
        server = ChrootServer(backing_transport)
436
 
        server.setUp()
437
 
        server.tearDown()
438
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
439
 
 
440
 
    def test_get_url(self):
441
 
        backing_transport = MemoryTransport()
442
 
        server = ChrootServer(backing_transport)
443
 
        server.setUp()
444
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
445
 
        server.tearDown()
 
292
        from bzrlib.transport import chroot
 
293
        transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
 
294
        self.assertEqual('memory:///pathA/', transport.chroot_url)
 
295
 
 
296
        transport = chroot.ChrootTransportDecorator(
 
297
            'chroot+memory:///path/B', chroot='memory:///path/')
 
298
        self.assertEqual('memory:///path/', transport.chroot_url)
 
299
 
 
300
    def test_append_file(self):
 
301
        transport = get_transport('chroot+memory:///foo/bar')
 
302
        self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
 
303
 
 
304
    def test_append_bytes(self):
 
305
        transport = get_transport('chroot+memory:///foo/bar')
 
306
        self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
 
307
 
 
308
    def test_clone(self):
 
309
        transport = get_transport('chroot+memory:///foo/bar')
 
310
        self.assertRaises(PathNotChild, transport.clone, '/foo')
 
311
 
 
312
    def test_delete(self):
 
313
        transport = get_transport('chroot+memory:///foo/bar')
 
314
        self.assertRaises(PathNotChild, transport.delete, '/foo')
 
315
 
 
316
    def test_delete_tree(self):
 
317
        transport = get_transport('chroot+memory:///foo/bar')
 
318
        self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
 
319
 
 
320
    def test_get(self):
 
321
        transport = get_transport('chroot+memory:///foo/bar')
 
322
        self.assertRaises(PathNotChild, transport.get, '/foo')
 
323
 
 
324
    def test_get_bytes(self):
 
325
        transport = get_transport('chroot+memory:///foo/bar')
 
326
        self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
 
327
 
 
328
    def test_has(self):
 
329
        transport = get_transport('chroot+memory:///foo/bar')
 
330
        self.assertRaises(PathNotChild, transport.has, '/foo')
 
331
 
 
332
    def test_list_dir(self):
 
333
        transport = get_transport('chroot+memory:///foo/bar')
 
334
        self.assertRaises(PathNotChild, transport.list_dir, '/foo')
 
335
 
 
336
    def test_lock_read(self):
 
337
        transport = get_transport('chroot+memory:///foo/bar')
 
338
        self.assertRaises(PathNotChild, transport.lock_read, '/foo')
 
339
 
 
340
    def test_lock_write(self):
 
341
        transport = get_transport('chroot+memory:///foo/bar')
 
342
        self.assertRaises(PathNotChild, transport.lock_write, '/foo')
 
343
 
 
344
    def test_mkdir(self):
 
345
        transport = get_transport('chroot+memory:///foo/bar')
 
346
        self.assertRaises(PathNotChild, transport.mkdir, '/foo')
 
347
 
 
348
    def test_put_bytes(self):
 
349
        transport = get_transport('chroot+memory:///foo/bar')
 
350
        self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
 
351
 
 
352
    def test_put_file(self):
 
353
        transport = get_transport('chroot+memory:///foo/bar')
 
354
        self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
 
355
 
 
356
    def test_rename(self):
 
357
        transport = get_transport('chroot+memory:///foo/bar')
 
358
        self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
 
359
        self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
 
360
 
 
361
    def test_rmdir(self):
 
362
        transport = get_transport('chroot+memory:///foo/bar')
 
363
        self.assertRaises(PathNotChild, transport.rmdir, '/foo')
 
364
 
 
365
    def test_stat(self):
 
366
        transport = get_transport('chroot+memory:///foo/bar')
 
367
        self.assertRaises(PathNotChild, transport.stat, '/foo')
446
368
 
447
369
 
448
370
class ReadonlyDecoratorTransportTest(TestCase):
453
375
        # connect to . in readonly mode
454
376
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
455
377
        self.assertEqual(True, transport.listable())
 
378
        self.assertEqual(False, transport.should_cache())
456
379
        self.assertEqual(True, transport.is_readonly())
457
380
 
458
381
    def test_http_parameters(self):
459
 
        from bzrlib.tests.http_server import HttpServer
 
382
        from bzrlib.tests.HttpServer import HttpServer
460
383
        import bzrlib.transport.readonly as readonly
461
 
        # connect to '.' via http which is not listable
 
384
        # connect to . via http which is not listable
462
385
        server = HttpServer()
463
386
        server.setUp()
464
387
        try:
466
389
            self.failUnless(isinstance(transport,
467
390
                                       readonly.ReadonlyTransportDecorator))
468
391
            self.assertEqual(False, transport.listable())
 
392
            self.assertEqual(True, transport.should_cache())
469
393
            self.assertEqual(True, transport.is_readonly())
470
394
        finally:
471
395
            server.tearDown()
480
404
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
481
405
 
482
406
    def test_local_parameters(self):
483
 
        # the listable and is_readonly parameters
 
407
        # the listable, should_cache and is_readonly parameters
484
408
        # are not changed by the fakenfs decorator
485
409
        transport = self.get_nfs_transport('.')
486
410
        self.assertEqual(True, transport.listable())
 
411
        self.assertEqual(False, transport.should_cache())
487
412
        self.assertEqual(False, transport.is_readonly())
488
413
 
489
414
    def test_http_parameters(self):
490
 
        # the listable and is_readonly parameters
 
415
        # the listable, should_cache and is_readonly parameters
491
416
        # are not changed by the fakenfs decorator
492
 
        from bzrlib.tests.http_server import HttpServer
493
 
        # connect to '.' via http which is not listable
 
417
        from bzrlib.tests.HttpServer import HttpServer
 
418
        # connect to . via http which is not listable
494
419
        server = HttpServer()
495
420
        server.setUp()
496
421
        try:
498
423
            self.assertIsInstance(
499
424
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
425
            self.assertEqual(False, transport.listable())
 
426
            self.assertEqual(True, transport.should_cache())
501
427
            self.assertEqual(True, transport.is_readonly())
502
428
        finally:
503
429
            server.tearDown()
524
450
        transport = self.get_nfs_transport('.')
525
451
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
526
452
                        transport=transport)
527
 
        self.assertRaises(errors.ResourceBusy,
 
453
        self.assertRaises(bzrlib.errors.ResourceBusy,
528
454
                          transport.rename, 'from', 'to')
529
455
 
530
456
 
564
490
 
565
491
class TestTransportImplementation(TestCaseInTempDir):
566
492
    """Implementation verification for transports.
567
 
 
 
493
    
568
494
    To verify a transport we need a server factory, which is a callable
569
495
    that accepts no parameters and returns an implementation of
570
496
    bzrlib.transport.Server.
571
 
 
 
497
    
572
498
    That Server is then used to construct transport instances and test
573
499
    the transport via loopback activity.
574
500
 
575
 
    Currently this assumes that the Transport object is connected to the
576
 
    current working directory.  So that whatever is done
577
 
    through the transport, should show up in the working
 
501
    Currently this assumes that the Transport object is connected to the 
 
502
    current working directory.  So that whatever is done 
 
503
    through the transport, should show up in the working 
578
504
    directory, and vice-versa. This is a bug, because its possible to have
579
 
    URL schemes which provide access to something that may not be
580
 
    result in storage on the local disk, i.e. due to file system limits, or
 
505
    URL schemes which provide access to something that may not be 
 
506
    result in storage on the local disk, i.e. due to file system limits, or 
581
507
    due to it being a database or some other non-filesystem tool.
582
508
 
583
509
    This also tests to make sure that the functions work with both
584
510
    generators and lists (assuming iter(list) is effectively a generator)
585
511
    """
586
 
 
 
512
    
587
513
    def setUp(self):
588
514
        super(TestTransportImplementation, self).setUp()
589
515
        self._server = self.transport_server()
590
516
        self._server.setUp()
591
 
        self.addCleanup(self._server.tearDown)
592
 
 
593
 
    def get_transport(self, relpath=None):
594
 
        """Return a connected transport to the local directory.
595
 
 
596
 
        :param relpath: a path relative to the base url.
597
 
        """
 
517
 
 
518
    def tearDown(self):
 
519
        super(TestTransportImplementation, self).tearDown()
 
520
        self._server.tearDown()
 
521
        
 
522
    def get_transport(self):
 
523
        """Return a connected transport to the local directory."""
598
524
        base_url = self._server.get_url()
599
 
        url = self._adjust_url(base_url, relpath)
600
525
        # try getting the transport via the regular interface:
601
 
        t = get_transport(url)
602
 
        # vila--20070607 if the following are commented out the test suite
603
 
        # still pass. Is this really still needed or was it a forgotten
604
 
        # temporary fix ?
 
526
        t = get_transport(base_url)
605
527
        if not isinstance(t, self.transport_class):
606
528
            # we did not get the correct transport class type. Override the
607
529
            # regular connection behaviour by direct construction.
608
 
            t = self.transport_class(url)
 
530
            t = self.transport_class(base_url)
609
531
        return t
610
532
 
611
533
 
612
534
class TestLocalTransports(TestCase):
613
535
 
614
536
    def test_get_transport_from_abspath(self):
615
 
        here = osutils.abspath('.')
 
537
        here = os.path.abspath('.')
616
538
        t = get_transport(here)
617
539
        self.assertIsInstance(t, LocalTransport)
618
540
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
619
541
 
620
542
    def test_get_transport_from_relpath(self):
621
 
        here = osutils.abspath('.')
 
543
        here = os.path.abspath('.')
622
544
        t = get_transport('.')
623
545
        self.assertIsInstance(t, LocalTransport)
624
546
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
625
547
 
626
548
    def test_get_transport_from_local_url(self):
627
 
        here = osutils.abspath('.')
 
549
        here = os.path.abspath('.')
628
550
        here_url = urlutils.local_path_to_url(here) + '/'
629
551
        t = get_transport(here_url)
630
552
        self.assertIsInstance(t, LocalTransport)
631
553
        self.assertEquals(t.base, here_url)
632
 
 
633
 
    def test_local_abspath(self):
634
 
        here = osutils.abspath('.')
635
 
        t = get_transport(here)
636
 
        self.assertEquals(t.local_abspath(''), here)
637
 
 
638
 
 
639
 
class TestWin32LocalTransport(TestCase):
640
 
 
641
 
    def test_unc_clone_to_root(self):
642
 
        # Win32 UNC path like \\HOST\path
643
 
        # clone to root should stop at least at \\HOST part
644
 
        # not on \\
645
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
646
 
        for i in xrange(4):
647
 
            t = t.clone('..')
648
 
        self.assertEquals(t.base, 'file://HOST/')
649
 
        # make sure we reach the root
650
 
        t = t.clone('..')
651
 
        self.assertEquals(t.base, 'file://HOST/')
652
 
 
653
 
 
654
 
class TestConnectedTransport(TestCase):
655
 
    """Tests for connected to remote server transports"""
656
 
 
657
 
    def test_parse_url(self):
658
 
        t = ConnectedTransport('http://simple.example.com/home/source')
659
 
        self.assertEquals(t._host, 'simple.example.com')
660
 
        self.assertEquals(t._port, None)
661
 
        self.assertEquals(t._path, '/home/source/')
662
 
        self.failUnless(t._user is None)
663
 
        self.failUnless(t._password is None)
664
 
 
665
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
666
 
 
667
 
    def test_parse_url_with_at_in_user(self):
668
 
        # Bug 228058
669
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
670
 
        self.assertEquals(t._user, 'user@host.com')
671
 
 
672
 
    def test_parse_quoted_url(self):
673
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
674
 
        self.assertEquals(t._host, 'exAmple.com')
675
 
        self.assertEquals(t._port, 2222)
676
 
        self.assertEquals(t._user, 'robey')
677
 
        self.assertEquals(t._password, 'h@t')
678
 
        self.assertEquals(t._path, '/path/')
679
 
 
680
 
        # Base should not keep track of the password
681
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
682
 
 
683
 
    def test_parse_invalid_url(self):
684
 
        self.assertRaises(errors.InvalidURL,
685
 
                          ConnectedTransport,
686
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
687
 
 
688
 
    def test_relpath(self):
689
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
690
 
 
691
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
692
 
        self.assertRaises(errors.PathNotChild, t.relpath,
693
 
                          'http://user@host.com/abs/path/sub')
694
 
        self.assertRaises(errors.PathNotChild, t.relpath,
695
 
                          'sftp://user2@host.com/abs/path/sub')
696
 
        self.assertRaises(errors.PathNotChild, t.relpath,
697
 
                          'sftp://user@otherhost.com/abs/path/sub')
698
 
        self.assertRaises(errors.PathNotChild, t.relpath,
699
 
                          'sftp://user@host.com:33/abs/path/sub')
700
 
        # Make sure it works when we don't supply a username
701
 
        t = ConnectedTransport('sftp://host.com/abs/path')
702
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
703
 
 
704
 
        # Make sure it works when parts of the path will be url encoded
705
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
706
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
707
 
 
708
 
    def test_connection_sharing_propagate_credentials(self):
709
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
710
 
        self.assertEquals('user', t._user)
711
 
        self.assertEquals('host.com', t._host)
712
 
        self.assertIs(None, t._get_connection())
713
 
        self.assertIs(None, t._password)
714
 
        c = t.clone('subdir')
715
 
        self.assertIs(None, c._get_connection())
716
 
        self.assertIs(None, t._password)
717
 
 
718
 
        # Simulate the user entering a password
719
 
        password = 'secret'
720
 
        connection = object()
721
 
        t._set_connection(connection, password)
722
 
        self.assertIs(connection, t._get_connection())
723
 
        self.assertIs(password, t._get_credentials())
724
 
        self.assertIs(connection, c._get_connection())
725
 
        self.assertIs(password, c._get_credentials())
726
 
 
727
 
        # credentials can be updated
728
 
        new_password = 'even more secret'
729
 
        c._update_credentials(new_password)
730
 
        self.assertIs(connection, t._get_connection())
731
 
        self.assertIs(new_password, t._get_credentials())
732
 
        self.assertIs(connection, c._get_connection())
733
 
        self.assertIs(new_password, c._get_credentials())
734
 
 
735
 
 
736
 
class TestReusedTransports(TestCase):
737
 
    """Tests for transport reuse"""
738
 
 
739
 
    def test_reuse_same_transport(self):
740
 
        possible_transports = []
741
 
        t1 = get_transport('http://foo/',
742
 
                           possible_transports=possible_transports)
743
 
        self.assertEqual([t1], possible_transports)
744
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
745
 
        self.assertIs(t1, t2)
746
 
 
747
 
        # Also check that final '/' are handled correctly
748
 
        t3 = get_transport('http://foo/path/')
749
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
750
 
        self.assertIs(t3, t4)
751
 
 
752
 
        t5 = get_transport('http://foo/path')
753
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
754
 
        self.assertIs(t5, t6)
755
 
 
756
 
    def test_don_t_reuse_different_transport(self):
757
 
        t1 = get_transport('http://foo/path')
758
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
759
 
        self.assertIsNot(t1, t2)
760
 
 
761
 
 
762
 
class TestTransportTrace(TestCase):
763
 
 
764
 
    def test_get(self):
765
 
        transport = get_transport('trace+memory://')
766
 
        self.assertIsInstance(
767
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
768
 
 
769
 
    def test_clone_preserves_activity(self):
770
 
        transport = get_transport('trace+memory://')
771
 
        transport2 = transport.clone('.')
772
 
        self.assertTrue(transport is not transport2)
773
 
        self.assertTrue(transport._activity is transport2._activity)
774
 
 
775
 
    # the following specific tests are for the operations that have made use of
776
 
    # logging in tests; we could test every single operation but doing that
777
 
    # still won't cause a test failure when the top level Transport API
778
 
    # changes; so there is little return doing that.
779
 
    def test_get(self):
780
 
        transport = get_transport('trace+memory:///')
781
 
        transport.put_bytes('foo', 'barish')
782
 
        transport.get('foo')
783
 
        expected_result = []
784
 
        # put_bytes records the bytes, not the content to avoid memory
785
 
        # pressure.
786
 
        expected_result.append(('put_bytes', 'foo', 6, None))
787
 
        # get records the file name only.
788
 
        expected_result.append(('get', 'foo'))
789
 
        self.assertEqual(expected_result, transport._activity)
790
 
 
791
 
    def test_readv(self):
792
 
        transport = get_transport('trace+memory:///')
793
 
        transport.put_bytes('foo', 'barish')
794
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
795
 
            upper_limit=6))
796
 
        expected_result = []
797
 
        # put_bytes records the bytes, not the content to avoid memory
798
 
        # pressure.
799
 
        expected_result.append(('put_bytes', 'foo', 6, None))
800
 
        # readv records the supplied offset request
801
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
 
        self.assertEqual(expected_result, transport._activity)