1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
28
transport as _mod_transport,
31
from bzrlib.transport import (
36
from bzrlib.errors import (DependencyNotPresent,
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
45
from bzrlib.transport import (_clear_protocol_handlers,
48
_get_protocol_handlers,
49
_set_protocol_handlers,
50
_get_transport_modules,
53
register_lazy_transport,
54
register_transport_proto,
57
from bzrlib.transport.chroot import ChrootServer
58
from bzrlib.transport.local import (LocalTransport,
59
EmulatedWin32LocalTransport)
60
from bzrlib.transport.pathfilter import PathFilteringServer
63
# TODO: Should possibly split transport-specific tests into their own files.
66
class TestTransport(TestCase):
67
"""Test the non transport-concrete class functionality."""
69
def test__get_set_protocol_handlers(self):
70
handlers = _get_protocol_handlers()
71
self.assertNotEqual([], handlers.keys( ))
73
_clear_protocol_handlers()
74
self.assertEqual([], _get_protocol_handlers().keys())
76
_set_protocol_handlers(handlers)
78
def test_get_transport_modules(self):
79
handlers = _get_protocol_handlers()
80
# don't pollute the current handlers
81
_clear_protocol_handlers()
82
class SampleHandler(object):
83
"""I exist, isnt that enough?"""
85
_clear_protocol_handlers()
86
register_transport_proto('foo')
87
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
88
'TestTransport.SampleHandler')
89
register_transport_proto('bar')
90
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
91
'TestTransport.SampleHandler')
92
self.assertEqual([SampleHandler.__module__,
93
'bzrlib.transport.chroot',
94
'bzrlib.transport.pathfilter'],
95
_get_transport_modules())
97
_set_protocol_handlers(handlers)
99
def test_transport_dependency(self):
100
"""Transport with missing dependency causes no error"""
101
saved_handlers = _get_protocol_handlers()
102
# don't pollute the current handlers
103
_clear_protocol_handlers()
105
register_transport_proto('foo')
106
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
107
'BadTransportHandler')
109
get_transport('foo://fooserver/foo')
110
except UnsupportedProtocol, e:
112
self.assertEquals('Unsupported protocol'
113
' for url "foo://fooserver/foo":'
114
' Unable to import library "some_lib":'
115
' testing missing dependency', str(e))
117
self.fail('Did not raise UnsupportedProtocol')
119
# restore original values
120
_set_protocol_handlers(saved_handlers)
122
def test_transport_fallback(self):
123
"""Transport with missing dependency causes no error"""
124
saved_handlers = _get_protocol_handlers()
126
_clear_protocol_handlers()
127
register_transport_proto('foo')
128
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
129
'BackupTransportHandler')
130
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
131
'BadTransportHandler')
132
t = get_transport('foo://fooserver/foo')
133
self.assertTrue(isinstance(t, BackupTransportHandler))
135
_set_protocol_handlers(saved_handlers)
137
def test_ssh_hints(self):
138
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
140
get_transport('ssh://fooserver/foo')
141
except UnsupportedProtocol, e:
143
self.assertEquals('Unsupported protocol'
144
' for url "ssh://fooserver/foo":'
145
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
148
self.fail('Did not raise UnsupportedProtocol')
150
def test_LateReadError(self):
151
"""The LateReadError helper should raise on read()."""
152
a_file = LateReadError('a path')
155
except ReadError, error:
156
self.assertEqual('a path', error.path)
157
self.assertRaises(ReadError, a_file.read, 40)
160
def test__combine_paths(self):
162
self.assertEqual('/home/sarah/project/foo',
163
t._combine_paths('/home/sarah', 'project/foo'))
164
self.assertEqual('/etc',
165
t._combine_paths('/home/sarah', '../../etc'))
166
self.assertEqual('/etc',
167
t._combine_paths('/home/sarah', '../../../etc'))
168
self.assertEqual('/etc',
169
t._combine_paths('/home/sarah', '/etc'))
171
def test_local_abspath_non_local_transport(self):
172
# the base implementation should throw
173
t = memory.MemoryTransport()
174
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
175
self.assertEqual('memory:///t is not a local path.', str(e))
178
class TestCoalesceOffsets(TestCase):
180
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
181
coalesce = Transport._coalesce_offsets
182
exp = [_CoalescedOffset(*x) for x in expected]
183
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
185
self.assertEqual(exp, out)
187
def test_coalesce_empty(self):
190
def test_coalesce_simple(self):
191
self.check([(0, 10, [(0, 10)])], [(0, 10)])
193
def test_coalesce_unrelated(self):
194
self.check([(0, 10, [(0, 10)]),
196
], [(0, 10), (20, 10)])
198
def test_coalesce_unsorted(self):
199
self.check([(20, 10, [(0, 10)]),
201
], [(20, 10), (0, 10)])
203
def test_coalesce_nearby(self):
204
self.check([(0, 20, [(0, 10), (10, 10)])],
207
def test_coalesce_overlapped(self):
208
self.assertRaises(ValueError,
209
self.check, [(0, 15, [(0, 10), (5, 10)])],
212
def test_coalesce_limit(self):
213
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
214
(30, 10), (40, 10)]),
215
(60, 50, [(0, 10), (10, 10), (20, 10),
216
(30, 10), (40, 10)]),
217
], [(10, 10), (20, 10), (30, 10), (40, 10),
218
(50, 10), (60, 10), (70, 10), (80, 10),
219
(90, 10), (100, 10)],
222
def test_coalesce_no_limit(self):
223
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
224
(30, 10), (40, 10), (50, 10),
225
(60, 10), (70, 10), (80, 10),
227
], [(10, 10), (20, 10), (30, 10), (40, 10),
228
(50, 10), (60, 10), (70, 10), (80, 10),
229
(90, 10), (100, 10)])
231
def test_coalesce_fudge(self):
232
self.check([(10, 30, [(0, 10), (20, 10)]),
233
(100, 10, [(0, 10),]),
234
], [(10, 10), (30, 10), (100, 10)],
237
def test_coalesce_max_size(self):
238
self.check([(10, 20, [(0, 10), (10, 10)]),
240
# If one range is above max_size, it gets its own coalesced
242
(100, 80, [(0, 80),]),],
243
[(10, 10), (20, 10), (30, 50), (100, 80)],
247
def test_coalesce_no_max_size(self):
248
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
249
[(10, 10), (20, 10), (30, 50), (80, 100)],
252
def test_coalesce_default_limit(self):
253
# By default we use a 100MB max size.
254
ten_mb = 10*1024*1024
255
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
256
(10*ten_mb, ten_mb, [(0, ten_mb)])],
257
[(i*ten_mb, ten_mb) for i in range(11)])
258
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
259
[(i*ten_mb, ten_mb) for i in range(11)],
260
max_size=1*1024*1024*1024)
263
class TestMemoryServer(TestCase):
265
def test_create_server(self):
266
server = memory.MemoryServer()
267
server.start_server()
268
url = server.get_url()
269
self.assertTrue(url in _mod_transport.transport_list_registry)
270
t = _mod_transport.get_transport(url)
273
self.assertFalse(url in _mod_transport.transport_list_registry)
274
self.assertRaises(errors.UnsupportedProtocol,
275
_mod_transport.get_transport, url)
278
class TestMemoryTransport(TestCase):
280
def test_get_transport(self):
281
memory.MemoryTransport()
283
def test_clone(self):
284
transport = memory.MemoryTransport()
285
self.assertTrue(isinstance(transport, memory.MemoryTransport))
286
self.assertEqual("memory:///", transport.clone("/").base)
288
def test_abspath(self):
289
transport = memory.MemoryTransport()
290
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
292
def test_abspath_of_root(self):
293
transport = memory.MemoryTransport()
294
self.assertEqual("memory:///", transport.base)
295
self.assertEqual("memory:///", transport.abspath('/'))
297
def test_abspath_of_relpath_starting_at_root(self):
298
transport = memory.MemoryTransport()
299
self.assertEqual("memory:///foo", transport.abspath('/foo'))
301
def test_append_and_get(self):
302
transport = memory.MemoryTransport()
303
transport.append_bytes('path', 'content')
304
self.assertEqual(transport.get('path').read(), 'content')
305
transport.append_file('path', StringIO('content'))
306
self.assertEqual(transport.get('path').read(), 'contentcontent')
308
def test_put_and_get(self):
309
transport = memory.MemoryTransport()
310
transport.put_file('path', StringIO('content'))
311
self.assertEqual(transport.get('path').read(), 'content')
312
transport.put_bytes('path', 'content')
313
self.assertEqual(transport.get('path').read(), 'content')
315
def test_append_without_dir_fails(self):
316
transport = memory.MemoryTransport()
317
self.assertRaises(NoSuchFile,
318
transport.append_bytes, 'dir/path', 'content')
320
def test_put_without_dir_fails(self):
321
transport = memory.MemoryTransport()
322
self.assertRaises(NoSuchFile,
323
transport.put_file, 'dir/path', StringIO('content'))
325
def test_get_missing(self):
326
transport = memory.MemoryTransport()
327
self.assertRaises(NoSuchFile, transport.get, 'foo')
329
def test_has_missing(self):
330
transport = memory.MemoryTransport()
331
self.assertEquals(False, transport.has('foo'))
333
def test_has_present(self):
334
transport = memory.MemoryTransport()
335
transport.append_bytes('foo', 'content')
336
self.assertEquals(True, transport.has('foo'))
338
def test_list_dir(self):
339
transport = memory.MemoryTransport()
340
transport.put_bytes('foo', 'content')
341
transport.mkdir('dir')
342
transport.put_bytes('dir/subfoo', 'content')
343
transport.put_bytes('dirlike', 'content')
345
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
346
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
348
def test_mkdir(self):
349
transport = memory.MemoryTransport()
350
transport.mkdir('dir')
351
transport.append_bytes('dir/path', 'content')
352
self.assertEqual(transport.get('dir/path').read(), 'content')
354
def test_mkdir_missing_parent(self):
355
transport = memory.MemoryTransport()
356
self.assertRaises(NoSuchFile,
357
transport.mkdir, 'dir/dir')
359
def test_mkdir_twice(self):
360
transport = memory.MemoryTransport()
361
transport.mkdir('dir')
362
self.assertRaises(FileExists, transport.mkdir, 'dir')
364
def test_parameters(self):
365
transport = memory.MemoryTransport()
366
self.assertEqual(True, transport.listable())
367
self.assertEqual(False, transport.is_readonly())
369
def test_iter_files_recursive(self):
370
transport = memory.MemoryTransport()
371
transport.mkdir('dir')
372
transport.put_bytes('dir/foo', 'content')
373
transport.put_bytes('dir/bar', 'content')
374
transport.put_bytes('bar', 'content')
375
paths = set(transport.iter_files_recursive())
376
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
379
transport = memory.MemoryTransport()
380
transport.put_bytes('foo', 'content')
381
transport.put_bytes('bar', 'phowar')
382
self.assertEqual(7, transport.stat('foo').st_size)
383
self.assertEqual(6, transport.stat('bar').st_size)
386
class ChrootDecoratorTransportTest(TestCase):
387
"""Chroot decoration specific tests."""
389
def test_abspath(self):
390
# The abspath is always relative to the chroot_url.
391
server = ChrootServer(get_transport('memory:///foo/bar/'))
392
self.start_server(server)
393
transport = get_transport(server.get_url())
394
self.assertEqual(server.get_url(), transport.abspath('/'))
396
subdir_transport = transport.clone('subdir')
397
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
399
def test_clone(self):
400
server = ChrootServer(get_transport('memory:///foo/bar/'))
401
self.start_server(server)
402
transport = get_transport(server.get_url())
403
# relpath from root and root path are the same
404
relpath_cloned = transport.clone('foo')
405
abspath_cloned = transport.clone('/foo')
406
self.assertEqual(server, relpath_cloned.server)
407
self.assertEqual(server, abspath_cloned.server)
409
def test_chroot_url_preserves_chroot(self):
410
"""Calling get_transport on a chroot transport's base should produce a
411
transport with exactly the same behaviour as the original chroot
414
This is so that it is not possible to escape a chroot by doing::
415
url = chroot_transport.base
416
parent_url = urlutils.join(url, '..')
417
new_transport = get_transport(parent_url)
419
server = ChrootServer(get_transport('memory:///path/subpath'))
420
self.start_server(server)
421
transport = get_transport(server.get_url())
422
new_transport = get_transport(transport.base)
423
self.assertEqual(transport.server, new_transport.server)
424
self.assertEqual(transport.base, new_transport.base)
426
def test_urljoin_preserves_chroot(self):
427
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
428
URL that escapes the intended chroot.
430
This is so that it is not possible to escape a chroot by doing::
431
url = chroot_transport.base
432
parent_url = urlutils.join(url, '..')
433
new_transport = get_transport(parent_url)
435
server = ChrootServer(get_transport('memory:///path/'))
436
self.start_server(server)
437
transport = get_transport(server.get_url())
439
InvalidURLJoin, urlutils.join, transport.base, '..')
442
class ChrootServerTest(TestCase):
444
def test_construct(self):
445
backing_transport = memory.MemoryTransport()
446
server = ChrootServer(backing_transport)
447
self.assertEqual(backing_transport, server.backing_transport)
449
def test_setUp(self):
450
backing_transport = memory.MemoryTransport()
451
server = ChrootServer(backing_transport)
452
server.start_server()
454
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
458
def test_stop_server(self):
459
backing_transport = memory.MemoryTransport()
460
server = ChrootServer(backing_transport)
461
server.start_server()
463
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
465
def test_get_url(self):
466
backing_transport = memory.MemoryTransport()
467
server = ChrootServer(backing_transport)
468
server.start_server()
470
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
475
class PathFilteringDecoratorTransportTest(TestCase):
476
"""Pathfilter decoration specific tests."""
478
def test_abspath(self):
479
# The abspath is always relative to the base of the backing transport.
480
server = PathFilteringServer(get_transport('memory:///foo/bar/'),
482
server.start_server()
483
transport = get_transport(server.get_url())
484
self.assertEqual(server.get_url(), transport.abspath('/'))
486
subdir_transport = transport.clone('subdir')
487
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
490
def make_pf_transport(self, filter_func=None):
491
"""Make a PathFilteringTransport backed by a MemoryTransport.
493
:param filter_func: by default this will be a no-op function. Use this
494
parameter to override it."""
495
if filter_func is None:
496
filter_func = lambda x: x
497
server = PathFilteringServer(
498
get_transport('memory:///foo/bar/'), filter_func)
499
server.start_server()
500
self.addCleanup(server.stop_server)
501
return get_transport(server.get_url())
503
def test__filter(self):
504
# _filter (with an identity func as filter_func) always returns
505
# paths relative to the base of the backing transport.
506
transport = self.make_pf_transport()
507
self.assertEqual('foo', transport._filter('foo'))
508
self.assertEqual('foo/bar', transport._filter('foo/bar'))
509
self.assertEqual('', transport._filter('..'))
510
self.assertEqual('', transport._filter('/'))
511
# The base of the pathfiltering transport is taken into account too.
512
transport = transport.clone('subdir1/subdir2')
513
self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
515
'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
516
self.assertEqual('subdir1', transport._filter('..'))
517
self.assertEqual('', transport._filter('/'))
519
def test_filter_invocation(self):
522
filter_log.append(path)
524
transport = self.make_pf_transport(filter)
526
self.assertEqual(['abc'], filter_log)
528
transport.clone('abc').has('xyz')
529
self.assertEqual(['abc/xyz'], filter_log)
531
transport.has('/abc')
532
self.assertEqual(['abc'], filter_log)
534
def test_clone(self):
535
transport = self.make_pf_transport()
536
# relpath from root and root path are the same
537
relpath_cloned = transport.clone('foo')
538
abspath_cloned = transport.clone('/foo')
539
self.assertEqual(transport.server, relpath_cloned.server)
540
self.assertEqual(transport.server, abspath_cloned.server)
542
def test_url_preserves_pathfiltering(self):
543
"""Calling get_transport on a pathfiltered transport's base should
544
produce a transport with exactly the same behaviour as the original
545
pathfiltered transport.
547
This is so that it is not possible to escape (accidentally or
548
otherwise) the filtering by doing::
549
url = filtered_transport.base
550
parent_url = urlutils.join(url, '..')
551
new_transport = get_transport(parent_url)
553
transport = self.make_pf_transport()
554
new_transport = get_transport(transport.base)
555
self.assertEqual(transport.server, new_transport.server)
556
self.assertEqual(transport.base, new_transport.base)
559
class ReadonlyDecoratorTransportTest(TestCase):
560
"""Readonly decoration specific tests."""
562
def test_local_parameters(self):
563
# connect to . in readonly mode
564
transport = readonly.ReadonlyTransportDecorator('readonly+.')
565
self.assertEqual(True, transport.listable())
566
self.assertEqual(True, transport.is_readonly())
568
def test_http_parameters(self):
569
from bzrlib.tests.http_server import HttpServer
570
# connect to '.' via http which is not listable
571
server = HttpServer()
572
self.start_server(server)
573
transport = get_transport('readonly+' + server.get_url())
574
self.failUnless(isinstance(transport,
575
readonly.ReadonlyTransportDecorator))
576
self.assertEqual(False, transport.listable())
577
self.assertEqual(True, transport.is_readonly())
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
581
"""NFS decorator specific tests."""
583
def get_nfs_transport(self, url):
584
# connect to url with nfs decoration
585
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
587
def test_local_parameters(self):
588
# the listable and is_readonly parameters
589
# are not changed by the fakenfs decorator
590
transport = self.get_nfs_transport('.')
591
self.assertEqual(True, transport.listable())
592
self.assertEqual(False, transport.is_readonly())
594
def test_http_parameters(self):
595
# the listable and is_readonly parameters
596
# are not changed by the fakenfs decorator
597
from bzrlib.tests.http_server import HttpServer
598
# connect to '.' via http which is not listable
599
server = HttpServer()
600
self.start_server(server)
601
transport = self.get_nfs_transport(server.get_url())
602
self.assertIsInstance(
603
transport, fakenfs.FakeNFSTransportDecorator)
604
self.assertEqual(False, transport.listable())
605
self.assertEqual(True, transport.is_readonly())
607
def test_fakenfs_server_default(self):
608
# a FakeNFSServer() should bring up a local relpath server for itself
609
server = fakenfs.FakeNFSServer()
610
self.start_server(server)
611
# the url should be decorated appropriately
612
self.assertStartsWith(server.get_url(), 'fakenfs+')
613
# and we should be able to get a transport for it
614
transport = get_transport(server.get_url())
615
# which must be a FakeNFSTransportDecorator instance.
616
self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
618
def test_fakenfs_rename_semantics(self):
619
# a FakeNFS transport must mangle the way rename errors occur to
620
# look like NFS problems.
621
transport = self.get_nfs_transport('.')
622
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
624
self.assertRaises(errors.ResourceBusy,
625
transport.rename, 'from', 'to')
628
class FakeVFATDecoratorTests(TestCaseInTempDir):
629
"""Tests for simulation of VFAT restrictions"""
631
def get_vfat_transport(self, url):
632
"""Return vfat-backed transport for test directory"""
633
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
634
return FakeVFATTransportDecorator('vfat+' + url)
636
def test_transport_creation(self):
637
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
638
transport = self.get_vfat_transport('.')
639
self.assertIsInstance(transport, FakeVFATTransportDecorator)
641
def test_transport_mkdir(self):
642
transport = self.get_vfat_transport('.')
643
transport.mkdir('HELLO')
644
self.assertTrue(transport.has('hello'))
645
self.assertTrue(transport.has('Hello'))
647
def test_forbidden_chars(self):
648
transport = self.get_vfat_transport('.')
649
self.assertRaises(ValueError, transport.has, "<NU>")
652
class BadTransportHandler(Transport):
653
def __init__(self, base_url):
654
raise DependencyNotPresent('some_lib', 'testing missing dependency')
657
class BackupTransportHandler(Transport):
658
"""Test transport that works as a backup for the BadTransportHandler"""
662
class TestTransportImplementation(TestCaseInTempDir):
663
"""Implementation verification for transports.
665
To verify a transport we need a server factory, which is a callable
666
that accepts no parameters and returns an implementation of
667
bzrlib.transport.Server.
669
That Server is then used to construct transport instances and test
670
the transport via loopback activity.
672
Currently this assumes that the Transport object is connected to the
673
current working directory. So that whatever is done
674
through the transport, should show up in the working
675
directory, and vice-versa. This is a bug, because its possible to have
676
URL schemes which provide access to something that may not be
677
result in storage on the local disk, i.e. due to file system limits, or
678
due to it being a database or some other non-filesystem tool.
680
This also tests to make sure that the functions work with both
681
generators and lists (assuming iter(list) is effectively a generator)
685
super(TestTransportImplementation, self).setUp()
686
self._server = self.transport_server()
687
self.start_server(self._server)
689
def get_transport(self, relpath=None):
690
"""Return a connected transport to the local directory.
692
:param relpath: a path relative to the base url.
694
base_url = self._server.get_url()
695
url = self._adjust_url(base_url, relpath)
696
# try getting the transport via the regular interface:
697
t = get_transport(url)
698
# vila--20070607 if the following are commented out the test suite
699
# still pass. Is this really still needed or was it a forgotten
701
if not isinstance(t, self.transport_class):
702
# we did not get the correct transport class type. Override the
703
# regular connection behaviour by direct construction.
704
t = self.transport_class(url)
708
class TestLocalTransports(TestCase):
710
def test_get_transport_from_abspath(self):
711
here = osutils.abspath('.')
712
t = get_transport(here)
713
self.assertIsInstance(t, LocalTransport)
714
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
716
def test_get_transport_from_relpath(self):
717
here = osutils.abspath('.')
718
t = get_transport('.')
719
self.assertIsInstance(t, LocalTransport)
720
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
722
def test_get_transport_from_local_url(self):
723
here = osutils.abspath('.')
724
here_url = urlutils.local_path_to_url(here) + '/'
725
t = get_transport(here_url)
726
self.assertIsInstance(t, LocalTransport)
727
self.assertEquals(t.base, here_url)
729
def test_local_abspath(self):
730
here = osutils.abspath('.')
731
t = get_transport(here)
732
self.assertEquals(t.local_abspath(''), here)
735
class TestWin32LocalTransport(TestCase):
737
def test_unc_clone_to_root(self):
738
# Win32 UNC path like \\HOST\path
739
# clone to root should stop at least at \\HOST part
741
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
744
self.assertEquals(t.base, 'file://HOST/')
745
# make sure we reach the root
747
self.assertEquals(t.base, 'file://HOST/')
750
class TestConnectedTransport(TestCase):
751
"""Tests for connected to remote server transports"""
753
def test_parse_url(self):
754
t = ConnectedTransport('http://simple.example.com/home/source')
755
self.assertEquals(t._host, 'simple.example.com')
756
self.assertEquals(t._port, None)
757
self.assertEquals(t._path, '/home/source/')
758
self.failUnless(t._user is None)
759
self.failUnless(t._password is None)
761
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
763
def test_parse_url_with_at_in_user(self):
765
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
766
self.assertEquals(t._user, 'user@host.com')
768
def test_parse_quoted_url(self):
769
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
770
self.assertEquals(t._host, 'exAmple.com')
771
self.assertEquals(t._port, 2222)
772
self.assertEquals(t._user, 'robey')
773
self.assertEquals(t._password, 'h@t')
774
self.assertEquals(t._path, '/path/')
776
# Base should not keep track of the password
777
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
779
def test_parse_invalid_url(self):
780
self.assertRaises(errors.InvalidURL,
782
'sftp://lily.org:~janneke/public/bzr/gub')
784
def test_relpath(self):
785
t = ConnectedTransport('sftp://user@host.com/abs/path')
787
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
788
self.assertRaises(errors.PathNotChild, t.relpath,
789
'http://user@host.com/abs/path/sub')
790
self.assertRaises(errors.PathNotChild, t.relpath,
791
'sftp://user2@host.com/abs/path/sub')
792
self.assertRaises(errors.PathNotChild, t.relpath,
793
'sftp://user@otherhost.com/abs/path/sub')
794
self.assertRaises(errors.PathNotChild, t.relpath,
795
'sftp://user@host.com:33/abs/path/sub')
796
# Make sure it works when we don't supply a username
797
t = ConnectedTransport('sftp://host.com/abs/path')
798
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
800
# Make sure it works when parts of the path will be url encoded
801
t = ConnectedTransport('sftp://host.com/dev/%path')
802
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
804
def test_connection_sharing_propagate_credentials(self):
805
t = ConnectedTransport('ftp://user@host.com/abs/path')
806
self.assertEquals('user', t._user)
807
self.assertEquals('host.com', t._host)
808
self.assertIs(None, t._get_connection())
809
self.assertIs(None, t._password)
810
c = t.clone('subdir')
811
self.assertIs(None, c._get_connection())
812
self.assertIs(None, t._password)
814
# Simulate the user entering a password
816
connection = object()
817
t._set_connection(connection, password)
818
self.assertIs(connection, t._get_connection())
819
self.assertIs(password, t._get_credentials())
820
self.assertIs(connection, c._get_connection())
821
self.assertIs(password, c._get_credentials())
823
# credentials can be updated
824
new_password = 'even more secret'
825
c._update_credentials(new_password)
826
self.assertIs(connection, t._get_connection())
827
self.assertIs(new_password, t._get_credentials())
828
self.assertIs(connection, c._get_connection())
829
self.assertIs(new_password, c._get_credentials())
832
class TestReusedTransports(TestCase):
833
"""Tests for transport reuse"""
835
def test_reuse_same_transport(self):
836
possible_transports = []
837
t1 = get_transport('http://foo/',
838
possible_transports=possible_transports)
839
self.assertEqual([t1], possible_transports)
840
t2 = get_transport('http://foo/', possible_transports=[t1])
841
self.assertIs(t1, t2)
843
# Also check that final '/' are handled correctly
844
t3 = get_transport('http://foo/path/')
845
t4 = get_transport('http://foo/path', possible_transports=[t3])
846
self.assertIs(t3, t4)
848
t5 = get_transport('http://foo/path')
849
t6 = get_transport('http://foo/path/', possible_transports=[t5])
850
self.assertIs(t5, t6)
852
def test_don_t_reuse_different_transport(self):
853
t1 = get_transport('http://foo/path')
854
t2 = get_transport('http://bar/path', possible_transports=[t1])
855
self.assertIsNot(t1, t2)
858
class TestTransportTrace(TestCase):
861
transport = get_transport('trace+memory://')
862
self.assertIsInstance(
863
transport, bzrlib.transport.trace.TransportTraceDecorator)
865
def test_clone_preserves_activity(self):
866
transport = get_transport('trace+memory://')
867
transport2 = transport.clone('.')
868
self.assertTrue(transport is not transport2)
869
self.assertTrue(transport._activity is transport2._activity)
871
# the following specific tests are for the operations that have made use of
872
# logging in tests; we could test every single operation but doing that
873
# still won't cause a test failure when the top level Transport API
874
# changes; so there is little return doing that.
876
transport = get_transport('trace+memory:///')
877
transport.put_bytes('foo', 'barish')
880
# put_bytes records the bytes, not the content to avoid memory
882
expected_result.append(('put_bytes', 'foo', 6, None))
883
# get records the file name only.
884
expected_result.append(('get', 'foo'))
885
self.assertEqual(expected_result, transport._activity)
887
def test_readv(self):
888
transport = get_transport('trace+memory:///')
889
transport.put_bytes('foo', 'barish')
890
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
893
# put_bytes records the bytes, not the content to avoid memory
895
expected_result.append(('put_bytes', 'foo', 6, None))
896
# readv records the supplied offset request
897
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
898
self.assertEqual(expected_result, transport._activity)
901
class TestSSHConnections(tests.TestCaseWithTransport):
903
def test_bzr_connect_to_bzr_ssh(self):
904
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
906
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
908
# This test actually causes a bzr instance to be invoked, which is very
909
# expensive: it should be the only such test in the test suite.
910
# A reasonable evolution for this would be to simply check inside
911
# check_channel_exec_request that the command is appropriate, and then
912
# satisfy requests in-process.
913
self.requireFeature(features.paramiko)
914
# SFTPFullAbsoluteServer has a get_url method, and doesn't
915
# override the interface (doesn't change self._vendor).
916
# Note that this does encryption, so can be slow.
917
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
918
from bzrlib.tests.stub_sftp import StubServer
920
# Start an SSH server
921
self.command_executed = []
922
# XXX: This is horrible -- we define a really dumb SSH server that
923
# executes commands, and manage the hooking up of stdin/out/err to the
924
# SSH channel ourselves. Surely this has already been implemented
927
class StubSSHServer(StubServer):
931
def check_channel_exec_request(self, channel, command):
932
self.test.command_executed.append(command)
933
proc = subprocess.Popen(
934
command, shell=True, stdin=subprocess.PIPE,
935
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
937
# XXX: horribly inefficient, not to mention ugly.
938
# Start a thread for each of stdin/out/err, and relay bytes from
939
# the subprocess to channel and vice versa.
940
def ferry_bytes(read, write, close):
949
(channel.recv, proc.stdin.write, proc.stdin.close),
950
(proc.stdout.read, channel.sendall, channel.close),
951
(proc.stderr.read, channel.sendall_stderr, channel.close)]
953
for read, write, close in file_functions:
954
t = threading.Thread(
955
target=ferry_bytes, args=(read, write, close))
961
ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
962
# We *don't* want to override the default SSH vendor: the detected one
964
self.start_server(ssh_server)
965
port = ssh_server._listener.port
967
if sys.platform == 'win32':
968
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
970
bzr_remote_path = self.get_bzr_path()
971
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
973
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
974
# variable is used to tell bzr what command to run on the remote end.
975
path_to_branch = osutils.abspath('.')
976
if sys.platform == 'win32':
977
# On Windows, we export all drives as '/C:/, etc. So we need to
978
# prefix a '/' to get the right path.
979
path_to_branch = '/' + path_to_branch
980
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
981
t = get_transport(url)
982
self.permit_url(t.base)
986
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
987
self.command_executed)
988
# Make sure to disconnect, so that the remote process can stop, and we
989
# can cleanup. Then pause the test until everything is shutdown
990
t._client._medium.disconnect()
993
# First wait for the subprocess
995
# And the rest are threads
996
for t in started[1:]: