20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
31
from bzrlib import (
32
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
TransportNotPossible, ConnectionError,
37
from bzrlib.errors import (ConnectionError,
36
47
from bzrlib.osutils import getcwd
37
from bzrlib.tests import TestCaseInTempDir, TestSkipped
48
from bzrlib.smart import medium
49
from bzrlib.tests import (
38
55
from bzrlib.tests.test_transport import TestTransportImplementation
39
from bzrlib.transport import memory
40
import bzrlib.transport
44
"""Append the given text (file-like object) to the supplied filename."""
56
from bzrlib.transport import (
59
_get_transport_modules,
61
from bzrlib.transport.memory import MemoryTransport
64
def get_transport_test_permutations(module):
65
"""Get the permutations module wants to have tested."""
66
if getattr(module, 'get_test_permutations', None) is None:
68
"transport module %s doesn't provide get_test_permutations()"
71
return module.get_test_permutations()
74
def transport_test_permutations():
75
"""Return a list of the klass, server_factory pairs to test."""
77
for module in _get_transport_modules():
79
permutations = get_transport_test_permutations(
80
reduce(getattr, (module).split('.')[1:], __import__(module)))
81
for (klass, server_factory) in permutations:
82
scenario = (server_factory.__name__,
83
{"transport_class":klass,
84
"transport_server":server_factory})
85
result.append(scenario)
86
except errors.DependencyNotPresent, e:
87
# Continue even if a dependency prevents us
88
# from adding this test
93
def load_tests(standard_tests, module, loader):
94
"""Multiply tests for tranport implementations."""
95
result = loader.suiteClass()
96
scenarios = transport_test_permutations()
97
return multiply_tests(standard_tests, scenarios, result)
52
100
class TransportTests(TestTransportImplementation):
103
super(TransportTests, self).setUp()
104
self._captureVar('BZR_NO_SMART_VFS', None)
54
106
def check_transport_contents(self, content, transport, relpath):
55
107
"""Check that transport.get(relpath).read() == content."""
56
108
self.assertEqualDiff(content, transport.get(relpath).read())
58
def assertListRaises(self, excClass, func, *args, **kwargs):
59
"""Fail unless excClass is raised when the iterator from func is used.
61
Many transport functions can return generators this makes sure
62
to wrap them in a list() call to make sure the whole generator
63
is run, and that the proper exception is raised.
110
def test_ensure_base_missing(self):
111
""".ensure_base() should create the directory if it doesn't exist"""
112
t = self.get_transport()
114
if t_a.is_readonly():
115
self.assertRaises(TransportNotPossible,
118
self.assertTrue(t_a.ensure_base())
119
self.assertTrue(t.has('a'))
121
def test_ensure_base_exists(self):
122
""".ensure_base() should just be happy if it already exists"""
123
t = self.get_transport()
129
# ensure_base returns False if it didn't create the base
130
self.assertFalse(t_a.ensure_base())
132
def test_ensure_base_missing_parent(self):
133
""".ensure_base() will fail if the parent dir doesn't exist"""
134
t = self.get_transport()
140
self.assertRaises(NoSuchFile, t_b.ensure_base)
142
def test_external_url(self):
143
""".external_url either works or raises InProcessTransport."""
144
t = self.get_transport()
66
list(func(*args, **kwargs))
70
if hasattr(excClass,'__name__'): excName = excClass.__name__
71
else: excName = str(excClass)
72
raise self.failureException, "%s not raised" % excName
147
except errors.InProcessTransport:
74
150
def test_has(self):
75
151
t = self.get_transport()
100
192
self.build_tree(files, transport=t, line_endings='binary')
101
193
self.check_transport_contents('contents of a\n', t, 'a')
102
194
content_f = t.get_multi(files)
103
for content, f in zip(contents, content_f):
195
# Use itertools.izip() instead of use zip() or map(), since they fully
196
# evaluate their inputs, the transport requests should be issued and
197
# handled sequentially (we don't want to force transport to buffer).
198
for content, f in itertools.izip(contents, content_f):
104
199
self.assertEqual(content, f.read())
106
201
content_f = t.get_multi(iter(files))
107
for content, f in zip(contents, content_f):
202
# Use itertools.izip() for the same reason
203
for content, f in itertools.izip(contents, content_f):
108
204
self.assertEqual(content, f.read())
206
def test_get_unknown_file(self):
207
t = self.get_transport()
209
contents = ['contents of a\n',
212
self.build_tree(files, transport=t, line_endings='binary')
110
213
self.assertRaises(NoSuchFile, t.get, 'c')
111
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
112
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
115
t = self.get_transport()
118
self.assertRaises(TransportNotPossible,
119
t.put, 'a', 'some text for a\n')
122
t.put('a', StringIO('some text for a\n'))
123
self.failUnless(t.has('a'))
124
self.check_transport_contents('some text for a\n', t, 'a')
125
# Make sure 'has' is updated
126
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
127
[True, False, False, False, False])
128
# Put also replaces contents
129
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
130
('d', StringIO('contents\nfor d\n'))]),
132
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
133
[True, False, False, True, False])
134
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
135
self.check_transport_contents('contents\nfor d\n', t, 'd')
138
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
139
('d', StringIO('another contents\nfor d\n'))])),
141
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
142
self.check_transport_contents('another contents\nfor d\n', t, 'd')
144
self.assertRaises(NoSuchFile,
145
t.put, 'path/doesnt/exist/c', 'contents')
147
def test_put_permissions(self):
148
t = self.get_transport()
152
if not t._can_roundtrip_unix_modebits():
153
# Can't roundtrip, so no need to run this test
155
t.put('mode644', StringIO('test text\n'), mode=0644)
156
self.assertTransportMode(t, 'mode644', 0644)
157
t.put('mode666', StringIO('test text\n'), mode=0666)
158
self.assertTransportMode(t, 'mode666', 0666)
159
t.put('mode600', StringIO('test text\n'), mode=0600)
217
def test_get_directory_read_gives_ReadError(self):
218
"""consistent errors for read() on a file returned by get()."""
219
t = self.get_transport()
221
self.build_tree(['a directory/'])
223
t.mkdir('a%20directory')
224
# getting the file must either work or fail with a PathError
226
a_file = t.get('a%20directory')
227
except (errors.PathError, errors.RedirectRequested):
228
# early failure return immediately.
230
# having got a file, read() must either work (i.e. http reading a dir
231
# listing) or fail with ReadError
234
except errors.ReadError:
237
def test_get_bytes(self):
238
t = self.get_transport()
240
files = ['a', 'b', 'e', 'g']
241
contents = ['contents of a\n',
246
self.build_tree(files, transport=t, line_endings='binary')
247
self.check_transport_contents('contents of a\n', t, 'a')
249
for content, fname in zip(contents, files):
250
self.assertEqual(content, t.get_bytes(fname))
252
def test_get_bytes_unknown_file(self):
253
t = self.get_transport()
255
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
257
def test_get_with_open_write_stream_sees_all_content(self):
258
t = self.get_transport()
261
handle = t.open_write_stream('foo')
264
self.assertEqual('b', t.get('foo').read())
268
def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
t = self.get_transport()
272
handle = t.open_write_stream('foo')
275
self.assertEqual('b', t.get_bytes('foo'))
276
self.assertEqual('b', t.get('foo').read())
280
def test_put_bytes(self):
281
t = self.get_transport()
284
self.assertRaises(TransportNotPossible,
285
t.put_bytes, 'a', 'some text for a\n')
288
t.put_bytes('a', 'some text for a\n')
289
self.failUnless(t.has('a'))
290
self.check_transport_contents('some text for a\n', t, 'a')
292
# The contents should be overwritten
293
t.put_bytes('a', 'new text for a\n')
294
self.check_transport_contents('new text for a\n', t, 'a')
296
self.assertRaises(NoSuchFile,
297
t.put_bytes, 'path/doesnt/exist/c', 'contents')
299
def test_put_bytes_non_atomic(self):
300
t = self.get_transport()
303
self.assertRaises(TransportNotPossible,
304
t.put_bytes_non_atomic, 'a', 'some text for a\n')
307
self.failIf(t.has('a'))
308
t.put_bytes_non_atomic('a', 'some text for a\n')
309
self.failUnless(t.has('a'))
310
self.check_transport_contents('some text for a\n', t, 'a')
311
# Put also replaces contents
312
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
315
# Make sure we can create another file
316
t.put_bytes_non_atomic('d', 'contents for\nd\n')
317
# And overwrite 'a' with empty contents
318
t.put_bytes_non_atomic('a', '')
319
self.check_transport_contents('contents for\nd\n', t, 'd')
320
self.check_transport_contents('', t, 'a')
322
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
324
# Now test the create_parent flag
325
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
327
self.failIf(t.has('dir/a'))
328
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
create_parent_dir=True)
330
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
332
# But we still get NoSuchFile if we can't make the parent dir
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
335
create_parent_dir=True)
337
def test_put_bytes_permissions(self):
338
t = self.get_transport()
342
if not t._can_roundtrip_unix_modebits():
343
# Can't roundtrip, so no need to run this test
345
t.put_bytes('mode644', 'test text\n', mode=0644)
346
self.assertTransportMode(t, 'mode644', 0644)
347
t.put_bytes('mode666', 'test text\n', mode=0666)
348
self.assertTransportMode(t, 'mode666', 0666)
349
t.put_bytes('mode600', 'test text\n', mode=0600)
350
self.assertTransportMode(t, 'mode600', 0600)
351
# Yes, you can put_bytes a file such that it becomes readonly
352
t.put_bytes('mode400', 'test text\n', mode=0400)
353
self.assertTransportMode(t, 'mode400', 0400)
355
# The default permissions should be based on the current umask
356
umask = osutils.get_umask()
357
t.put_bytes('nomode', 'test text\n', mode=None)
358
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
360
def test_put_bytes_non_atomic_permissions(self):
361
t = self.get_transport()
365
if not t._can_roundtrip_unix_modebits():
366
# Can't roundtrip, so no need to run this test
368
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
self.assertTransportMode(t, 'mode644', 0644)
370
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
self.assertTransportMode(t, 'mode666', 0666)
372
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
self.assertTransportMode(t, 'mode600', 0600)
374
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
self.assertTransportMode(t, 'mode400', 0400)
377
# The default permissions should be based on the current umask
378
umask = osutils.get_umask()
379
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
382
# We should also be able to set the mode for a parent directory
384
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
dir_mode=0700, create_parent_dir=True)
386
self.assertTransportMode(t, 'dir700', 0700)
387
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
dir_mode=0770, create_parent_dir=True)
389
self.assertTransportMode(t, 'dir770', 0770)
390
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
dir_mode=0777, create_parent_dir=True)
392
self.assertTransportMode(t, 'dir777', 0777)
394
def test_put_file(self):
395
t = self.get_transport()
398
self.assertRaises(TransportNotPossible,
399
t.put_file, 'a', StringIO('some text for a\n'))
402
result = t.put_file('a', StringIO('some text for a\n'))
403
# put_file returns the length of the data written
404
self.assertEqual(16, result)
405
self.failUnless(t.has('a'))
406
self.check_transport_contents('some text for a\n', t, 'a')
407
# Put also replaces contents
408
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
409
self.assertEqual(19, result)
410
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
self.assertRaises(NoSuchFile,
412
t.put_file, 'path/doesnt/exist/c',
413
StringIO('contents'))
415
def test_put_file_non_atomic(self):
416
t = self.get_transport()
419
self.assertRaises(TransportNotPossible,
420
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
423
self.failIf(t.has('a'))
424
t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
self.failUnless(t.has('a'))
426
self.check_transport_contents('some text for a\n', t, 'a')
427
# Put also replaces contents
428
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
431
# Make sure we can create another file
432
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
433
# And overwrite 'a' with empty contents
434
t.put_file_non_atomic('a', StringIO(''))
435
self.check_transport_contents('contents for\nd\n', t, 'd')
436
self.check_transport_contents('', t, 'a')
438
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
StringIO('contents\n'))
440
# Now test the create_parent flag
441
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
StringIO('contents\n'))
443
self.failIf(t.has('dir/a'))
444
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
create_parent_dir=True)
446
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
448
# But we still get NoSuchFile if we can't make the parent dir
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
StringIO('contents\n'),
451
create_parent_dir=True)
453
def test_put_file_permissions(self):
455
t = self.get_transport()
459
if not t._can_roundtrip_unix_modebits():
460
# Can't roundtrip, so no need to run this test
462
t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
self.assertTransportMode(t, 'mode644', 0644)
464
t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
self.assertTransportMode(t, 'mode666', 0666)
466
t.put_file('mode600', StringIO('test text\n'), mode=0600)
160
467
self.assertTransportMode(t, 'mode600', 0600)
161
468
# Yes, you can put a file such that it becomes readonly
162
t.put('mode400', StringIO('test text\n'), mode=0400)
163
self.assertTransportMode(t, 'mode400', 0400)
164
t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
165
self.assertTransportMode(t, 'mmode644', 0644)
167
# The default permissions should be based on the current umask
168
umask = osutils.get_umask()
169
t.put('nomode', StringIO('test text\n'), mode=None)
170
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
469
t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
self.assertTransportMode(t, 'mode400', 0400)
471
# The default permissions should be based on the current umask
472
umask = osutils.get_umask()
473
t.put_file('nomode', StringIO('test text\n'), mode=None)
474
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
476
def test_put_file_non_atomic_permissions(self):
477
t = self.get_transport()
481
if not t._can_roundtrip_unix_modebits():
482
# Can't roundtrip, so no need to run this test
484
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
self.assertTransportMode(t, 'mode644', 0644)
486
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
self.assertTransportMode(t, 'mode666', 0666)
488
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
self.assertTransportMode(t, 'mode600', 0600)
490
# Yes, you can put_file_non_atomic a file such that it becomes readonly
491
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
self.assertTransportMode(t, 'mode400', 0400)
494
# The default permissions should be based on the current umask
495
umask = osutils.get_umask()
496
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
499
# We should also be able to set the mode for a parent directory
502
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
dir_mode=0700, create_parent_dir=True)
504
self.assertTransportMode(t, 'dir700', 0700)
505
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
dir_mode=0770, create_parent_dir=True)
507
self.assertTransportMode(t, 'dir770', 0770)
508
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
dir_mode=0777, create_parent_dir=True)
510
self.assertTransportMode(t, 'dir777', 0777)
512
def test_put_bytes_unicode(self):
513
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
515
# (we don't want to encode unicode here at all, callers should be
516
# strictly passing bytes to put_bytes), but we allow it for backwards
517
# compatibility. At some point we should use a specific exception.
518
# See https://bugs.launchpad.net/bzr/+bug/106898.
519
t = self.get_transport()
522
unicode_string = u'\u1234'
524
(AssertionError, UnicodeEncodeError),
525
t.put_bytes, 'foo', unicode_string)
527
def test_put_file_unicode(self):
528
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
# This situation can happen (and has) if code is careless about the type
530
# of "string" they initialise/write to a StringIO with. We cannot use
531
# cStringIO, because it never returns unicode from read.
532
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
# raise, but we raise it for hysterical raisins.
534
t = self.get_transport()
537
unicode_file = pyStringIO(u'\u1234')
538
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
172
540
def test_mkdir(self):
173
541
t = self.get_transport()
175
543
if t.is_readonly():
176
# cannot mkdir on readonly transports. We're not testing for
544
# cannot mkdir on readonly transports. We're not testing for
177
545
# cache coherency because cache behaviour is not currently
178
546
# defined for the transport interface.
179
547
self.assertRaises(TransportNotPossible, t.mkdir, '.')
292
685
self.assertTransportMode(temp_transport, f, mode)
294
def test_append(self):
295
t = self.get_transport()
298
open('a', 'wb').write('diff\ncontents for\na\n')
299
open('b', 'wb').write('contents\nfor b\n')
302
('a', StringIO('diff\ncontents for\na\n')),
303
('b', StringIO('contents\nfor b\n'))
307
self.assertRaises(TransportNotPossible,
308
t.append, 'a', 'add\nsome\nmore\ncontents\n')
309
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
312
t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
314
self.check_transport_contents(
315
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
319
self.assertRaises(TransportNotPossible,
321
[('a', 'and\nthen\nsome\nmore\n'),
322
('b', 'some\nmore\nfor\nb\n')])
323
_append('a', StringIO('and\nthen\nsome\nmore\n'))
324
_append('b', StringIO('some\nmore\nfor\nb\n'))
326
self.assertEqual((43, 15),
327
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
328
('b', StringIO('some\nmore\nfor\nb\n'))]))
687
def test_create_prefix(self):
688
t = self.get_transport()
689
sub = t.clone('foo').clone('bar')
692
except TransportNotPossible:
693
self.assertTrue(t.is_readonly())
695
self.assertTrue(t.has('foo/bar'))
697
def test_append_file(self):
698
t = self.get_transport()
701
self.assertRaises(TransportNotPossible,
702
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
704
t.put_bytes('a', 'diff\ncontents for\na\n')
705
t.put_bytes('b', 'contents\nfor b\n')
708
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
710
self.check_transport_contents(
711
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
714
# a file with no parent should fail..
715
self.assertRaises(NoSuchFile,
716
t.append_file, 'missing/path', StringIO('content'))
718
# And we can create new files, too
720
t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
self.check_transport_contents('some text\nfor a missing file\n',
724
def test_append_bytes(self):
725
t = self.get_transport()
728
self.assertRaises(TransportNotPossible,
729
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
732
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
736
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
738
self.check_transport_contents(
739
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
742
# a file with no parent should fail..
743
self.assertRaises(NoSuchFile,
744
t.append_bytes, 'missing/path', 'content')
746
def test_append_multi(self):
747
t = self.get_transport()
751
t.put_bytes('a', 'diff\ncontents for\na\n'
752
'add\nsome\nmore\ncontents\n')
753
t.put_bytes('b', 'contents\nfor b\n')
755
self.assertEqual((43, 15),
756
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
('b', StringIO('some\nmore\nfor\nb\n'))]))
329
759
self.check_transport_contents(
330
760
'diff\ncontents for\na\n'
331
761
'add\nsome\nmore\ncontents\n'
727
1086
def test_list_dir(self):
728
1087
# TODO: Test list_dir, just try once, and if it throws, stop testing
729
1088
t = self.get_transport()
731
1090
if not t.listable():
732
1091
self.assertRaises(TransportNotPossible, t.list_dir, '.')
736
l = list(t.list_dir(d))
1094
def sorted_list(d, transport):
1095
l = list(transport.list_dir(d))
740
# SftpServer creates control files in the working directory
741
# so lets move down a directory to avoid those.
742
if not t.is_readonly():
748
self.assertEqual([], sorted_list(u'.'))
1099
self.assertEqual([], sorted_list('.', t))
749
1100
# c2 is precisely one letter longer than c here to test that
750
1101
# suffixing is not confused.
1102
# a%25b checks that quoting is done consistently across transports
1103
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
751
1105
if not t.is_readonly():
752
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
1106
self.build_tree(tree_names, transport=t)
754
self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
756
self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
757
self.assertEqual([u'd', u'e'], sorted_list(u'c'))
1108
self.build_tree(tree_names)
1111
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1113
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1114
self.assertEqual(['d', 'e'], sorted_list('c', t))
1116
# Cloning the transport produces an equivalent listing
1117
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
759
1119
if not t.is_readonly():
766
self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
767
self.assertEqual([u'e'], sorted_list(u'c'))
1126
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1127
self.assertEqual(['e'], sorted_list('c', t))
769
1129
self.assertListRaises(PathError, t.list_dir, 'q')
770
1130
self.assertListRaises(PathError, t.list_dir, 'c/f')
1131
# 'a' is a file, list_dir should raise an error
771
1132
self.assertListRaises(PathError, t.list_dir, 'a')
1134
def test_list_dir_result_is_url_escaped(self):
1135
t = self.get_transport()
1136
if not t.listable():
1137
raise TestSkipped("transport not listable")
1139
if not t.is_readonly():
1140
self.build_tree(['a/', 'a/%'], transport=t)
1142
self.build_tree(['a/', 'a/%'])
1144
names = list(t.list_dir('a'))
1145
self.assertEqual(['%25'], names)
1146
self.assertIsInstance(names[0], str)
1148
def test_clone_preserve_info(self):
1149
t1 = self.get_transport()
1150
if not isinstance(t1, ConnectedTransport):
1151
raise TestSkipped("not a connected transport")
1153
t2 = t1.clone('subdir')
1154
self.assertEquals(t1._scheme, t2._scheme)
1155
self.assertEquals(t1._user, t2._user)
1156
self.assertEquals(t1._password, t2._password)
1157
self.assertEquals(t1._host, t2._host)
1158
self.assertEquals(t1._port, t2._port)
1160
def test__reuse_for(self):
1161
t = self.get_transport()
1162
if not isinstance(t, ConnectedTransport):
1163
raise TestSkipped("not a connected transport")
1165
def new_url(scheme=None, user=None, password=None,
1166
host=None, port=None, path=None):
1167
"""Build a new url from t.base changing only parts of it.
1169
Only the parameters different from None will be changed.
1171
if scheme is None: scheme = t._scheme
1172
if user is None: user = t._user
1173
if password is None: password = t._password
1174
if user is None: user = t._user
1175
if host is None: host = t._host
1176
if port is None: port = t._port
1177
if path is None: path = t._path
1178
return t._unsplit_url(scheme, user, password, host, port, path)
1180
if t._scheme == 'ftp':
1184
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1189
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1190
# passwords are not taken into account because:
1191
# - it makes no sense to have two different valid passwords for the
1193
# - _password in ConnectedTransport is intended to collect what the
1194
# user specified from the command-line and there are cases where the
1195
# new url can contain no password (if the url was built from an
1196
# existing transport.base for example)
1197
# - password are considered part of the credentials provided at
1198
# connection creation time and as such may not be present in the url
1199
# (they may be typed by the user when prompted for example)
1200
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1201
# We will not connect, we can use a invalid host
1202
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1207
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1208
# No point in trying to reuse a transport for a local URL
1209
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1211
def test_connection_sharing(self):
1212
t = self.get_transport()
1213
if not isinstance(t, ConnectedTransport):
1214
raise TestSkipped("not a connected transport")
1216
c = t.clone('subdir')
1217
# Some transports will create the connection only when needed
1218
t.has('surely_not') # Force connection
1219
self.assertIs(t._get_connection(), c._get_connection())
1221
# Temporary failure, we need to create a new dummy connection
1222
new_connection = object()
1223
t._set_connection(new_connection)
1224
# Check that both transports use the same connection
1225
self.assertIs(new_connection, t._get_connection())
1226
self.assertIs(new_connection, c._get_connection())
1228
def test_reuse_connection_for_various_paths(self):
1229
t = self.get_transport()
1230
if not isinstance(t, ConnectedTransport):
1231
raise TestSkipped("not a connected transport")
1233
t.has('surely_not') # Force connection
1234
self.assertIsNot(None, t._get_connection())
1236
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1237
self.assertIsNot(t, subdir)
1238
self.assertIs(t._get_connection(), subdir._get_connection())
1240
home = subdir._reuse_for(t.base + 'home')
1241
self.assertIs(t._get_connection(), home._get_connection())
1242
self.assertIs(subdir._get_connection(), home._get_connection())
773
1244
def test_clone(self):
774
1245
# TODO: Test that clone moves up and down the filesystem
775
1246
t1 = self.get_transport()
973
1584
if transport.is_readonly():
974
1585
file('a', 'w').write('0123456789')
976
transport.put('a', StringIO('01234567890'))
1587
transport.put_bytes('a', '01234567890')
978
1589
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
979
1590
self.assertEqual(d[0], (1, '1'))
980
1591
self.assertEqual(d[1], (9, '9'))
981
1592
self.assertEqual(d[2], (0, '0'))
982
1593
self.assertEqual(d[3], (3, '34'))
1595
def test_readv_with_adjust_for_latency(self):
1596
transport = self.get_transport()
1597
# the adjust for latency flag expands the data region returned
1598
# according to a per-transport heuristic, so testing is a little
1599
# tricky as we need more data than the largest combining that our
1600
# transports do. To accomodate this we generate random data and cross
1601
# reference the returned data with the random data. To avoid doing
1602
# multiple large random byte look ups we do several tests on the same
1604
content = osutils.rand_bytes(200*1024)
1605
content_size = len(content)
1606
if transport.is_readonly():
1607
self.build_tree_contents([('a', content)])
1609
transport.put_bytes('a', content)
1610
def check_result_data(result_vector):
1611
for item in result_vector:
1612
data_len = len(item[1])
1613
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1616
result = list(transport.readv('a', ((0, 30),),
1617
adjust_for_latency=True, upper_limit=content_size))
1618
# we expect 1 result, from 0, to something > 30
1619
self.assertEqual(1, len(result))
1620
self.assertEqual(0, result[0][0])
1621
self.assertTrue(len(result[0][1]) >= 30)
1622
check_result_data(result)
1623
# end of file corner case
1624
result = list(transport.readv('a', ((204700, 100),),
1625
adjust_for_latency=True, upper_limit=content_size))
1626
# we expect 1 result, from 204800- its length, to the end
1627
self.assertEqual(1, len(result))
1628
data_len = len(result[0][1])
1629
self.assertEqual(204800-data_len, result[0][0])
1630
self.assertTrue(data_len >= 100)
1631
check_result_data(result)
1632
# out of order ranges are made in order
1633
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1634
adjust_for_latency=True, upper_limit=content_size))
1635
# we expect 2 results, in order, start and end.
1636
self.assertEqual(2, len(result))
1638
data_len = len(result[0][1])
1639
self.assertEqual(0, result[0][0])
1640
self.assertTrue(data_len >= 30)
1642
data_len = len(result[1][1])
1643
self.assertEqual(204800-data_len, result[1][0])
1644
self.assertTrue(data_len >= 100)
1645
check_result_data(result)
1646
# close ranges get combined (even if out of order)
1647
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1648
result = list(transport.readv('a', request_vector,
1649
adjust_for_latency=True, upper_limit=content_size))
1650
self.assertEqual(1, len(result))
1651
data_len = len(result[0][1])
1652
# minimum length is from 400 to 1034 - 634
1653
self.assertTrue(data_len >= 634)
1654
# must contain the region 400 to 1034
1655
self.assertTrue(result[0][0] <= 400)
1656
self.assertTrue(result[0][0] + data_len >= 1034)
1657
check_result_data(result)
1659
def test_readv_with_adjust_for_latency_with_big_file(self):
1660
transport = self.get_transport()
1661
# test from observed failure case.
1662
if transport.is_readonly():
1663
file('a', 'w').write('a'*1024*1024)
1665
transport.put_bytes('a', 'a'*1024*1024)
1666
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1667
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1668
(465373, 800), (947422, 800)]
1669
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1670
found_items = [False]*9
1671
for pos, (start, length) in enumerate(broken_vector):
1672
# check the range is covered by the result
1673
for offset, data in results:
1674
if offset <= start and start + length <= offset + len(data):
1675
found_items[pos] = True
1676
self.assertEqual([True]*9, found_items)
1678
def test_get_with_open_write_stream_sees_all_content(self):
1679
t = self.get_transport()
1682
handle = t.open_write_stream('foo')
1685
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1689
def test_get_smart_medium(self):
1690
"""All transports must either give a smart medium, or know they can't.
1692
transport = self.get_transport()
1694
client_medium = transport.get_smart_medium()
1695
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1696
except errors.NoSmartMedium:
1697
# as long as we got it we're fine
1700
def test_readv_short_read(self):
1701
transport = self.get_transport()
1702
if transport.is_readonly():
1703
file('a', 'w').write('0123456789')
1705
transport.put_bytes('a', '01234567890')
1707
# This is intentionally reading off the end of the file
1708
# since we are sure that it cannot get there
1709
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1710
# Can be raised by paramiko
1712
transport.readv, 'a', [(1,1), (8,10)])
1714
# This is trying to seek past the end of the file, it should
1715
# also raise a special error
1716
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1717
transport.readv, 'a', [(12,2)])