~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-18 13:27:08 UTC
  • mfrom: (4011.4.3 ssh-hints)
  • Revision ID: pqm@pqm.ubuntu.com-20090218132708-okubrahz9exvae9r
(Jelmer) Point out bzr+ssh:// to the user when they use ssh://.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
 
 
20
import bzrlib
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
 
 
52
 
 
53
# TODO: Should possibly split transport-specific tests into their own files.
 
54
 
 
55
 
 
56
class TestTransport(TestCase):
 
57
    """Test the non transport-concrete class functionality."""
 
58
 
 
59
    def test__get_set_protocol_handlers(self):
 
60
        handlers = _get_protocol_handlers()
 
61
        self.assertNotEqual([], handlers.keys( ))
 
62
        try:
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
 
65
        finally:
 
66
            _set_protocol_handlers(handlers)
 
67
 
 
68
    def test_get_transport_modules(self):
 
69
        handlers = _get_protocol_handlers()
 
70
        # don't pollute the current handlers
 
71
        _clear_protocol_handlers()
 
72
        class SampleHandler(object):
 
73
            """I exist, isnt that enough?"""
 
74
        try:
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
 
84
                             _get_transport_modules())
 
85
        finally:
 
86
            _set_protocol_handlers(handlers)
 
87
 
 
88
    def test_transport_dependency(self):
 
89
        """Transport with missing dependency causes no error"""
 
90
        saved_handlers = _get_protocol_handlers()
 
91
        # don't pollute the current handlers
 
92
        _clear_protocol_handlers()
 
93
        try:
 
94
            register_transport_proto('foo')
 
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
96
                    'BadTransportHandler')
 
97
            try:
 
98
                get_transport('foo://fooserver/foo')
 
99
            except UnsupportedProtocol, e:
 
100
                e_str = str(e)
 
101
                self.assertEquals('Unsupported protocol'
 
102
                                  ' for url "foo://fooserver/foo":'
 
103
                                  ' Unable to import library "some_lib":'
 
104
                                  ' testing missing dependency', str(e))
 
105
            else:
 
106
                self.fail('Did not raise UnsupportedProtocol')
 
107
        finally:
 
108
            # restore original values
 
109
            _set_protocol_handlers(saved_handlers)
 
110
            
 
111
    def test_transport_fallback(self):
 
112
        """Transport with missing dependency causes no error"""
 
113
        saved_handlers = _get_protocol_handlers()
 
114
        try:
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
 
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
118
                    'BackupTransportHandler')
 
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
120
                    'BadTransportHandler')
 
121
            t = get_transport('foo://fooserver/foo')
 
122
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
123
        finally:
 
124
            _set_protocol_handlers(saved_handlers)
 
125
 
 
126
    def test_ssh_hints(self):
 
127
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
128
        try:
 
129
            get_transport('ssh://fooserver/foo')
 
130
        except UnsupportedProtocol, e:
 
131
            e_str = str(e)
 
132
            self.assertEquals('Unsupported protocol'
 
133
                              ' for url "ssh://fooserver/foo":'
 
134
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".', 
 
135
                              str(e))
 
136
        else:
 
137
            self.fail('Did not raise UnsupportedProtocol')
 
138
 
 
139
    def test_LateReadError(self):
 
140
        """The LateReadError helper should raise on read()."""
 
141
        a_file = LateReadError('a path')
 
142
        try:
 
143
            a_file.read()
 
144
        except ReadError, error:
 
145
            self.assertEqual('a path', error.path)
 
146
        self.assertRaises(ReadError, a_file.read, 40)
 
147
        a_file.close()
 
148
 
 
149
    def test__combine_paths(self):
 
150
        t = Transport('/')
 
151
        self.assertEqual('/home/sarah/project/foo',
 
152
                         t._combine_paths('/home/sarah', 'project/foo'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '../../etc'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '/etc'))
 
159
 
 
160
    def test_local_abspath_non_local_transport(self):
 
161
        # the base implementation should throw
 
162
        t = MemoryTransport()
 
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
164
        self.assertEqual('memory:///t is not a local path.', str(e))
 
165
 
 
166
 
 
167
class TestCoalesceOffsets(TestCase):
 
