20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
30
from bzrlib import (
35
transport as _mod_transport,
32
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
TransportNotPossible, ConnectionError,
38
from bzrlib.errors import (ConnectionError,
36
45
from bzrlib.osutils import getcwd
37
from bzrlib.tests import TestCaseInTempDir, TestSkipped
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
38
53
from bzrlib.tests.test_transport import TestTransportImplementation
39
from bzrlib.transport import memory
40
import bzrlib.transport
44
"""Append the given text (file-like object) to the supplied filename."""
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
93
"""Multiply tests for tranport implementations."""
94
result = loader.suiteClass()
95
scenarios = transport_test_permutations()
96
return multiply_tests(standard_tests, scenarios, result)
52
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
54
105
def check_transport_contents(self, content, transport, relpath):
55
"""Check that transport.get(relpath).read() == content."""
56
self.assertEqualDiff(content, transport.get(relpath).read())
58
def assertListRaises(self, excClass, func, *args, **kwargs):
59
"""Fail unless excClass is raised when the iterator from func is used.
61
Many transport functions can return generators this makes sure
62
to wrap them in a list() call to make sure the whole generator
63
is run, and that the proper exception is raised.
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
66
list(func(*args, **kwargs))
70
if hasattr(excClass,'__name__'): excName = excClass.__name__
71
else: excName = str(excClass)
72
raise self.failureException, "%s not raised" % excName
146
except errors.InProcessTransport:
74
149
def test_has(self):
75
150
t = self.get_transport()
100
190
self.build_tree(files, transport=t, line_endings='binary')
101
191
self.check_transport_contents('contents of a\n', t, 'a')
102
192
content_f = t.get_multi(files)
103
for content, f in zip(contents, content_f):
193
# Use itertools.izip() instead of use zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
104
197
self.assertEqual(content, f.read())
106
199
content_f = t.get_multi(iter(files))
107
for content, f in zip(contents, content_f):
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
108
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
110
211
self.assertRaises(NoSuchFile, t.get, 'c')
111
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
112
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
115
t = self.get_transport()
118
self.assertRaises(TransportNotPossible,
119
t.put, 'a', 'some text for a\n')
122
t.put('a', StringIO('some text for a\n'))
123
self.failUnless(t.has('a'))
124
self.check_transport_contents('some text for a\n', t, 'a')
125
# Make sure 'has' is updated
126
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
127
[True, False, False, False, False])
128
# Put also replaces contents
129
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
130
('d', StringIO('contents\nfor d\n'))]),
132
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
133
[True, False, False, True, False])
134
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
135
self.check_transport_contents('contents\nfor d\n', t, 'd')
138
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
139
('d', StringIO('another contents\nfor d\n'))])),
141
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
142
self.check_transport_contents('another contents\nfor d\n', t, 'd')
144
self.assertRaises(NoSuchFile,
145
t.put, 'path/doesnt/exist/c', 'contents')
147
def test_put_permissions(self):
148
t = self.get_transport()
152
if not t._can_roundtrip_unix_modebits():
153
# Can't roundtrip, so no need to run this test
155
t.put('mode644', StringIO('test text\n'), mode=0644)
156
self.assertTransportMode(t, 'mode644', 0644)
157
t.put('mode666', StringIO('test text\n'), mode=0666)
158
self.assertTransportMode(t, 'mode666', 0666)
159
t.put('mode600', StringIO('test text\n'), mode=0600)
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
244
def test_get_bytes(self):
245
t = self.get_transport()
247
files = ['a', 'b', 'e', 'g']
248
contents = ['contents of a\n',
253
self.build_tree(files, transport=t, line_endings='binary')
254
self.check_transport_contents('contents of a\n', t, 'a')
256
for content, fname in zip(contents, files):
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
260
t = self.get_transport()
261
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
290
def test_put_bytes(self):
291
t = self.get_transport()
294
self.assertRaises(TransportNotPossible,
295
t.put_bytes, 'a', 'some text for a\n')
298
t.put_bytes('a', 'some text for a\n')
299
self.assertTrue(t.has('a'))
300
self.check_transport_contents('some text for a\n', t, 'a')
302
# The contents should be overwritten
303
t.put_bytes('a', 'new text for a\n')
304
self.check_transport_contents('new text for a\n', t, 'a')
306
self.assertRaises(NoSuchFile,
307
t.put_bytes, 'path/doesnt/exist/c', 'contents')
309
def test_put_bytes_non_atomic(self):
310
t = self.get_transport()
313
self.assertRaises(TransportNotPossible,
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
317
self.assertFalse(t.has('a'))
318
t.put_bytes_non_atomic('a', 'some text for a\n')
319
self.assertTrue(t.has('a'))
320
self.check_transport_contents('some text for a\n', t, 'a')
321
# Put also replaces contents
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
325
# Make sure we can create another file
326
t.put_bytes_non_atomic('d', 'contents for\nd\n')
327
# And overwrite 'a' with empty contents
328
t.put_bytes_non_atomic('a', '')
329
self.check_transport_contents('contents for\nd\n', t, 'd')
330
self.check_transport_contents('', t, 'a')
332
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
334
# Now test the create_parent flag
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
337
self.assertFalse(t.has('dir/a'))
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
create_parent_dir=True)
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
# But we still get NoSuchFile if we can't make the parent dir
343
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
345
create_parent_dir=True)
347
def test_put_bytes_permissions(self):
348
t = self.get_transport()
352
if not t._can_roundtrip_unix_modebits():
353
# Can't roundtrip, so no need to run this test
355
t.put_bytes('mode644', 'test text\n', mode=0644)
356
self.assertTransportMode(t, 'mode644', 0644)
357
t.put_bytes('mode666', 'test text\n', mode=0666)
358
self.assertTransportMode(t, 'mode666', 0666)
359
t.put_bytes('mode600', 'test text\n', mode=0600)
360
self.assertTransportMode(t, 'mode600', 0600)
361
# Yes, you can put_bytes a file such that it becomes readonly
362
t.put_bytes('mode400', 'test text\n', mode=0400)
363
self.assertTransportMode(t, 'mode400', 0400)
365
# The default permissions should be based on the current umask
366
umask = osutils.get_umask()
367
t.put_bytes('nomode', 'test text\n', mode=None)
368
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
370
def test_put_bytes_non_atomic_permissions(self):
371
t = self.get_transport()
375
if not t._can_roundtrip_unix_modebits():
376
# Can't roundtrip, so no need to run this test
378
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
379
self.assertTransportMode(t, 'mode644', 0644)
380
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
381
self.assertTransportMode(t, 'mode666', 0666)
382
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
383
self.assertTransportMode(t, 'mode600', 0600)
384
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
385
self.assertTransportMode(t, 'mode400', 0400)
387
# The default permissions should be based on the current umask
388
umask = osutils.get_umask()
389
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
390
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
392
# We should also be able to set the mode for a parent directory
394
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
395
dir_mode=0700, create_parent_dir=True)
396
self.assertTransportMode(t, 'dir700', 0700)
397
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
398
dir_mode=0770, create_parent_dir=True)
399
self.assertTransportMode(t, 'dir770', 0770)
400
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
401
dir_mode=0777, create_parent_dir=True)
402
self.assertTransportMode(t, 'dir777', 0777)
404
def test_put_file(self):
405
t = self.get_transport()
408
self.assertRaises(TransportNotPossible,
409
t.put_file, 'a', StringIO('some text for a\n'))
412
result = t.put_file('a', StringIO('some text for a\n'))
413
# put_file returns the length of the data written
414
self.assertEqual(16, result)
415
self.assertTrue(t.has('a'))
416
self.check_transport_contents('some text for a\n', t, 'a')
417
# Put also replaces contents
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
self.assertEqual(19, result)
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
421
self.assertRaises(NoSuchFile,
422
t.put_file, 'path/doesnt/exist/c',
423
StringIO('contents'))
425
def test_put_file_non_atomic(self):
426
t = self.get_transport()
429
self.assertRaises(TransportNotPossible,
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
433
self.assertFalse(t.has('a'))
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
self.assertTrue(t.has('a'))
436
self.check_transport_contents('some text for a\n', t, 'a')
437
# Put also replaces contents
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
441
# Make sure we can create another file
442
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
443
# And overwrite 'a' with empty contents
444
t.put_file_non_atomic('a', StringIO(''))
445
self.check_transport_contents('contents for\nd\n', t, 'd')
446
self.check_transport_contents('', t, 'a')
448
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
StringIO('contents\n'))
450
# Now test the create_parent flag
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
StringIO('contents\n'))
453
self.assertFalse(t.has('dir/a'))
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
create_parent_dir=True)
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
458
# But we still get NoSuchFile if we can't make the parent dir
459
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
StringIO('contents\n'),
461
create_parent_dir=True)
463
def test_put_file_permissions(self):
465
t = self.get_transport()
469
if not t._can_roundtrip_unix_modebits():
470
# Can't roundtrip, so no need to run this test
472
t.put_file('mode644', StringIO('test text\n'), mode=0644)
473
self.assertTransportMode(t, 'mode644', 0644)
474
t.put_file('mode666', StringIO('test text\n'), mode=0666)
475
self.assertTransportMode(t, 'mode666', 0666)
476
t.put_file('mode600', StringIO('test text\n'), mode=0600)
160
477
self.assertTransportMode(t, 'mode600', 0600)
161
478
# Yes, you can put a file such that it becomes readonly
162
t.put('mode400', StringIO('test text\n'), mode=0400)
163
self.assertTransportMode(t, 'mode400', 0400)
164
t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
165
self.assertTransportMode(t, 'mmode644', 0644)
167
# The default permissions should be based on the current umask
168
umask = osutils.get_umask()
169
t.put('nomode', StringIO('test text\n'), mode=None)
170
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
479
t.put_file('mode400', StringIO('test text\n'), mode=0400)
480
self.assertTransportMode(t, 'mode400', 0400)
481
# The default permissions should be based on the current umask
482
umask = osutils.get_umask()
483
t.put_file('nomode', StringIO('test text\n'), mode=None)
484
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
486
def test_put_file_non_atomic_permissions(self):
487
t = self.get_transport()
491
if not t._can_roundtrip_unix_modebits():
492
# Can't roundtrip, so no need to run this test
494
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
495
self.assertTransportMode(t, 'mode644', 0644)
496
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
497
self.assertTransportMode(t, 'mode666', 0666)
498
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
499
self.assertTransportMode(t, 'mode600', 0600)
500
# Yes, you can put_file_non_atomic a file such that it becomes readonly
501
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
502
self.assertTransportMode(t, 'mode400', 0400)
504
# The default permissions should be based on the current umask
505
umask = osutils.get_umask()
506
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
507
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
509
# We should also be able to set the mode for a parent directory
512
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
513
dir_mode=0700, create_parent_dir=True)
514
self.assertTransportMode(t, 'dir700', 0700)
515
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
516
dir_mode=0770, create_parent_dir=True)
517
self.assertTransportMode(t, 'dir770', 0770)
518
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
519
dir_mode=0777, create_parent_dir=True)
520
self.assertTransportMode(t, 'dir777', 0777)
522
def test_put_bytes_unicode(self):
523
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
524
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
525
# (we don't want to encode unicode here at all, callers should be
526
# strictly passing bytes to put_bytes), but we allow it for backwards
527
# compatibility. At some point we should use a specific exception.
528
# See https://bugs.launchpad.net/bzr/+bug/106898.
529
t = self.get_transport()
532
unicode_string = u'\u1234'
534
(AssertionError, UnicodeEncodeError),
535
t.put_bytes, 'foo', unicode_string)
537
def test_put_file_unicode(self):
538
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
539
# This situation can happen (and has) if code is careless about the type
540
# of "string" they initialise/write to a StringIO with. We cannot use
541
# cStringIO, because it never returns unicode from read.
542
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
543
# raise, but we raise it for hysterical raisins.
544
t = self.get_transport()
547
unicode_file = pyStringIO(u'\u1234')
548
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
172
550
def test_mkdir(self):
173
551
t = self.get_transport()
175
553
if t.is_readonly():
176
# cannot mkdir on readonly transports. We're not testing for
554
# cannot mkdir on readonly transports. We're not testing for
177
555
# cache coherency because cache behaviour is not currently
178
556
# defined for the transport interface.
179
557
self.assertRaises(TransportNotPossible, t.mkdir, '.')
292
695
self.assertTransportMode(temp_transport, f, mode)
294
def test_append(self):
295
t = self.get_transport()
298
open('a', 'wb').write('diff\ncontents for\na\n')
299
open('b', 'wb').write('contents\nfor b\n')
302
('a', StringIO('diff\ncontents for\na\n')),
303
('b', StringIO('contents\nfor b\n'))
307
self.assertRaises(TransportNotPossible,
308
t.append, 'a', 'add\nsome\nmore\ncontents\n')
309
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
312
t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
314
self.check_transport_contents(
315
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
319
self.assertRaises(TransportNotPossible,
321
[('a', 'and\nthen\nsome\nmore\n'),
322
('b', 'some\nmore\nfor\nb\n')])
323
_append('a', StringIO('and\nthen\nsome\nmore\n'))
324
_append('b', StringIO('some\nmore\nfor\nb\n'))
326
self.assertEqual((43, 15),
327
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
328
('b', StringIO('some\nmore\nfor\nb\n'))]))
697
def test_create_prefix(self):
698
t = self.get_transport()
699
sub = t.clone('foo').clone('bar')
702
except TransportNotPossible:
703
self.assertTrue(t.is_readonly())
705
self.assertTrue(t.has('foo/bar'))
707
def test_append_file(self):
708
t = self.get_transport()
711
self.assertRaises(TransportNotPossible,
712
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
714
t.put_bytes('a', 'diff\ncontents for\na\n')
715
t.put_bytes('b', 'contents\nfor b\n')
718
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
720
self.check_transport_contents(
721
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
724
# a file with no parent should fail..
725
self.assertRaises(NoSuchFile,
726
t.append_file, 'missing/path', StringIO('content'))
728
# And we can create new files, too
730
t.append_file('c', StringIO('some text\nfor a missing file\n')))
731
self.check_transport_contents('some text\nfor a missing file\n',
734
def test_append_bytes(self):
735
t = self.get_transport()
738
self.assertRaises(TransportNotPossible,
739
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
742
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
743
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
746
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
748
self.check_transport_contents(
749
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
752
# a file with no parent should fail..
753
self.assertRaises(NoSuchFile,
754
t.append_bytes, 'missing/path', 'content')
756
def test_append_multi(self):
757
t = self.get_transport()
761
t.put_bytes('a', 'diff\ncontents for\na\n'
762
'add\nsome\nmore\ncontents\n')
763
t.put_bytes('b', 'contents\nfor b\n')
765
self.assertEqual((43, 15),
766
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
767
('b', StringIO('some\nmore\nfor\nb\n'))]))
329
769
self.check_transport_contents(
330
770
'diff\ncontents for\na\n'
331
771
'add\nsome\nmore\ncontents\n'
721
1090
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
722
1091
self.build_tree(['subdir/', 'subdir/file'], transport=t)
723
1092
subdir = t.clone('subdir')
724
subdir.stat('./file')
1093
st = subdir.stat('./file')
1094
st = subdir.stat('.')
1096
def test_hardlink(self):
1097
from stat import ST_NLINK
1099
t = self.get_transport()
1101
source_name = "original_target"
1102
link_name = "target_link"
1104
self.build_tree([source_name], transport=t)
1107
t.hardlink(source_name, link_name)
1109
self.assertTrue(t.has(source_name))
1110
self.assertTrue(t.has(link_name))
1112
st = t.stat(link_name)
1113
self.assertEqual(st[ST_NLINK], 2)
1114
except TransportNotPossible:
1115
raise TestSkipped("Transport %s does not support hardlinks." %
1116
self._server.__class__)
1118
def test_symlink(self):
1119
from stat import S_ISLNK
1121
t = self.get_transport()
1123
source_name = "original_target"
1124
link_name = "target_link"
1126
self.build_tree([source_name], transport=t)
1129
t.symlink(source_name, link_name)
1131
self.assertTrue(t.has(source_name))
1132
self.assertTrue(t.has(link_name))
1134
st = t.stat(link_name)
1135
self.assertTrue(S_ISLNK(st.st_mode),
1136
"expected symlink, got mode %o" % st.st_mode)
1137
except TransportNotPossible:
1138
raise TestSkipped("Transport %s does not support symlinks." %
1139
self._server.__class__)
1141
self.knownFailure("Paramiko fails to create symlinks during tests")
727
1143
def test_list_dir(self):
728
1144
# TODO: Test list_dir, just try once, and if it throws, stop testing
729
1145
t = self.get_transport()
731
1147
if not t.listable():
732
1148
self.assertRaises(TransportNotPossible, t.list_dir, '.')
736
l = list(t.list_dir(d))
1151
def sorted_list(d, transport):
1152
l = list(transport.list_dir(d))
740
# SftpServer creates control files in the working directory
741
# so lets move down a directory to avoid those.
742
if not t.is_readonly():
748
self.assertEqual([], sorted_list(u'.'))
1156
self.assertEqual([], sorted_list('.', t))
749
1157
# c2 is precisely one letter longer than c here to test that
750
1158
# suffixing is not confused.
1159
# a%25b checks that quoting is done consistently across transports
1160
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
751
1162
if not t.is_readonly():
752
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
1163
self.build_tree(tree_names, transport=t)
754
self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
756
self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
757
self.assertEqual([u'd', u'e'], sorted_list(u'c'))
1165
self.build_tree(tree_names)
1168
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1170
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1171
self.assertEqual(['d', 'e'], sorted_list('c', t))
1173
# Cloning the transport produces an equivalent listing
1174
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
759
1176
if not t.is_readonly():
766
self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
767
self.assertEqual([u'e'], sorted_list(u'c'))
1183
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1184
self.assertEqual(['e'], sorted_list('c', t))
769
1186
self.assertListRaises(PathError, t.list_dir, 'q')
770
1187
self.assertListRaises(PathError, t.list_dir, 'c/f')
1188
# 'a' is a file, list_dir should raise an error
771
1189
self.assertListRaises(PathError, t.list_dir, 'a')
1191
def test_list_dir_result_is_url_escaped(self):
1192
t = self.get_transport()
1193
if not t.listable():
1194
raise TestSkipped("transport not listable")
1196
if not t.is_readonly():
1197
self.build_tree(['a/', 'a/%'], transport=t)
1199
self.build_tree(['a/', 'a/%'])
1201
names = list(t.list_dir('a'))
1202
self.assertEqual(['%25'], names)
1203
self.assertIsInstance(names[0], str)
1205
def test_clone_preserve_info(self):
1206
t1 = self.get_transport()
1207
if not isinstance(t1, ConnectedTransport):
1208
raise TestSkipped("not a connected transport")
1210
t2 = t1.clone('subdir')
1211
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1212
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1213
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1214
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1215
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1217
def test__reuse_for(self):
1218
t = self.get_transport()
1219
if not isinstance(t, ConnectedTransport):
1220
raise TestSkipped("not a connected transport")
1222
def new_url(scheme=None, user=None, password=None,
1223
host=None, port=None, path=None):
1224
"""Build a new url from t.base changing only parts of it.
1226
Only the parameters different from None will be changed.
1228
if scheme is None: scheme = t._parsed_url.scheme
1229
if user is None: user = t._parsed_url.user
1230
if password is None: password = t._parsed_url.password
1231
if user is None: user = t._parsed_url.user
1232
if host is None: host = t._parsed_url.host
1233
if port is None: port = t._parsed_url.port
1234
if path is None: path = t._parsed_url.path
1235
return str(urlutils.URL(scheme, user, password, host, port, path))
1237
if t._parsed_url.scheme == 'ftp':
1241
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
if t._parsed_url.user == 'me':
1246
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1247
# passwords are not taken into account because:
1248
# - it makes no sense to have two different valid passwords for the
1250
# - _password in ConnectedTransport is intended to collect what the
1251
# user specified from the command-line and there are cases where the
1252
# new url can contain no password (if the url was built from an
1253
# existing transport.base for example)
1254
# - password are considered part of the credentials provided at
1255
# connection creation time and as such may not be present in the url
1256
# (they may be typed by the user when prompted for example)
1257
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1258
# We will not connect, we can use a invalid host
1259
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
if t._parsed_url.port == 1234:
1264
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1265
# No point in trying to reuse a transport for a local URL
1266
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1268
def test_connection_sharing(self):
1269
t = self.get_transport()
1270
if not isinstance(t, ConnectedTransport):
1271
raise TestSkipped("not a connected transport")
1273
c = t.clone('subdir')
1274
# Some transports will create the connection only when needed
1275
t.has('surely_not') # Force connection
1276
self.assertIs(t._get_connection(), c._get_connection())
1278
# Temporary failure, we need to create a new dummy connection
1279
new_connection = None
1280
t._set_connection(new_connection)
1281
# Check that both transports use the same connection
1282
self.assertIs(new_connection, t._get_connection())
1283
self.assertIs(new_connection, c._get_connection())
1285
def test_reuse_connection_for_various_paths(self):
1286
t = self.get_transport()
1287
if not isinstance(t, ConnectedTransport):
1288
raise TestSkipped("not a connected transport")
1290
t.has('surely_not') # Force connection
1291
self.assertIsNot(None, t._get_connection())
1293
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1294
self.assertIsNot(t, subdir)
1295
self.assertIs(t._get_connection(), subdir._get_connection())
1297
home = subdir._reuse_for(t.base + 'home')
1298
self.assertIs(t._get_connection(), home._get_connection())
1299
self.assertIs(subdir._get_connection(), home._get_connection())
773
1301
def test_clone(self):
774
1302
# TODO: Test that clone moves up and down the filesystem
775
1303
t1 = self.get_transport()
777
1305
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
779
self.failUnless(t1.has('a'))
780
self.failUnless(t1.has('b/c'))
781
self.failIf(t1.has('c'))
1307
self.assertTrue(t1.has('a'))
1308
self.assertTrue(t1.has('b/c'))
1309
self.assertFalse(t1.has('c'))
783
1311
t2 = t1.clone('b')
784
1312
self.assertEqual(t1.base + 'b/', t2.base)
786
self.failUnless(t2.has('c'))
787
self.failIf(t2.has('a'))
1314
self.assertTrue(t2.has('c'))
1315
self.assertFalse(t2.has('a'))
789
1317
t3 = t2.clone('..')
790
self.failUnless(t3.has('a'))
791
self.failIf(t3.has('c'))
1318
self.assertTrue(t3.has('a'))
1319
self.assertFalse(t3.has('c'))
793
self.failIf(t1.has('b/d'))
794
self.failIf(t2.has('d'))
795
self.failIf(t3.has('b/d'))
1321
self.assertFalse(t1.has('b/d'))
1322
self.assertFalse(t2.has('d'))
1323
self.assertFalse(t3.has('b/d'))
797
1325
if t1.is_readonly():
798
open('b/d', 'wb').write('newfile\n')
1326
self.build_tree_contents([('b/d', 'newfile\n')])
800
t2.put('d', StringIO('newfile\n'))
802
self.failUnless(t1.has('b/d'))
803
self.failUnless(t2.has('d'))
804
self.failUnless(t3.has('b/d'))
1328
t2.put_bytes('d', 'newfile\n')
1330
self.assertTrue(t1.has('b/d'))
1331
self.assertTrue(t2.has('d'))
1332
self.assertTrue(t3.has('b/d'))
1334
def test_clone_to_root(self):
1335
orig_transport = self.get_transport()
1336
# Repeatedly go up to a parent directory until we're at the root
1337
# directory of this transport
1338
root_transport = orig_transport
1339
new_transport = root_transport.clone("..")
1340
# as we are walking up directories, the path must be
1341
# growing less, except at the top
1342
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1343
or new_transport.base == root_transport.base)
1344
while new_transport.base != root_transport.base:
1345
root_transport = new_transport
1346
new_transport = root_transport.clone("..")
1347
# as we are walking up directories, the path must be
1348
# growing less, except at the top
1349
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1350
or new_transport.base == root_transport.base)
1352
# Cloning to "/" should take us to exactly the same location.
1353
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1354
# the abspath of "/" from the original transport should be the same
1355
# as the base at the root:
1356
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1358
# At the root, the URL must still end with / as its a directory
1359
self.assertEqual(root_transport.base[-1], '/')
1361
def test_clone_from_root(self):
    """At the root, cloning to a simple dir should just do string append."""
    orig_transport = self.get_transport()
    root_transport = orig_transport.clone('/')
    # The clone's base is the root base plus the directory name and a
    # trailing slash.
    self.assertEqual(root_transport.base + '.bzr/',
                     root_transport.clone('.bzr').base)
