~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: John Arbash Meinel
  • Date: 2009-10-02 20:32:50 UTC
  • mto: (4679.6.1 2.1-export-c-api)
  • mto: This revision was merged to the branch mainline in revision 4735.
  • Revision ID: john@arbash-meinel.com-20091002203250-q6iv6o2mwjqp4g53
Add __iter__ support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
import os
19
 
import sys
20
 
import stat
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
21
18
from cStringIO import StringIO
22
19
 
23
20
import bzrlib
24
 
from bzrlib.errors import (NoSuchFile, FileExists,
25
 
                           TransportNotPossible,
26
 
                           ConnectionError,
27
 
                           DependencyNotPresent,
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
28
32
                           UnsupportedProtocol,
29
33
                           )
30
34
from bzrlib.tests import TestCase, TestCaseInTempDir
31
 
from bzrlib.transport import (_CoalescedOffset,
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
32
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
33
40
                              _get_transport_modules,
34
41
                              get_transport,
 
42
                              LateReadError,
35
43
                              register_lazy_transport,
36
 
                              _set_protocol_handlers,
 
44
                              register_transport_proto,
37
45
                              Transport,
38
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
39
48
from bzrlib.transport.memory import MemoryTransport
40
 
from bzrlib.transport.local import LocalTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
 
 
52
 
 
53
# TODO: Should possibly split transport-specific tests into their own files.
41
54
 
42
55
 
43
56
class TestTransport(TestCase):
45
58
 
46
59
    def test__get_set_protocol_handlers(self):
47
60
        handlers = _get_protocol_handlers()
48
 
        self.assertNotEqual({}, handlers)
 
61
        self.assertNotEqual([], handlers.keys( ))
49
62
        try:
50
 
            _set_protocol_handlers({})
51
 
            self.assertEqual({}, _get_protocol_handlers())
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
52
65
        finally:
53
66
            _set_protocol_handlers(handlers)
54
67
 
55
68
    def test_get_transport_modules(self):
56
69
        handlers = _get_protocol_handlers()
 
70
        # don't pollute the current handlers
 
71
        _clear_protocol_handlers()
57
72
        class SampleHandler(object):
58
73
            """I exist, isn't that enough?"""
59
74
        try:
60
 
            my_handlers = {}
61
 
            _set_protocol_handlers(my_handlers)
62
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
63
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
64
 
            self.assertEqual([SampleHandler.__module__],
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
65
84
                             _get_transport_modules())
66
85
        finally:
67
86
            _set_protocol_handlers(handlers)
69
88
    def test_transport_dependency(self):
70
89
        """Transport with missing dependency causes no error"""
71
90
        saved_handlers = _get_protocol_handlers()
 
91
        # don't pollute the current handlers
 
92
        _clear_protocol_handlers()
72
93
        try:
 
94
            register_transport_proto('foo')
73
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
74
96
                    'BadTransportHandler')
75
97
            try:
85
107
        finally:
86
108
            # restore original values
87
109
            _set_protocol_handlers(saved_handlers)
88
 
            
 
110
 
89
111
    def test_transport_fallback(self):
90
112
        """Transport with missing dependency causes no error"""
91
113
        saved_handlers = _get_protocol_handlers()
92
114
        try:
93
 
            _set_protocol_handlers({})
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
94
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
95
118
                    'BackupTransportHandler')
96
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
100
123
        finally:
101
124
            _set_protocol_handlers(saved_handlers)
102
125
 
 
126
    def test_ssh_hints(self):
 
127
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
128
        try:
 
129
            get_transport('ssh://fooserver/foo')
 
130
        except UnsupportedProtocol, e:
 
131
            e_str = str(e)
 
132
            self.assertEquals('Unsupported protocol'
 
133
                              ' for url "ssh://fooserver/foo":'
 
134
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
135
                              str(e))
 
136
        else:
 
137
            self.fail('Did not raise UnsupportedProtocol')
 
138
 
 
139
    def test_LateReadError(self):
 
140
        """The LateReadError helper should raise on read()."""
 
141
        a_file = LateReadError('a path')
 
142
        try:
 
143
            a_file.read()
 
144
        except ReadError, error:
 
145
            self.assertEqual('a path', error.path)
 
