1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
31
from bzrlib.errors import (DependencyNotPresent,
39
from bzrlib.tests import features, TestCase, TestCaseInTempDir
40
from bzrlib.transport import (_clear_protocol_handlers,
43
_get_protocol_handlers,
44
_set_protocol_handlers,
45
_get_transport_modules,
48
register_lazy_transport,
49
register_transport_proto,
52
from bzrlib.transport.chroot import ChrootServer
53
from bzrlib.transport.memory import MemoryTransport
54
from bzrlib.transport.local import (LocalTransport,
55
EmulatedWin32LocalTransport)
56
from bzrlib.transport.pathfilter import PathFilteringServer
59
# TODO: Should possibly split transport-specific tests into their own files.
62
class TestTransport(TestCase):
63
"""Test the non transport-concrete class functionality."""
65
def test__get_set_protocol_handlers(self):
66
handlers = _get_protocol_handlers()
67
self.assertNotEqual([], handlers.keys( ))
69
_clear_protocol_handlers()
70
self.assertEqual([], _get_protocol_handlers().keys())
72
_set_protocol_handlers(handlers)
74
def test_get_transport_modules(self):
75
handlers = _get_protocol_handlers()
76
# don't pollute the current handlers
77
_clear_protocol_handlers()
78
class SampleHandler(object):
79
"""I exist, isnt that enough?"""
81
_clear_protocol_handlers()
82
register_transport_proto('foo')
83
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
84
'TestTransport.SampleHandler')
85
register_transport_proto('bar')
86
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
87
'TestTransport.SampleHandler')
88
self.assertEqual([SampleHandler.__module__,
89
'bzrlib.transport.chroot',
90
'bzrlib.transport.pathfilter'],
91
_get_transport_modules())
93
_set_protocol_handlers(handlers)
95
def test_transport_dependency(self):
96
"""Transport with missing dependency causes no error"""
97
saved_handlers = _get_protocol_handlers()
98
# don't pollute the current handlers
99
_clear_protocol_handlers()
101
register_transport_proto('foo')
102
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
103
'BadTransportHandler')
105
get_transport('foo://fooserver/foo')
106
except UnsupportedProtocol, e:
108
self.assertEquals('Unsupported protocol'
109
' for url "foo://fooserver/foo":'
110
' Unable to import library "some_lib":'
111
' testing missing dependency', str(e))
113
self.fail('Did not raise UnsupportedProtocol')
115
# restore original values
116
_set_protocol_handlers(saved_handlers)
118
def test_transport_fallback(self):
119
"""Transport with missing dependency causes no error"""
120
saved_handlers = _get_protocol_handlers()
122
_clear_protocol_handlers()
123
register_transport_proto('foo')
124
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
125
'BackupTransportHandler')
126
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
127
'BadTransportHandler')
128
t = get_transport('foo://fooserver/foo')
129
self.assertTrue(isinstance(t, BackupTransportHandler))
131
_set_protocol_handlers(saved_handlers)
133
def test_ssh_hints(self):
134
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
136
get_transport('ssh://fooserver/foo')
137
except UnsupportedProtocol, e:
139
self.assertEquals('Unsupported protocol'
140
' for url "ssh://fooserver/foo":'
141
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
144
self.fail('Did not raise UnsupportedProtocol')
146
def test_LateReadError(self):
147
"""The LateReadError helper should raise on read()."""
148
a_file = LateReadError('a path')
151
except ReadError, error:
152
self.assertEqual('a path', error.path)
153
self.assertRaises(ReadError, a_file.read, 40)
156
def test__combine_paths(self):
158
self.assertEqual('/home/sarah/project/foo',
159
t._combine_paths('/home/sarah', 'project/foo'))
160
self.assertEqual('/etc',
161
t._combine_paths('/home/sarah', '../../etc'))
162
self.assertEqual('/etc',
163
t._combine_paths('/home/sarah', '../../../etc'))
164
self.assertEqual('/etc',
165
t._combine_paths('/home/sarah', '/etc'))
167
def test_local_abspath_non_local_transport(self):
168
# the base implementation should throw
169
t = MemoryTransport()
170
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
171
self.assertEqual('memory:///t is not a local path.', str(e))
174
class TestCoalesceOffsets(TestCase):
176
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
177
coalesce = Transport._coalesce_offsets
178
exp = [_CoalescedOffset(*x) for x in expected]
179
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
181
self.assertEqual(exp, out)
183
def test_coalesce_empty(self):
186
def test_coalesce_simple(self):
187
self.check([(0, 10, [(0, 10)])], [(0, 10)])
189
def test_coalesce_unrelated(self):
190
self.check([(0, 10, [(0, 10)]),
192
], [(0, 10), (20, 10)])
194
def test_coalesce_unsorted(self):
195
self.check([(20, 10, [(0, 10)]),
197
], [(20, 10), (0, 10)])
199
def test_coalesce_nearby(self):
200
self.check([(0, 20, [(0, 10), (10, 10)])],
203
def test_coalesce_overlapped(self):
204
self.assertRaises(ValueError,
205
self.check, [(0, 15, [(0, 10), (5, 10)])],
208
def test_coalesce_limit(self):
209
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
210
(30, 10), (40, 10)]),
211
(60, 50, [(0, 10), (10, 10), (20, 10),
212
(30, 10), (40, 10)]),
213
], [(10, 10), (20, 10), (30, 10), (40, 10),
214
(50, 10), (60, 10), (70, 10), (80, 10),
215
(90, 10), (100, 10)],
218
def test_coalesce_no_limit(self):
219
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
220
(30, 10), (40, 10), (50, 10),
221
(60, 10), (70, 10), (80, 10),
223
], [(10, 10), (20, 10), (30, 10), (40, 10),
224
(50, 10), (60, 10), (70, 10), (80, 10),
225
(90, 10), (100, 10)])
227
def test_coalesce_fudge(self):
228
self.check([(10, 30, [(0, 10), (20, 10)]),
229
(100, 10, [(0, 10),]),
230
], [(10, 10), (30, 10), (100, 10)],
233
def test_coalesce_max_size(self):
234
self.check([(10, 20, [(0, 10), (10, 10)]),
236
# If one range is above max_size, it gets its own coalesced
238
(100, 80, [(0, 80),]),],
239
[(10, 10), (20, 10), (30, 50), (100, 80)],
243
def test_coalesce_no_max_size(self):
244
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
245
[(10, 10), (20, 10), (30, 50), (80, 100)],
248
def test_coalesce_default_limit(self):
249
# By default we use a 100MB max size.
250
ten_mb = 10*1024*1024
251
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
252
(10*ten_mb, ten_mb, [(0, ten_mb)])],
253
[(i*ten_mb, ten_mb) for i in range(11)])
254
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
255
[(i*ten_mb, ten_mb) for i in range(11)],
256
max_size=1*1024*1024*1024)
259
class TestMemoryTransport(TestCase):
261
def test_get_transport(self):
264
def test_clone(self):
265
transport = MemoryTransport()
266
self.assertTrue(isinstance(transport, MemoryTransport))
267
self.assertEqual("memory:///", transport.clone("/").base)
269
def test_abspath(self):
270
transport = MemoryTransport()
271
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
273
def test_abspath_of_root(self):
274
transport = MemoryTransport()
275
self.assertEqual("memory:///", transport.base)
276
self.assertEqual("memory:///", transport.abspath('/'))
278
def test_abspath_of_relpath_starting_at_root(self):
279
transport = MemoryTransport()
280
self.assertEqual("memory:///foo", transport.abspath('/foo'))
282
def test_append_and_get(self):
283
transport = MemoryTransport()
284
transport.append_bytes('path', 'content')
285
self.assertEqual(transport.get('path').read(), 'content')
286
transport.append_file('path', StringIO('content'))
287
self.assertEqual(transport.get('path').read(), 'contentcontent')
289
def test_put_and_get(self):
290
transport = MemoryTransport()
291
transport.put_file('path', StringIO('content'))
292
self.assertEqual(transport.get('path').read(), 'content')
293
transport.put_bytes('path', 'content')
294
self.assertEqual(transport.get('path').read(), 'content')
296
def test_append_without_dir_fails(self):
297
transport = MemoryTransport()
298
self.assertRaises(NoSuchFile,
299
transport.append_bytes, 'dir/path', 'content')
301
def test_put_without_dir_fails(self):
302
transport = MemoryTransport()
303
self.assertRaises(NoSuchFile,
304
transport.put_file, 'dir/path', StringIO('content'))
306
def test_get_missing(self):
307
transport = MemoryTransport()
308
self.assertRaises(NoSuchFile, transport.get, 'foo')
310
def test_has_missing(self):
311
transport = MemoryTransport()
312
self.assertEquals(False, transport.has('foo'))
314
def test_has_present(self):
315
transport = MemoryTransport()
316
transport.append_bytes('foo', 'content')
317
self.assertEquals(True, transport.has('foo'))
319
def test_list_dir(self):
320
transport = MemoryTransport()
321
transport.put_bytes('foo', 'content')
322
transport.mkdir('dir')
323
transport.put_bytes('dir/subfoo', 'content')
324
transport.put_bytes('dirlike', 'content')
326
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
327
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
329
def test_mkdir(self):
330
transport = MemoryTransport()
331
transport.mkdir('dir')
332
transport.append_bytes('dir/path', 'content')
333
self.assertEqual(transport.get('dir/path').read(), 'content')
335
def test_mkdir_missing_parent(self):
336
transport = MemoryTransport()
337
self.assertRaises(NoSuchFile,
338
transport.mkdir, 'dir/dir')
340
def test_mkdir_twice(self):
341
transport = MemoryTransport()
342
transport.mkdir('dir')
343
self.assertRaises(FileExists, transport.mkdir, 'dir')
345
def test_parameters(self):
346
transport = MemoryTransport()
347
self.assertEqual(True, transport.listable())
348
self.assertEqual(False, transport.is_readonly())
350
def test_iter_files_recursive(self):
351
transport = MemoryTransport()
352
transport.mkdir('dir')
353
transport.put_bytes('dir/foo', 'content')
354
transport.put_bytes('dir/bar', 'content')
355
transport.put_bytes('bar', 'content')
356
paths = set(transport.iter_files_recursive())
357
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
360
transport = MemoryTransport()
361
transport.put_bytes('foo', 'content')
362
transport.put_bytes('bar', 'phowar')
363
self.assertEqual(7, transport.stat('foo').st_size)
364
self.assertEqual(6, transport.stat('bar').st_size)
367
class ChrootDecoratorTransportTest(TestCase):
368
"""Chroot decoration specific tests."""
370
def test_abspath(self):
371
# The abspath is always relative to the chroot_url.
372
server = ChrootServer(get_transport('memory:///foo/bar/'))
373
self.start_server(server)
374
transport = get_transport(server.get_url())
375
self.assertEqual(server.get_url(), transport.abspath('/'))
377
subdir_transport = transport.clone('subdir')
378
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
380
def test_clone(self):
381
server = ChrootServer(get_transport('memory:///foo/bar/'))
382
self.start_server(server)
383
transport = get_transport(server.get_url())
384
# relpath from root and root path are the same
385
relpath_cloned = transport.clone('foo')
386
abspath_cloned = transport.clone('/foo')
387
self.assertEqual(server, relpath_cloned.server)
388
self.assertEqual(server, abspath_cloned.server)
390
def test_chroot_url_preserves_chroot(self):
391
"""Calling get_transport on a chroot transport's base should produce a
392
transport with exactly the same behaviour as the original chroot
395
This is so that it is not possible to escape a chroot by doing::
396
url = chroot_transport.base
397
parent_url = urlutils.join(url, '..')
398
new_transport = get_transport(parent_url)
400
server = ChrootServer(get_transport('memory:///path/subpath'))
401
self.start_server(server)
402
transport = get_transport(server.get_url())
403
new_transport = get_transport(transport.base)
404
self.assertEqual(transport.server, new_transport.server)
405
self.assertEqual(transport.base, new_transport.base)
407
def test_urljoin_preserves_chroot(self):
408
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
409
URL that escapes the intended chroot.
411
This is so that it is not possible to escape a chroot by doing::
412
url = chroot_transport.base
413
parent_url = urlutils.join(url, '..')
414
new_transport = get_transport(parent_url)
416
server = ChrootServer(get_transport('memory:///path/'))
417
self.start_server(server)
418
transport = get_transport(server.get_url())
420
InvalidURLJoin, urlutils.join, transport.base, '..')
423
class ChrootServerTest(TestCase):
425
def test_construct(self):
426
backing_transport = MemoryTransport()
427
server = ChrootServer(backing_transport)
428
self.assertEqual(backing_transport, server.backing_transport)
430
def test_setUp(self):
431
backing_transport = MemoryTransport()
432
server = ChrootServer(backing_transport)
435
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
439
def test_tearDown(self):
440
backing_transport = MemoryTransport()
441
server = ChrootServer(backing_transport)
444
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
446
def test_get_url(self):
447
backing_transport = MemoryTransport()
448
server = ChrootServer(backing_transport)
451
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
456
class PathFilteringDecoratorTransportTest(TestCase):
457
"""Pathfilter decoration specific tests."""
459
def test_abspath(self):
460
# The abspath is always relative to the base of the backing transport.
461
server = PathFilteringServer(get_transport('memory:///foo/bar/'),
464
transport = get_transport(server.get_url())
465
self.assertEqual(server.get_url(), transport.abspath('/'))
467
subdir_transport = transport.clone('subdir')
468
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
471
def make_pf_transport(self, filter_func=None):
472
"""Make a PathFilteringTransport backed by a MemoryTransport.
474
:param filter_func: by default this will be a no-op function. Use this
475
parameter to override it."""
476
if filter_func is None:
477
filter_func = lambda x: x
478
server = PathFilteringServer(
479
get_transport('memory:///foo/bar/'), filter_func)
481
self.addCleanup(server.tearDown)
482
return get_transport(server.get_url())
484
def test__filter(self):
485
# _filter (with an identity func as filter_func) always returns
486
# paths relative to the base of the backing transport.
487
transport = self.make_pf_transport()
488
self.assertEqual('foo', transport._filter('foo'))
489
self.assertEqual('foo/bar', transport._filter('foo/bar'))
490
self.assertEqual('', transport._filter('..'))
491
self.assertEqual('', transport._filter('/'))
492
# The base of the pathfiltering transport is taken into account too.
493
transport = transport.clone('subdir1/subdir2')
494
self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
496
'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
497
self.assertEqual('subdir1', transport._filter('..'))
498
self.assertEqual('', transport._filter('/'))
500
def test_filter_invocation(self):
503
filter_log.append(path)
505
transport = self.make_pf_transport(filter)
507
self.assertEqual(['abc'], filter_log)
509
transport.clone('abc').has('xyz')
510
self.assertEqual(['abc/xyz'], filter_log)
512
transport.has('/abc')
513
self.assertEqual(['abc'], filter_log)
515
def test_clone(self):
516
transport = self.make_pf_transport()
517
# relpath from root and root path are the same
518
relpath_cloned = transport.clone('foo')
519
abspath_cloned = transport.clone('/foo')
520
self.assertEqual(transport.server, relpath_cloned.server)
521
self.assertEqual(transport.server, abspath_cloned.server)
523
def test_url_preserves_pathfiltering(self):
524
"""Calling get_transport on a pathfiltered transport's base should
525
produce a transport with exactly the same behaviour as the original
526
pathfiltered transport.
528
This is so that it is not possible to escape (accidentally or
529
otherwise) the filtering by doing::
530
url = filtered_transport.base
531
parent_url = urlutils.join(url, '..')
532
new_transport = get_transport(parent_url)
534
transport = self.make_pf_transport()
535
new_transport = get_transport(transport.base)
536
self.assertEqual(transport.server, new_transport.server)
537
self.assertEqual(transport.base, new_transport.base)
540
class ReadonlyDecoratorTransportTest(TestCase):
541
"""Readonly decoration specific tests."""
543
def test_local_parameters(self):
544
import bzrlib.transport.readonly as readonly
545
# connect to . in readonly mode
546
transport = readonly.ReadonlyTransportDecorator('readonly+.')
547
self.assertEqual(True, transport.listable())
548
self.assertEqual(True, transport.is_readonly())
550
def test_http_parameters(self):
551
from bzrlib.tests.http_server import HttpServer
552
import bzrlib.transport.readonly as readonly
553
# connect to '.' via http which is not listable
554
server = HttpServer()
555
self.start_server(server)
556
transport = get_transport('readonly+' + server.get_url())
557
self.failUnless(isinstance(transport,
558
readonly.ReadonlyTransportDecorator))
559
self.assertEqual(False, transport.listable())
560
self.assertEqual(True, transport.is_readonly())
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
564
"""NFS decorator specific tests."""
566
def get_nfs_transport(self, url):
567
import bzrlib.transport.fakenfs as fakenfs
568
# connect to url with nfs decoration
569
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
571
def test_local_parameters(self):
572
# the listable and is_readonly parameters
573
# are not changed by the fakenfs decorator
574
transport = self.get_nfs_transport('.')
575
self.assertEqual(True, transport.listable())
576
self.assertEqual(False, transport.is_readonly())
578
def test_http_parameters(self):
579
# the listable and is_readonly parameters
580
# are not changed by the fakenfs decorator
581
from bzrlib.tests.http_server import HttpServer
582
# connect to '.' via http which is not listable
583
server = HttpServer()
584
self.start_server(server)
585
transport = self.get_nfs_transport(server.get_url())
586
self.assertIsInstance(
587
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
588
self.assertEqual(False, transport.listable())
589
self.assertEqual(True, transport.is_readonly())
591
def test_fakenfs_server_default(self):
592
# a FakeNFSServer() should bring up a local relpath server for itself
593
import bzrlib.transport.fakenfs as fakenfs
594
server = fakenfs.FakeNFSServer()
595
self.start_server(server)
596
# the url should be decorated appropriately
597
self.assertStartsWith(server.get_url(), 'fakenfs+')
598
# and we should be able to get a transport for it
599
transport = get_transport(server.get_url())
600
# which must be a FakeNFSTransportDecorator instance.
601
self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
603
def test_fakenfs_rename_semantics(self):
604
# a FakeNFS transport must mangle the way rename errors occur to
605
# look like NFS problems.
606
transport = self.get_nfs_transport('.')
607
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
609
self.assertRaises(errors.ResourceBusy,
610
transport.rename, 'from', 'to')
613
class FakeVFATDecoratorTests(TestCaseInTempDir):
614
"""Tests for simulation of VFAT restrictions"""
616
def get_vfat_transport(self, url):
617
"""Return vfat-backed transport for test directory"""
618
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
619
return FakeVFATTransportDecorator('vfat+' + url)
621
def test_transport_creation(self):
622
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
623
transport = self.get_vfat_transport('.')
624
self.assertIsInstance(transport, FakeVFATTransportDecorator)
626
def test_transport_mkdir(self):
627
transport = self.get_vfat_transport('.')
628
transport.mkdir('HELLO')
629
self.assertTrue(transport.has('hello'))
630
self.assertTrue(transport.has('Hello'))
632
def test_forbidden_chars(self):
633
transport = self.get_vfat_transport('.')
634
self.assertRaises(ValueError, transport.has, "<NU>")
637
class BadTransportHandler(Transport):
638
def __init__(self, base_url):
639
raise DependencyNotPresent('some_lib', 'testing missing dependency')
642
class BackupTransportHandler(Transport):
643
"""Test transport that works as a backup for the BadTransportHandler"""
647
class TestTransportImplementation(TestCaseInTempDir):
648
"""Implementation verification for transports.
650
To verify a transport we need a server factory, which is a callable
651
that accepts no parameters and returns an implementation of
652
bzrlib.transport.Server.
654
That Server is then used to construct transport instances and test
655
the transport via loopback activity.
657
Currently this assumes that the Transport object is connected to the
658
current working directory. So that whatever is done
659
through the transport, should show up in the working
660
directory, and vice-versa. This is a bug, because its possible to have
661
URL schemes which provide access to something that may not be
662
result in storage on the local disk, i.e. due to file system limits, or
663
due to it being a database or some other non-filesystem tool.
665
This also tests to make sure that the functions work with both
666
generators and lists (assuming iter(list) is effectively a generator)
670
super(TestTransportImplementation, self).setUp()
671
self._server = self.transport_server()
672
self.start_server(self._server)
674
def get_transport(self, relpath=None):
675
"""Return a connected transport to the local directory.
677
:param relpath: a path relative to the base url.
679
base_url = self._server.get_url()
680
url = self._adjust_url(base_url, relpath)
681
# try getting the transport via the regular interface:
682
t = get_transport(url)
683
# vila--20070607 if the following are commented out the test suite
684
# still pass. Is this really still needed or was it a forgotten
686
if not isinstance(t, self.transport_class):
687
# we did not get the correct transport class type. Override the
688
# regular connection behaviour by direct construction.
689
t = self.transport_class(url)
693
class TestLocalTransports(TestCase):
695
def test_get_transport_from_abspath(self):
696
here = osutils.abspath('.')
697
t = get_transport(here)
698
self.assertIsInstance(t, LocalTransport)
699
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
701
def test_get_transport_from_relpath(self):
702
here = osutils.abspath('.')
703
t = get_transport('.')
704
self.assertIsInstance(t, LocalTransport)
705
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
707
def test_get_transport_from_local_url(self):
708
here = osutils.abspath('.')
709
here_url = urlutils.local_path_to_url(here) + '/'
710
t = get_transport(here_url)
711
self.assertIsInstance(t, LocalTransport)
712
self.assertEquals(t.base, here_url)
714
def test_local_abspath(self):
715
here = osutils.abspath('.')
716
t = get_transport(here)
717
self.assertEquals(t.local_abspath(''), here)
720
class TestWin32LocalTransport(TestCase):
722
def test_unc_clone_to_root(self):
723
# Win32 UNC path like \\HOST\path
724
# clone to root should stop at least at \\HOST part
726
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
729
self.assertEquals(t.base, 'file://HOST/')
730
# make sure we reach the root
732
self.assertEquals(t.base, 'file://HOST/')
735
class TestConnectedTransport(TestCase):
736
"""Tests for connected to remote server transports"""
738
def test_parse_url(self):
739
t = ConnectedTransport('http://simple.example.com/home/source')
740
self.assertEquals(t._host, 'simple.example.com')
741
self.assertEquals(t._port, None)
742
self.assertEquals(t._path, '/home/source/')
743
self.failUnless(t._user is None)
744
self.failUnless(t._password is None)
746
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
748
def test_parse_url_with_at_in_user(self):
750
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
751
self.assertEquals(t._user, 'user@host.com')
753
def test_parse_quoted_url(self):
754
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
755
self.assertEquals(t._host, 'exAmple.com')
756
self.assertEquals(t._port, 2222)
757
self.assertEquals(t._user, 'robey')
758
self.assertEquals(t._password, 'h@t')
759
self.assertEquals(t._path, '/path/')
761
# Base should not keep track of the password
762
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
764
def test_parse_invalid_url(self):
765
self.assertRaises(errors.InvalidURL,
767
'sftp://lily.org:~janneke/public/bzr/gub')
769
def test_relpath(self):
770
t = ConnectedTransport('sftp://user@host.com/abs/path')
772
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
773
self.assertRaises(errors.PathNotChild, t.relpath,
774
'http://user@host.com/abs/path/sub')
775
self.assertRaises(errors.PathNotChild, t.relpath,
776
'sftp://user2@host.com/abs/path/sub')
777
self.assertRaises(errors.PathNotChild, t.relpath,
778
'sftp://user@otherhost.com/abs/path/sub')
779
self.assertRaises(errors.PathNotChild, t.relpath,
780
'sftp://user@host.com:33/abs/path/sub')
781
# Make sure it works when we don't supply a username
782
t = ConnectedTransport('sftp://host.com/abs/path')
783
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
785
# Make sure it works when parts of the path will be url encoded
786
t = ConnectedTransport('sftp://host.com/dev/%path')
787
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
789
def test_connection_sharing_propagate_credentials(self):
790
t = ConnectedTransport('ftp://user@host.com/abs/path')
791
self.assertEquals('user', t._user)
792
self.assertEquals('host.com', t._host)
793
self.assertIs(None, t._get_connection())
794
self.assertIs(None, t._password)
795
c = t.clone('subdir')
796
self.assertIs(None, c._get_connection())
797
self.assertIs(None, t._password)
799
# Simulate the user entering a password
801
connection = object()
802
t._set_connection(connection, password)
803
self.assertIs(connection, t._get_connection())
804
self.assertIs(password, t._get_credentials())
805
self.assertIs(connection, c._get_connection())
806
self.assertIs(password, c._get_credentials())
808
# credentials can be updated
809
new_password = 'even more secret'
810
c._update_credentials(new_password)
811
self.assertIs(connection, t._get_connection())
812
self.assertIs(new_password, t._get_credentials())
813
self.assertIs(connection, c._get_connection())
814
self.assertIs(new_password, c._get_credentials())
817
class TestReusedTransports(TestCase):
818
"""Tests for transport reuse"""
820
def test_reuse_same_transport(self):
821
possible_transports = []
822
t1 = get_transport('http://foo/',
823
possible_transports=possible_transports)
824
self.assertEqual([t1], possible_transports)
825
t2 = get_transport('http://foo/', possible_transports=[t1])
826
self.assertIs(t1, t2)
828
# Also check that final '/' are handled correctly
829
t3 = get_transport('http://foo/path/')
830
t4 = get_transport('http://foo/path', possible_transports=[t3])
831
self.assertIs(t3, t4)
833
t5 = get_transport('http://foo/path')
834
t6 = get_transport('http://foo/path/', possible_transports=[t5])
835
self.assertIs(t5, t6)
837
def test_don_t_reuse_different_transport(self):
838
t1 = get_transport('http://foo/path')
839
t2 = get_transport('http://bar/path', possible_transports=[t1])
840
self.assertIsNot(t1, t2)
843
class TestTransportTrace(TestCase):
846
transport = get_transport('trace+memory://')
847
self.assertIsInstance(
848
transport, bzrlib.transport.trace.TransportTraceDecorator)
850
def test_clone_preserves_activity(self):
851
transport = get_transport('trace+memory://')
852
transport2 = transport.clone('.')
853
self.assertTrue(transport is not transport2)
854
self.assertTrue(transport._activity is transport2._activity)
856
# the following specific tests are for the operations that have made use of
857
# logging in tests; we could test every single operation but doing that
858
# still won't cause a test failure when the top level Transport API
859
# changes; so there is little return doing that.
861
transport = get_transport('trace+memory:///')
862
transport.put_bytes('foo', 'barish')
865
# put_bytes records the bytes, not the content to avoid memory
867
expected_result.append(('put_bytes', 'foo', 6, None))
868
# get records the file name only.
869
expected_result.append(('get', 'foo'))
870
self.assertEqual(expected_result, transport._activity)
872
def test_readv(self):
873
transport = get_transport('trace+memory:///')
874
transport.put_bytes('foo', 'barish')
875
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
878
# put_bytes records the bytes, not the content to avoid memory
880
expected_result.append(('put_bytes', 'foo', 6, None))
881
# readv records the supplied offset request
882
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
883
self.assertEqual(expected_result, transport._activity)
886
class TestSSHConnections(tests.TestCaseWithTransport):
888
def test_bzr_connect_to_bzr_ssh(self):
889
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
891
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
893
# This test actually causes a bzr instance to be invoked, which is very
894
# expensive: it should be the only such test in the test suite.
895
# A reasonable evolution for this would be to simply check inside
896
# check_channel_exec_request that the command is appropriate, and then
897
# satisfy requests in-process.
898
self.requireFeature(features.paramiko)
899
# SFTPFullAbsoluteServer has a get_url method, and doesn't
900
# override the interface (doesn't change self._vendor).
901
# Note that this does encryption, so can be slow.
902
from bzrlib.transport.sftp import SFTPFullAbsoluteServer
903
from bzrlib.tests.stub_sftp import StubServer
905
# Start an SSH server
906
self.command_executed = []
907
# XXX: This is horrible -- we define a really dumb SSH server that
908
# executes commands, and manage the hooking up of stdin/out/err to the
909
# SSH channel ourselves. Surely this has already been implemented
912
class StubSSHServer(StubServer):
916
def check_channel_exec_request(self, channel, command):
917
self.test.command_executed.append(command)
918
proc = subprocess.Popen(
919
command, shell=True, stdin=subprocess.PIPE,
920
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
922
# XXX: horribly inefficient, not to mention ugly.
923
# Start a thread for each of stdin/out/err, and relay bytes from
924
# the subprocess to channel and vice versa.
925
def ferry_bytes(read, write, close):
934
(channel.recv, proc.stdin.write, proc.stdin.close),
935
(proc.stdout.read, channel.sendall, channel.close),
936
(proc.stderr.read, channel.sendall_stderr, channel.close)]
938
for read, write, close in file_functions:
939
t = threading.Thread(
940
target=ferry_bytes, args=(read, write, close))
946
ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
947
# We *don't* want to override the default SSH vendor: the detected one
949
self.start_server(ssh_server)
950
port = ssh_server._listener.port
952
if sys.platform == 'win32':
953
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
955
bzr_remote_path = self.get_bzr_path()
956
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
958
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
959
# variable is used to tell bzr what command to run on the remote end.
960
path_to_branch = osutils.abspath('.')
961
if sys.platform == 'win32':
962
# On Windows, we export all drives as '/C:/, etc. So we need to
963
# prefix a '/' to get the right path.
964
path_to_branch = '/' + path_to_branch
965
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
966
t = get_transport(url)
967
self.permit_url(t.base)
971
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
972
self.command_executed)
973
# Make sure to disconnect, so that the remote process can stop, and we
974
# can cleanup. Then pause the test until everything is shutdown
975
t._client._medium.disconnect()
978
# First wait for the subprocess
980
# And the rest are threads
981
for t in started[1:]: