~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-01 08:02:42 UTC
  • mfrom: (5390.3.3 faster-revert-593560)
  • Revision ID: pqm@pqm.ubuntu.com-20100901080242-esg62ody4frwmy66
(spiv) Avoid repeatedly calling self.target.all_file_ids() in
 InterTree.iter_changes. (Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
from cStringIO import StringIO
19
 
import errno
20
19
import os
21
20
import subprocess
22
21
import sys
29
28
    transport,
30
29
    urlutils,
31
30
    )
32
 
from bzrlib.directory_service import directories
33
31
from bzrlib.transport import (
34
32
    chroot,
35
33
    fakenfs,
36
 
    http,
37
34
    local,
38
 
    location_to_url,
39
35
    memory,
40
36
    pathfilter,
41
37
    readonly,
42
38
    )
43
 
import bzrlib.transport.trace
44
39
from bzrlib.tests import (
45
40
    features,
46
41
    test_server,
53
48
class TestTransport(tests.TestCase):
54
49
    """Test the non transport-concrete class functionality."""
55
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
56
54
    def test__get_set_protocol_handlers(self):
57
55
        handlers = transport._get_protocol_handlers()
58
 
        self.assertNotEqual([], handlers.keys())
59
 
        transport._clear_protocol_handlers()
60
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
61
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
56
        self.assertNotEqual([], handlers.keys( ))
 
57
        try:
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
60
        finally:
 
61
            transport._set_protocol_handlers(handlers)
62
62
 
63
63
    def test_get_transport_modules(self):
64
64
        handlers = transport._get_protocol_handlers()
65
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
65
        # don't pollute the current handlers
67
66
        transport._clear_protocol_handlers()
68
 
 
69
67
        class SampleHandler(object):
70
68
            """I exist, isn't that enough?"""
71
 
        transport._clear_protocol_handlers()
72
 
        transport.register_transport_proto('foo')
73
 
        transport.register_lazy_transport('foo',
74
 
                                            'bzrlib.tests.test_transport',
75
 
                                            'TestTransport.SampleHandler')
76
 
        transport.register_transport_proto('bar')
77
 
        transport.register_lazy_transport('bar',
78
 
                                            'bzrlib.tests.test_transport',
79
 
                                            'TestTransport.SampleHandler')
80
 
        self.assertEqual([SampleHandler.__module__,
81
 
                            'bzrlib.transport.chroot',
82
 
                            'bzrlib.transport.pathfilter'],
83
 
                            transport._get_transport_modules())
 
69
        try:
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__,
 
80
                              'bzrlib.transport.chroot',
 
81
                              'bzrlib.transport.pathfilter'],
 
82
                             transport._get_transport_modules())
 
83
        finally:
 
84
            transport._set_protocol_handlers(handlers)
84
85
 
85
86
    def test_transport_dependency(self):
86
87
        """Transport with missing dependency causes no error"""
87
88
        saved_handlers = transport._get_protocol_handlers()
88
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
89
        # don't pollute the current handlers
90
90
        transport._clear_protocol_handlers()
91
 
        transport.register_transport_proto('foo')