146
        self.assertRaises(ReadError, a_file.read, 40)
 
147
        a_file.close()
 
148
 
 
149
    def test__combine_paths(self):
 
150
        t = Transport('/')
 
151
        self.assertEqual('/home/sarah/project/foo',
 
152
                         t._combine_paths('/home/sarah', 'project/foo'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '../../etc'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '/etc'))
 
159
 
 
160
    def test_local_abspath_non_local_transport(self):
 
161
        # the base implementation should throw
 
162
        t = MemoryTransport()
 
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
164
        self.assertEqual('memory:///t is not a local path.', str(e))
 
165
 
103
166
 
104
167
class TestCoalesceOffsets(TestCase):
105
 
    
106
 
    def check(self, expected, offsets, limit=0, fudge=0):
 
168
 
 
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
107
170
        coalesce = Transport._coalesce_offsets
108
171
        exp = [_CoalescedOffset(*x) for x in expected]
109
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
173
                            max_size=max_size))
110
174
        self.assertEqual(exp, out)
111
175
 
112
176
    def test_coalesce_empty(self):
119
183
        self.check([(0, 10, [(0, 10)]),
120
184
                    (20, 10, [(0, 10)]),
121
185
                   ], [(0, 10), (20, 10)])
122
 
            
 
186
 
123
187
    def test_coalesce_unsorted(self):
124
188
        self.check([(20, 10, [(0, 10)]),
125
189
                    (0, 10, [(0, 10)]),
130
194
                   [(0, 10), (10, 10)])
131
195
 
132
196
    def test_coalesce_overlapped(self):
133
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
134
 
                   [(0, 10), (5, 10)])
 
197
        self.assertRaises(ValueError,
 
198
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
199
                        [(0, 10), (5, 10)])
135
200
 
136
201
    def test_coalesce_limit(self):
137
202
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
158
223
                   ], [(10, 10), (30, 10), (100, 10)],
159
224
                   fudge=10
160
225
                  )
 
226
    def test_coalesce_max_size(self):
 
227
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
228
                    (30, 50, [(0, 50)]),
 
229
                    # If one range is above max_size, it gets its own coalesced
 
230
                    # offset
 
231
                    (100, 80, [(0, 80),]),],
 
232
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
233
                   max_size=50
 
234
                  )
 
235
 
 
236
    def test_coalesce_no_max_size(self):
 
237
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
239
                  )
 
240
 
 
241
    def test_coalesce_default_limit(self):
 
242
        # By default we use a 100MB max size.
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
249
                   max_size=1*1024*1024*1024)
161
250
 
162
251
 
163
252
class TestMemoryTransport(TestCase):
168
257
    def test_clone(self):
169
258
        transport = MemoryTransport()
170
259
        self.assertTrue(isinstance(transport, MemoryTransport))
 
260
        self.assertEqual("memory:///", transport.clone("/").base)
171
261
 
172
262
    def test_abspath(self):
173
263
        transport = MemoryTransport()
174
264
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
175
265
 
176
 
    def test_relpath(self):
177
 
        transport = MemoryTransport()
 
266
    def test_abspath_of_root(self):
 
267
        transport = MemoryTransport()
 
268
        self.assertEqual("memory:///", transport.base)
 
269
        self.assertEqual("memory:///", transport.abspath('/'))
 
270
 
 
271
    def test_abspath_of_relpath_starting_at_root(self):
 
272
        transport = MemoryTransport()
 
273
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
178
274
 
179
275
    def test_append_and_get(self):
180
276
        transport = MemoryTransport()
181
 
        transport.append('path', StringIO('content'))
 
277
        transport.append_bytes('path', 'content')
182
278
        self.assertEqual(transport.get('path').read(), 'content')
183
 
        transport.append('path', StringIO('content'))
 
279
        transport.append_file('path', StringIO('content'))
184
280
        self.assertEqual(transport.get('path').read(), 'contentcontent')
185
281
 
186
282
    def test_put_and_get(self):
187
283
        transport = MemoryTransport()
188
 
        transport.put('path', StringIO('content'))
 
284
        transport.put_file('path', StringIO('content'))
189
285
        self.assertEqual(transport.get('path').read(), 'content')
