~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

merge merge tweaks from aaron, which includes latest .dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
from cStringIO import StringIO
19
 
 
20
 
import bzrlib
21
 
from bzrlib import (
22
 
    errors,
23
 
    osutils,
24
 
    urlutils,
25
 
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
51
 
 
52
 
 
53
 
# TODO: Should possibly split transport-specific tests into their own files.
54
 
 
55
 
 
56
 
class TestTransport(TestCase):
57
 
    """Test the non transport-concrete class functionality."""
58
 
 
59
 
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
62
 
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
65
 
        finally:
66
 
            _set_protocol_handlers(handlers)
67
 
 
68
 
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
70
 
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
72
 
        class SampleHandler(object):
73
 
            """I exist, isnt that enough?"""
74
 
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
85
 
        finally:
86
 
            _set_protocol_handlers(handlers)
87
 
 
88
 
    def test_transport_dependency(self):
89
 
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
91
 
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
93
 
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
97
 
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
107
 
        finally:
108
 
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
110
 
            
111
 
    def test_transport_fallback(self):
112
 
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
114
 
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
122
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
 
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
125
 
 
126
 
    def test_ssh_hints(self):
127
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
 
        try:
129
 
            get_transport('ssh://fooserver/foo')
130
 
        except UnsupportedProtocol, e:
131
 
            e_str = str(e)
132
 
            self.assertEquals('Unsupported protocol'
133
 
                              ' for url "ssh://fooserver/foo":'
134
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".', 
135
 
                              str(e))
136
 
        else:
137
 
            self.fail('Did not raise UnsupportedProtocol')
138
 
 
139
 
    def test_LateReadError(self):
140
 
        """The LateReadError helper should raise on read()."""
141
 
        a_file = LateReadError('a path')
142
 
        try:
143
 
            a_file.read()
144
 
        except ReadError, error:
145
 
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(ReadError, a_file.read, 40)
147
 
        a_file.close()
148
 
 
149
 
    def test__combine_paths(self):
150
 
        t = Transport('/')
151
 
        self.assertEqual('/home/sarah/project/foo',
152
 
                         t._combine_paths('/home/sarah', 'project/foo'))
153
 
        self.assertEqual('/etc',
154
 
                         t._combine_paths('/home/sarah', '../../etc'))
155
 
        self.assertEqual('/etc',
156
 
                         t._combine_paths('/home/sarah', '../../../etc'))
157
 
        self.assertEqual('/etc',
158
 
                         t._combine_paths('/home/sarah', '/etc'))
159
 
 
160
 
    def test_local_abspath_non_local_transport(self):
161
 
        # the base implementation should throw
162
 
        t = MemoryTransport()
163
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
 
        self.assertEqual('memory:///t is not a local path.', str(e))
165
 
 
166
 
 
167
 
class TestCoalesceOffsets(TestCase):
168
 
 
169
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
 
        coalesce = Transport._coalesce_offsets
171
 
        exp = [_CoalescedOffset(*x) for x in expected]
172
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
 
                            max_size=max_size))
174
 
        self.assertEqual(exp, out)
175
 
 
176
 
    def test_coalesce_empty(self):
177
 
        self.check([], [])
178
 
 
179
 
    def test_coalesce_simple(self):
180
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
181
 
 
182
 
    def test_coalesce_unrelated(self):
183
 
        self.check([(0, 10, [(0, 10)]),
184
 
                    (20, 10, [(0, 10)]),
185
 
                   ], [(0, 10), (20, 10)])
186
 
 
187
 
    def test_coalesce_unsorted(self):
188
 
        self.check([(20, 10, [(0, 10)]),
189
 
                    (0, 10, [(0, 10)]),
190
 
                   ], [(20, 10), (0, 10)])
191
 
 
192
 
    def test_coalesce_nearby(self):
193
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
194
 
                   [(0, 10), (10, 10)])
195
 
 
196
 
    def test_coalesce_overlapped(self):