168
 
 
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
170
        coalesce = Transport._coalesce_offsets
 
171
        exp = [_CoalescedOffset(*x) for x in expected]
 
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
173
                            max_size=max_size))
 
174
        self.assertEqual(exp, out)
 
175
 
 
176
    def test_coalesce_empty(self):
 
177
        self.check([], [])
 
178
 
 
179
    def test_coalesce_simple(self):
 
180
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
181
 
 
182
    def test_coalesce_unrelated(self):
 
183
        self.check([(0, 10, [(0, 10)]),
 
184
                    (20, 10, [(0, 10)]),
 
185
                   ], [(0, 10), (20, 10)])
 
186
 
 
187
    def test_coalesce_unsorted(self):
 
188
        self.check([(20, 10, [(0, 10)]),
 
189
                    (0, 10, [(0, 10)]),
 
190
                   ], [(20, 10), (0, 10)])
 
191
 
 
192
    def test_coalesce_nearby(self):
 
193
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
194
                   [(0, 10), (10, 10)])
 
195
 
 
196
    def test_coalesce_overlapped(self):
 
197
        self.assertRaises(ValueError,
 
198
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
199
                        [(0, 10), (5, 10)])
 
200
 
 
201
    def test_coalesce_limit(self):
 
202
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
203
                              (30, 10), (40, 10)]),
 
204
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
205
                              (30, 10), (40, 10)]),
 
206
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
207
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
208
                       (90, 10), (100, 10)],
 
209
                    limit=5)
 
210
 
 
211
    def test_coalesce_no_limit(self):
 
212
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
213
                               (30, 10), (40, 10), (50, 10),
 
214
                               (60, 10), (70, 10), (80, 10),
 
215
                               (90, 10)]),
 
216
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
217
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
218
                       (90, 10), (100, 10)])
 
219
 
 
220
    def test_coalesce_fudge(self):
 
221
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
222
                    (100, 10, [(0, 10),]),
 
223
                   ], [(10, 10), (30, 10), (100, 10)],
 
224
                   fudge=10
 
225
                  )
 
226
    def test_coalesce_max_size(self):
 
227
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
228
                    (30, 50, [(0, 50)]),
 
229
                    # If one range is above max_size, it gets its own coalesced
 
230
                    # offset
 
231
                    (100, 80, [(0, 80),]),],
 
232
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
233
                   max_size=50
 
234
                  )
 
235
 
 
236
    def test_coalesce_no_max_size(self):
 
237
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
239
                  )
 
240
 
 
241
    def test_coalesce_default_limit(self):
 
242
        # By default we use a 100MB max size.
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
249
                   max_size=1*1024*1024*1024)
 
250
 
 
251
 
 
252
class TestMemoryTransport(TestCase):
 
253
 
 
254
    def test_get_transport(self):
 
255
        MemoryTransport()
 
256
 
 
257
    def test_clone(self):
 
258
        transport = MemoryTransport()
 
259
        self.assertTrue(isinstance(transport, MemoryTransport))
 
260
        self.assertEqual("memory:///", transport.clone("/").base)
 
261
 
 
262
    def test_abspath(self):
 
263
        transport = MemoryTransport()
 
264
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
265
 
 
266
    def test_abspath_of_root(self):
 
267
        transport = MemoryTransport()
 
268
        self.assertEqual("memory:///", transport.base)
 
269
        self.assertEqual("memory:///", transport.abspath('/'))
 
270
 
 
271
    def test_abspath_of_relpath_starting_at_root(self):
 
272
        transport = MemoryTransport()
 
273
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
274
 
 
275
    def test_append_and_get(self):
 
276
        transport = MemoryTransport()
 
277
        transport.append_bytes('path', 'content')
 
278
        self.assertEqual(transport.get('path').read(), 'content')
 
279
        transport.append_file('path', StringIO('content'))
 
280
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
281
 
 
282
    def test_put_and_get(self):
 
283
        transport = MemoryTransport()
 
284
        transport.put_file('path', StringIO('content'))
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.put_bytes('path', 'content')
 
287
        self.assertEqual(transport.get('path').read(), 'content')
 
