1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
35
transport as _mod_transport,
38
from bzrlib.errors import (ConnectionError,
45
from bzrlib.osutils import getcwd
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
from bzrlib.tests.test_transport import TestTransportImplementation
54
from bzrlib.transport import (
56
_get_transport_modules,
58
from bzrlib.transport.memory import MemoryTransport
61
def get_transport_test_permutations(module):
62
"""Get the permutations module wants to have tested."""
63
if getattr(module, 'get_test_permutations', None) is None:
65
"transport module %s doesn't provide get_test_permutations()"
68
return module.get_test_permutations()
71
def transport_test_permutations():
72
"""Return a list of the klass, server_factory pairs to test."""
74
for module in _get_transport_modules():
76
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
78
for (klass, server_factory) in permutations:
79
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
{"transport_class":klass,
81
"transport_server":server_factory})
82
result.append(scenario)
83
except errors.DependencyNotPresent, e:
84
# Continue even if a dependency prevents us
85
# from adding this test
90
def load_tests(standard_tests, module, loader):
91
"""Multiply tests for tranport implementations."""
92
result = loader.suiteClass()
93
scenarios = transport_test_permutations()
94
return multiply_tests(standard_tests, scenarios, result)
97
class TransportTests(TestTransportImplementation):
100
super(TransportTests, self).setUp()
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
103
def check_transport_contents(self, content, transport, relpath):
104
"""Check that transport.get_bytes(relpath) == content."""
105
self.assertEqualDiff(content, transport.get_bytes(relpath))
107
def test_ensure_base_missing(self):
108
""".ensure_base() should create the directory if it doesn't exist"""
109
t = self.get_transport()
111
if t_a.is_readonly():
112
self.assertRaises(TransportNotPossible,
115
self.assertTrue(t_a.ensure_base())
116
self.assertTrue(t.has('a'))
118
def test_ensure_base_exists(self):
119
""".ensure_base() should just be happy if it already exists"""
120
t = self.get_transport()
126
# ensure_base returns False if it didn't create the base
127
self.assertFalse(t_a.ensure_base())
129
def test_ensure_base_missing_parent(self):
130
""".ensure_base() will fail if the parent dir doesn't exist"""
131
t = self.get_transport()
137
self.assertRaises(NoSuchFile, t_b.ensure_base)
139
def test_external_url(self):
140
""".external_url either works or raises InProcessTransport."""
141
t = self.get_transport()
144
except errors.InProcessTransport:
148
t = self.get_transport()
150
files = ['a', 'b', 'e', 'g', '%']
151
self.build_tree(files, transport=t)
152
self.assertEqual(True, t.has('a'))
153
self.assertEqual(False, t.has('c'))
154
self.assertEqual(True, t.has(urlutils.escape('%')))
155
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
156
'e', 'f', 'g', 'h'])),
157
[True, True, False, False,
158
True, False, True, False])
159
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
160
self.assertEqual(False, t.has_any(['c', 'd', 'f',
161
urlutils.escape('%%')]))
162
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
163
'e', 'f', 'g', 'h']))),
164
[True, True, False, False,
165
True, False, True, False])
166
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
167
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
169
def test_has_root_works(self):
170
if self.transport_server is test_server.SmartTCPServer_for_testing:
171
raise TestNotApplicable(
172
"SmartTCPServer_for_testing intentionally does not allow "
174
current_transport = self.get_transport()
175
self.assertTrue(current_transport.has('/'))
176
root = current_transport.clone('/')
177
self.assertTrue(root.has(''))
180
t = self.get_transport()
182
files = ['a', 'b', 'e', 'g']
183
contents = ['contents of a\n',
188
self.build_tree(files, transport=t, line_endings='binary')
189
self.check_transport_contents('contents of a\n', t, 'a')
190
content_f = t.get_multi(files)
191
# Use itertools.izip() instead of use zip() or map(), since they fully
192
# evaluate their inputs, the transport requests should be issued and
193
# handled sequentially (we don't want to force transport to buffer).
194
for content, f in itertools.izip(contents, content_f):
195
self.assertEqual(content, f.read())
197
content_f = t.get_multi(iter(files))
198
# Use itertools.izip() for the same reason
199
for content, f in itertools.izip(contents, content_f):
200
self.assertEqual(content, f.read())
202
def test_get_unknown_file(self):
203
t = self.get_transport()
205
contents = ['contents of a\n',
208
self.build_tree(files, transport=t, line_endings='binary')
209
self.assertRaises(NoSuchFile, t.get, 'c')
210
def iterate_and_close(func, *args):
211
for f in func(*args):
212
# We call f.read() here because things like paramiko actually
213
# spawn a thread to prefetch the content, which we want to
214
# consume before we close the handle.
217
self.assertRaises(NoSuchFile, iterate_and_close,
218
t.get_multi, ['a', 'b', 'c'])
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, iter(['a', 'b', 'c']))
222
def test_get_directory_read_gives_ReadError(self):
223
"""consistent errors for read() on a file returned by get()."""
224
t = self.get_transport()
226
self.build_tree(['a directory/'])
228
t.mkdir('a%20directory')
229
# getting the file must either work or fail with a PathError
231
a_file = t.get('a%20directory')
232
except (errors.PathError, errors.RedirectRequested):
233
# early failure return immediately.
235
# having got a file, read() must either work (i.e. http reading a dir
236
# listing) or fail with ReadError
239
except errors.ReadError:
242
def test_get_bytes(self):
243
t = self.get_transport()
245
files = ['a', 'b', 'e', 'g']
246
contents = ['contents of a\n',
251
self.build_tree(files, transport=t, line_endings='binary')
252
self.check_transport_contents('contents of a\n', t, 'a')
254
for content, fname in zip(contents, files):
255
self.assertEqual(content, t.get_bytes(fname))
257
def test_get_bytes_unknown_file(self):
258
t = self.get_transport()
259
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
261
def test_get_with_open_write_stream_sees_all_content(self):
262
t = self.get_transport()
265
handle = t.open_write_stream('foo')
268
self.assertEqual('b', t.get_bytes('foo'))
272
def test_get_bytes_with_open_write_stream_sees_all_content(self):
273
t = self.get_transport()
276
handle = t.open_write_stream('foo')
279
self.assertEqual('b', t.get_bytes('foo'))
282
self.assertEqual('b', f.read())
288
def test_put_bytes(self):
289
t = self.get_transport()
292
self.assertRaises(TransportNotPossible,
293
t.put_bytes, 'a', 'some text for a\n')
296
t.put_bytes('a', 'some text for a\n')
297
self.assertTrue(t.has('a'))
298
self.check_transport_contents('some text for a\n', t, 'a')
300
# The contents should be overwritten
301
t.put_bytes('a', 'new text for a\n')
302
self.check_transport_contents('new text for a\n', t, 'a')
304
self.assertRaises(NoSuchFile,
305
t.put_bytes, 'path/doesnt/exist/c', 'contents')
307
def test_put_bytes_non_atomic(self):
308
t = self.get_transport()
311
self.assertRaises(TransportNotPossible,
312
t.put_bytes_non_atomic, 'a', 'some text for a\n')
315
self.assertFalse(t.has('a'))
316
t.put_bytes_non_atomic('a', 'some text for a\n')
317
self.assertTrue(t.has('a'))
318
self.check_transport_contents('some text for a\n', t, 'a')
319
# Put also replaces contents
320
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
321
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
323
# Make sure we can create another file
324
t.put_bytes_non_atomic('d', 'contents for\nd\n')
325
# And overwrite 'a' with empty contents
326
t.put_bytes_non_atomic('a', '')
327
self.check_transport_contents('contents for\nd\n', t, 'd')
328
self.check_transport_contents('', t, 'a')
330
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
332
# Now test the create_parent flag
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
335
self.assertFalse(t.has('dir/a'))
336
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
337
create_parent_dir=True)
338
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
340
# But we still get NoSuchFile if we can't make the parent dir
341
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
343
create_parent_dir=True)
345
def test_put_bytes_permissions(self):
346
t = self.get_transport()
350
if not t._can_roundtrip_unix_modebits():
351
# Can't roundtrip, so no need to run this test
353
t.put_bytes('mode644', 'test text\n', mode=0644)
354
self.assertTransportMode(t, 'mode644', 0644)
355
t.put_bytes('mode666', 'test text\n', mode=0666)
356
self.assertTransportMode(t, 'mode666', 0666)
357
t.put_bytes('mode600', 'test text\n', mode=0600)
358
self.assertTransportMode(t, 'mode600', 0600)
359
# Yes, you can put_bytes a file such that it becomes readonly
360
t.put_bytes('mode400', 'test text\n', mode=0400)
361
self.assertTransportMode(t, 'mode400', 0400)
363
# The default permissions should be based on the current umask
364
umask = osutils.get_umask()
365
t.put_bytes('nomode', 'test text\n', mode=None)
366
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
368
def test_put_bytes_non_atomic_permissions(self):
369
t = self.get_transport()
373
if not t._can_roundtrip_unix_modebits():
374
# Can't roundtrip, so no need to run this test
376
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
377
self.assertTransportMode(t, 'mode644', 0644)
378
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
379
self.assertTransportMode(t, 'mode666', 0666)
380
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
381
self.assertTransportMode(t, 'mode600', 0600)
382
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
383
self.assertTransportMode(t, 'mode400', 0400)
385
# The default permissions should be based on the current umask
386
umask = osutils.get_umask()
387
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
388
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
390
# We should also be able to set the mode for a parent directory
392
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
393
dir_mode=0700, create_parent_dir=True)
394
self.assertTransportMode(t, 'dir700', 0700)
395
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
396
dir_mode=0770, create_parent_dir=True)
397
self.assertTransportMode(t, 'dir770', 0770)
398
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
399
dir_mode=0777, create_parent_dir=True)
400
self.assertTransportMode(t, 'dir777', 0777)
402
def test_put_file(self):
403
t = self.get_transport()
406
self.assertRaises(TransportNotPossible,
407
t.put_file, 'a', StringIO('some text for a\n'))
410
result = t.put_file('a', StringIO('some text for a\n'))
411
# put_file returns the length of the data written
412
self.assertEqual(16, result)
413
self.assertTrue(t.has('a'))
414
self.check_transport_contents('some text for a\n', t, 'a')
415
# Put also replaces contents
416
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
417
self.assertEqual(19, result)
418
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
419
self.assertRaises(NoSuchFile,
420
t.put_file, 'path/doesnt/exist/c',
421
StringIO('contents'))
423
def test_put_file_non_atomic(self):
424
t = self.get_transport()
427
self.assertRaises(TransportNotPossible,
428
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
431
self.assertFalse(t.has('a'))
432
t.put_file_non_atomic('a', StringIO('some text for a\n'))
433
self.assertTrue(t.has('a'))
434
self.check_transport_contents('some text for a\n', t, 'a')
435
# Put also replaces contents
436
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
437
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
439
# Make sure we can create another file
440
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
441
# And overwrite 'a' with empty contents
442
t.put_file_non_atomic('a', StringIO(''))
443
self.check_transport_contents('contents for\nd\n', t, 'd')
444
self.check_transport_contents('', t, 'a')
446
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
447
StringIO('contents\n'))
448
# Now test the create_parent flag
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
450
StringIO('contents\n'))
451
self.assertFalse(t.has('dir/a'))
452
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
453
create_parent_dir=True)
454
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
456
# But we still get NoSuchFile if we can't make the parent dir
457
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
458
StringIO('contents\n'),
459
create_parent_dir=True)
461
def test_put_file_permissions(self):
463
t = self.get_transport()
467
if not t._can_roundtrip_unix_modebits():
468
# Can't roundtrip, so no need to run this test
470
t.put_file('mode644', StringIO('test text\n'), mode=0644)
471
self.assertTransportMode(t, 'mode644', 0644)
472
t.put_file('mode666', StringIO('test text\n'), mode=0666)
473
self.assertTransportMode(t, 'mode666', 0666)
474
t.put_file('mode600', StringIO('test text\n'), mode=0600)
475
self.assertTransportMode(t, 'mode600', 0600)
476
# Yes, you can put a file such that it becomes readonly
477
t.put_file('mode400', StringIO('test text\n'), mode=0400)
478
self.assertTransportMode(t, 'mode400', 0400)
479
# The default permissions should be based on the current umask
480
umask = osutils.get_umask()
481
t.put_file('nomode', StringIO('test text\n'), mode=None)
482
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
484
def test_put_file_non_atomic_permissions(self):
485
t = self.get_transport()
489
if not t._can_roundtrip_unix_modebits():
490
# Can't roundtrip, so no need to run this test
492
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
493
self.assertTransportMode(t, 'mode644', 0644)
494
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
495
self.assertTransportMode(t, 'mode666', 0666)
496
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
497
self.assertTransportMode(t, 'mode600', 0600)
498
# Yes, you can put_file_non_atomic a file such that it becomes readonly
499
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
500
self.assertTransportMode(t, 'mode400', 0400)
502
# The default permissions should be based on the current umask
503
umask = osutils.get_umask()
504
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
505
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
507
# We should also be able to set the mode for a parent directory
510
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
511
dir_mode=0700, create_parent_dir=True)
512
self.assertTransportMode(t, 'dir700', 0700)
513
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
514
dir_mode=0770, create_parent_dir=True)
515
self.assertTransportMode(t, 'dir770', 0770)
516
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
517
dir_mode=0777, create_parent_dir=True)
518
self.assertTransportMode(t, 'dir777', 0777)
520
def test_put_bytes_unicode(self):
521
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
522
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
523
# (we don't want to encode unicode here at all, callers should be
524
# strictly passing bytes to put_bytes), but we allow it for backwards
525
# compatibility. At some point we should use a specific exception.
526
# See https://bugs.launchpad.net/bzr/+bug/106898.
527
t = self.get_transport()
530
unicode_string = u'\u1234'
532
(AssertionError, UnicodeEncodeError),
533
t.put_bytes, 'foo', unicode_string)
535
def test_put_file_unicode(self):
536
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
537
# This situation can happen (and has) if code is careless about the type
538
# of "string" they initialise/write to a StringIO with. We cannot use
539
# cStringIO, because it never returns unicode from read.
540
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
541
# raise, but we raise it for hysterical raisins.
542
t = self.get_transport()
545
unicode_file = pyStringIO(u'\u1234')
546
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
548
def test_mkdir(self):
549
t = self.get_transport()
552
# cannot mkdir on readonly transports. We're not testing for
553
# cache coherency because cache behaviour is not currently
554
# defined for the transport interface.
555
self.assertRaises(TransportNotPossible, t.mkdir, '.')
556
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
557
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
558
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
562
self.assertEqual(t.has('dir_a'), True)
563
self.assertEqual(t.has('dir_b'), False)
566
self.assertEqual(t.has('dir_b'), True)
568
t.mkdir_multi(['dir_c', 'dir_d'])
570
t.mkdir_multi(iter(['dir_e', 'dir_f']))
571
self.assertEqual(list(t.has_multi(
572
['dir_a', 'dir_b', 'dir_c', 'dir_q',
573
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
574
[True, True, True, False,
575
True, True, True, True])
577
# we were testing that a local mkdir followed by a transport
578
# mkdir failed thusly, but given that we * in one process * do not
579
# concurrently fiddle with disk dirs and then use transport to do
580
# things, the win here seems marginal compared to the constraint on
581
# the interface. RBC 20051227
583
self.assertRaises(FileExists, t.mkdir, 'dir_g')
585
# Test get/put in sub-directories
586
t.put_bytes('dir_a/a', 'contents of dir_a/a')
587
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
588
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
589
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
591
# mkdir of a dir with an absent parent
592
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
594
def test_mkdir_permissions(self):
595
t = self.get_transport()
598
if not t._can_roundtrip_unix_modebits():
599
# no sense testing on this transport
601
# Test mkdir with a mode
602
t.mkdir('dmode755', mode=0755)
603
self.assertTransportMode(t, 'dmode755', 0755)
604
t.mkdir('dmode555', mode=0555)
605
self.assertTransportMode(t, 'dmode555', 0555)
606
t.mkdir('dmode777', mode=0777)
607
self.assertTransportMode(t, 'dmode777', 0777)
608
t.mkdir('dmode700', mode=0700)
609
self.assertTransportMode(t, 'dmode700', 0700)
610
t.mkdir_multi(['mdmode755'], mode=0755)
611
self.assertTransportMode(t, 'mdmode755', 0755)
613
# Default mode should be based on umask
614
umask = osutils.get_umask()
615
t.mkdir('dnomode', mode=None)
616
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
618
def test_opening_a_file_stream_creates_file(self):
619
t = self.get_transport()
622
handle = t.open_write_stream('foo')
624
self.assertEqual('', t.get_bytes('foo'))
628
def test_opening_a_file_stream_can_set_mode(self):
629
t = self.get_transport()
632
if not t._can_roundtrip_unix_modebits():
633
# Can't roundtrip, so no need to run this test
635
def check_mode(name, mode, expected):
636
handle = t.open_write_stream(name, mode=mode)
638
self.assertTransportMode(t, name, expected)
639
check_mode('mode644', 0644, 0644)
640
check_mode('mode666', 0666, 0666)
641
check_mode('mode600', 0600, 0600)
642
# The default permissions should be based on the current umask
643
check_mode('nomode', None, 0666 & ~osutils.get_umask())
645
def test_copy_to(self):
646
# FIXME: test: same server to same server (partly done)
647
# same protocol two servers
648
# and different protocols (done for now except for MemoryTransport.
651
def simple_copy_files(transport_from, transport_to):
652
files = ['a', 'b', 'c', 'd']
653
self.build_tree(files, transport=transport_from)
654
self.assertEqual(4, transport_from.copy_to(files, transport_to))
656
self.check_transport_contents(transport_to.get_bytes(f),
659
t = self.get_transport()
660
temp_transport = MemoryTransport('memory:///')
661
simple_copy_files(t, temp_transport)
662
if not t.is_readonly():
663
t.mkdir('copy_to_simple')
664
t2 = t.clone('copy_to_simple')
665
simple_copy_files(t, t2)
668
# Test that copying into a missing directory raises
671
self.build_tree(['e/', 'e/f'])
674
t.put_bytes('e/f', 'contents of e')
675
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
676
temp_transport.mkdir('e')
677
t.copy_to(['e/f'], temp_transport)
680
temp_transport = MemoryTransport('memory:///')
682
files = ['a', 'b', 'c', 'd']
683
t.copy_to(iter(files), temp_transport)
685
self.check_transport_contents(temp_transport.get_bytes(f),
689
for mode in (0666, 0644, 0600, 0400):
690
temp_transport = MemoryTransport("memory:///")
691
t.copy_to(files, temp_transport, mode=mode)
693
self.assertTransportMode(temp_transport, f, mode)
695
def test_create_prefix(self):
696
t = self.get_transport()
697
sub = t.clone('foo').clone('bar')
700
except TransportNotPossible:
701
self.assertTrue(t.is_readonly())
703
self.assertTrue(t.has('foo/bar'))
705
def test_append_file(self):
706
t = self.get_transport()
709
self.assertRaises(TransportNotPossible,
710
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
712
t.put_bytes('a', 'diff\ncontents for\na\n')
713
t.put_bytes('b', 'contents\nfor b\n')
716
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
718
self.check_transport_contents(
719
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
722
# a file with no parent should fail..
723
self.assertRaises(NoSuchFile,
724
t.append_file, 'missing/path', StringIO('content'))
726
# And we can create new files, too
728
t.append_file('c', StringIO('some text\nfor a missing file\n')))
729
self.check_transport_contents('some text\nfor a missing file\n',
732
def test_append_bytes(self):
733
t = self.get_transport()
736
self.assertRaises(TransportNotPossible,
737
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
740
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
741
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
744
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
746
self.check_transport_contents(
747
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
750
# a file with no parent should fail..
751
self.assertRaises(NoSuchFile,
752
t.append_bytes, 'missing/path', 'content')
754
def test_append_multi(self):
755
t = self.get_transport()
759
t.put_bytes('a', 'diff\ncontents for\na\n'
760
'add\nsome\nmore\ncontents\n')
761
t.put_bytes('b', 'contents\nfor b\n')
763
self.assertEqual((43, 15),
764
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
765
('b', StringIO('some\nmore\nfor\nb\n'))]))
767
self.check_transport_contents(
768
'diff\ncontents for\na\n'
769
'add\nsome\nmore\ncontents\n'
770
'and\nthen\nsome\nmore\n',
772
self.check_transport_contents(
774
'some\nmore\nfor\nb\n',
777
self.assertEqual((62, 31),
778
t.append_multi(iter([('a', StringIO('a little bit more\n')),
779
('b', StringIO('from an iterator\n'))])))
780
self.check_transport_contents(
781
'diff\ncontents for\na\n'
782
'add\nsome\nmore\ncontents\n'
783
'and\nthen\nsome\nmore\n'
784
'a little bit more\n',
786
self.check_transport_contents(
788
'some\nmore\nfor\nb\n'
789
'from an iterator\n',
792
self.assertEqual((80, 0),
793
t.append_multi([('a', StringIO('some text in a\n')),
794
('d', StringIO('missing file r\n'))]))
796
self.check_transport_contents(
797
'diff\ncontents for\na\n'
798
'add\nsome\nmore\ncontents\n'
799
'and\nthen\nsome\nmore\n'
800
'a little bit more\n'
803
self.check_transport_contents('missing file r\n', t, 'd')
805
def test_append_file_mode(self):
806
"""Check that append accepts a mode parameter"""
807
# check append accepts a mode
808
t = self.get_transport()
810
self.assertRaises(TransportNotPossible,
811
t.append_file, 'f', StringIO('f'), mode=None)
813
t.append_file('f', StringIO('f'), mode=None)
815
def test_append_bytes_mode(self):
816
# check append_bytes accepts a mode
817
t = self.get_transport()
819
self.assertRaises(TransportNotPossible,
820
t.append_bytes, 'f', 'f', mode=None)
822
t.append_bytes('f', 'f', mode=None)
824
def test_delete(self):
825
# TODO: Test Transport.delete
826
t = self.get_transport()
828
# Not much to do with a readonly transport
830
self.assertRaises(TransportNotPossible, t.delete, 'missing')
833
t.put_bytes('a', 'a little bit of text\n')
834
self.assertTrue(t.has('a'))
836
self.assertFalse(t.has('a'))
838
self.assertRaises(NoSuchFile, t.delete, 'a')
840
t.put_bytes('a', 'a text\n')
841
t.put_bytes('b', 'b text\n')
842
t.put_bytes('c', 'c text\n')
843
self.assertEqual([True, True, True],
844
list(t.has_multi(['a', 'b', 'c'])))
845
t.delete_multi(['a', 'c'])
846
self.assertEqual([False, True, False],
847
list(t.has_multi(['a', 'b', 'c'])))
848
self.assertFalse(t.has('a'))
849
self.assertTrue(t.has('b'))
850
self.assertFalse(t.has('c'))
852
self.assertRaises(NoSuchFile,
853
t.delete_multi, ['a', 'b', 'c'])
855
self.assertRaises(NoSuchFile,
856
t.delete_multi, iter(['a', 'b', 'c']))
858
t.put_bytes('a', 'another a text\n')
859
t.put_bytes('c', 'another c text\n')
860
t.delete_multi(iter(['a', 'b', 'c']))
862
# We should have deleted everything
863
# SftpServer creates control files in the
864
# working directory, so we can just do a
866
# self.assertEqual([], os.listdir('.'))
868
def test_recommended_page_size(self):
869
"""Transports recommend a page size for partial access to files."""
870
t = self.get_transport()
871
self.assertIsInstance(t.recommended_page_size(), int)
873
def test_rmdir(self):
874
t = self.get_transport()
875
# Not much to do with a readonly transport
877
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
882
# ftp may not be able to raise NoSuchFile for lack of
883
# details when failing
884
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
886
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
888
def test_rmdir_not_empty(self):
889
"""Deleting a non-empty directory raises an exception
891
sftp (and possibly others) don't give us a specific "directory not
892
empty" exception -- we can just see that the operation failed.
894
t = self.get_transport()
899
self.assertRaises(PathError, t.rmdir, 'adir')
901
def test_rmdir_empty_but_similar_prefix(self):
902
"""rmdir does not get confused by sibling paths.
904
A naive implementation of MemoryTransport would refuse to rmdir
905
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
906
uses "path.startswith(dir)" on all file paths to determine if directory
909
t = self.get_transport()
913
t.put_bytes('foo-bar', '')
916
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
917
self.assertTrue(t.has('foo-bar'))
919
def test_rename_dir_succeeds(self):
920
t = self.get_transport()
922
raise TestSkipped("transport is readonly")
924
t.mkdir('adir/asubdir')
925
t.rename('adir', 'bdir')
926
self.assertTrue(t.has('bdir/asubdir'))
927
self.assertFalse(t.has('adir'))
929
def test_rename_dir_nonempty(self):
930
"""Attempting to replace a nonemtpy directory should fail"""
931
t = self.get_transport()
933
raise TestSkipped("transport is readonly")
935
t.mkdir('adir/asubdir')
937
t.mkdir('bdir/bsubdir')
938
# any kind of PathError would be OK, though we normally expect
940
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
941
# nothing was changed so it should still be as before
942
self.assertTrue(t.has('bdir/bsubdir'))
943
self.assertFalse(t.has('adir/bdir'))
944
self.assertFalse(t.has('adir/bsubdir'))
946
def test_rename_across_subdirs(self):
947
t = self.get_transport()
949
raise TestNotApplicable("transport is readonly")
954
ta.put_bytes('f', 'aoeu')
955
ta.rename('f', '../b/f')
956
self.assertTrue(tb.has('f'))
957
self.assertFalse(ta.has('f'))
958
self.assertTrue(t.has('b/f'))
960
def test_delete_tree(self):
961
t = self.get_transport()
963
# Not much to do with a readonly transport
965
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
968
# and does it like listing ?
971
t.delete_tree('adir')
972
except TransportNotPossible:
973
# ok, this transport does not support delete_tree
976
# did it delete that trivial case?
977
self.assertRaises(NoSuchFile, t.stat, 'adir')
979
self.build_tree(['adir/',
987
t.delete_tree('adir')
988
# adir should be gone now.
989
self.assertRaises(NoSuchFile, t.stat, 'adir')
992
t = self.get_transport()
997
# TODO: I would like to use os.listdir() to
998
# make sure there are no extra files, but SftpServer
999
# creates control files in the working directory
1000
# perhaps all of this could be done in a subdirectory
1002
t.put_bytes('a', 'a first file\n')
1003
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
1006
self.assertTrue(t.has('b'))
1007
self.assertFalse(t.has('a'))
1009
self.check_transport_contents('a first file\n', t, 'b')
1010
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1013
t.put_bytes('c', 'c this file\n')
1015
self.assertFalse(t.has('c'))
1016
self.check_transport_contents('c this file\n', t, 'b')
1018
# TODO: Try to write a test for atomicity
1019
# TODO: Test moving into a non-existent subdirectory
1020
# TODO: Test Transport.move_multi
1022
def test_copy(self):
1023
t = self.get_transport()
1028
t.put_bytes('a', 'a file\n')
1030
self.check_transport_contents('a file\n', t, 'b')
1032
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1034
# What should the assert be if you try to copy a
1035
# file over a directory?
1036
#self.assertRaises(Something, t.copy, 'a', 'c')
1037
t.put_bytes('d', 'text in d\n')
1039
self.check_transport_contents('text in d\n', t, 'b')
1041
# TODO: test copy_multi
1043
def test_connection_error(self):
1044
"""ConnectionError is raised when connection is impossible.
1046
The error should be raised from the first operation on the transport.
1049
url = self._server.get_bogus_url()
1050
except NotImplementedError:
1051
raise TestSkipped("Transport %s has no bogus URL support." %
1052
self._server.__class__)
1053
t = _mod_transport.get_transport_from_url(url)
1054
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1056
def test_stat(self):
1057
# TODO: Test stat, just try once, and if it throws, stop testing
1058
from stat import S_ISDIR, S_ISREG
1060
t = self.get_transport()
1064
except TransportNotPossible, e:
1065
# This transport cannot stat
1068
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1069
sizes = [14, 0, 16, 0, 18]
1070
self.build_tree(paths, transport=t, line_endings='binary')
1072
for path, size in zip(paths, sizes):
1074
if path.endswith('/'):
1075
self.assertTrue(S_ISDIR(st.st_mode))
1076
# directory sizes are meaningless
1078
self.assertTrue(S_ISREG(st.st_mode))
1079
self.assertEqual(size, st.st_size)
1081
remote_stats = list(t.stat_multi(paths))
1082
remote_iter_stats = list(t.stat_multi(iter(paths)))
1084
self.assertRaises(NoSuchFile, t.stat, 'q')
1085
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1087
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1088
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1089
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1090
subdir = t.clone('subdir')
1091
st = subdir.stat('./file')
1092
st = subdir.stat('.')
1094
def test_hardlink(self):
1095
from stat import ST_NLINK
1097
t = self.get_transport()
1099
source_name = "original_target"
1100
link_name = "target_link"
1102
self.build_tree([source_name], transport=t)
1105
t.hardlink(source_name, link_name)
1107
self.assertTrue(t.has(source_name))
1108
self.assertTrue(t.has(link_name))
1110
st = t.stat(link_name)
1111
self.assertEqual(st[ST_NLINK], 2)
1112
except TransportNotPossible:
1113
raise TestSkipped("Transport %s does not support hardlinks." %
1114
self._server.__class__)
1116
def test_symlink(self):
1117
from stat import S_ISLNK
1119
t = self.get_transport()
1121
source_name = "original_target"
1122
link_name = "target_link"
1124
self.build_tree([source_name], transport=t)
1127
t.symlink(source_name, link_name)
1129
self.assertTrue(t.has(source_name))
1130
self.assertTrue(t.has(link_name))
1132
st = t.stat(link_name)
1133
self.assertTrue(S_ISLNK(st.st_mode),
1134
"expected symlink, got mode %o" % st.st_mode)
1135
except TransportNotPossible:
1136
raise TestSkipped("Transport %s does not support symlinks." %
1137
self._server.__class__)
1139
self.knownFailure("Paramiko fails to create symlinks during tests")
1141
def test_list_dir(self):
1142
# TODO: Test list_dir, just try once, and if it throws, stop testing
1143
t = self.get_transport()
1145
if not t.listable():
1146
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1149
def sorted_list(d, transport):
1150
l = list(transport.list_dir(d))
1154
self.assertEqual([], sorted_list('.', t))
1155
# c2 is precisely one letter longer than c here to test that
1156
# suffixing is not confused.
1157
# a%25b checks that quoting is done consistently across transports
1158
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1160
if not t.is_readonly():
1161
self.build_tree(tree_names, transport=t)
1163
self.build_tree(tree_names)
1166
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1168
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1169
self.assertEqual(['d', 'e'], sorted_list('c', t))
1171
# Cloning the transport produces an equivalent listing
1172
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1174
if not t.is_readonly():
1181
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1182
self.assertEqual(['e'], sorted_list('c', t))
1184
self.assertListRaises(PathError, t.list_dir, 'q')
1185
self.assertListRaises(PathError, t.list_dir, 'c/f')
1186
# 'a' is a file, list_dir should raise an error
1187
self.assertListRaises(PathError, t.list_dir, 'a')
1189
def test_list_dir_result_is_url_escaped(self):
1190
t = self.get_transport()
1191
if not t.listable():
1192
raise TestSkipped("transport not listable")
1194
if not t.is_readonly():
1195
self.build_tree(['a/', 'a/%'], transport=t)
1197
self.build_tree(['a/', 'a/%'])
1199
names = list(t.list_dir('a'))
1200
self.assertEqual(['%25'], names)
1201
self.assertIsInstance(names[0], str)
1203
def test_clone_preserve_info(self):
1204
t1 = self.get_transport()
1205
if not isinstance(t1, ConnectedTransport):
1206
raise TestSkipped("not a connected transport")
1208
t2 = t1.clone('subdir')
1209
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1210
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1211
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1212
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1213
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1215
def test__reuse_for(self):
1216
t = self.get_transport()
1217
if not isinstance(t, ConnectedTransport):
1218
raise TestSkipped("not a connected transport")
1220
def new_url(scheme=None, user=None, password=None,
1221
host=None, port=None, path=None):
1222
"""Build a new url from t.base changing only parts of it.
1224
Only the parameters different from None will be changed.
1226
if scheme is None: scheme = t._parsed_url.scheme
1227
if user is None: user = t._parsed_url.user
1228
if password is None: password = t._parsed_url.password
1229
if user is None: user = t._parsed_url.user
1230
if host is None: host = t._parsed_url.host
1231
if port is None: port = t._parsed_url.port
1232
if path is None: path = t._parsed_url.path
1233
return str(urlutils.URL(scheme, user, password, host, port, path))
1235
if t._parsed_url.scheme == 'ftp':
1239
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1240
if t._parsed_url.user == 'me':
1244
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1245
# passwords are not taken into account because:
1246
# - it makes no sense to have two different valid passwords for the
1248
# - _password in ConnectedTransport is intended to collect what the
1249
# user specified from the command-line and there are cases where the
1250
# new url can contain no password (if the url was built from an
1251
# existing transport.base for example)
1252
# - password are considered part of the credentials provided at
1253
# connection creation time and as such may not be present in the url
1254
# (they may be typed by the user when prompted for example)
1255
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1256
# We will not connect, we can use a invalid host
1257
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1258
if t._parsed_url.port == 1234:
1262
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1263
# No point in trying to reuse a transport for a local URL
1264
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1266
def test_connection_sharing(self):
1267
t = self.get_transport()
1268
if not isinstance(t, ConnectedTransport):
1269
raise TestSkipped("not a connected transport")
1271
c = t.clone('subdir')
1272
# Some transports will create the connection only when needed
1273
t.has('surely_not') # Force connection
1274
self.assertIs(t._get_connection(), c._get_connection())
1276
# Temporary failure, we need to create a new dummy connection
1277
new_connection = None
1278
t._set_connection(new_connection)
1279
# Check that both transports use the same connection
1280
self.assertIs(new_connection, t._get_connection())
1281
self.assertIs(new_connection, c._get_connection())
1283
def test_reuse_connection_for_various_paths(self):
1284
t = self.get_transport()
1285
if not isinstance(t, ConnectedTransport):
1286
raise TestSkipped("not a connected transport")
1288
t.has('surely_not') # Force connection
1289
self.assertIsNot(None, t._get_connection())
1291
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1292
self.assertIsNot(t, subdir)
1293
self.assertIs(t._get_connection(), subdir._get_connection())
1295
home = subdir._reuse_for(t.base + 'home')
1296
self.assertIs(t._get_connection(), home._get_connection())
1297
self.assertIs(subdir._get_connection(), home._get_connection())
1299
def test_clone(self):
1300
# TODO: Test that clone moves up and down the filesystem
1301
t1 = self.get_transport()
1303
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1305
self.assertTrue(t1.has('a'))
1306
self.assertTrue(t1.has('b/c'))
1307
self.assertFalse(t1.has('c'))
1310
self.assertEqual(t1.base + 'b/', t2.base)
1312
self.assertTrue(t2.has('c'))
1313
self.assertFalse(t2.has('a'))
1316
self.assertTrue(t3.has('a'))
1317
self.assertFalse(t3.has('c'))
1319
self.assertFalse(t1.has('b/d'))
1320
self.assertFalse(t2.has('d'))
1321
self.assertFalse(t3.has('b/d'))
1323
if t1.is_readonly():
1324
self.build_tree_contents([('b/d', 'newfile\n')])
1326
t2.put_bytes('d', 'newfile\n')
1328
self.assertTrue(t1.has('b/d'))
1329
self.assertTrue(t2.has('d'))
1330
self.assertTrue(t3.has('b/d'))
1332
def test_clone_to_root(self):
1333
orig_transport = self.get_transport()
1334
# Repeatedly go up to a parent directory until we're at the root
1335
# directory of this transport
1336
root_transport = orig_transport
1337
new_transport = root_transport.clone("..")
1338
# as we are walking up directories, the path must be
1339
# growing less, except at the top
1340
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1341
or new_transport.base == root_transport.base)
1342
while new_transport.base != root_transport.base:
1343
root_transport = new_transport
1344
new_transport = root_transport.clone("..")
1345
# as we are walking up directories, the path must be
1346
# growing less, except at the top
1347
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1348
or new_transport.base == root_transport.base)
1350
# Cloning to "/" should take us to exactly the same location.
1351
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1352
# the abspath of "/" from the original transport should be the same
1353
# as the base at the root:
1354
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1356
# At the root, the URL must still end with / as its a directory
1357
self.assertEqual(root_transport.base[-1], '/')
1359
def test_clone_from_root(self):
1360
"""At the root, cloning to a simple dir should just do string append."""
1361
orig_transport = self.get_transport()
1362
root_transport = orig_transport.clone('/')
1363
self.assertEqual(root_transport.base + '.bzr/',
1364
root_transport.clone('.bzr').base)
1366
def test_base_url(self):
1367
t = self.get_transport()
1368
self.assertEqual('/', t.base[-1])
1370
def test_relpath(self):
1371
t = self.get_transport()
1372
self.assertEqual('', t.relpath(t.base))
1374
self.assertEqual('', t.relpath(t.base[:-1]))
1375
# subdirs which don't exist should still give relpaths.
1376
self.assertEqual('foo', t.relpath(t.base + 'foo'))
1377
# trailing slash should be the same.
1378
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1380
def test_relpath_at_root(self):
1381
t = self.get_transport()
1382
# clone all the way to the top
1383
new_transport = t.clone('..')
1384
while new_transport.base != t.base:
1386
new_transport = t.clone('..')
1387
# we must be able to get a relpath below the root
1388
self.assertEqual('', t.relpath(t.base))
1389
# and a deeper one should work too
1390
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1392
def test_abspath(self):
1393
# smoke test for abspath. Corner cases for backends like unix fs's
1394
# that have aliasing problems like symlinks should go in backend
1395
# specific test cases.
1396
transport = self.get_transport()
1398
self.assertEqual(transport.base + 'relpath',
1399
transport.abspath('relpath'))
1401
# This should work without raising an error.
1402
transport.abspath("/")
1404
# the abspath of "/" and "/foo/.." should result in the same location
1405
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1407
self.assertEqual(transport.clone("/").abspath('foo'),
1408
transport.abspath("/foo"))
1410
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1411
def test_win32_abspath(self):
1412
# Note: we tried to set sys.platform='win32' so we could test on
1413
# other platforms too, but then osutils does platform specific
1414
# things at import time which defeated us...
1415
if sys.platform != 'win32':
1417
'Testing drive letters in abspath implemented only for win32')
1419
# smoke test for abspath on win32.
1420
# a transport based on 'file:///' never fully qualifies the drive.
1421
transport = _mod_transport.get_transport_from_url("file:///")
1422
self.assertEqual(transport.abspath("/"), "file:///")
1424
# but a transport that starts with a drive spec must keep it.
1425
transport = _mod_transport.get_transport_from_url("file:///C:/")
1426
self.assertEqual(transport.abspath("/"), "file:///C:/")
1428
def test_local_abspath(self):
1429
transport = self.get_transport()
1431
p = transport.local_abspath('.')
1432
except (errors.NotLocalUrl, TransportNotPossible), e:
1433
# should be formattable
1436
self.assertEqual(getcwd(), p)
1438
def test_abspath_at_root(self):
1439
t = self.get_transport()
1440
# clone all the way to the top
1441
new_transport = t.clone('..')
1442
while new_transport.base != t.base:
1444
new_transport = t.clone('..')
1445
# we must be able to get a abspath of the root when we ask for
1446
# t.abspath('..') - this due to our choice that clone('..')
1447
# should return the root from the root, combined with the desire that
1448
# the url from clone('..') and from abspath('..') should be the same.
1449
self.assertEqual(t.base, t.abspath('..'))
1450
# '' should give us the root
1451
self.assertEqual(t.base, t.abspath(''))
1452
# and a path should append to the url
1453
self.assertEqual(t.base + 'foo', t.abspath('foo'))
1455
def test_iter_files_recursive(self):
1456
transport = self.get_transport()
1457
if not transport.listable():
1458
self.assertRaises(TransportNotPossible,
1459
transport.iter_files_recursive)
1461
self.build_tree(['isolated/',
1465
'isolated/dir/b%25z', # make sure quoting is correct
1467
transport=transport)
1468
paths = set(transport.iter_files_recursive())
1469
# nb the directories are not converted
1470
self.assertEqual(paths,
1471
set(['isolated/dir/foo',
1473
'isolated/dir/b%2525z',
1475
sub_transport = transport.clone('isolated')
1476
paths = set(sub_transport.iter_files_recursive())
1477
self.assertEqual(paths,
1478
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1480
def test_copy_tree(self):
1481
# TODO: test file contents and permissions are preserved. This test was
1482
# added just to ensure that quoting was handled correctly.
1483
# -- David Allouche 2006-08-11
1484
transport = self.get_transport()
1485
if not transport.listable():
1486
self.assertRaises(TransportNotPossible,
1487
transport.iter_files_recursive)
1489
if transport.is_readonly():
1491
self.build_tree(['from/',
1495
'from/dir/b%25z', # make sure quoting is correct
1497
transport=transport)
1498
transport.copy_tree('from', 'to')
1499
paths = set(transport.iter_files_recursive())
1500
self.assertEqual(paths,
1501
set(['from/dir/foo',
1510
def test_copy_tree_to_transport(self):
1511
transport = self.get_transport()
1512
if not transport.listable():
1513
self.assertRaises(TransportNotPossible,
1514
transport.iter_files_recursive)
1516
if transport.is_readonly():
1518
self.build_tree(['from/',
1522
'from/dir/b%25z', # make sure quoting is correct
1524
transport=transport)
1525
from_transport = transport.clone('from')
1526
to_transport = transport.clone('to')
1527
to_transport.ensure_base()
1528
from_transport.copy_tree_to_transport(to_transport)
1529
paths = set(transport.iter_files_recursive())
1530
self.assertEqual(paths,
1531
set(['from/dir/foo',
1540
def test_unicode_paths(self):
1541
"""Test that we can read/write files with Unicode names."""
1542
t = self.get_transport()
1544
# With FAT32 and certain encodings on win32
1545
# '\xe5' and '\xe4' actually map to the same file
1546
# adding a suffix kicks in the 'preserving but insensitive'
1547
# route, and maintains the right files
1548
files = [u'\xe5.1', # a w/ circle iso-8859-1
1549
u'\xe4.2', # a w/ dots iso-8859-1
1550
u'\u017d', # Z with umlat iso-8859-2
1551
u'\u062c', # Arabic j
1552
u'\u0410', # Russian A
1553
u'\u65e5', # Kanji person
1556
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1557
if no_unicode_support:
1558
self.knownFailure("test server cannot handle unicode paths")
1561
self.build_tree(files, transport=t, line_endings='binary')
1562
except UnicodeError:
1563
raise TestSkipped("cannot handle unicode paths in current encoding")
1565
# A plain unicode string is not a valid url
1567
self.assertRaises(InvalidURL, t.get, fname)
1570
fname_utf8 = fname.encode('utf-8')
1571
contents = 'contents of %s\n' % (fname_utf8,)
1572
self.check_transport_contents(contents, t, urlutils.escape(fname))
1574
def test_connect_twice_is_same_content(self):
1575
# check that our server (whatever it is) is accessible reliably
1576
# via get_transport and multiple connections share content.
1577
transport = self.get_transport()
1578
if transport.is_readonly():
1580
transport.put_bytes('foo', 'bar')
1581
transport3 = self.get_transport()
1582
self.check_transport_contents('bar', transport3, 'foo')
1584
# now opening at a relative url should give use a sane result:
1585
transport.mkdir('newdir')
1586
transport5 = self.get_transport('newdir')
1587
transport6 = transport5.clone('..')
1588
self.check_transport_contents('bar', transport6, 'foo')
1590
def test_lock_write(self):
1591
"""Test transport-level write locks.
1593
These are deprecated and transports may decline to support them.
1595
transport = self.get_transport()
1596
if transport.is_readonly():
1597
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1599
transport.put_bytes('lock', '')
1601
lock = transport.lock_write('lock')
1602
except TransportNotPossible:
1604
# TODO make this consistent on all platforms:
1605
# self.assertRaises(LockError, transport.lock_write, 'lock')
1608
def test_lock_read(self):
1609
"""Test transport-level read locks.
1611
These are deprecated and transports may decline to support them.
1613
transport = self.get_transport()
1614
if transport.is_readonly():
1615
file('lock', 'w').close()
1617
transport.put_bytes('lock', '')
1619
lock = transport.lock_read('lock')
1620
except TransportNotPossible:
1622
# TODO make this consistent on all platforms:
1623
# self.assertRaises(LockError, transport.lock_read, 'lock')
1626
def test_readv(self):
1627
transport = self.get_transport()
1628
if transport.is_readonly():
1629
with file('a', 'w') as f: f.write('0123456789')
1631
transport.put_bytes('a', '0123456789')
1633
d = list(transport.readv('a', ((0, 1),)))
1634
self.assertEqual(d[0], (0, '0'))
1636
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1637
self.assertEqual(d[0], (0, '0'))
1638
self.assertEqual(d[1], (1, '1'))
1639
self.assertEqual(d[2], (3, '34'))
1640
self.assertEqual(d[3], (9, '9'))
1642
def test_readv_out_of_order(self):
1643
transport = self.get_transport()
1644
if transport.is_readonly():
1645
with file('a', 'w') as f: f.write('0123456789')
1647
transport.put_bytes('a', '01234567890')
1649
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1650
self.assertEqual(d[0], (1, '1'))
1651
self.assertEqual(d[1], (9, '9'))
1652
self.assertEqual(d[2], (0, '0'))
1653
self.assertEqual(d[3], (3, '34'))
1655
def test_readv_with_adjust_for_latency(self):
1656
transport = self.get_transport()
1657
# the adjust for latency flag expands the data region returned
1658
# according to a per-transport heuristic, so testing is a little
1659
# tricky as we need more data than the largest combining that our
1660
# transports do. To accomodate this we generate random data and cross
1661
# reference the returned data with the random data. To avoid doing
1662
# multiple large random byte look ups we do several tests on the same
1664
content = osutils.rand_bytes(200*1024)
1665
content_size = len(content)
1666
if transport.is_readonly():
1667
self.build_tree_contents([('a', content)])
1669
transport.put_bytes('a', content)
1670
def check_result_data(result_vector):
1671
for item in result_vector:
1672
data_len = len(item[1])
1673
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1676
result = list(transport.readv('a', ((0, 30),),
1677
adjust_for_latency=True, upper_limit=content_size))
1678
# we expect 1 result, from 0, to something > 30
1679
self.assertEqual(1, len(result))
1680
self.assertEqual(0, result[0][0])
1681
self.assertTrue(len(result[0][1]) >= 30)
1682
check_result_data(result)
1683
# end of file corner case
1684
result = list(transport.readv('a', ((204700, 100),),
1685
adjust_for_latency=True, upper_limit=content_size))
1686
# we expect 1 result, from 204800- its length, to the end
1687
self.assertEqual(1, len(result))
1688
data_len = len(result[0][1])
1689
self.assertEqual(204800-data_len, result[0][0])
1690
self.assertTrue(data_len >= 100)
1691
check_result_data(result)
1692
# out of order ranges are made in order
1693
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1694
adjust_for_latency=True, upper_limit=content_size))
1695
# we expect 2 results, in order, start and end.
1696
self.assertEqual(2, len(result))
1698
data_len = len(result[0][1])
1699
self.assertEqual(0, result[0][0])
1700
self.assertTrue(data_len >= 30)
1702
data_len = len(result[1][1])
1703
self.assertEqual(204800-data_len, result[1][0])
1704
self.assertTrue(data_len >= 100)
1705
check_result_data(result)
1706
# close ranges get combined (even if out of order)
1707
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1708
result = list(transport.readv('a', request_vector,
1709
adjust_for_latency=True, upper_limit=content_size))
1710
self.assertEqual(1, len(result))
1711
data_len = len(result[0][1])
1712
# minimum length is from 400 to 1034 - 634
1713
self.assertTrue(data_len >= 634)
1714
# must contain the region 400 to 1034
1715
self.assertTrue(result[0][0] <= 400)
1716
self.assertTrue(result[0][0] + data_len >= 1034)
1717
check_result_data(result)
1719
def test_readv_with_adjust_for_latency_with_big_file(self):
1720
transport = self.get_transport()
1721
# test from observed failure case.
1722
if transport.is_readonly():
1723
with file('a', 'w') as f: f.write('a'*1024*1024)
1725
transport.put_bytes('a', 'a'*1024*1024)
1726
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1727
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1728
(465373, 800), (947422, 800)]
1729
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1730
found_items = [False]*9
1731
for pos, (start, length) in enumerate(broken_vector):
1732
# check the range is covered by the result
1733
for offset, data in results:
1734
if offset <= start and start + length <= offset + len(data):
1735
found_items[pos] = True
1736
self.assertEqual([True]*9, found_items)
1738
def test_get_with_open_write_stream_sees_all_content(self):
1739
t = self.get_transport()
1742
handle = t.open_write_stream('foo')
1745
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1749
def test_get_smart_medium(self):
1750
"""All transports must either give a smart medium, or know they can't.
1752
transport = self.get_transport()
1754
client_medium = transport.get_smart_medium()
1755
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1756
except errors.NoSmartMedium:
1757
# as long as we got it we're fine
1760
def test_readv_short_read(self):
1761
transport = self.get_transport()
1762
if transport.is_readonly():
1763
with file('a', 'w') as f: f.write('0123456789')
1765
transport.put_bytes('a', '01234567890')
1767
# This is intentionally reading off the end of the file
1768
# since we are sure that it cannot get there
1769
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1770
# Can be raised by paramiko
1772
transport.readv, 'a', [(1,1), (8,10)])
1774
# This is trying to seek past the end of the file, it should
1775
# also raise a special error
1776
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1777
transport.readv, 'a', [(12,2)])
1779
def test_no_segment_parameters(self):
1780
"""Segment parameters should be stripped and stored in
1781
transport.segment_parameters."""
1782
transport = self.get_transport("foo")
1783
self.assertEquals({}, transport.get_segment_parameters())
1785
def test_segment_parameters(self):
1786
"""Segment parameters should be stripped and stored in
1787
transport.get_segment_parameters()."""
1788
base_url = self._server.get_url()
1789
parameters = {"key1": "val1", "key2": "val2"}
1790
url = urlutils.join_segment_parameters(base_url, parameters)
1791
transport = _mod_transport.get_transport_from_url(url)
1792
self.assertEquals(parameters, transport.get_segment_parameters())
1794
def test_stat_symlink(self):
1795
# if a transport points directly to a symlink (and supports symlinks
1796
# at all) you can tell this. helps with bug 32669.
1797
t = self.get_transport()
1799
t.symlink('target', 'link')
1800
except TransportNotPossible:
1801
raise TestSkipped("symlinks not supported")
1802
t2 = t.clone('link')
1804
self.assertTrue(stat.S_ISLNK(st.st_mode))