190
 
        transport.put('path', StringIO('content'))
 
286
        transport.put_bytes('path', 'content')
191
287
        self.assertEqual(transport.get('path').read(), 'content')
192
288
 
193
289
    def test_append_without_dir_fails(self):
194
290
        transport = MemoryTransport()
195
291
        self.assertRaises(NoSuchFile,
196
 
                          transport.append, 'dir/path', StringIO('content'))
 
292
                          transport.append_bytes, 'dir/path', 'content')
197
293
 
198
294
    def test_put_without_dir_fails(self):
199
295
        transport = MemoryTransport()
200
296
        self.assertRaises(NoSuchFile,
201
 
                          transport.put, 'dir/path', StringIO('content'))
 
297
                          transport.put_file, 'dir/path', StringIO('content'))
202
298
 
203
299
    def test_get_missing(self):
204
300
        transport = MemoryTransport()
210
306
 
211
307
    def test_has_present(self):
212
308
        transport = MemoryTransport()
213
 
        transport.append('foo', StringIO('content'))
 
309
        transport.append_bytes('foo', 'content')
214
310
        self.assertEquals(True, transport.has('foo'))
215
311
 
 
312
    def test_list_dir(self):
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.mkdir('dir')
 
316
        transport.put_bytes('dir/subfoo', 'content')
 
317
        transport.put_bytes('dirlike', 'content')
 
318
 
 
319
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
320
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
321
 
216
322
    def test_mkdir(self):
217
323
        transport = MemoryTransport()
218
324
        transport.mkdir('dir')
219
 
        transport.append('dir/path', StringIO('content'))
 
325
        transport.append_bytes('dir/path', 'content')
220
326
        self.assertEqual(transport.get('dir/path').read(), 'content')
221
327
 
222
328
    def test_mkdir_missing_parent(self):
232
338
    def test_parameters(self):
233
339
        transport = MemoryTransport()
234
340
        self.assertEqual(True, transport.listable())
235
 
        self.assertEqual(False, transport.should_cache())
236
341
        self.assertEqual(False, transport.is_readonly())
237
342
 
238
343
    def test_iter_files_recursive(self):
239
344
        transport = MemoryTransport()
240
345
        transport.mkdir('dir')
241
 
        transport.put('dir/foo', StringIO('content'))
242
 
        transport.put('dir/bar', StringIO('content'))
243
 
        transport.put('bar', StringIO('content'))
 
346
        transport.put_bytes('dir/foo', 'content')
 
347
        transport.put_bytes('dir/bar', 'content')
 
348
        transport.put_bytes('bar', 'content')
244
349
        paths = set(transport.iter_files_recursive())
245
350
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
246
351
 
247
352
    def test_stat(self):
248
353
        transport = MemoryTransport()
249
 
        transport.put('foo', StringIO('content'))
250
 
        transport.put('bar', StringIO('phowar'))
 
354
        transport.put_bytes('foo', 'content')
 
355
        transport.put_bytes('bar', 'phowar')
251
356
        self.assertEqual(7, transport.stat('foo').st_size)
252
357
        self.assertEqual(6, transport.stat('bar').st_size)
253
358
 
254
 
        
 
359
 
 
360
class ChrootDecoratorTransportTest(TestCase):
 
361
    """Chroot decoration specific tests."""
 
362
 
 
363
    def test_abspath(self):
 
364
        # The abspath is always relative to the chroot_url.
 
365
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
366
        self.start_server(server)
 
367
        transport = get_transport(server.get_url())
 
368
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
369
 
 
370
        subdir_transport = transport.clone('subdir')
 
371
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
372
 
 
373
    def test_clone(self):
 
374
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
375
        self.start_server(server)
 
376
        transport = get_transport(server.get_url())
 
377
        # relpath from root and root path are the same
 
378
        relpath_cloned = transport.clone('foo')
 
379
        abspath_cloned = transport.clone('/foo')
 
380
        self.assertEqual(server, relpath_cloned.server)
 
381
        self.assertEqual(server, abspath_cloned.server)
 
382
 
 
383
    def test_chroot_url_preserves_chroot(self):
 
