~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-02-10 04:54:18 UTC
  • mfrom: (3988.1.3 bzr.dev)
  • Revision ID: pqm@pqm.ubuntu.com-20090210045418-u1c0p4zpnp6nna3n
(Jelmer) Add specification for colocated branches.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
 
 
20
import bzrlib
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
 
 
52
 
 
53
# TODO: Should possibly split transport-specific tests into their own files.
 
54
 
 
55
 
 
56
class TestTransport(TestCase):
 
57
    """Test the non transport-concrete class functionality."""
 
58
 
 
59
    def test__get_set_protocol_handlers(self):
 
60
        handlers = _get_protocol_handlers()
 
61
        self.assertNotEqual([], handlers.keys( ))
 
62
        try:
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
 
65
        finally:
 
66
            _set_protocol_handlers(handlers)
 
67
 
 
68
    def test_get_transport_modules(self):
 
69
        handlers = _get_protocol_handlers()
 
70
        # don't pollute the current handlers
 
71
        _clear_protocol_handlers()
 
72
        class SampleHandler(object):
 
73
            """I exist, isnt that enough?"""
 
74
        try:
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
 
84
                             _get_transport_modules())
 
85
        finally:
 
86
            _set_protocol_handlers(handlers)
 
87
 
 
88
    def test_transport_dependency(self):
 
89
        """Transport with missing dependency causes no error"""
 
90
        saved_handlers = _get_protocol_handlers()
 
91
        # don't pollute the current handlers
 
92
        _clear_protocol_handlers()
 
93
        try:
 
94
            register_transport_proto('foo')
 
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
96
                    'BadTransportHandler')
 
97
            try:
 
98
                get_transport('foo://fooserver/foo')
 
99
            except UnsupportedProtocol, e:
 
100
                e_str = str(e)
 
101
                self.assertEquals('Unsupported protocol'
 
102
                                  ' for url "foo://fooserver/foo":'
 
103
                                  ' Unable to import library "some_lib":'
 
104
                                  ' testing missing dependency', str(e))
 
105
            else:
 
106
                self.fail('Did not raise UnsupportedProtocol')
 
107
        finally:
 
108
            # restore original values
 
109
            _set_protocol_handlers(saved_handlers)
 
110
            
 
111
    def test_transport_fallback(self):
 
112
        """Transport with missing dependency causes no error"""
 
113
        saved_handlers = _get_protocol_handlers()
 
114
        try:
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
 
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
118
                    'BackupTransportHandler')
 
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
120
                    'BadTransportHandler')
 
121
            t = get_transport('foo://fooserver/foo')
 
122
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
123
        finally:
 
124
            _set_protocol_handlers(saved_handlers)
 
125
 
 
126
    def test_LateReadError(self):
 
127
        """The LateReadError helper should raise on read()."""
 
128
        a_file = LateReadError('a path')
 
129
        try:
 
130
            a_file.read()
 
131
        except ReadError, error:
 
132
            self.assertEqual('a path', error.path)
 
133
        self.assertRaises(ReadError, a_file.read, 40)
 
134
        a_file.close()
 
135
 
 
136
    def test__combine_paths(self):
 
137
        t = Transport('/')
 
138
        self.assertEqual('/home/sarah/project/foo',
 
139
                         t._combine_paths('/home/sarah', 'project/foo'))
 
140
        self.assertEqual('/etc',
 
141
                         t._combine_paths('/home/sarah', '../../etc'))
 
142
        self.assertEqual('/etc',
 
143
                         t._combine_paths('/home/sarah', '../../../etc'))
 
144
        self.assertEqual('/etc',
 
145
                         t._combine_paths('/home/sarah', '/etc'))
 
146
 
 
147
    def test_local_abspath_non_local_transport(self):
 
148
        # the base implementation should throw
 
149
        t = MemoryTransport()
 
150
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
151
        self.assertEqual('memory:///t is not a local path.', str(e))
 
152
 
 
153
 
 
154
class TestCoalesceOffsets(TestCase):
 
155
 
 
156
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
157
        coalesce = Transport._coalesce_offsets
 
158
        exp = [_CoalescedOffset(*x) for x in expected]
 