92
 
        transport.register_lazy_transport(
93
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
94
91
        try:
95
 
            transport.get_transport_from_url('foo://fooserver/foo')
96
 
        except errors.UnsupportedProtocol, e:
97
 
            e_str = str(e)
98
 
            self.assertEqual('Unsupported protocol'
99
 
                                ' for url "foo://fooserver/foo":'
100
 
                                ' Unable to import library "some_lib":'
101
 
                                ' testing missing dependency', str(e))
102
 
        else:
103
 
            self.fail('Did not raise UnsupportedProtocol')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
95
            try:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
 
98
                e_str = str(e)
 
99
                self.assertEquals('Unsupported protocol'
 
100
                                  ' for url "foo://fooserver/foo":'
 
101
                                  ' Unable to import library "some_lib":'
 
102
                                  ' testing missing dependency', str(e))
 
103
            else:
 
104
                self.fail('Did not raise UnsupportedProtocol')
 
105
        finally:
 
106
            # restore original values
 
107
            transport._set_protocol_handlers(saved_handlers)
104
108
 
105
109
    def test_transport_fallback(self):
106
110
        """Transport with missing dependency falls back to another handler"""
107
111
        saved_handlers = transport._get_protocol_handlers()
108
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
109
 
        transport._clear_protocol_handlers()
110
 
        transport.register_transport_proto('foo')
111
 
        transport.register_lazy_transport(
112
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
113
 
        transport.register_lazy_transport(
114
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
115
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
116
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
112
        try:
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
 
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
121
        finally:
 
122
            transport._set_protocol_handlers(saved_handlers)
117
123
 
118
124
    def test_ssh_hints(self):
119
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
120
126
        try:
121
 
            transport.get_transport_from_url('ssh://fooserver/foo')
 
127
            transport.get_transport('ssh://fooserver/foo')
122
128
        except errors.UnsupportedProtocol, e:
123
129
            e_str = str(e)
124
 
            self.assertEqual('Unsupported protocol'
 
130
            self.assertEquals('Unsupported protocol'
125
131
                              ' for url "ssh://fooserver/foo":'
126
132
                              ' bzr supports bzr+ssh to operate over ssh,'
127
133
                              ' use "bzr+ssh://fooserver/foo".',
139
145
        self.assertRaises(errors.ReadError, a_file.read, 40)
140
146
        a_file.close()
141
147
 
 
148
    def test__combine_paths(self):
 
149
        t = transport.Transport('/')
 
150
        self.assertEqual('/home/sarah/project/foo',
 
151
                         t._combine_paths('/home/sarah', 'project/foo'))
 
152
        self.assertEqual('/etc',
 
153
                         t._combine_paths('/home/sarah', '../../etc'))
 
154
        self.assertEqual('/etc',
 
155
                         t._combine_paths('/home/sarah', '../../../etc'))
 
156
        self.assertEqual('/etc',
 
157
                         t._combine_paths('/home/sarah', '/etc'))
 
158
 
142
159
    def test_local_abspath_non_local_transport(self):
143
160
        # the base implementation should throw
144
161
        t = memory.MemoryTransport()
201
218
 
202
219
    def test_coalesce_fudge(self):
203
220
        self.check([(10, 30, [(0, 10), (20, 10)]),
204
 
                    (100, 10, [(0, 10)]),
 
221
                    (100, 10, [(0, 10),]),
205
222
                   ], [(10, 10), (30, 10), (100, 10)],
206
 
                   fudge=10)
207
 
 
 
223
                   fudge=10
 
224
                  )
208
225
    def test_coalesce_max_size(self):
209
226
        self.check([(10, 20, [(0, 10), (10, 10)]),
210
227
                    (30, 50, [(0, 50)]),
211
228
                    # If one range is above max_size, it gets its own coalesced
212
229
                    # offset
213
 
                    (100, 80, [(0, 80)]),],
 
230
                    (100, 80, [(0, 80),]),],
214
231
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
215
 
                   max_size=50)
 
232
                   max_size=50
 
233
                  )
216
234
 
217
235
    def test_coalesce_no_max_size(self):
218
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
236
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
219
237
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
220
238
                  )
221
239
 
222
240
    def test_coalesce_default_limit(self):
223
241
        # By default we use a 100MB max size.
224
 
        ten_mb = 10 * 1024 * 1024
225
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
242
        ten_mb = 10*1024*1024
 
243
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
226
244
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
227
245
                   [(i*ten_mb, ten_mb) for i in range(11)])
228
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
246
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)],
230
248
                   max_size=1*1024*1024*1024)
231
249
 
232
250
 
237
255
        server.start_server()
238
256
        url = server.get_url()
239
257
        self.assertTrue(url in transport.transport_list_registry)
240
 
        t = transport.get_transport_from_url(url)
 
258
        t = transport.get_transport(url)
241
259
        del t
242
260
        server.stop_server()
243
261
        self.assertFalse(url in transport.transport_list_registry)
298
316
 
299
317
    def test_has_missing(self):