384
        """Calling get_transport on a chroot transport's base should produce a
 
385
        transport with exactly the same behaviour as the original chroot
 
386
        transport.
 
387
 
 
388
        This is so that it is not possible to escape a chroot by doing::
 
389
            url = chroot_transport.base
 
390
            parent_url = urlutils.join(url, '..')
 
391
            new_transport = get_transport(parent_url)
 
392
        """
 
393
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
394
        self.start_server(server)
 
395
        transport = get_transport(server.get_url())
 
396
        new_transport = get_transport(transport.base)
 
397
        self.assertEqual(transport.server, new_transport.server)
 
398
        self.assertEqual(transport.base, new_transport.base)
 
399
 
 
400
    def test_urljoin_preserves_chroot(self):
 
401
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
402
        URL that escapes the intended chroot.
 
403
 
 
404
        This is so that it is not possible to escape a chroot by doing::
 
405
            url = chroot_transport.base
 
406
            parent_url = urlutils.join(url, '..')
 
407
            new_transport = get_transport(parent_url)
 
408
        """
 
409
        server = ChrootServer(get_transport('memory:///path/'))
 
410
        self.start_server(server)
 
411
        transport = get_transport(server.get_url())
 
412
        self.assertRaises(
 
413
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
414
 
 
415
 
 
416
class ChrootServerTest(TestCase):
 
417
 
 
418
    def test_construct(self):
 
419
        backing_transport = MemoryTransport()
 
420
        server = ChrootServer(backing_transport)
 
421
        self.assertEqual(backing_transport, server.backing_transport)
 
422
 
 
423
    def test_setUp(self):
 
424
        backing_transport = MemoryTransport()
 
425
        server = ChrootServer(backing_transport)
 
426
        server.setUp()
 
427
        try:
 
428
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
429
        finally:
 
430
            server.tearDown()
 
431
 
 
432
    def test_tearDown(self):
 
433
        backing_transport = MemoryTransport()
 
434
        server = ChrootServer(backing_transport)
 
435
        server.setUp()
 
436
        server.tearDown()
 
437
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
438
 
 
439
    def test_get_url(self):
 
440
        backing_transport = MemoryTransport()
 
441
        server = ChrootServer(backing_transport)
 
442
        server.setUp()
 
443
        try:
 
444
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
445
        finally:
 
446
            server.tearDown()
 
447
 
 
448
 
255
449
class ReadonlyDecoratorTransportTest(TestCase):
256
450
    """Readonly decoration specific tests."""
257
451
 
260
454
        # connect to . in readonly mode
261
455
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
262
456
        self.assertEqual(True, transport.listable())
263
 
        self.assertEqual(False, transport.should_cache())
264
457
        self.assertEqual(True, transport.is_readonly())
265
458
 
266
459
    def test_http_parameters(self):
 
460
        from bzrlib.tests.http_server import HttpServer
267
461
        import bzrlib.transport.readonly as readonly
268
 
        from bzrlib.transport.http import HttpServer
269
 
        # connect to . via http which is not listable
 
462
        # connect to '.' via http which is not listable
270
463
        server = HttpServer()
271
 
        server.setUp()
272
 
        try:
273
 
            transport = get_transport('readonly+' + server.get_url())
274
 
            self.failUnless(isinstance(transport,
275
 
                                       readonly.ReadonlyTransportDecorator))
276
 
            self.assertEqual(False, transport.listable())
277
 
            self.assertEqual(True, transport.should_cache())
278
 
            self.assertEqual(True, transport.is_readonly())
279
 
        finally:
280
 
            server.tearDown()
 
464
        self.start_server(server)
 
465
        transport = get_transport('readonly+' + server.get_url())
 
466
        self.failUnless(isinstance(transport,
 
467
                                   readonly.ReadonlyTransportDecorator))
 
468
        self.assertEqual(False, transport.listable())
 
469
        self.assertEqual(True, transport.is_readonly())
281
470
 
282
471
 
283
472
class FakeNFSDecoratorTests(TestCaseInTempDir):
289
478
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
290
479
 
291
480
    def test_local_parameters(self):
292
 
        # the listable, should_cache and is_readonly parameters
 
481
        # the listable and is_readonly parameters
293
482
        # are not changed by the fakenfs decorator
294
483
        transport = self.get_nfs_transport('.')
295
484
        self.assertEqual(True, transport.listable())
296
 
        self.assertEqual(False, transport.should_cache())
297
485
        self.assertEqual(False, transport.is_readonly())
298
486
 
299
487
    def test_http_parameters(self):
300
 
        # the listable, should_cache and is_readonly parameters
 
488
        # the listable and is_readonly parameters
301
489
        # are not changed by the fakenfs decorator
302
 
        from bzrlib.transport.http import HttpServer
303
 
        # connect to . via http which is not listable
 
490
        from bzrlib.tests.http_server import HttpServer
 
491
        # connect to '.' via http which is not listable
304
492
        server = HttpServer()
305
 
        server.setUp()
306
 
        try:
307
 
            transport = self.get_nfs_transport(server.get_url())
308
 
            self.assertIsInstance(
309
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
310
 
            self.assertEqual(False, transport.listable())
311
 
            self.assertEqual(True, transport.should_cache())
312
 
            self.assertEqual(True, transport.is_readonly())
313
 
        finally:
314
 
            server.tearDown()
 
493
        self.start_server(server)
 
494
        transport = self.get_nfs_transport(server.get_url())
 
495
        self.assertIsInstance(
 
496
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
497
        self.assertEqual(False, transport.listable())
 
498
        self.assertEqual(True, transport.is_readonly())
315
499
 
316
500
    def test_fakenfs_server_default(self):
317
501
        # a FakeNFSServer() should bring up a local relpath server for itself
318
502
        import bzrlib.transport.fakenfs as fakenfs
319
503
        server = fakenfs.FakeNFSServer()
320
 
        server.setUp()
321
 
        try:
322
 
            # the server should be a relpath localhost server
323
 
            self.assertEqual(server.get_url(), 'fakenfs+.')
324
 
            # and we should be able to get a transport for it
325
 
            transport = get_transport(server.get_url())
326
 
            # which must be a FakeNFSTransportDecorator instance.
327
 
            self.assertIsInstance(
328
 
                transport, fakenfs.FakeNFSTransportDecorator)
329
 
        finally:
330
 
            server.tearDown()
 
504
        self.start_server(server)
 
505
        # the url should be decorated appropriately
 
506
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
507
        # and we should be able to get a transport for it
 
508
        transport = get_transport(server.get_url())
 
509
        # which must be a FakeNFSTransportDecorator instance.
 
510
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
331
511
 
332
512
    def test_fakenfs_rename_semantics(self):
333
513
        # a FakeNFS transport must mangle the way rename errors occur to
335
515
        transport = self.get_nfs_transport('.')
336
516
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
337
517
                        transport=transport)
338
 
        self.assertRaises(bzrlib.errors.ResourceBusy,
 
518
        self.assertRaises(errors.ResourceBusy,
339
519
                          transport.rename, 'from', 'to')
340
520
 
341
521
 
375
555
 
376
556
class TestTransportImplementation(TestCaseInTempDir):
377
557
    """Implementation verification for transports.