288
 
 
289
    def test_append_without_dir_fails(self):
 
290
        transport = MemoryTransport()
 
291
        self.assertRaises(NoSuchFile,
 
292
                          transport.append_bytes, 'dir/path', 'content')
 
293
 
 
294
    def test_put_without_dir_fails(self):
 
295
        transport = MemoryTransport()
 
296
        self.assertRaises(NoSuchFile,
 
297
                          transport.put_file, 'dir/path', StringIO('content'))
 
298
 
 
299
    def test_get_missing(self):
 
300
        transport = MemoryTransport()
 
301
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
302
 
 
303
    def test_has_missing(self):
 
304
        transport = MemoryTransport()
 
305
        self.assertEquals(False, transport.has('foo'))
 
306
 
 
307
    def test_has_present(self):
 
308
        transport = MemoryTransport()
 
309
        transport.append_bytes('foo', 'content')
 
310
        self.assertEquals(True, transport.has('foo'))
 
311
 
 
312
    def test_list_dir(self):
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.mkdir('dir')
 
316
        transport.put_bytes('dir/subfoo', 'content')
 
317
        transport.put_bytes('dirlike', 'content')
 
318
 
 
319
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
320
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
321
 
 
322
    def test_mkdir(self):
 
323
        transport = MemoryTransport()
 
324
        transport.mkdir('dir')
 
325
        transport.append_bytes('dir/path', 'content')
 
326
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
327
 
 
328
    def test_mkdir_missing_parent(self):
 
329
        transport = MemoryTransport()
 
330
        self.assertRaises(NoSuchFile,
 
331
                          transport.mkdir, 'dir/dir')
 
332
 
 
333
    def test_mkdir_twice(self):
 
334
        transport = MemoryTransport()
 
335
        transport.mkdir('dir')
 
336
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
337
 
 
338
    def test_parameters(self):
 
339
        transport = MemoryTransport()
 
340
        self.assertEqual(True, transport.listable())
 
341
        self.assertEqual(False, transport.is_readonly())
 
342
 
 
343
    def test_iter_files_recursive(self):
 
344
        transport = MemoryTransport()
 
345
        transport.mkdir('dir')
 
346
        transport.put_bytes('dir/foo', 'content')
 
347
        transport.put_bytes('dir/bar', 'content')
 
348
        transport.put_bytes('bar', 'content')
 
349
        paths = set(transport.iter_files_recursive())
 
350
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
351
 
 
352
    def test_stat(self):
 
353
        transport = MemoryTransport()
 
354
        transport.put_bytes('foo', 'content')
 
355
        transport.put_bytes('bar', 'phowar')
 
356
        self.assertEqual(7, transport.stat('foo').st_size)
 
357
        self.assertEqual(6, transport.stat('bar').st_size)
 
358
 
 
359
 
 
360
class ChrootDecoratorTransportTest(TestCase):
 
361
    """Chroot decoration specific tests."""
 
362
 
 
363
    def test_abspath(self):
 
364
        # The abspath is always relative to the chroot_url.
 
365
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
366
        server.setUp()
 
367
        transport = get_transport(server.get_url())
 
368
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
369
 
 
370
        subdir_transport = transport.clone('subdir')
 
371
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
372
        server.tearDown()
 
373
 
 
374
    def test_clone(self):
 
375
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
376
        server.setUp()
 
377
        transport = get_transport(server.get_url())
 
378
        # relpath from root and root path are the same
 
379
        relpath_cloned = transport.clone('foo')
 
380
        abspath_cloned = transport.clone('/foo')
 
381
        self.assertEqual(server, relpath_cloned.server)
 
382
        self.assertEqual(server, abspath_cloned.server)
 
383
        server.tearDown()
 
384
    
 
385
    def test_chroot_url_preserves_chroot(self):
 
386
        """Calling get_transport on a chroot transport's base should produce a
 
387
        transport with exactly the same behaviour as the original chroot
 
388
        transport.
 
389
 
 
390
        This is so that it is not possible to escape a chroot by doing::
 
391
            url = chroot_transport.base
 
392
            parent_url = urlutils.join(url, '..')
 
393
            new_transport = get_transport(parent_url)
 
394
        """
 