159
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
160
                            max_size=max_size))
 
161
        self.assertEqual(exp, out)
 
162
 
 
163
    def test_coalesce_empty(self):
 
164
        self.check([], [])
 
165
 
 
166
    def test_coalesce_simple(self):
 
167
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
168
 
 
169
    def test_coalesce_unrelated(self):
 
170
        self.check([(0, 10, [(0, 10)]),
 
171
                    (20, 10, [(0, 10)]),
 
172
                   ], [(0, 10), (20, 10)])
 
173
 
 
174
    def test_coalesce_unsorted(self):
 
175
        self.check([(20, 10, [(0, 10)]),
 
176
                    (0, 10, [(0, 10)]),
 
177
                   ], [(20, 10), (0, 10)])
 
178
 
 
179
    def test_coalesce_nearby(self):
 
180
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
181
                   [(0, 10), (10, 10)])
 
182
 
 
183
    def test_coalesce_overlapped(self):
 
184
        self.assertRaises(ValueError,
 
185
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
186
                        [(0, 10), (5, 10)])
 
187
 
 
188
    def test_coalesce_limit(self):
 
189
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
190
                              (30, 10), (40, 10)]),
 
191
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
192
                              (30, 10), (40, 10)]),
 
193
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
194
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
195
                       (90, 10), (100, 10)],
 
196
                    limit=5)
 
197
 
 
198
    def test_coalesce_no_limit(self):
 
199
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
200
                               (30, 10), (40, 10), (50, 10),
 
201
                               (60, 10), (70, 10), (80, 10),
 
202
                               (90, 10)]),
 
203
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
204
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
205
                       (90, 10), (100, 10)])
 
206
 
 
207
    def test_coalesce_fudge(self):
 
208
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
209
                    (100, 10, [(0, 10),]),
 
210
                   ], [(10, 10), (30, 10), (100, 10)],
 
211
                   fudge=10
 
212
                  )
 
213
    def test_coalesce_max_size(self):
 
214
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
215
                    (30, 50, [(0, 50)]),
 
216
                    # If one range is above max_size, it gets its own coalesced
 
217
                    # offset
 
218
                    (100, 80, [(0, 80),]),],
 
219
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
220
                   max_size=50
 
221
                  )
 
222
 
 
223
    def test_coalesce_no_max_size(self):
 
224
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
225
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
226
                  )
 
227
 
 
228
    def test_coalesce_default_limit(self):
 
229
        # By default we use a 100MB max size.
 
230
        ten_mb = 10*1024*1024
 
231
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
232
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
233
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
234
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
235
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
236
                   max_size=1*1024*1024*1024)
 
237
 
 
238
 
 
239
class TestMemoryTransport(TestCase):
 
240
 
 
241
    def test_get_transport(self):
 
242
        MemoryTransport()
 
243
 
 
244
    def test_clone(self):
 
245
        transport = MemoryTransport()
 
246
        self.assertTrue(isinstance(transport, MemoryTransport))
 
247
        self.assertEqual("memory:///", transport.clone("/").base)
 
248
 
 
249
    def test_abspath(self):
 
250
        transport = MemoryTransport()
 
251
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
252
 
 
253
    def test_abspath_of_root(self):
 
254
        transport = MemoryTransport()
 
255
        self.assertEqual("memory:///", transport.base)
 
256
        self.assertEqual("memory:///", transport.abspath('/'))
 
257
 
 
258
    def test_abspath_of_relpath_starting_at_root(self):
 
259
        transport = MemoryTransport()
 
260
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
261
 
 
262
    def test_append_and_get(self):
 
263
        transport = MemoryTransport()
 
264
        transport.append_bytes('path', 'content')
 
265
        self.assertEqual(transport.get('path').read(), 'content')
 
266
        transport.append_file('path', StringIO('content'))
 
267
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
268
 
 
269
    def test_put_and_get(self):
 
270
        transport = MemoryTransport()
 
271
        transport.put_file('path', StringIO('content'))
 
272
        self.assertEqual(transport.get('path').read(), 'content')
 
273
        transport.put_bytes('path', 'content')
 
274
        self.assertEqual(transport.get('path').read(), 'content')
 
275
 
 
276
    def test_append_without_dir_fails(self):
 
277
        transport = MemoryTransport()
 
278
        self.assertRaises(NoSuchFile,
 
279
                          transport.append_bytes, 'dir/path', 'content')
 
280
 
 
281
    def test_put_without_dir_fails(self):
 
282
        transport = MemoryTransport()
 
283
        self.assertRaises(NoSuchFile,
 
284
                          transport.put_file, 'dir/path', StringIO('content'))
 
285
 
 
286
    def test_get_missing(self):
 
287
        transport = MemoryTransport()
 
288
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
289
 
 
290
    def test_has_missing(self):
 
291
        transport = MemoryTransport()
 
292
        self.assertEquals(False, transport.has('foo'))
 
293
 
 
294
    def test_has_present(self):
 
295
        transport = MemoryTransport()
 
296
        transport.append_bytes('foo', 'content')
 
297
        self.assertEquals(True, transport.has('foo'))
 
298
 
 
299
    def test_list_dir(self):
 
300
        transport = MemoryTransport()
 
301
        transport.put_bytes('foo', 'content')
 
302
        transport.mkdir('dir')
 
303
        transport.put_bytes('dir/subfoo', 'content')
 
304
        transport.put_bytes('dirlike', 'content')
 
305
 
 
306
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
307
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
308
 
 
309
    def test_mkdir(self):
 
310
        transport = MemoryTransport()
 
311
        transport.mkdir('dir')
 
312
        transport.append_bytes('dir/path', 'content')
 
313
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
314
 
 
315
    def test_mkdir_missing_parent(self):
 
316
        transport = MemoryTransport()
 
317
        self.assertRaises(NoSuchFile,
 
318
                          transport.mkdir, 'dir/dir')
 
319
 
 
320
    def test_mkdir_twice(self):
 
321
        transport = MemoryTransport()
 
322
        transport.mkdir('dir')
 
323
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
324
 
 
325
    def test_parameters(self):
 
326
        transport = MemoryTransport()
 
327
        self.assertEqual(True, transport.listable())
 
328
        self.assertEqual(False, transport.is_readonly())
 
329
 
 
330
    def test_iter_files_recursive(self):
 
331
        transport = MemoryTransport()
 
332
        transport.mkdir('dir')
 
333
        transport.put_bytes('dir/foo', 'content')
 
334
        transport.put_bytes('dir/bar', 'content')
 
335
        transport.put_bytes('bar', 'content')
 
336
        paths = set(transport.iter_files_recursive())
 
337
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
338
 
 
339
    def test_stat(self):
 
340
        transport = MemoryTransport()
 
341
        transport.put_bytes('foo', 'content')
 
342
        transport.put_bytes('bar', 'phowar')
 
343
        self.assertEqual(7, transport.stat('foo').st_size)
 
344
        self.assertEqual(6, transport.stat('bar').st_size)
 
345
 
 
346
 
 
347
class ChrootDecoratorTransportTest(TestCase):
 
348
    """Chroot decoration specific tests."""
 
349
 
 
350
    def test_abspath(self):
 
351
        # The abspath is always relative to the chroot_url.
 
352
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
353
        server.setUp()
 
354
        transport = get_transport(server.get_url())
 
355
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
356
 
 
357
        subdir_transport = transport.clone('subdir')
 
358
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
359
        server.tearDown()
 
360
 
 
361
    def test_clone(self):
 
362
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
363
        server.setUp()
 
364
        transport = get_transport(server.get_url())
 
365
        # relpath from root and root path are the same
 
366
        relpath_cloned = transport.clone('foo')
 
367
        abspath_cloned = transport.clone('/foo')
 
368
        self.assertEqual(server, relpath_cloned.server)
 
369
        self.assertEqual(server, abspath_cloned.server)
 
370
        server.tearDown()
 
371
    
 
372
    def test_chroot_url_preserves_chroot(self):
 
373
        """Calling get_transport on a chroot transport's base should produce a
 
374
        transport with exactly the same behaviour as the original chroot
 
375
        transport.
 
376
 
 
377
        This is so that it is not possible to escape a chroot by doing::
 
378
            url = chroot_transport.base
 
379
            parent_url = urlutils.join(url, '..')
 
380
            new_transport = get_transport(parent_url)
 
381
        """
 