197
 
        self.assertRaises(ValueError,
198
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
199
 
                        [(0, 10), (5, 10)])
200
 
 
201
 
    def test_coalesce_limit(self):
202
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
203
 
                              (30, 10), (40, 10)]),
204
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
205
 
                              (30, 10), (40, 10)]),
206
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
207
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
208
 
                       (90, 10), (100, 10)],
209
 
                    limit=5)
210
 
 
211
 
    def test_coalesce_no_limit(self):
212
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
213
 
                               (30, 10), (40, 10), (50, 10),
214
 
                               (60, 10), (70, 10), (80, 10),
215
 
                               (90, 10)]),
216
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
217
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
218
 
                       (90, 10), (100, 10)])
219
 
 
220
 
    def test_coalesce_fudge(self):
221
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
223
 
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
226
 
    def test_coalesce_max_size(self):
227
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
 
                    (30, 50, [(0, 50)]),
229
 
                    # If one range is above max_size, it gets its own coalesced
230
 
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
232
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
235
 
 
236
 
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
 
                  )
240
 
 
241
 
    def test_coalesce_default_limit(self):
242
 
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
249
 
                   max_size=1*1024*1024*1024)
250
 
 
251
 
 
252
 
class TestMemoryTransport(TestCase):
253
 
 
254
 
    def test_get_transport(self):
255
 
        MemoryTransport()
256
 
 
257
 
    def test_clone(self):
258
 
        transport = MemoryTransport()
259
 
        self.assertTrue(isinstance(transport, MemoryTransport))
260
 
        self.assertEqual("memory:///", transport.clone("/").base)
261
 
 
262
 
    def test_abspath(self):
263
 
        transport = MemoryTransport()
