13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
from cStringIO import StringIO
24
24
from bzrlib import (
31
from bzrlib.transport import (
39
from bzrlib.tests import features
28
from bzrlib.errors import (ConnectionError,
40
from bzrlib.tests import TestCase, TestCaseInTempDir
41
from bzrlib.transport import (_CoalescedOffset,
42
_get_protocol_handlers,
43
_set_protocol_handlers,
44
_get_transport_modules,
47
register_lazy_transport,
48
register_transport_proto,
49
_clear_protocol_handlers,
52
from bzrlib.transport.chroot import ChrootServer
53
from bzrlib.transport.memory import MemoryTransport
54
from bzrlib.transport.local import (LocalTransport,
55
EmulatedWin32LocalTransport)
42
58
# TODO: Should possibly split transport-specific tests into their own files.
45
class TestTransport(tests.TestCase):
61
class TestTransport(TestCase):
46
62
"""Test the non transport-concrete class functionality."""
48
# FIXME: These tests should use addCleanup() and/or overrideAttr() instead
49
# of try/finally -- vila 20100205
51
64
def test__get_set_protocol_handlers(self):
52
handlers = transport._get_protocol_handlers()
65
handlers = _get_protocol_handlers()
53
66
self.assertNotEqual([], handlers.keys( ))
55
transport._clear_protocol_handlers()
56
self.assertEqual([], transport._get_protocol_handlers().keys())
68
_clear_protocol_handlers()
69
self.assertEqual([], _get_protocol_handlers().keys())
58
transport._set_protocol_handlers(handlers)
71
_set_protocol_handlers(handlers)
60
73
def test_get_transport_modules(self):
61
handlers = transport._get_protocol_handlers()
62
# don't pollute the current handlers
63
transport._clear_protocol_handlers()
74
handlers = _get_protocol_handlers()
64
75
class SampleHandler(object):
65
76
"""I exist, isnt that enough?"""
67
transport._clear_protocol_handlers()
68
transport.register_transport_proto('foo')
69
transport.register_lazy_transport('foo',
70
'bzrlib.tests.test_transport',
71
'TestTransport.SampleHandler')
72
transport.register_transport_proto('bar')
73
transport.register_lazy_transport('bar',
74
'bzrlib.tests.test_transport',
75
'TestTransport.SampleHandler')
76
self.assertEqual([SampleHandler.__module__,
77
'bzrlib.transport.chroot',
78
'bzrlib.transport.pathfilter'],
79
transport._get_transport_modules())
78
_clear_protocol_handlers()
79
register_transport_proto('foo')
80
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
81
register_transport_proto('bar')
82
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
83
self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
84
_get_transport_modules())
81
transport._set_protocol_handlers(handlers)
86
_set_protocol_handlers(handlers)
83
88
def test_transport_dependency(self):
84
89
"""Transport with missing dependency causes no error"""
85
saved_handlers = transport._get_protocol_handlers()
86
# don't pollute the current handlers
87
transport._clear_protocol_handlers()
90
saved_handlers = _get_protocol_handlers()
89
transport.register_transport_proto('foo')
90
transport.register_lazy_transport(
91
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
92
register_transport_proto('foo')
93
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
94
'BadTransportHandler')
93
transport.get_transport('foo://fooserver/foo')
94
except errors.UnsupportedProtocol, e:
96
get_transport('foo://fooserver/foo')
97
except UnsupportedProtocol, e:
96
99
self.assertEquals('Unsupported protocol'
97
100
' for url "foo://fooserver/foo":'
101
104
self.fail('Did not raise UnsupportedProtocol')
103
106
# restore original values
104
transport._set_protocol_handlers(saved_handlers)
107
_set_protocol_handlers(saved_handlers)
106
109
def test_transport_fallback(self):
107
110
"""Transport with missing dependency causes no error"""
108
saved_handlers = transport._get_protocol_handlers()
111
saved_handlers = _get_protocol_handlers()
110
transport._clear_protocol_handlers()
111
transport.register_transport_proto('foo')
112
transport.register_lazy_transport(
113
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
114
transport.register_lazy_transport(
115
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
116
t = transport.get_transport('foo://fooserver/foo')
113
_clear_protocol_handlers()
114
register_transport_proto('foo')
115
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
116
'BackupTransportHandler')
117
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
'BadTransportHandler')
119
t = get_transport('foo://fooserver/foo')
117
120
self.assertTrue(isinstance(t, BackupTransportHandler))
119
transport._set_protocol_handlers(saved_handlers)
121
def test_ssh_hints(self):
122
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
124
transport.get_transport('ssh://fooserver/foo')
125
except errors.UnsupportedProtocol, e:
127
self.assertEquals('Unsupported protocol'
128
' for url "ssh://fooserver/foo":'
129
' bzr supports bzr+ssh to operate over ssh,'
130
' use "bzr+ssh://fooserver/foo".',
133
self.fail('Did not raise UnsupportedProtocol')
122
_set_protocol_handlers(saved_handlers)
135
124
def test_LateReadError(self):
136
125
"""The LateReadError helper should raise on read()."""
137
a_file = transport.LateReadError('a path')
126
a_file = LateReadError('a path')
140
except errors.ReadError, error:
129
except ReadError, error:
141
130
self.assertEqual('a path', error.path)
142
self.assertRaises(errors.ReadError, a_file.read, 40)
131
self.assertRaises(ReadError, a_file.read, 40)
145
134
def test__combine_paths(self):
146
t = transport.Transport('/')
147
136
self.assertEqual('/home/sarah/project/foo',
148
137
t._combine_paths('/home/sarah', 'project/foo'))
149
138
self.assertEqual('/etc',
219
206
], [(10, 10), (30, 10), (100, 10)],
222
def test_coalesce_max_size(self):
223
self.check([(10, 20, [(0, 10), (10, 10)]),
225
# If one range is above max_size, it gets its own coalesced
227
(100, 80, [(0, 80),]),],
228
[(10, 10), (20, 10), (30, 50), (100, 80)],
232
def test_coalesce_no_max_size(self):
233
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
234
[(10, 10), (20, 10), (30, 50), (80, 100)],
237
def test_coalesce_default_limit(self):
238
# By default we use a 100MB max size.
239
ten_mb = 10*1024*1024
240
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
241
(10*ten_mb, ten_mb, [(0, ten_mb)])],
242
[(i*ten_mb, ten_mb) for i in range(11)])
243
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
244
[(i*ten_mb, ten_mb) for i in range(11)],
245
max_size=1*1024*1024*1024)
248
class TestMemoryServer(tests.TestCase):
250
def test_create_server(self):
251
server = memory.MemoryServer()
252
server.start_server()
253
url = server.get_url()
254
self.assertTrue(url in transport.transport_list_registry)
255
t = transport.get_transport(url)
258
self.assertFalse(url in transport.transport_list_registry)
259
self.assertRaises(errors.UnsupportedProtocol,
260
transport.get_transport, url)
263
class TestMemoryTransport(tests.TestCase):
211
class TestMemoryTransport(TestCase):
265
213
def test_get_transport(self):
266
memory.MemoryTransport()
268
216
def test_clone(self):
269
t = memory.MemoryTransport()
270
self.assertTrue(isinstance(t, memory.MemoryTransport))
271
self.assertEqual("memory:///", t.clone("/").base)
217
transport = MemoryTransport()
218
self.assertTrue(isinstance(transport, MemoryTransport))
219
self.assertEqual("memory:///", transport.clone("/").base)
273
221
def test_abspath(self):
274
t = memory.MemoryTransport()
275
self.assertEqual("memory:///relpath", t.abspath('relpath'))
222
transport = MemoryTransport()
223
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
277
225
def test_abspath_of_root(self):
278
t = memory.MemoryTransport()
279
self.assertEqual("memory:///", t.base)
280
self.assertEqual("memory:///", t.abspath('/'))
226
transport = MemoryTransport()
227
self.assertEqual("memory:///", transport.base)
228
self.assertEqual("memory:///", transport.abspath('/'))
282
230
def test_abspath_of_relpath_starting_at_root(self):
283
t = memory.MemoryTransport()
284
self.assertEqual("memory:///foo", t.abspath('/foo'))
231
transport = MemoryTransport()
232
self.assertEqual("memory:///foo", transport.abspath('/foo'))
286
234
def test_append_and_get(self):
287
t = memory.MemoryTransport()
288
t.append_bytes('path', 'content')
289
self.assertEqual(t.get('path').read(), 'content')
290
t.append_file('path', StringIO('content'))
291
self.assertEqual(t.get('path').read(), 'contentcontent')
235
transport = MemoryTransport()
236
transport.append_bytes('path', 'content')
237
self.assertEqual(transport.get('path').read(), 'content')
238
transport.append_file('path', StringIO('content'))
239
self.assertEqual(transport.get('path').read(), 'contentcontent')
293
241
def test_put_and_get(self):
294
t = memory.MemoryTransport()
295
t.put_file('path', StringIO('content'))
296
self.assertEqual(t.get('path').read(), 'content')
297
t.put_bytes('path', 'content')
298
self.assertEqual(t.get('path').read(), 'content')
242
transport = MemoryTransport()
243
transport.put_file('path', StringIO('content'))
244
self.assertEqual(transport.get('path').read(), 'content')
245
transport.put_bytes('path', 'content')
246
self.assertEqual(transport.get('path').read(), 'content')
300
248
def test_append_without_dir_fails(self):
301
t = memory.MemoryTransport()
302
self.assertRaises(errors.NoSuchFile,
303
t.append_bytes, 'dir/path', 'content')
249
transport = MemoryTransport()
250
self.assertRaises(NoSuchFile,
251
transport.append_bytes, 'dir/path', 'content')
305
253
def test_put_without_dir_fails(self):
306
t = memory.MemoryTransport()
307
self.assertRaises(errors.NoSuchFile,
308
t.put_file, 'dir/path', StringIO('content'))
254
transport = MemoryTransport()
255
self.assertRaises(NoSuchFile,
256
transport.put_file, 'dir/path', StringIO('content'))
310
258
def test_get_missing(self):
311
transport = memory.MemoryTransport()
312
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
259
transport = MemoryTransport()
260
self.assertRaises(NoSuchFile, transport.get, 'foo')
314
262
def test_has_missing(self):
315
t = memory.MemoryTransport()
316
self.assertEquals(False, t.has('foo'))
263
transport = MemoryTransport()
264
self.assertEquals(False, transport.has('foo'))
318
266
def test_has_present(self):
319
t = memory.MemoryTransport()
320
t.append_bytes('foo', 'content')
321
self.assertEquals(True, t.has('foo'))
267
transport = MemoryTransport()
268
transport.append_bytes('foo', 'content')
269
self.assertEquals(True, transport.has('foo'))
323
271
def test_list_dir(self):
324
t = memory.MemoryTransport()
325
t.put_bytes('foo', 'content')
327
t.put_bytes('dir/subfoo', 'content')
328
t.put_bytes('dirlike', 'content')
272
transport = MemoryTransport()
273
transport.put_bytes('foo', 'content')
274
transport.mkdir('dir')
275
transport.put_bytes('dir/subfoo', 'content')
276
transport.put_bytes('dirlike', 'content')
330
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
331
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
278
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
279
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
333
281
def test_mkdir(self):
334
t = memory.MemoryTransport()
336
t.append_bytes('dir/path', 'content')
337
self.assertEqual(t.get('dir/path').read(), 'content')
282
transport = MemoryTransport()
283
transport.mkdir('dir')
284
transport.append_bytes('dir/path', 'content')
285
self.assertEqual(transport.get('dir/path').read(), 'content')
339
287
def test_mkdir_missing_parent(self):
340
t = memory.MemoryTransport()
341
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
288
transport = MemoryTransport()
289
self.assertRaises(NoSuchFile,
290
transport.mkdir, 'dir/dir')
343
292
def test_mkdir_twice(self):
344
t = memory.MemoryTransport()
346
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
293
transport = MemoryTransport()
294
transport.mkdir('dir')
295
self.assertRaises(FileExists, transport.mkdir, 'dir')
348
297
def test_parameters(self):
349
t = memory.MemoryTransport()
350
self.assertEqual(True, t.listable())
351
self.assertEqual(False, t.is_readonly())
298
transport = MemoryTransport()
299
self.assertEqual(True, transport.listable())
300
self.assertEqual(False, transport.should_cache())
301
self.assertEqual(False, transport.is_readonly())
353
303
def test_iter_files_recursive(self):
354
t = memory.MemoryTransport()
356
t.put_bytes('dir/foo', 'content')
357
t.put_bytes('dir/bar', 'content')
358
t.put_bytes('bar', 'content')
359
paths = set(t.iter_files_recursive())
304
transport = MemoryTransport()
305
transport.mkdir('dir')
306
transport.put_bytes('dir/foo', 'content')
307
transport.put_bytes('dir/bar', 'content')
308
transport.put_bytes('bar', 'content')
309
paths = set(transport.iter_files_recursive())
360
310
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
362
312
def test_stat(self):
363
t = memory.MemoryTransport()
364
t.put_bytes('foo', 'content')
365
t.put_bytes('bar', 'phowar')
366
self.assertEqual(7, t.stat('foo').st_size)
367
self.assertEqual(6, t.stat('bar').st_size)
370
class ChrootDecoratorTransportTest(tests.TestCase):
313
transport = MemoryTransport()
314
transport.put_bytes('foo', 'content')
315
transport.put_bytes('bar', 'phowar')
316
self.assertEqual(7, transport.stat('foo').st_size)
317
self.assertEqual(6, transport.stat('bar').st_size)
320
class ChrootDecoratorTransportTest(TestCase):
371
321
"""Chroot decoration specific tests."""
373
323
def test_abspath(self):
374
324
# The abspath is always relative to the chroot_url.
375
server = chroot.ChrootServer(
376
transport.get_transport('memory:///foo/bar/'))
377
self.start_server(server)
378
t = transport.get_transport(server.get_url())
379
self.assertEqual(server.get_url(), t.abspath('/'))
325
server = ChrootServer(get_transport('memory:///foo/bar/'))
327
transport = get_transport(server.get_url())
328
self.assertEqual(server.get_url(), transport.abspath('/'))
381
subdir_t = t.clone('subdir')
382
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
330
subdir_transport = transport.clone('subdir')
331
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
384
334
def test_clone(self):
385
server = chroot.ChrootServer(
386
transport.get_transport('memory:///foo/bar/'))
387
self.start_server(server)
388
t = transport.get_transport(server.get_url())
335
server = ChrootServer(get_transport('memory:///foo/bar/'))
337
transport = get_transport(server.get_url())
389
338
# relpath from root and root path are the same
390
relpath_cloned = t.clone('foo')
391
abspath_cloned = t.clone('/foo')
339
relpath_cloned = transport.clone('foo')
340
abspath_cloned = transport.clone('/foo')
392
341
self.assertEqual(server, relpath_cloned.server)
393
342
self.assertEqual(server, abspath_cloned.server)
395
345
def test_chroot_url_preserves_chroot(self):
396
346
"""Calling get_transport on a chroot transport's base should produce a
397
347
transport with exactly the same behaviour as the original chroot
417
367
This is so that it is not possible to escape a chroot by doing::
418
368
url = chroot_transport.base
419
369
parent_url = urlutils.join(url, '..')
420
new_t = transport.get_transport(parent_url)
370
new_transport = get_transport(parent_url)
422
server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
423
self.start_server(server)
424
t = transport.get_transport(server.get_url())
372
server = ChrootServer(get_transport('memory:///path/'))
374
transport = get_transport(server.get_url())
425
375
self.assertRaises(
426
errors.InvalidURLJoin, urlutils.join, t.base, '..')
429
class TestChrootServer(tests.TestCase):
376
InvalidURLJoin, urlutils.join, transport.base, '..')
380
class ChrootServerTest(TestCase):
431
382
def test_construct(self):
432
backing_transport = memory.MemoryTransport()
433
server = chroot.ChrootServer(backing_transport)
383
backing_transport = MemoryTransport()
384
server = ChrootServer(backing_transport)
434
385
self.assertEqual(backing_transport, server.backing_transport)
436
387
def test_setUp(self):
437
backing_transport = memory.MemoryTransport()
438
server = chroot.ChrootServer(backing_transport)
439
server.start_server()
441
self.assertTrue(server.scheme
442
in transport._get_protocol_handlers().keys())
388
backing_transport = MemoryTransport()
389
server = ChrootServer(backing_transport)
391
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
446
def test_stop_server(self):
447
backing_transport = memory.MemoryTransport()
448
server = chroot.ChrootServer(backing_transport)
449
server.start_server()
451
self.assertFalse(server.scheme
452
in transport._get_protocol_handlers().keys())
393
def test_tearDown(self):
394
backing_transport = MemoryTransport()
395
server = ChrootServer(backing_transport)
398
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
454
400
def test_get_url(self):
455
backing_transport = memory.MemoryTransport()
456
server = chroot.ChrootServer(backing_transport)
457
server.start_server()
459
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
class PathFilteringDecoratorTransportTest(tests.TestCase):
465
"""Pathfilter decoration specific tests."""
467
def test_abspath(self):
468
# The abspath is always relative to the base of the backing transport.
469
server = pathfilter.PathFilteringServer(
470
transport.get_transport('memory:///foo/bar/'),
472
server.start_server()
473
t = transport.get_transport(server.get_url())
474
self.assertEqual(server.get_url(), t.abspath('/'))
476
subdir_t = t.clone('subdir')
477
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
480
def make_pf_transport(self, filter_func=None):
481
"""Make a PathFilteringTransport backed by a MemoryTransport.
483
:param filter_func: by default this will be a no-op function. Use this
484
parameter to override it."""
485
if filter_func is None:
486
filter_func = lambda x: x
487
server = pathfilter.PathFilteringServer(
488
transport.get_transport('memory:///foo/bar/'), filter_func)
489
server.start_server()
490
self.addCleanup(server.stop_server)
491
return transport.get_transport(server.get_url())
493
def test__filter(self):
494
# _filter (with an identity func as filter_func) always returns
495
# paths relative to the base of the backing transport.
496
t = self.make_pf_transport()
497
self.assertEqual('foo', t._filter('foo'))
498
self.assertEqual('foo/bar', t._filter('foo/bar'))
499
self.assertEqual('', t._filter('..'))
500
self.assertEqual('', t._filter('/'))
501
# The base of the pathfiltering transport is taken into account too.
502
t = t.clone('subdir1/subdir2')
503
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
504
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
505
self.assertEqual('subdir1', t._filter('..'))
506
self.assertEqual('', t._filter('/'))
508
def test_filter_invocation(self):
511
filter_log.append(path)
513
t = self.make_pf_transport(filter)
515
self.assertEqual(['abc'], filter_log)
517
t.clone('abc').has('xyz')
518
self.assertEqual(['abc/xyz'], filter_log)
521
self.assertEqual(['abc'], filter_log)
523
def test_clone(self):
524
t = self.make_pf_transport()
525
# relpath from root and root path are the same
526
relpath_cloned = t.clone('foo')
527
abspath_cloned = t.clone('/foo')
528
self.assertEqual(t.server, relpath_cloned.server)
529
self.assertEqual(t.server, abspath_cloned.server)
531
def test_url_preserves_pathfiltering(self):
532
"""Calling get_transport on a pathfiltered transport's base should
533
produce a transport with exactly the same behaviour as the original
534
pathfiltered transport.
536
This is so that it is not possible to escape (accidentally or
537
otherwise) the filtering by doing::
538
url = filtered_transport.base
539
parent_url = urlutils.join(url, '..')
540
new_t = transport.get_transport(parent_url)
542
t = self.make_pf_transport()
543
new_t = transport.get_transport(t.base)
544
self.assertEqual(t.server, new_t.server)
545
self.assertEqual(t.base, new_t.base)
548
class ReadonlyDecoratorTransportTest(tests.TestCase):
401
backing_transport = MemoryTransport()
402
server = ChrootServer(backing_transport)
404
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
408
class ReadonlyDecoratorTransportTest(TestCase):
549
409
"""Readonly decoration specific tests."""
551
411
def test_local_parameters(self):
412
import bzrlib.transport.readonly as readonly
552
413
# connect to . in readonly mode
553
t = readonly.ReadonlyTransportDecorator('readonly+.')
554
self.assertEqual(True, t.listable())
555
self.assertEqual(True, t.is_readonly())
414
transport = readonly.ReadonlyTransportDecorator('readonly+.')
415
self.assertEqual(True, transport.listable())
416
self.assertEqual(False, transport.should_cache())
417
self.assertEqual(True, transport.is_readonly())
557
419
def test_http_parameters(self):
558
from bzrlib.tests.http_server import HttpServer
559
# connect to '.' via http which is not listable
420
from bzrlib.tests.HttpServer import HttpServer
421
import bzrlib.transport.readonly as readonly
422
# connect to . via http which is not listable
560
423
server = HttpServer()
561
self.start_server(server)
562
t = transport.get_transport('readonly+' + server.get_url())
563
self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
564
self.assertEqual(False, t.listable())
565
self.assertEqual(True, t.is_readonly())
568
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
426
transport = get_transport('readonly+' + server.get_url())
427
self.failUnless(isinstance(transport,
428
readonly.ReadonlyTransportDecorator))
429
self.assertEqual(False, transport.listable())
430
self.assertEqual(True, transport.should_cache())
431
self.assertEqual(True, transport.is_readonly())
436
class FakeNFSDecoratorTests(TestCaseInTempDir):
569
437
"""NFS decorator specific tests."""
571
439
def get_nfs_transport(self, url):
440
import bzrlib.transport.fakenfs as fakenfs
572
441
# connect to url with nfs decoration
573
442
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
575
444
def test_local_parameters(self):
576
# the listable and is_readonly parameters
445
# the listable, should_cache and is_readonly parameters
577
446
# are not changed by the fakenfs decorator
578
t = self.get_nfs_transport('.')
579
self.assertEqual(True, t.listable())
580
self.assertEqual(False, t.is_readonly())
447
transport = self.get_nfs_transport('.')
448
self.assertEqual(True, transport.listable())
449
self.assertEqual(False, transport.should_cache())
450
self.assertEqual(False, transport.is_readonly())
582
452
def test_http_parameters(self):
583
# the listable and is_readonly parameters
453
# the listable, should_cache and is_readonly parameters
584
454
# are not changed by the fakenfs decorator
585
from bzrlib.tests.http_server import HttpServer
586
# connect to '.' via http which is not listable
455
from bzrlib.tests.HttpServer import HttpServer
456
# connect to . via http which is not listable
587
457
server = HttpServer()
588
self.start_server(server)
589
t = self.get_nfs_transport(server.get_url())
590
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
591
self.assertEqual(False, t.listable())
592
self.assertEqual(True, t.is_readonly())
460
transport = self.get_nfs_transport(server.get_url())
461
self.assertIsInstance(
462
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
463
self.assertEqual(False, transport.listable())
464
self.assertEqual(True, transport.should_cache())
465
self.assertEqual(True, transport.is_readonly())
594
469
def test_fakenfs_server_default(self):
595
470
# a FakeNFSServer() should bring up a local relpath server for itself
471
import bzrlib.transport.fakenfs as fakenfs
596
472
server = fakenfs.FakeNFSServer()
597
self.start_server(server)
598
# the url should be decorated appropriately
599
self.assertStartsWith(server.get_url(), 'fakenfs+')
600
# and we should be able to get a transport for it
601
t = transport.get_transport(server.get_url())
602
# which must be a FakeNFSTransportDecorator instance.
603
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
475
# the url should be decorated appropriately
476
self.assertStartsWith(server.get_url(), 'fakenfs+')
477
# and we should be able to get a transport for it
478
transport = get_transport(server.get_url())
479
# which must be a FakeNFSTransportDecorator instance.
480
self.assertIsInstance(
481
transport, fakenfs.FakeNFSTransportDecorator)
605
485
def test_fakenfs_rename_semantics(self):
606
486
# a FakeNFS transport must mangle the way rename errors occur to
607
487
# look like NFS problems.
608
t = self.get_nfs_transport('.')
488
transport = self.get_nfs_transport('.')
609
489
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
611
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
614
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
491
self.assertRaises(errors.ResourceBusy,
492
transport.rename, 'from', 'to')
495
class FakeVFATDecoratorTests(TestCaseInTempDir):
615
496
"""Tests for simulation of VFAT restrictions"""
617
498
def get_vfat_transport(self, url):
622
503
def test_transport_creation(self):
623
504
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
624
t = self.get_vfat_transport('.')
625
self.assertIsInstance(t, FakeVFATTransportDecorator)
505
transport = self.get_vfat_transport('.')
506
self.assertIsInstance(transport, FakeVFATTransportDecorator)
627
508
def test_transport_mkdir(self):
628
t = self.get_vfat_transport('.')
630
self.assertTrue(t.has('hello'))
631
self.assertTrue(t.has('Hello'))
509
transport = self.get_vfat_transport('.')
510
transport.mkdir('HELLO')
511
self.assertTrue(transport.has('hello'))
512
self.assertTrue(transport.has('Hello'))
633
514
def test_forbidden_chars(self):
634
t = self.get_vfat_transport('.')
635
self.assertRaises(ValueError, t.has, "<NU>")
638
class BadTransportHandler(transport.Transport):
515
transport = self.get_vfat_transport('.')
516
self.assertRaises(ValueError, transport.has, "<NU>")
519
class BadTransportHandler(Transport):
639
520
def __init__(self, base_url):
640
raise errors.DependencyNotPresent('some_lib',
641
'testing missing dependency')
644
class BackupTransportHandler(transport.Transport):
521
raise DependencyNotPresent('some_lib', 'testing missing dependency')
524
class BackupTransportHandler(Transport):
645
525
"""Test transport that works as a backup for the BadTransportHandler"""
649
class TestTransportImplementation(tests.TestCaseInTempDir):
529
class TestTransportImplementation(TestCaseInTempDir):
650
530
"""Implementation verification for transports.
652
532
To verify a transport we need a server factory, which is a callable
653
533
that accepts no parameters and returns an implementation of
654
534
bzrlib.transport.Server.
656
536
That Server is then used to construct transport instances and test
657
537
the transport via loopback activity.
659
Currently this assumes that the Transport object is connected to the
660
current working directory. So that whatever is done
661
through the transport, should show up in the working
539
Currently this assumes that the Transport object is connected to the
540
current working directory. So that whatever is done
541
through the transport, should show up in the working
662
542
directory, and vice-versa. This is a bug, because its possible to have
663
URL schemes which provide access to something that may not be
664
result in storage on the local disk, i.e. due to file system limits, or
543
URL schemes which provide access to something that may not be
544
result in storage on the local disk, i.e. due to file system limits, or
665
545
due to it being a database or some other non-filesystem tool.
667
547
This also tests to make sure that the functions work with both
668
548
generators and lists (assuming iter(list) is effectively a generator)
672
552
super(TestTransportImplementation, self).setUp()
673
553
self._server = self.transport_server()
674
self.start_server(self._server)
555
self.addCleanup(self._server.tearDown)
676
557
def get_transport(self, relpath=None):
677
558
"""Return a connected transport to the local directory.
734
612
self.assertEquals(t.base, 'file://HOST/')
737
class TestConnectedTransport(tests.TestCase):
738
"""Tests for connected to remote server transports"""
740
def test_parse_url(self):
741
t = transport.ConnectedTransport(
742
'http://simple.example.com/home/source')
743
self.assertEquals(t._host, 'simple.example.com')
744
self.assertEquals(t._port, None)
745
self.assertEquals(t._path, '/home/source/')
746
self.failUnless(t._user is None)
747
self.failUnless(t._password is None)
749
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
751
def test_parse_url_with_at_in_user(self):
753
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
754
self.assertEquals(t._user, 'user@host.com')
756
def test_parse_quoted_url(self):
757
t = transport.ConnectedTransport(
758
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
759
self.assertEquals(t._host, 'exAmple.com')
760
self.assertEquals(t._port, 2222)
761
self.assertEquals(t._user, 'robey')
762
self.assertEquals(t._password, 'h@t')
763
self.assertEquals(t._path, '/path/')
765
# Base should not keep track of the password
766
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
768
def test_parse_invalid_url(self):
769
self.assertRaises(errors.InvalidURL,
770
transport.ConnectedTransport,
771
'sftp://lily.org:~janneke/public/bzr/gub')
773
def test_relpath(self):
774
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
776
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
777
self.assertRaises(errors.PathNotChild, t.relpath,
778
'http://user@host.com/abs/path/sub')
779
self.assertRaises(errors.PathNotChild, t.relpath,
780
'sftp://user2@host.com/abs/path/sub')
781
self.assertRaises(errors.PathNotChild, t.relpath,
782
'sftp://user@otherhost.com/abs/path/sub')
783
self.assertRaises(errors.PathNotChild, t.relpath,
784
'sftp://user@host.com:33/abs/path/sub')
785
# Make sure it works when we don't supply a username
786
t = transport.ConnectedTransport('sftp://host.com/abs/path')
787
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
789
# Make sure it works when parts of the path will be url encoded
790
t = transport.ConnectedTransport('sftp://host.com/dev/%path')
791
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
793
def test_connection_sharing_propagate_credentials(self):
794
t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
795
self.assertEquals('user', t._user)
796
self.assertEquals('host.com', t._host)
797
self.assertIs(None, t._get_connection())
798
self.assertIs(None, t._password)
799
c = t.clone('subdir')
800
self.assertIs(None, c._get_connection())
801
self.assertIs(None, t._password)
803
# Simulate the user entering a password
805
connection = object()
806
t._set_connection(connection, password)
807
self.assertIs(connection, t._get_connection())
808
self.assertIs(password, t._get_credentials())
809
self.assertIs(connection, c._get_connection())
810
self.assertIs(password, c._get_credentials())
812
# credentials can be updated
813
new_password = 'even more secret'
814
c._update_credentials(new_password)
815
self.assertIs(connection, t._get_connection())
816
self.assertIs(new_password, t._get_credentials())
817
self.assertIs(connection, c._get_connection())
818
self.assertIs(new_password, c._get_credentials())
821
class TestReusedTransports(tests.TestCase):
    """Check when get_transport() hands back one of the offered transports."""

    def test_reuse_same_transport(self):
        offered = []
        created = transport.get_transport('http://foo/',
                                          possible_transports=offered)
        # The freshly created transport is recorded in the supplied list.
        self.assertEqual([created], offered)
        again = transport.get_transport('http://foo/',
                                        possible_transports=[created])
        self.assertIs(created, again)

        # Also check that final '/' are handled correctly, whichever of the
        # two URLs carries it.
        url_pairs = (
            ('http://foo/path/', 'http://foo/path'),
            ('http://foo/path', 'http://foo/path/'),
        )
        for existing_url, requested_url in url_pairs:
            existing = transport.get_transport(existing_url)
            requested = transport.get_transport(
                requested_url, possible_transports=[existing])
            self.assertIs(existing, requested)

    def test_don_t_reuse_different_transport(self):
        # A candidate pointing at another host must not be handed back.
        candidate = transport.get_transport('http://foo/path')
        other_host = transport.get_transport('http://bar/path',
                                             possible_transports=[candidate])
        self.assertIsNot(candidate, other_host)
851
class TestTransportTrace(tests.TestCase):
    """Tests for the 'trace+' decorator and the activity it records."""

    def test_decorator(self):
        # A 'trace+' URL must yield the tracing decorator.
        t = transport.get_transport('trace+memory://')
        self.assertIsInstance(
            t, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        t = transport.get_transport('trace+memory://')
        t2 = t.clone('.')
        # The clone is a distinct transport object sharing the very same
        # activity list.
        self.assertTrue(t is not t2)
        self.assertTrue(t._activity is t2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        t.get('foo')
        expected_result = []
        # put_bytes records the number of bytes (6 for 'barish'), not the
        # content, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, t._activity)

    def test_readv(self):
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        # readv is lazy in general: consume the result so the call happens.
        list(t.readv('foo', [(0, 1), (3, 2)],
                     adjust_for_latency=True, upper_limit=6))
        expected_result = []
        # put_bytes records the number of bytes (6 for 'barish'), not the
        # content, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, t._activity)
893
class TestSSHConnections(tests.TestCaseWithTransport):
895
def test_bzr_connect_to_bzr_ssh(self):
896
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
898
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
900
# This test actually causes a bzr instance to be invoked, which is very
901
# expensive: it should be the only such test in the test suite.
902
# A reasonable evolution for this would be to simply check inside
903
# check_channel_exec_request that the command is appropriate, and then
904
# satisfy requests in-process.
905
self.requireFeature(features.paramiko)
906
# SFTPFullAbsoluteServer has a get_url method, and doesn't
907
# override the interface (doesn't change self._vendor).
908
# Note that this does encryption, so can be slow.
909
from bzrlib.tests import stub_sftp
911
# Start an SSH server
912
self.command_executed = []
913
# XXX: This is horrible -- we define a really dumb SSH server that
914
# executes commands, and manage the hooking up of stdin/out/err to the
915
# SSH channel ourselves. Surely this has already been implemented
918
class StubSSHServer(stub_sftp.StubServer):
922
def check_channel_exec_request(self, channel, command):
923
self.test.command_executed.append(command)
924
proc = subprocess.Popen(
925
command, shell=True, stdin=subprocess.PIPE,
926
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
928
# XXX: horribly inefficient, not to mention ugly.
929
# Start a thread for each of stdin/out/err, and relay bytes from
930
# the subprocess to channel and vice versa.
931
def ferry_bytes(read, write, close):
940
(channel.recv, proc.stdin.write, proc.stdin.close),
941
(proc.stdout.read, channel.sendall, channel.close),
942
(proc.stderr.read, channel.sendall_stderr, channel.close)]
944
for read, write, close in file_functions:
945
t = threading.Thread(
946
target=ferry_bytes, args=(read, write, close))
952
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
953
# We *don't* want to override the default SSH vendor: the detected one
955
self.start_server(ssh_server)
956
port = ssh_server._listener.port
958
if sys.platform == 'win32':
959
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
961
bzr_remote_path = self.get_bzr_path()
962
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
964
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
965
# variable is used to tell bzr what command to run on the remote end.
966
path_to_branch = osutils.abspath('.')
967
if sys.platform == 'win32':
968
# On Windows, we export all drives as '/C:/, etc. So we need to
969
# prefix a '/' to get the right path.
970
path_to_branch = '/' + path_to_branch
971
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
972
t = transport.get_transport(url)
973
self.permit_url(t.base)
977
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
978
self.command_executed)
979
# Make sure to disconnect, so that the remote process can stop, and we
980
# can cleanup. Then pause the test until everything is shutdown
981
t._client._medium.disconnect()
984
# First wait for the subprocess
986
# And the rest are threads
987
for t in started[1:]:
615
def get_test_permutations():
616
"""Return transport permutations to be used in testing.
618
This module registers some transports, but they're only for testing
619
registration. We don't really want to run all the transport tests against