378
 
    
 
558
 
379
559
    To verify a transport we need a server factory, which is a callable
380
560
    that accepts no parameters and returns an implementation of
381
561
    bzrlib.transport.Server.
382
 
    
 
562
 
383
563
    That Server is then used to construct transport instances and test
384
564
    the transport via loopback activity.
385
565
 
386
 
    Currently this assumes that the Transport object is connected to the 
387
 
    current working directory.  So that whatever is done 
388
 
    through the transport, should show up in the working 
 
566
    Currently this assumes that the Transport object is connected to the
 
567
    current working directory.  So that whatever is done
 
568
    through the transport, should show up in the working
389
569
    directory, and vice-versa. This is a bug, because it's possible to have
390
 
    URL schemes which provide access to something that may not be 
391
 
    result in storage on the local disk, i.e. due to file system limits, or 
 
570
    URL schemes which provide access to something that may not be
 
571
    result in storage on the local disk, i.e. due to file system limits, or
392
572
    due to it being a database or some other non-filesystem tool.
393
573
 
394
574
    This also tests to make sure that the functions work with both
395
575
    generators and lists (assuming iter(list) is effectively a generator)
396
576
    """
397
 
    
 
577
 
398
578
    def setUp(self):
399
579
        super(TestTransportImplementation, self).setUp()
400
580
        self._server = self.transport_server()
401
 
        self._server.setUp()
402
 
 
403
 
    def tearDown(self):
404
 
        super(TestTransportImplementation, self).tearDown()
405
 
        self._server.tearDown()
406
 
        
407
 
    def get_transport(self):
408
 
        """Return a connected transport to the local directory."""
 
581
        self.start_server(self._server)
 
582
 
 
583
    def get_transport(self, relpath=None):
 
584
        """Return a connected transport to the local directory.
 
