13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
18
from cStringIO import StringIO
21
24
from bzrlib import (
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
31
from bzrlib.transport import (
40
from bzrlib.tests import (
53
46
# TODO: Should possibly split transport-specific tests into their own files.
56
class TestTransport(TestCase):
49
class TestTransport(tests.TestCase):
57
50
"""Test the non transport-concrete class functionality."""
52
# FIXME: These tests should use addCleanup() and/or overrideAttr() instead
53
# of try/finally -- vila 20100205
59
55
def test__get_set_protocol_handlers(self):
60
handlers = _get_protocol_handlers()
56
handlers = transport._get_protocol_handlers()
61
57
self.assertNotEqual([], handlers.keys( ))
63
_clear_protocol_handlers()
64
self.assertEqual([], _get_protocol_handlers().keys())
59
transport._clear_protocol_handlers()
60
self.assertEqual([], transport._get_protocol_handlers().keys())
66
_set_protocol_handlers(handlers)
62
transport._set_protocol_handlers(handlers)
68
64
def test_get_transport_modules(self):
69
handlers = _get_protocol_handlers()
65
handlers = transport._get_protocol_handlers()
70
66
# don't pollute the current handlers
71
_clear_protocol_handlers()
67
transport._clear_protocol_handlers()
72
68
class SampleHandler(object):
73
69
"""I exist, isnt that enough?"""
75
_clear_protocol_handlers()
76
register_transport_proto('foo')
77
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
register_transport_proto('bar')
80
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
'TestTransport.SampleHandler')
71
transport._clear_protocol_handlers()
72
transport.register_transport_proto('foo')
73
transport.register_lazy_transport('foo',
74
'bzrlib.tests.test_transport',
75
'TestTransport.SampleHandler')
76
transport.register_transport_proto('bar')
77
transport.register_lazy_transport('bar',
78
'bzrlib.tests.test_transport',
79
'TestTransport.SampleHandler')
82
80
self.assertEqual([SampleHandler.__module__,
83
'bzrlib.transport.chroot'],
84
_get_transport_modules())
81
'bzrlib.transport.chroot',
82
'bzrlib.transport.pathfilter'],
83
transport._get_transport_modules())
86
_set_protocol_handlers(handlers)
85
transport._set_protocol_handlers(handlers)
88
87
def test_transport_dependency(self):
89
88
"""Transport with missing dependency causes no error"""
90
saved_handlers = _get_protocol_handlers()
89
saved_handlers = transport._get_protocol_handlers()
91
90
# don't pollute the current handlers
92
_clear_protocol_handlers()
91
transport._clear_protocol_handlers()
94
register_transport_proto('foo')
95
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
'BadTransportHandler')
93
transport.register_transport_proto('foo')
94
transport.register_lazy_transport(
95
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
98
get_transport('foo://fooserver/foo')
99
except UnsupportedProtocol, e:
97
transport.get_transport('foo://fooserver/foo')
98
except errors.UnsupportedProtocol, e:
101
100
self.assertEquals('Unsupported protocol'
102
101
' for url "foo://fooserver/foo":'
106
105
self.fail('Did not raise UnsupportedProtocol')
108
107
# restore original values
109
_set_protocol_handlers(saved_handlers)
108
transport._set_protocol_handlers(saved_handlers)
111
110
def test_transport_fallback(self):
112
111
"""Transport with missing dependency causes no error"""
113
saved_handlers = _get_protocol_handlers()
112
saved_handlers = transport._get_protocol_handlers()
115
_clear_protocol_handlers()
116
register_transport_proto('foo')
117
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
'BackupTransportHandler')
119
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
'BadTransportHandler')
121
t = get_transport('foo://fooserver/foo')
114
transport._clear_protocol_handlers()
115
transport.register_transport_proto('foo')
116
transport.register_lazy_transport(
117
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
118
transport.register_lazy_transport(
119
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
120
t = transport.get_transport('foo://fooserver/foo')
122
121
self.assertTrue(isinstance(t, BackupTransportHandler))
124
_set_protocol_handlers(saved_handlers)
123
transport._set_protocol_handlers(saved_handlers)
125
def test_ssh_hints(self):
126
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
transport.get_transport('ssh://fooserver/foo')
129
except errors.UnsupportedProtocol, e:
131
self.assertEquals('Unsupported protocol'
132
' for url "ssh://fooserver/foo":'
133
' bzr supports bzr+ssh to operate over ssh,'
134
' use "bzr+ssh://fooserver/foo".',
137
self.fail('Did not raise UnsupportedProtocol')
126
139
def test_LateReadError(self):
127
140
"""The LateReadError helper should raise on read()."""
128
a_file = LateReadError('a path')
141
a_file = transport.LateReadError('a path')
131
except ReadError, error:
144
except errors.ReadError, error:
132
145
self.assertEqual('a path', error.path)
133
self.assertRaises(ReadError, a_file.read, 40)
146
self.assertRaises(errors.ReadError, a_file.read, 40)
136
149
def test__combine_paths(self):
150
t = transport.Transport('/')
138
151
self.assertEqual('/home/sarah/project/foo',
139
152
t._combine_paths('/home/sarah', 'project/foo'))
140
153
self.assertEqual('/etc',
225
238
[(10, 10), (20, 10), (30, 50), (80, 100)],
229
class TestMemoryTransport(TestCase):
241
def test_coalesce_default_limit(self):
242
# By default we use a 100MB max size.
243
ten_mb = 10*1024*1024
244
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
(10*ten_mb, ten_mb, [(0, ten_mb)])],
246
[(i*ten_mb, ten_mb) for i in range(11)])
247
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
[(i*ten_mb, ten_mb) for i in range(11)],
249
max_size=1*1024*1024*1024)
252
class TestMemoryServer(tests.TestCase):
254
def test_create_server(self):
255
server = memory.MemoryServer()
256
server.start_server()
257
url = server.get_url()
258
self.assertTrue(url in transport.transport_list_registry)
259
t = transport.get_transport(url)
262
self.assertFalse(url in transport.transport_list_registry)
263
self.assertRaises(errors.UnsupportedProtocol,
264
transport.get_transport, url)
267
class TestMemoryTransport(tests.TestCase):
231
269
def test_get_transport(self):
270
memory.MemoryTransport()
234
272
def test_clone(self):
235
transport = MemoryTransport()
236
self.assertTrue(isinstance(transport, MemoryTransport))
237
self.assertEqual("memory:///", transport.clone("/").base)
273
t = memory.MemoryTransport()
274
self.assertTrue(isinstance(t, memory.MemoryTransport))
275
self.assertEqual("memory:///", t.clone("/").base)
239
277
def test_abspath(self):
240
transport = MemoryTransport()
241
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
278
t = memory.MemoryTransport()
279
self.assertEqual("memory:///relpath", t.abspath('relpath'))
243
281
def test_abspath_of_root(self):
244
transport = MemoryTransport()
245
self.assertEqual("memory:///", transport.base)
246
self.assertEqual("memory:///", transport.abspath('/'))
282
t = memory.MemoryTransport()
283
self.assertEqual("memory:///", t.base)
284
self.assertEqual("memory:///", t.abspath('/'))
248
286
def test_abspath_of_relpath_starting_at_root(self):
249
transport = MemoryTransport()
250
self.assertEqual("memory:///foo", transport.abspath('/foo'))
287
t = memory.MemoryTransport()
288
self.assertEqual("memory:///foo", t.abspath('/foo'))
252
290
def test_append_and_get(self):
253
transport = MemoryTransport()
254
transport.append_bytes('path', 'content')
255
self.assertEqual(transport.get('path').read(), 'content')
256
transport.append_file('path', StringIO('content'))
257
self.assertEqual(transport.get('path').read(), 'contentcontent')
291
t = memory.MemoryTransport()
292
t.append_bytes('path', 'content')
293
self.assertEqual(t.get('path').read(), 'content')
294
t.append_file('path', StringIO('content'))
295
self.assertEqual(t.get('path').read(), 'contentcontent')
259
297
def test_put_and_get(self):
260
transport = MemoryTransport()
261
transport.put_file('path', StringIO('content'))
262
self.assertEqual(transport.get('path').read(), 'content')
263
transport.put_bytes('path', 'content')
264
self.assertEqual(transport.get('path').read(), 'content')
298
t = memory.MemoryTransport()
299
t.put_file('path', StringIO('content'))
300
self.assertEqual(t.get('path').read(), 'content')
301
t.put_bytes('path', 'content')
302
self.assertEqual(t.get('path').read(), 'content')
266
304
def test_append_without_dir_fails(self):
267
transport = MemoryTransport()
268
self.assertRaises(NoSuchFile,
269
transport.append_bytes, 'dir/path', 'content')
305
t = memory.MemoryTransport()
306
self.assertRaises(errors.NoSuchFile,
307
t.append_bytes, 'dir/path', 'content')
271
309
def test_put_without_dir_fails(self):
272
transport = MemoryTransport()
273
self.assertRaises(NoSuchFile,
274
transport.put_file, 'dir/path', StringIO('content'))
310
t = memory.MemoryTransport()
311
self.assertRaises(errors.NoSuchFile,
312
t.put_file, 'dir/path', StringIO('content'))
276
314
def test_get_missing(self):
277
transport = MemoryTransport()
278
self.assertRaises(NoSuchFile, transport.get, 'foo')
315
transport = memory.MemoryTransport()
316
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
280
318
def test_has_missing(self):
281
transport = MemoryTransport()
282
self.assertEquals(False, transport.has('foo'))
319
t = memory.MemoryTransport()
320
self.assertEquals(False, t.has('foo'))
284
322
def test_has_present(self):
285
transport = MemoryTransport()
286
transport.append_bytes('foo', 'content')
287
self.assertEquals(True, transport.has('foo'))
323
t = memory.MemoryTransport()
324
t.append_bytes('foo', 'content')
325
self.assertEquals(True, t.has('foo'))
289
327
def test_list_dir(self):
290
transport = MemoryTransport()
291
transport.put_bytes('foo', 'content')
292
transport.mkdir('dir')
293
transport.put_bytes('dir/subfoo', 'content')
294
transport.put_bytes('dirlike', 'content')
328
t = memory.MemoryTransport()
329
t.put_bytes('foo', 'content')
331
t.put_bytes('dir/subfoo', 'content')
332
t.put_bytes('dirlike', 'content')
296
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
297
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
334
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
335
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
299
337
def test_mkdir(self):
300
transport = MemoryTransport()
301
transport.mkdir('dir')
302
transport.append_bytes('dir/path', 'content')
303
self.assertEqual(transport.get('dir/path').read(), 'content')
338
t = memory.MemoryTransport()
340
t.append_bytes('dir/path', 'content')
341
self.assertEqual(t.get('dir/path').read(), 'content')
305
343
def test_mkdir_missing_parent(self):
306
transport = MemoryTransport()
307
self.assertRaises(NoSuchFile,
308
transport.mkdir, 'dir/dir')
344
t = memory.MemoryTransport()
345
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
310
347
def test_mkdir_twice(self):
311
transport = MemoryTransport()
312
transport.mkdir('dir')
313
self.assertRaises(FileExists, transport.mkdir, 'dir')
348
t = memory.MemoryTransport()
350
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
315
352
def test_parameters(self):
316
transport = MemoryTransport()
317
self.assertEqual(True, transport.listable())
318
self.assertEqual(False, transport.is_readonly())
353
t = memory.MemoryTransport()
354
self.assertEqual(True, t.listable())
355
self.assertEqual(False, t.is_readonly())
320
357
def test_iter_files_recursive(self):
321
transport = MemoryTransport()
322
transport.mkdir('dir')
323
transport.put_bytes('dir/foo', 'content')
324
transport.put_bytes('dir/bar', 'content')
325
transport.put_bytes('bar', 'content')
326
paths = set(transport.iter_files_recursive())
358
t = memory.MemoryTransport()
360
t.put_bytes('dir/foo', 'content')
361
t.put_bytes('dir/bar', 'content')
362
t.put_bytes('bar', 'content')
363
paths = set(t.iter_files_recursive())
327
364
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
329
366
def test_stat(self):
330
transport = MemoryTransport()
331
transport.put_bytes('foo', 'content')
332
transport.put_bytes('bar', 'phowar')
333
self.assertEqual(7, transport.stat('foo').st_size)
334
self.assertEqual(6, transport.stat('bar').st_size)
337
class ChrootDecoratorTransportTest(TestCase):
367
t = memory.MemoryTransport()
368
t.put_bytes('foo', 'content')
369
t.put_bytes('bar', 'phowar')
370
self.assertEqual(7, t.stat('foo').st_size)
371
self.assertEqual(6, t.stat('bar').st_size)
374
class ChrootDecoratorTransportTest(tests.TestCase):
338
375
"""Chroot decoration specific tests."""
340
377
def test_abspath(self):
341
378
# The abspath is always relative to the chroot_url.
342
server = ChrootServer(get_transport('memory:///foo/bar/'))
344
transport = get_transport(server.get_url())
345
self.assertEqual(server.get_url(), transport.abspath('/'))
379
server = chroot.ChrootServer(
380
transport.get_transport('memory:///foo/bar/'))
381
self.start_server(server)
382
t = transport.get_transport(server.get_url())
383
self.assertEqual(server.get_url(), t.abspath('/'))
347
subdir_transport = transport.clone('subdir')
348
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
385
subdir_t = t.clone('subdir')
386
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
351
388
def test_clone(self):
352
server = ChrootServer(get_transport('memory:///foo/bar/'))
354
transport = get_transport(server.get_url())
389
server = chroot.ChrootServer(
390
transport.get_transport('memory:///foo/bar/'))
391
self.start_server(server)
392
t = transport.get_transport(server.get_url())
355
393
# relpath from root and root path are the same
356
relpath_cloned = transport.clone('foo')
357
abspath_cloned = transport.clone('/foo')
394
relpath_cloned = t.clone('foo')
395
abspath_cloned = t.clone('/foo')
358
396
self.assertEqual(server, relpath_cloned.server)
359
397
self.assertEqual(server, abspath_cloned.server)
362
399
def test_chroot_url_preserves_chroot(self):
363
400
"""Calling get_transport on a chroot transport's base should produce a
364
401
transport with exactly the same behaviour as the original chroot
384
421
This is so that it is not possible to escape a chroot by doing::
385
422
url = chroot_transport.base
386
423
parent_url = urlutils.join(url, '..')
387
new_transport = get_transport(parent_url)
424
new_t = transport.get_transport(parent_url)
389
server = ChrootServer(get_transport('memory:///path/'))
391
transport = get_transport(server.get_url())
426
server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
427
self.start_server(server)
428
t = transport.get_transport(server.get_url())
392
429
self.assertRaises(
393
InvalidURLJoin, urlutils.join, transport.base, '..')
397
class ChrootServerTest(TestCase):
430
errors.InvalidURLJoin, urlutils.join, t.base, '..')
433
class TestChrootServer(tests.TestCase):
399
435
def test_construct(self):
400
backing_transport = MemoryTransport()
401
server = ChrootServer(backing_transport)
436
backing_transport = memory.MemoryTransport()
437
server = chroot.ChrootServer(backing_transport)
402
438
self.assertEqual(backing_transport, server.backing_transport)
404
440
def test_setUp(self):
405
backing_transport = MemoryTransport()
406
server = ChrootServer(backing_transport)
408
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
441
backing_transport = memory.MemoryTransport()
442
server = chroot.ChrootServer(backing_transport)
443
server.start_server()
445
self.assertTrue(server.scheme
446
in transport._get_protocol_handlers().keys())
410
def test_tearDown(self):
411
backing_transport = MemoryTransport()
412
server = ChrootServer(backing_transport)
415
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
450
def test_stop_server(self):
451
backing_transport = memory.MemoryTransport()
452
server = chroot.ChrootServer(backing_transport)
453
server.start_server()
455
self.assertFalse(server.scheme
456
in transport._get_protocol_handlers().keys())
417
458
def test_get_url(self):
418
backing_transport = MemoryTransport()
419
server = ChrootServer(backing_transport)
421
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
425
class ReadonlyDecoratorTransportTest(TestCase):
459
backing_transport = memory.MemoryTransport()
460
server = chroot.ChrootServer(backing_transport)
461
server.start_server()
463
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
468
class PathFilteringDecoratorTransportTest(tests.TestCase):
469
"""Pathfilter decoration specific tests."""
471
def test_abspath(self):
472
# The abspath is always relative to the base of the backing transport.
473
server = pathfilter.PathFilteringServer(
474
transport.get_transport('memory:///foo/bar/'),
476
server.start_server()
477
t = transport.get_transport(server.get_url())
478
self.assertEqual(server.get_url(), t.abspath('/'))
480
subdir_t = t.clone('subdir')
481
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
484
def make_pf_transport(self, filter_func=None):
485
"""Make a PathFilteringTransport backed by a MemoryTransport.
487
:param filter_func: by default this will be a no-op function. Use this
488
parameter to override it."""
489
if filter_func is None:
490
filter_func = lambda x: x
491
server = pathfilter.PathFilteringServer(
492
transport.get_transport('memory:///foo/bar/'), filter_func)
493
server.start_server()
494
self.addCleanup(server.stop_server)
495
return transport.get_transport(server.get_url())
497
def test__filter(self):
498
# _filter (with an identity func as filter_func) always returns
499
# paths relative to the base of the backing transport.
500
t = self.make_pf_transport()
501
self.assertEqual('foo', t._filter('foo'))
502
self.assertEqual('foo/bar', t._filter('foo/bar'))
503
self.assertEqual('', t._filter('..'))
504
self.assertEqual('', t._filter('/'))
505
# The base of the pathfiltering transport is taken into account too.
506
t = t.clone('subdir1/subdir2')
507
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
508
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
509
self.assertEqual('subdir1', t._filter('..'))
510
self.assertEqual('', t._filter('/'))
512
def test_filter_invocation(self):
515
filter_log.append(path)
517
t = self.make_pf_transport(filter)
519
self.assertEqual(['abc'], filter_log)
521
t.clone('abc').has('xyz')
522
self.assertEqual(['abc/xyz'], filter_log)
525
self.assertEqual(['abc'], filter_log)
527
def test_clone(self):
528
t = self.make_pf_transport()
529
# relpath from root and root path are the same
530
relpath_cloned = t.clone('foo')
531
abspath_cloned = t.clone('/foo')
532
self.assertEqual(t.server, relpath_cloned.server)
533
self.assertEqual(t.server, abspath_cloned.server)
535
def test_url_preserves_pathfiltering(self):
536
"""Calling get_transport on a pathfiltered transport's base should
537
produce a transport with exactly the same behaviour as the original
538
pathfiltered transport.
540
This is so that it is not possible to escape (accidentally or
541
otherwise) the filtering by doing::
542
url = filtered_transport.base
543
parent_url = urlutils.join(url, '..')
544
new_t = transport.get_transport(parent_url)
546
t = self.make_pf_transport()
547
new_t = transport.get_transport(t.base)
548
self.assertEqual(t.server, new_t.server)
549
self.assertEqual(t.base, new_t.base)
552
class ReadonlyDecoratorTransportTest(tests.TestCase):
426
553
"""Readonly decoration specific tests."""
428
555
def test_local_parameters(self):
429
import bzrlib.transport.readonly as readonly
430
556
# connect to . in readonly mode
431
transport = readonly.ReadonlyTransportDecorator('readonly+.')
432
self.assertEqual(True, transport.listable())
433
self.assertEqual(True, transport.is_readonly())
557
t = readonly.ReadonlyTransportDecorator('readonly+.')
558
self.assertEqual(True, t.listable())
559
self.assertEqual(True, t.is_readonly())
435
561
def test_http_parameters(self):
436
562
from bzrlib.tests.http_server import HttpServer
437
import bzrlib.transport.readonly as readonly
438
563
# connect to '.' via http which is not listable
439
564
server = HttpServer()
442
transport = get_transport('readonly+' + server.get_url())
443
self.failUnless(isinstance(transport,
444
readonly.ReadonlyTransportDecorator))
445
self.assertEqual(False, transport.listable())
446
self.assertEqual(True, transport.is_readonly())
451
class FakeNFSDecoratorTests(TestCaseInTempDir):
565
self.start_server(server)
566
t = transport.get_transport('readonly+' + server.get_url())
567
self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
568
self.assertEqual(False, t.listable())
569
self.assertEqual(True, t.is_readonly())
572
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
452
573
"""NFS decorator specific tests."""
454
575
def get_nfs_transport(self, url):
455
import bzrlib.transport.fakenfs as fakenfs
456
576
# connect to url with nfs decoration
457
577
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
459
579
def test_local_parameters(self):
460
580
# the listable and is_readonly parameters
461
581
# are not changed by the fakenfs decorator
462
transport = self.get_nfs_transport('.')
463
self.assertEqual(True, transport.listable())
464
self.assertEqual(False, transport.is_readonly())
582
t = self.get_nfs_transport('.')
583
self.assertEqual(True, t.listable())
584
self.assertEqual(False, t.is_readonly())
466
586
def test_http_parameters(self):
467
587
# the listable and is_readonly parameters
469
589
from bzrlib.tests.http_server import HttpServer
470
590
# connect to '.' via http which is not listable
471
591
server = HttpServer()
474
transport = self.get_nfs_transport(server.get_url())
475
self.assertIsInstance(
476
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
477
self.assertEqual(False, transport.listable())
478
self.assertEqual(True, transport.is_readonly())
592
self.start_server(server)
593
t = self.get_nfs_transport(server.get_url())
594
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
595
self.assertEqual(False, t.listable())
596
self.assertEqual(True, t.is_readonly())
482
598
def test_fakenfs_server_default(self):
483
599
# a FakeNFSServer() should bring up a local relpath server for itself
484
import bzrlib.transport.fakenfs as fakenfs
485
server = fakenfs.FakeNFSServer()
488
# the url should be decorated appropriately
489
self.assertStartsWith(server.get_url(), 'fakenfs+')
490
# and we should be able to get a transport for it
491
transport = get_transport(server.get_url())
492
# which must be a FakeNFSTransportDecorator instance.
493
self.assertIsInstance(
494
transport, fakenfs.FakeNFSTransportDecorator)
600
server = test_server.FakeNFSServer()
601
self.start_server(server)
602
# the url should be decorated appropriately
603
self.assertStartsWith(server.get_url(), 'fakenfs+')
604
# and we should be able to get a transport for it
605
t = transport.get_transport(server.get_url())
606
# which must be a FakeNFSTransportDecorator instance.
607
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
498
609
def test_fakenfs_rename_semantics(self):
499
610
# a FakeNFS transport must mangle the way rename errors occur to
500
611
# look like NFS problems.
501
transport = self.get_nfs_transport('.')
612
t = self.get_nfs_transport('.')
502
613
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
504
self.assertRaises(errors.ResourceBusy,
505
transport.rename, 'from', 'to')
508
class FakeVFATDecoratorTests(TestCaseInTempDir):
615
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
618
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
509
619
"""Tests for simulation of VFAT restrictions"""
511
621
def get_vfat_transport(self, url):
516
626
def test_transport_creation(self):
517
627
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
518
transport = self.get_vfat_transport('.')
519
self.assertIsInstance(transport, FakeVFATTransportDecorator)
628
t = self.get_vfat_transport('.')
629
self.assertIsInstance(t, FakeVFATTransportDecorator)
521
631
def test_transport_mkdir(self):
522
transport = self.get_vfat_transport('.')
523
transport.mkdir('HELLO')
524
self.assertTrue(transport.has('hello'))
525
self.assertTrue(transport.has('Hello'))
632
t = self.get_vfat_transport('.')
634
self.assertTrue(t.has('hello'))
635
self.assertTrue(t.has('Hello'))
527
637
def test_forbidden_chars(self):
528
transport = self.get_vfat_transport('.')
529
self.assertRaises(ValueError, transport.has, "<NU>")
532
class BadTransportHandler(Transport):
638
t = self.get_vfat_transport('.')
639
self.assertRaises(ValueError, t.has, "<NU>")
642
class BadTransportHandler(transport.Transport):
533
643
def __init__(self, base_url):
534
raise DependencyNotPresent('some_lib', 'testing missing dependency')
537
class BackupTransportHandler(Transport):
644
raise errors.DependencyNotPresent('some_lib',
645
'testing missing dependency')
648
class BackupTransportHandler(transport.Transport):
538
649
"""Test transport that works as a backup for the BadTransportHandler"""
542
class TestTransportImplementation(TestCaseInTempDir):
653
class TestTransportImplementation(tests.TestCaseInTempDir):
543
654
"""Implementation verification for transports.
545
656
To verify a transport we need a server factory, which is a callable
546
657
that accepts no parameters and returns an implementation of
547
658
bzrlib.transport.Server.
549
660
That Server is then used to construct transport instances and test
550
661
the transport via loopback activity.
552
Currently this assumes that the Transport object is connected to the
553
current working directory. So that whatever is done
554
through the transport, should show up in the working
663
Currently this assumes that the Transport object is connected to the
664
current working directory. So that whatever is done
665
through the transport, should show up in the working
555
666
directory, and vice-versa. This is a bug, because its possible to have
556
URL schemes which provide access to something that may not be
557
result in storage on the local disk, i.e. due to file system limits, or
667
URL schemes which provide access to something that may not be
668
result in storage on the local disk, i.e. due to file system limits, or
558
669
due to it being a database or some other non-filesystem tool.
560
671
This also tests to make sure that the functions work with both
561
672
generators and lists (assuming iter(list) is effectively a generator)
565
676
super(TestTransportImplementation, self).setUp()
566
677
self._server = self.transport_server()
568
self.addCleanup(self._server.tearDown)
678
self.start_server(self._server)
570
680
def get_transport(self, relpath=None):
571
681
"""Return a connected transport to the local directory.
705
822
self.assertIs(new_password, c._get_credentials())
708
class TestReusedTransports(TestCase):
825
class TestReusedTransports(tests.TestCase):
709
826
"""Tests for transport reuse"""
711
828
def test_reuse_same_transport(self):
712
829
possible_transports = []
713
t1 = get_transport('http://foo/',
714
possible_transports=possible_transports)
830
t1 = transport.get_transport('http://foo/',
831
possible_transports=possible_transports)
715
832
self.assertEqual([t1], possible_transports)
716
t2 = get_transport('http://foo/', possible_transports=[t1])
833
t2 = transport.get_transport('http://foo/',
834
possible_transports=[t1])
717
835
self.assertIs(t1, t2)
719
837
# Also check that final '/' are handled correctly
720
t3 = get_transport('http://foo/path/')
721
t4 = get_transport('http://foo/path', possible_transports=[t3])
838
t3 = transport.get_transport('http://foo/path/')
839
t4 = transport.get_transport('http://foo/path',
840
possible_transports=[t3])
722
841
self.assertIs(t3, t4)
724
t5 = get_transport('http://foo/path')
725
t6 = get_transport('http://foo/path/', possible_transports=[t5])
843
t5 = transport.get_transport('http://foo/path')
844
t6 = transport.get_transport('http://foo/path/',
845
possible_transports=[t5])
726
846
self.assertIs(t5, t6)
728
848
def test_don_t_reuse_different_transport(self):
729
t1 = get_transport('http://foo/path')
730
t2 = get_transport('http://bar/path', possible_transports=[t1])
849
t1 = transport.get_transport('http://foo/path')
850
t2 = transport.get_transport('http://bar/path',
851
possible_transports=[t1])
731
852
self.assertIsNot(t1, t2)
734
class TestTransportTrace(TestCase):
855
class TestTransportTrace(tests.TestCase):
736
857
def test_get(self):
737
transport = get_transport('trace+memory://')
738
self.assertIsInstance(
739
transport, bzrlib.transport.trace.TransportTraceDecorator)
858
t = transport.get_transport('trace+memory://')
859
self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
741
861
def test_clone_preserves_activity(self):
742
transport = get_transport('trace+memory://')
743
transport2 = transport.clone('.')
744
self.assertTrue(transport is not transport2)
745
self.assertTrue(transport._activity is transport2._activity)
862
t = transport.get_transport('trace+memory://')
864
self.assertTrue(t is not t2)
865
self.assertTrue(t._activity is t2._activity)
747
867
# the following specific tests are for the operations that have made use of
748
868
# logging in tests; we could test every single operation but doing that
749
869
# still won't cause a test failure when the top level Transport API
750
870
# changes; so there is little return doing that.
751
871
def test_get(self):
752
transport = get_transport('trace+memory:///')
753
transport.put_bytes('foo', 'barish')
872
t = transport.get_transport('trace+memory:///')
873
t.put_bytes('foo', 'barish')
755
875
expected_result = []
756
876
# put_bytes records the bytes, not the content to avoid memory
758
878
expected_result.append(('put_bytes', 'foo', 6, None))
759
879
# get records the file name only.
760
880
expected_result.append(('get', 'foo'))
761
self.assertEqual(expected_result, transport._activity)
881
self.assertEqual(expected_result, t._activity)
763
883
def test_readv(self):
764
transport = get_transport('trace+memory:///')
765
transport.put_bytes('foo', 'barish')
766
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
884
t = transport.get_transport('trace+memory:///')
885
t.put_bytes('foo', 'barish')
886
list(t.readv('foo', [(0, 1), (3, 2)],
887
adjust_for_latency=True, upper_limit=6))
768
888
expected_result = []
769
889
# put_bytes records the bytes, not the content to avoid memory
771
891
expected_result.append(('put_bytes', 'foo', 6, None))
772
892
# readv records the supplied offset request
773
893
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
774
self.assertEqual(expected_result, transport._activity)
894
self.assertEqual(expected_result, t._activity)
897
class TestSSHConnections(tests.TestCaseWithTransport):
899
def test_bzr_connect_to_bzr_ssh(self):
900
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
902
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
904
# This test actually causes a bzr instance to be invoked, which is very
905
# expensive: it should be the only such test in the test suite.
906
# A reasonable evolution for this would be to simply check inside
907
# check_channel_exec_request that the command is appropriate, and then
908
# satisfy requests in-process.
909
self.requireFeature(features.paramiko)
910
# SFTPFullAbsoluteServer has a get_url method, and doesn't
911
# override the interface (doesn't change self._vendor).
912
# Note that this does encryption, so can be slow.
913
from bzrlib.tests import stub_sftp
915
# Start an SSH server
916
self.command_executed = []
917
# XXX: This is horrible -- we define a really dumb SSH server that
918
# executes commands, and manage the hooking up of stdin/out/err to the
919
# SSH channel ourselves. Surely this has already been implemented
922
class StubSSHServer(stub_sftp.StubServer):
926
def check_channel_exec_request(self, channel, command):
927
self.test.command_executed.append(command)
928
proc = subprocess.Popen(
929
command, shell=True, stdin=subprocess.PIPE,
930
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
932
# XXX: horribly inefficient, not to mention ugly.
933
# Start a thread for each of stdin/out/err, and relay bytes from
934
# the subprocess to channel and vice versa.
935
def ferry_bytes(read, write, close):
944
(channel.recv, proc.stdin.write, proc.stdin.close),
945
(proc.stdout.read, channel.sendall, channel.close),
946
(proc.stderr.read, channel.sendall_stderr, channel.close)]
948
for read, write, close in file_functions:
949
t = threading.Thread(
950
target=ferry_bytes, args=(read, write, close))
956
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
957
# We *don't* want to override the default SSH vendor: the detected one
960
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
961
# inherits from SFTPServer which forces the SSH vendor to
962
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
963
self.start_server(ssh_server)
964
port = ssh_server.port
966
if sys.platform == 'win32':
967
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
969
bzr_remote_path = self.get_bzr_path()
970
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
972
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
973
# variable is used to tell bzr what command to run on the remote end.
974
path_to_branch = osutils.abspath('.')
975
if sys.platform == 'win32':
976
# On Windows, we export all drives as '/C:/, etc. So we need to
977
# prefix a '/' to get the right path.
978
path_to_branch = '/' + path_to_branch
979
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
980
t = transport.get_transport(url)
981
self.permit_url(t.base)
985
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
986
self.command_executed)
987
# Make sure to disconnect, so that the remote process can stop, and we
988
# can cleanup. Then pause the test until everything is shutdown
989
t._client._medium.disconnect()
992
# First wait for the subprocess
994
# And the rest are threads
995
for t in started[1:]:
999
class TestUnhtml(tests.TestCase):
1001
"""Tests for unhtml_roughly"""
1003
def test_truncation(self):
1004
fake_html = "<p>something!\n" * 1000
1005
result = http.unhtml_roughly(fake_html)
1006
self.assertEquals(len(result), 1000)
1007
self.assertStartsWith(result, " something!")