382
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
383
        server.setUp()
 
384
        transport = get_transport(server.get_url())
 
385
        new_transport = get_transport(transport.base)
 
386
        self.assertEqual(transport.server, new_transport.server)
 
387
        self.assertEqual(transport.base, new_transport.base)
 
388
        server.tearDown()
 
389
        
 
390
    def test_urljoin_preserves_chroot(self):
 
391
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
392
        URL that escapes the intended chroot.
 
393
 
 
394
        This is so that it is not possible to escape a chroot by doing::
 
395
            url = chroot_transport.base
 
396
            parent_url = urlutils.join(url, '..')
 
397
            new_transport = get_transport(parent_url)
 
398
        """
 
399
        server = ChrootServer(get_transport('memory:///path/'))
 
400
        server.setUp()
 
401
        transport = get_transport(server.get_url())
 
402
        self.assertRaises(
 
403
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
404
        server.tearDown()
 
405
 
 
406
 
 
407
class ChrootServerTest(TestCase):
 
408
 
 
409
    def test_construct(self):
 
410
        backing_transport = MemoryTransport()
 
411
        server = ChrootServer(backing_transport)
 
412
        self.assertEqual(backing_transport, server.backing_transport)
 
413
 
 
414
    def test_setUp(self):
 
415
        backing_transport = MemoryTransport()
 
416
        server = ChrootServer(backing_transport)
 
417
        server.setUp()
 
418
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
419
 
 
420
    def test_tearDown(self):
 
421
        backing_transport = MemoryTransport()
 
422
        server = ChrootServer(backing_transport)
 
423
        server.setUp()
 
424
        server.tearDown()
 
425
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
426
 
 
427
    def test_get_url(self):
 
428
        backing_transport = MemoryTransport()
 
429
        server = ChrootServer(backing_transport)
 
430
        server.setUp()
 
431
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
432
        server.tearDown()
 
433
 
 
434
 
 
435
class ReadonlyDecoratorTransportTest(TestCase):
 
436
    """Readonly decoration specific tests."""
 
437
 
 
438
    def test_local_parameters(self):
 
439
        import bzrlib.transport.readonly as readonly
 
440
        # connect to . in readonly mode
 
441
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
442
        self.assertEqual(True, transport.listable())
 
443
        self.assertEqual(True, transport.is_readonly())
 
444
 
 
445
    def test_http_parameters(self):
 
446
        from bzrlib.tests.http_server import HttpServer
 
447
        import bzrlib.transport.readonly as readonly
 
448
        # connect to '.' via http which is not listable
 
449
        server = HttpServer()
 
450
        server.setUp()
 
451
        try:
 
452
            transport = get_transport('readonly+' + server.get_url())
 
453
            self.failUnless(isinstance(transport,
 
454
                                       readonly.ReadonlyTransportDecorator))
 
455
            self.assertEqual(False, transport.listable())
 
456
            self.assertEqual(True, transport.is_readonly())
 
457
        finally:
 
458
            server.tearDown()
 
459
 
 
460
 
 
461
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
462
    """NFS decorator specific tests."""
 
463
 
 
464
    def get_nfs_transport(self, url):
 
465
        import bzrlib.transport.fakenfs as fakenfs
 
466
        # connect to url with nfs decoration
 
467
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
468
 
 
469
    def test_local_parameters(self):
 
470
        # the listable and is_readonly parameters
 
471
        # are not changed by the fakenfs decorator
 
472
        transport = self.get_nfs_transport('.')
 
473
        self.assertEqual(True, transport.listable())
 
474
        self.assertEqual(False, transport.is_readonly())
 
475
 
 
476
    def test_http_parameters(self):
 
477
        # the listable and is_readonly parameters
 
478
        # are not changed by the fakenfs decorator
 
479
        from bzrlib.tests.http_server import HttpServer
 
480
        # connect to '.' via http which is not listable
 
481
        server = HttpServer()
 
482
        server.setUp()
 
483
        try:
 
484
            transport = self.get_nfs_transport(server.get_url())
 
485
            self.assertIsInstance(
 
486
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
487
            self.assertEqual(False, transport.listable())
 
488
            self.assertEqual(True, transport.is_readonly())
 
489
        finally:
 
490
            server.tearDown()
 
491
 
 
492
    def test_fakenfs_server_default(self):
 
493
        # a FakeNFSServer() should bring up a local relpath server for itself
 
494
        import bzrlib.transport.fakenfs as fakenfs
 
495
        server = fakenfs.FakeNFSServer()
 
496
        server.setUp()
 
497
        try:
 
498
            # the url should be decorated appropriately
 
499
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
500
            # and we should be able to get a transport for it
 
501
            transport = get_transport(server.get_url())
 
502
            # which must be a FakeNFSTransportDecorator instance.
 
503
            self.assertIsInstance(
 
504
                transport, fakenfs.FakeNFSTransportDecorator)
 
505
        finally:
 
506
            server.tearDown()
 
507
 
 
508
    def test_fakenfs_rename_semantics(self):
 
509
        # a FakeNFS transport must mangle the way rename errors occur to
 
510
        # look like NFS problems.
 
511
        transport = self.get_nfs_transport('.')
 
512
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
513
                        transport=transport)
 
514
        self.assertRaises(errors.ResourceBusy,
 
515
                          transport.rename, 'from', 'to')
 
516
 
 
517
 
 
518
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
519
    """Tests for simulation of VFAT restrictions"""
 
520
 
 
521
    def get_vfat_transport(self, url):
 
522
        """Return vfat-backed transport for test directory"""
 
523
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
524
        return FakeVFATTransportDecorator('vfat+' + url)
 
525
 
 
526
    def test_transport_creation(self):
 
527
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
528
        transport = self.get_vfat_transport('.')
 
529
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
530
 
 
531
    def test_transport_mkdir(self):
 
532
        transport = self.get_vfat_transport('.')
 
533
        transport.mkdir('HELLO')
 
534
        self.assertTrue(transport.has('hello'))
 
535
        self.assertTrue(transport.has('Hello'))
 
536
 
 
537
    def test_forbidden_chars(self):
 
538
        transport = self.get_vfat_transport('.')
 
539
        self.assertRaises(ValueError, transport.has, "<NU>")
 
540
 
 
541
 
 
542
class BadTransportHandler(Transport):
 
543
    def __init__(self, base_url):
 
544
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
545
 
 
546
 
 
547
class BackupTransportHandler(Transport):
 
548
    """Test transport that works as a backup for the BadTransportHandler"""
 
549
    pass
 
550
 
 
551
 
 
552
class TestTransportImplementation(TestCaseInTempDir):
 
553
    """Implementation verification for transports.
 