395
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
396
        server.setUp()
 
397
        transport = get_transport(server.get_url())
 
398
        new_transport = get_transport(transport.base)
 
399
        self.assertEqual(transport.server, new_transport.server)
 
400
        self.assertEqual(transport.base, new_transport.base)
 
401
        server.tearDown()
 
402
        
 
403
    def test_urljoin_preserves_chroot(self):
 
404
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
405
        URL that escapes the intended chroot.
 
406
 
 
407
        This is so that it is not possible to escape a chroot by doing::
 
408
            url = chroot_transport.base
 
409
            parent_url = urlutils.join(url, '..')
 
410
            new_transport = get_transport(parent_url)
 
411
        """
 
412
        server = ChrootServer(get_transport('memory:///path/'))
 
413
        server.setUp()
 
414
        transport = get_transport(server.get_url())
 
415
        self.assertRaises(
 
416
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
417
        server.tearDown()
 
418
 
 
419
 
 
420
class ChrootServerTest(TestCase):
 
421
 
 
422
    def test_construct(self):
 
423
        backing_transport = MemoryTransport()
 
424
        server = ChrootServer(backing_transport)
 
425
        self.assertEqual(backing_transport, server.backing_transport)
 
426
 
 
427
    def test_setUp(self):
 
428
        backing_transport = MemoryTransport()
 
429
        server = ChrootServer(backing_transport)
 
430
        server.setUp()
 
431
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
432
 
 
433
    def test_tearDown(self):
 
434
        backing_transport = MemoryTransport()
 
435
        server = ChrootServer(backing_transport)
 
436
        server.setUp()
 
437
        server.tearDown()
 
438
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
439
 
 
440
    def test_get_url(self):
 
441
        backing_transport = MemoryTransport()
 
442
        server = ChrootServer(backing_transport)
 
443
        server.setUp()
 
444
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
445
        server.tearDown()
 
446
 
 
447
 
 
448
class ReadonlyDecoratorTransportTest(TestCase):
 
449
    """Readonly decoration specific tests."""
 
450
 
 
451
    def test_local_parameters(self):
 
452
        import bzrlib.transport.readonly as readonly
 
453
        # connect to . in readonly mode
 
454
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
455
        self.assertEqual(True, transport.listable())
 
456
        self.assertEqual(True, transport.is_readonly())
 
457
 
 
458
    def test_http_parameters(self):
 
459
        from bzrlib.tests.http_server import HttpServer
 
460
        import bzrlib.transport.readonly as readonly
 
461
        # connect to '.' via http which is not listable
 
462
        server = HttpServer()
 
463
        server.setUp()
 
464
        try:
 
465
            transport = get_transport('readonly+' + server.get_url())
 
466
            self.failUnless(isinstance(transport,
 
467
                                       readonly.ReadonlyTransportDecorator))
 
468
            self.assertEqual(False, transport.listable())
 
469
            self.assertEqual(True, transport.is_readonly())
 
470
        finally:
 
471
            server.tearDown()
 
472
 
 
473
 
 
474
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
475
    """NFS decorator specific tests."""
 
476
 
 
477
    def get_nfs_transport(self, url):
 
478
        import bzrlib.transport.fakenfs as fakenfs
 
479
        # connect to url with nfs decoration
 
480
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
481
 
 
482
    def test_local_parameters(self):
 
483
        # the listable and is_readonly parameters
 
484
        # are not changed by the fakenfs decorator
 
485
        transport = self.get_nfs_transport('.')
 
486
        self.assertEqual(True, transport.listable())
 
487
        self.assertEqual(False, transport.is_readonly())
 
488
 
 
489
    def test_http_parameters(self):
 
490
        # the listable and is_readonly parameters
 
491
        # are not changed by the fakenfs decorator
 
492
        from bzrlib.tests.http_server import HttpServer
 
493
        # connect to '.' via http which is not listable
 
494
        server = HttpServer()
 
495
        server.setUp()
 
496
        try:
 
497
            transport = self.get_nfs_transport(server.get_url())
 
498
            self.assertIsInstance(
 
499
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
500
            self.assertEqual(False, transport.listable())
 
501
            self.assertEqual(True, transport.is_readonly())
 
502
        finally:
 
503
            server.tearDown()
 
504
 
 
505
    def test_fakenfs_server_default(self):
 
506
        # a FakeNFSServer() should bring up a local relpath server for itself
 
507
        import bzrlib.transport.fakenfs as fakenfs
 
508
        server = fakenfs.FakeNFSServer()
 
509
        server.setUp()
 
510
        try:
 
511
            # the url should be decorated appropriately
 
512
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
513
            # and we should be able to get a transport for it
 
514
            transport = get_transport(server.get_url())
 
515
            # which must be a FakeNFSTransportDecorator instance.
 
516
            self.assertIsInstance(
 
517
                transport, fakenfs.FakeNFSTransportDecorator)
 
518
        finally:
 
519
            server.tearDown()
 
520
 
 
521
    def test_fakenfs_rename_semantics(self):
 
522
        # a FakeNFS transport must mangle the way rename errors occur to
 
523
        # look like NFS problems.
 
524
        transport = self.get_nfs_transport('.')
 
525
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
526
                        transport=transport)
 
527
        self.assertRaises(errors.ResourceBusy,
 
528
                          transport.rename, 'from', 'to')
 
529
 
 
530
 
 
531
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
532
    """Tests for simulation of VFAT restrictions"""
 
533
 
 
534
    def get_vfat_transport(self, url):
 
535
        """Return vfat-backed transport for test directory"""
 
536
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
537
        return FakeVFATTransportDecorator('vfat+' + url)
 
538
 
 
539
    def test_transport_creation(self):
 
540
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
541
        transport = self.get_vfat_transport('.')
 
542
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
543
 
 
544
    def test_transport_mkdir(self):
 
545
        transport = self.get_vfat_transport('.')
 
546
        transport.mkdir('HELLO')
 
547
        self.assertTrue(transport.has('hello'))
 
548
        self.assertTrue(transport.has('Hello'))
 
549
 
 
550
    def test_forbidden_chars(self):
 
551
        transport = self.get_vfat_transport('.')
 
552
        self.assertRaises(ValueError, transport.has, "<NU>")
 
553
 
 
554
 
 
555
class BadTransportHandler(Transport):
 
556
    def __init__(self, base_url):
 
557
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
558
 
 
559
 
 
560
class BackupTransportHandler(Transport):
 
561
    """Test transport that works as a backup for the BadTransportHandler"""
 
562
    pass
 
563
 
 
564
 
 
565
class TestTransportImplementation(TestCaseInTempDir):
 
566
    """Implementation verification for transports.
 
