1
# Copyright (C) 2005-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
31
from bzrlib.transport import (
39
from bzrlib.tests import (
45
# TODO: Should possibly split transport-specific tests into their own files.
48
class TestTransport(tests.TestCase):
49
"""Test the non transport-concrete class functionality."""
51
# FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
# of try/finally -- vila 20100205
54
def test__get_set_protocol_handlers(self):
55
handlers = transport._get_protocol_handlers()
56
self.assertNotEqual([], handlers.keys( ))
58
transport._clear_protocol_handlers()
59
self.assertEqual([], transport._get_protocol_handlers().keys())
61
transport._set_protocol_handlers(handlers)
63
def test_get_transport_modules(self):
64
handlers = transport._get_protocol_handlers()
65
# don't pollute the current handlers
66
transport._clear_protocol_handlers()
67
class SampleHandler(object):
68
"""I exist, isnt that enough?"""
70
transport._clear_protocol_handlers()
71
transport.register_transport_proto('foo')
72
transport.register_lazy_transport('foo',
73
'bzrlib.tests.test_transport',
74
'TestTransport.SampleHandler')
75
transport.register_transport_proto('bar')
76
transport.register_lazy_transport('bar',
77
'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
self.assertEqual([SampleHandler.__module__,
80
'bzrlib.transport.chroot',
81
'bzrlib.transport.pathfilter'],
82
transport._get_transport_modules())
84
transport._set_protocol_handlers(handlers)
86
def test_transport_dependency(self):
87
"""Transport with missing dependency causes no error"""
88
saved_handlers = transport._get_protocol_handlers()
89
# don't pollute the current handlers
90
transport._clear_protocol_handlers()
92
transport.register_transport_proto('foo')
93
transport.register_lazy_transport(
94
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
96
transport.get_transport('foo://fooserver/foo')
97
except errors.UnsupportedProtocol, e:
99
self.assertEquals('Unsupported protocol'
100
' for url "foo://fooserver/foo":'
101
' Unable to import library "some_lib":'
102
' testing missing dependency', str(e))
104
self.fail('Did not raise UnsupportedProtocol')
106
# restore original values
107
transport._set_protocol_handlers(saved_handlers)
109
def test_transport_fallback(self):
110
"""Transport with missing dependency causes no error"""
111
saved_handlers = transport._get_protocol_handlers()
113
transport._clear_protocol_handlers()
114
transport.register_transport_proto('foo')
115
transport.register_lazy_transport(
116
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
transport.register_lazy_transport(
118
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
t = transport.get_transport('foo://fooserver/foo')
120
self.assertTrue(isinstance(t, BackupTransportHandler))
122
transport._set_protocol_handlers(saved_handlers)
124
def test_ssh_hints(self):
125
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
transport.get_transport('ssh://fooserver/foo')
128
except errors.UnsupportedProtocol, e:
130
self.assertEquals('Unsupported protocol'
131
' for url "ssh://fooserver/foo":'
132
' bzr supports bzr+ssh to operate over ssh,'
133
' use "bzr+ssh://fooserver/foo".',
136
self.fail('Did not raise UnsupportedProtocol')
138
def test_LateReadError(self):
139
"""The LateReadError helper should raise on read()."""
140
a_file = transport.LateReadError('a path')
143
except errors.ReadError, error:
144
self.assertEqual('a path', error.path)
145
self.assertRaises(errors.ReadError, a_file.read, 40)
148
def test__combine_paths(self):
149
t = transport.Transport('/')
150
self.assertEqual('/home/sarah/project/foo',
151
t._combine_paths('/home/sarah', 'project/foo'))
152
self.assertEqual('/etc',
153
t._combine_paths('/home/sarah', '../../etc'))
154
self.assertEqual('/etc',
155
t._combine_paths('/home/sarah', '../../../etc'))
156
self.assertEqual('/etc',
157
t._combine_paths('/home/sarah', '/etc'))
159
def test_local_abspath_non_local_transport(self):
160
# the base implementation should throw
161
t = memory.MemoryTransport()
162
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
163
self.assertEqual('memory:///t is not a local path.', str(e))
166
class TestCoalesceOffsets(tests.TestCase):
168
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
169
coalesce = transport.Transport._coalesce_offsets
170
exp = [transport._CoalescedOffset(*x) for x in expected]
171
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
self.assertEqual(exp, out)
175
def test_coalesce_empty(self):
178
def test_coalesce_simple(self):
179
self.check([(0, 10, [(0, 10)])], [(0, 10)])
181
def test_coalesce_unrelated(self):
182
self.check([(0, 10, [(0, 10)]),
184
], [(0, 10), (20, 10)])
186
def test_coalesce_unsorted(self):
187
self.check([(20, 10, [(0, 10)]),
189
], [(20, 10), (0, 10)])
191
def test_coalesce_nearby(self):
192
self.check([(0, 20, [(0, 10), (10, 10)])],
195
def test_coalesce_overlapped(self):
196
self.assertRaises(ValueError,
197
self.check, [(0, 15, [(0, 10), (5, 10)])],
200
def test_coalesce_limit(self):
201
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
202
(30, 10), (40, 10)]),
203
(60, 50, [(0, 10), (10, 10), (20, 10),
204
(30, 10), (40, 10)]),
205
], [(10, 10), (20, 10), (30, 10), (40, 10),
206
(50, 10), (60, 10), (70, 10), (80, 10),
207
(90, 10), (100, 10)],
210
def test_coalesce_no_limit(self):
211
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
212
(30, 10), (40, 10), (50, 10),
213
(60, 10), (70, 10), (80, 10),
215
], [(10, 10), (20, 10), (30, 10), (40, 10),
216
(50, 10), (60, 10), (70, 10), (80, 10),
217
(90, 10), (100, 10)])
219
def test_coalesce_fudge(self):
220
self.check([(10, 30, [(0, 10), (20, 10)]),
221
(100, 10, [(0, 10),]),
222
], [(10, 10), (30, 10), (100, 10)],
225
def test_coalesce_max_size(self):
226
self.check([(10, 20, [(0, 10), (10, 10)]),
228
# If one range is above max_size, it gets its own coalesced
230
(100, 80, [(0, 80),]),],
231
[(10, 10), (20, 10), (30, 50), (100, 80)],
235
def test_coalesce_no_max_size(self):
236
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
237
[(10, 10), (20, 10), (30, 50), (80, 100)],
240
def test_coalesce_default_limit(self):
241
# By default we use a 100MB max size.
242
ten_mb = 10*1024*1024
243
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
244
(10*ten_mb, ten_mb, [(0, ten_mb)])],
245
[(i*ten_mb, ten_mb) for i in range(11)])
246
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
[(i*ten_mb, ten_mb) for i in range(11)],
248
max_size=1*1024*1024*1024)
251
class TestMemoryServer(tests.TestCase):
253
def test_create_server(self):
254
server = memory.MemoryServer()
255
server.start_server()
256
url = server.get_url()
257
self.assertTrue(url in transport.transport_list_registry)
258
t = transport.get_transport(url)
261
self.assertFalse(url in transport.transport_list_registry)
262
self.assertRaises(errors.UnsupportedProtocol,
263
transport.get_transport, url)
266
class TestMemoryTransport(tests.TestCase):
268
def test_get_transport(self):
269
memory.MemoryTransport()
271
def test_clone(self):
272
t = memory.MemoryTransport()
273
self.assertTrue(isinstance(t, memory.MemoryTransport))
274
self.assertEqual("memory:///", t.clone("/").base)
276
def test_abspath(self):
277
t = memory.MemoryTransport()
278
self.assertEqual("memory:///relpath", t.abspath('relpath'))
280
def test_abspath_of_root(self):
281
t = memory.MemoryTransport()
282
self.assertEqual("memory:///", t.base)
283
self.assertEqual("memory:///", t.abspath('/'))
285
def test_abspath_of_relpath_starting_at_root(self):
286
t = memory.MemoryTransport()
287
self.assertEqual("memory:///foo", t.abspath('/foo'))
289
def test_append_and_get(self):
290
t = memory.MemoryTransport()
291
t.append_bytes('path', 'content')
292
self.assertEqual(t.get('path').read(), 'content')
293
t.append_file('path', StringIO('content'))
294
self.assertEqual(t.get('path').read(), 'contentcontent')
296
def test_put_and_get(self):
297
t = memory.MemoryTransport()
298
t.put_file('path', StringIO('content'))
299
self.assertEqual(t.get('path').read(), 'content')
300
t.put_bytes('path', 'content')
301
self.assertEqual(t.get('path').read(), 'content')
303
def test_append_without_dir_fails(self):
304
t = memory.MemoryTransport()
305
self.assertRaises(errors.NoSuchFile,
306
t.append_bytes, 'dir/path', 'content')
308
def test_put_without_dir_fails(self):
309
t = memory.MemoryTransport()
310
self.assertRaises(errors.NoSuchFile,
311
t.put_file, 'dir/path', StringIO('content'))
313
def test_get_missing(self):
314
transport = memory.MemoryTransport()
315
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
317
def test_has_missing(self):
318
t = memory.MemoryTransport()
319
self.assertEquals(False, t.has('foo'))
321
def test_has_present(self):
322
t = memory.MemoryTransport()
323
t.append_bytes('foo', 'content')
324
self.assertEquals(True, t.has('foo'))
326
def test_list_dir(self):
327
t = memory.MemoryTransport()
328
t.put_bytes('foo', 'content')
330
t.put_bytes('dir/subfoo', 'content')
331
t.put_bytes('dirlike', 'content')
333
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
336
def test_mkdir(self):
337
t = memory.MemoryTransport()
339
t.append_bytes('dir/path', 'content')
340
self.assertEqual(t.get('dir/path').read(), 'content')
342
def test_mkdir_missing_parent(self):
343
t = memory.MemoryTransport()
344
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
346
def test_mkdir_twice(self):
347
t = memory.MemoryTransport()
349
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
351
def test_parameters(self):
352
t = memory.MemoryTransport()
353
self.assertEqual(True, t.listable())
354
self.assertEqual(False, t.is_readonly())
356
def test_iter_files_recursive(self):
357
t = memory.MemoryTransport()
359
t.put_bytes('dir/foo', 'content')
360
t.put_bytes('dir/bar', 'content')
361
t.put_bytes('bar', 'content')
362
paths = set(t.iter_files_recursive())
363
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
366
t = memory.MemoryTransport()
367
t.put_bytes('foo', 'content')
368
t.put_bytes('bar', 'phowar')
369
self.assertEqual(7, t.stat('foo').st_size)
370
self.assertEqual(6, t.stat('bar').st_size)
373
class ChrootDecoratorTransportTest(tests.TestCase):
374
"""Chroot decoration specific tests."""
376
def test_abspath(self):
377
# The abspath is always relative to the chroot_url.
378
server = chroot.ChrootServer(
379
transport.get_transport('memory:///foo/bar/'))
380
self.start_server(server)
381
t = transport.get_transport(server.get_url())
382
self.assertEqual(server.get_url(), t.abspath('/'))
384
subdir_t = t.clone('subdir')
385
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
387
def test_clone(self):
388
server = chroot.ChrootServer(
389
transport.get_transport('memory:///foo/bar/'))
390
self.start_server(server)
391
t = transport.get_transport(server.get_url())
392
# relpath from root and root path are the same
393
relpath_cloned = t.clone('foo')
394
abspath_cloned = t.clone('/foo')
395
self.assertEqual(server, relpath_cloned.server)
396
self.assertEqual(server, abspath_cloned.server)
398
def test_chroot_url_preserves_chroot(self):
399
"""Calling get_transport on a chroot transport's base should produce a
400
transport with exactly the same behaviour as the original chroot
403
This is so that it is not possible to escape a chroot by doing::
404
url = chroot_transport.base
405
parent_url = urlutils.join(url, '..')
406
new_t = transport.get_transport(parent_url)
408
server = chroot.ChrootServer(
409
transport.get_transport('memory:///path/subpath'))
410
self.start_server(server)
411
t = transport.get_transport(server.get_url())
412
new_t = transport.get_transport(t.base)
413
self.assertEqual(t.server, new_t.server)
414
self.assertEqual(t.base, new_t.base)
416
def test_urljoin_preserves_chroot(self):
417
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
418
URL that escapes the intended chroot.
420
This is so that it is not possible to escape a chroot by doing::
421
url = chroot_transport.base
422
parent_url = urlutils.join(url, '..')
423
new_t = transport.get_transport(parent_url)
425
server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
426
self.start_server(server)
427
t = transport.get_transport(server.get_url())
429
errors.InvalidURLJoin, urlutils.join, t.base, '..')
432
class TestChrootServer(tests.TestCase):
434
def test_construct(self):
435
backing_transport = memory.MemoryTransport()
436
server = chroot.ChrootServer(backing_transport)
437
self.assertEqual(backing_transport, server.backing_transport)
439
def test_setUp(self):
440
backing_transport = memory.MemoryTransport()
441
server = chroot.ChrootServer(backing_transport)
442
server.start_server()
444
self.assertTrue(server.scheme
445
in transport._get_protocol_handlers().keys())
449
def test_stop_server(self):
450
backing_transport = memory.MemoryTransport()
451
server = chroot.ChrootServer(backing_transport)
452
server.start_server()
454
self.assertFalse(server.scheme
455
in transport._get_protocol_handlers().keys())
457
def test_get_url(self):
458
backing_transport = memory.MemoryTransport()
459
server = chroot.ChrootServer(backing_transport)
460
server.start_server()
462
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
467
class TestHooks(tests.TestCase):
468
"""Basic tests for transport hooks"""
470
def _get_connected_transport(self):
471
return transport.ConnectedTransport("bogus:nowhere")
473
def test_transporthooks_initialisation(self):
474
"""Check all expected transport hook points are set up"""
475
hookpoint = transport.TransportHooks()
476
self.assertTrue("post_connect" in hookpoint,
477
"post_connect not in %s" % (hookpoint,))
479
def test_post_connect(self):
480
"""Ensure the post_connect hook is called when _set_transport is"""
482
transport.Transport.hooks.install_named_hook("post_connect",
484
t = self._get_connected_transport()
485
self.assertLength(0, calls)
486
t._set_connection("connection", "auth")
487
self.assertEqual(calls, [t])
490
class PathFilteringDecoratorTransportTest(tests.TestCase):
491
"""Pathfilter decoration specific tests."""
493
def test_abspath(self):
494
# The abspath is always relative to the base of the backing transport.
495
server = pathfilter.PathFilteringServer(
496
transport.get_transport('memory:///foo/bar/'),
498
server.start_server()
499
t = transport.get_transport(server.get_url())
500
self.assertEqual(server.get_url(), t.abspath('/'))
502
subdir_t = t.clone('subdir')
503
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
506
def make_pf_transport(self, filter_func=None):
507
"""Make a PathFilteringTransport backed by a MemoryTransport.
509
:param filter_func: by default this will be a no-op function. Use this
510
parameter to override it."""
511
if filter_func is None:
512
filter_func = lambda x: x
513
server = pathfilter.PathFilteringServer(
514
transport.get_transport('memory:///foo/bar/'), filter_func)
515
server.start_server()
516
self.addCleanup(server.stop_server)
517
return transport.get_transport(server.get_url())
519
def test__filter(self):
520
# _filter (with an identity func as filter_func) always returns
521
# paths relative to the base of the backing transport.
522
t = self.make_pf_transport()
523
self.assertEqual('foo', t._filter('foo'))
524
self.assertEqual('foo/bar', t._filter('foo/bar'))
525
self.assertEqual('', t._filter('..'))
526
self.assertEqual('', t._filter('/'))
527
# The base of the pathfiltering transport is taken into account too.
528
t = t.clone('subdir1/subdir2')
529
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
530
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
531
self.assertEqual('subdir1', t._filter('..'))
532
self.assertEqual('', t._filter('/'))
534
def test_filter_invocation(self):
537
filter_log.append(path)
539
t = self.make_pf_transport(filter)
541
self.assertEqual(['abc'], filter_log)
543
t.clone('abc').has('xyz')
544
self.assertEqual(['abc/xyz'], filter_log)
547
self.assertEqual(['abc'], filter_log)
549
def test_clone(self):
550
t = self.make_pf_transport()
551
# relpath from root and root path are the same
552
relpath_cloned = t.clone('foo')
553
abspath_cloned = t.clone('/foo')
554
self.assertEqual(t.server, relpath_cloned.server)
555
self.assertEqual(t.server, abspath_cloned.server)
557
def test_url_preserves_pathfiltering(self):
558
"""Calling get_transport on a pathfiltered transport's base should
559
produce a transport with exactly the same behaviour as the original
560
pathfiltered transport.
562
This is so that it is not possible to escape (accidentally or
563
otherwise) the filtering by doing::
564
url = filtered_transport.base
565
parent_url = urlutils.join(url, '..')
566
new_t = transport.get_transport(parent_url)
568
t = self.make_pf_transport()
569
new_t = transport.get_transport(t.base)
570
self.assertEqual(t.server, new_t.server)
571
self.assertEqual(t.base, new_t.base)
574
class ReadonlyDecoratorTransportTest(tests.TestCase):
575
"""Readonly decoration specific tests."""
577
def test_local_parameters(self):
578
# connect to . in readonly mode
579
t = readonly.ReadonlyTransportDecorator('readonly+.')
580
self.assertEqual(True, t.listable())
581
self.assertEqual(True, t.is_readonly())
583
def test_http_parameters(self):
584
from bzrlib.tests.http_server import HttpServer
585
# connect to '.' via http which is not listable
586
server = HttpServer()
587
self.start_server(server)
588
t = transport.get_transport('readonly+' + server.get_url())
589
self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
590
self.assertEqual(False, t.listable())
591
self.assertEqual(True, t.is_readonly())
594
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
595
"""NFS decorator specific tests."""
597
def get_nfs_transport(self, url):
598
# connect to url with nfs decoration
599
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
601
def test_local_parameters(self):
602
# the listable and is_readonly parameters
603
# are not changed by the fakenfs decorator
604
t = self.get_nfs_transport('.')
605
self.assertEqual(True, t.listable())
606
self.assertEqual(False, t.is_readonly())
608
def test_http_parameters(self):
609
# the listable and is_readonly parameters
610
# are not changed by the fakenfs decorator
611
from bzrlib.tests.http_server import HttpServer
612
# connect to '.' via http which is not listable
613
server = HttpServer()
614
self.start_server(server)
615
t = self.get_nfs_transport(server.get_url())
616
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
617
self.assertEqual(False, t.listable())
618
self.assertEqual(True, t.is_readonly())
620
def test_fakenfs_server_default(self):
621
# a FakeNFSServer() should bring up a local relpath server for itself
622
server = test_server.FakeNFSServer()
623
self.start_server(server)
624
# the url should be decorated appropriately
625
self.assertStartsWith(server.get_url(), 'fakenfs+')
626
# and we should be able to get a transport for it
627
t = transport.get_transport(server.get_url())
628
# which must be a FakeNFSTransportDecorator instance.
629
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
631
def test_fakenfs_rename_semantics(self):
632
# a FakeNFS transport must mangle the way rename errors occur to
633
# look like NFS problems.
634
t = self.get_nfs_transport('.')
635
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
637
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
640
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
641
"""Tests for simulation of VFAT restrictions"""
643
def get_vfat_transport(self, url):
644
"""Return vfat-backed transport for test directory"""
645
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
646
return FakeVFATTransportDecorator('vfat+' + url)
648
def test_transport_creation(self):
649
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
650
t = self.get_vfat_transport('.')
651
self.assertIsInstance(t, FakeVFATTransportDecorator)
653
def test_transport_mkdir(self):
654
t = self.get_vfat_transport('.')
656
self.assertTrue(t.has('hello'))
657
self.assertTrue(t.has('Hello'))
659
def test_forbidden_chars(self):
660
t = self.get_vfat_transport('.')
661
self.assertRaises(ValueError, t.has, "<NU>")
664
class BadTransportHandler(transport.Transport):
665
def __init__(self, base_url):
666
raise errors.DependencyNotPresent('some_lib',
667
'testing missing dependency')
670
class BackupTransportHandler(transport.Transport):
671
"""Test transport that works as a backup for the BadTransportHandler"""
675
class TestTransportImplementation(tests.TestCaseInTempDir):
676
"""Implementation verification for transports.
678
To verify a transport we need a server factory, which is a callable
679
that accepts no parameters and returns an implementation of
680
bzrlib.transport.Server.
682
That Server is then used to construct transport instances and test
683
the transport via loopback activity.
685
Currently this assumes that the Transport object is connected to the
686
current working directory. So that whatever is done
687
through the transport, should show up in the working
688
directory, and vice-versa. This is a bug, because its possible to have
689
URL schemes which provide access to something that may not be
690
result in storage on the local disk, i.e. due to file system limits, or
691
due to it being a database or some other non-filesystem tool.
693
This also tests to make sure that the functions work with both
694
generators and lists (assuming iter(list) is effectively a generator)
698
super(TestTransportImplementation, self).setUp()
699
self._server = self.transport_server()
700
self.start_server(self._server)
702
def get_transport(self, relpath=None):
703
"""Return a connected transport to the local directory.
705
:param relpath: a path relative to the base url.
707
base_url = self._server.get_url()
708
url = self._adjust_url(base_url, relpath)
709
# try getting the transport via the regular interface:
710
t = transport.get_transport(url)
711
# vila--20070607 if the following are commented out the test suite
712
# still pass. Is this really still needed or was it a forgotten
714
if not isinstance(t, self.transport_class):
715
# we did not get the correct transport class type. Override the
716
# regular connection behaviour by direct construction.
717
t = self.transport_class(url)
721
class TestLocalTransports(tests.TestCase):
723
def test_get_transport_from_abspath(self):
724
here = osutils.abspath('.')
725
t = transport.get_transport(here)
726
self.assertIsInstance(t, local.LocalTransport)
727
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
729
def test_get_transport_from_relpath(self):
730
here = osutils.abspath('.')
731
t = transport.get_transport('.')
732
self.assertIsInstance(t, local.LocalTransport)
733
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
735
def test_get_transport_from_local_url(self):
736
here = osutils.abspath('.')
737
here_url = urlutils.local_path_to_url(here) + '/'
738
t = transport.get_transport(here_url)
739
self.assertIsInstance(t, local.LocalTransport)
740
self.assertEquals(t.base, here_url)
742
def test_local_abspath(self):
743
here = osutils.abspath('.')
744
t = transport.get_transport(here)
745
self.assertEquals(t.local_abspath(''), here)
748
class TestWin32LocalTransport(tests.TestCase):
750
def test_unc_clone_to_root(self):
751
# Win32 UNC path like \\HOST\path
752
# clone to root should stop at least at \\HOST part
754
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
757
self.assertEquals(t.base, 'file://HOST/')
758
# make sure we reach the root
760
self.assertEquals(t.base, 'file://HOST/')
763
class TestConnectedTransport(tests.TestCase):
764
"""Tests for connected to remote server transports"""
766
def test_parse_url(self):
767
t = transport.ConnectedTransport(
768
'http://simple.example.com/home/source')
769
self.assertEquals(t._host, 'simple.example.com')
770
self.assertEquals(t._port, None)
771
self.assertEquals(t._path, '/home/source/')
772
self.failUnless(t._user is None)
773
self.failUnless(t._password is None)
775
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
777
def test_parse_url_with_at_in_user(self):
779
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
780
self.assertEquals(t._user, 'user@host.com')
782
def test_parse_quoted_url(self):
783
t = transport.ConnectedTransport(
784
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
785
self.assertEquals(t._host, 'exAmple.com')
786
self.assertEquals(t._port, 2222)
787
self.assertEquals(t._user, 'robey')
788
self.assertEquals(t._password, 'h@t')
789
self.assertEquals(t._path, '/path/')
791
# Base should not keep track of the password
792
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
794
def test_parse_invalid_url(self):
795
self.assertRaises(errors.InvalidURL,
796
transport.ConnectedTransport,
797
'sftp://lily.org:~janneke/public/bzr/gub')
799
def test_relpath(self):
800
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
802
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
803
self.assertRaises(errors.PathNotChild, t.relpath,
804
'http://user@host.com/abs/path/sub')
805
self.assertRaises(errors.PathNotChild, t.relpath,
806
'sftp://user2@host.com/abs/path/sub')
807
self.assertRaises(errors.PathNotChild, t.relpath,
808
'sftp://user@otherhost.com/abs/path/sub')
809
self.assertRaises(errors.PathNotChild, t.relpath,
810
'sftp://user@host.com:33/abs/path/sub')
811
# Make sure it works when we don't supply a username
812
t = transport.ConnectedTransport('sftp://host.com/abs/path')
813
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
815
# Make sure it works when parts of the path will be url encoded
816
t = transport.ConnectedTransport('sftp://host.com/dev/%path')
817
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
819
def test_connection_sharing_propagate_credentials(self):
820
t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
821
self.assertEquals('user', t._user)
822
self.assertEquals('host.com', t._host)
823
self.assertIs(None, t._get_connection())
824
self.assertIs(None, t._password)
825
c = t.clone('subdir')
826
self.assertIs(None, c._get_connection())
827
self.assertIs(None, t._password)
829
# Simulate the user entering a password
831
connection = object()
832
t._set_connection(connection, password)
833
self.assertIs(connection, t._get_connection())
834
self.assertIs(password, t._get_credentials())
835
self.assertIs(connection, c._get_connection())
836
self.assertIs(password, c._get_credentials())
838
# credentials can be updated
839
new_password = 'even more secret'
840
c._update_credentials(new_password)
841
self.assertIs(connection, t._get_connection())
842
self.assertIs(new_password, t._get_credentials())
843
self.assertIs(connection, c._get_connection())
844
self.assertIs(new_password, c._get_credentials())
847
class TestReusedTransports(tests.TestCase):
848
"""Tests for transport reuse"""
850
def test_reuse_same_transport(self):
851
possible_transports = []
852
t1 = transport.get_transport('http://foo/',
853
possible_transports=possible_transports)
854
self.assertEqual([t1], possible_transports)
855
t2 = transport.get_transport('http://foo/',
856
possible_transports=[t1])
857
self.assertIs(t1, t2)
859
# Also check that final '/' are handled correctly
860
t3 = transport.get_transport('http://foo/path/')
861
t4 = transport.get_transport('http://foo/path',
862
possible_transports=[t3])
863
self.assertIs(t3, t4)
865
t5 = transport.get_transport('http://foo/path')
866
t6 = transport.get_transport('http://foo/path/',
867
possible_transports=[t5])
868
self.assertIs(t5, t6)
870
def test_don_t_reuse_different_transport(self):
871
t1 = transport.get_transport('http://foo/path')
872
t2 = transport.get_transport('http://bar/path',
873
possible_transports=[t1])
874
self.assertIsNot(t1, t2)
877
class TestTransportTrace(tests.TestCase):
880
t = transport.get_transport('trace+memory://')
881
self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
883
def test_clone_preserves_activity(self):
884
t = transport.get_transport('trace+memory://')
886
self.assertTrue(t is not t2)
887
self.assertTrue(t._activity is t2._activity)
889
# the following specific tests are for the operations that have made use of
890
# logging in tests; we could test every single operation but doing that
891
# still won't cause a test failure when the top level Transport API
892
# changes; so there is little return doing that.
894
t = transport.get_transport('trace+memory:///')
895
t.put_bytes('foo', 'barish')
898
# put_bytes records the bytes, not the content to avoid memory
900
expected_result.append(('put_bytes', 'foo', 6, None))
901
# get records the file name only.
902
expected_result.append(('get', 'foo'))
903
self.assertEqual(expected_result, t._activity)
905
def test_readv(self):
906
t = transport.get_transport('trace+memory:///')
907
t.put_bytes('foo', 'barish')
908
list(t.readv('foo', [(0, 1), (3, 2)],
909
adjust_for_latency=True, upper_limit=6))
911
# put_bytes records the bytes, not the content to avoid memory
913
expected_result.append(('put_bytes', 'foo', 6, None))
914
# readv records the supplied offset request
915
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
916
self.assertEqual(expected_result, t._activity)
919
class TestSSHConnections(tests.TestCaseWithTransport):
921
def test_bzr_connect_to_bzr_ssh(self):
922
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
924
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
926
# This test actually causes a bzr instance to be invoked, which is very
927
# expensive: it should be the only such test in the test suite.
928
# A reasonable evolution for this would be to simply check inside
929
# check_channel_exec_request that the command is appropriate, and then
930
# satisfy requests in-process.
931
self.requireFeature(features.paramiko)
932
# SFTPFullAbsoluteServer has a get_url method, and doesn't
933
# override the interface (doesn't change self._vendor).
934
# Note that this does encryption, so can be slow.
935
from bzrlib.tests import stub_sftp
937
# Start an SSH server
938
self.command_executed = []
939
# XXX: This is horrible -- we define a really dumb SSH server that
940
# executes commands, and manage the hooking up of stdin/out/err to the
941
# SSH channel ourselves. Surely this has already been implemented
944
class StubSSHServer(stub_sftp.StubServer):
948
def check_channel_exec_request(self, channel, command):
949
self.test.command_executed.append(command)
950
proc = subprocess.Popen(
951
command, shell=True, stdin=subprocess.PIPE,
952
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
954
# XXX: horribly inefficient, not to mention ugly.
955
# Start a thread for each of stdin/out/err, and relay bytes from
956
# the subprocess to channel and vice versa.
957
def ferry_bytes(read, write, close):
966
(channel.recv, proc.stdin.write, proc.stdin.close),
967
(proc.stdout.read, channel.sendall, channel.close),
968
(proc.stderr.read, channel.sendall_stderr, channel.close)]
970
for read, write, close in file_functions:
971
t = threading.Thread(
972
target=ferry_bytes, args=(read, write, close))
978
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
979
# We *don't* want to override the default SSH vendor: the detected one
982
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
983
# inherits from SFTPServer which forces the SSH vendor to
984
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
985
self.start_server(ssh_server)
986
port = ssh_server.port
988
if sys.platform == 'win32':
989
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
991
bzr_remote_path = self.get_bzr_path()
992
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
994
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
995
# variable is used to tell bzr what command to run on the remote end.
996
path_to_branch = osutils.abspath('.')
997
if sys.platform == 'win32':
998
# On Windows, we export all drives as '/C:/, etc. So we need to
999
# prefix a '/' to get the right path.
1000
path_to_branch = '/' + path_to_branch
1001
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1002
t = transport.get_transport(url)
1003
self.permit_url(t.base)
1007
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1008
self.command_executed)
1009
# Make sure to disconnect, so that the remote process can stop, and we
1010
# can cleanup. Then pause the test until everything is shutdown
1011
t._client._medium.disconnect()
1014
# First wait for the subprocess
1016
# And the rest are threads
1017
for t in started[1:]: