20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
35
transport as _mod_transport,
38
from bzrlib.errors import (ConnectionError,
31
TransportNotPossible, ConnectionError)
32
from bzrlib.tests import TestCaseInTempDir, TestSkipped
33
from bzrlib.transport import memory, urlescape
34
import bzrlib.transport
38
"""Append the given text (file-like object) to the supplied filename."""
46
class TestTransportImplementation(TestCaseInTempDir):
47
"""Implementation verification for transports.
49
To verify a transport we need a server factory, which is a callable
50
that accepts no parameters and returns an implementation of
51
bzrlib.transport.Server.
53
That Server is then used to construct transport instances and test
54
the transport via loopback activity.
56
Currently this assumes that the Transport object is connected to the
57
current working directory. So that whatever is done
58
through the transport, should show up in the working
59
directory, and vice-versa. This is a bug, because its possible to have
60
URL schemes which provide access to something that may not be
61
result in storage on the local disk, i.e. due to file system limits, or
62
due to it being a database or some other non-filesystem tool.
64
This also tests to make sure that the functions work with both
65
generators and lists (assuming iter(list) is effectively a generator)
45
from bzrlib.osutils import getcwd
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
from bzrlib.tests.test_transport import TestTransportImplementation
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
93
"""Multiply tests for tranport implementations."""
94
result = loader.suiteClass()
95
scenarios = transport_test_permutations()
96
return multiply_tests(standard_tests, scenarios, result)
99
class TransportTests(TestTransportImplementation):
69
super(TestTransportImplementation, self).setUp()
70
self._server = self.transport_server()
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
74
super(TestTransportImplementation, self).tearDown()
75
self._server.tearDown()
77
105
def check_transport_contents(self, content, transport, relpath):
78
"""Check that transport.get(relpath).read() == content."""
79
self.assertEqualDiff(content, transport.get(relpath).read())
81
def get_transport(self):
82
"""Return a connected transport to the local directory."""
83
base_url = self._server.get_url()
84
t = bzrlib.transport.get_transport(base_url)
85
if not isinstance(t, self.transport_class):
86
# we want to make sure to construct one particular class, even if
87
# there are several available implementations of this transport;
88
# therefore construct it by hand rather than through the regular
89
# get_transport method
90
t = self.transport_class(base_url)
93
def assertListRaises(self, excClass, func, *args, **kwargs):
94
"""Fail unless excClass is raised when the iterator from func is used.
96
Many transport functions can return generators this makes sure
97
to wrap them in a list() call to make sure the whole generator
98
is run, and that the proper exception is raised.
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
101
list(func(*args, **kwargs))
105
if hasattr(excClass,'__name__'): excName = excClass.__name__
106
else: excName = str(excClass)
107
raise self.failureException, "%s not raised" % excName
146
except errors.InProcessTransport:
109
149
def test_has(self):
110
150
t = self.get_transport()
135
190
self.build_tree(files, transport=t, line_endings='binary')
136
191
self.check_transport_contents('contents of a\n', t, 'a')
137
192
content_f = t.get_multi(files)
138
for content, f in zip(contents, content_f):
193
# Use itertools.izip() instead of use zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
139
197
self.assertEqual(content, f.read())
141
199
content_f = t.get_multi(iter(files))
142
for content, f in zip(contents, content_f):
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
143
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
145
211
self.assertRaises(NoSuchFile, t.get, 'c')
146
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
147
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
150
t = self.get_transport()
153
self.assertRaises(TransportNotPossible,
154
t.put, 'a', 'some text for a\n')
157
t.put('a', StringIO('some text for a\n'))
158
self.failUnless(t.has('a'))
159
self.check_transport_contents('some text for a\n', t, 'a')
160
# Make sure 'has' is updated
161
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
162
[True, False, False, False, False])
163
# Put also replaces contents
164
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
165
('d', StringIO('contents\nfor d\n'))]),
167
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
168
[True, False, False, True, False])
169
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
170
self.check_transport_contents('contents\nfor d\n', t, 'd')
173
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
174
('d', StringIO('another contents\nfor d\n'))])),
176
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
177
self.check_transport_contents('another contents\nfor d\n', t, 'd')
179
self.assertRaises(NoSuchFile,
180
t.put, 'path/doesnt/exist/c', 'contents')
182
def test_put_permissions(self):
183
t = self.get_transport()
187
t.put('mode644', StringIO('test text\n'), mode=0644)
188
self.assertTransportMode(t, 'mode644', 0644)
189
t.put('mode666', StringIO('test text\n'), mode=0666)
190
self.assertTransportMode(t, 'mode666', 0666)
191
t.put('mode600', StringIO('test text\n'), mode=0600)
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
244
def test_get_bytes(self):
245
t = self.get_transport()
247
files = ['a', 'b', 'e', 'g']
248
contents = ['contents of a\n',
253
self.build_tree(files, transport=t, line_endings='binary')
254
self.check_transport_contents('contents of a\n', t, 'a')
256
for content, fname in zip(contents, files):
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
260
t = self.get_transport()
261
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
290
def test_put_bytes(self):
291
t = self.get_transport()
294
self.assertRaises(TransportNotPossible,
295
t.put_bytes, 'a', 'some text for a\n')
298
t.put_bytes('a', 'some text for a\n')
299
self.assertTrue(t.has('a'))
300
self.check_transport_contents('some text for a\n', t, 'a')
302
# The contents should be overwritten
303
t.put_bytes('a', 'new text for a\n')
304
self.check_transport_contents('new text for a\n', t, 'a')
306
self.assertRaises(NoSuchFile,
307
t.put_bytes, 'path/doesnt/exist/c', 'contents')
309
def test_put_bytes_non_atomic(self):
310
t = self.get_transport()
313
self.assertRaises(TransportNotPossible,
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
317
self.assertFalse(t.has('a'))
318
t.put_bytes_non_atomic('a', 'some text for a\n')
319
self.assertTrue(t.has('a'))
320
self.check_transport_contents('some text for a\n', t, 'a')
321
# Put also replaces contents
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
325
# Make sure we can create another file
326
t.put_bytes_non_atomic('d', 'contents for\nd\n')
327
# And overwrite 'a' with empty contents
328
t.put_bytes_non_atomic('a', '')
329
self.check_transport_contents('contents for\nd\n', t, 'd')
330
self.check_transport_contents('', t, 'a')
332
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
334
# Now test the create_parent flag
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
337
self.assertFalse(t.has('dir/a'))
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
create_parent_dir=True)
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
# But we still get NoSuchFile if we can't make the parent dir
343
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
345
create_parent_dir=True)
347
def test_put_bytes_permissions(self):
348
t = self.get_transport()
352
if not t._can_roundtrip_unix_modebits():
353
# Can't roundtrip, so no need to run this test
355
t.put_bytes('mode644', 'test text\n', mode=0644)
356
self.assertTransportMode(t, 'mode644', 0644)
357
t.put_bytes('mode666', 'test text\n', mode=0666)
358
self.assertTransportMode(t, 'mode666', 0666)
359
t.put_bytes('mode600', 'test text\n', mode=0600)
360
self.assertTransportMode(t, 'mode600', 0600)
361
# Yes, you can put_bytes a file such that it becomes readonly
362
t.put_bytes('mode400', 'test text\n', mode=0400)
363
self.assertTransportMode(t, 'mode400', 0400)
365
# The default permissions should be based on the current umask
366
umask = osutils.get_umask()
367
t.put_bytes('nomode', 'test text\n', mode=None)
368
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
370
def test_put_bytes_non_atomic_permissions(self):
371
t = self.get_transport()
375
if not t._can_roundtrip_unix_modebits():
376
# Can't roundtrip, so no need to run this test
378
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
379
self.assertTransportMode(t, 'mode644', 0644)
380
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
381
self.assertTransportMode(t, 'mode666', 0666)
382
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
383
self.assertTransportMode(t, 'mode600', 0600)
384
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
385
self.assertTransportMode(t, 'mode400', 0400)
387
# The default permissions should be based on the current umask
388
umask = osutils.get_umask()
389
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
390
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
392
# We should also be able to set the mode for a parent directory
394
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
395
dir_mode=0700, create_parent_dir=True)
396
self.assertTransportMode(t, 'dir700', 0700)
397
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
398
dir_mode=0770, create_parent_dir=True)
399
self.assertTransportMode(t, 'dir770', 0770)
400
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
401
dir_mode=0777, create_parent_dir=True)
402
self.assertTransportMode(t, 'dir777', 0777)
404
def test_put_file(self):
405
t = self.get_transport()
408
self.assertRaises(TransportNotPossible,
409
t.put_file, 'a', StringIO('some text for a\n'))
412
result = t.put_file('a', StringIO('some text for a\n'))
413
# put_file returns the length of the data written
414
self.assertEqual(16, result)
415
self.assertTrue(t.has('a'))
416
self.check_transport_contents('some text for a\n', t, 'a')
417
# Put also replaces contents
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
self.assertEqual(19, result)
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
421
self.assertRaises(NoSuchFile,
422
t.put_file, 'path/doesnt/exist/c',
423
StringIO('contents'))
425
def test_put_file_non_atomic(self):
426
t = self.get_transport()
429
self.assertRaises(TransportNotPossible,
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
433
self.assertFalse(t.has('a'))
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
self.assertTrue(t.has('a'))
436
self.check_transport_contents('some text for a\n', t, 'a')
437
# Put also replaces contents
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
441
# Make sure we can create another file
442
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
443
# And overwrite 'a' with empty contents
444
t.put_file_non_atomic('a', StringIO(''))
445
self.check_transport_contents('contents for\nd\n', t, 'd')
446
self.check_transport_contents('', t, 'a')
448
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
StringIO('contents\n'))
450
# Now test the create_parent flag
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
StringIO('contents\n'))
453
self.assertFalse(t.has('dir/a'))
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
create_parent_dir=True)
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
458
# But we still get NoSuchFile if we can't make the parent dir
459
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
StringIO('contents\n'),
461
create_parent_dir=True)
463
def test_put_file_permissions(self):
465
t = self.get_transport()
469
if not t._can_roundtrip_unix_modebits():
470
# Can't roundtrip, so no need to run this test
472
t.put_file('mode644', StringIO('test text\n'), mode=0644)
473
self.assertTransportMode(t, 'mode644', 0644)
474
t.put_file('mode666', StringIO('test text\n'), mode=0666)
475
self.assertTransportMode(t, 'mode666', 0666)
476
t.put_file('mode600', StringIO('test text\n'), mode=0600)
192
477
self.assertTransportMode(t, 'mode600', 0600)
193
478
# Yes, you can put a file such that it becomes readonly
194
t.put('mode400', StringIO('test text\n'), mode=0400)
195
self.assertTransportMode(t, 'mode400', 0400)
196
t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
197
self.assertTransportMode(t, 'mmode644', 0644)
479
t.put_file('mode400', StringIO('test text\n'), mode=0400)
480
self.assertTransportMode(t, 'mode400', 0400)
481
# The default permissions should be based on the current umask
482
umask = osutils.get_umask()
483
t.put_file('nomode', StringIO('test text\n'), mode=None)
484
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
486
def test_put_file_non_atomic_permissions(self):
487
t = self.get_transport()
491
if not t._can_roundtrip_unix_modebits():
492
# Can't roundtrip, so no need to run this test
494
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
495
self.assertTransportMode(t, 'mode644', 0644)
496
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
497
self.assertTransportMode(t, 'mode666', 0666)
498
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
499
self.assertTransportMode(t, 'mode600', 0600)
500
# Yes, you can put_file_non_atomic a file such that it becomes readonly
501
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
502
self.assertTransportMode(t, 'mode400', 0400)
504
# The default permissions should be based on the current umask
505
umask = osutils.get_umask()
506
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
507
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
509
# We should also be able to set the mode for a parent directory
512
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
513
dir_mode=0700, create_parent_dir=True)
514
self.assertTransportMode(t, 'dir700', 0700)
515
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
516
dir_mode=0770, create_parent_dir=True)
517
self.assertTransportMode(t, 'dir770', 0770)
518
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
519
dir_mode=0777, create_parent_dir=True)
520
self.assertTransportMode(t, 'dir777', 0777)
522
def test_put_bytes_unicode(self):
523
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
524
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
525
# (we don't want to encode unicode here at all, callers should be
526
# strictly passing bytes to put_bytes), but we allow it for backwards
527
# compatibility. At some point we should use a specific exception.
528
# See https://bugs.launchpad.net/bzr/+bug/106898.
529
t = self.get_transport()
532
unicode_string = u'\u1234'
534
(AssertionError, UnicodeEncodeError),
535
t.put_bytes, 'foo', unicode_string)
537
def test_put_file_unicode(self):
538
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
539
# This situation can happen (and has) if code is careless about the type
540
# of "string" they initialise/write to a StringIO with. We cannot use
541
# cStringIO, because it never returns unicode from read.
542
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
543
# raise, but we raise it for hysterical raisins.
544
t = self.get_transport()
547
unicode_file = pyStringIO(u'\u1234')
548
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
199
550
def test_mkdir(self):
200
551
t = self.get_transport()
202
553
if t.is_readonly():
203
# cannot mkdir on readonly transports. We're not testing for
554
# cannot mkdir on readonly transports. We're not testing for
204
555
# cache coherency because cache behaviour is not currently
205
556
# defined for the transport interface.
206
557
self.assertRaises(TransportNotPossible, t.mkdir, '.')
290
673
self.build_tree(['e/', 'e/f'])
293
t.put('e/f', StringIO('contents of e'))
676
t.put_bytes('e/f', 'contents of e')
294
677
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
295
678
temp_transport.mkdir('e')
296
679
t.copy_to(['e/f'], temp_transport)
298
681
del temp_transport
299
temp_transport = MemoryTransport('memory:/')
682
temp_transport = MemoryTransport('memory:///')
301
684
files = ['a', 'b', 'c', 'd']
302
685
t.copy_to(iter(files), temp_transport)
304
self.check_transport_contents(temp_transport.get(f).read(),
687
self.check_transport_contents(temp_transport.get_bytes(f),
306
689
del temp_transport
308
691
for mode in (0666, 0644, 0600, 0400):
309
temp_transport = MemoryTransport("memory:/")
692
temp_transport = MemoryTransport("memory:///")
310
693
t.copy_to(files, temp_transport, mode=mode)
312
695
self.assertTransportMode(temp_transport, f, mode)
314
def test_append(self):
315
t = self.get_transport()
318
open('a', 'wb').write('diff\ncontents for\na\n')
319
open('b', 'wb').write('contents\nfor b\n')
322
('a', StringIO('diff\ncontents for\na\n')),
323
('b', StringIO('contents\nfor b\n'))
327
self.assertRaises(TransportNotPossible,
328
t.append, 'a', 'add\nsome\nmore\ncontents\n')
329
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
332
t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
334
self.check_transport_contents(
335
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
339
self.assertRaises(TransportNotPossible,
341
[('a', 'and\nthen\nsome\nmore\n'),
342
('b', 'some\nmore\nfor\nb\n')])
343
_append('a', StringIO('and\nthen\nsome\nmore\n'))
344
_append('b', StringIO('some\nmore\nfor\nb\n'))
346
self.assertEqual((43, 15),
347
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
348
('b', StringIO('some\nmore\nfor\nb\n'))]))
697
def test_create_prefix(self):
698
t = self.get_transport()
699
sub = t.clone('foo').clone('bar')
702
except TransportNotPossible:
703
self.assertTrue(t.is_readonly())
705
self.assertTrue(t.has('foo/bar'))
707
def test_append_file(self):
708
t = self.get_transport()
711
self.assertRaises(TransportNotPossible,
712
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
714
t.put_bytes('a', 'diff\ncontents for\na\n')
715
t.put_bytes('b', 'contents\nfor b\n')
718
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
720
self.check_transport_contents(
721
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
724
# a file with no parent should fail..
725
self.assertRaises(NoSuchFile,
726
t.append_file, 'missing/path', StringIO('content'))
728
# And we can create new files, too
730
t.append_file('c', StringIO('some text\nfor a missing file\n')))
731
self.check_transport_contents('some text\nfor a missing file\n',
734
def test_append_bytes(self):
735
t = self.get_transport()
738
self.assertRaises(TransportNotPossible,
739
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
742
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
743
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
746
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
748
self.check_transport_contents(
749
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
752
# a file with no parent should fail..
753
self.assertRaises(NoSuchFile,
754
t.append_bytes, 'missing/path', 'content')
756
def test_append_multi(self):
757
t = self.get_transport()
761
t.put_bytes('a', 'diff\ncontents for\na\n'
762
'add\nsome\nmore\ncontents\n')
763
t.put_bytes('b', 'contents\nfor b\n')
765
self.assertEqual((43, 15),
766
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
767
('b', StringIO('some\nmore\nfor\nb\n'))]))
349
769
self.check_transport_contents(
350
770
'diff\ncontents for\na\n'
351
771
'add\nsome\nmore\ncontents\n'
733
1090
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
734
1091
self.build_tree(['subdir/', 'subdir/file'], transport=t)
735
1092
subdir = t.clone('subdir')
736
subdir.stat('./file')
1093
st = subdir.stat('./file')
1094
st = subdir.stat('.')
1096
def test_hardlink(self):
1097
from stat import ST_NLINK
1099
t = self.get_transport()
1101
source_name = "original_target"
1102
link_name = "target_link"
1104
self.build_tree([source_name], transport=t)
1107
t.hardlink(source_name, link_name)
1109
self.assertTrue(t.has(source_name))
1110
self.assertTrue(t.has(link_name))
1112
st = t.stat(link_name)
1113
self.assertEqual(st[ST_NLINK], 2)
1114
except TransportNotPossible:
1115
raise TestSkipped("Transport %s does not support hardlinks." %
1116
self._server.__class__)
1118
def test_symlink(self):
1119
from stat import S_ISLNK
1121
t = self.get_transport()
1123
source_name = "original_target"
1124
link_name = "target_link"
1126
self.build_tree([source_name], transport=t)
1129
t.symlink(source_name, link_name)
1131
self.assertTrue(t.has(source_name))
1132
self.assertTrue(t.has(link_name))
1134
st = t.stat(link_name)
1135
self.assertTrue(S_ISLNK(st.st_mode),
1136
"expected symlink, got mode %o" % st.st_mode)
1137
except TransportNotPossible:
1138
raise TestSkipped("Transport %s does not support symlinks." %
1139
self._server.__class__)
1141
self.knownFailure("Paramiko fails to create symlinks during tests")
739
1143
def test_list_dir(self):
740
1144
# TODO: Test list_dir, just try once, and if it throws, stop testing
741
1145
t = self.get_transport()
743
1147
if not t.listable():
744
1148
self.assertRaises(TransportNotPossible, t.list_dir, '.')
748
l = list(t.list_dir(d))
1151
def sorted_list(d, transport):
1152
l = list(transport.list_dir(d))
752
# SftpServer creates control files in the working directory
753
# so lets move down a directory to avoid those.
754
if not t.is_readonly():
760
self.assertEqual([], sorted_list(u'.'))
1156
self.assertEqual([], sorted_list('.', t))
761
1157
# c2 is precisely one letter longer than c here to test that
762
1158
# suffixing is not confused.
1159
# a%25b checks that quoting is done consistently across transports
1160
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
763
1162
if not t.is_readonly():
764
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
1163
self.build_tree(tree_names, transport=t)
766
self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
768
self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
769
self.assertEqual([u'd', u'e'], sorted_list(u'c'))
1165
self.build_tree(tree_names)
1168
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1170
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1171
self.assertEqual(['d', 'e'], sorted_list('c', t))
1173
# Cloning the transport produces an equivalent listing
1174
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
771
1176
if not t.is_readonly():
778
self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
779
self.assertEqual([u'e'], sorted_list(u'c'))
781
self.assertListRaises(NoSuchFile, t.list_dir, 'q')
782
self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
783
self.assertListRaises(NoSuchFile, t.list_dir, 'a')
1183
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1184
self.assertEqual(['e'], sorted_list('c', t))
1186
self.assertListRaises(PathError, t.list_dir, 'q')
1187
self.assertListRaises(PathError, t.list_dir, 'c/f')
1188
# 'a' is a file, list_dir should raise an error
1189
self.assertListRaises(PathError, t.list_dir, 'a')
1191
def test_list_dir_result_is_url_escaped(self):
1192
t = self.get_transport()
1193
if not t.listable():
1194
raise TestSkipped("transport not listable")
1196
if not t.is_readonly():
1197
self.build_tree(['a/', 'a/%'], transport=t)
1199
self.build_tree(['a/', 'a/%'])
1201
names = list(t.list_dir('a'))
1202
self.assertEqual(['%25'], names)
1203
self.assertIsInstance(names[0], str)
1205
def test_clone_preserve_info(self):
1206
t1 = self.get_transport()
1207
if not isinstance(t1, ConnectedTransport):
1208
raise TestSkipped("not a connected transport")
1210
t2 = t1.clone('subdir')
1211
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1212
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1213
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1214
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1215
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1217
def test__reuse_for(self):
1218
t = self.get_transport()
1219
if not isinstance(t, ConnectedTransport):
1220
raise TestSkipped("not a connected transport")
1222
def new_url(scheme=None, user=None, password=None,
1223
host=None, port=None, path=None):
1224
"""Build a new url from t.base changing only parts of it.
1226
Only the parameters different from None will be changed.
1228
if scheme is None: scheme = t._parsed_url.scheme
1229
if user is None: user = t._parsed_url.user
1230
if password is None: password = t._parsed_url.password
1231
if user is None: user = t._parsed_url.user
1232
if host is None: host = t._parsed_url.host
1233
if port is None: port = t._parsed_url.port
1234
if path is None: path = t._parsed_url.path
1235
return str(urlutils.URL(scheme, user, password, host, port, path))
1237
if t._parsed_url.scheme == 'ftp':
1241
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
if t._parsed_url.user == 'me':
1246
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1247
# passwords are not taken into account because:
1248
# - it makes no sense to have two different valid passwords for the
1250
# - _password in ConnectedTransport is intended to collect what the
1251
# user specified from the command-line and there are cases where the
1252
# new url can contain no password (if the url was built from an
1253
# existing transport.base for example)
1254
# - password are considered part of the credentials provided at
1255
# connection creation time and as such may not be present in the url
1256
# (they may be typed by the user when prompted for example)
1257
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1258
# We will not connect, we can use a invalid host
1259
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
if t._parsed_url.port == 1234:
1264
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1265
# No point in trying to reuse a transport for a local URL
1266
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1268
def test_connection_sharing(self):
1269
t = self.get_transport()
1270
if not isinstance(t, ConnectedTransport):
1271
raise TestSkipped("not a connected transport")
1273
c = t.clone('subdir')
1274
# Some transports will create the connection only when needed
1275
t.has('surely_not') # Force connection
1276
self.assertIs(t._get_connection(), c._get_connection())
1278
# Temporary failure, we need to create a new dummy connection
1279
new_connection = None
1280
t._set_connection(new_connection)
1281
# Check that both transports use the same connection
1282
self.assertIs(new_connection, t._get_connection())
1283
self.assertIs(new_connection, c._get_connection())
1285
def test_reuse_connection_for_various_paths(self):
1286
t = self.get_transport()
1287
if not isinstance(t, ConnectedTransport):
1288
raise TestSkipped("not a connected transport")
1290
t.has('surely_not') # Force connection
1291
self.assertIsNot(None, t._get_connection())
1293
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1294
self.assertIsNot(t, subdir)
1295
self.assertIs(t._get_connection(), subdir._get_connection())
1297
home = subdir._reuse_for(t.base + 'home')
1298
self.assertIs(t._get_connection(), home._get_connection())
1299
self.assertIs(subdir._get_connection(), home._get_connection())
785
1301
def test_clone(self):
786
1302
# TODO: Test that clone moves up and down the filesystem
882
1472
self.assertEqual(paths,
883
1473
set(['isolated/dir/foo',
884
1474
'isolated/dir/bar',
1475
'isolated/dir/b%2525z',
885
1476
'isolated/bar']))
886
1477
sub_transport = transport.clone('isolated')
887
1478
paths = set(sub_transport.iter_files_recursive())
888
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
1479
self.assertEqual(paths,
1480
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1482
def test_copy_tree(self):
1483
# TODO: test file contents and permissions are preserved. This test was
1484
# added just to ensure that quoting was handled correctly.
1485
# -- David Allouche 2006-08-11
1486
transport = self.get_transport()
1487
if not transport.listable():
1488
self.assertRaises(TransportNotPossible,
1489
transport.iter_files_recursive)
1491
if transport.is_readonly():
1493
self.build_tree(['from/',
1497
'from/dir/b%25z', # make sure quoting is correct
1499
transport=transport)
1500
transport.copy_tree('from', 'to')
1501
paths = set(transport.iter_files_recursive())
1502
self.assertEqual(paths,
1503
set(['from/dir/foo',
1512
def test_copy_tree_to_transport(self):
1513
transport = self.get_transport()
1514
if not transport.listable():
1515
self.assertRaises(TransportNotPossible,
1516
transport.iter_files_recursive)
1518
if transport.is_readonly():
1520
self.build_tree(['from/',
1524
'from/dir/b%25z', # make sure quoting is correct
1526
transport=transport)
1527
from_transport = transport.clone('from')
1528
to_transport = transport.clone('to')
1529
to_transport.ensure_base()
1530
from_transport.copy_tree_to_transport(to_transport)
1531
paths = set(transport.iter_files_recursive())
1532
self.assertEqual(paths,
1533
set(['from/dir/foo',
1542
def test_unicode_paths(self):
1543
"""Test that we can read/write files with Unicode names."""
1544
t = self.get_transport()
1546
# With FAT32 and certain encodings on win32
1547
# '\xe5' and '\xe4' actually map to the same file
1548
# adding a suffix kicks in the 'preserving but insensitive'
1549
# route, and maintains the right files
1550
files = [u'\xe5.1', # a w/ circle iso-8859-1
1551
u'\xe4.2', # a w/ dots iso-8859-1
1552
u'\u017d', # Z with umlat iso-8859-2
1553
u'\u062c', # Arabic j
1554
u'\u0410', # Russian A
1555
u'\u65e5', # Kanji person
1558
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1559
if no_unicode_support:
1560
self.knownFailure("test server cannot handle unicode paths")
1563
self.build_tree(files, transport=t, line_endings='binary')
1564
except UnicodeError:
1565
raise TestSkipped("cannot handle unicode paths in current encoding")
1567
# A plain unicode string is not a valid url
1569
self.assertRaises(InvalidURL, t.get, fname)
1572
fname_utf8 = fname.encode('utf-8')
1573
contents = 'contents of %s\n' % (fname_utf8,)
1574
self.check_transport_contents(contents, t, urlutils.escape(fname))
890
1576
def test_connect_twice_is_same_content(self):
891
# check that our server (whatever it is) is accessable reliably
1577
# check that our server (whatever it is) is accessible reliably
892
1578
# via get_transport and multiple connections share content.
893
1579
transport = self.get_transport()
894
1580
if transport.is_readonly():
896
transport.put('foo', StringIO('bar'))
897
transport2 = self.get_transport()
898
self.check_transport_contents('bar', transport2, 'foo')
899
# its base should be usable.
900
transport2 = bzrlib.transport.get_transport(transport.base)
901
self.check_transport_contents('bar', transport2, 'foo')
1582
transport.put_bytes('foo', 'bar')
1583
transport3 = self.get_transport()
1584
self.check_transport_contents('bar', transport3, 'foo')
903
1586
# now opening at a relative url should give use a sane result:
904
1587
transport.mkdir('newdir')
905
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
906
transport2 = transport2.clone('..')
907
self.check_transport_contents('bar', transport2, 'foo')
1588
transport5 = self.get_transport('newdir')
1589
transport6 = transport5.clone('..')
1590
self.check_transport_contents('bar', transport6, 'foo')
909
1592
def test_lock_write(self):
1593
"""Test transport-level write locks.
1595
These are deprecated and transports may decline to support them.
910
1597
transport = self.get_transport()
911
1598
if transport.is_readonly():
912
1599
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
914
transport.put('lock', StringIO())
915
lock = transport.lock_write('lock')
1601
transport.put_bytes('lock', '')
1603
lock = transport.lock_write('lock')
1604
except TransportNotPossible:
916
1606
# TODO make this consistent on all platforms:
917
1607
# self.assertRaises(LockError, transport.lock_write, 'lock')
920
1610
def test_lock_read(self):
1611
"""Test transport-level read locks.
1613
These are deprecated and transports may decline to support them.
921
1615
transport = self.get_transport()
922
1616
if transport.is_readonly():
923
1617
file('lock', 'w').close()
925
transport.put('lock', StringIO())
926
lock = transport.lock_read('lock')
1619
transport.put_bytes('lock', '')
1621
lock = transport.lock_read('lock')
1622
except TransportNotPossible:
927
1624
# TODO make this consistent on all platforms:
928
1625
# self.assertRaises(LockError, transport.lock_read, 'lock')
931
1628
def test_readv(self):
932
1629
transport = self.get_transport()
933
1630
if transport.is_readonly():
934
file('a', 'w').write('0123456789')
1631
with file('a', 'w') as f: f.write('0123456789')
936
transport.put('a', StringIO('01234567890'))
1633
transport.put_bytes('a', '0123456789')
1635
d = list(transport.readv('a', ((0, 1),)))
1636
self.assertEqual(d[0], (0, '0'))
938
1638
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
939
1639
self.assertEqual(d[0], (0, '0'))
940
1640
self.assertEqual(d[1], (1, '1'))
941
1641
self.assertEqual(d[2], (3, '34'))
942
1642
self.assertEqual(d[3], (9, '9'))
1644
def test_readv_out_of_order(self):
1645
transport = self.get_transport()
1646
if transport.is_readonly():
1647
with file('a', 'w') as f: f.write('0123456789')
1649
transport.put_bytes('a', '01234567890')
1651
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1652
self.assertEqual(d[0], (1, '1'))
1653
self.assertEqual(d[1], (9, '9'))
1654
self.assertEqual(d[2], (0, '0'))
1655
self.assertEqual(d[3], (3, '34'))
1657
def test_readv_with_adjust_for_latency(self):
1658
transport = self.get_transport()
1659
# the adjust for latency flag expands the data region returned
1660
# according to a per-transport heuristic, so testing is a little
1661
# tricky as we need more data than the largest combining that our
1662
# transports do. To accomodate this we generate random data and cross
1663
# reference the returned data with the random data. To avoid doing
1664
# multiple large random byte look ups we do several tests on the same
1666
content = osutils.rand_bytes(200*1024)
1667
content_size = len(content)
1668
if transport.is_readonly():
1669
self.build_tree_contents([('a', content)])
1671
transport.put_bytes('a', content)
1672
def check_result_data(result_vector):
1673
for item in result_vector:
1674
data_len = len(item[1])
1675
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1678
result = list(transport.readv('a', ((0, 30),),
1679
adjust_for_latency=True, upper_limit=content_size))
1680
# we expect 1 result, from 0, to something > 30
1681
self.assertEqual(1, len(result))
1682
self.assertEqual(0, result[0][0])
1683
self.assertTrue(len(result[0][1]) >= 30)
1684
check_result_data(result)
1685
# end of file corner case
1686
result = list(transport.readv('a', ((204700, 100),),
1687
adjust_for_latency=True, upper_limit=content_size))
1688
# we expect 1 result, from 204800- its length, to the end
1689
self.assertEqual(1, len(result))
1690
data_len = len(result[0][1])
1691
self.assertEqual(204800-data_len, result[0][0])
1692
self.assertTrue(data_len >= 100)
1693
check_result_data(result)
1694
# out of order ranges are made in order
1695
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1696
adjust_for_latency=True, upper_limit=content_size))
1697
# we expect 2 results, in order, start and end.
1698
self.assertEqual(2, len(result))
1700
data_len = len(result[0][1])
1701
self.assertEqual(0, result[0][0])
1702
self.assertTrue(data_len >= 30)
1704
data_len = len(result[1][1])
1705
self.assertEqual(204800-data_len, result[1][0])
1706
self.assertTrue(data_len >= 100)
1707
check_result_data(result)
1708
# close ranges get combined (even if out of order)
1709
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1710
result = list(transport.readv('a', request_vector,
1711
adjust_for_latency=True, upper_limit=content_size))
1712
self.assertEqual(1, len(result))
1713
data_len = len(result[0][1])
1714
# minimum length is from 400 to 1034 - 634
1715
self.assertTrue(data_len >= 634)
1716
# must contain the region 400 to 1034
1717
self.assertTrue(result[0][0] <= 400)
1718
self.assertTrue(result[0][0] + data_len >= 1034)
1719
check_result_data(result)
1721
def test_readv_with_adjust_for_latency_with_big_file(self):
1722
transport = self.get_transport()
1723
# test from observed failure case.
1724
if transport.is_readonly():
1725
with file('a', 'w') as f: f.write('a'*1024*1024)
1727
transport.put_bytes('a', 'a'*1024*1024)
1728
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1729
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1730
(465373, 800), (947422, 800)]
1731
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1732
found_items = [False]*9
1733
for pos, (start, length) in enumerate(broken_vector):
1734
# check the range is covered by the result
1735
for offset, data in results:
1736
if offset <= start and start + length <= offset + len(data):
1737
found_items[pos] = True
1738
self.assertEqual([True]*9, found_items)
1740
def test_get_with_open_write_stream_sees_all_content(self):
1741
t = self.get_transport()
1744
handle = t.open_write_stream('foo')
1747
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1751
def test_get_smart_medium(self):
1752
"""All transports must either give a smart medium, or know they can't.
1754
transport = self.get_transport()
1756
client_medium = transport.get_smart_medium()
1757
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1758
except errors.NoSmartMedium:
1759
# as long as we got it we're fine
1762
def test_readv_short_read(self):
1763
transport = self.get_transport()
1764
if transport.is_readonly():
1765
with file('a', 'w') as f: f.write('0123456789')
1767
transport.put_bytes('a', '01234567890')
1769
# This is intentionally reading off the end of the file
1770
# since we are sure that it cannot get there
1771
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1772
# Can be raised by paramiko
1774
transport.readv, 'a', [(1,1), (8,10)])
1776
# This is trying to seek past the end of the file, it should
1777
# also raise a special error
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1779
transport.readv, 'a', [(12,2)])
1781
def test_no_segment_parameters(self):
1782
"""Segment parameters should be stripped and stored in
1783
transport.segment_parameters."""
1784
transport = self.get_transport("foo")
1785
self.assertEquals({}, transport.get_segment_parameters())
1787
def test_segment_parameters(self):
1788
"""Segment parameters should be stripped and stored in
1789
transport.get_segment_parameters()."""
1790
base_url = self._server.get_url()
1791
parameters = {"key1": "val1", "key2": "val2"}
1792
url = urlutils.join_segment_parameters(base_url, parameters)
1793
transport = _mod_transport.get_transport_from_url(url)
1794
self.assertEquals(parameters, transport.get_segment_parameters())
1796
def test_set_segment_parameters(self):
1797
"""Segment parameters can be set and show up in base."""
1798
transport = self.get_transport("foo")
1799
orig_base = transport.base
1800
transport.set_segment_parameter("arm", "board")
1801
self.assertEquals("%s,arm=board" % orig_base, transport.base)
1802
self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1803
transport.set_segment_parameter("arm", None)
1804
transport.set_segment_parameter("nonexistant", None)
1805
self.assertEquals({}, transport.get_segment_parameters())
1806
self.assertEquals(orig_base, transport.base)
1808
def test_stat_symlink(self):
1809
# if a transport points directly to a symlink (and supports symlinks
1810
# at all) you can tell this. helps with bug 32669.
1811
t = self.get_transport()
1813
t.symlink('target', 'link')
1814
except TransportNotPossible:
1815
raise TestSkipped("symlinks not supported")
1816
t2 = t.clone('link')
1818
self.assertTrue(stat.S_ISLNK(st.st_mode))
1820
def test_abspath_url_unquote_unreserved(self):
1821
"""URLs from abspath should have unreserved characters unquoted
1823
Need consistent quoting notably for tildes, see lp:842223 for more.
1825
t = self.get_transport()
1826
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1827
self.assertEqual(t.base + "-.09AZ_az~",
1828
t.abspath(needlessly_escaped_dir))
1830
def test_clone_url_unquote_unreserved(self):
1831
"""Base URL of a cloned branch needs unreserved characters unquoted
1833
Cloned transports should be prefix comparable for things like the
1834
isolation checking of tests, see lp:842223 for more.
1836
t1 = self.get_transport()
1837
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1838
self.build_tree([needlessly_escaped_dir], transport=t1)
1839
t2 = t1.clone(needlessly_escaped_dir)
1840
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1842
def test_hook_post_connection_one(self):
1843
"""Fire post_connect hook after a ConnectedTransport is first used"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t = self.get_transport()
1847
self.assertEqual([], log)
1848
t.has("non-existant")
1849
if isinstance(t, RemoteTransport):
1850
self.assertEqual([t.get_smart_medium()], log)
1851
elif isinstance(t, ConnectedTransport):
1852
self.assertEqual([t], log)
1854
self.assertEqual([], log)
1856
def test_hook_post_connection_multi(self):
1857
"""Fire post_connect hook once per unshared underlying connection"""
1859
Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
t1 = self.get_transport()
1862
t3 = self.get_transport()
1863
self.assertEqual([], log)
1867
if isinstance(t1, RemoteTransport):
1868
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
elif isinstance(t1, ConnectedTransport):
1870
self.assertEqual([t1, t3], log)
1872
self.assertEqual([], log)