1368
def test_base_url(self):
    """The base URL of a transport must end with a slash."""
    t = self.get_transport()
    self.assertEqual('/', t.base[-1])
806
1372
def test_relpath(self):
    """relpath() maps URLs at or below the base to base-relative paths."""
    t = self.get_transport()
    self.assertEqual('', t.relpath(t.base))
    # base ends with /
    self.assertEqual('', t.relpath(t.base[:-1]))
    # subdirs which don't exist should still give relpaths.
    self.assertEqual('foo', t.relpath(t.base + 'foo'))
    # trailing slash should be the same.
    self.assertEqual('foo', t.relpath(t.base + 'foo/'))
971
1644
def test_readv_out_of_order(self):
    """readv() yields its results in the order the ranges were requested."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Nothing can be written through a read-only transport, so the
        # fixture is created as a local file instead.
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', '01234567890')

    d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
    self.assertEqual(d[0], (1, '1'))
    self.assertEqual(d[1], (9, '9'))
    self.assertEqual(d[2], (0, '0'))
    self.assertEqual(d[3], (3, '34'))
1657
def test_readv_with_adjust_for_latency(self):
    """readv(adjust_for_latency=True) may widen ranges but never lose data.

    Each returned (offset, data) pair must match the file content at that
    offset and cover at least what was requested; ranges come back sorted
    by offset and nearby ranges are combined into one result.
    """
    transport = self.get_transport()
    # The adjust for latency flag expands the data region returned
    # according to a per-transport heuristic, so testing is a little
    # tricky as we need more data than the largest combining that our
    # transports do. To accommodate this we generate random data and cross
    # reference the returned data with the random data. To avoid doing
    # multiple large random byte look ups we do several tests on the same
    # content.
    content = osutils.rand_bytes(200*1024)
    content_size = len(content)
    if transport.is_readonly():
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)

    def check_result_data(result_vector):
        # Every returned chunk must equal the source bytes at its offset.
        for item in result_vector:
            data_len = len(item[1])
            self.assertEqual(content[item[0]:item[0] + data_len], item[1])

    # start of file corner case
    result = list(transport.readv('a', ((0, 30),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from 0, to something > 30
    self.assertEqual(1, len(result))
    self.assertEqual(0, result[0][0])
    self.assertTrue(len(result[0][1]) >= 30)
    check_result_data(result)
    # end of file corner case
    result = list(transport.readv('a', ((204700, 100),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from the file size minus its length, to the end
    self.assertEqual(1, len(result))
    data_len = len(result[0][1])
    self.assertEqual(content_size - data_len, result[0][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # out of order ranges are made in order
    result = list(transport.readv('a', ((204700, 100), (0, 50)),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 2 results, in order, start and end.
    self.assertEqual(2, len(result))
    # start: 50 bytes were requested from offset 0, so at least that many
    # must come back.
    data_len = len(result[0][1])
    self.assertEqual(0, result[0][0])
    self.assertTrue(data_len >= 50)
    # end
    data_len = len(result[1][1])
    self.assertEqual(content_size - data_len, result[1][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # close ranges get combined (even if out of order)
    for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
        result = list(transport.readv('a', request_vector,
            adjust_for_latency=True, upper_limit=content_size))
        self.assertEqual(1, len(result))
        data_len = len(result[0][1])
        # minimum length is from 400 to 1034 - 634
        self.assertTrue(data_len >= 634)
        # must contain the region 400 to 1034
        self.assertTrue(result[0][0] <= 400)
        self.assertTrue(result[0][0] + data_len >= 1034)
        check_result_data(result)
1721
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range must be covered by one of the returned chunks.

    Regression test built from an observed failure: a 1MiB file read with
    an unsorted vector of 800 byte ranges and adjust_for_latency enabled.
    """
    transport = self.get_transport()
    content = 'a'*1024*1024
    if transport.is_readonly():
        # Nothing can be written through a read-only transport, so the
        # fixture is created as a local file instead.
        with open('a', 'w') as f:
            f.write(content)
    else:
        transport.put_bytes('a', content)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
        (225037, 800), (221357, 800), (437077, 800), (947670, 800),
        (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector, True, 1024*1024))
    found_items = [False] * len(broken_vector)
    for pos, (start, length) in enumerate(broken_vector):
        # check the range is covered by the result
        for offset, data in results:
            if offset <= start and start + length <= offset + len(data):
                found_items[pos] = True
                break
    self.assertEqual([True] * len(broken_vector), found_items)