264
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
265
 
 
266
 
    def test_abspath_of_root(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertEqual("memory:///", transport.base)
269
 
        self.assertEqual("memory:///", transport.abspath('/'))
270
 
 
271
 
    def test_abspath_of_relpath_starting_at_root(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
274
 
 
275
 
    def test_append_and_get(self):
276
 
        transport = MemoryTransport()
277
 
        transport.append_bytes('path', 'content')
278
 
        self.assertEqual(transport.get('path').read(), 'content')
279
 
        transport.append_file('path', StringIO('content'))
280
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
281
 
 
282
 
    def test_put_and_get(self):
283
 
        transport = MemoryTransport()
284
 
        transport.put_file('path', StringIO('content'))
285
 
        self.assertEqual(transport.get('path').read(), 'content')
286
 
        transport.put_bytes('path', 'content')
287
 
        self.assertEqual(transport.get('path').read(), 'content')
288
 
 
289
 
    def test_append_without_dir_fails(self):
290
 
        transport = MemoryTransport()
291
 
        self.assertRaises(NoSuchFile,
292
 
                          transport.append_bytes, 'dir/path', 'content')
293
 
 
294
 
    def test_put_without_dir_fails(self):
295
 
        transport = MemoryTransport()
296
 
        self.assertRaises(NoSuchFile,
297
 
                          transport.put_file, 'dir/path', StringIO('content'))
298
 
 
299
 
    def test_get_missing(self):
300
 
        transport = MemoryTransport()
301
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
302
 
 
303
 
    def test_has_missing(self):
304
 
        transport = MemoryTransport()
305
 
        self.assertEquals(False, transport.has('foo'))
306
 
 
307
 
    def test_has_present(self):
308
 
        transport = MemoryTransport()
309
 
        transport.append_bytes('foo', 'content')
310
 
        self.assertEquals(True, transport.has('foo'))
311
 
 
312
 
    def test_list_dir(self):
313
 
        transport = MemoryTransport()
314
 
        transport.put_bytes('foo', 'content')
315
 
        transport.mkdir('dir')
316
 
        transport.put_bytes('dir/subfoo', 'content')
317
 
        transport.put_bytes('dirlike', 'content')
318
 
 
319
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
320
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
321
 
 
322
 
    def test_mkdir(self):
323
 
        transport = MemoryTransport()
324
 
        transport.mkdir('dir')
325
 
        transport.append_bytes('dir/path', 'content')
326
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
327
 
 
328
 
    def test_mkdir_missing_parent(self):
329
 
        transport = MemoryTransport()
330
 
        self.assertRaises(NoSuchFile,
331
 
                          transport.mkdir, 'dir/dir')
332
 
 
333
 
    def test_mkdir_twice(self):
334
 
        transport = MemoryTransport()
335
 
        transport.mkdir('dir')
336
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
337
 
 
338
 
    def test_parameters(self):
339
 
        transport = MemoryTransport()
340
 
        self.assertEqual(True, transport.listable())
341
 
        self.assertEqual(False, transport.is_readonly())
342
 
 
343
 
    def test_iter_files_recursive(self):
344
 
        transport = MemoryTransport()
345
 
        transport.mkdir('dir')
346
 
        transport.put_bytes('dir/foo', 'content')
347
 
        transport.put_bytes('dir/bar', 'content')
348
 
        transport.put_bytes('bar', 'content')
349
 
        paths = set(transport.iter_files_recursive())
350
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
351
 
 
352
 
    def test_stat(self):
353
 
        transport = MemoryTransport()
354
 
        transport.put_bytes('foo', 'content')
355
 
        transport.put_bytes('bar', 'phowar')
356
 
        self.assertEqual(7, transport.stat('foo').st_size)
357
 
        self.assertEqual(6, transport.stat('bar').st_size)
358
 
 
359
 
 
360
 
class ChrootDecoratorTransportTest(TestCase):
361
 
    """Chroot decoration specific tests."""
362
 
 
363
 
    def test_abspath(self):
364
 
        # The abspath is always relative to the chroot_url.
365
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        server.setUp()
367
 
        transport = get_transport(server.get_url())
368
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
369
 
 
370
 
        subdir_transport = transport.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
        server.tearDown()
373
 
 
374
 
    def test_clone(self):
375
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
376
 
        server.setUp()
377
 
        transport = get_transport(server.get_url())
378
 
        # relpath from root and root path are the same
379
 
        relpath_cloned = transport.clone('foo')
380
 
        abspath_cloned = transport.clone('/foo')
381
 
        self.assertEqual(server, relpath_cloned.server)
382
 
        self.assertEqual(server, abspath_cloned.server)
383
 
        server.tearDown()
384
 
    
385
 
    def test_chroot_url_preserves_chroot(self):
386
 
        """Calling get_transport on a chroot transport's base should produce a
387
 
        transport with exactly the same behaviour as the original chroot
388
 
        transport.
389
 
 
390
 
        This is so that it is not possible to escape a chroot by doing::
391
 
            url = chroot_transport.base
392
 
            parent_url = urlutils.join(url, '..')
393
 
            new_transport = get_transport(parent_url)
394
 
        """
395
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
396
 
        server.setUp()
397
 
        transport = get_transport(server.get_url())
398
 
        new_transport = get_transport(transport.base)
399
 
        self.assertEqual(transport.server, new_transport.server)
400
 
        self.assertEqual(transport.base, new_transport.base)
401
 
        server.tearDown()
402
 
        
403
 
    def test_urljoin_preserves_chroot(self):
404
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
405
 
        URL that escapes the intended chroot.
406
 
 
407
 
        This is so that it is not possible to escape a chroot by doing::
408
 
            url = chroot_transport.base
409
 
            parent_url = urlutils.join(url, '..')
410
 
            new_transport = get_transport(parent_url)
411
 
        """
412
 
        server = ChrootServer(get_transport('memory:///path/'))
413
 
        server.setUp()
414
 
        transport = get_transport(server.get_url())
415
 
        self.assertRaises(
416
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
417
 
        server.tearDown()
418
 
 
419
 
 
420
 
class ChrootServerTest(TestCase):
421
 
 
422
 
    def test_construct(self):
423
 
        backing_transport = MemoryTransport()
424
 
        server = ChrootServer(backing_transport)
425
 
        self.assertEqual(backing_transport, server.backing_transport)
426
 
 
427
 
    def test_setUp(self):
428
 
        backing_transport = MemoryTransport()
429
 
        server = ChrootServer(backing_transport)
430
 
        server.setUp()
431
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
432
 
 
433
 
    def test_tearDown(self):
434
 
        backing_transport = MemoryTransport()
435
 
        server = ChrootServer(backing_transport)
436
 
        server.setUp()
437
 
        server.tearDown()
438
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
439
 
 
440
 
    def test_get_url(self):
441
 
        backing_transport = MemoryTransport()
442
 
        server = ChrootServer(backing_transport)
443
 
        server.setUp()
444
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
445
 
        server.tearDown()
446
 
 
447
 
 
448
 
class ReadonlyDecoratorTransportTest(TestCase):
449
 
    """Readonly decoration specific tests."""
450
 
 
451
 
    def test_local_parameters(self):
452
 
        import bzrlib.transport.readonly as readonly
453
 
        # connect to . in readonly mode
454
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
455
 
        self.assertEqual(True, transport.listable())
456
 
        self.assertEqual(True, transport.is_readonly())
457
 
 
458
 
    def test_http_parameters(self):
459
 
        from bzrlib.tests.http_server import HttpServer
460
 
        import bzrlib.transport.readonly as readonly
461
 
        # connect to '.' via http which is not listable
462
 
        server = HttpServer()
463
 
        server.setUp()
464
 
        try:
465
 
            transport = get_transport('readonly+' + server.get_url())
466
 
            self.failUnless(isinstance(transport,
467
 
                                       readonly.ReadonlyTransportDecorator))
468
 
            self.assertEqual(False, transport.listable())
469
 
            self.assertEqual(True, transport.is_readonly())
470
 
        finally:
471
 
            server.tearDown()
472
 
 
473
 
 
474
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
475
 
    """NFS decorator specific tests."""
476
 
 
477
 
    def get_nfs_transport(self, url):
478
 
        import bzrlib.transport.fakenfs as fakenfs
479
 
        # connect to url with nfs decoration
480
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
481
 
 
482
 
    def test_local_parameters(self):
483
 
        # the listable and is_readonly parameters
484
 
        # are not changed by the fakenfs decorator
485
 
        transport = self.get_nfs_transport('.')
486
 
        self.assertEqual(True, transport.listable())
487
 
        self.assertEqual(False, transport.is_readonly())
488
 
 
489
 
    def test_http_parameters(self):
490
 
        # the listable and is_readonly parameters
491
 
        # are not changed by the fakenfs decorator
492
 
        from bzrlib.tests.http_server import HttpServer
493
 
        # connect to '.' via http which is not listable
494
 
        server = HttpServer()
495
 
        server.setUp()
496
 
        try:
497
 
            transport = self.get_nfs_transport(server.get_url())
498
 
            self.assertIsInstance(
499
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
500
 
            self.assertEqual(False, transport.listable())
501
 
            self.assertEqual(True, transport.is_readonly())
502
 
        finally:
503
 
            server.tearDown()
504
 
 
505
 
    def test_fakenfs_server_default(self):
506
 
        # a FakeNFSServer() should bring up a local relpath server for itself
507
 
        import bzrlib.transport.fakenfs as fakenfs
508
 
        server = fakenfs.FakeNFSServer()
509
 
        server.setUp()
510
 
        try:
511
 
            # the url should be decorated appropriately
512
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
513
 
            # and we should be able to get a transport for it
514
 
            transport = get_transport(server.get_url())
515
 
            # which must be a FakeNFSTransportDecorator instance.
516
 
            self.assertIsInstance(
517
 
                transport, fakenfs.FakeNFSTransportDecorator)
518
 
        finally:
519
 
            server.tearDown()
520
 
 
521
 
    def test_fakenfs_rename_semantics(self):
522
 
        # a FakeNFS transport must mangle the way rename errors occur to
523
 
        # look like NFS problems.
524
 
        transport = self.get_nfs_transport('.')
525
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
526
 
                        transport=transport)
527
 
        self.assertRaises(errors.ResourceBusy,
528
 
                          transport.rename, 'from', 'to')
529
 
 
530
 
 
531
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
532
 
    """Tests for simulation of VFAT restrictions"""
533
 
 
534
 
    def get_vfat_transport(self, url):
535
 
        """Return vfat-backed transport for test directory"""
536
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
537
 
        return FakeVFATTransportDecorator('vfat+' + url)
538
 
 
539
 
    def test_transport_creation(self):
540
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
541
 
        transport = self.get_vfat_transport('.')
542
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
543
 
 
544
 
    def test_transport_mkdir(self):
545
 
        transport = self.get_vfat_transport('.')
546
 
        transport.mkdir('HELLO')
547
 
        self.assertTrue(transport.has('hello'))
548
 
        self.assertTrue(transport.has('Hello'))
549
 
 
550
 
    def test_forbidden_chars(self):
551
 
        transport = self.get_vfat_transport('.')
552
 
        self.assertRaises(ValueError, transport.has, "<NU>")
553
 
 
554
 
 
555
 
class BadTransportHandler(Transport):
556
 
    def __init__(self, base_url):
557
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
558
 
 
559
 
 
560
 
class BackupTransportHandler(Transport):
561
 
    """Test transport that works as a backup for the BadTransportHandler"""
562
 
    pass
563
 
 
564
 
 
565
 
class TestTransportImplementation(TestCaseInTempDir):
566
 
    """Implementation verification for transports.
567
 
    
568
 
    To verify a transport we need a server factory, which is a callable
569
 
    that accepts no parameters and returns an implementation of
570
 
    bzrlib.transport.Server.
571
 
    
572
 
    That Server is then used to construct transport instances and test
573
 
    the transport via loopback activity.
574
 
 
575
 
    Currently this assumes that the Transport object is connected to the 
576
 
    current working directory.  So that whatever is done 
577
 
    through the transport, should show up in the working 
578
 
    directory, and vice-versa. This is a bug, because its possible to have
579
 
    URL schemes which provide access to something that may not be 
580
 
    result in storage on the local disk, i.e. due to file system limits, or 
581
 
    due to it being a database or some other non-filesystem tool.
582
 
 
583
 
    This also tests to make sure that the functions work with both
584
 
    generators and lists (assuming iter(list) is effectively a generator)
585
 
    """
586
 
    
587
 
    def setUp(self):
588
 
        super(TestTransportImplementation, self).setUp()
589
 
        self._server = self.transport_server()
590
 
        self._server.setUp()
591
 
        self.addCleanup(self._server.tearDown)
592
 
 
593
 
    def get_transport(self, relpath=None):
594
 
        """Return a connected transport to the local directory.
595
 
 
596
 
        :param relpath: a path relative to the base url.
597
 
        """
598
 
        base_url = self._server.get_url()
599
 
        url = self._adjust_url(base_url, relpath)
600
 
        # try getting the transport via the regular interface:
601
 
        t = get_transport(url)
602
 
        # vila--20070607 if the following are commented out the test suite
603
 
        # still pass. Is this really still needed or was it a forgotten
604
 
        # temporary fix ?
605
 
        if not isinstance(t, self.transport_class):
606
 
            # we did not get the correct transport class type. Override the
607
 
            # regular connection behaviour by direct construction.
608
 
            t = self.transport_class(url)
609
 
        return t
610
 
 
611
 
 
612
 
class TestLocalTransports(TestCase):
613
 
 
614
 
    def test_get_transport_from_abspath(self):
615
 
        here = osutils.abspath('.')
616
 
        t = get_transport(here)
617
 
        self.assertIsInstance(t, LocalTransport)
618
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
619
 
 
620
 
    def test_get_transport_from_relpath(self):
621
 
        here = osutils.abspath('.')
622
 
        t = get_transport('.')
623
 
        self.assertIsInstance(t, LocalTransport)
624
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
625
 
 
626
 
    def test_get_transport_from_local_url(self):
627
 
        here = osutils.abspath('.')
628
 
        here_url = urlutils.local_path_to_url(here) + '/'
629
 
        t = get_transport(here_url)
630
 
        self.assertIsInstance(t, LocalTransport)
631
 
        self.assertEquals(t.base, here_url)
632
 
 
633
 
    def test_local_abspath(self):
634
 
        here = osutils.abspath('.')
635
 
        t = get_transport(here)
636
 
        self.assertEquals(t.local_abspath(''), here)
637
 
 
638
 
 
639
 
class TestWin32LocalTransport(TestCase):
640
 
 
641
 
    def test_unc_clone_to_root(self):
642
 
        # Win32 UNC path like \\HOST\path
643
 
        # clone to root should stop at least at \\HOST part
644
 
        # not on \\
645
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
646
 
        for i in xrange(4):
647
 
            t = t.clone('..')
648
 
        self.assertEquals(t.base, 'file://HOST/')
649
 
        # make sure we reach the root
650
 
        t = t.clone('..')
651
 
        self.assertEquals(t.base, 'file://HOST/')
652
 
 
653
 
 
654
 
class TestConnectedTransport(TestCase):
655
 
    """Tests for connected to remote server transports"""
656
 
 
657
 
    def test_parse_url(self):
658
 
        t = ConnectedTransport('http://simple.example.com/home/source')
659
 
        self.assertEquals(t._host, 'simple.example.com')
660
 
        self.assertEquals(t._port, None)
661
 
        self.assertEquals(t._path, '/home/source/')
662
 
        self.failUnless(t._user is None)
663
 
        self.failUnless(t._password is None)
664
 
 
665
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
666
 
 
667
 
    def test_parse_url_with_at_in_user(self):
668
 
        # Bug 228058
669
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
670
 
        self.assertEquals(t._user, 'user@host.com')
671
 
 
672
 
    def test_parse_quoted_url(self):
673
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
674
 
        self.assertEquals(t._host, 'exAmple.com')
675
 
        self.assertEquals(t._port, 2222)
676
 
        self.assertEquals(t._user, 'robey')
677
 
        self.assertEquals(t._password, 'h@t')
678
 
        self.assertEquals(t._path, '/path/')
679
 
 
680
 
        # Base should not keep track of the password
681
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
682
 
 
683
 
    def test_parse_invalid_url(self):
684
 
        self.assertRaises(errors.InvalidURL,
685
 
                          ConnectedTransport,
686
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
687
 
 
688
 
    def test_relpath(self):
689
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
690
 
 
691
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
692
 
        self.assertRaises(errors.PathNotChild, t.relpath,
693
 
                          'http://user@host.com/abs/path/sub')
694
 
        self.assertRaises(errors.PathNotChild, t.relpath,
695
 
                          'sftp://user2@host.com/abs/path/sub')
696
 
        self.assertRaises(errors.PathNotChild, t.relpath,
697
 
                          'sftp://user@otherhost.com/abs/path/sub')
698
 
        self.assertRaises(errors.PathNotChild, t.relpath,
699
 
                          'sftp://user@host.com:33/abs/path/sub')
700
 
        # Make sure it works when we don't supply a username
701
 
        t = ConnectedTransport('sftp://host.com/abs/path')
702
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
703
 
 
704
 
        # Make sure it works when parts of the path will be url encoded
705
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
706
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
707
 
 
708
 
    def test_connection_sharing_propagate_credentials(self):
709
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
710
 
        self.assertEquals('user', t._user)
711
 
        self.assertEquals('host.com', t._host)
712
 
        self.assertIs(None, t._get_connection())
713
 
        self.assertIs(None, t._password)
714
 
        c = t.clone('subdir')
715
 
        self.assertIs(None, c._get_connection())
716
 
        self.assertIs(None, t._password)
717
 
 
718
 
        # Simulate the user entering a password
719
 
        password = 'secret'
720
 
        connection = object()
721
 
        t._set_connection(connection, password)
722
 
        self.assertIs(connection, t._get_connection())
723
 
        self.assertIs(password, t._get_credentials())
724
 
        self.assertIs(connection, c._get_connection())
725
 
        self.assertIs(password, c._get_credentials())
726
 
 
727
 
        # credentials can be updated
728
 
        new_password = 'even more secret'
729
 
        c._update_credentials(new_password)
730
 
        self.assertIs(connection, t._get_connection())
731
 
        self.assertIs(new_password, t._get_credentials())
732
 
        self.assertIs(connection, c._get_connection())
733
 
        self.assertIs(new_password, c._get_credentials())
734
 
 
735
 
 
736
 
class TestReusedTransports(TestCase):
737
 
    """Tests for transport reuse"""
738
 
 
739
 
    def test_reuse_same_transport(self):
740
 
        possible_transports = []
741
 
        t1 = get_transport('http://foo/',
742
 
                           possible_transports=possible_transports)
743
 
        self.assertEqual([t1], possible_transports)
744
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
745
 
        self.assertIs(t1, t2)
746
 
 
747
 
        # Also check that final '/' are handled correctly
748
 
        t3 = get_transport('http://foo/path/')
749
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
750
 
        self.assertIs(t3, t4)
751
 
 
752
 
        t5 = get_transport('http://foo/path')
753
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
754
 
        self.assertIs(t5, t6)
755
 
 
756
 
    def test_don_t_reuse_different_transport(self):
757
 
        t1 = get_transport('http://foo/path')
758
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
759
 
        self.assertIsNot(t1, t2)
760
 
 
761
 
 
762
 
class TestTransportTrace(TestCase):
763
 
 
764
 
    def test_get(self):
765
 
        transport = get_transport('trace+memory://')
766
 
        self.assertIsInstance(
767
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
768
 
 
769
 
    def test_clone_preserves_activity(self):
770
 
        transport = get_transport('trace+memory://')
771
 
        transport2 = transport.clone('.')
772
 
        self.assertTrue(transport is not transport2)
773
 
        self.assertTrue(transport._activity is transport2._activity)
774
 
 
775
 
    # the following specific tests are for the operations that have made use of
776
 
    # logging in tests; we could test every single operation but doing that
777
 
    # still won't cause a test failure when the top level Transport API
778
 
    # changes; so there is little return doing that.
779
 
    def test_get(self):
780
 
        transport = get_transport('trace+memory:///')
781
 
        transport.put_bytes('foo', 'barish')
782
 
        transport.get('foo')
783
 
        expected_result = []
784
 
        # put_bytes records the bytes, not the content to avoid memory
785
 
        # pressure.
786
 
        expected_result.append(('put_bytes', 'foo', 6, None))
787
 
        # get records the file name only.
788
 
        expected_result.append(('get', 'foo'))
789
 
        self.assertEqual(expected_result, transport._activity)
790
 
 
791
 
    def test_readv(self):
792
 
        transport = get_transport('trace+memory:///')
793
 
        transport.put_bytes('foo', 'barish')
794
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
795
 
            upper_limit=6))
796
 
        expected_result = []
797
 
        # put_bytes records the bytes, not the content to avoid memory
798
 
        # pressure.
799
 
        expected_result.append(('put_bytes', 'foo', 6, None))
800
 
        # readv records the supplied offset request
801
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
 
        self.assertEqual(expected_result, transport._activity)