13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
21
from cStringIO import StringIO
24
from bzrlib import urlutils
25
from bzrlib.errors import (NoSuchFile, FileExists,
32
from bzrlib.tests import TestCase, TestCaseInTempDir
33
from bzrlib.transport import (_CoalescedOffset,
34
_get_protocol_handlers,
35
_get_transport_modules,
37
register_lazy_transport,
38
_set_protocol_handlers,
41
from bzrlib.transport.memory import MemoryTransport
42
from bzrlib.transport.local import (LocalTransport,
43
EmulatedWin32LocalTransport)
32
from bzrlib.directory_service import directories
33
from bzrlib.transport import (
43
import bzrlib.transport.trace
44
from bzrlib.tests import (
46
50
# TODO: Should possibly split transport-specific tests into their own files.
49
class TestTransport(TestCase):
53
class TestTransport(tests.TestCase):
50
54
"""Test the non transport-concrete class functionality."""
52
56
def test__get_set_protocol_handlers(self):
53
handlers = _get_protocol_handlers()
54
self.assertNotEqual({}, handlers)
56
_set_protocol_handlers({})
57
self.assertEqual({}, _get_protocol_handlers())
59
_set_protocol_handlers(handlers)
57
handlers = transport._get_protocol_handlers()
58
self.assertNotEqual([], handlers.keys())
59
transport._clear_protocol_handlers()
60
self.addCleanup(transport._set_protocol_handlers, handlers)
61
self.assertEqual([], transport._get_protocol_handlers().keys())
61
63
def test_get_transport_modules(self):
62
handlers = _get_protocol_handlers()
64
handlers = transport._get_protocol_handlers()
65
self.addCleanup(transport._set_protocol_handlers, handlers)
66
# don't pollute the current handlers
67
transport._clear_protocol_handlers()
63
69
class SampleHandler(object):
64
70
"""I exist, isnt that enough?"""
67
_set_protocol_handlers(my_handlers)
68
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
69
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
70
self.assertEqual([SampleHandler.__module__],
71
_get_transport_modules())
73
_set_protocol_handlers(handlers)
71
transport._clear_protocol_handlers()
72
transport.register_transport_proto('foo')
73
transport.register_lazy_transport('foo',
74
'bzrlib.tests.test_transport',
75
'TestTransport.SampleHandler')
76
transport.register_transport_proto('bar')
77
transport.register_lazy_transport('bar',
78
'bzrlib.tests.test_transport',
79
'TestTransport.SampleHandler')
80
self.assertEqual([SampleHandler.__module__,
81
'bzrlib.transport.chroot',
82
'bzrlib.transport.pathfilter'],
83
transport._get_transport_modules())
75
85
def test_transport_dependency(self):
76
86
"""Transport with missing dependency causes no error"""
77
saved_handlers = _get_protocol_handlers()
87
saved_handlers = transport._get_protocol_handlers()
88
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
# don't pollute the current handlers
90
transport._clear_protocol_handlers()
91
transport.register_transport_proto('foo')
92
transport.register_lazy_transport(
93
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
79
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
80
'BadTransportHandler')
82
get_transport('foo://fooserver/foo')
83
except UnsupportedProtocol, e:
85
self.assertEquals('Unsupported protocol'
86
' for url "foo://fooserver/foo":'
87
' Unable to import library "some_lib":'
88
' testing missing dependency', str(e))
90
self.fail('Did not raise UnsupportedProtocol')
92
# restore original values
93
_set_protocol_handlers(saved_handlers)
95
transport.get_transport_from_url('foo://fooserver/foo')
96
except errors.UnsupportedProtocol, e:
98
self.assertEquals('Unsupported protocol'
99
' for url "foo://fooserver/foo":'
100
' Unable to import library "some_lib":'
101
' testing missing dependency', str(e))
103
self.fail('Did not raise UnsupportedProtocol')
95
105
def test_transport_fallback(self):
96
106
"""Transport with missing dependency causes no error"""
97
saved_handlers = _get_protocol_handlers()
99
_set_protocol_handlers({})
100
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
101
'BackupTransportHandler')
102
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
103
'BadTransportHandler')
104
t = get_transport('foo://fooserver/foo')
105
self.assertTrue(isinstance(t, BackupTransportHandler))
107
_set_protocol_handlers(saved_handlers)
109
def test__combine_paths(self):
111
self.assertEqual('/home/sarah/project/foo',
112
t._combine_paths('/home/sarah', 'project/foo'))
113
self.assertEqual('/etc',
114
t._combine_paths('/home/sarah', '../../etc'))
115
self.assertEqual('/etc',
116
t._combine_paths('/home/sarah', '../../../etc'))
117
self.assertEqual('/etc',
118
t._combine_paths('/home/sarah', '/etc'))
121
class TestCoalesceOffsets(TestCase):
123
def check(self, expected, offsets, limit=0, fudge=0):
124
coalesce = Transport._coalesce_offsets
125
exp = [_CoalescedOffset(*x) for x in expected]
126
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
107
saved_handlers = transport._get_protocol_handlers()
108
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
109
transport._clear_protocol_handlers()
110
transport.register_transport_proto('foo')
111
transport.register_lazy_transport(
112
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
113
transport.register_lazy_transport(
114
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
115
t = transport.get_transport_from_url('foo://fooserver/foo')
116
self.assertTrue(isinstance(t, BackupTransportHandler))
118
def test_ssh_hints(self):
119
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
121
transport.get_transport_from_url('ssh://fooserver/foo')
122
except errors.UnsupportedProtocol, e:
124
self.assertEquals('Unsupported protocol'
125
' for url "ssh://fooserver/foo":'
126
' bzr supports bzr+ssh to operate over ssh,'
127
' use "bzr+ssh://fooserver/foo".',
130
self.fail('Did not raise UnsupportedProtocol')
132
def test_LateReadError(self):
133
"""The LateReadError helper should raise on read()."""
134
a_file = transport.LateReadError('a path')
137
except errors.ReadError, error:
138
self.assertEqual('a path', error.path)
139
self.assertRaises(errors.ReadError, a_file.read, 40)
142
def test_local_abspath_non_local_transport(self):
143
# the base implementation should throw
144
t = memory.MemoryTransport()
145
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
146
self.assertEqual('memory:///t is not a local path.', str(e))
149
class TestCoalesceOffsets(tests.TestCase):
151
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
152
coalesce = transport.Transport._coalesce_offsets
153
exp = [transport._CoalescedOffset(*x) for x in expected]
154
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
127
156
self.assertEqual(exp, out)
129
158
def test_coalesce_empty(self):
172
202
def test_coalesce_fudge(self):
173
203
self.check([(10, 30, [(0, 10), (20, 10)]),
174
(100, 10, [(0, 10),]),
204
(100, 10, [(0, 10)]),
175
205
], [(10, 10), (30, 10), (100, 10)],
208
def test_coalesce_max_size(self):
209
self.check([(10, 20, [(0, 10), (10, 10)]),
211
# If one range is above max_size, it gets its own coalesced
213
(100, 80, [(0, 80)]),],
214
[(10, 10), (20, 10), (30, 50), (100, 80)],
217
def test_coalesce_no_max_size(self):
218
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
219
[(10, 10), (20, 10), (30, 50), (80, 100)],
180
class TestMemoryTransport(TestCase):
222
def test_coalesce_default_limit(self):
223
# By default we use a 100MB max size.
224
ten_mb = 10 * 1024 * 1024
225
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
226
(10*ten_mb, ten_mb, [(0, ten_mb)])],
227
[(i*ten_mb, ten_mb) for i in range(11)])
228
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
[(i * ten_mb, ten_mb) for i in range(11)],
230
max_size=1*1024*1024*1024)
233
class TestMemoryServer(tests.TestCase):
235
def test_create_server(self):
236
server = memory.MemoryServer()
237
server.start_server()
238
url = server.get_url()
239
self.assertTrue(url in transport.transport_list_registry)
240
t = transport.get_transport_from_url(url)
243
self.assertFalse(url in transport.transport_list_registry)
244
self.assertRaises(errors.UnsupportedProtocol,
245
transport.get_transport, url)
248
class TestMemoryTransport(tests.TestCase):
182
250
def test_get_transport(self):
251
memory.MemoryTransport()
185
253
def test_clone(self):
186
transport = MemoryTransport()
187
self.assertTrue(isinstance(transport, MemoryTransport))
188
self.assertEqual("memory:///", transport.clone("/").base)
254
t = memory.MemoryTransport()
255
self.assertTrue(isinstance(t, memory.MemoryTransport))
256
self.assertEqual("memory:///", t.clone("/").base)
190
258
def test_abspath(self):
191
transport = MemoryTransport()
192
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
259
t = memory.MemoryTransport()
260
self.assertEqual("memory:///relpath", t.abspath('relpath'))
194
262
def test_abspath_of_root(self):
195
transport = MemoryTransport()
196
self.assertEqual("memory:///", transport.base)
197
self.assertEqual("memory:///", transport.abspath('/'))
263
t = memory.MemoryTransport()
264
self.assertEqual("memory:///", t.base)
265
self.assertEqual("memory:///", t.abspath('/'))
199
267
def test_abspath_of_relpath_starting_at_root(self):
200
transport = MemoryTransport()
201
self.assertEqual("memory:///foo", transport.abspath('/foo'))
268
t = memory.MemoryTransport()
269
self.assertEqual("memory:///foo", t.abspath('/foo'))
203
271
def test_append_and_get(self):
204
transport = MemoryTransport()
205
transport.append_bytes('path', 'content')
206
self.assertEqual(transport.get('path').read(), 'content')
207
transport.append_file('path', StringIO('content'))
208
self.assertEqual(transport.get('path').read(), 'contentcontent')
272
t = memory.MemoryTransport()
273
t.append_bytes('path', 'content')
274
self.assertEqual(t.get('path').read(), 'content')
275
t.append_file('path', StringIO('content'))
276
self.assertEqual(t.get('path').read(), 'contentcontent')
210
278
def test_put_and_get(self):
211
transport = MemoryTransport()
212
transport.put_file('path', StringIO('content'))
213
self.assertEqual(transport.get('path').read(), 'content')
214
transport.put_bytes('path', 'content')
215
self.assertEqual(transport.get('path').read(), 'content')
279
t = memory.MemoryTransport()
280
t.put_file('path', StringIO('content'))
281
self.assertEqual(t.get('path').read(), 'content')
282
t.put_bytes('path', 'content')
283
self.assertEqual(t.get('path').read(), 'content')
217
285
def test_append_without_dir_fails(self):
218
transport = MemoryTransport()
219
self.assertRaises(NoSuchFile,
220
transport.append_bytes, 'dir/path', 'content')
286
t = memory.MemoryTransport()
287
self.assertRaises(errors.NoSuchFile,
288
t.append_bytes, 'dir/path', 'content')
222
290
def test_put_without_dir_fails(self):
223
transport = MemoryTransport()
224
self.assertRaises(NoSuchFile,
225
transport.put_file, 'dir/path', StringIO('content'))
291
t = memory.MemoryTransport()
292
self.assertRaises(errors.NoSuchFile,
293
t.put_file, 'dir/path', StringIO('content'))
227
295
def test_get_missing(self):
228
transport = MemoryTransport()
229
self.assertRaises(NoSuchFile, transport.get, 'foo')
296
transport = memory.MemoryTransport()
297
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
231
299
def test_has_missing(self):
232
transport = MemoryTransport()
233
self.assertEquals(False, transport.has('foo'))
300
t = memory.MemoryTransport()
301
self.assertEquals(False, t.has('foo'))
235
303
def test_has_present(self):
236
transport = MemoryTransport()
237
transport.append_bytes('foo', 'content')
238
self.assertEquals(True, transport.has('foo'))
304
t = memory.MemoryTransport()
305
t.append_bytes('foo', 'content')
306
self.assertEquals(True, t.has('foo'))
240
308
def test_list_dir(self):
241
transport = MemoryTransport()
242
transport.put_bytes('foo', 'content')
243
transport.mkdir('dir')
244
transport.put_bytes('dir/subfoo', 'content')
245
transport.put_bytes('dirlike', 'content')
309
t = memory.MemoryTransport()
310
t.put_bytes('foo', 'content')
312
t.put_bytes('dir/subfoo', 'content')
313
t.put_bytes('dirlike', 'content')
247
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
248
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
315
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
316
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
250
318
def test_mkdir(self):
251
transport = MemoryTransport()
252
transport.mkdir('dir')
253
transport.append_bytes('dir/path', 'content')
254
self.assertEqual(transport.get('dir/path').read(), 'content')
319
t = memory.MemoryTransport()
321
t.append_bytes('dir/path', 'content')
322
self.assertEqual(t.get('dir/path').read(), 'content')
256
324
def test_mkdir_missing_parent(self):
257
transport = MemoryTransport()
258
self.assertRaises(NoSuchFile,
259
transport.mkdir, 'dir/dir')
325
t = memory.MemoryTransport()
326
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
261
328
def test_mkdir_twice(self):
262
transport = MemoryTransport()
263
transport.mkdir('dir')
264
self.assertRaises(FileExists, transport.mkdir, 'dir')
329
t = memory.MemoryTransport()
331
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
266
333
def test_parameters(self):
267
transport = MemoryTransport()
268
self.assertEqual(True, transport.listable())
269
self.assertEqual(False, transport.should_cache())
270
self.assertEqual(False, transport.is_readonly())
334
t = memory.MemoryTransport()
335
self.assertEqual(True, t.listable())
336
self.assertEqual(False, t.is_readonly())
272
338
def test_iter_files_recursive(self):
273
transport = MemoryTransport()
274
transport.mkdir('dir')
275
transport.put_bytes('dir/foo', 'content')
276
transport.put_bytes('dir/bar', 'content')
277
transport.put_bytes('bar', 'content')
278
paths = set(transport.iter_files_recursive())
339
t = memory.MemoryTransport()
341
t.put_bytes('dir/foo', 'content')
342
t.put_bytes('dir/bar', 'content')
343
t.put_bytes('bar', 'content')
344
paths = set(t.iter_files_recursive())
279
345
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
281
347
def test_stat(self):
282
transport = MemoryTransport()
283
transport.put_bytes('foo', 'content')
284
transport.put_bytes('bar', 'phowar')
285
self.assertEqual(7, transport.stat('foo').st_size)
286
self.assertEqual(6, transport.stat('bar').st_size)
289
class ChrootDecoratorTransportTest(TestCase):
348
t = memory.MemoryTransport()
349
t.put_bytes('foo', 'content')
350
t.put_bytes('bar', 'phowar')
351
self.assertEqual(7, t.stat('foo').st_size)
352
self.assertEqual(6, t.stat('bar').st_size)
355
class ChrootDecoratorTransportTest(tests.TestCase):
290
356
"""Chroot decoration specific tests."""
358
def test_abspath(self):
359
# The abspath is always relative to the chroot_url.
360
server = chroot.ChrootServer(
361
transport.get_transport_from_url('memory:///foo/bar/'))
362
self.start_server(server)
363
t = transport.get_transport_from_url(server.get_url())
364
self.assertEqual(server.get_url(), t.abspath('/'))
366
subdir_t = t.clone('subdir')
367
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
369
def test_clone(self):
370
server = chroot.ChrootServer(
371
transport.get_transport_from_url('memory:///foo/bar/'))
372
self.start_server(server)
373
t = transport.get_transport_from_url(server.get_url())
374
# relpath from root and root path are the same
375
relpath_cloned = t.clone('foo')
376
abspath_cloned = t.clone('/foo')
377
self.assertEqual(server, relpath_cloned.server)
378
self.assertEqual(server, abspath_cloned.server)
380
def test_chroot_url_preserves_chroot(self):
381
"""Calling get_transport on a chroot transport's base should produce a
382
transport with exactly the same behaviour as the original chroot
385
This is so that it is not possible to escape a chroot by doing::
386
url = chroot_transport.base
387
parent_url = urlutils.join(url, '..')
388
new_t = transport.get_transport_from_url(parent_url)
390
server = chroot.ChrootServer(
391
transport.get_transport_from_url('memory:///path/subpath'))
392
self.start_server(server)
393
t = transport.get_transport_from_url(server.get_url())
394
new_t = transport.get_transport_from_url(t.base)
395
self.assertEqual(t.server, new_t.server)
396
self.assertEqual(t.base, new_t.base)
398
def test_urljoin_preserves_chroot(self):
399
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
400
URL that escapes the intended chroot.
402
This is so that it is not possible to escape a chroot by doing::
403
url = chroot_transport.base
404
parent_url = urlutils.join(url, '..')
405
new_t = transport.get_transport_from_url(parent_url)
407
server = chroot.ChrootServer(
408
transport.get_transport_from_url('memory:///path/'))
409
self.start_server(server)
410
t = transport.get_transport_from_url(server.get_url())
412
errors.InvalidURLJoin, urlutils.join, t.base, '..')
415
class TestChrootServer(tests.TestCase):
292
417
def test_construct(self):
293
from bzrlib.transport import chroot
294
transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
295
self.assertEqual('memory:///pathA/', transport.chroot_url)
297
transport = chroot.ChrootTransportDecorator(
298
'chroot+memory:///path/B', chroot='memory:///path/')
299
self.assertEqual('memory:///path/', transport.chroot_url)
301
def test_append_file(self):
302
transport = get_transport('chroot+memory:///foo/bar')
303
self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
305
def test_append_bytes(self):
306
transport = get_transport('chroot+memory:///foo/bar')
307
self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
418
backing_transport = memory.MemoryTransport()
419
server = chroot.ChrootServer(backing_transport)
420
self.assertEqual(backing_transport, server.backing_transport)
422
def test_setUp(self):
423
backing_transport = memory.MemoryTransport()
424
server = chroot.ChrootServer(backing_transport)
425
server.start_server()
426
self.addCleanup(server.stop_server)
427
self.assertTrue(server.scheme
428
in transport._get_protocol_handlers().keys())
430
def test_stop_server(self):
431
backing_transport = memory.MemoryTransport()
432
server = chroot.ChrootServer(backing_transport)
433
server.start_server()
435
self.assertFalse(server.scheme
436
in transport._get_protocol_handlers().keys())
438
def test_get_url(self):
439
backing_transport = memory.MemoryTransport()
440
server = chroot.ChrootServer(backing_transport)
441
server.start_server()
442
self.addCleanup(server.stop_server)
443
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
446
class TestHooks(tests.TestCase):
447
"""Basic tests for transport hooks"""
449
def _get_connected_transport(self):
450
return transport.ConnectedTransport("bogus:nowhere")
452
def test_transporthooks_initialisation(self):
453
"""Check all expected transport hook points are set up"""
454
hookpoint = transport.TransportHooks()
455
self.assertTrue("post_connect" in hookpoint,
456
"post_connect not in %s" % (hookpoint,))
458
def test_post_connect(self):
459
"""Ensure the post_connect hook is called when _set_transport is"""
461
transport.Transport.hooks.install_named_hook("post_connect",
463
t = self._get_connected_transport()
464
self.assertLength(0, calls)
465
t._set_connection("connection", "auth")
466
self.assertEqual(calls, [t])
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
"""Pathfilter decoration specific tests."""
472
def test_abspath(self):
473
# The abspath is always relative to the base of the backing transport.
474
server = pathfilter.PathFilteringServer(
475
transport.get_transport_from_url('memory:///foo/bar/'),
477
server.start_server()
478
t = transport.get_transport_from_url(server.get_url())
479
self.assertEqual(server.get_url(), t.abspath('/'))
481
subdir_t = t.clone('subdir')
482
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
485
def make_pf_transport(self, filter_func=None):
486
"""Make a PathFilteringTransport backed by a MemoryTransport.
488
:param filter_func: by default this will be a no-op function. Use this
489
parameter to override it."""
490
if filter_func is None:
491
filter_func = lambda x: x
492
server = pathfilter.PathFilteringServer(
493
transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
494
server.start_server()
495
self.addCleanup(server.stop_server)
496
return transport.get_transport_from_url(server.get_url())
498
def test__filter(self):
499
# _filter (with an identity func as filter_func) always returns
500
# paths relative to the base of the backing transport.
501
t = self.make_pf_transport()
502
self.assertEqual('foo', t._filter('foo'))
503
self.assertEqual('foo/bar', t._filter('foo/bar'))
504
self.assertEqual('', t._filter('..'))
505
self.assertEqual('', t._filter('/'))
506
# The base of the pathfiltering transport is taken into account too.
507
t = t.clone('subdir1/subdir2')
508
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
509
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
510
self.assertEqual('subdir1', t._filter('..'))
511
self.assertEqual('', t._filter('/'))
513
def test_filter_invocation(self):
517
filter_log.append(path)
519
t = self.make_pf_transport(filter)
521
self.assertEqual(['abc'], filter_log)
523
t.clone('abc').has('xyz')
524
self.assertEqual(['abc/xyz'], filter_log)
527
self.assertEqual(['abc'], filter_log)
309
529
def test_clone(self):
310
transport = get_transport('chroot+memory:///foo/bar')
311
self.assertRaises(PathNotChild, transport.clone, '/foo')
313
def test_delete(self):
314
transport = get_transport('chroot+memory:///foo/bar')
315
self.assertRaises(PathNotChild, transport.delete, '/foo')
317
def test_delete_tree(self):
318
transport = get_transport('chroot+memory:///foo/bar')
319
self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
322
transport = get_transport('chroot+memory:///foo/bar')
323
self.assertRaises(PathNotChild, transport.get, '/foo')
325
def test_get_bytes(self):
326
transport = get_transport('chroot+memory:///foo/bar')
327
self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
330
transport = get_transport('chroot+memory:///foo/bar')
331
self.assertRaises(PathNotChild, transport.has, '/foo')
333
def test_list_dir(self):
334
transport = get_transport('chroot+memory:///foo/bar')
335
self.assertRaises(PathNotChild, transport.list_dir, '/foo')
337
def test_lock_read(self):
338
transport = get_transport('chroot+memory:///foo/bar')
339
self.assertRaises(PathNotChild, transport.lock_read, '/foo')
341
def test_lock_write(self):
342
transport = get_transport('chroot+memory:///foo/bar')
343
self.assertRaises(PathNotChild, transport.lock_write, '/foo')
345
def test_mkdir(self):
346
transport = get_transport('chroot+memory:///foo/bar')
347
self.assertRaises(PathNotChild, transport.mkdir, '/foo')
349
def test_put_bytes(self):
350
transport = get_transport('chroot+memory:///foo/bar')
351
self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
353
def test_put_file(self):
354
transport = get_transport('chroot+memory:///foo/bar')
355
self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
357
def test_rename(self):
358
transport = get_transport('chroot+memory:///foo/bar')
359
self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
360
self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
362
def test_rmdir(self):
363
transport = get_transport('chroot+memory:///foo/bar')
364
self.assertRaises(PathNotChild, transport.rmdir, '/foo')
367
transport = get_transport('chroot+memory:///foo/bar')
368
self.assertRaises(PathNotChild, transport.stat, '/foo')
371
class ReadonlyDecoratorTransportTest(TestCase):
530
t = self.make_pf_transport()
531
# relpath from root and root path are the same
532
relpath_cloned = t.clone('foo')
533
abspath_cloned = t.clone('/foo')
534
self.assertEqual(t.server, relpath_cloned.server)
535
self.assertEqual(t.server, abspath_cloned.server)
537
def test_url_preserves_pathfiltering(self):
538
"""Calling get_transport on a pathfiltered transport's base should
539
produce a transport with exactly the same behaviour as the original
540
pathfiltered transport.
542
This is so that it is not possible to escape (accidentally or
543
otherwise) the filtering by doing::
544
url = filtered_transport.base
545
parent_url = urlutils.join(url, '..')
546
new_t = transport.get_transport_from_url(parent_url)
548
t = self.make_pf_transport()
549
new_t = transport.get_transport_from_url(t.base)
550
self.assertEqual(t.server, new_t.server)
551
self.assertEqual(t.base, new_t.base)
554
class ReadonlyDecoratorTransportTest(tests.TestCase):
372
555
"""Readonly decoration specific tests."""
374
557
def test_local_parameters(self):
375
import bzrlib.transport.readonly as readonly
376
558
# connect to . in readonly mode
377
transport = readonly.ReadonlyTransportDecorator('readonly+.')
378
self.assertEqual(True, transport.listable())
379
self.assertEqual(False, transport.should_cache())
380
self.assertEqual(True, transport.is_readonly())
559
t = readonly.ReadonlyTransportDecorator('readonly+.')
560
self.assertEqual(True, t.listable())
561
self.assertEqual(True, t.is_readonly())
382
563
def test_http_parameters(self):
383
from bzrlib.tests.HttpServer import HttpServer
384
import bzrlib.transport.readonly as readonly
385
# connect to . via http which is not listable
564
from bzrlib.tests.http_server import HttpServer
565
# connect to '.' via http which is not listable
386
566
server = HttpServer()
389
transport = get_transport('readonly+' + server.get_url())
390
self.failUnless(isinstance(transport,
391
readonly.ReadonlyTransportDecorator))
392
self.assertEqual(False, transport.listable())
393
self.assertEqual(True, transport.should_cache())
394
self.assertEqual(True, transport.is_readonly())
399
class FakeNFSDecoratorTests(TestCaseInTempDir):
567
self.start_server(server)
568
t = transport.get_transport_from_url('readonly+' + server.get_url())
569
self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
570
self.assertEqual(False, t.listable())
571
self.assertEqual(True, t.is_readonly())
574
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
400
575
"""NFS decorator specific tests."""
402
577
def get_nfs_transport(self, url):
403
import bzrlib.transport.fakenfs as fakenfs
404
578
# connect to url with nfs decoration
405
579
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
407
581
def test_local_parameters(self):
408
# the listable, should_cache and is_readonly parameters
582
# the listable and is_readonly parameters
409
583
# are not changed by the fakenfs decorator
410
transport = self.get_nfs_transport('.')
411
self.assertEqual(True, transport.listable())
412
self.assertEqual(False, transport.should_cache())
413
self.assertEqual(False, transport.is_readonly())
584
t = self.get_nfs_transport('.')
585
self.assertEqual(True, t.listable())
586
self.assertEqual(False, t.is_readonly())
415
588
def test_http_parameters(self):
416
# the listable, should_cache and is_readonly parameters
589
# the listable and is_readonly parameters
417
590
# are not changed by the fakenfs decorator
418
from bzrlib.tests.HttpServer import HttpServer
419
# connect to . via http which is not listable
591
from bzrlib.tests.http_server import HttpServer
592
# connect to '.' via http which is not listable
420
593
server = HttpServer()
423
transport = self.get_nfs_transport(server.get_url())
424
self.assertIsInstance(
425
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
426
self.assertEqual(False, transport.listable())
427
self.assertEqual(True, transport.should_cache())
428
self.assertEqual(True, transport.is_readonly())
594
self.start_server(server)
595
t = self.get_nfs_transport(server.get_url())
596
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
597
self.assertEqual(False, t.listable())
598
self.assertEqual(True, t.is_readonly())
432
600
def test_fakenfs_server_default(self):
433
601
# a FakeNFSServer() should bring up a local relpath server for itself
434
import bzrlib.transport.fakenfs as fakenfs
435
server = fakenfs.FakeNFSServer()
438
# the url should be decorated appropriately
439
self.assertStartsWith(server.get_url(), 'fakenfs+')
440
# and we should be able to get a transport for it
441
transport = get_transport(server.get_url())
442
# which must be a FakeNFSTransportDecorator instance.
443
self.assertIsInstance(
444
transport, fakenfs.FakeNFSTransportDecorator)
602
server = test_server.FakeNFSServer()
603
self.start_server(server)
604
# the url should be decorated appropriately
605
self.assertStartsWith(server.get_url(), 'fakenfs+')
606
# and we should be able to get a transport for it
607
t = transport.get_transport_from_url(server.get_url())
608
# which must be a FakeNFSTransportDecorator instance.
609
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
448
611
def test_fakenfs_rename_semantics(self):
449
612
# a FakeNFS transport must mangle the way rename errors occur to
450
613
# look like NFS problems.
451
transport = self.get_nfs_transport('.')
614
t = self.get_nfs_transport('.')
452
615
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
454
self.assertRaises(bzrlib.errors.ResourceBusy,
455
transport.rename, 'from', 'to')
458
class FakeVFATDecoratorTests(TestCaseInTempDir):
617
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
620
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
459
621
"""Tests for simulation of VFAT restrictions"""
461
623
def get_vfat_transport(self, url):
466
628
def test_transport_creation(self):
467
629
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
468
transport = self.get_vfat_transport('.')
469
self.assertIsInstance(transport, FakeVFATTransportDecorator)
630
t = self.get_vfat_transport('.')
631
self.assertIsInstance(t, FakeVFATTransportDecorator)
471
633
def test_transport_mkdir(self):
472
transport = self.get_vfat_transport('.')
473
transport.mkdir('HELLO')
474
self.assertTrue(transport.has('hello'))
475
self.assertTrue(transport.has('Hello'))
634
t = self.get_vfat_transport('.')
636
self.assertTrue(t.has('hello'))
637
self.assertTrue(t.has('Hello'))
477
639
def test_forbidden_chars(self):
478
transport = self.get_vfat_transport('.')
479
self.assertRaises(ValueError, transport.has, "<NU>")
482
class BadTransportHandler(Transport):
640
t = self.get_vfat_transport('.')
641
self.assertRaises(ValueError, t.has, "<NU>")
644
class BadTransportHandler(transport.Transport):
483
645
def __init__(self, base_url):
484
raise DependencyNotPresent('some_lib', 'testing missing dependency')
487
class BackupTransportHandler(Transport):
646
raise errors.DependencyNotPresent('some_lib',
647
'testing missing dependency')
650
class BackupTransportHandler(transport.Transport):
488
651
"""Test transport that works as a backup for the BadTransportHandler"""
492
class TestTransportImplementation(TestCaseInTempDir):
655
class TestTransportImplementation(tests.TestCaseInTempDir):
493
656
"""Implementation verification for transports.
495
658
To verify a transport we need a server factory, which is a callable
496
659
that accepts no parameters and returns an implementation of
497
660
bzrlib.transport.Server.
499
662
That Server is then used to construct transport instances and test
500
663
the transport via loopback activity.
502
Currently this assumes that the Transport object is connected to the
503
current working directory. So that whatever is done
504
through the transport, should show up in the working
665
Currently this assumes that the Transport object is connected to the
666
current working directory. So that whatever is done
667
through the transport, should show up in the working
505
668
directory, and vice-versa. This is a bug, because its possible to have
506
URL schemes which provide access to something that may not be
507
result in storage on the local disk, i.e. due to file system limits, or
669
URL schemes which provide access to something that may not be
670
result in storage on the local disk, i.e. due to file system limits, or
508
671
due to it being a database or some other non-filesystem tool.
510
673
This also tests to make sure that the functions work with both
511
674
generators and lists (assuming iter(list) is effectively a generator)
515
678
super(TestTransportImplementation, self).setUp()
516
679
self._server = self.transport_server()
520
super(TestTransportImplementation, self).tearDown()
521
self._server.tearDown()
523
def get_transport(self):
524
"""Return a connected transport to the local directory."""
680
self.start_server(self._server)
682
def get_transport(self, relpath=None):
683
"""Return a connected transport to the local directory.
685
:param relpath: a path relative to the base url.
525
687
base_url = self._server.get_url()
688
url = self._adjust_url(base_url, relpath)
526
689
# try getting the transport via the regular interface:
527
t = get_transport(base_url)
690
t = transport.get_transport_from_url(url)
691
# vila--20070607 if the following are commented out the test suite
692
# still pass. Is this really still needed or was it a forgotten
528
694
if not isinstance(t, self.transport_class):
529
695
# we did not get the correct transport class type. Override the
530
696
# regular connection behaviour by direct construction.
531
t = self.transport_class(base_url)
697
t = self.transport_class(url)
535
class TestLocalTransports(TestCase):
701
class TestTransportFromPath(tests.TestCaseInTempDir):
703
def test_with_path(self):
704
t = transport.get_transport_from_path(self.test_dir)
705
self.assertIsInstance(t, local.LocalTransport)
706
self.assertEquals(t.base.rstrip("/"),
707
urlutils.local_path_to_url(self.test_dir))
709
def test_with_url(self):
710
t = transport.get_transport_from_path("file:")
711
self.assertIsInstance(t, local.LocalTransport)
712
self.assertEquals(t.base.rstrip("/"),
713
urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
718
def test_with_path(self):
719
self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
722
def test_with_url(self):
723
url = urlutils.local_path_to_url(self.test_dir)
724
t = transport.get_transport_from_url(url)
725
self.assertIsInstance(t, local.LocalTransport)
726
self.assertEquals(t.base.rstrip("/"), url)
728
def test_with_url_and_segment_parameters(self):
729
url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
730
t = transport.get_transport_from_url(url)
731
self.assertIsInstance(t, local.LocalTransport)
732
self.assertEquals(t.base.rstrip("/"), url)
733
with open(os.path.join(self.test_dir, "afile"), 'w') as f:
735
self.assertTrue(t.has("afile"))
738
class TestLocalTransports(tests.TestCase):
537
740
def test_get_transport_from_abspath(self):
538
here = os.path.abspath('.')
539
t = get_transport(here)
540
self.assertIsInstance(t, LocalTransport)
741
here = osutils.abspath('.')
742
t = transport.get_transport(here)
743
self.assertIsInstance(t, local.LocalTransport)
541
744
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
543
746
def test_get_transport_from_relpath(self):
544
here = os.path.abspath('.')
545
t = get_transport('.')
546
self.assertIsInstance(t, LocalTransport)
747
here = osutils.abspath('.')
748
t = transport.get_transport('.')
749
self.assertIsInstance(t, local.LocalTransport)
547
750
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
549
752
def test_get_transport_from_local_url(self):
550
here = os.path.abspath('.')
753
here = osutils.abspath('.')
551
754
here_url = urlutils.local_path_to_url(here) + '/'
552
t = get_transport(here_url)
553
self.assertIsInstance(t, LocalTransport)
755
t = transport.get_transport(here_url)
756
self.assertIsInstance(t, local.LocalTransport)
554
757
self.assertEquals(t.base, here_url)
557
class TestWin32LocalTransport(TestCase):
759
def test_local_abspath(self):
760
here = osutils.abspath('.')
761
t = transport.get_transport(here)
762
self.assertEquals(t.local_abspath(''), here)
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
767
def test_local_transport_mkdir(self):
768
here = osutils.abspath('.')
769
t = transport.get_transport(here)
771
self.assertTrue(os.path.exists('test'))
773
def test_local_transport_mkdir_permission_denied(self):
774
# See https://bugs.launchpad.net/bzr/+bug/606537
775
here = osutils.abspath('.')
776
t = transport.get_transport(here)
777
def fake_chmod(path, mode):
778
e = OSError('permission denied')
779
e.errno = errno.EPERM
781
self.overrideAttr(os, 'chmod', fake_chmod)
783
t.mkdir('test2', mode=0707)
784
self.assertTrue(os.path.exists('test'))
785
self.assertTrue(os.path.exists('test2'))
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
790
def test_local_fdatasync_calls_fdatasync(self):
791
"""Check fdatasync on a stream tries to flush the data to the OS.
793
We can't easily observe the external effect but we can at least see
797
fdatasync = getattr(os, 'fdatasync', sentinel)
798
if fdatasync is sentinel:
799
raise tests.TestNotApplicable('fdatasync not supported')
800
t = self.get_transport('.')
801
calls = self.recordCalls(os, 'fdatasync')
802
w = t.open_write_stream('out')
805
with open('out', 'rb') as f:
806
# Should have been flushed.
807
self.assertEquals(f.read(), 'foo')
808
self.assertEquals(len(calls), 1, calls)
810
def test_missing_directory(self):
811
t = self.get_transport('.')
812
self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
815
class TestWin32LocalTransport(tests.TestCase):
559
817
def test_unc_clone_to_root(self):
560
818
# Win32 UNC path like \\HOST\path
561
819
# clone to root should stop at least at \\HOST part
563
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
821
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
564
822
for i in xrange(4):
565
823
t = t.clone('..')
566
824
self.assertEquals(t.base, 'file://HOST/')
567
825
# make sure we reach the root
568
826
t = t.clone('..')
569
827
self.assertEquals(t.base, 'file://HOST/')
830
class TestConnectedTransport(tests.TestCase):
831
"""Tests for connected to remote server transports"""
833
def test_parse_url(self):
834
t = transport.ConnectedTransport(
835
'http://simple.example.com/home/source')
836
self.assertEquals(t._parsed_url.host, 'simple.example.com')
837
self.assertEquals(t._parsed_url.port, None)
838
self.assertEquals(t._parsed_url.path, '/home/source/')
839
self.assertTrue(t._parsed_url.user is None)
840
self.assertTrue(t._parsed_url.password is None)
842
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
844
def test_parse_url_with_at_in_user(self):
846
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
847
self.assertEquals(t._parsed_url.user, 'user@host.com')
849
def test_parse_quoted_url(self):
850
t = transport.ConnectedTransport(
851
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
852
self.assertEquals(t._parsed_url.host, 'exAmple.com')
853
self.assertEquals(t._parsed_url.port, 2222)
854
self.assertEquals(t._parsed_url.user, 'robey')
855
self.assertEquals(t._parsed_url.password, 'h@t')
856
self.assertEquals(t._parsed_url.path, '/path/')
858
# Base should not keep track of the password
859
self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
861
def test_parse_invalid_url(self):
862
self.assertRaises(errors.InvalidURL,
863
transport.ConnectedTransport,
864
'sftp://lily.org:~janneke/public/bzr/gub')
866
def test_relpath(self):
867
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
869
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
871
self.assertRaises(errors.PathNotChild, t.relpath,
872
'http://user@host.com/abs/path/sub')
873
self.assertRaises(errors.PathNotChild, t.relpath,
874
'sftp://user2@host.com/abs/path/sub')
875
self.assertRaises(errors.PathNotChild, t.relpath,
876
'sftp://user@otherhost.com/abs/path/sub')
877
self.assertRaises(errors.PathNotChild, t.relpath,
878
'sftp://user@host.com:33/abs/path/sub')
879
# Make sure it works when we don't supply a username
880
t = transport.ConnectedTransport('sftp://host.com/abs/path')
881
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
883
# Make sure it works when parts of the path will be url encoded
884
t = transport.ConnectedTransport('sftp://host.com/dev/%path')
885
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
887
def test_connection_sharing_propagate_credentials(self):
888
t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
889
self.assertEquals('user', t._parsed_url.user)
890
self.assertEquals('host.com', t._parsed_url.host)
891
self.assertIs(None, t._get_connection())
892
self.assertIs(None, t._parsed_url.password)
893
c = t.clone('subdir')
894
self.assertIs(None, c._get_connection())
895
self.assertIs(None, t._parsed_url.password)
897
# Simulate the user entering a password
899
connection = object()
900
t._set_connection(connection, password)
901
self.assertIs(connection, t._get_connection())
902
self.assertIs(password, t._get_credentials())
903
self.assertIs(connection, c._get_connection())
904
self.assertIs(password, c._get_credentials())
906
# credentials can be updated
907
new_password = 'even more secret'
908
c._update_credentials(new_password)
909
self.assertIs(connection, t._get_connection())
910
self.assertIs(new_password, t._get_credentials())
911
self.assertIs(connection, c._get_connection())
912
self.assertIs(new_password, c._get_credentials())
915
class TestReusedTransports(tests.TestCase):
916
"""Tests for transport reuse"""
918
def test_reuse_same_transport(self):
919
possible_transports = []
920
t1 = transport.get_transport_from_url('http://foo/',
921
possible_transports=possible_transports)
922
self.assertEqual([t1], possible_transports)
923
t2 = transport.get_transport_from_url('http://foo/',
924
possible_transports=[t1])
925
self.assertIs(t1, t2)
927
# Also check that final '/' are handled correctly
928
t3 = transport.get_transport_from_url('http://foo/path/')
929
t4 = transport.get_transport_from_url('http://foo/path',
930
possible_transports=[t3])
931
self.assertIs(t3, t4)
933
t5 = transport.get_transport_from_url('http://foo/path')
934
t6 = transport.get_transport_from_url('http://foo/path/',
935
possible_transports=[t5])
936
self.assertIs(t5, t6)
938
def test_don_t_reuse_different_transport(self):
939
t1 = transport.get_transport_from_url('http://foo/path')
940
t2 = transport.get_transport_from_url('http://bar/path',
941
possible_transports=[t1])
942
self.assertIsNot(t1, t2)
945
class TestTransportTrace(tests.TestCase):
947
def test_decorator(self):
948
t = transport.get_transport_from_url('trace+memory://')
949
self.assertIsInstance(
950
t, bzrlib.transport.trace.TransportTraceDecorator)
952
def test_clone_preserves_activity(self):
953
t = transport.get_transport_from_url('trace+memory://')
955
self.assertTrue(t is not t2)
956
self.assertTrue(t._activity is t2._activity)
958
# the following specific tests are for the operations that have made use of
959
# logging in tests; we could test every single operation but doing that
960
# still won't cause a test failure when the top level Transport API
961
# changes; so there is little return doing that.
963
t = transport.get_transport_from_url('trace+memory:///')
964
t.put_bytes('foo', 'barish')
967
# put_bytes records the bytes, not the content to avoid memory
969
expected_result.append(('put_bytes', 'foo', 6, None))
970
# get records the file name only.
971
expected_result.append(('get', 'foo'))
972
self.assertEqual(expected_result, t._activity)
974
def test_readv(self):
975
t = transport.get_transport_from_url('trace+memory:///')
976
t.put_bytes('foo', 'barish')
977
list(t.readv('foo', [(0, 1), (3, 2)],
978
adjust_for_latency=True, upper_limit=6))
980
# put_bytes records the bytes, not the content to avoid memory
982
expected_result.append(('put_bytes', 'foo', 6, None))
983
# readv records the supplied offset request
984
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
985
self.assertEqual(expected_result, t._activity)
988
class TestSSHConnections(tests.TestCaseWithTransport):
990
def test_bzr_connect_to_bzr_ssh(self):
991
"""get_transport of a bzr+ssh:// behaves correctly.
993
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
995
# This test actually causes a bzr instance to be invoked, which is very
996
# expensive: it should be the only such test in the test suite.
997
# A reasonable evolution for this would be to simply check inside
998
# check_channel_exec_request that the command is appropriate, and then
999
# satisfy requests in-process.
1000
self.requireFeature(features.paramiko)
1001
# SFTPFullAbsoluteServer has a get_url method, and doesn't
1002
# override the interface (doesn't change self._vendor).
1003
# Note that this does encryption, so can be slow.
1004
from bzrlib.tests import stub_sftp
1006
# Start an SSH server
1007
self.command_executed = []
1008
# XXX: This is horrible -- we define a really dumb SSH server that
1009
# executes commands, and manage the hooking up of stdin/out/err to the
1010
# SSH channel ourselves. Surely this has already been implemented
1014
class StubSSHServer(stub_sftp.StubServer):
1018
def check_channel_exec_request(self, channel, command):
1019
self.test.command_executed.append(command)
1020
proc = subprocess.Popen(
1021
command, shell=True, stdin=subprocess.PIPE,
1022
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1024
# XXX: horribly inefficient, not to mention ugly.
1025
# Start a thread for each of stdin/out/err, and relay bytes
1026
# from the subprocess to channel and vice versa.
1027
def ferry_bytes(read, write, close):
1036
(channel.recv, proc.stdin.write, proc.stdin.close),
1037
(proc.stdout.read, channel.sendall, channel.close),
1038
(proc.stderr.read, channel.sendall_stderr, channel.close)]
1039
started.append(proc)
1040
for read, write, close in file_functions:
1041
t = threading.Thread(
1042
target=ferry_bytes, args=(read, write, close))
1048
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1049
# We *don't* want to override the default SSH vendor: the detected one
1050
# is the one to use.
1052
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1053
# inherits from SFTPServer which forces the SSH vendor to
1054
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1055
self.start_server(ssh_server)
1056
port = ssh_server.port
1058
if sys.platform == 'win32':
1059
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1061
bzr_remote_path = self.get_bzr_path()
1062
self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1064
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
1065
# variable is used to tell bzr what command to run on the remote end.
1066
path_to_branch = osutils.abspath('.')
1067
if sys.platform == 'win32':
1068
# On Windows, we export all drives as '/C:/, etc. So we need to
1069
# prefix a '/' to get the right path.
1070
path_to_branch = '/' + path_to_branch
1071
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1072
t = transport.get_transport(url)
1073
self.permit_url(t.base)
1077
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1078
self.command_executed)
1079
# Make sure to disconnect, so that the remote process can stop, and we
1080
# can cleanup. Then pause the test until everything is shutdown
1081
t._client._medium.disconnect()
1084
# First wait for the subprocess
1086
# And the rest are threads
1087
for t in started[1:]:
1091
class TestUnhtml(tests.TestCase):
1093
"""Tests for unhtml_roughly"""
1095
def test_truncation(self):
1096
fake_html = "<p>something!\n" * 1000
1097
result = http.unhtml_roughly(fake_html)
1098
self.assertEquals(len(result), 1000)
1099
self.assertStartsWith(result, " something!")
1102
class SomeDirectory(object):
1104
def look_up(self, name, url):
1108
class TestLocationToUrl(tests.TestCase):
1110
def get_base_location(self):
1111
path = osutils.abspath('/foo/bar')
1112
if path.startswith('/'):
1113
url = 'file://%s' % (path,)
1115
# On Windows, abspaths start with the drive letter, so we have to
1116
# add in the extra '/'
1117
url = 'file:///%s' % (path,)
1120
def test_regular_url(self):
1121
self.assertEquals("file://foo", location_to_url("file://foo"))
1123
def test_directory(self):
1124
directories.register("bar:", SomeDirectory, "Dummy directory")
1125
self.addCleanup(directories.remove, "bar:")
1126
self.assertEquals("http://bar", location_to_url("bar:"))
1128
def test_unicode_url(self):
1129
self.assertRaises(errors.InvalidURL, location_to_url,
1130
"http://fo/\xc3\xaf".decode("utf-8"))
1132
def test_unicode_path(self):
1133
path, url = self.get_base_location()
1134
location = path + "\xc3\xaf".decode("utf-8")
1136
self.assertEquals(url, location_to_url(location))
1138
def test_path(self):
1139
path, url = self.get_base_location()
1140
self.assertEquals(url, location_to_url(path))
1142
def test_relative_file_url(self):
1143
self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1144
location_to_url("file:bar"))
1146
def test_absolute_file_url(self):
1147
self.assertEquals("file:///bar", location_to_url("file:/bar"))