300
318
        t = memory.MemoryTransport()
301
 
        self.assertEqual(False, t.has('foo'))
 
319
        self.assertEquals(False, t.has('foo'))
302
320
 
303
321
    def test_has_present(self):
304
322
        t = memory.MemoryTransport()
305
323
        t.append_bytes('foo', 'content')
306
 
        self.assertEqual(True, t.has('foo'))
 
324
        self.assertEquals(True, t.has('foo'))
307
325
 
308
326
    def test_list_dir(self):
309
327
        t = memory.MemoryTransport()
312
330
        t.put_bytes('dir/subfoo', 'content')
313
331
        t.put_bytes('dirlike', 'content')
314
332
 
315
 
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
316
 
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
 
333
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
334
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
317
335
 
318
336
    def test_mkdir(self):
319
337
        t = memory.MemoryTransport()
358
376
    def test_abspath(self):
359
377
        # The abspath is always relative to the chroot_url.
360
378
        server = chroot.ChrootServer(
361
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
379
            transport.get_transport('memory:///foo/bar/'))
362
380
        self.start_server(server)
363
 
        t = transport.get_transport_from_url(server.get_url())
 
381
        t = transport.get_transport(server.get_url())
364
382
        self.assertEqual(server.get_url(), t.abspath('/'))
365
383
 
366
384
        subdir_t = t.clone('subdir')
368
386
 
369
387
    def test_clone(self):
370
388
        server = chroot.ChrootServer(
371
 
            transport.get_transport_from_url('memory:///foo/bar/'))
 
389
            transport.get_transport('memory:///foo/bar/'))
372
390
        self.start_server(server)
373
 
        t = transport.get_transport_from_url(server.get_url())
 
391
        t = transport.get_transport(server.get_url())
374
392
        # relpath from root and root path are the same
375
393
        relpath_cloned = t.clone('foo')
376
394
        abspath_cloned = t.clone('/foo')
385
403
        This is so that it is not possible to escape a chroot by doing::
386
404
            url = chroot_transport.base
387
405
            parent_url = urlutils.join(url, '..')