567
    
 
568
    To verify a transport we need a server factory, which is a callable
 
569
    that accepts no parameters and returns an implementation of
 
570
    bzrlib.transport.Server.
 
571
    
 
572
    That Server is then used to construct transport instances and test
 
573
    the transport via loopback activity.
 
574
 
 
575
    Currently this assumes that the Transport object is connected to the 
 
576
    current working directory.  So that whatever is done 
 
577
    through the transport, should show up in the working 
 
578
    directory, and vice-versa. This is a bug, because its possible to have
 
579
    URL schemes which provide access to something that may not be 
 
580
    result in storage on the local disk, i.e. due to file system limits, or 
 
581
    due to it being a database or some other non-filesystem tool.
 
582
 
 
583
    This also tests to make sure that the functions work with both
 
584
    generators and lists (assuming iter(list) is effectively a generator)
 
585
    """
 
586
    
 
587
    def setUp(self):
 
588
        super(TestTransportImplementation, self).setUp()
 
589
        self._server = self.transport_server()
 
590
        self._server.setUp()
 
591
        self.addCleanup(self._server.tearDown)
 
592
 
 
593
    def get_transport(self, relpath=None):
 
594
        """Return a connected transport to the local directory.
 
595
 
 
596
        :param relpath: a path relative to the base url.
 
