13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
18
from cStringIO import StringIO
21
25
from bzrlib import (
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
32
from bzrlib.directory_service import directories
33
from bzrlib.transport import (
43
import bzrlib.transport.trace
44
from bzrlib.tests import (
53
50
# TODO: Should possibly split transport-specific tests into their own files.
56
class TestTransport(TestCase):
53
class TestTransport(tests.TestCase):
57
54
"""Test the non transport-concrete class functionality."""
59
56
def test__get_set_protocol_handlers(self):
60
handlers = _get_protocol_handlers()
61
self.assertNotEqual([], handlers.keys( ))
63
_clear_protocol_handlers()
64
self.assertEqual([], _get_protocol_handlers().keys())
66
_set_protocol_handlers(handlers)
57
handlers = transport._get_protocol_handlers()
58
self.assertNotEqual([], handlers.keys())
59
transport._clear_protocol_handlers()
60
self.addCleanup(transport._set_protocol_handlers, handlers)
61
self.assertEqual([], transport._get_protocol_handlers().keys())
68
63
def test_get_transport_modules(self):
69
handlers = _get_protocol_handlers()
64
handlers = transport._get_protocol_handlers()
65
self.addCleanup(transport._set_protocol_handlers, handlers)
70
66
# don't pollute the current handlers
71
_clear_protocol_handlers()
67
transport._clear_protocol_handlers()
72
69
class SampleHandler(object):
73
70
"""I exist, isnt that enough?"""
75
_clear_protocol_handlers()
76
register_transport_proto('foo')
77
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
79
register_transport_proto('bar')
80
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'bzrlib.transport.chroot'],
84
_get_transport_modules())
86
_set_protocol_handlers(handlers)
71
transport._clear_protocol_handlers()
72
transport.register_transport_proto('foo')
73
transport.register_lazy_transport('foo',
74
'bzrlib.tests.test_transport',
75
'TestTransport.SampleHandler')
76
transport.register_transport_proto('bar')
77
transport.register_lazy_transport('bar',
78
'bzrlib.tests.test_transport',
79
'TestTransport.SampleHandler')
80
self.assertEqual([SampleHandler.__module__,
81
'bzrlib.transport.chroot',
82
'bzrlib.transport.pathfilter'],
83
transport._get_transport_modules())
88
85
def test_transport_dependency(self):
89
86
"""Transport with missing dependency causes no error"""
90
saved_handlers = _get_protocol_handlers()
87
saved_handlers = transport._get_protocol_handlers()
88
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
89
# don't pollute the current handlers
92
_clear_protocol_handlers()
90
transport._clear_protocol_handlers()
91
transport.register_transport_proto('foo')
92
transport.register_lazy_transport(
93
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
94
register_transport_proto('foo')
95
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
'BadTransportHandler')
98
get_transport('foo://fooserver/foo')
99
except UnsupportedProtocol, e:
101
self.assertEquals('Unsupported protocol'
102
' for url "foo://fooserver/foo":'
103
' Unable to import library "some_lib":'
104
' testing missing dependency', str(e))
106
self.fail('Did not raise UnsupportedProtocol')
108
# restore original values
109
_set_protocol_handlers(saved_handlers)
95
transport.get_transport_from_url('foo://fooserver/foo')
96
except errors.UnsupportedProtocol, e:
98
self.assertEqual('Unsupported protocol'
99
' for url "foo://fooserver/foo":'
100
' Unable to import library "some_lib":'
101
' testing missing dependency', str(e))
103
self.fail('Did not raise UnsupportedProtocol')
111
105
def test_transport_fallback(self):
112
106
"""Transport with missing dependency causes no error"""
113
saved_handlers = _get_protocol_handlers()
107
saved_handlers = transport._get_protocol_handlers()
108
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
109
transport._clear_protocol_handlers()
110
transport.register_transport_proto('foo')
111
transport.register_lazy_transport(
112
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
113
transport.register_lazy_transport(
114
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
115
t = transport.get_transport_from_url('foo://fooserver/foo')
116
self.assertTrue(isinstance(t, BackupTransportHandler))
118
def test_ssh_hints(self):
119
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
115
_clear_protocol_handlers()
116
register_transport_proto('foo')
117
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
'BackupTransportHandler')
119
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
'BadTransportHandler')
121
t = get_transport('foo://fooserver/foo')
122
self.assertTrue(isinstance(t, BackupTransportHandler))
124
_set_protocol_handlers(saved_handlers)
121
transport.get_transport_from_url('ssh://fooserver/foo')
122
except errors.UnsupportedProtocol, e:
124
self.assertEqual('Unsupported protocol'
125
' for url "ssh://fooserver/foo":'
126
' bzr supports bzr+ssh to operate over ssh,'
127
' use "bzr+ssh://fooserver/foo".',
130
self.fail('Did not raise UnsupportedProtocol')
126
132
def test_LateReadError(self):
127
133
"""The LateReadError helper should raise on read()."""
128
a_file = LateReadError('a path')
134
a_file = transport.LateReadError('a path')
131
except ReadError, error:
137
except errors.ReadError, error:
132
138
self.assertEqual('a path', error.path)
133
self.assertRaises(ReadError, a_file.read, 40)
139
self.assertRaises(errors.ReadError, a_file.read, 40)
136
def test__combine_paths(self):
138
self.assertEqual('/home/sarah/project/foo',
139
t._combine_paths('/home/sarah', 'project/foo'))
140
self.assertEqual('/etc',
141
t._combine_paths('/home/sarah', '../../etc'))
142
self.assertEqual('/etc',
143
t._combine_paths('/home/sarah', '../../../etc'))
144
self.assertEqual('/etc',
145
t._combine_paths('/home/sarah', '/etc'))
147
142
def test_local_abspath_non_local_transport(self):
148
143
# the base implementation should throw
149
t = MemoryTransport()
144
t = memory.MemoryTransport()
150
145
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
146
self.assertEqual('memory:///t is not a local path.', str(e))
154
class TestCoalesceOffsets(TestCase):
149
class TestCoalesceOffsets(tests.TestCase):
156
151
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
157
coalesce = Transport._coalesce_offsets
158
exp = [_CoalescedOffset(*x) for x in expected]
152
coalesce = transport.Transport._coalesce_offsets
153
exp = [transport._CoalescedOffset(*x) for x in expected]
159
154
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
160
155
max_size=max_size))
161
156
self.assertEqual(exp, out)
207
202
def test_coalesce_fudge(self):
208
203
self.check([(10, 30, [(0, 10), (20, 10)]),
209
(100, 10, [(0, 10),]),
204
(100, 10, [(0, 10)]),
210
205
], [(10, 10), (30, 10), (100, 10)],
213
208
def test_coalesce_max_size(self):
214
209
self.check([(10, 20, [(0, 10), (10, 10)]),
215
210
(30, 50, [(0, 50)]),
216
211
# If one range is above max_size, it gets its own coalesced
218
(100, 80, [(0, 80),]),],
213
(100, 80, [(0, 80)]),],
219
214
[(10, 10), (20, 10), (30, 50), (100, 80)],
223
217
def test_coalesce_no_max_size(self):
224
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
218
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
225
219
[(10, 10), (20, 10), (30, 50), (80, 100)],
229
class TestMemoryTransport(TestCase):
222
def test_coalesce_default_limit(self):
223
# By default we use a 100MB max size.
224
ten_mb = 10 * 1024 * 1024
225
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
226
(10*ten_mb, ten_mb, [(0, ten_mb)])],
227
[(i*ten_mb, ten_mb) for i in range(11)])
228
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
229
[(i * ten_mb, ten_mb) for i in range(11)],
230
max_size=1*1024*1024*1024)
233
class TestMemoryServer(tests.TestCase):
235
def test_create_server(self):
236
server = memory.MemoryServer()
237
server.start_server()
238
url = server.get_url()
239
self.assertTrue(url in transport.transport_list_registry)
240
t = transport.get_transport_from_url(url)
243
self.assertFalse(url in transport.transport_list_registry)
244
self.assertRaises(errors.UnsupportedProtocol,
245
transport.get_transport, url)
248
class TestMemoryTransport(tests.TestCase):
231
250
def test_get_transport(self):
251
memory.MemoryTransport()
234
253
def test_clone(self):
235
transport = MemoryTransport()
236
self.assertTrue(isinstance(transport, MemoryTransport))
237
self.assertEqual("memory:///", transport.clone("/").base)
254
t = memory.MemoryTransport()
255
self.assertTrue(isinstance(t, memory.MemoryTransport))
256
self.assertEqual("memory:///", t.clone("/").base)
239
258
def test_abspath(self):
240
transport = MemoryTransport()
241
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
259
t = memory.MemoryTransport()
260
self.assertEqual("memory:///relpath", t.abspath('relpath'))
243
262
def test_abspath_of_root(self):
244
transport = MemoryTransport()
245
self.assertEqual("memory:///", transport.base)
246
self.assertEqual("memory:///", transport.abspath('/'))
263
t = memory.MemoryTransport()
264
self.assertEqual("memory:///", t.base)
265
self.assertEqual("memory:///", t.abspath('/'))
248
267
def test_abspath_of_relpath_starting_at_root(self):
249
transport = MemoryTransport()
250
self.assertEqual("memory:///foo", transport.abspath('/foo'))
268
t = memory.MemoryTransport()
269
self.assertEqual("memory:///foo", t.abspath('/foo'))
252
271
def test_append_and_get(self):
253
transport = MemoryTransport()
254
transport.append_bytes('path', 'content')
255
self.assertEqual(transport.get('path').read(), 'content')
256
transport.append_file('path', StringIO('content'))
257
self.assertEqual(transport.get('path').read(), 'contentcontent')
272
t = memory.MemoryTransport()
273
t.append_bytes('path', 'content')
274
self.assertEqual(t.get('path').read(), 'content')
275
t.append_file('path', StringIO('content'))
276
self.assertEqual(t.get('path').read(), 'contentcontent')
259
278
def test_put_and_get(self):
260
transport = MemoryTransport()
261
transport.put_file('path', StringIO('content'))
262
self.assertEqual(transport.get('path').read(), 'content')
263
transport.put_bytes('path', 'content')
264
self.assertEqual(transport.get('path').read(), 'content')
279
t = memory.MemoryTransport()
280
t.put_file('path', StringIO('content'))
281
self.assertEqual(t.get('path').read(), 'content')
282
t.put_bytes('path', 'content')
283
self.assertEqual(t.get('path').read(), 'content')
266
285
def test_append_without_dir_fails(self):
267
transport = MemoryTransport()
268
self.assertRaises(NoSuchFile,
269
transport.append_bytes, 'dir/path', 'content')
286
t = memory.MemoryTransport()
287
self.assertRaises(errors.NoSuchFile,
288
t.append_bytes, 'dir/path', 'content')
271
290
def test_put_without_dir_fails(self):
272
transport = MemoryTransport()
273
self.assertRaises(NoSuchFile,
274
transport.put_file, 'dir/path', StringIO('content'))
291
t = memory.MemoryTransport()
292
self.assertRaises(errors.NoSuchFile,
293
t.put_file, 'dir/path', StringIO('content'))
276
295
def test_get_missing(self):
277
transport = MemoryTransport()
278
self.assertRaises(NoSuchFile, transport.get, 'foo')
296
transport = memory.MemoryTransport()
297
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
280
299
def test_has_missing(self):
281
transport = MemoryTransport()
282
self.assertEquals(False, transport.has('foo'))
300
t = memory.MemoryTransport()
301
self.assertEqual(False, t.has('foo'))
284
303
def test_has_present(self):
285
transport = MemoryTransport()
286
transport.append_bytes('foo', 'content')
287
self.assertEquals(True, transport.has('foo'))
304
t = memory.MemoryTransport()
305
t.append_bytes('foo', 'content')
306
self.assertEqual(True, t.has('foo'))
289
308
def test_list_dir(self):
290
transport = MemoryTransport()
291
transport.put_bytes('foo', 'content')
292
transport.mkdir('dir')
293
transport.put_bytes('dir/subfoo', 'content')
294
transport.put_bytes('dirlike', 'content')
309
t = memory.MemoryTransport()
310
t.put_bytes('foo', 'content')
312
t.put_bytes('dir/subfoo', 'content')
313
t.put_bytes('dirlike', 'content')
296
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
297
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
315
self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
316
self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
299
318
def test_mkdir(self):
300
transport = MemoryTransport()
301
transport.mkdir('dir')
302
transport.append_bytes('dir/path', 'content')
303
self.assertEqual(transport.get('dir/path').read(), 'content')
319
t = memory.MemoryTransport()
321
t.append_bytes('dir/path', 'content')
322
self.assertEqual(t.get('dir/path').read(), 'content')
305
324
def test_mkdir_missing_parent(self):
306
transport = MemoryTransport()
307
self.assertRaises(NoSuchFile,
308
transport.mkdir, 'dir/dir')
325
t = memory.MemoryTransport()
326
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
310
328
def test_mkdir_twice(self):
311
transport = MemoryTransport()
312
transport.mkdir('dir')
313
self.assertRaises(FileExists, transport.mkdir, 'dir')
329
t = memory.MemoryTransport()
331
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
315
333
def test_parameters(self):
316
transport = MemoryTransport()
317
self.assertEqual(True, transport.listable())
318
self.assertEqual(False, transport.is_readonly())
334
t = memory.MemoryTransport()
335
self.assertEqual(True, t.listable())
336
self.assertEqual(False, t.is_readonly())
320
338
def test_iter_files_recursive(self):
321
transport = MemoryTransport()
322
transport.mkdir('dir')
323
transport.put_bytes('dir/foo', 'content')
324
transport.put_bytes('dir/bar', 'content')
325
transport.put_bytes('bar', 'content')
326
paths = set(transport.iter_files_recursive())
339
t = memory.MemoryTransport()
341
t.put_bytes('dir/foo', 'content')
342
t.put_bytes('dir/bar', 'content')
343
t.put_bytes('bar', 'content')
344
paths = set(t.iter_files_recursive())
327
345
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
329
347
def test_stat(self):
330
transport = MemoryTransport()
331
transport.put_bytes('foo', 'content')
332
transport.put_bytes('bar', 'phowar')
333
self.assertEqual(7, transport.stat('foo').st_size)
334
self.assertEqual(6, transport.stat('bar').st_size)
337
class ChrootDecoratorTransportTest(TestCase):
348
t = memory.MemoryTransport()
349
t.put_bytes('foo', 'content')
350
t.put_bytes('bar', 'phowar')
351
self.assertEqual(7, t.stat('foo').st_size)
352
self.assertEqual(6, t.stat('bar').st_size)
355
class ChrootDecoratorTransportTest(tests.TestCase):
338
356
"""Chroot decoration specific tests."""
340
358
def test_abspath(self):
341
359
# The abspath is always relative to the chroot_url.
342
server = ChrootServer(get_transport('memory:///foo/bar/'))
344
transport = get_transport(server.get_url())
345
self.assertEqual(server.get_url(), transport.abspath('/'))
360
server = chroot.ChrootServer(
361
transport.get_transport_from_url('memory:///foo/bar/'))
362
self.start_server(server)
363
t = transport.get_transport_from_url(server.get_url())
364
self.assertEqual(server.get_url(), t.abspath('/'))
347
subdir_transport = transport.clone('subdir')
348
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
366
subdir_t = t.clone('subdir')
367
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
351
369
def test_clone(self):
352
server = ChrootServer(get_transport('memory:///foo/bar/'))
354
transport = get_transport(server.get_url())
370
server = chroot.ChrootServer(
371
transport.get_transport_from_url('memory:///foo/bar/'))
372
self.start_server(server)
373
t = transport.get_transport_from_url(server.get_url())
355
374
# relpath from root and root path are the same
356
relpath_cloned = transport.clone('foo')
357
abspath_cloned = transport.clone('/foo')
375
relpath_cloned = t.clone('foo')
376
abspath_cloned = t.clone('/foo')
358
377
self.assertEqual(server, relpath_cloned.server)
359
378
self.assertEqual(server, abspath_cloned.server)
362
380
def test_chroot_url_preserves_chroot(self):
363
381
"""Calling get_transport on a chroot transport's base should produce a
364
382
transport with exactly the same behaviour as the original chroot
384
402
This is so that it is not possible to escape a chroot by doing::
385
403
url = chroot_transport.base
386
404
parent_url = urlutils.join(url, '..')
387
new_transport = get_transport(parent_url)
405
new_t = transport.get_transport_from_url(parent_url)
389
server = ChrootServer(get_transport('memory:///path/'))
391
transport = get_transport(server.get_url())
407
server = chroot.ChrootServer(
408
transport.get_transport_from_url('memory:///path/'))
409
self.start_server(server)
410
t = transport.get_transport_from_url(server.get_url())
392
411
self.assertRaises(
393
InvalidURLJoin, urlutils.join, transport.base, '..')
397
class ChrootServerTest(TestCase):
412
errors.InvalidURLJoin, urlutils.join, t.base, '..')
415
class TestChrootServer(tests.TestCase):
399
417
def test_construct(self):
400
backing_transport = MemoryTransport()
401
server = ChrootServer(backing_transport)
418
backing_transport = memory.MemoryTransport()
419
server = chroot.ChrootServer(backing_transport)
402
420
self.assertEqual(backing_transport, server.backing_transport)
404
422
def test_setUp(self):
405
backing_transport = MemoryTransport()
406
server = ChrootServer(backing_transport)
408
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
423
backing_transport = memory.MemoryTransport()
424
server = chroot.ChrootServer(backing_transport)
425
server.start_server()
426
self.addCleanup(server.stop_server)
427
self.assertTrue(server.scheme
428
in transport._get_protocol_handlers().keys())
410
def test_tearDown(self):
411
backing_transport = MemoryTransport()
412
server = ChrootServer(backing_transport)
415
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
430
def test_stop_server(self):
431
backing_transport = memory.MemoryTransport()
432
server = chroot.ChrootServer(backing_transport)
433
server.start_server()
435
self.assertFalse(server.scheme
436
in transport._get_protocol_handlers().keys())
417
438
def test_get_url(self):
418
backing_transport = MemoryTransport()
419
server = ChrootServer(backing_transport)
439
backing_transport = memory.MemoryTransport()
440
server = chroot.ChrootServer(backing_transport)
441
server.start_server()
442
self.addCleanup(server.stop_server)
421
443
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
425
class ReadonlyDecoratorTransportTest(TestCase):
446
class TestHooks(tests.TestCase):
447
"""Basic tests for transport hooks"""
449
def _get_connected_transport(self):
450
return transport.ConnectedTransport("bogus:nowhere")
452
def test_transporthooks_initialisation(self):
453
"""Check all expected transport hook points are set up"""
454
hookpoint = transport.TransportHooks()
455
self.assertTrue("post_connect" in hookpoint,
456
"post_connect not in %s" % (hookpoint,))
458
def test_post_connect(self):
459
"""Ensure the post_connect hook is called when _set_transport is"""
461
transport.Transport.hooks.install_named_hook("post_connect",
463
t = self._get_connected_transport()
464
self.assertLength(0, calls)
465
t._set_connection("connection", "auth")
466
self.assertEqual(calls, [t])
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
"""Pathfilter decoration specific tests."""
472
def test_abspath(self):
473
# The abspath is always relative to the base of the backing transport.
474
server = pathfilter.PathFilteringServer(
475
transport.get_transport_from_url('memory:///foo/bar/'),
477
server.start_server()
478
t = transport.get_transport_from_url(server.get_url())
479
self.assertEqual(server.get_url(), t.abspath('/'))
481
subdir_t = t.clone('subdir')
482
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
485
def make_pf_transport(self, filter_func=None):
486
"""Make a PathFilteringTransport backed by a MemoryTransport.
488
:param filter_func: by default this will be a no-op function. Use this
489
parameter to override it."""
490
if filter_func is None:
491
filter_func = lambda x: x
492
server = pathfilter.PathFilteringServer(
493
transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
494
server.start_server()
495
self.addCleanup(server.stop_server)
496
return transport.get_transport_from_url(server.get_url())
498
def test__filter(self):
499
# _filter (with an identity func as filter_func) always returns
500
# paths relative to the base of the backing transport.
501
t = self.make_pf_transport()
502
self.assertEqual('foo', t._filter('foo'))
503
self.assertEqual('foo/bar', t._filter('foo/bar'))
504
self.assertEqual('', t._filter('..'))
505
self.assertEqual('', t._filter('/'))
506
# The base of the pathfiltering transport is taken into account too.
507
t = t.clone('subdir1/subdir2')
508
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
509
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
510
self.assertEqual('subdir1', t._filter('..'))
511
self.assertEqual('', t._filter('/'))
513
def test_filter_invocation(self):
517
filter_log.append(path)
519
t = self.make_pf_transport(filter)
521
self.assertEqual(['abc'], filter_log)
523
t.clone('abc').has('xyz')
524
self.assertEqual(['abc/xyz'], filter_log)
527
self.assertEqual(['abc'], filter_log)
529
def test_clone(self):
530
t = self.make_pf_transport()
531
# relpath from root and root path are the same
532
relpath_cloned = t.clone('foo')
533
abspath_cloned = t.clone('/foo')
534
self.assertEqual(t.server, relpath_cloned.server)
535
self.assertEqual(t.server, abspath_cloned.server)
537
def test_url_preserves_pathfiltering(self):
538
"""Calling get_transport on a pathfiltered transport's base should
539
produce a transport with exactly the same behaviour as the original
540
pathfiltered transport.
542
This is so that it is not possible to escape (accidentally or
543
otherwise) the filtering by doing::
544
url = filtered_transport.base
545
parent_url = urlutils.join(url, '..')
546
new_t = transport.get_transport_from_url(parent_url)
548
t = self.make_pf_transport()
549
new_t = transport.get_transport_from_url(t.base)
550
self.assertEqual(t.server, new_t.server)
551
self.assertEqual(t.base, new_t.base)
554
class ReadonlyDecoratorTransportTest(tests.TestCase):
426
555
"""Readonly decoration specific tests."""
428
557
def test_local_parameters(self):
429
import bzrlib.transport.readonly as readonly
430
558
# connect to . in readonly mode
431
transport = readonly.ReadonlyTransportDecorator('readonly+.')
432
self.assertEqual(True, transport.listable())
433
self.assertEqual(True, transport.is_readonly())
559
t = readonly.ReadonlyTransportDecorator('readonly+.')
560
self.assertEqual(True, t.listable())
561
self.assertEqual(True, t.is_readonly())
435
563
def test_http_parameters(self):
436
from bzrlib.tests.HttpServer import HttpServer
437
import bzrlib.transport.readonly as readonly
438
# connect to . via http which is not listable
564
from bzrlib.tests.http_server import HttpServer
565
# connect to '.' via http which is not listable
439
566
server = HttpServer()
442
transport = get_transport('readonly+' + server.get_url())
443
self.failUnless(isinstance(transport,
444
readonly.ReadonlyTransportDecorator))
445
self.assertEqual(False, transport.listable())
446
self.assertEqual(True, transport.is_readonly())
451
class FakeNFSDecoratorTests(TestCaseInTempDir):
567
self.start_server(server)
568
t = transport.get_transport_from_url('readonly+' + server.get_url())
569
self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
570
self.assertEqual(False, t.listable())
571
self.assertEqual(True, t.is_readonly())
574
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
452
575
"""NFS decorator specific tests."""
454
577
def get_nfs_transport(self, url):
455
import bzrlib.transport.fakenfs as fakenfs
456
578
# connect to url with nfs decoration
457
579
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
459
581
def test_local_parameters(self):
460
582
# the listable and is_readonly parameters
461
583
# are not changed by the fakenfs decorator
462
transport = self.get_nfs_transport('.')
463
self.assertEqual(True, transport.listable())
464
self.assertEqual(False, transport.is_readonly())
584
t = self.get_nfs_transport('.')
585
self.assertEqual(True, t.listable())
586
self.assertEqual(False, t.is_readonly())
466
588
def test_http_parameters(self):
467
589
# the listable and is_readonly parameters
468
590
# are not changed by the fakenfs decorator
469
from bzrlib.tests.HttpServer import HttpServer
470
# connect to . via http which is not listable
591
from bzrlib.tests.http_server import HttpServer
592
# connect to '.' via http which is not listable
471
593
server = HttpServer()
474
transport = self.get_nfs_transport(server.get_url())
475
self.assertIsInstance(
476
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
477
self.assertEqual(False, transport.listable())
478
self.assertEqual(True, transport.is_readonly())
594
self.start_server(server)
595
t = self.get_nfs_transport(server.get_url())
596
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
597
self.assertEqual(False, t.listable())
598
self.assertEqual(True, t.is_readonly())
482
600
def test_fakenfs_server_default(self):
483
601
# a FakeNFSServer() should bring up a local relpath server for itself
484
import bzrlib.transport.fakenfs as fakenfs
485
server = fakenfs.FakeNFSServer()
488
# the url should be decorated appropriately
489
self.assertStartsWith(server.get_url(), 'fakenfs+')
490
# and we should be able to get a transport for it
491
transport = get_transport(server.get_url())
492
# which must be a FakeNFSTransportDecorator instance.
493
self.assertIsInstance(
494
transport, fakenfs.FakeNFSTransportDecorator)
602
server = test_server.FakeNFSServer()
603
self.start_server(server)
604
# the url should be decorated appropriately
605
self.assertStartsWith(server.get_url(), 'fakenfs+')
606
# and we should be able to get a transport for it
607
t = transport.get_transport_from_url(server.get_url())
608
# which must be a FakeNFSTransportDecorator instance.
609
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
498
611
def test_fakenfs_rename_semantics(self):
499
612
# a FakeNFS transport must mangle the way rename errors occur to
500
613
# look like NFS problems.
501
transport = self.get_nfs_transport('.')
614
t = self.get_nfs_transport('.')
502
615
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
504
self.assertRaises(errors.ResourceBusy,
505
transport.rename, 'from', 'to')
508
class FakeVFATDecoratorTests(TestCaseInTempDir):
617
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
620
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
509
621
"""Tests for simulation of VFAT restrictions"""
511
623
def get_vfat_transport(self, url):
516
628
def test_transport_creation(self):
517
629
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
518
transport = self.get_vfat_transport('.')
519
self.assertIsInstance(transport, FakeVFATTransportDecorator)
630
t = self.get_vfat_transport('.')
631
self.assertIsInstance(t, FakeVFATTransportDecorator)
521
633
def test_transport_mkdir(self):
522
transport = self.get_vfat_transport('.')
523
transport.mkdir('HELLO')
524
self.assertTrue(transport.has('hello'))
525
self.assertTrue(transport.has('Hello'))
634
t = self.get_vfat_transport('.')
636
self.assertTrue(t.has('hello'))
637
self.assertTrue(t.has('Hello'))
527
639
def test_forbidden_chars(self):
528
transport = self.get_vfat_transport('.')
529
self.assertRaises(ValueError, transport.has, "<NU>")
532
class BadTransportHandler(Transport):
640
t = self.get_vfat_transport('.')
641
self.assertRaises(ValueError, t.has, "<NU>")
644
class BadTransportHandler(transport.Transport):
533
645
def __init__(self, base_url):
534
raise DependencyNotPresent('some_lib', 'testing missing dependency')
537
class BackupTransportHandler(Transport):
646
raise errors.DependencyNotPresent('some_lib',
647
'testing missing dependency')
650
class BackupTransportHandler(transport.Transport):
538
651
"""Test transport that works as a backup for the BadTransportHandler"""
542
class TestTransportImplementation(TestCaseInTempDir):
655
class TestTransportImplementation(tests.TestCaseInTempDir):
543
656
"""Implementation verification for transports.
545
658
To verify a transport we need a server factory, which is a callable
546
659
that accepts no parameters and returns an implementation of
547
660
bzrlib.transport.Server.
549
662
That Server is then used to construct transport instances and test
550
663
the transport via loopback activity.
552
Currently this assumes that the Transport object is connected to the
553
current working directory. So that whatever is done
554
through the transport, should show up in the working
665
Currently this assumes that the Transport object is connected to the
666
current working directory. So that whatever is done
667
through the transport, should show up in the working
555
668
directory, and vice-versa. This is a bug, because its possible to have
556
URL schemes which provide access to something that may not be
557
result in storage on the local disk, i.e. due to file system limits, or
669
URL schemes which provide access to something that may not be
670
result in storage on the local disk, i.e. due to file system limits, or
558
671
due to it being a database or some other non-filesystem tool.
560
673
This also tests to make sure that the functions work with both
561
674
generators and lists (assuming iter(list) is effectively a generator)
565
678
super(TestTransportImplementation, self).setUp()
566
679
self._server = self.transport_server()
568
self.addCleanup(self._server.tearDown)
680
self.start_server(self._server)
570
682
def get_transport(self, relpath=None):
571
683
"""Return a connected transport to the local directory.
589
class TestLocalTransports(TestCase):
701
class TestTransportFromPath(tests.TestCaseInTempDir):
703
def test_with_path(self):
704
t = transport.get_transport_from_path(self.test_dir)
705
self.assertIsInstance(t, local.LocalTransport)
706
self.assertEqual(t.base.rstrip("/"),
707
urlutils.local_path_to_url(self.test_dir))
709
def test_with_url(self):
710
t = transport.get_transport_from_path("file:")
711
self.assertIsInstance(t, local.LocalTransport)
712
self.assertEqual(t.base.rstrip("/"),
713
urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
718
def test_with_path(self):
719
self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
722
def test_with_url(self):
723
url = urlutils.local_path_to_url(self.test_dir)
724
t = transport.get_transport_from_url(url)
725
self.assertIsInstance(t, local.LocalTransport)
726
self.assertEqual(t.base.rstrip("/"), url)
728
def test_with_url_and_segment_parameters(self):
729
url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
730
t = transport.get_transport_from_url(url)
731
self.assertIsInstance(t, local.LocalTransport)
732
self.assertEqual(t.base.rstrip("/"), url)
733
with open(os.path.join(self.test_dir, "afile"), 'w') as f:
735
self.assertTrue(t.has("afile"))
738
class TestLocalTransports(tests.TestCase):
591
740
def test_get_transport_from_abspath(self):
592
741
here = osutils.abspath('.')
593
t = get_transport(here)
594
self.assertIsInstance(t, LocalTransport)
595
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
742
t = transport.get_transport(here)
743
self.assertIsInstance(t, local.LocalTransport)
744
self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
597
746
def test_get_transport_from_relpath(self):
598
747
here = osutils.abspath('.')
599
t = get_transport('.')
600
self.assertIsInstance(t, LocalTransport)
601
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
748
t = transport.get_transport('.')
749
self.assertIsInstance(t, local.LocalTransport)
750
self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
603
752
def test_get_transport_from_local_url(self):
604
753
here = osutils.abspath('.')
605
754
here_url = urlutils.local_path_to_url(here) + '/'
606
t = get_transport(here_url)
607
self.assertIsInstance(t, LocalTransport)
608
self.assertEquals(t.base, here_url)
755
t = transport.get_transport(here_url)
756
self.assertIsInstance(t, local.LocalTransport)
757
self.assertEqual(t.base, here_url)
610
759
def test_local_abspath(self):
611
760
here = osutils.abspath('.')
612
t = get_transport(here)
613
self.assertEquals(t.local_abspath(''), here)
616
class TestWin32LocalTransport(TestCase):
761
t = transport.get_transport(here)
762
self.assertEqual(t.local_abspath(''), here)
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
767
def test_local_transport_mkdir(self):
768
here = osutils.abspath('.')
769
t = transport.get_transport(here)
771
self.assertTrue(os.path.exists('test'))
773
def test_local_transport_mkdir_permission_denied(self):
774
# See https://bugs.launchpad.net/bzr/+bug/606537
775
here = osutils.abspath('.')
776
t = transport.get_transport(here)
777
def fake_chmod(path, mode):
778
e = OSError('permission denied')
779
e.errno = errno.EPERM
781
self.overrideAttr(os, 'chmod', fake_chmod)
783
t.mkdir('test2', mode=0707)
784
self.assertTrue(os.path.exists('test'))
785
self.assertTrue(os.path.exists('test2'))
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
790
def test_local_fdatasync_calls_fdatasync(self):
791
"""Check fdatasync on a stream tries to flush the data to the OS.
793
We can't easily observe the external effect but we can at least see
797
fdatasync = getattr(os, 'fdatasync', sentinel)
798
if fdatasync is sentinel:
799
raise tests.TestNotApplicable('fdatasync not supported')
800
t = self.get_transport('.')
801
calls = self.recordCalls(os, 'fdatasync')
802
w = t.open_write_stream('out')
805
with open('out', 'rb') as f:
806
# Should have been flushed.
807
self.assertEqual(f.read(), 'foo')
808
self.assertEqual(len(calls), 1, calls)
810
def test_missing_directory(self):
811
t = self.get_transport('.')
812
self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
815
class TestWin32LocalTransport(tests.TestCase):
618
817
def test_unc_clone_to_root(self):
818
self.requireFeature(features.win32_feature)
619
819
# Win32 UNC path like \\HOST\path
620
820
# clone to root should stop at least at \\HOST part
622
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
822
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
623
823
for i in xrange(4):
624
824
t = t.clone('..')
625
self.assertEquals(t.base, 'file://HOST/')
825
self.assertEqual(t.base, 'file://HOST/')
626
826
# make sure we reach the root
627
827
t = t.clone('..')
628
self.assertEquals(t.base, 'file://HOST/')
631
class TestConnectedTransport(TestCase):
828
self.assertEqual(t.base, 'file://HOST/')
831
class TestConnectedTransport(tests.TestCase):
632
832
"""Tests for connected to remote server transports"""
634
834
def test_parse_url(self):
635
t = ConnectedTransport('http://simple.example.com/home/source')
636
self.assertEquals(t._host, 'simple.example.com')
637
self.assertEquals(t._port, None)
638
self.assertEquals(t._path, '/home/source/')
639
self.failUnless(t._user is None)
640
self.failUnless(t._password is None)
642
self.assertEquals(t.base, 'http://simple.example.com/home/source/')
835
t = transport.ConnectedTransport(
836
'http://simple.example.com/home/source')
837
self.assertEqual(t._parsed_url.host, 'simple.example.com')
838
self.assertEqual(t._parsed_url.port, None)
839
self.assertEqual(t._parsed_url.path, '/home/source/')
840
self.assertTrue(t._parsed_url.user is None)
841
self.assertTrue(t._parsed_url.password is None)
843
self.assertEqual(t.base, 'http://simple.example.com/home/source/')
845
def test_parse_url_with_at_in_user(self):
847
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
848
self.assertEqual(t._parsed_url.user, 'user@host.com')
644
850
def test_parse_quoted_url(self):
645
t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
646
self.assertEquals(t._host, 'exAmple.com')
647
self.assertEquals(t._port, 2222)
648
self.assertEquals(t._user, 'robey')
649
self.assertEquals(t._password, 'h@t')
650
self.assertEquals(t._path, '/path/')
851
t = transport.ConnectedTransport(
852
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
853
self.assertEqual(t._parsed_url.host, 'exAmple.com')
854
self.assertEqual(t._parsed_url.port, 2222)
855
self.assertEqual(t._parsed_url.user, 'robey')
856
self.assertEqual(t._parsed_url.password, 'h@t')
857
self.assertEqual(t._parsed_url.path, '/path/')
652
859
# Base should not keep track of the password
653
self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
860
self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
655
862
def test_parse_invalid_url(self):
656
863
self.assertRaises(errors.InvalidURL,
864
transport.ConnectedTransport,
658
865
'sftp://lily.org:~janneke/public/bzr/gub')
660
867
def test_relpath(self):
661
t = ConnectedTransport('sftp://user@host.com/abs/path')
868
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
663
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
870
self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
664
872
self.assertRaises(errors.PathNotChild, t.relpath,
665
873
'http://user@host.com/abs/path/sub')
666
874
self.assertRaises(errors.PathNotChild, t.relpath,
705
913
self.assertIs(new_password, c._get_credentials())
708
class TestReusedTransports(TestCase):
916
class TestReusedTransports(tests.TestCase):
709
917
"""Tests for transport reuse"""
711
919
def test_reuse_same_transport(self):
712
920
possible_transports = []
713
t1 = get_transport('http://foo/',
714
possible_transports=possible_transports)
921
t1 = transport.get_transport_from_url('http://foo/',
922
possible_transports=possible_transports)
715
923
self.assertEqual([t1], possible_transports)
716
t2 = get_transport('http://foo/', possible_transports=[t1])
924
t2 = transport.get_transport_from_url('http://foo/',
925
possible_transports=[t1])
717
926
self.assertIs(t1, t2)
719
928
# Also check that final '/' are handled correctly
720
t3 = get_transport('http://foo/path/')
721
t4 = get_transport('http://foo/path', possible_transports=[t3])
929
t3 = transport.get_transport_from_url('http://foo/path/')
930
t4 = transport.get_transport_from_url('http://foo/path',
931
possible_transports=[t3])
722
932
self.assertIs(t3, t4)
724
t5 = get_transport('http://foo/path')
725
t6 = get_transport('http://foo/path/', possible_transports=[t5])
934
t5 = transport.get_transport_from_url('http://foo/path')
935
t6 = transport.get_transport_from_url('http://foo/path/',
936
possible_transports=[t5])
726
937
self.assertIs(t5, t6)
728
939
def test_don_t_reuse_different_transport(self):
729
t1 = get_transport('http://foo/path')
730
t2 = get_transport('http://bar/path', possible_transports=[t1])
940
t1 = transport.get_transport_from_url('http://foo/path')
941
t2 = transport.get_transport_from_url('http://bar/path',
942
possible_transports=[t1])
731
943
self.assertIsNot(t1, t2)
734
class TestTransportTrace(TestCase):
946
class TestTransportTrace(tests.TestCase):
737
transport = get_transport('trace+memory://')
948
def test_decorator(self):
949
t = transport.get_transport_from_url('trace+memory://')
738
950
self.assertIsInstance(
739
transport, bzrlib.transport.trace.TransportTraceDecorator)
951
t, bzrlib.transport.trace.TransportTraceDecorator)
741
953
def test_clone_preserves_activity(self):
742
transport = get_transport('trace+memory://')
743
transport2 = transport.clone('.')
744
self.assertTrue(transport is not transport2)
745
self.assertTrue(transport._activity is transport2._activity)
954
t = transport.get_transport_from_url('trace+memory://')
956
self.assertTrue(t is not t2)
957
self.assertTrue(t._activity is t2._activity)
747
959
# the following specific tests are for the operations that have made use of
748
960
# logging in tests; we could test every single operation but doing that
749
961
# still won't cause a test failure when the top level Transport API
750
962
# changes; so there is little return doing that.
751
963
def test_get(self):
752
transport = get_transport('trace+memory:///')
753
transport.put_bytes('foo', 'barish')
964
t = transport.get_transport_from_url('trace+memory:///')
965
t.put_bytes('foo', 'barish')
755
967
expected_result = []
756
968
# put_bytes records the bytes, not the content to avoid memory
758
970
expected_result.append(('put_bytes', 'foo', 6, None))
759
971
# get records the file name only.
760
972
expected_result.append(('get', 'foo'))
761
self.assertEqual(expected_result, transport._activity)
973
self.assertEqual(expected_result, t._activity)
763
975
def test_readv(self):
764
transport = get_transport('trace+memory:///')
765
transport.put_bytes('foo', 'barish')
766
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
976
t = transport.get_transport_from_url('trace+memory:///')
977
t.put_bytes('foo', 'barish')
978
list(t.readv('foo', [(0, 1), (3, 2)],
979
adjust_for_latency=True, upper_limit=6))
768
980
expected_result = []
769
981
# put_bytes records the bytes, not the content to avoid memory
771
983
expected_result.append(('put_bytes', 'foo', 6, None))
772
984
# readv records the supplied offset request
773
985
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
774
self.assertEqual(expected_result, transport._activity)
986
self.assertEqual(expected_result, t._activity)
989
class TestSSHConnections(tests.TestCaseWithTransport):
991
def test_bzr_connect_to_bzr_ssh(self):
992
"""get_transport of a bzr+ssh:// behaves correctly.
994
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
996
# This test actually causes a bzr instance to be invoked, which is very
997
# expensive: it should be the only such test in the test suite.
998
# A reasonable evolution for this would be to simply check inside
999
# check_channel_exec_request that the command is appropriate, and then
1000
# satisfy requests in-process.
1001
self.requireFeature(features.paramiko)
1002
# SFTPFullAbsoluteServer has a get_url method, and doesn't
1003
# override the interface (doesn't change self._vendor).
1004
# Note that this does encryption, so can be slow.
1005
from bzrlib.tests import stub_sftp
1007
# Start an SSH server
1008
self.command_executed = []
1009
# XXX: This is horrible -- we define a really dumb SSH server that
1010
# executes commands, and manage the hooking up of stdin/out/err to the
1011
# SSH channel ourselves. Surely this has already been implemented
1015
class StubSSHServer(stub_sftp.StubServer):
1019
def check_channel_exec_request(self, channel, command):
1020
self.test.command_executed.append(command)
1021
proc = subprocess.Popen(
1022
command, shell=True, stdin=subprocess.PIPE,
1023
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1025
# XXX: horribly inefficient, not to mention ugly.
1026
# Start a thread for each of stdin/out/err, and relay bytes
1027
# from the subprocess to channel and vice versa.
1028
def ferry_bytes(read, write, close):
1037
(channel.recv, proc.stdin.write, proc.stdin.close),
1038
(proc.stdout.read, channel.sendall, channel.close),
1039
(proc.stderr.read, channel.sendall_stderr, channel.close)]
1040
started.append(proc)
1041
for read, write, close in file_functions:
1042
t = threading.Thread(
1043
target=ferry_bytes, args=(read, write, close))
1049
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1050
# We *don't* want to override the default SSH vendor: the detected one
1051
# is the one to use.
1053
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1054
# inherits from SFTPServer which forces the SSH vendor to
1055
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1056
self.start_server(ssh_server)
1057
port = ssh_server.port
1059
if sys.platform == 'win32':
1060
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
1062
bzr_remote_path = self.get_bzr_path()
1063
self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1065
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
1066
# variable is used to tell bzr what command to run on the remote end.
1067
path_to_branch = osutils.abspath('.')
1068
if sys.platform == 'win32':
1069
# On Windows, we export all drives as '/C:/, etc. So we need to
1070
# prefix a '/' to get the right path.
1071
path_to_branch = '/' + path_to_branch
1072
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1073
t = transport.get_transport(url)
1074
self.permit_url(t.base)
1078
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1079
self.command_executed)
1080
# Make sure to disconnect, so that the remote process can stop, and we
1081
# can cleanup. Then pause the test until everything is shutdown
1082
t._client._medium.disconnect()
1085
# First wait for the subprocess
1087
# And the rest are threads
1088
for t in started[1:]:
1092
class TestUnhtml(tests.TestCase):
1094
"""Tests for unhtml_roughly"""
1096
def test_truncation(self):
1097
fake_html = "<p>something!\n" * 1000
1098
result = http.unhtml_roughly(fake_html)
1099
self.assertEqual(len(result), 1000)
1100
self.assertStartsWith(result, " something!")
1103
class SomeDirectory(object):
1105
def look_up(self, name, url):
1109
class TestLocationToUrl(tests.TestCase):
1111
def get_base_location(self):
1112
path = osutils.abspath('/foo/bar')
1113
if path.startswith('/'):
1114
url = 'file://%s' % (path,)
1116
# On Windows, abspaths start with the drive letter, so we have to
1117
# add in the extra '/'
1118
url = 'file:///%s' % (path,)
1121
def test_regular_url(self):
1122
self.assertEqual("file://foo", location_to_url("file://foo"))
1124
def test_directory(self):
1125
directories.register("bar:", SomeDirectory, "Dummy directory")
1126
self.addCleanup(directories.remove, "bar:")
1127
self.assertEqual("http://bar", location_to_url("bar:"))
1129
def test_unicode_url(self):
1130
self.assertRaises(errors.InvalidURL, location_to_url,
1131
"http://fo/\xc3\xaf".decode("utf-8"))
1133
def test_unicode_path(self):
1134
path, url = self.get_base_location()
1135
location = path + "\xc3\xaf".decode("utf-8")
1137
self.assertEqual(url, location_to_url(location))
1139
def test_path(self):
1140
path, url = self.get_base_location()
1141
self.assertEqual(url, location_to_url(path))
1143
def test_relative_file_url(self):
1144
self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1145
location_to_url("file:bar"))
1147
def test_absolute_file_url(self):
1148
self.assertEqual("file:///bar", location_to_url("file:/bar"))