554
    
 
555
    To verify a transport we need a server factory, which is a callable
 
556
    that accepts no parameters and returns an implementation of
 
557
    bzrlib.transport.Server.
 
558
    
 
559
    That Server is then used to construct transport instances and test
 
560
    the transport via loopback activity.
 
561
 
 
562
    Currently this assumes that the Transport object is connected to the 
 
563
    current working directory.  So that whatever is done 
 
564
    through the transport, should show up in the working 
 
565
    directory, and vice-versa. This is a bug, because its possible to have
 
566
    URL schemes which provide access to something that may not be 
 
567
    result in storage on the local disk, i.e. due to file system limits, or 
 
568
    due to it being a database or some other non-filesystem tool.
 
569
 
 
570
    This also tests to make sure that the functions work with both
 
571
    generators and lists (assuming iter(list) is effectively a generator)
 
572
    """
 
573
    
 
574
    def setUp(self):
 
575
        super(TestTransportImplementation, self).setUp()
 
576
        self._server = self.transport_server()
 
577
        self._server.setUp()
 
578
        self.addCleanup(self._server.tearDown)
 
579
 
 
580
    def get_transport(self, relpath=None):
 
581
        """Return a connected transport to the local directory.
 
582
 
 
583
        :param relpath: a path relative to the base url.
 