388
 
            new_t = transport.get_transport_from_url(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
389
407
        """
390
408
        server = chroot.ChrootServer(
391
 
            transport.get_transport_from_url('memory:///path/subpath'))
 
409
            transport.get_transport('memory:///path/subpath'))
392
410
        self.start_server(server)
393
 
        t = transport.get_transport_from_url(server.get_url())
394
 
        new_t = transport.get_transport_from_url(t.base)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
395
413
        self.assertEqual(t.server, new_t.server)
396
414
        self.assertEqual(t.base, new_t.base)
397
415
 
402
420
        This is so that it is not possible to escape a chroot by doing::
403
421
            url = chroot_transport.base
404
422
            parent_url = urlutils.join(url, '..')
405
 
            new_t = transport.get_transport_from_url(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
406
424
        """
407
 
        server = chroot.ChrootServer(
408
 
            transport.get_transport_from_url('memory:///path/'))
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
409
426
        self.start_server(server)
410
 
        t = transport.get_transport_from_url(server.get_url())
 
427
        t = transport.get_transport(server.get_url())
411
428
        self.assertRaises(
412
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
413
430
 
423
440
        backing_transport = memory.MemoryTransport()
424
441
        server = chroot.ChrootServer(backing_transport)
425
442
        server.start_server()
426
 
        self.addCleanup(server.stop_server)
427
 
        self.assertTrue(server.scheme
428
 
                        in transport._get_protocol_handlers().keys())
 
443
        try:
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
 
446
        finally:
 
447
            server.stop_server()
429
448
 
430
449
    def test_stop_server(self):
431
450
        backing_transport = memory.MemoryTransport()
439
458
        backing_transport = memory.MemoryTransport()
440
459
        server = chroot.ChrootServer(backing_transport)
441
460
        server.start_server()
442
 
        self.addCleanup(server.stop_server)
443
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
444
 
 
445
 
 
446
 
class TestHooks(tests.TestCase):
447
 
    """Basic tests for transport hooks"""
448
 
 
449
 
    def _get_connected_transport(self):
450
 
        return transport.ConnectedTransport("bogus:nowhere")
451
 
 
452
 
    def test_transporthooks_initialisation(self):
453
 
        """Check all expected transport hook points are set up"""
454
 
        hookpoint = transport.TransportHooks()
455
 
        self.assertTrue("post_connect" in hookpoint,
456
 
            "post_connect not in %s" % (hookpoint,))
457
 
 
458
 
    def test_post_connect(self):
459
 
        """Ensure the post_connect hook is called when _set_transport is"""
460
 
        calls = []
461
 
        transport.Transport.hooks.install_named_hook("post_connect",
462
 
            calls.append, None)
463
 
        t = self._get_connected_transport()
464
 
        self.assertLength(0, calls)
465
 
        t._set_connection("connection", "auth")
466
 
        self.assertEqual(calls, [t])
 
461
        try:
 
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
463
        finally:
 
464
            server.stop_server()
467
465
 
468
466
 
469
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
472
470
    def test_abspath(self):
473
471
        # The abspath is always relative to the base of the backing transport.
474
472
        server = pathfilter.PathFilteringServer(
475
 
            transport.get_transport_from_url('memory:///foo/bar/'),
 
473
            transport.get_transport('memory:///foo/bar/'),
476
474
            lambda x: x)
477
475
        server.start_server()
478
 
        t = transport.get_transport_from_url(server.get_url())
 
476
        t = transport.get_transport(server.get_url())
479
477
        self.assertEqual(server.get_url(), t.abspath('/'))
480
478
 
481
479
        subdir_t = t.clone('subdir')
484
482
 
485
483
    def make_pf_transport(self, filter_func=None):
486
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
487
 
 
 
485
        
488
486
        :param filter_func: by default this will be a no-op function.  Use this
489
487
            parameter to override it."""
490
488
        if filter_func is None:
491
489
            filter_func = lambda x: x
492
490
        server = pathfilter.PathFilteringServer(
493
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
494
492
        server.start_server()
495
493
        self.addCleanup(server.stop_server)
496
 
        return transport.get_transport_from_url(server.get_url())
 
494
        return transport.get_transport(server.get_url())
497
495
 
498
496
    def test__filter(self):
499
497
        # _filter (with an identity func as filter_func) always returns
512
510
 
513
511
    def test_filter_invocation(self):
514
512
        filter_log = []
515
 
 
516
513
        def filter(path):
517
514
            filter_log.append(path)
518
515
            return path
543
540
        otherwise) the filtering by doing::
544
541
            url = filtered_transport.base
545
542
            parent_url = urlutils.join(url, '..')
546
 
            new_t = transport.get_transport_from_url(parent_url)
 
543
            new_t = transport.get_transport(parent_url)
547
544
        """
548
545
        t = self.make_pf_transport()
549
 
        new_t = transport.get_transport_from_url(t.base)
 
546
        new_t = transport.get_transport(t.base)
550
547
        self.assertEqual(t.server, new_t.server)
551
548
        self.assertEqual(t.base, new_t.base)
552
549
 
565
562
        # connect to '.' via http which is not listable
566
563
        server = HttpServer()
567
564
        self.start_server(server)
568
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
569
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
565
        t = transport.get_transport('readonly+' + server.get_url())
 
566
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
570
567
        self.assertEqual(False, t.listable())
571
568
        self.assertEqual(True, t.is_readonly())
572
569
 
604
601
        # the url should be decorated appropriately
605
602
        self.assertStartsWith(server.get_url(), 'fakenfs+')
606
603
        # and we should be able to get a transport for it
607
 
        t = transport.get_transport_from_url(server.get_url())
 
604
        t = transport.get_transport(server.get_url())
608
605
        # which must be a FakeNFSTransportDecorator instance.
609
606
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
610
607
 
687
684
        base_url = self._server.get_url()
688
685
        url = self._adjust_url(base_url, relpath)
689
686
        # try getting the transport via the regular interface:
690
 
        t = transport.get_transport_from_url(url)
 
687
        t = transport.get_transport(url)
691
688
        # vila--20070607 if the following are commented out the test suite
692
689
        # still passes. Is this really still needed or was it a forgotten
693
690
        # temporary fix?
698
695
        return t
699
696
 
700
697
 
701
 
class TestTransportFromPath(tests.TestCaseInTempDir):
702
 
 
703
 
    def test_with_path(self):
704
 
        t = transport.get_transport_from_path(self.test_dir)
705
 
        self.assertIsInstance(t, local.LocalTransport)
706
 
        self.assertEqual(t.base.rstrip("/"),
707
 
            urlutils.local_path_to_url(self.test_dir))
708
 
 
709
 
    def test_with_url(self):
710
 
        t = transport.get_transport_from_path("file:")
711
 
        self.assertIsInstance(t, local.LocalTransport)
712
 
        self.assertEqual(t.base.rstrip("/"),
713
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
714
 
 
715
 
 
716
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
717
 
 
718
 
    def test_with_path(self):
719
 
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
720
 
            self.test_dir)
721
 
 
722
 
    def test_with_url(self):
723
 
        url = urlutils.local_path_to_url(self.test_dir)
724
 
        t = transport.get_transport_from_url(url)
725
 
        self.assertIsInstance(t, local.LocalTransport)
726
 
        self.assertEqual(t.base.rstrip("/"), url)
727
 
 
728
 
    def test_with_url_and_segment_parameters(self):
729
 
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
730
 
        t = transport.get_transport_from_url(url)
731
 
        self.assertIsInstance(t, local.LocalTransport)
732
 
        self.assertEqual(t.base.rstrip("/"), url)
733
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
734
 
            f.write("data")
735
 
        self.assertTrue(t.has("afile"))
736
 
 
737
 
 
738
698
class TestLocalTransports(tests.TestCase):
739
699
 
740
700
    def test_get_transport_from_abspath(self):
741
701
        here = osutils.abspath('.')
742
702
        t = transport.get_transport(here)
743
703
        self.assertIsInstance(t, local.LocalTransport)
744
 
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
 
704
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
745
705
 
746
706
    def test_get_transport_from_relpath(self):
747
707
        here = osutils.abspath('.')
748
708
        t = transport.get_transport('.')
749
709
        self.assertIsInstance(t, local.LocalTransport)
750
 
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
 
710
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
751
711
 
752
712
    def test_get_transport_from_local_url(self):
753
713
        here = osutils.abspath('.')
754
714
        here_url = urlutils.local_path_to_url(here) + '/'
755
715
        t = transport.get_transport(here_url)
756
716
        self.assertIsInstance(t, local.LocalTransport)
757
 
        self.assertEqual(t.base, here_url)
 
717
        self.assertEquals(t.base, here_url)
758
718
 
759
719
    def test_local_abspath(self):
760
720
        here = osutils.abspath('.')
761
721
        t = transport.get_transport(here)
762
 
        self.assertEqual(t.local_abspath(''), here)
763
 
 
764
 
 
765
 
class TestLocalTransportMutation(tests.TestCaseInTempDir):
766
 
 
767
 
    def test_local_transport_mkdir(self):
768
 
        here = osutils.abspath('.')
769
 
        t = transport.get_transport(here)
770
 
        t.mkdir('test')
771
 
        self.assertTrue(os.path.exists('test'))
772
 
 
773
 
    def test_local_transport_mkdir_permission_denied(self):
774
 
        # See https://bugs.launchpad.net/bzr/+bug/606537
775
 
        here = osutils.abspath('.')
776
 
        t = transport.get_transport(here)
777
 
        def fake_chmod(path, mode):
778
 
            e = OSError('permission denied')
779
 
            e.errno = errno.EPERM
780
 
            raise e
781
 
        self.overrideAttr(os, 'chmod', fake_chmod)
782
 
        t.mkdir('test')
783
 
        t.mkdir('test2', mode=0707)
784
 
        self.assertTrue(os.path.exists('test'))
785
 
        self.assertTrue(os.path.exists('test2'))
786
 
 
787
 
 
788
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
789
 
 
790
 
    def test_local_fdatasync_calls_fdatasync(self):
791
 
        """Check fdatasync on a stream tries to flush the data to the OS.
792
 
        
793
 
        We can't easily observe the external effect but we can at least see
794
 
        it's called.
795
 
        """
796
 
        sentinel = object()
797
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
798
 
        if fdatasync is sentinel:
799
 
            raise tests.TestNotApplicable('fdatasync not supported')
800
 
        t = self.get_transport('.')
801
 
        calls = self.recordCalls(os, 'fdatasync')
802
 
        w = t.open_write_stream('out')
803
 
        w.write('foo')
804
 
        w.fdatasync()
805
 
        with open('out', 'rb') as f:
806
 
            # Should have been flushed.
807
 
            self.assertEqual(f.read(), 'foo')
808
 
        self.assertEqual(len(calls), 1, calls)
809
 
 
810
 
    def test_missing_directory(self):
811
 
        t = self.get_transport('.')
812
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
722
        self.assertEquals(t.local_abspath(''), here)
813
723
 
814
724
 
815
725
class TestWin32LocalTransport(tests.TestCase):
816
726
 
817
727
    def test_unc_clone_to_root(self):
818
 
        self.requireFeature(features.win32_feature)
819
728
        # Win32 UNC path like \\HOST\path
820
729
        # clone to root should stop at least at \\HOST part
821
730
        # not on \\
822
731
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
823
732
        for i in xrange(4):
824
733
            t = t.clone('..')
825
 
        self.assertEqual(t.base, 'file://HOST/')
 
734
        self.assertEquals(t.base, 'file://HOST/')
826
735
        # make sure we reach the root
827
736
        t = t.clone('..')
828
 
        self.assertEqual(t.base, 'file://HOST/')
 
737
        self.assertEquals(t.base, 'file://HOST/')
829
738
 
830
739
 
831
740
class TestConnectedTransport(tests.TestCase):
834
743
    def test_parse_url(self):
835
744
        t = transport.ConnectedTransport(
836
745
            'http://simple.example.com/home/source')
837
 
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
838
 
        self.assertEqual(t._parsed_url.port, None)
839
 
        self.assertEqual(t._parsed_url.path, '/home/source/')
840
 
        self.assertTrue(t._parsed_url.user is None)
841
 
        self.assertTrue(t._parsed_url.password is None)
 
746
        self.assertEquals(t._host, 'simple.example.com')
 
747
        self.assertEquals(t._port, None)
 
748
        self.assertEquals(t._path, '/home/source/')
 
749
        self.failUnless(t._user is None)
 
750
        self.failUnless(t._password is None)
842
751
 
843
 
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
 
752
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
844
753
 
845
754
    def test_parse_url_with_at_in_user(self):
846
755
        # Bug 228058
847
756
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
848
 
        self.assertEqual(t._parsed_url.user, 'user@host.com')
 
757
        self.assertEquals(t._user, 'user@host.com')
849
758
 
850
759
    def test_parse_quoted_url(self):
851
760
        t = transport.ConnectedTransport(
852
761
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
853
 
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
854
 
        self.assertEqual(t._parsed_url.port, 2222)
855
 
        self.assertEqual(t._parsed_url.user, 'robey')
856
 
        self.assertEqual(t._parsed_url.password, 'h@t')
857
 
        self.assertEqual(t._parsed_url.path, '/path/')
 
762
        self.assertEquals(t._host, 'exAmple.com')
 
763
        self.assertEquals(t._port, 2222)
 
764
        self.assertEquals(t._user, 'robey')
 
765
        self.assertEquals(t._password, 'h@t')
 
766
        self.assertEquals(t._path, '/path/')
858
767
 
859
768
        # Base should not keep track of the password
860
 
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
 
769
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
861
770
 
862
771
    def test_parse_invalid_url(self):
863
772
        self.assertRaises(errors.InvalidURL,
867
776
    def test_relpath(self):
868
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
869
778
 
870
 
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
871
 
            'sub')
 
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
872
780
        self.assertRaises(errors.PathNotChild, t.relpath,
873
781
                          'http://user@host.com/abs/path/sub')
874
782
        self.assertRaises(errors.PathNotChild, t.relpath,
879
787
                          'sftp://user@host.com:33/abs/path/sub')
880
788
        # Make sure it works when we don't supply a username
881
789
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
882
 
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
790
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
883
791
 
884
792
        # Make sure it works when parts of the path will be url encoded
885
793
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
886
 
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
794
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
887
795
 
888
796
    def test_connection_sharing_propagate_credentials(self):
889
797
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
890
 
        self.assertEqual('user', t._parsed_url.user)
891
 
        self.assertEqual('host.com', t._parsed_url.host)
 
798
        self.assertEquals('user', t._user)
 
799
        self.assertEquals('host.com', t._host)
892
800
        self.assertIs(None, t._get_connection())
893
 
        self.assertIs(None, t._parsed_url.password)
 
801
        self.assertIs(None, t._password)
894
802
        c = t.clone('subdir')
895
803
        self.assertIs(None, c._get_connection())
896
 
        self.assertIs(None, t._parsed_url.password)
 
804
        self.assertIs(None, t._password)
897
805
 
898
806
        # Simulate the user entering a password
899
807
        password = 'secret'
918
826
 
919
827
    def test_reuse_same_transport(self):
920
828
        possible_transports = []
921
 
        t1 = transport.get_transport_from_url('http://foo/',
 
829
        t1 = transport.get_transport('http://foo/',
922
830
                                     possible_transports=possible_transports)
923
831
        self.assertEqual([t1], possible_transports)
924
 
        t2 = transport.get_transport_from_url('http://foo/',
 
832
        t2 = transport.get_transport('http://foo/',
925
833
                                     possible_transports=[t1])
926
834
        self.assertIs(t1, t2)
927
835
 
928
836
        # Also check that final '/' are handled correctly
929
 
        t3 = transport.get_transport_from_url('http://foo/path/')
930
 
        t4 = transport.get_transport_from_url('http://foo/path',
 
837
        t3 = transport.get_transport('http://foo/path/')
 
838
        t4 = transport.get_transport('http://foo/path',
931
839
                                     possible_transports=[t3])
932
840
        self.assertIs(t3, t4)
933
841
 
934
 
        t5 = transport.get_transport_from_url('http://foo/path')
935
 
        t6 = transport.get_transport_from_url('http://foo/path/',
 
842
        t5 = transport.get_transport('http://foo/path')
 
843
        t6 = transport.get_transport('http://foo/path/',
936
844
                                     possible_transports=[t5])
937
845
        self.assertIs(t5, t6)
938
846
 
939
847
    def test_don_t_reuse_different_transport(self):
940
 
        t1 = transport.get_transport_from_url('http://foo/path')
941
 
        t2 = transport.get_transport_from_url('http://bar/path',
 
848
        t1 = transport.get_transport('http://foo/path')
 
849
        t2 = transport.get_transport('http://bar/path',
942
850
                                     possible_transports=[t1])
943
851
        self.assertIsNot(t1, t2)
944
852
 
945
853
 
946
854
class TestTransportTrace(tests.TestCase):
947
855
 
948
 
    def test_decorator(self):
949
 
        t = transport.get_transport_from_url('trace+memory://')
950
 
        self.assertIsInstance(
951
 
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
856
    def test_get(self):
 
857
        t = transport.get_transport('trace+memory://')
 
858
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
952
859
 
953
860
    def test_clone_preserves_activity(self):
954
 
        t = transport.get_transport_from_url('trace+memory://')
 
861
        t = transport.get_transport('trace+memory://')
955
862
        t2 = t.clone('.')
956
863
        self.assertTrue(t is not t2)
957
864
        self.assertTrue(t._activity is t2._activity)
961
868
    # still won't cause a test failure when the top level Transport API
962
869
    # changes; so there is little return doing that.
963
870
    def test_get(self):
964
 
        t = transport.get_transport_from_url('trace+memory:///')
 
871
        t = transport.get_transport('trace+memory:///')
965
872
        t.put_bytes('foo', 'barish')
966
873
        t.get('foo')
967
874
        expected_result = []
973
880
        self.assertEqual(expected_result, t._activity)
974
881
 
975
882
    def test_readv(self):
976
 
        t = transport.get_transport_from_url('trace+memory:///')
 
883
        t = transport.get_transport('trace+memory:///')
977
884
        t.put_bytes('foo', 'barish')
978
885
        list(t.readv('foo', [(0, 1), (3, 2)],
979
886
                     adjust_for_latency=True, upper_limit=6))
989
896
class TestSSHConnections(tests.TestCaseWithTransport):
990
897
 
991
898
    def test_bzr_connect_to_bzr_ssh(self):
992
 
        """get_transport of a bzr+ssh:// behaves correctly.
 
899
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
993
900
 
994
901
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
995
902
        """
 
903
        raise tests.TestSkipped('this test was recently broken,'
 
904
                                ' see bug #626876')
996
905
        # This test actually causes a bzr instance to be invoked, which is very
997
906
        # expensive: it should be the only such test in the test suite.
998
907
        # A reasonable evolution for this would be to simply check inside
1011
920
        # SSH channel ourselves.  Surely this has already been implemented
1012
921
        # elsewhere?
1013
922
        started = []
1014
 
 
1015
923
        class StubSSHServer(stub_sftp.StubServer):
1016
924
 
1017
925
            test = self
1023
931
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1024
932
 
1025
933
                # XXX: horribly inefficient, not to mention ugly.
1026
 
                # Start a thread for each of stdin/out/err, and relay bytes
1027
 
                # from the subprocess to channel and vice versa.
 
934
                # Start a thread for each of stdin/out/err, and relay bytes from
 
935
                # the subprocess to channel and vice versa.
1028
936
                def ferry_bytes(read, write, close):
1029
937
                    while True:
1030
938
                        bytes = read(1)
1060
968
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1061
969
        else:
1062
970
            bzr_remote_path = self.get_bzr_path()
1063
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
971
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
1064
972
 
1065
973
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1066
974
        # variable is used to tell bzr what command to run on the remote end.
1087
995
        # And the rest are threads
1088
996
        for t in started[1:]:
1089
997
            t.join()
1090
 
 
1091
 
 
1092
 
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        """A long HTML-ish input yields a 1000-character result.

        The input is 1000 repetitions of a short paragraph; the result is
        expected to be exactly 1000 characters long and to start with the
        paragraph text with its tag replaced by a space.
        """
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        self.assertEqual(len(result), 1000)
        self.assertStartsWith(result, " something!")
1101
 
 
1102
 
 
1103
 
class SomeDirectory(object):
    """Stub directory service whose lookup always gives the same URL."""

    def look_up(self, name, url):
        """Return the fixed URL "http://bar", ignoring both arguments.

        :param name: Unused.
        :param url: Unused.
        :return: The constant string "http://bar".
        """
        return "http://bar"
1107
 
 
1108
 
 
1109
 
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return a (path, url) pair for the absolute form of '/foo/bar'.

        :return: A tuple of the absolute local path and the file:// URL
            built from it.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        """A string that is already a URL comes back unchanged."""
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        """A registered directory prefix is resolved through its service."""
        directories.register("bar:", SomeDirectory, "Dummy directory")
        # Undo the registration so it does not leak into other tests.
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        """A URL containing a non-ASCII character raises InvalidURL."""
        self.assertRaises(errors.InvalidURL, location_to_url,
            "http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        """A non-ASCII local path is percent-encoded as UTF-8 in the URL."""
        path, url = self.get_base_location()
        location = path + "\xc3\xaf".decode("utf-8")
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        """An absolute local path maps to the matching file:// URL."""
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        """'file:bar' resolves relative to the current directory's URL."""
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
            location_to_url("file:bar"))

    def test_absolute_file_url(self):
        """'file:/bar' is expanded to 'file:///bar'."""
        self.assertEqual("file:///bar", location_to_url("file:/bar"))