585
 
 
586
        :param relpath: a path relative to the base url.
 
587
        """
409
588
        base_url = self._server.get_url()
 
589
        url = self._adjust_url(base_url, relpath)
410
590
        # try getting the transport via the regular interface:
411
 
        t = get_transport(base_url)
412
 
        if not isinstance(t, self.transport_class): 
 
591
        t = get_transport(url)
 
592
        # vila--20070607 if the following are commented out the test suite
 
593
        # still passes. Is this really still needed or was it a forgotten
 
594
        # temporary fix ?
 
595
        if not isinstance(t, self.transport_class):
413
596
            # we did not get the correct transport class type. Override the
414
597
            # regular connection behaviour by direct construction.
415
 
            t = self.transport_class(base_url)
 
598
            t = self.transport_class(url)
416
599
        return t
 
600
 
 
601
 
 
602
class TestLocalTransports(TestCase):
 
603
 
 
604
    def test_get_transport_from_abspath(self):
 
605
        here = osutils.abspath('.')
 
606
        t = get_transport(here)
 
607
        self.assertIsInstance(t, LocalTransport)
 
608
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
609
 
 
610
    def test_get_transport_from_relpath(self):
 
611
        here = osutils.abspath('.')
 
612
        t = get_transport('.')
 
613
        self.assertIsInstance(t, LocalTransport)
 
614
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
615
 
 
616
    def test_get_transport_from_local_url(self):
 
617
        here = osutils.abspath('.')
 
618
        here_url = urlutils.local_path_to_url(here) + '/'
 
619
        t = get_transport(here_url)
 
620
        self.assertIsInstance(t, LocalTransport)
 
621
        self.assertEquals(t.base, here_url)
 
622
 
 
623
    def test_local_abspath(self):
 
624
        here = osutils.abspath('.')
 
625
        t = get_transport(here)
 
626
        self.assertEquals(t.local_abspath(''), here)
 
627
 
 
628
 
 
629
class TestWin32LocalTransport(TestCase):
 
630
 
 
631
    def test_unc_clone_to_root(self):
 
632
        # Win32 UNC path like \\HOST\path
 
633
        # clone to root should stop at least at \\HOST part
 
634
        # not on \\
 
635
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
636
        for i in xrange(4):
 
637
            t = t.clone('..')
 
638
        self.assertEquals(t.base, 'file://HOST/')
 
639
        # make sure we reach the root
 
640
        t = t.clone('..')
 
641
        self.assertEquals(t.base, 'file://HOST/')
 
642
 
 
643
 
 
644
class TestConnectedTransport(TestCase):
 
645
    """Tests for transports connected to a remote server"""
 
646
 
 
647
    def test_parse_url(self):
 
648
        t = ConnectedTransport('http://simple.example.com/home/source')
 
649
        self.assertEquals(t._host, 'simple.example.com')
 
650
        self.assertEquals(t._port, None)
 
651
        self.assertEquals(t._path, '/home/source/')
 
652
        self.failUnless(t._user is None)
 
653
        self.failUnless(t._password is None)
 
654
 
 
655
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
656
 
 
657
    def test_parse_url_with_at_in_user(self):
 
658
        # Bug 228058
 
659
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
660
        self.assertEquals(t._user, 'user@host.com')
 
661
 
 
662
    def test_parse_quoted_url(self):
 
663
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
664
        self.assertEquals(t._host, 'exAmple.com')
 
665
        self.assertEquals(t._port, 2222)
 
666
        self.assertEquals(t._user, 'robey')
 
667
        self.assertEquals(t._password, 'h@t')
 
668
        self.assertEquals(t._path, '/path/')
 
669
 
 
670
        # Base should not keep track of the password
 
671
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
672
 
 
673
    def test_parse_invalid_url(self):
 
674
        self.assertRaises(errors.InvalidURL,
 
675
                          ConnectedTransport,
 
676
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
677
 
 
678
    def test_relpath(self):
 
679
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
680
 
 
681
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
682
        self.assertRaises(errors.PathNotChild, t.relpath,
 
683
                          'http://user@host.com/abs/path/sub')
 
684
        self.assertRaises(errors.PathNotChild, t.relpath,
 
685
                          'sftp://user2@host.com/abs/path/sub')
 
686
        self.assertRaises(errors.PathNotChild, t.relpath,
 
687
                          'sftp://user@otherhost.com/abs/path/sub')
 
688
        self.assertRaises(errors.PathNotChild, t.relpath,
 
689
                          'sftp://user@host.com:33/abs/path/sub')
 
690
        # Make sure it works when we don't supply a username
 
691
        t = ConnectedTransport('sftp://host.com/abs/path')
 
692
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
693
 
 
694
        # Make sure it works when parts of the path will be url encoded
 
695
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
696
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
697
 
 
698
    def test_connection_sharing_propagate_credentials(self):
 
699
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
700
        self.assertEquals('user', t._user)
 
701
        self.assertEquals('host.com', t._host)
 
702
        self.assertIs(None, t._get_connection())
 
703
        self.assertIs(None, t._password)
 
704
        c = t.clone('subdir')
 
705
        self.assertIs(None, c._get_connection())
 
706
        self.assertIs(None, t._password)
 
707
 
 
708
        # Simulate the user entering a password
 
709
        password = 'secret'
 
710
        connection = object()
 
711
        t._set_connection(connection, password)
 
712
        self.assertIs(connection, t._get_connection())
 
713
        self.assertIs(password, t._get_credentials())
 
714
        self.assertIs(connection, c._get_connection())
 
715
        self.assertIs(password, c._get_credentials())
 
716
 
 
717
        # credentials can be updated
 
718
        new_password = 'even more secret'
 
719
        c._update_credentials(new_password)
 
720
        self.assertIs(connection, t._get_connection())
 
721
        self.assertIs(new_password, t._get_credentials())
 
722
        self.assertIs(connection, c._get_connection())
 
723
        self.assertIs(new_password, c._get_credentials())
 
724
 
 
725
 
 
726
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        candidates = []
        first = get_transport('http://foo/', possible_transports=candidates)
        # the transport just created is recorded in the list we passed in
        self.assertEqual([first], candidates)
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly, whichever of the
        # two URLs carries it
        slash_cases = [
            ('http://foo/path/', 'http://foo/path'),
            ('http://foo/path', 'http://foo/path/'),
            ]
        for created_url, requested_url in slash_cases:
            created = get_transport(created_url)
            requested = get_transport(requested_url,
                                      possible_transports=[created])
            self.assertIs(created, requested)

    def test_don_t_reuse_different_transport(self):
        # a transport for another host must not be handed back
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
 
750
 
 
751
 
 
752
class TestTransportTrace(TestCase):
    """Tests for transports obtained from 'trace+' URLs"""

    def test_decorator(self):
        # This test must not be called test_get: a second test_get is
        # defined further down, and within a class body the later definition
        # replaces the earlier one, so this check would never run.
        transport = get_transport('trace+memory://')
        self.assertIsInstance(
            transport, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        # A clone is a distinct object, but it shares the very same activity
        # log as the transport it was cloned from.
        transport = get_transport('trace+memory://')
        transport2 = transport.clone('.')
        self.assertTrue(transport is not transport2)
        self.assertTrue(transport._activity is transport2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        transport.get('foo')
        expected_result = []
        # put_bytes records the length of the content (6 for 'barish'), not
        # the content itself, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, transport._activity)

    def test_readv(self):
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # readv is wrapped in list() so that its result is fully consumed
        # before the activity log is inspected.
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
            upper_limit=6))
        expected_result = []
        # put_bytes records the length of the content (6 for 'barish'), not
        # the content itself, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request, followed by the
        # adjust_for_latency and upper_limit values it was given
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, transport._activity)