20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
30
TransportNotPossible, ConnectionError,
37
from bzrlib.errors import (ConnectionError,
32
44
from bzrlib.osutils import getcwd
33
from bzrlib.tests import TestCaseInTempDir, TestSkipped
34
from bzrlib.transport import memory
35
import bzrlib.transport
36
import bzrlib.urlutils as urlutils
40
"""Append the given text (file-like object) to the supplied filename."""
48
class TestTransportImplementation(TestCaseInTempDir):
49
"""Implementation verification for transports.
51
To verify a transport we need a server factory, which is a callable
52
that accepts no parameters and returns an implementation of
53
bzrlib.transport.Server.
55
That Server is then used to construct transport instances and test
56
the transport via loopback activity.
58
Currently this assumes that the Transport object is connected to the
59
current working directory. So that whatever is done
60
through the transport, should show up in the working
61
directory, and vice-versa. This is a bug, because it's possible to have
62
URL schemes which provide access to something that may not
63
result in storage on the local disk, i.e. due to file system limits, or
64
due to it being a database or some other non-filesystem tool.
66
This also tests to make sure that the functions work with both
67
generators and lists (assuming iter(list) is effectively a generator)
45
from bzrlib.smart import medium
46
from bzrlib.tests import (
51
from bzrlib.tests import test_server
52
from bzrlib.tests.test_transport import TestTransportImplementation
53
from bzrlib.transport import (
56
_get_transport_modules,
58
from bzrlib.transport.memory import MemoryTransport
61
def get_transport_test_permutations(module):
62
"""Get the permutations module wants to have tested."""
63
if getattr(module, 'get_test_permutations', None) is None:
65
"transport module %s doesn't provide get_test_permutations()"
68
return module.get_test_permutations()
71
def transport_test_permutations():
72
"""Return a list of the klass, server_factory pairs to test."""
74
for module in _get_transport_modules():
76
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
78
for (klass, server_factory) in permutations:
79
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
{"transport_class":klass,
81
"transport_server":server_factory})
82
result.append(scenario)
83
except errors.DependencyNotPresent, e:
84
# Continue even if a dependency prevents us
85
# from adding this test
90
def load_tests(standard_tests, module, loader):
    """Multiply tests for transport implementations.

    Builds an empty suite from the loader's suite class, collects the
    scenarios reported by transport_test_permutations(), and returns
    whatever multiply_tests() produces for the standard tests with those
    scenarios.  The ``module`` argument is accepted but not used here.
    """
    # Create the destination suite before gathering scenarios, keeping the
    # same call order as before.
    suite = loader.suiteClass()
    permutations = transport_test_permutations()
    return multiply_tests(standard_tests, permutations, suite)
97
class TransportTests(TestTransportImplementation):
71
super(TestTransportImplementation, self).setUp()
72
self._server = self.transport_server()
100
super(TransportTests, self).setUp()
101
self._captureVar('BZR_NO_SMART_VFS', None)
76
super(TestTransportImplementation, self).tearDown()
77
self._server.tearDown()
79
103
def check_transport_contents(self, content, transport, relpath):
    """Assert that reading relpath through transport yields content.

    :param content: The expected data.
    :param transport: The transport to read from.
    :param relpath: The path, relative to the transport, to read.
    """
    actual = transport.get(relpath).read()
    self.assertEqualDiff(content, actual)
83
def get_transport(self):
84
"""Return a connected transport to the local directory."""
85
base_url = self._server.get_url()
86
t = bzrlib.transport.get_transport(base_url)
87
if not isinstance(t, self.transport_class):
88
# we want to make sure to construct one particular class, even if
89
# there are several available implementations of this transport;
90
# therefore construct it by hand rather than through the regular
91
# get_transport method
92
t = self.transport_class(base_url)
95
def assertListRaises(self, excClass, func, *args, **kwargs):
96
"""Fail unless excClass is raised when the iterator from func is used.
98
Many transport functions can return generators; this makes sure
99
to wrap them in a list() call to make sure the whole generator
100
is run, and that the proper exception is raised.
107
def test_ensure_base_missing(self):
108
""".ensure_base() should create the directory if it doesn't exist"""
109
t = self.get_transport()
111
if t_a.is_readonly():
112
self.assertRaises(TransportNotPossible,
115
self.assertTrue(t_a.ensure_base())
116
self.assertTrue(t.has('a'))
118
def test_ensure_base_exists(self):
119
""".ensure_base() should just be happy if it already exists"""
120
t = self.get_transport()
126
# ensure_base returns False if it didn't create the base
127
self.assertFalse(t_a.ensure_base())
129
def test_ensure_base_missing_parent(self):
130
""".ensure_base() will fail if the parent dir doesn't exist"""
131
t = self.get_transport()
137
self.assertRaises(NoSuchFile, t_b.ensure_base)
139
def test_external_url(self):
140
""".external_url either works or raises InProcessTransport."""
141
t = self.get_transport()
103
list(func(*args, **kwargs))
107
if hasattr(excClass,'__name__'): excName = excClass.__name__
108
else: excName = str(excClass)
109
raise self.failureException, "%s not raised" % excName
144
except errors.InProcessTransport:
111
147
def test_has(self):
112
148
t = self.get_transport()
137
188
self.build_tree(files, transport=t, line_endings='binary')
138
189
self.check_transport_contents('contents of a\n', t, 'a')
139
190
content_f = t.get_multi(files)
140
for content, f in zip(contents, content_f):
191
# Use itertools.izip() instead of zip() or map(), since they fully
192
# evaluate their inputs, the transport requests should be issued and
193
# handled sequentially (we don't want to force transport to buffer).
194
for content, f in itertools.izip(contents, content_f):
141
195
self.assertEqual(content, f.read())
143
197
content_f = t.get_multi(iter(files))
144
for content, f in zip(contents, content_f):
198
# Use itertools.izip() for the same reason
199
for content, f in itertools.izip(contents, content_f):
145
200
self.assertEqual(content, f.read())
202
def test_get_unknown_file(self):
203
t = self.get_transport()
205
contents = ['contents of a\n',
208
self.build_tree(files, transport=t, line_endings='binary')
147
209
self.assertRaises(NoSuchFile, t.get, 'c')
148
210
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
149
211
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
152
t = self.get_transport()
155
self.assertRaises(TransportNotPossible,
156
t.put, 'a', 'some text for a\n')
159
t.put('a', StringIO('some text for a\n'))
160
self.failUnless(t.has('a'))
161
self.check_transport_contents('some text for a\n', t, 'a')
162
# Make sure 'has' is updated
163
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
164
[True, False, False, False, False])
165
# Put also replaces contents
166
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
167
('d', StringIO('contents\nfor d\n'))]),
169
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
170
[True, False, False, True, False])
171
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
172
self.check_transport_contents('contents\nfor d\n', t, 'd')
175
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
176
('d', StringIO('another contents\nfor d\n'))])),
178
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
179
self.check_transport_contents('another contents\nfor d\n', t, 'd')
181
self.assertRaises(NoSuchFile,
182
t.put, 'path/doesnt/exist/c', 'contents')
184
def test_put_permissions(self):
185
t = self.get_transport()
189
if not t._can_roundtrip_unix_modebits():
190
# Can't roundtrip, so no need to run this test
192
t.put('mode644', StringIO('test text\n'), mode=0644)
193
self.assertTransportMode(t, 'mode644', 0644)
194
t.put('mode666', StringIO('test text\n'), mode=0666)
195
self.assertTransportMode(t, 'mode666', 0666)
196
t.put('mode600', StringIO('test text\n'), mode=0600)
213
def test_get_directory_read_gives_ReadError(self):
214
"""consistent errors for read() on a file returned by get()."""
215
t = self.get_transport()
217
self.build_tree(['a directory/'])
219
t.mkdir('a%20directory')
220
# getting the file must either work or fail with a PathError
222
a_file = t.get('a%20directory')
223
except (errors.PathError, errors.RedirectRequested):
224
# early failure return immediately.
226
# having got a file, read() must either work (i.e. http reading a dir
227
# listing) or fail with ReadError
230
except errors.ReadError:
233
def test_get_bytes(self):
234
t = self.get_transport()
236
files = ['a', 'b', 'e', 'g']
237
contents = ['contents of a\n',
242
self.build_tree(files, transport=t, line_endings='binary')
243
self.check_transport_contents('contents of a\n', t, 'a')
245
for content, fname in zip(contents, files):
246
self.assertEqual(content, t.get_bytes(fname))
248
def test_get_bytes_unknown_file(self):
    """get_bytes() must raise NoSuchFile for the path 'c'.

    Nothing is written through the transport by this test before the call.
    """
    transport = self.get_transport()
    fetch = transport.get_bytes
    self.assertRaises(NoSuchFile, fetch, 'c')
252
def test_get_with_open_write_stream_sees_all_content(self):
253
t = self.get_transport()
256
handle = t.open_write_stream('foo')
259
self.assertEqual('b', t.get('foo').read())
263
def test_get_bytes_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
271
self.assertEqual('b', t.get('foo').read())
275
def test_put_bytes(self):
276
t = self.get_transport()
279
self.assertRaises(TransportNotPossible,
280
t.put_bytes, 'a', 'some text for a\n')
283
t.put_bytes('a', 'some text for a\n')
284
self.failUnless(t.has('a'))
285
self.check_transport_contents('some text for a\n', t, 'a')
287
# The contents should be overwritten
288
t.put_bytes('a', 'new text for a\n')
289
self.check_transport_contents('new text for a\n', t, 'a')
291
self.assertRaises(NoSuchFile,
292
t.put_bytes, 'path/doesnt/exist/c', 'contents')
294
def test_put_bytes_non_atomic(self):
295
t = self.get_transport()
298
self.assertRaises(TransportNotPossible,
299
t.put_bytes_non_atomic, 'a', 'some text for a\n')
302
self.failIf(t.has('a'))
303
t.put_bytes_non_atomic('a', 'some text for a\n')
304
self.failUnless(t.has('a'))
305
self.check_transport_contents('some text for a\n', t, 'a')
306
# Put also replaces contents
307
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
308
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
310
# Make sure we can create another file
311
t.put_bytes_non_atomic('d', 'contents for\nd\n')
312
# And overwrite 'a' with empty contents
313
t.put_bytes_non_atomic('a', '')
314
self.check_transport_contents('contents for\nd\n', t, 'd')
315
self.check_transport_contents('', t, 'a')
317
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
319
# Now test the create_parent flag
320
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
322
self.failIf(t.has('dir/a'))
323
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
324
create_parent_dir=True)
325
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
327
# But we still get NoSuchFile if we can't make the parent dir
328
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
330
create_parent_dir=True)
332
def test_put_bytes_permissions(self):
333
t = self.get_transport()
337
if not t._can_roundtrip_unix_modebits():
338
# Can't roundtrip, so no need to run this test
340
t.put_bytes('mode644', 'test text\n', mode=0644)
341
self.assertTransportMode(t, 'mode644', 0644)
342
t.put_bytes('mode666', 'test text\n', mode=0666)
343
self.assertTransportMode(t, 'mode666', 0666)
344
t.put_bytes('mode600', 'test text\n', mode=0600)
345
self.assertTransportMode(t, 'mode600', 0600)
346
# Yes, you can put_bytes a file such that it becomes readonly
347
t.put_bytes('mode400', 'test text\n', mode=0400)
348
self.assertTransportMode(t, 'mode400', 0400)
350
# The default permissions should be based on the current umask
351
umask = osutils.get_umask()
352
t.put_bytes('nomode', 'test text\n', mode=None)
353
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
355
def test_put_bytes_non_atomic_permissions(self):
356
t = self.get_transport()
360
if not t._can_roundtrip_unix_modebits():
361
# Can't roundtrip, so no need to run this test
363
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
364
self.assertTransportMode(t, 'mode644', 0644)
365
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
366
self.assertTransportMode(t, 'mode666', 0666)
367
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
368
self.assertTransportMode(t, 'mode600', 0600)
369
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
370
self.assertTransportMode(t, 'mode400', 0400)
372
# The default permissions should be based on the current umask
373
umask = osutils.get_umask()
374
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
375
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
377
# We should also be able to set the mode for a parent directory
379
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
380
dir_mode=0700, create_parent_dir=True)
381
self.assertTransportMode(t, 'dir700', 0700)
382
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
383
dir_mode=0770, create_parent_dir=True)
384
self.assertTransportMode(t, 'dir770', 0770)
385
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
386
dir_mode=0777, create_parent_dir=True)
387
self.assertTransportMode(t, 'dir777', 0777)
389
def test_put_file(self):
390
t = self.get_transport()
393
self.assertRaises(TransportNotPossible,
394
t.put_file, 'a', StringIO('some text for a\n'))
397
result = t.put_file('a', StringIO('some text for a\n'))
398
# put_file returns the length of the data written
399
self.assertEqual(16, result)
400
self.failUnless(t.has('a'))
401
self.check_transport_contents('some text for a\n', t, 'a')
402
# Put also replaces contents
403
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
404
self.assertEqual(19, result)
405
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
406
self.assertRaises(NoSuchFile,
407
t.put_file, 'path/doesnt/exist/c',
408
StringIO('contents'))
410
def test_put_file_non_atomic(self):
411
t = self.get_transport()
414
self.assertRaises(TransportNotPossible,
415
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
418
self.failIf(t.has('a'))
419
t.put_file_non_atomic('a', StringIO('some text for a\n'))
420
self.failUnless(t.has('a'))
421
self.check_transport_contents('some text for a\n', t, 'a')
422
# Put also replaces contents
423
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
424
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
426
# Make sure we can create another file
427
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
428
# And overwrite 'a' with empty contents
429
t.put_file_non_atomic('a', StringIO(''))
430
self.check_transport_contents('contents for\nd\n', t, 'd')
431
self.check_transport_contents('', t, 'a')
433
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
434
StringIO('contents\n'))
435
# Now test the create_parent flag
436
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
437
StringIO('contents\n'))
438
self.failIf(t.has('dir/a'))
439
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
440
create_parent_dir=True)
441
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
443
# But we still get NoSuchFile if we can't make the parent dir
444
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
445
StringIO('contents\n'),
446
create_parent_dir=True)
448
def test_put_file_permissions(self):
450
t = self.get_transport()
454
if not t._can_roundtrip_unix_modebits():
455
# Can't roundtrip, so no need to run this test
457
t.put_file('mode644', StringIO('test text\n'), mode=0644)
458
self.assertTransportMode(t, 'mode644', 0644)
459
t.put_file('mode666', StringIO('test text\n'), mode=0666)
460
self.assertTransportMode(t, 'mode666', 0666)
461
t.put_file('mode600', StringIO('test text\n'), mode=0600)
197
462
self.assertTransportMode(t, 'mode600', 0600)
198
463
# Yes, you can put a file such that it becomes readonly
199
t.put('mode400', StringIO('test text\n'), mode=0400)
200
self.assertTransportMode(t, 'mode400', 0400)
201
t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
202
self.assertTransportMode(t, 'mmode644', 0644)
464
t.put_file('mode400', StringIO('test text\n'), mode=0400)
465
self.assertTransportMode(t, 'mode400', 0400)
466
# The default permissions should be based on the current umask
467
umask = osutils.get_umask()
468
t.put_file('nomode', StringIO('test text\n'), mode=None)
469
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
471
def test_put_file_non_atomic_permissions(self):
472
t = self.get_transport()
476
if not t._can_roundtrip_unix_modebits():
477
# Can't roundtrip, so no need to run this test
479
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
480
self.assertTransportMode(t, 'mode644', 0644)
481
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
482
self.assertTransportMode(t, 'mode666', 0666)
483
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
484
self.assertTransportMode(t, 'mode600', 0600)
485
# Yes, you can put_file_non_atomic a file such that it becomes readonly
486
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
487
self.assertTransportMode(t, 'mode400', 0400)
489
# The default permissions should be based on the current umask
490
umask = osutils.get_umask()
491
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
492
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
494
# We should also be able to set the mode for a parent directory
497
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
498
dir_mode=0700, create_parent_dir=True)
499
self.assertTransportMode(t, 'dir700', 0700)
500
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
501
dir_mode=0770, create_parent_dir=True)
502
self.assertTransportMode(t, 'dir770', 0770)
503
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
504
dir_mode=0777, create_parent_dir=True)
505
self.assertTransportMode(t, 'dir777', 0777)
507
def test_put_bytes_unicode(self):
508
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
509
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
510
# (we don't want to encode unicode here at all, callers should be
511
# strictly passing bytes to put_bytes), but we allow it for backwards
512
# compatibility. At some point we should use a specific exception.
513
# See https://bugs.launchpad.net/bzr/+bug/106898.
514
t = self.get_transport()
517
unicode_string = u'\u1234'
519
(AssertionError, UnicodeEncodeError),
520
t.put_bytes, 'foo', unicode_string)
522
def test_put_file_unicode(self):
523
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
524
# This situation can happen (and has) if code is careless about the type
525
# of "string" they initialise/write to a StringIO with. We cannot use
526
# cStringIO, because it never returns unicode from read.
527
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
528
# raise, but we raise it for hysterical raisins.
529
t = self.get_transport()
532
unicode_file = pyStringIO(u'\u1234')
533
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
204
535
def test_mkdir(self):
205
536
t = self.get_transport()
207
538
if t.is_readonly():
208
# cannot mkdir on readonly transports. We're not testing for
539
# cannot mkdir on readonly transports. We're not testing for
209
540
# cache coherency because cache behaviour is not currently
210
541
# defined for the transport interface.
211
542
self.assertRaises(TransportNotPossible, t.mkdir, '.')
320
680
self.assertTransportMode(temp_transport, f, mode)
322
def test_append(self):
323
t = self.get_transport()
326
open('a', 'wb').write('diff\ncontents for\na\n')
327
open('b', 'wb').write('contents\nfor b\n')
330
('a', StringIO('diff\ncontents for\na\n')),
331
('b', StringIO('contents\nfor b\n'))
335
self.assertRaises(TransportNotPossible,
336
t.append, 'a', 'add\nsome\nmore\ncontents\n')
337
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
340
t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
342
self.check_transport_contents(
343
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
347
self.assertRaises(TransportNotPossible,
349
[('a', 'and\nthen\nsome\nmore\n'),
350
('b', 'some\nmore\nfor\nb\n')])
351
_append('a', StringIO('and\nthen\nsome\nmore\n'))
352
_append('b', StringIO('some\nmore\nfor\nb\n'))
354
self.assertEqual((43, 15),
355
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
356
('b', StringIO('some\nmore\nfor\nb\n'))]))
682
def test_create_prefix(self):
683
t = self.get_transport()
684
sub = t.clone('foo').clone('bar')
687
except TransportNotPossible:
688
self.assertTrue(t.is_readonly())
690
self.assertTrue(t.has('foo/bar'))
692
def test_append_file(self):
693
t = self.get_transport()
696
self.assertRaises(TransportNotPossible,
697
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
699
t.put_bytes('a', 'diff\ncontents for\na\n')
700
t.put_bytes('b', 'contents\nfor b\n')
703
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
705
self.check_transport_contents(
706
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
709
# a file with no parent should fail..
710
self.assertRaises(NoSuchFile,
711
t.append_file, 'missing/path', StringIO('content'))
713
# And we can create new files, too
715
t.append_file('c', StringIO('some text\nfor a missing file\n')))
716
self.check_transport_contents('some text\nfor a missing file\n',
719
def test_append_bytes(self):
720
t = self.get_transport()
723
self.assertRaises(TransportNotPossible,
724
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
727
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
728
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
731
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
733
self.check_transport_contents(
734
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
737
# a file with no parent should fail..
738
self.assertRaises(NoSuchFile,
739
t.append_bytes, 'missing/path', 'content')
741
def test_append_multi(self):
742
t = self.get_transport()
746
t.put_bytes('a', 'diff\ncontents for\na\n'
747
'add\nsome\nmore\ncontents\n')
748
t.put_bytes('b', 'contents\nfor b\n')
750
self.assertEqual((43, 15),
751
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
752
('b', StringIO('some\nmore\nfor\nb\n'))]))
357
754
self.check_transport_contents(
358
755
'diff\ncontents for\na\n'
359
756
'add\nsome\nmore\ncontents\n'
752
1078
subdir.stat('./file')
753
1079
subdir.stat('.')
1081
def test_hardlink(self):
1082
from stat import ST_NLINK
1084
t = self.get_transport()
1086
source_name = "original_target"
1087
link_name = "target_link"
1089
self.build_tree([source_name], transport=t)
1092
t.hardlink(source_name, link_name)
1094
self.failUnless(t.has(source_name))
1095
self.failUnless(t.has(link_name))
1097
st = t.stat(link_name)
1098
self.failUnlessEqual(st[ST_NLINK], 2)
1099
except TransportNotPossible:
1100
raise TestSkipped("Transport %s does not support hardlinks." %
1101
self._server.__class__)
1103
def test_symlink(self):
1104
from stat import S_ISLNK
1106
t = self.get_transport()
1108
source_name = "original_target"
1109
link_name = "target_link"
1111
self.build_tree([source_name], transport=t)
1114
t.symlink(source_name, link_name)
1116
self.failUnless(t.has(source_name))
1117
self.failUnless(t.has(link_name))
1119
st = t.stat(link_name)
1120
self.failUnless(S_ISLNK(st.st_mode),
1121
"expected symlink, got mode %o" % st.st_mode)
1122
except TransportNotPossible:
1123
raise TestSkipped("Transport %s does not support symlinks." %
1124
self._server.__class__)
1126
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
755
1128
def test_list_dir(self):
756
1129
# TODO: Test list_dir, just try once, and if it throws, stop testing
757
1130
t = self.get_transport()
759
1132
if not t.listable():
760
1133
self.assertRaises(TransportNotPossible, t.list_dir, '.')
764
l = list(t.list_dir(d))
1136
def sorted_list(d, transport):
1137
l = list(transport.list_dir(d))
768
# SftpServer creates control files in the working directory
769
# so lets move down a directory to avoid those.
770
if not t.is_readonly():
776
self.assertEqual([], sorted_list(u'.'))
1141
self.assertEqual([], sorted_list('.', t))
777
1142
# c2 is precisely one letter longer than c here to test that
778
1143
# suffixing is not confused.
1144
# a%25b checks that quoting is done consistently across transports
1145
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
779
1147
if not t.is_readonly():
780
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
1148
self.build_tree(tree_names, transport=t)
782
self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
784
self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
785
self.assertEqual([u'd', u'e'], sorted_list(u'c'))
1150
self.build_tree(tree_names)
1153
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1155
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1156
self.assertEqual(['d', 'e'], sorted_list('c', t))
1158
# Cloning the transport produces an equivalent listing
1159
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
787
1161
if not t.is_readonly():
794
self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
795
self.assertEqual([u'e'], sorted_list(u'c'))
1168
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1169
self.assertEqual(['e'], sorted_list('c', t))
797
1171
self.assertListRaises(PathError, t.list_dir, 'q')
798
1172
self.assertListRaises(PathError, t.list_dir, 'c/f')
1173
# 'a' is a file, list_dir should raise an error
799
1174
self.assertListRaises(PathError, t.list_dir, 'a')
1176
def test_list_dir_result_is_url_escaped(self):
1177
t = self.get_transport()
1178
if not t.listable():
1179
raise TestSkipped("transport not listable")
1181
if not t.is_readonly():
1182
self.build_tree(['a/', 'a/%'], transport=t)
1184
self.build_tree(['a/', 'a/%'])
1186
names = list(t.list_dir('a'))
1187
self.assertEqual(['%25'], names)
1188
self.assertIsInstance(names[0], str)
1190
def test_clone_preserve_info(self):
    """A clone of a connected transport keeps the same connection details.

    Skipped for transports that are not ConnectedTransport instances.
    """
    original = self.get_transport()
    if not isinstance(original, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    cloned = original.clone('subdir')
    # Compare each URL component in turn: scheme, user, password, host, port.
    for attr in ('_scheme', '_user', '_password', '_host', '_port'):
        self.assertEquals(getattr(original, attr), getattr(cloned, attr))
1202
def test__reuse_for(self):
1203
t = self.get_transport()
1204
if not isinstance(t, ConnectedTransport):
1205
raise TestSkipped("not a connected transport")
1207
def new_url(scheme=None, user=None, password=None,
1208
host=None, port=None, path=None):
1209
"""Build a new url from t.base changing only parts of it.
1211
Only the parameters different from None will be changed.
1213
if scheme is None: scheme = t._scheme
1214
if user is None: user = t._user
1215
if password is None: password = t._password
1216
if user is None: user = t._user
1217
if host is None: host = t._host
1218
if port is None: port = t._port
1219
if path is None: path = t._path
1220
return t._unsplit_url(scheme, user, password, host, port, path)
1222
if t._scheme == 'ftp':
1226
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1232
# passwords are not taken into account because:
1233
# - it makes no sense to have two different valid passwords for the
1235
# - _password in ConnectedTransport is intended to collect what the
1236
# user specified from the command-line and there are cases where the
1237
# new url can contain no password (if the url was built from an
1238
# existing transport.base for example)
1239
# - passwords are considered part of the credentials provided at
1240
# connection creation time and as such may not be present in the url
1241
# (they may be typed by the user when prompted for example)
1242
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1243
# We will not connect, so we can use an invalid host
1244
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1250
# No point in trying to reuse a transport for a local URL
1251
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1253
def test_connection_sharing(self):
    """A transport and its clone report the same connection object.

    Skipped for transports that are not ConnectedTransport instances.
    """
    parent = self.get_transport()
    if not isinstance(parent, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    child = parent.clone('subdir')
    # Some transports only connect on demand, so issue a request to force
    # the connection into existence before comparing.
    parent.has('surely_not')
    self.assertIs(parent._get_connection(), child._get_connection())

    # Simulate a temporary failure by replacing the connection with a dummy.
    # NOTE(review): the dummy is None, so the checks below cannot tell a
    # shared connection apart from a clone that simply has none -- confirm
    # whether a distinct sentinel object would be accepted here.
    replacement = None
    parent._set_connection(replacement)
    # Both transports must now report the replacement connection.
    self.assertIs(replacement, parent._get_connection())
    self.assertIs(replacement, child._get_connection())
1270
def test_reuse_connection_for_various_paths(self):
    """Transports obtained via _reuse_for() share the original connection.

    Skipped for transports that are not ConnectedTransport instances.
    """
    root = self.get_transport()
    if not isinstance(root, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Issue a request so that a connection exists before it is compared.
    root.has('surely_not')
    self.assertIsNot(None, root._get_connection())

    # A deeper path under the same base yields a different transport
    # object that reports the same connection.
    deep = root._reuse_for(root.base + 'whatever/but/deep/down/the/path')
    self.assertIsNot(root, deep)
    self.assertIs(root._get_connection(), deep._get_connection())

    # Reusing from the derived transport for another path under the
    # original base keeps the same connection as well.
    home = deep._reuse_for(root.base + 'home')
    self.assertIs(root._get_connection(), home._get_connection())
    self.assertIs(deep._get_connection(), home._get_connection())
801
1286
def test_clone(self):
802
1287
# TODO: Test that clone moves up and down the filesystem
803
1288
t1 = self.get_transport()
988
1614
if transport.is_readonly():
989
1615
file('a', 'w').write('0123456789')
991
transport.put('a', StringIO('01234567890'))
1617
transport.put_bytes('a', '0123456789')
1619
d = list(transport.readv('a', ((0, 1),)))
1620
self.assertEqual(d[0], (0, '0'))
993
1622
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
994
1623
self.assertEqual(d[0], (0, '0'))
995
1624
self.assertEqual(d[1], (1, '1'))
996
1625
self.assertEqual(d[2], (3, '34'))
997
1626
self.assertEqual(d[3], (9, '9'))
1628
def test_readv_out_of_order(self):
1629
transport = self.get_transport()
1630
if transport.is_readonly():
1631
file('a', 'w').write('0123456789')
1633
transport.put_bytes('a', '01234567890')
1635
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1636
self.assertEqual(d[0], (1, '1'))
1637
self.assertEqual(d[1], (9, '9'))
1638
self.assertEqual(d[2], (0, '0'))
1639
self.assertEqual(d[3], (3, '34'))
1641
def test_readv_with_adjust_for_latency(self):
1642
transport = self.get_transport()
1643
# the adjust for latency flag expands the data region returned
1644
# according to a per-transport heuristic, so testing is a little
1645
# tricky as we need more data than the largest combining that our
1646
# transports do. To accommodate this we generate random data and cross
1647
# reference the returned data with the random data. To avoid doing
1648
# multiple large random byte look ups we do several tests on the same
1650
content = osutils.rand_bytes(200*1024)
1651
content_size = len(content)
1652
if transport.is_readonly():
1653
self.build_tree_contents([('a', content)])
1655
transport.put_bytes('a', content)
1656
def check_result_data(result_vector):
1657
for item in result_vector:
1658
data_len = len(item[1])
1659
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1662
result = list(transport.readv('a', ((0, 30),),
1663
adjust_for_latency=True, upper_limit=content_size))
1664
# we expect 1 result, from 0, to something > 30
1665
self.assertEqual(1, len(result))
1666
self.assertEqual(0, result[0][0])
1667
self.assertTrue(len(result[0][1]) >= 30)
1668
check_result_data(result)
1669
# end of file corner case
1670
result = list(transport.readv('a', ((204700, 100),),
1671
adjust_for_latency=True, upper_limit=content_size))
1672
# we expect 1 result, from 204800- its length, to the end
1673
self.assertEqual(1, len(result))
1674
data_len = len(result[0][1])
1675
self.assertEqual(204800-data_len, result[0][0])
1676
self.assertTrue(data_len >= 100)
1677
check_result_data(result)
1678
# out of order ranges are made in order
1679
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1680
adjust_for_latency=True, upper_limit=content_size))
1681
# we expect 2 results, in order, start and end.
1682
self.assertEqual(2, len(result))
1684
data_len = len(result[0][1])
1685
self.assertEqual(0, result[0][0])
1686
self.assertTrue(data_len >= 30)
1688
data_len = len(result[1][1])
1689
self.assertEqual(204800-data_len, result[1][0])
1690
self.assertTrue(data_len >= 100)
1691
check_result_data(result)
1692
# close ranges get combined (even if out of order)
1693
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1694
result = list(transport.readv('a', request_vector,
1695
adjust_for_latency=True, upper_limit=content_size))
1696
self.assertEqual(1, len(result))
1697
data_len = len(result[0][1])
1698
# minimum length is from 400 to 1034 - 634
1699
self.assertTrue(data_len >= 634)
1700
# must contain the region 400 to 1034
1701
self.assertTrue(result[0][0] <= 400)
1702
self.assertTrue(result[0][0] + data_len >= 1034)
1703
check_result_data(result)
1705
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range of a 1MiB file is covered by some result.

    The request vector reproduces an observed failure. A range counts as
    covered when a single returned (offset, data) item spans all of it.
    """
    transport = self.get_transport()
    # test from observed failure case.
    file_size = 1024*1024
    content = 'a' * file_size
    if transport.is_readonly():
        # A read-only transport cannot create the fixture itself.
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
        (225037, 800), (221357, 800), (437077, 800), (947670, 800),
        (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector,
        adjust_for_latency=True, upper_limit=file_size))
    # Collect the ranges no result spans, so that a failure names them
    # instead of showing an anonymous list of booleans.
    uncovered = []
    for start, length in broken_vector:
        for offset, data in results:
            if offset <= start and start + length <= offset + len(data):
                break
        else:
            uncovered.append((start, length))
    self.assertEqual([], uncovered)
1724
def test_get_with_open_write_stream_sees_all_content(self):
    """Bytes written to a still-open write stream are visible to readv()."""
    t = self.get_transport()
    if t.is_readonly():
        # NOTE(review): guard added on the assumption that a read-only
        # transport cannot open a write stream - confirm.
        return
    handle = t.open_write_stream('foo')
    try:
        # NOTE(review): the expected result below only pins 'b' at offset
        # 0 and 'd' at offset 2; the middle byte 'c' is an assumption.
        handle.write('bcd')
        # Read while the stream is still open.
        self.assertEqual([(0, 'b'), (2, 'd')],
                         list(t.readv('foo', ((0, 1), (2, 1)))))
    finally:
        # Always release the stream, even when the assertion fails.
        handle.close()
1735
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't.

    Either get_smart_medium() returns a medium.SmartClientMedium, or it
    raises errors.NoSmartMedium; any other outcome fails the test.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
    except errors.NoSmartMedium:
        # as long as we got it we're fine
        return
    self.assertIsInstance(client_medium, medium.SmartClientMedium)
1746
def test_readv_short_read(self):
    """readv() beyond the end of the file raises one of the expected errors.

    Two cases are checked: a range that starts inside the file but runs
    off its end, and a range that starts past the end.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        # A read-only transport cannot create the fixture itself.
        self.build_tree_contents([('a', '0123456789')])
    else:
        transport.put_bytes('a', '01234567890')

    # This is intentionally reading off the end of the file
    # since we are sure that it cannot get there
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
                           # Can be raised by paramiko
                           # NOTE(review): the exception named after this
                           # comment was not visible; AssertionError is
                           # assumed - confirm.
                           AssertionError),
                          transport.readv, 'a', [(1, 1), (8, 10)])

    # This is trying to seek past the end of the file, it should
    # also raise a special error
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
                          transport.readv, 'a', [(12, 2)])
1765
def test_stat_symlink(self):
1766
# if a transport points directly to a symlink (and supports symlinks
1767
# at all) you can tell this. helps with bug 32669.
1768
t = self.get_transport()
1770
t.symlink('target', 'link')
1771
except TransportNotPossible:
1772
raise TestSkipped("symlinks not supported")
1773
t2 = t.clone('link')
1775
self.assertTrue(stat.S_ISLNK(st.st_mode))