1740
def test_get_with_open_write_stream_sees_all_content(self):
# Opens a write stream on 'foo' and asserts that readv() through the same
# transport returns 'b' at offset 0 and 'd' at offset 2.
# NOTE(review): the statements between the lines shown here (presumably
# the writes to `handle` and its cleanup) are missing from this excerpt;
# restore them from the upstream file before relying on this test.
1741
t = self.get_transport()
1744
handle = t.open_write_stream('foo')
1747
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1751
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't."""
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
    except errors.NoSmartMedium:
        # The transport knows it cannot provide one; as long as we got
        # that answer we're fine.
        return
    self.assertIsInstance(client_medium, medium.SmartClientMedium)
1762
def test_readv_short_read(self):
# Asserts that readv() raises when asked for ranges that run past, or start
# beyond, the end of the short file 'a'.
# NOTE(review): this excerpt has lost lines (at least the `else:` that
# pairs with the read-only branch and the tail of the first exception
# tuple), so it is documented as-is rather than rewritten.
1763
transport = self.get_transport()
1764
if transport.is_readonly():
1765
# Read-only case: 'a' is created as a local file rather than through the
# transport.
with file('a', 'w') as f: f.write('0123456789')
1767
transport.put_bytes('a', '01234567890')
1769
# This is intentionally reading off the end of the file
1770
# since we are sure that it cannot get there
1771
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1772
# Can be raised by paramiko
1774
transport.readv, 'a', [(1,1), (8,10)])
1776
# This is trying to seek past the end of the file, it should
1777
# also raise a special error
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1779
transport.readv, 'a', [(12,2)])
1781
def test_no_segment_parameters(self):
    """A transport opened without segment parameters reports none."""
    transport = self.get_transport("foo")
    self.assertEqual({}, transport.get_segment_parameters())