584
        """
 
585
        base_url = self._server.get_url()
 
586
        url = self._adjust_url(base_url, relpath)
 
587
        # try getting the transport via the regular interface:
 
588
        t = get_transport(url)
 
589
        # vila--20070607 if the following are commented out the test suite
 
590
        # still pass. Is this really still needed or was it a forgotten
 
591
        # temporary fix ?
 
592
        if not isinstance(t, self.transport_class):
 
593
            # we did not get the correct transport class type. Override the
 
594
            # regular connection behaviour by direct construction.
 
595
            t = self.transport_class(url)
 
596
        return t
 
597
 
 
598
 
 
599
class TestLocalTransports(TestCase):
 
600
 
 
601
    def test_get_transport_from_abspath(self):
 
602
        here = osutils.abspath('.')
 
603
        t = get_transport(here)
 
604
        self.assertIsInstance(t, LocalTransport)
 
605
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
606
 
 
607
    def test_get_transport_from_relpath(self):
 
608
        here = osutils.abspath('.')
 
609
        t = get_transport('.')
 
610
        self.assertIsInstance(t, LocalTransport)
 
611
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
612
 
 
613
    def test_get_transport_from_local_url(self):
 
614
        here = osutils.abspath('.')
 
615
        here_url = urlutils.local_path_to_url(here) + '/'
 
616
        t = get_transport(here_url)
 
617
        self.assertIsInstance(t, LocalTransport)
 
618
        self.assertEquals(t.base, here_url)
 
619
 
 
620
    def test_local_abspath(self):
 
621
        here = osutils.abspath('.')
 
622
        t = get_transport(here)
 
623
        self.assertEquals(t.local_abspath(''), here)
 
624
 
 
625
 
 
626
class TestWin32LocalTransport(TestCase):
 
627
 
 
628
    def test_unc_clone_to_root(self):
 
629
        # Win32 UNC path like \\HOST\path
 
630
        # clone to root should stop at least at \\HOST part
 
631
        # not on \\
 
632
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
633
        for i in xrange(4):
 
634
            t = t.clone('..')
 
635
        self.assertEquals(t.base, 'file://HOST/')
 
636
        # make sure we reach the root
 
637
        t = t.clone('..')
 
638
        self.assertEquals(t.base, 'file://HOST/')
 
639
 
 
640
 
 
641
class TestConnectedTransport(TestCase):
 
642
    """Tests for connected to remote server transports"""
 
643
 
 
644
    def test_parse_url(self):
 
645
        t = ConnectedTransport('http://simple.example.com/home/source')
 
646
        self.assertEquals(t._host, 'simple.example.com')
 
647
        self.assertEquals(t._port, None)
 
648
        self.assertEquals(t._path, '/home/source/')
 
649
        self.failUnless(t._user is None)
 
650
        self.failUnless(t._password is None)
 
651
 
 
652
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
653
 
 
654
    def test_parse_url_with_at_in_user(self):
 
655
        # Bug 228058
 
656
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
657
        self.assertEquals(t._user, 'user@host.com')
 
658
 
 
659
    def test_parse_quoted_url(self):
 
660
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
661
        self.assertEquals(t._host, 'exAmple.com')
 
662
        self.assertEquals(t._port, 2222)
 
663
        self.assertEquals(t._user, 'robey')
 
664
        self.assertEquals(t._password, 'h@t')
 
665
        self.assertEquals(t._path, '/path/')
 
666
 
 
667
        # Base should not keep track of the password
 
668
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
669
 
 
670
    def test_parse_invalid_url(self):
 
671
        self.assertRaises(errors.InvalidURL,
 
672
                          ConnectedTransport,
 
673
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
674
 
 
675
    def test_relpath(self):
 
676
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
677
 
 
678
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
679
        self.assertRaises(errors.PathNotChild, t.relpath,
 
680
                          'http://user@host.com/abs/path/sub')
 
681
        self.assertRaises(errors.PathNotChild, t.relpath,
 
682
                          'sftp://user2@host.com/abs/path/sub')
 
683
        self.assertRaises(errors.PathNotChild, t.relpath,
 
684
                          'sftp://user@otherhost.com/abs/path/sub')
 
685
        self.assertRaises(errors.PathNotChild, t.relpath,
 
686
                          'sftp://user@host.com:33/abs/path/sub')
 
687
        # Make sure it works when we don't supply a username
 
688
        t = ConnectedTransport('sftp://host.com/abs/path')
 
689
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
690
 
 
691
        # Make sure it works when parts of the path will be url encoded
 
692
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
693
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
694
 
 
695
    def test_connection_sharing_propagate_credentials(self):
 
696
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
697
        self.assertEquals('user', t._user)
 
698
        self.assertEquals('host.com', t._host)
 
699
        self.assertIs(None, t._get_connection())
 
700
        self.assertIs(None, t._password)
 
701
        c = t.clone('subdir')
 
702
        self.assertIs(None, c._get_connection())
 
703
        self.assertIs(None, t._password)
 
704
 
 
705
        # Simulate the user entering a password
 
706
        password = 'secret'
 
707
        connection = object()
 
708
        t._set_connection(connection, password)
 
709
        self.assertIs(connection, t._get_connection())
 
710
        self.assertIs(password, t._get_credentials())
 
711
        self.assertIs(connection, c._get_connection())
 
712
        self.assertIs(password, c._get_credentials())
 
713
 
 
714
        # credentials can be updated
 
715
        new_password = 'even more secret'
 
716
        c._update_credentials(new_password)
 
717
        self.assertIs(connection, t._get_connection())
 
718
        self.assertIs(new_password, t._get_credentials())
 
719
        self.assertIs(connection, c._get_connection())
 
720
        self.assertIs(new_password, c._get_credentials())
 
721
 
 
722
 
 
723
class TestReusedTransports(TestCase):
 
724
    """Tests for transport reuse"""
 
725
 
 
726
    def test_reuse_same_transport(self):
 
727
        possible_transports = []
 
728
        t1 = get_transport('http://foo/',
 
729
                           possible_transports=possible_transports)
 
730
        self.assertEqual([t1], possible_transports)
 
731
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
732
        self.assertIs(t1, t2)
 
733
 
 
734
        # Also check that final '/' are handled correctly
 
735
        t3 = get_transport('http://foo/path/')
 
736
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
737
        self.assertIs(t3, t4)
 
738
 
 
739
        t5 = get_transport('http://foo/path')
 
740
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
741
        self.assertIs(t5, t6)
 
742
 
 
743
    def test_don_t_reuse_different_transport(self):
 
744
        t1 = get_transport('http://foo/path')
 
745
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
746
        self.assertIsNot(t1, t2)
 
747
 
 
748
 
 
749
class TestTransportTrace(TestCase):
 
750
 
 
751
    def test_get(self):
 
752
        transport = get_transport('trace+memory://')
 
753
        self.assertIsInstance(
 
754
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
755
 
 
756
    def test_clone_preserves_activity(self):
 
757
        transport = get_transport('trace+memory://')
 
758
        transport2 = transport.clone('.')
 
759
        self.assertTrue(transport is not transport2)
 
760
        self.assertTrue(transport._activity is transport2._activity)
 
761
 
 
762
    # the following specific tests are for the operations that have made use of
 
763
    # logging in tests; we could test every single operation but doing that
 
764
    # still won't cause a test failure when the top level Transport API
 
765
    # changes; so there is little return doing that.
 
766
    def test_get(self):
 
767
        transport = get_transport('trace+memory:///')
 
768
        transport.put_bytes('foo', 'barish')
 
769
        transport.get('foo')
 
770
        expected_result = []
 
771
        # put_bytes records the bytes, not the content to avoid memory
 
772
        # pressure.
 
773
        expected_result.append(('put_bytes', 'foo', 6, None))
 
774
        # get records the file name only.
 
775
        expected_result.append(('get', 'foo'))
 
776
        self.assertEqual(expected_result, transport._activity)
 
777
 
 
778
    def test_readv(self):
 
779
        transport = get_transport('trace+memory:///')
 
780
        transport.put_bytes('foo', 'barish')
 
781
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
782
            upper_limit=6))
 
783
        expected_result = []
 
784
        # put_bytes records the bytes, not the content to avoid memory
 
785
        # pressure.
 
786
        expected_result.append(('put_bytes', 'foo', 6, None))
 
787
        # readv records the supplied offset request
 
788
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
789
        self.assertEqual(expected_result, transport._activity)