597
        """
 
598
        base_url = self._server.get_url()
 
599
        url = self._adjust_url(base_url, relpath)
 
600
        # try getting the transport via the regular interface:
 
601
        t = get_transport(url)
 
602
        # vila--20070607 if the following are commented out the test suite
 
603
        # still pass. Is this really still needed or was it a forgotten
 
604
        # temporary fix ?
 
605
        if not isinstance(t, self.transport_class):
 
606
            # we did not get the correct transport class type. Override the
 
607
            # regular connection behaviour by direct construction.
 
608
            t = self.transport_class(url)
 
609
        return t
 
610
 
 
611
 
 
612
class TestLocalTransports(TestCase):
 
613
 
 
614
    def test_get_transport_from_abspath(self):
 
615
        here = osutils.abspath('.')
 
616
        t = get_transport(here)
 
617
        self.assertIsInstance(t, LocalTransport)
 
618
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
619
 
 
620
    def test_get_transport_from_relpath(self):
 
621
        here = osutils.abspath('.')
 
622
        t = get_transport('.')
 
623
        self.assertIsInstance(t, LocalTransport)
 
624
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
625
 
 
626
    def test_get_transport_from_local_url(self):
 
627
        here = osutils.abspath('.')
 
628
        here_url = urlutils.local_path_to_url(here) + '/'
 
629
        t = get_transport(here_url)
 
630
        self.assertIsInstance(t, LocalTransport)
 
631
        self.assertEquals(t.base, here_url)
 
632
 
 
633
    def test_local_abspath(self):
 
634
        here = osutils.abspath('.')
 
635
        t = get_transport(here)
 
636
        self.assertEquals(t.local_abspath(''), here)
 
637
 
 
638
 
 
639
class TestWin32LocalTransport(TestCase):
 
640
 
 
641
    def test_unc_clone_to_root(self):
 
642
        # Win32 UNC path like \\HOST\path
 
643
        # clone to root should stop at least at \\HOST part
 
644
        # not on \\
 
645
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
646
        for i in xrange(4):
 
647
            t = t.clone('..')
 
648
        self.assertEquals(t.base, 'file://HOST/')
 
649
        # make sure we reach the root
 
650
        t = t.clone('..')
 
651
        self.assertEquals(t.base, 'file://HOST/')
 
652
 
 
653
 
 
654
class TestConnectedTransport(TestCase):
 
655
    """Tests for connected to remote server transports"""
 
656
 
 
657
    def test_parse_url(self):
 
658
        t = ConnectedTransport('http://simple.example.com/home/source')
 
659
        self.assertEquals(t._host, 'simple.example.com')
 
660
        self.assertEquals(t._port, None)
 
661
        self.assertEquals(t._path, '/home/source/')
 
662
        self.failUnless(t._user is None)
 
663
        self.failUnless(t._password is None)
 
664
 
 
665
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
666
 
 
667
    def test_parse_url_with_at_in_user(self):
 
668
        # Bug 228058
 
669
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
670
        self.assertEquals(t._user, 'user@host.com')
 
671
 
 
672
    def test_parse_quoted_url(self):
 
673
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
674
        self.assertEquals(t._host, 'exAmple.com')
 
675
        self.assertEquals(t._port, 2222)
 
676
        self.assertEquals(t._user, 'robey')
 
677
        self.assertEquals(t._password, 'h@t')
 
678
        self.assertEquals(t._path, '/path/')
 
679
 
 
680
        # Base should not keep track of the password
 
681
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
682
 
 
683
    def test_parse_invalid_url(self):
 
684
        self.assertRaises(errors.InvalidURL,
 
685
                          ConnectedTransport,
 
686
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
687
 
 
688
    def test_relpath(self):
 
689
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
690
 
 
691
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
692
        self.assertRaises(errors.PathNotChild, t.relpath,
 
693
                          'http://user@host.com/abs/path/sub')
 
694
        self.assertRaises(errors.PathNotChild, t.relpath,
 
695
                          'sftp://user2@host.com/abs/path/sub')
 
696
        self.assertRaises(errors.PathNotChild, t.relpath,
 
697
                          'sftp://user@otherhost.com/abs/path/sub')
 
698
        self.assertRaises(errors.PathNotChild, t.relpath,
 
699
                          'sftp://user@host.com:33/abs/path/sub')
 
700
        # Make sure it works when we don't supply a username
 
701
        t = ConnectedTransport('sftp://host.com/abs/path')
 
702
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
703
 
 
704
        # Make sure it works when parts of the path will be url encoded
 
705
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
706
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
707
 
 
708
    def test_connection_sharing_propagate_credentials(self):
 
709
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
710
        self.assertEquals('user', t._user)
 
711
        self.assertEquals('host.com', t._host)
 
712
        self.assertIs(None, t._get_connection())
 
713
        self.assertIs(None, t._password)
 
714
        c = t.clone('subdir')
 
715
        self.assertIs(None, c._get_connection())
 
716
        self.assertIs(None, t._password)
 
717
 
 
718
        # Simulate the user entering a password
 
719
        password = 'secret'
 
720
        connection = object()
 
721
        t._set_connection(connection, password)
 
722
        self.assertIs(connection, t._get_connection())
 
723
        self.assertIs(password, t._get_credentials())
 
724
        self.assertIs(connection, c._get_connection())
 
725
        self.assertIs(password, c._get_credentials())
 
726
 
 
727
        # credentials can be updated
 
728
        new_password = 'even more secret'
 
729
        c._update_credentials(new_password)
 
730
        self.assertIs(connection, t._get_connection())
 
731
        self.assertIs(new_password, t._get_credentials())
 
732
        self.assertIs(connection, c._get_connection())
 
733
        self.assertIs(new_password, c._get_credentials())
 
734
 
 
735
 
 
736
class TestReusedTransports(TestCase):
 
737
    """Tests for transport reuse"""
 
738
 
 
739
    def test_reuse_same_transport(self):
 
740
        possible_transports = []
 
741
        t1 = get_transport('http://foo/',
 
742
                           possible_transports=possible_transports)
 
743
        self.assertEqual([t1], possible_transports)
 
744
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
745
        self.assertIs(t1, t2)
 
746
 
 
747
        # Also check that final '/' are handled correctly
 
748
        t3 = get_transport('http://foo/path/')
 
749
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
750
        self.assertIs(t3, t4)
 
751
 
 
752
        t5 = get_transport('http://foo/path')
 
753
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
754
        self.assertIs(t5, t6)
 
755
 
 
756
    def test_don_t_reuse_different_transport(self):
 
757
        t1 = get_transport('http://foo/path')
 
758
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
759
        self.assertIsNot(t1, t2)
 
760
 
 
761
 
 
762
class TestTransportTrace(TestCase):
 
763
 
 
764
    def test_get(self):
 
765
        transport = get_transport('trace+memory://')
 
766
        self.assertIsInstance(
 
767
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
768
 
 
769
    def test_clone_preserves_activity(self):
 
770
        transport = get_transport('trace+memory://')
 
771
        transport2 = transport.clone('.')
 
772
        self.assertTrue(transport is not transport2)
 
773
        self.assertTrue(transport._activity is transport2._activity)
 
774
 
 
775
    # the following specific tests are for the operations that have made use of
 
776
    # logging in tests; we could test every single operation but doing that
 
777
    # still won't cause a test failure when the top level Transport API
 
778
    # changes; so there is little return doing that.
 
779
    def test_get(self):
 
780
        transport = get_transport('trace+memory:///')
 
781
        transport.put_bytes('foo', 'barish')
 
782
        transport.get('foo')
 
783
        expected_result = []
 
784
        # put_bytes records the bytes, not the content to avoid memory
 
785
        # pressure.
 
786
        expected_result.append(('put_bytes', 'foo', 6, None))
 
787
        # get records the file name only.
 
788
        expected_result.append(('get', 'foo'))
 
789
        self.assertEqual(expected_result, transport._activity)
 
790
 
 
791
    def test_readv(self):
 
792
        transport = get_transport('trace+memory:///')
 
793
        transport.put_bytes('foo', 'barish')
 
794
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
795
            upper_limit=6))
 
796
        expected_result = []
 
797
        # put_bytes records the bytes, not the content to avoid memory
 
798
        # pressure.
 
799
        expected_result.append(('put_bytes', 'foo', 6, None))
 
800
        # readv records the supplied offset request
 
801
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
802
        self.assertEqual(expected_result, transport._activity)