1787
def test_segment_parameters(self):
    """Segment parameters should be stripped and stored in
    transport.get_segment_parameters()."""
    base_url = self._server.get_url()
    parameters = {"key1": "val1", "key2": "val2"}
    # Build a URL carrying the parameters and open a transport from it.
    url = urlutils.join_segment_parameters(base_url, parameters)
    transport = _mod_transport.get_transport_from_url(url)
    self.assertEqual(parameters, transport.get_segment_parameters())
1796
def test_set_segment_parameters(self):
    """Segment parameters can be set and show up in base."""
    transport = self.get_transport("foo")
    orig_base = transport.base
    transport.set_segment_parameter("arm", "board")
    self.assertEqual("%s,arm=board" % orig_base, transport.base)
    self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
    # Setting a parameter to None removes it; doing so for a parameter
    # that was never set must be harmless.
    transport.set_segment_parameter("arm", None)
    transport.set_segment_parameter("nonexistant", None)
    self.assertEqual({}, transport.get_segment_parameters())
    self.assertEqual(orig_base, transport.base)
1808
def test_stat_symlink(self):
# Creates a symlink 'link' -> 'target', clones a transport onto it and
# asserts the stat result's mode identifies a symlink.  The test is skipped
# when the transport raises TransportNotPossible.
# NOTE(review): the `try:` opening the symlink attempt and the statement
# that assigns `st` are missing from this excerpt; restore them from the
# upstream file before relying on this test.
1809
# if a transport points directly to a symlink (and supports symlinks
1810
# at all) you can tell this. helps with bug 32669.
1811
t = self.get_transport()
1813
t.symlink('target', 'link')
1814
except TransportNotPossible:
1815
raise TestSkipped("symlinks not supported")
1816
t2 = t.clone('link')
1818
self.assertTrue(stat.S_ISLNK(st.st_mode))
1820
def test_abspath_url_unquote_unreserved(self):
    """URLs from abspath should have unreserved characters unquoted

    Need consistent quoting notably for tildes, see lp:842223 for more.
    """
    t = self.get_transport()
    # Percent-escaped spelling of "-.09AZ_az~", none of which needs
    # escaping.
    needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
    self.assertEqual(t.base + "-.09AZ_az~",
                     t.abspath(needlessly_escaped_dir))
