18
18
from cStringIO import StringIO
21
25
from bzrlib import (
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
32
from bzrlib.directory_service import directories
33
from bzrlib.transport import (
43
import bzrlib.transport.trace
44
from bzrlib.tests import (
53
50
# TODO: Should possibly split transport-specific tests into their own files.
56
class TestTransport(TestCase):
53
class TestTransport(tests.TestCase):
57
54
"""Test the non transport-concrete class functionality."""
59
56
def test__get_set_protocol_handlers(self):
60
handlers = _get_protocol_handlers()
61
self.assertNotEqual([], handlers.keys( ))
63
_clear_protocol_handlers()
64
self.assertEqual([], _get_protocol_handlers().keys())
66
_set_protocol_handlers(handlers)
57
handlers = transport._get_protocol_handlers()
58
self.assertNotEqual([], handlers.keys())
59
transport._clear_protocol_handlers()
60
self.addCleanup(transport._set_protocol_handlers, handlers)
61
self.assertEqual([], transport._get_protocol_handlers().keys())
68
63
def test_get_transport_modules(self):
69
handlers = _get_protocol_handlers()
64
handlers = transport._get_protocol_handlers()
65
self.addCleanup(transport._set_protocol_handlers, handlers)
70
66
# don't pollute the current handlers
71
_clear_protocol_handlers()
67
transport._clear_protocol_handlers()
72
69
class SampleHandler(object):
73
70
"""I exist, isnt that enough?"""
75
_clear_protocol_handlers()
76
register_transport_proto('foo')
77
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
register_transport_proto('bar')
80
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'bzrlib.transport.chroot'],
84
_get_transport_modules())
86
_set_protocol_handlers(handlers)
71
transport._clear_protocol_handlers()
72
transport.register_transport_proto('foo')
73
transport.register_lazy_transport('foo',
74
'bzrlib.tests.test_transport',
75
'TestTransport.SampleHandler')
76
transport.register_transport_proto('bar')
77
transport.register_lazy_transport('bar',
78
'bzrlib.tests.test_transport',
79
'TestTransport.SampleHandler')
80
self.assertEqual([SampleHandler.__module__,
81
'bzrlib.transport.chroot',
82
'bzrlib.transport.pathfilter'],
83
transport._get_transport_modules())
88
85
def test_transport_dependency(self):
89
86
"""Transport with missing dependency causes no error"""
90
saved_handlers = _get_protocol_handlers()
87
saved_handlers = transport._get_protocol_handlers()
88
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
89
# don't pollute the current handlers
92
_clear_protocol_handlers()
90
transport._clear_protocol_handlers()
91
transport.register_transport_proto('foo')
92
transport.register_lazy_transport(
93
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
94
register_transport_proto('foo')
95
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
'BadTransportHandler')
98
get_transport('foo://fooserver/foo')
99
except UnsupportedProtocol, e:
101
self.assertEquals('Unsupported protocol'
102
' for url "foo://fooserver/foo":'
103
' Unable to import library "some_lib":'
104
' testing missing dependency', str(e))
106
self.fail('Did not raise UnsupportedProtocol')
108
# restore original values
109
_set_protocol_handlers(saved_handlers)
95
transport.get_transport_from_url('foo://fooserver/foo')
96
except errors.UnsupportedProtocol, e:
98
self.assertEquals('Unsupported protocol'
99
' for url "foo://fooserver/foo":'
100
' Unable to import library "some_lib":'
101
' testing missing dependency', str(e))
103
self.fail('Did not raise UnsupportedProtocol')
111
105
def test_transport_fallback(self):
112
106
"""Transport with missing dependency causes no error"""
113
saved_handlers = _get_protocol_handlers()
115
_clear_protocol_handlers()
116
register_transport_proto('foo')
117
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
'BackupTransportHandler')
119
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
'BadTransportHandler')
121
t = get_transport('foo://fooserver/foo')
122
self.assertTrue(isinstance(t, BackupTransportHandler))
124
_set_protocol_handlers(saved_handlers)
107
saved_handlers = transport._get_protocol_handlers()
108
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
109
transport._clear_protocol_handlers()
110
transport.register_transport_proto('foo')
111
transport.register_lazy_transport(
112
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
113
transport.register_lazy_transport(
114
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
115
t = transport.get_transport_from_url('foo://fooserver/foo')
116
self.assertTrue(isinstance(t, BackupTransportHandler))
126
118
def test_ssh_hints(self):
127
119
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
129
get_transport('ssh://fooserver/foo')
130
except UnsupportedProtocol, e:
121
transport.get_transport_from_url('ssh://fooserver/foo')
122
except errors.UnsupportedProtocol, e:
132
124
self.assertEquals('Unsupported protocol'
133
125
' for url "ssh://fooserver/foo":'
134
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
126
' bzr supports bzr+ssh to operate over ssh,'
127
' use "bzr+ssh://fooserver/foo".',
137
130
self.fail('Did not raise UnsupportedProtocol')
139
132
def test_LateReadError(self):
140
133
"""The LateReadError helper should raise on read()."""
141
a_file = LateReadError('a path')
134
a_file = transport.LateReadError('a path')
144
except ReadError, error:
137
except errors.ReadError, error:
145
138
self.assertEqual('a path', error.path)
146
self.assertRaises(ReadError, a_file.read, 40)
139
self.assertRaises(errors.ReadError, a_file.read, 40)
149
def test__combine_paths(self):
151
self.assertEqual('/home/sarah/project/foo',
152
t._combine_paths('/home/sarah', 'project/foo'))
153
self.assertEqual('/etc',
154
t._combine_paths('/home/sarah', '../../etc'))
155
self.assertEqual('/etc',
156
t._combine_paths('/home/sarah', '../../../etc'))
157
self.assertEqual('/etc',
158
t._combine_paths('/home/sarah', '/etc'))
160
142
def test_local_abspath_non_local_transport(self):
161
143
# the base implementation should throw
162
t = MemoryTransport()
144
t = memory.MemoryTransport()
163
145
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
146
self.assertEqual('memory:///t is not a local path.', str(e))
167
class TestCoalesceOffsets(TestCase):
149
class TestCoalesceOffsets(tests.TestCase):
169
151
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
coalesce = Transport._coalesce_offsets
171
exp = [_CoalescedOffset(*x) for x in expected]
152
coalesce = transport.Transport._coalesce_offsets
153
exp = [transport._CoalescedOffset(*x) for x in expected]
172
154
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
155
max_size=max_size))
174
156
self.assertEqual(exp, out)
220
202
def test_coalesce_fudge(self):
221
203
self.check([(10, 30, [(0, 10), (20, 10)]),
222
(100, 10, [(0, 10),]),
204
(100, 10, [(0, 10)]),
223
205
], [(10, 10), (30, 10), (100, 10)],
226
208
def test_coalesce_max_size(self):
227
209
self.check([(10, 20, [(0, 10), (10, 10)]),
228
210
(30, 50, [(0, 50)]),
229
211
# If one range is above max_size, it gets its own coalesced
231
(100, 80, [(0, 80),]),],
213
(100, 80, [(0, 80)]),],
232
214
[(10, 10), (20, 10), (30, 50), (100, 80)],
236
217
def test_coalesce_no_max_size(self):
237
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
238
219
[(10, 10), (20, 10), (30, 50), (80, 100)],
241
222
def test_coalesce_default_limit(self):
242
223
# By default we use a 100MB max size.
243
ten_mb = 10*1024*1024
244
self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
224
ten_mb = 10 * 1024 * 1024
225
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
245
226
(10*ten_mb, ten_mb, [(0, ten_mb)])],
246
227
[(i*ten_mb, ten_mb) for i in range(11)])
247
self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
[(i*ten_mb, ten_mb) for i in range(11)],
228
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
[(i * ten_mb, ten_mb) for i in range(11)],
249
230
max_size=1*1024*1024*1024)
252
class TestMemoryTransport(TestCase):
233
class TestMemoryServer(tests.TestCase):
235
def test_create_server(self):
236
server = memory.MemoryServer()
237
server.start_server()
238
url = server.get_url()
239
self.assertTrue(url in transport.transport_list_registry)
240
t = transport.get_transport_from_url(url)
243
self.assertFalse(url in transport.transport_list_registry)
244
self.assertRaises(errors.UnsupportedProtocol,
245
transport.get_transport, url)
248
class TestMemoryTransport(tests.TestCase):
254
250
def test_get_transport(self):
251
memory.MemoryTransport()
257
253
def test_clone(self):
258
transport = MemoryTransport()
259
self.assertTrue(isinstance(transport, MemoryTransport))
260
self.assertEqual("memory:///", transport.clone("/").base)
254
t = memory.MemoryTransport()
255
self.assertTrue(isinstance(t, memory.MemoryTransport))
256
self.assertEqual("memory:///", t.clone("/").base)
262
258
def test_abspath(self):
263
transport = MemoryTransport()
264
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
259
t = memory.MemoryTransport()
260
self.assertEqual("memory:///relpath", t.abspath('relpath'))
266
262
def test_abspath_of_root(self):
267
transport = MemoryTransport()
268
self.assertEqual("memory:///", transport.base)
269
self.assertEqual("memory:///", transport.abspath('/'))
263
t = memory.MemoryTransport()
264
self.assertEqual("memory:///", t.base)
265
self.assertEqual("memory:///", t.abspath('/'))
271
267
def test_abspath_of_relpath_starting_at_root(self):
272
transport = MemoryTransport()
273
self.assertEqual("memory:///foo", transport.abspath('/foo'))
268
t = memory.MemoryTransport()
269
self.assertEqual("memory:///foo", t.abspath('/foo'))
275
271
def test_append_and_get(self):
276
transport = MemoryTransport()
277
transport.append_bytes('path', 'content')
278
self.assertEqual(transport.get('path').read(), 'content')
279
transport.append_file('path', StringIO('content'))
280
self.assertEqual(transport.get('path').read(), 'contentcontent')
272
t = memory.MemoryTransport()
273
t.append_bytes('path', 'content')
274
self.assertEqual(t.get('path').read(), 'content')
275
t.append_file('path', StringIO('content'))
276
self.assertEqual(t.get('path').read(), 'contentcontent')
282
278
def test_put_and_get(self):
283
transport = MemoryTransport()
284
transport.put_file('path', StringIO('content'))
285
self.assertEqual(transport.get('path').read(), 'content')
286
transport.put_bytes('path', 'content')
287
self.assertEqual(transport.get('path').read(), 'content')
279
t = memory.MemoryTransport()
280
t.put_file('path', StringIO('content'))
281
self.assertEqual(t.get('path').read(), 'content')
282
t.put_bytes('path', 'content')
283
self.assertEqual(t.get('path').read(), 'content')
289
285
def test_append_without_dir_fails(self):
290
transport = MemoryTransport()
291
self.assertRaises(NoSuchFile,
292
transport.append_bytes, 'dir/path', 'content')
286
t = memory.MemoryTransport()
287
self.assertRaises(errors.NoSuchFile,
288
t.append_bytes, 'dir/path', 'content')
294
290
def test_put_without_dir_fails(self):
295
transport = MemoryTransport()
296
self.assertRaises(NoSuchFile,
297
transport.put_file, 'dir/path', StringIO('content'))
291
t = memory.MemoryTransport()
292
self.assertRaises(errors.NoSuchFile,
293
t.put_file, 'dir/path', StringIO('content'))
299
295
def test_get_missing(self):
300
transport = MemoryTransport()
301
self.assertRaises(NoSuchFile, transport.get, 'foo')
296
transport = memory.MemoryTransport()
297
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
303
299
def test_has_missing(self):
304
transport = MemoryTransport()
305
self.assertEquals(False, transport.has('foo'))
300
t = memory.MemoryTransport()
301
self.assertEquals(False, t.has('foo'))
307
303
def test_has_present(self):
308
transport = MemoryTransport()
309
transport.append_bytes('foo', 'content')
310
self.assertEquals(True, transport.has('foo'))
304
t = memory.MemoryTransport()
305
t.append_bytes('foo', 'content')
306
self.assertEquals(True, t.has('foo'))
312
308
def test_list_dir(self):
313
transport = MemoryTransport()
314
transport.put_bytes('foo', 'content')
315
transport.mkdir('dir')
316
transport.put_bytes('dir/subfoo', 'content')
317
transport.put_bytes('dirlike', 'content')
309
t = memory.MemoryTransport()
310
t.put_bytes('foo', 'content')
312
t.put_bytes('dir/subfoo', 'content')
313
t.put_bytes('dirlike', 'content')
319
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
320
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
315
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
316
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
322
318
def test_mkdir(self):
323
transport = MemoryTransport()
324
transport.mkdir('dir')
325
transport.append_bytes('dir/path', 'content')
326
self.assertEqual(transport.get('dir/path').read(), 'content')
319
t = memory.MemoryTransport()
321
t.append_bytes('dir/path', 'content')
322
self.assertEqual(t.get('dir/path').read(), 'content')
328
324
def test_mkdir_missing_parent(self):
329
transport = MemoryTransport()
330
self.assertRaises(NoSuchFile,
331
transport.mkdir, 'dir/dir')
325
t = memory.MemoryTransport()
326
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
333
328
def test_mkdir_twice(self):
334
transport = MemoryTransport()
335
transport.mkdir('dir')
336
self.assertRaises(FileExists, transport.mkdir, 'dir')
329
t = memory.MemoryTransport()
331
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
338
333
def test_parameters(self):
339
transport = MemoryTransport()
340
self.assertEqual(True, transport.listable())
341
self.assertEqual(False, transport.is_readonly())
334
t = memory.MemoryTransport()
335
self.assertEqual(True, t.listable())
336
self.assertEqual(False, t.is_readonly())
343
338
def test_iter_files_recursive(self):
344
transport = MemoryTransport()
345
transport.mkdir('dir')
346
transport.put_bytes('dir/foo', 'content')
347
transport.put_bytes('dir/bar', 'content')
348
transport.put_bytes('bar', 'content')
349
paths = set(transport.iter_files_recursive())
339
t = memory.MemoryTransport()
341
t.put_bytes('dir/foo', 'content')
342
t.put_bytes('dir/bar', 'content')
343
t.put_bytes('bar', 'content')
344
paths = set(t.iter_files_recursive())
350
345
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
352
347
def test_stat(self):
353
transport = MemoryTransport()
354
transport.put_bytes('foo', 'content')
355
transport.put_bytes('bar', 'phowar')
356
self.assertEqual(7, transport.stat('foo').st_size)
357
self.assertEqual(6, transport.stat('bar').st_size)
360
class ChrootDecoratorTransportTest(TestCase):
348
t = memory.MemoryTransport()
349
t.put_bytes('foo', 'content')
350
t.put_bytes('bar', 'phowar')
351
self.assertEqual(7, t.stat('foo').st_size)
352
self.assertEqual(6, t.stat('bar').st_size)
355
class ChrootDecoratorTransportTest(tests.TestCase):
361
356
"""Chroot decoration specific tests."""
363
358
def test_abspath(self):
364
359
# The abspath is always relative to the chroot_url.
365
server = ChrootServer(get_transport('memory:///foo/bar/'))
367
transport = get_transport(server.get_url())
368
self.assertEqual(server.get_url(), transport.abspath('/'))
360
server = chroot.ChrootServer(
361
transport.get_transport_from_url('memory:///foo/bar/'))
362
self.start_server(server)
363
t = transport.get_transport_from_url(server.get_url())
364
self.assertEqual(server.get_url(), t.abspath('/'))
370
subdir_transport = transport.clone('subdir')
371
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
366
subdir_t = t.clone('subdir')
367
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
374
369
def test_clone(self):
375
server = ChrootServer(get_transport('memory:///foo/bar/'))
377
transport = get_transport(server.get_url())
370
server = chroot.ChrootServer(
371
transport.get_transport_from_url('memory:///foo/bar/'))
372
self.start_server(server)
373
t = transport.get_transport_from_url(server.get_url())
378
374
# relpath from root and root path are the same
379
relpath_cloned = transport.clone('foo')
380
abspath_cloned = transport.clone('/foo')
375
relpath_cloned = t.clone('foo')
376
abspath_cloned = t.clone('/foo')
381
377
self.assertEqual(server, relpath_cloned.server)
382
378
self.assertEqual(server, abspath_cloned.server)
385
380
def test_chroot_url_preserves_chroot(self):
386
381
"""Calling get_transport on a chroot transport's base should produce a
407
402
This is so that it is not possible to escape a chroot by doing::
408
403
url = chroot_transport.base
409
404
parent_url = urlutils.join(url, '..')
410
new_transport = get_transport(parent_url)
405
new_t = transport.get_transport_from_url(parent_url)
412
server = ChrootServer(get_transport('memory:///path/'))
414
transport = get_transport(server.get_url())
407
server = chroot.ChrootServer(
408
transport.get_transport_from_url('memory:///path/'))
409
self.start_server(server)
410
t = transport.get_transport_from_url(server.get_url())
415
411
self.assertRaises(
416
InvalidURLJoin, urlutils.join, transport.base, '..')
420
class ChrootServerTest(TestCase):
412
errors.InvalidURLJoin, urlutils.join, t.base, '..')
415
class TestChrootServer(tests.TestCase):
422
417
def test_construct(self):
423
backing_transport = MemoryTransport()
424
server = ChrootServer(backing_transport)
418
backing_transport = memory.MemoryTransport()
419
server = chroot.ChrootServer(backing_transport)
425
420
self.assertEqual(backing_transport, server.backing_transport)
427
422
def test_setUp(self):
428
backing_transport = MemoryTransport()
429
server = ChrootServer(backing_transport)
431
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
423
backing_transport = memory.MemoryTransport()
424
server = chroot.ChrootServer(backing_transport)
425
server.start_server()
426
self.addCleanup(server.stop_server)
427
self.assertTrue(server.scheme
428
in transport._get_protocol_handlers().keys())
433
def test_tearDown(self):
434
backing_transport = MemoryTransport()
435
server = ChrootServer(backing_transport)
438
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
430
def test_stop_server(self):
431
backing_transport = memory.MemoryTransport()
432
server = chroot.ChrootServer(backing_transport)
433
server.start_server()
435
self.assertFalse(server.scheme
436
in transport._get_protocol_handlers().keys())
440
438
def test_get_url(self):
441
backing_transport = MemoryTransport()
442
server = ChrootServer(backing_transport)
439
backing_transport = memory.MemoryTransport()
440
server = chroot.ChrootServer(backing_transport)
441
server.start_server()
442
self.addCleanup(server.stop_server)
444
443
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
448
class ReadonlyDecoratorTransportTest(TestCase):
446
class TestHooks(tests.TestCase):
447
"""Basic tests for transport hooks"""
449
def _get_connected_transport(self):
450
return transport.ConnectedTransport("bogus:nowhere")
452
def test_transporthooks_initialisation(self):
453
"""Check all expected transport hook points are set up"""
454
hookpoint = transport.TransportHooks()
455
self.assertTrue("post_connect" in hookpoint,
456
"post_connect not in %s" % (hookpoint,))
458
def test_post_connect(self):
459
"""Ensure the post_connect hook is called when _set_transport is"""
461
transport.Transport.hooks.install_named_hook("post_connect",
463
t = self._get_connected_transport()
464
self.assertLength(0, calls)
465
t._set_connection("connection", "auth")
466
self.assertEqual(calls, [t])
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
"""Pathfilter decoration specific tests."""
472
def test_abspath(self):
473
# The abspath is always relative to the base of the backing transport.
474
server = pathfilter.PathFilteringServer(
475
transport.get_transport_from_url('memory:///foo/bar/'),
477
server.start_server()
478
t = transport.get_transport_from_url(server.get_url())
479
self.assertEqual(server.get_url(), t.abspath('/'))
481
subdir_t = t.clone('subdir')
482
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
485
def make_pf_transport(self, filter_func=None):
486
"""Make a PathFilteringTransport backed by a MemoryTransport.
488
:param filter_func: by default this will be a no-op function. Use this
489
parameter to override it."""
490
if filter_func is None:
491
filter_func = lambda x: x
492
server = pathfilter.PathFilteringServer(
493
transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
494
server.start_server()
495
self.addCleanup(server.stop_server)
496
return transport.get_transport_from_url(server.get_url())
498
def test__filter(self):
499
# _filter (with an identity func as filter_func) always returns
500
# paths relative to the base of the backing transport.
501
t = self.make_pf_transport()
502
self.assertEqual('foo', t._filter('foo'))
503
self.assertEqual('foo/bar', t._filter('foo/bar'))
504
self.assertEqual('', t._filter('..'))
505
self.assertEqual('', t._filter('/'))
506
# The base of the pathfiltering transport is taken into account too.
507
t = t.clone('subdir1/subdir2')
508
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
509
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
510
self.assertEqual('subdir1', t._filter('..'))
511
self.assertEqual('', t._filter('/'))
513
def test_filter_invocation(self):
517
filter_log.append(path)
519
t = self.make_pf_transport(filter)
521
self.assertEqual(['abc'], filter_log)
523
t.clone('abc').has('xyz')
524
self.assertEqual(['abc/xyz'], filter_log)
527
self.assertEqual(['abc'], filter_log)
529
def test_clone(self):
530
t = self.make_pf_transport()
531
# relpath from root and root path are the same
532
relpath_cloned = t.clone('foo')
533
abspath_cloned = t.clone('/foo')
534
self.assertEqual(t.server, relpath_cloned.server)
535
self.assertEqual(t.server, abspath_cloned.server)
537
def test_url_preserves_pathfiltering(self):
538
"""Calling get_transport on a pathfiltered transport's base should
539
produce a transport with exactly the same behaviour as the original
540
pathfiltered transport.
542
This is so that it is not possible to escape (accidentally or
543
otherwise) the filtering by doing::
544
url = filtered_transport.base
545
parent_url = urlutils.join(url, '..')
546
new_t = transport.get_transport_from_url(parent_url)
548
t = self.make_pf_transport()
549
new_t = transport.get_transport_from_url(t.base)
550
self.assertEqual(t.server, new_t.server)
551
self.assertEqual(t.base, new_t.base)
554
class ReadonlyDecoratorTransportTest(tests.TestCase):
449
555
"""Readonly decoration specific tests."""
451
557
def test_local_parameters(self):
452
import bzrlib.transport.readonly as readonly
453
558
# connect to . in readonly mode
454
transport = readonly.ReadonlyTransportDecorator('readonly+.')
455
self.assertEqual(True, transport.listable())
456
self.assertEqual(True, transport.is_readonly())
559
t = readonly.ReadonlyTransportDecorator('readonly+.')
560
self.assertEqual(True, t.listable())
561
self.assertEqual(True, t.is_readonly())
458
563
def test_http_parameters(self):
459
564
from bzrlib.tests.http_server import HttpServer
460
import bzrlib.transport.readonly as readonly
461
565
# connect to '.' via http which is not listable
462
566
server = HttpServer()
465
transport = get_transport('readonly+' + server.get_url())
466
self.failUnless(isinstance(transport,
467
readonly.ReadonlyTransportDecorator))
468
self.assertEqual(False, transport.listable())
469
self.assertEqual(True, transport.is_readonly())
474
class FakeNFSDecoratorTests(TestCaseInTempDir):
567
self.start_server(server)
568
t = transport.get_transport_from_url('readonly+' + server.get_url())
569
self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
570
self.assertEqual(False, t.listable())
571
self.assertEqual(True, t.is_readonly())
574
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
475
575
"""NFS decorator specific tests."""
477
577
def get_nfs_transport(self, url):
478
import bzrlib.transport.fakenfs as fakenfs
479
578
# connect to url with nfs decoration
480
579
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
482
581
def test_local_parameters(self):
483
582
# the listable and is_readonly parameters
484
583
# are not changed by the fakenfs decorator
485
transport = self.get_nfs_transport('.')
486
self.assertEqual(True, transport.listable())
487
self.assertEqual(False, transport.is_readonly())
584
t = self.get_nfs_transport('.')
585
self.assertEqual(True, t.listable())
586
self.assertEqual(False, t.is_readonly())
489
588
def test_http_parameters(self):
490
589
# the listable and is_readonly parameters
612
class TestLocalTransports(TestCase):
701
class TestTransportFromPath(tests.TestCaseInTempDir):
703
def test_with_path(self):
704
t = transport.get_transport_from_path(self.test_dir)
705
self.assertIsInstance(t, local.LocalTransport)
706
self.assertEquals(t.base.rstrip("/"),
707
urlutils.local_path_to_url(self.test_dir))
709
def test_with_url(self):
710
t = transport.get_transport_from_path("file:")
711
self.assertIsInstance(t, local.LocalTransport)
712
self.assertEquals(t.base.rstrip("/"),
713
urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
718
def test_with_path(self):
719
self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
722
def test_with_url(self):
723
url = urlutils.local_path_to_url(self.test_dir)
724
t = transport.get_transport_from_url(url)
725
self.assertIsInstance(t, local.LocalTransport)
726
self.assertEquals(t.base.rstrip("/"), url)
728
def test_with_url_and_segment_parameters(self):
729
url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
730
t = transport.get_transport_from_url(url)
731
self.assertIsInstance(t, local.LocalTransport)
732
self.assertEquals(t.base.rstrip("/"), url)
733
with open(os.path.join(self.test_dir, "afile"), 'w') as f:
735
self.assertTrue(t.has("afile"))
738
class TestLocalTransports(tests.TestCase):
614
740
def test_get_transport_from_abspath(self):
615
741
here = osutils.abspath('.')
616
t = get_transport(here)
617
self.assertIsInstance(t, LocalTransport)
742
t = transport.get_transport(here)
743
self.assertIsInstance(t, local.LocalTransport)
618
744
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
620
746
def test_get_transport_from_relpath(self):
621
747
here = osutils.abspath('.')
622
t = get_transport('.')
623
self.assertIsInstance(t, LocalTransport)
748
t = transport.get_transport('.')
749
self.assertIsInstance(t, local.LocalTransport)
624
750
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
626
752
def test_get_transport_from_local_url(self):
627
753
here = osutils.abspath('.')
628
754
here_url = urlutils.local_path_to_url(here) + '/'
629
t = get_transport(here_url)
630
self.assertIsInstance(t, LocalTransport)
755
t = transport.get_transport(here_url)
756
self.assertIsInstance(t, local.LocalTransport)
631
757
self.assertEquals(t.base, here_url)
633
759
def test_local_abspath(self):
634
760
here = osutils.abspath('.')
635
t = get_transport(here)
761
t = transport.get_transport(here)
636
762
self.assertEquals(t.local_abspath(''), here)
639
class TestWin32LocalTransport(TestCase):
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
767
def test_local_transport_mkdir(self):
768
here = osutils.abspath('.')
769
t = transport.get_transport(here)
771
self.assertTrue(os.path.exists('test'))
773
def test_local_transport_mkdir_permission_denied(self):
774
# See https://bugs.launchpad.net/bzr/+bug/606537
775
here = osutils.abspath('.')
776
t = transport.get_transport(here)
777
def fake_chmod(path, mode):
778
e = OSError('permission denied')
779
e.errno = errno.EPERM
781
self.overrideAttr(os, 'chmod', fake_chmod)
783
t.mkdir('test2', mode=0707)
784
self.assertTrue(os.path.exists('test'))
785
self.assertTrue(os.path.exists('test2'))
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
790
def test_local_fdatasync_calls_fdatasync(self):
791
"""Check fdatasync on a stream tries to flush the data to the OS.
793
We can't easily observe the external effect but we can at least see
797
fdatasync = getattr(os, 'fdatasync', sentinel)
798
if fdatasync is sentinel:
799
raise tests.TestNotApplicable('fdatasync not supported')
800
t = self.get_transport('.')
801
calls = self.recordCalls(os, 'fdatasync')
802
w = t.open_write_stream('out')
805
with open('out', 'rb') as f:
806
# Should have been flushed.
807
self.assertEquals(f.read(), 'foo')
808
self.assertEquals(len(calls), 1, calls)
810
def test_missing_directory(self):
811
t = self.get_transport('.')
812
self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
815
class TestWin32LocalTransport(tests.TestCase):
641
817
def test_unc_clone_to_root(self):
642
818
# Win32 UNC path like \\HOST\path
643
819
# clone to root should stop at least at \\HOST part
645
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
821
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
646
822
for i in xrange(4):
647
823
t = t.clone('..')
648
824
self.assertEquals(t.base, 'file://HOST/')
651
827
self.assertEquals(t.base, 'file://HOST/')
654
class TestConnectedTransport(TestCase):
830
class TestConnectedTransport(tests.TestCase):
655
831
"""Tests for connected to remote server transports"""
657
833
def test_parse_url(self):
658
t = ConnectedTransport('http://simple.example.com/home/source')
659
self.assertEquals(t._host, 'simple.example.com')
660
self.assertEquals(t._port, None)
661
self.assertEquals(t._path, '/home/source/')
662
self.failUnless(t._user is None)
663
self.failUnless(t._password is None)
834
t = transport.ConnectedTransport(
835
'http://simple.example.com/home/source')
836
self.assertEquals(t._parsed_url.host, 'simple.example.com')
837
self.assertEquals(t._parsed_url.port, None)
838
self.assertEquals(t._parsed_url.path, '/home/source/')
839
self.assertTrue(t._parsed_url.user is None)
840
self.assertTrue(t._parsed_url.password is None)
665
842
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
667
844
def test_parse_url_with_at_in_user(self):
669
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
670
self.assertEquals(t._user, 'user@host.com')
846
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
847
self.assertEquals(t._parsed_url.user, 'user@host.com')
672
849
def test_parse_quoted_url(self):
673
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
674
self.assertEquals(t._host, 'exAmple.com')
675
self.assertEquals(t._port, 2222)
676
self.assertEquals(t._user, 'robey')
677
self.assertEquals(t._password, 'h@t')
678
self.assertEquals(t._path, '/path/')
850
t = transport.ConnectedTransport(
851
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
852
self.assertEquals(t._parsed_url.host, 'exAmple.com')
853
self.assertEquals(t._parsed_url.port, 2222)
854
self.assertEquals(t._parsed_url.user, 'robey')
855
self.assertEquals(t._parsed_url.password, 'h@t')
856
self.assertEquals(t._parsed_url.path, '/path/')
680
858
# Base should not keep track of the password
681
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
859
self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
683
861
def test_parse_invalid_url(self):
684
862
self.assertRaises(errors.InvalidURL,
863
transport.ConnectedTransport,
686
864
'sftp://lily.org:~janneke/public/bzr/gub')
688
866
def test_relpath(self):
689
t = ConnectedTransport('sftp://user@host.com/abs/path')
867
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
691
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
869
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
692
871
self.assertRaises(errors.PathNotChild, t.relpath,
693
872
'http://user@host.com/abs/path/sub')
694
873
self.assertRaises(errors.PathNotChild, t.relpath,
733
912
self.assertIs(new_password, c._get_credentials())
736
class TestReusedTransports(TestCase):
915
class TestReusedTransports(tests.TestCase):
737
916
"""Tests for transport reuse"""
739
918
def test_reuse_same_transport(self):
740
919
possible_transports = []
741
t1 = get_transport('http://foo/',
742
possible_transports=possible_transports)
920
t1 = transport.get_transport_from_url('http://foo/',
921
possible_transports=possible_transports)
743
922
self.assertEqual([t1], possible_transports)
744
t2 = get_transport('http://foo/', possible_transports=[t1])
923
t2 = transport.get_transport_from_url('http://foo/',
924
possible_transports=[t1])
745
925
self.assertIs(t1, t2)
747
927
# Also check that final '/' are handled correctly
748
t3 = get_transport('http://foo/path/')
749
t4 = get_transport('http://foo/path', possible_transports=[t3])
928
t3 = transport.get_transport_from_url('http://foo/path/')
929
t4 = transport.get_transport_from_url('http://foo/path',
930
possible_transports=[t3])
750
931
self.assertIs(t3, t4)
752
t5 = get_transport('http://foo/path')
753
t6 = get_transport('http://foo/path/', possible_transports=[t5])
933
t5 = transport.get_transport_from_url('http://foo/path')
934
t6 = transport.get_transport_from_url('http://foo/path/',
935
possible_transports=[t5])
754
936
self.assertIs(t5, t6)
756
938
def test_don_t_reuse_different_transport(self):
757
t1 = get_transport('http://foo/path')
758
t2 = get_transport('http://bar/path', possible_transports=[t1])
939
t1 = transport.get_transport_from_url('http://foo/path')
940
t2 = transport.get_transport_from_url('http://bar/path',
941
possible_transports=[t1])
759
942
self.assertIsNot(t1, t2)
762
class TestTransportTrace(TestCase):
945
class TestTransportTrace(tests.TestCase):
765
transport = get_transport('trace+memory://')
947
def test_decorator(self):
948
t = transport.get_transport_from_url('trace+memory://')
766
949
self.assertIsInstance(
767
transport, bzrlib.transport.trace.TransportTraceDecorator)
950
t, bzrlib.transport.trace.TransportTraceDecorator)
769
952
def test_clone_preserves_activity(self):
770
transport = get_transport('trace+memory://')
771
transport2 = transport.clone('.')
772
self.assertTrue(transport is not transport2)
773
self.assertTrue(transport._activity is transport2._activity)
953
t = transport.get_transport_from_url('trace+memory://')
955
self.assertTrue(t is not t2)
956
self.assertTrue(t._activity is t2._activity)
775
958
# the following specific tests are for the operations that have made use of
776
959
# logging in tests; we could test every single operation but doing that
777
960
# still won't cause a test failure when the top level Transport API
778
961
# changes; so there is little return doing that.
779
962
def test_get(self):
780
transport = get_transport('trace+memory:///')
781
transport.put_bytes('foo', 'barish')
963
t = transport.get_transport_from_url('trace+memory:///')
964
t.put_bytes('foo', 'barish')
783
966
expected_result = []
784
967
# put_bytes records the bytes, not the content to avoid memory
786
969
expected_result.append(('put_bytes', 'foo', 6, None))
787
970
# get records the file name only.
788
971
expected_result.append(('get', 'foo'))
789
self.assertEqual(expected_result, transport._activity)
972
self.assertEqual(expected_result, t._activity)
791
974
def test_readv(self):
792
transport = get_transport('trace+memory:///')
793
transport.put_bytes('foo', 'barish')
794
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
975
t = transport.get_transport_from_url('trace+memory:///')
976
t.put_bytes('foo', 'barish')
977
list(t.readv('foo', [(0, 1), (3, 2)],
978
adjust_for_latency=True, upper_limit=6))
796
979
expected_result = []
797
980
# put_bytes records the bytes, not the content to avoid memory
799
982
expected_result.append(('put_bytes', 'foo', 6, None))
800
983
# readv records the supplied offset request
801
984
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
802
self.assertEqual(expected_result, transport._activity)
985
self.assertEqual(expected_result, t._activity)
988
class TestSSHConnections(tests.TestCaseWithTransport):
990
def test_bzr_connect_to_bzr_ssh(self):
991
"""get_transport of a bzr+ssh:// behaves correctly.
993
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
995
# This test actually causes a bzr instance to be invoked, which is very
996
# expensive: it should be the only such test in the test suite.
997
# A reasonable evolution for this would be to simply check inside
998
# check_channel_exec_request that the command is appropriate, and then
999
# satisfy requests in-process.
1000
self.requireFeature(features.paramiko)
1001
# SFTPFullAbsoluteServer has a get_url method, and doesn't
1002
# override the interface (doesn't change self._vendor).
1003
# Note that this does encryption, so can be slow.
1004
from bzrlib.tests import stub_sftp
1006
# Start an SSH server
1007
self.command_executed = []
1008
# XXX: This is horrible -- we define a really dumb SSH server that
1009
# executes commands, and manage the hooking up of stdin/out/err to the
1010
# SSH channel ourselves. Surely this has already been implemented
1014
class StubSSHServer(stub_sftp.StubServer):
1018
def check_channel_exec_request(self, channel, command):
1019
self.test.command_executed.append(command)
1020
proc = subprocess.Popen(
1021
command, shell=True, stdin=subprocess.PIPE,
1022
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1024
# XXX: horribly inefficient, not to mention ugly.
1025
# Start a thread for each of stdin/out/err, and relay bytes
1026
# from the subprocess to channel and vice versa.
1027
def ferry_bytes(read, write, close):
1036
(channel.recv, proc.stdin.write, proc.stdin.close),
1037
(proc.stdout.read, channel.sendall, channel.close),
1038
(proc.stderr.read, channel.sendall_stderr, channel.close)]
1039
started.append(proc)
1040
for read, write, close in file_functions:
1041
t = threading.Thread(
1042
target=ferry_bytes, args=(read, write, close))
1048
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1049
# We *don't* want to override the default SSH vendor: the detected one
1050
# is the one to use.
1052
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1053
# inherits from SFTPServer which forces the SSH vendor to
1054
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1055
self.start_server(ssh_server)
1056
port = ssh_server.port
1058
if sys.platform == 'win32':
1059
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1061
bzr_remote_path = self.get_bzr_path()
1062
self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1064
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
1065
# variable is used to tell bzr what command to run on the remote end.
1066
path_to_branch = osutils.abspath('.')
1067
if sys.platform == 'win32':
1068
# On Windows, we export all drives as '/C:/, etc. So we need to
1069
# prefix a '/' to get the right path.
1070
path_to_branch = '/' + path_to_branch
1071
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1072
t = transport.get_transport(url)
1073
self.permit_url(t.base)
1077
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1078
self.command_executed)
1079
# Make sure to disconnect, so that the remote process can stop, and we
1080
# can cleanup. Then pause the test until everything is shutdown
1081
t._client._medium.disconnect()
1084
# First wait for the subprocess
1086
# And the rest are threads
1087
for t in started[1:]:
1091
class TestUnhtml(tests.TestCase):
1093
"""Tests for unhtml_roughly"""
1095
def test_truncation(self):
1096
fake_html = "<p>something!\n" * 1000
1097
result = http.unhtml_roughly(fake_html)
1098
self.assertEquals(len(result), 1000)
1099
self.assertStartsWith(result, " something!")
1102
class SomeDirectory(object):
1104
def look_up(self, name, url):
1108
class TestLocationToUrl(tests.TestCase):
1110
def get_base_location(self):
1111
path = osutils.abspath('/foo/bar')
1112
if path.startswith('/'):
1113
url = 'file://%s' % (path,)
1115
# On Windows, abspaths start with the drive letter, so we have to
1116
# add in the extra '/'
1117
url = 'file:///%s' % (path,)
1120
def test_regular_url(self):
1121
self.assertEquals("file://foo", location_to_url("file://foo"))
1123
def test_directory(self):
1124
directories.register("bar:", SomeDirectory, "Dummy directory")
1125
self.addCleanup(directories.remove, "bar:")
1126
self.assertEquals("http://bar", location_to_url("bar:"))
1128
def test_unicode_url(self):
1129
self.assertRaises(errors.InvalidURL, location_to_url,
1130
"http://fo/\xc3\xaf".decode("utf-8"))
1132
def test_unicode_path(self):
1133
path, url = self.get_base_location()
1134
location = path + "\xc3\xaf".decode("utf-8")
1136
self.assertEquals(url, location_to_url(location))
1138
def test_path(self):
1139
path, url = self.get_base_location()
1140
self.assertEquals(url, location_to_url(path))
1142
def test_relative_file_url(self):
1143
self.assertEquals(urlutils.local_path_to_url(".") + "/bar",
1144
location_to_url("file:bar"))
1146
def test_absolute_file_url(self):
1147
self.assertEquals("file:///bar", location_to_url("file:/bar"))