1830
def test_clone_url_unquote_unreserved(self):
    """Base URL of a cloned branch needs unreserved characters unquoted

    Cloned transports should be prefix comparable for things like the
    isolation checking of tests, see lp:842223 for more.
    """
    t1 = self.get_transport()
    # Percent-escaped spelling of "-.09AZ_az~", none of which needs
    # escaping.
    needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
    self.build_tree([needlessly_escaped_dir], transport=t1)
    t2 = t1.clone(needlessly_escaped_dir)
    self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1842
def test_hook_post_connection_one(self):
    """Fire post_connect hook after a ConnectedTransport is first used"""
    # The hook appends whatever it is called with to this list.
    log = []
    Transport.hooks.install_named_hook("post_connect", log.append, None)
    t = self.get_transport()
    # Merely constructing the transport must not fire the hook.
    self.assertEqual([], log)
    t.has("non-existant")
    if isinstance(t, RemoteTransport):
        self.assertEqual([t.get_smart_medium()], log)
    elif isinstance(t, ConnectedTransport):
        self.assertEqual([t], log)
    else:
        # Any other kind of transport is expected not to fire the hook.
        self.assertEqual([], log)
1856
def test_hook_post_connection_multi(self):
# Installs log.append as a "post_connect" hook and compares what it
# recorded against [t1, t3] (or their smart media for RemoteTransport).
# NOTE(review): several statements are missing from this excerpt (the
# definition of `log`, whatever sits between t1 and t3, and the calls made
# before the final assertions); restore them from the upstream file before
# relying on this test.
1857
"""Fire post_connect hook once per unshared underlying connection"""
1859
Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
t1 = self.get_transport()
1862
t3 = self.get_transport()
1863
# Nothing has been logged at this point.
self.assertEqual([], log)
1867
if isinstance(t1, RemoteTransport):
1868
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
elif isinstance(t1, ConnectedTransport):
1870
self.assertEqual([t1, t3], log)
1872
self.assertEqual([], log)