1
# Copyright (C) 2005-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
37
from bzrlib.errors import (ConnectionError,
47
from bzrlib.osutils import getcwd
48
from bzrlib.smart import medium
49
from bzrlib.tests import (
55
from bzrlib.tests import test_server
56
from bzrlib.tests.test_transport import TestTransportImplementation
57
from bzrlib.transport import (
60
_get_transport_modules,
62
from bzrlib.transport.memory import MemoryTransport
65
def get_transport_test_permutations(module):
66
"""Get the permutations module wants to have tested."""
67
if getattr(module, 'get_test_permutations', None) is None:
69
"transport module %s doesn't provide get_test_permutations()"
72
return module.get_test_permutations()
75
def transport_test_permutations():
76
"""Return a list of the klass, server_factory pairs to test."""
78
for module in _get_transport_modules():
80
permutations = get_transport_test_permutations(
81
reduce(getattr, (module).split('.')[1:], __import__(module)))
82
for (klass, server_factory) in permutations:
83
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
{"transport_class":klass,
85
"transport_server":server_factory})
86
result.append(scenario)
87
except errors.DependencyNotPresent, e:
88
# Continue even if a dependency prevents us
89
# from adding this test
94
def load_tests(standard_tests, module, loader):
95
"""Multiply tests for tranport implementations."""
96
result = loader.suiteClass()
97
scenarios = transport_test_permutations()
98
return multiply_tests(standard_tests, scenarios, result)
101
class TransportTests(TestTransportImplementation):
104
super(TransportTests, self).setUp()
105
self._captureVar('BZR_NO_SMART_VFS', None)
107
def check_transport_contents(self, content, transport, relpath):
108
"""Check that transport.get(relpath).read() == content."""
109
self.assertEqualDiff(content, transport.get(relpath).read())
111
def test_ensure_base_missing(self):
112
""".ensure_base() should create the directory if it doesn't exist"""
113
t = self.get_transport()
115
if t_a.is_readonly():
116
self.assertRaises(TransportNotPossible,
119
self.assertTrue(t_a.ensure_base())
120
self.assertTrue(t.has('a'))
122
def test_ensure_base_exists(self):
123
""".ensure_base() should just be happy if it already exists"""
124
t = self.get_transport()
130
# ensure_base returns False if it didn't create the base
131
self.assertFalse(t_a.ensure_base())
133
def test_ensure_base_missing_parent(self):
134
""".ensure_base() will fail if the parent dir doesn't exist"""
135
t = self.get_transport()
141
self.assertRaises(NoSuchFile, t_b.ensure_base)
143
def test_external_url(self):
144
""".external_url either works or raises InProcessTransport."""
145
t = self.get_transport()
148
except errors.InProcessTransport:
152
t = self.get_transport()
154
files = ['a', 'b', 'e', 'g', '%']
155
self.build_tree(files, transport=t)
156
self.assertEqual(True, t.has('a'))
157
self.assertEqual(False, t.has('c'))
158
self.assertEqual(True, t.has(urlutils.escape('%')))
159
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
'e', 'f', 'g', 'h'])),
161
[True, True, False, False,
162
True, False, True, False])
163
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
urlutils.escape('%%')]))
166
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
'e', 'f', 'g', 'h']))),
168
[True, True, False, False,
169
True, False, True, False])
170
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
173
def test_has_root_works(self):
174
if self.transport_server is test_server.SmartTCPServer_for_testing:
175
raise TestNotApplicable(
176
"SmartTCPServer_for_testing intentionally does not allow "
178
current_transport = self.get_transport()
179
self.assertTrue(current_transport.has('/'))
180
root = current_transport.clone('/')
181
self.assertTrue(root.has(''))
184
t = self.get_transport()
186
files = ['a', 'b', 'e', 'g']
187
contents = ['contents of a\n',
192
self.build_tree(files, transport=t, line_endings='binary')
193
self.check_transport_contents('contents of a\n', t, 'a')
194
content_f = t.get_multi(files)
195
# Use itertools.izip() instead of use zip() or map(), since they fully
196
# evaluate their inputs, the transport requests should be issued and
197
# handled sequentially (we don't want to force transport to buffer).
198
for content, f in itertools.izip(contents, content_f):
199
self.assertEqual(content, f.read())
201
content_f = t.get_multi(iter(files))
202
# Use itertools.izip() for the same reason
203
for content, f in itertools.izip(contents, content_f):
204
self.assertEqual(content, f.read())
206
def test_get_unknown_file(self):
207
t = self.get_transport()
209
contents = ['contents of a\n',
212
self.build_tree(files, transport=t, line_endings='binary')
213
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
217
def test_get_directory_read_gives_ReadError(self):
218
"""consistent errors for read() on a file returned by get()."""
219
t = self.get_transport()
221
self.build_tree(['a directory/'])
223
t.mkdir('a%20directory')
224
# getting the file must either work or fail with a PathError
226
a_file = t.get('a%20directory')
227
except (errors.PathError, errors.RedirectRequested):
228
# early failure return immediately.
230
# having got a file, read() must either work (i.e. http reading a dir
231
# listing) or fail with ReadError
234
except errors.ReadError:
237
def test_get_bytes(self):
238
t = self.get_transport()
240
files = ['a', 'b', 'e', 'g']
241
contents = ['contents of a\n',
246
self.build_tree(files, transport=t, line_endings='binary')
247
self.check_transport_contents('contents of a\n', t, 'a')
249
for content, fname in zip(contents, files):
250
self.assertEqual(content, t.get_bytes(fname))
252
def test_get_bytes_unknown_file(self):
253
t = self.get_transport()
255
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
257
def test_get_with_open_write_stream_sees_all_content(self):
258
t = self.get_transport()
261
handle = t.open_write_stream('foo')
264
self.assertEqual('b', t.get('foo').read())
268
def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
t = self.get_transport()
272
handle = t.open_write_stream('foo')
275
self.assertEqual('b', t.get_bytes('foo'))
276
self.assertEqual('b', t.get('foo').read())
280
def test_put_bytes(self):
281
t = self.get_transport()
284
self.assertRaises(TransportNotPossible,
285
t.put_bytes, 'a', 'some text for a\n')
288
t.put_bytes('a', 'some text for a\n')
289
self.failUnless(t.has('a'))
290
self.check_transport_contents('some text for a\n', t, 'a')
292
# The contents should be overwritten
293
t.put_bytes('a', 'new text for a\n')
294
self.check_transport_contents('new text for a\n', t, 'a')
296
self.assertRaises(NoSuchFile,
297
t.put_bytes, 'path/doesnt/exist/c', 'contents')
299
def test_put_bytes_non_atomic(self):
300
t = self.get_transport()
303
self.assertRaises(TransportNotPossible,
304
t.put_bytes_non_atomic, 'a', 'some text for a\n')
307
self.failIf(t.has('a'))
308
t.put_bytes_non_atomic('a', 'some text for a\n')
309
self.failUnless(t.has('a'))
310
self.check_transport_contents('some text for a\n', t, 'a')
311
# Put also replaces contents
312
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
315
# Make sure we can create another file
316
t.put_bytes_non_atomic('d', 'contents for\nd\n')
317
# And overwrite 'a' with empty contents
318
t.put_bytes_non_atomic('a', '')
319
self.check_transport_contents('contents for\nd\n', t, 'd')
320
self.check_transport_contents('', t, 'a')
322
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
324
# Now test the create_parent flag
325
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
327
self.failIf(t.has('dir/a'))
328
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
create_parent_dir=True)
330
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
332
# But we still get NoSuchFile if we can't make the parent dir
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
335
create_parent_dir=True)
337
def test_put_bytes_permissions(self):
338
t = self.get_transport()
342
if not t._can_roundtrip_unix_modebits():
343
# Can't roundtrip, so no need to run this test
345
t.put_bytes('mode644', 'test text\n', mode=0644)
346
self.assertTransportMode(t, 'mode644', 0644)
347
t.put_bytes('mode666', 'test text\n', mode=0666)
348
self.assertTransportMode(t, 'mode666', 0666)
349
t.put_bytes('mode600', 'test text\n', mode=0600)
350
self.assertTransportMode(t, 'mode600', 0600)
351
# Yes, you can put_bytes a file such that it becomes readonly
352
t.put_bytes('mode400', 'test text\n', mode=0400)
353
self.assertTransportMode(t, 'mode400', 0400)
355
# The default permissions should be based on the current umask
356
umask = osutils.get_umask()
357
t.put_bytes('nomode', 'test text\n', mode=None)
358
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
360
def test_put_bytes_non_atomic_permissions(self):
361
t = self.get_transport()
365
if not t._can_roundtrip_unix_modebits():
366
# Can't roundtrip, so no need to run this test
368
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
self.assertTransportMode(t, 'mode644', 0644)
370
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
self.assertTransportMode(t, 'mode666', 0666)
372
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
self.assertTransportMode(t, 'mode600', 0600)
374
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
self.assertTransportMode(t, 'mode400', 0400)
377
# The default permissions should be based on the current umask
378
umask = osutils.get_umask()
379
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
382
# We should also be able to set the mode for a parent directory
384
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
dir_mode=0700, create_parent_dir=True)
386
self.assertTransportMode(t, 'dir700', 0700)
387
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
dir_mode=0770, create_parent_dir=True)
389
self.assertTransportMode(t, 'dir770', 0770)
390
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
dir_mode=0777, create_parent_dir=True)
392
self.assertTransportMode(t, 'dir777', 0777)
394
def test_put_file(self):
395
t = self.get_transport()
398
self.assertRaises(TransportNotPossible,
399
t.put_file, 'a', StringIO('some text for a\n'))
402
result = t.put_file('a', StringIO('some text for a\n'))
403
# put_file returns the length of the data written
404
self.assertEqual(16, result)
405
self.failUnless(t.has('a'))
406
self.check_transport_contents('some text for a\n', t, 'a')
407
# Put also replaces contents
408
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
409
self.assertEqual(19, result)
410
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
self.assertRaises(NoSuchFile,
412
t.put_file, 'path/doesnt/exist/c',
413
StringIO('contents'))
415
def test_put_file_non_atomic(self):
416
t = self.get_transport()
419
self.assertRaises(TransportNotPossible,
420
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
423
self.failIf(t.has('a'))
424
t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
self.failUnless(t.has('a'))
426
self.check_transport_contents('some text for a\n', t, 'a')
427
# Put also replaces contents
428
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
431
# Make sure we can create another file
432
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
433
# And overwrite 'a' with empty contents
434
t.put_file_non_atomic('a', StringIO(''))
435
self.check_transport_contents('contents for\nd\n', t, 'd')
436
self.check_transport_contents('', t, 'a')
438
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
StringIO('contents\n'))
440
# Now test the create_parent flag
441
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
StringIO('contents\n'))
443
self.failIf(t.has('dir/a'))
444
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
create_parent_dir=True)
446
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
448
# But we still get NoSuchFile if we can't make the parent dir
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
StringIO('contents\n'),
451
create_parent_dir=True)
453
def test_put_file_permissions(self):
455
t = self.get_transport()
459
if not t._can_roundtrip_unix_modebits():
460
# Can't roundtrip, so no need to run this test
462
t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
self.assertTransportMode(t, 'mode644', 0644)
464
t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
self.assertTransportMode(t, 'mode666', 0666)
466
t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
self.assertTransportMode(t, 'mode600', 0600)
468
# Yes, you can put a file such that it becomes readonly
469
t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
self.assertTransportMode(t, 'mode400', 0400)
471
# The default permissions should be based on the current umask
472
umask = osutils.get_umask()
473
t.put_file('nomode', StringIO('test text\n'), mode=None)
474
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
476
def test_put_file_non_atomic_permissions(self):
477
t = self.get_transport()
481
if not t._can_roundtrip_unix_modebits():
482
# Can't roundtrip, so no need to run this test
484
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
self.assertTransportMode(t, 'mode644', 0644)
486
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
self.assertTransportMode(t, 'mode666', 0666)
488
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
self.assertTransportMode(t, 'mode600', 0600)
490
# Yes, you can put_file_non_atomic a file such that it becomes readonly
491
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
self.assertTransportMode(t, 'mode400', 0400)
494
# The default permissions should be based on the current umask
495
umask = osutils.get_umask()
496
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
499
# We should also be able to set the mode for a parent directory
502
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
dir_mode=0700, create_parent_dir=True)
504
self.assertTransportMode(t, 'dir700', 0700)
505
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
dir_mode=0770, create_parent_dir=True)
507
self.assertTransportMode(t, 'dir770', 0770)
508
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
dir_mode=0777, create_parent_dir=True)
510
self.assertTransportMode(t, 'dir777', 0777)
512
def test_put_bytes_unicode(self):
513
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
515
# (we don't want to encode unicode here at all, callers should be
516
# strictly passing bytes to put_bytes), but we allow it for backwards
517
# compatibility. At some point we should use a specific exception.
518
# See https://bugs.launchpad.net/bzr/+bug/106898.
519
t = self.get_transport()
522
unicode_string = u'\u1234'
524
(AssertionError, UnicodeEncodeError),
525
t.put_bytes, 'foo', unicode_string)
527
def test_put_file_unicode(self):
528
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
# This situation can happen (and has) if code is careless about the type
530
# of "string" they initialise/write to a StringIO with. We cannot use
531
# cStringIO, because it never returns unicode from read.
532
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
# raise, but we raise it for hysterical raisins.
534
t = self.get_transport()
537
unicode_file = pyStringIO(u'\u1234')
538
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
540
def test_mkdir(self):
541
t = self.get_transport()
544
# cannot mkdir on readonly transports. We're not testing for
545
# cache coherency because cache behaviour is not currently
546
# defined for the transport interface.
547
self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
554
self.assertEqual(t.has('dir_a'), True)
555
self.assertEqual(t.has('dir_b'), False)
558
self.assertEqual(t.has('dir_b'), True)
560
t.mkdir_multi(['dir_c', 'dir_d'])
562
t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
self.assertEqual(list(t.has_multi(
564
['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
[True, True, True, False,
567
True, True, True, True])
569
# we were testing that a local mkdir followed by a transport
570
# mkdir failed thusly, but given that we * in one process * do not
571
# concurrently fiddle with disk dirs and then use transport to do
572
# things, the win here seems marginal compared to the constraint on
573
# the interface. RBC 20051227
575
self.assertRaises(FileExists, t.mkdir, 'dir_g')
577
# Test get/put in sub-directories
578
t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
583
# mkdir of a dir with an absent parent
584
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
586
def test_mkdir_permissions(self):
587
t = self.get_transport()
590
if not t._can_roundtrip_unix_modebits():
591
# no sense testing on this transport
593
# Test mkdir with a mode
594
t.mkdir('dmode755', mode=0755)
595
self.assertTransportMode(t, 'dmode755', 0755)
596
t.mkdir('dmode555', mode=0555)
597
self.assertTransportMode(t, 'dmode555', 0555)
598
t.mkdir('dmode777', mode=0777)
599
self.assertTransportMode(t, 'dmode777', 0777)
600
t.mkdir('dmode700', mode=0700)
601
self.assertTransportMode(t, 'dmode700', 0700)
602
t.mkdir_multi(['mdmode755'], mode=0755)
603
self.assertTransportMode(t, 'mdmode755', 0755)
605
# Default mode should be based on umask
606
umask = osutils.get_umask()
607
t.mkdir('dnomode', mode=None)
608
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
610
def test_opening_a_file_stream_creates_file(self):
611
t = self.get_transport()
614
handle = t.open_write_stream('foo')
616
self.assertEqual('', t.get_bytes('foo'))
620
def test_opening_a_file_stream_can_set_mode(self):
621
t = self.get_transport()
624
if not t._can_roundtrip_unix_modebits():
625
# Can't roundtrip, so no need to run this test
627
def check_mode(name, mode, expected):
628
handle = t.open_write_stream(name, mode=mode)
630
self.assertTransportMode(t, name, expected)
631
check_mode('mode644', 0644, 0644)
632
check_mode('mode666', 0666, 0666)
633
check_mode('mode600', 0600, 0600)
634
# The default permissions should be based on the current umask
635
check_mode('nomode', None, 0666 & ~osutils.get_umask())
637
def test_copy_to(self):
638
# FIXME: test: same server to same server (partly done)
639
# same protocol two servers
640
# and different protocols (done for now except for MemoryTransport.
643
def simple_copy_files(transport_from, transport_to):
644
files = ['a', 'b', 'c', 'd']
645
self.build_tree(files, transport=transport_from)
646
self.assertEqual(4, transport_from.copy_to(files, transport_to))
648
self.check_transport_contents(transport_to.get(f).read(),
651
t = self.get_transport()
652
temp_transport = MemoryTransport('memory:///')
653
simple_copy_files(t, temp_transport)
654
if not t.is_readonly():
655
t.mkdir('copy_to_simple')
656
t2 = t.clone('copy_to_simple')
657
simple_copy_files(t, t2)
660
# Test that copying into a missing directory raises
663
self.build_tree(['e/', 'e/f'])
666
t.put_bytes('e/f', 'contents of e')
667
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
temp_transport.mkdir('e')
669
t.copy_to(['e/f'], temp_transport)
672
temp_transport = MemoryTransport('memory:///')
674
files = ['a', 'b', 'c', 'd']
675
t.copy_to(iter(files), temp_transport)
677
self.check_transport_contents(temp_transport.get(f).read(),
681
for mode in (0666, 0644, 0600, 0400):
682
temp_transport = MemoryTransport("memory:///")
683
t.copy_to(files, temp_transport, mode=mode)
685
self.assertTransportMode(temp_transport, f, mode)
687
def test_create_prefix(self):
688
t = self.get_transport()
689
sub = t.clone('foo').clone('bar')
692
except TransportNotPossible:
693
self.assertTrue(t.is_readonly())
695
self.assertTrue(t.has('foo/bar'))
697
def test_append_file(self):
698
t = self.get_transport()
701
self.assertRaises(TransportNotPossible,
702
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
704
t.put_bytes('a', 'diff\ncontents for\na\n')
705
t.put_bytes('b', 'contents\nfor b\n')
708
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
710
self.check_transport_contents(
711
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
714
# a file with no parent should fail..
715
self.assertRaises(NoSuchFile,
716
t.append_file, 'missing/path', StringIO('content'))
718
# And we can create new files, too
720
t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
self.check_transport_contents('some text\nfor a missing file\n',
724
def test_append_bytes(self):
725
t = self.get_transport()
728
self.assertRaises(TransportNotPossible,
729
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
732
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
736
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
738
self.check_transport_contents(
739
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
742
# a file with no parent should fail..
743
self.assertRaises(NoSuchFile,
744
t.append_bytes, 'missing/path', 'content')
746
def test_append_multi(self):
747
t = self.get_transport()
751
t.put_bytes('a', 'diff\ncontents for\na\n'
752
'add\nsome\nmore\ncontents\n')
753
t.put_bytes('b', 'contents\nfor b\n')
755
self.assertEqual((43, 15),
756
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
('b', StringIO('some\nmore\nfor\nb\n'))]))
759
self.check_transport_contents(
760
'diff\ncontents for\na\n'
761
'add\nsome\nmore\ncontents\n'
762
'and\nthen\nsome\nmore\n',
764
self.check_transport_contents(
766
'some\nmore\nfor\nb\n',
769
self.assertEqual((62, 31),
770
t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
('b', StringIO('from an iterator\n'))])))
772
self.check_transport_contents(
773
'diff\ncontents for\na\n'
774
'add\nsome\nmore\ncontents\n'
775
'and\nthen\nsome\nmore\n'
776
'a little bit more\n',
778
self.check_transport_contents(
780
'some\nmore\nfor\nb\n'
781
'from an iterator\n',
784
self.assertEqual((80, 0),
785
t.append_multi([('a', StringIO('some text in a\n')),
786
('d', StringIO('missing file r\n'))]))
788
self.check_transport_contents(
789
'diff\ncontents for\na\n'
790
'add\nsome\nmore\ncontents\n'
791
'and\nthen\nsome\nmore\n'
792
'a little bit more\n'
795
self.check_transport_contents('missing file r\n', t, 'd')
797
def test_append_file_mode(self):
798
"""Check that append accepts a mode parameter"""
799
# check append accepts a mode
800
t = self.get_transport()
802
self.assertRaises(TransportNotPossible,
803
t.append_file, 'f', StringIO('f'), mode=None)
805
t.append_file('f', StringIO('f'), mode=None)
807
def test_append_bytes_mode(self):
808
# check append_bytes accepts a mode
809
t = self.get_transport()
811
self.assertRaises(TransportNotPossible,
812
t.append_bytes, 'f', 'f', mode=None)
814
t.append_bytes('f', 'f', mode=None)
816
def test_delete(self):
817
# TODO: Test Transport.delete
818
t = self.get_transport()
820
# Not much to do with a readonly transport
822
self.assertRaises(TransportNotPossible, t.delete, 'missing')
825
t.put_bytes('a', 'a little bit of text\n')
826
self.failUnless(t.has('a'))
828
self.failIf(t.has('a'))
830
self.assertRaises(NoSuchFile, t.delete, 'a')
832
t.put_bytes('a', 'a text\n')
833
t.put_bytes('b', 'b text\n')
834
t.put_bytes('c', 'c text\n')
835
self.assertEqual([True, True, True],
836
list(t.has_multi(['a', 'b', 'c'])))
837
t.delete_multi(['a', 'c'])
838
self.assertEqual([False, True, False],
839
list(t.has_multi(['a', 'b', 'c'])))
840
self.failIf(t.has('a'))
841
self.failUnless(t.has('b'))
842
self.failIf(t.has('c'))
844
self.assertRaises(NoSuchFile,
845
t.delete_multi, ['a', 'b', 'c'])
847
self.assertRaises(NoSuchFile,
848
t.delete_multi, iter(['a', 'b', 'c']))
850
t.put_bytes('a', 'another a text\n')
851
t.put_bytes('c', 'another c text\n')
852
t.delete_multi(iter(['a', 'b', 'c']))
854
# We should have deleted everything
855
# SftpServer creates control files in the
856
# working directory, so we can just do a
858
# self.assertEqual([], os.listdir('.'))
860
def test_recommended_page_size(self):
861
"""Transports recommend a page size for partial access to files."""
862
t = self.get_transport()
863
self.assertIsInstance(t.recommended_page_size(), int)
865
def test_rmdir(self):
866
t = self.get_transport()
867
# Not much to do with a readonly transport
869
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
874
# ftp may not be able to raise NoSuchFile for lack of
875
# details when failing
876
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
878
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
880
def test_rmdir_not_empty(self):
881
"""Deleting a non-empty directory raises an exception
883
sftp (and possibly others) don't give us a specific "directory not
884
empty" exception -- we can just see that the operation failed.
886
t = self.get_transport()
891
self.assertRaises(PathError, t.rmdir, 'adir')
893
def test_rmdir_empty_but_similar_prefix(self):
894
"""rmdir does not get confused by sibling paths.
896
A naive implementation of MemoryTransport would refuse to rmdir
897
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
898
uses "path.startswith(dir)" on all file paths to determine if directory
901
t = self.get_transport()
905
t.put_bytes('foo-bar', '')
908
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
self.failUnless(t.has('foo-bar'))
911
def test_rename_dir_succeeds(self):
912
t = self.get_transport()
914
raise TestSkipped("transport is readonly")
916
t.mkdir('adir/asubdir')
917
t.rename('adir', 'bdir')
918
self.assertTrue(t.has('bdir/asubdir'))
919
self.assertFalse(t.has('adir'))
921
def test_rename_dir_nonempty(self):
922
"""Attempting to replace a nonemtpy directory should fail"""
923
t = self.get_transport()
925
raise TestSkipped("transport is readonly")
927
t.mkdir('adir/asubdir')
929
t.mkdir('bdir/bsubdir')
930
# any kind of PathError would be OK, though we normally expect
932
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
933
# nothing was changed so it should still be as before
934
self.assertTrue(t.has('bdir/bsubdir'))
935
self.assertFalse(t.has('adir/bdir'))
936
self.assertFalse(t.has('adir/bsubdir'))
938
def test_rename_across_subdirs(self):
939
t = self.get_transport()
941
raise TestNotApplicable("transport is readonly")
946
ta.put_bytes('f', 'aoeu')
947
ta.rename('f', '../b/f')
948
self.assertTrue(tb.has('f'))
949
self.assertFalse(ta.has('f'))
950
self.assertTrue(t.has('b/f'))
952
def test_delete_tree(self):
953
t = self.get_transport()
955
# Not much to do with a readonly transport
957
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
960
# and does it like listing ?
963
t.delete_tree('adir')
964
except TransportNotPossible:
965
# ok, this transport does not support delete_tree
968
# did it delete that trivial case?
969
self.assertRaises(NoSuchFile, t.stat, 'adir')
971
self.build_tree(['adir/',
979
t.delete_tree('adir')
980
# adir should be gone now.
981
self.assertRaises(NoSuchFile, t.stat, 'adir')
984
t = self.get_transport()
989
# TODO: I would like to use os.listdir() to
990
# make sure there are no extra files, but SftpServer
991
# creates control files in the working directory
992
# perhaps all of this could be done in a subdirectory
994
t.put_bytes('a', 'a first file\n')
995
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
998
self.failUnless(t.has('b'))
999
self.failIf(t.has('a'))
1001
self.check_transport_contents('a first file\n', t, 'b')
1002
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1005
t.put_bytes('c', 'c this file\n')
1007
self.failIf(t.has('c'))
1008
self.check_transport_contents('c this file\n', t, 'b')
1010
# TODO: Try to write a test for atomicity
1011
# TODO: Test moving into a non-existent subdirectory
1012
# TODO: Test Transport.move_multi
1014
def test_copy(self):
1015
t = self.get_transport()
1020
t.put_bytes('a', 'a file\n')
1022
self.check_transport_contents('a file\n', t, 'b')
1024
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1026
# What should the assert be if you try to copy a
1027
# file over a directory?
1028
#self.assertRaises(Something, t.copy, 'a', 'c')
1029
t.put_bytes('d', 'text in d\n')
1031
self.check_transport_contents('text in d\n', t, 'b')
1033
# TODO: test copy_multi
1035
def test_connection_error(self):
1036
"""ConnectionError is raised when connection is impossible.
1038
The error should be raised from the first operation on the transport.
1041
url = self._server.get_bogus_url()
1042
except NotImplementedError:
1043
raise TestSkipped("Transport %s has no bogus URL support." %
1044
self._server.__class__)
1045
t = get_transport(url)
1046
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1048
def test_stat(self):
1049
# TODO: Test stat, just try once, and if it throws, stop testing
1050
from stat import S_ISDIR, S_ISREG
1052
t = self.get_transport()
1056
except TransportNotPossible, e:
1057
# This transport cannot stat
1060
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1061
sizes = [14, 0, 16, 0, 18]
1062
self.build_tree(paths, transport=t, line_endings='binary')
1064
for path, size in zip(paths, sizes):
1066
if path.endswith('/'):
1067
self.failUnless(S_ISDIR(st.st_mode))
1068
# directory sizes are meaningless
1070
self.failUnless(S_ISREG(st.st_mode))
1071
self.assertEqual(size, st.st_size)
1073
remote_stats = list(t.stat_multi(paths))
1074
remote_iter_stats = list(t.stat_multi(iter(paths)))
1076
self.assertRaises(NoSuchFile, t.stat, 'q')
1077
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1079
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
subdir = t.clone('subdir')
1083
subdir.stat('./file')
1086
def test_hardlink(self):
1087
from stat import ST_NLINK
1089
t = self.get_transport()
1091
source_name = "original_target"
1092
link_name = "target_link"
1094
self.build_tree([source_name], transport=t)
1097
t.hardlink(source_name, link_name)
1099
self.failUnless(t.has(source_name))
1100
self.failUnless(t.has(link_name))
1102
st = t.stat(link_name)
1103
self.failUnlessEqual(st[ST_NLINK], 2)
1104
except TransportNotPossible:
1105
raise TestSkipped("Transport %s does not support hardlinks." %
1106
self._server.__class__)
1108
def test_symlink(self):
1109
from stat import S_ISLNK
1111
t = self.get_transport()
1113
source_name = "original_target"
1114
link_name = "target_link"
1116
self.build_tree([source_name], transport=t)
1119
t.symlink(source_name, link_name)
1121
self.failUnless(t.has(source_name))
1122
self.failUnless(t.has(link_name))
1124
st = t.stat(link_name)
1125
self.failUnless(S_ISLNK(st.st_mode))
1126
except TransportNotPossible:
1127
raise TestSkipped("Transport %s does not support symlinks." %
1128
self._server.__class__)
1130
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1132
def test_list_dir(self):
1133
# TODO: Test list_dir, just try once, and if it throws, stop testing
1134
t = self.get_transport()
1136
if not t.listable():
1137
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1140
def sorted_list(d, transport):
1141
l = list(transport.list_dir(d))
1145
self.assertEqual([], sorted_list('.', t))
1146
# c2 is precisely one letter longer than c here to test that
1147
# suffixing is not confused.
1148
# a%25b checks that quoting is done consistently across transports
1149
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1151
if not t.is_readonly():
1152
self.build_tree(tree_names, transport=t)
1154
self.build_tree(tree_names)
1157
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1159
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1160
self.assertEqual(['d', 'e'], sorted_list('c', t))
1162
# Cloning the transport produces an equivalent listing
1163
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1165
if not t.is_readonly():
1172
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1173
self.assertEqual(['e'], sorted_list('c', t))
1175
self.assertListRaises(PathError, t.list_dir, 'q')
1176
self.assertListRaises(PathError, t.list_dir, 'c/f')
1177
# 'a' is a file, list_dir should raise an error
1178
self.assertListRaises(PathError, t.list_dir, 'a')
1180
def test_list_dir_result_is_url_escaped(self):
1181
t = self.get_transport()
1182
if not t.listable():
1183
raise TestSkipped("transport not listable")
1185
if not t.is_readonly():
1186
self.build_tree(['a/', 'a/%'], transport=t)
1188
self.build_tree(['a/', 'a/%'])
1190
names = list(t.list_dir('a'))
1191
self.assertEqual(['%25'], names)
1192
self.assertIsInstance(names[0], str)
1194
def test_clone_preserve_info(self):
1195
t1 = self.get_transport()
1196
if not isinstance(t1, ConnectedTransport):
1197
raise TestSkipped("not a connected transport")
1199
t2 = t1.clone('subdir')
1200
self.assertEquals(t1._scheme, t2._scheme)
1201
self.assertEquals(t1._user, t2._user)
1202
self.assertEquals(t1._password, t2._password)
1203
self.assertEquals(t1._host, t2._host)
1204
self.assertEquals(t1._port, t2._port)
1206
def test__reuse_for(self):
1207
t = self.get_transport()
1208
if not isinstance(t, ConnectedTransport):
1209
raise TestSkipped("not a connected transport")
1211
def new_url(scheme=None, user=None, password=None,
1212
host=None, port=None, path=None):
1213
"""Build a new url from t.base changing only parts of it.
1215
Only the parameters different from None will be changed.
1217
if scheme is None: scheme = t._scheme
1218
if user is None: user = t._user
1219
if password is None: password = t._password
1220
if user is None: user = t._user
1221
if host is None: host = t._host
1222
if port is None: port = t._port
1223
if path is None: path = t._path
1224
return t._unsplit_url(scheme, user, password, host, port, path)
1226
if t._scheme == 'ftp':
1230
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1235
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1236
# passwords are not taken into account because:
1237
# - it makes no sense to have two different valid passwords for the
1239
# - _password in ConnectedTransport is intended to collect what the
1240
# user specified from the command-line and there are cases where the
1241
# new url can contain no password (if the url was built from an
1242
# existing transport.base for example)
1243
# - password are considered part of the credentials provided at
1244
# connection creation time and as such may not be present in the url
1245
# (they may be typed by the user when prompted for example)
1246
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
# We will not connect, we can use a invalid host
1248
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1253
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1254
# No point in trying to reuse a transport for a local URL
1255
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1257
def test_connection_sharing(self):
1258
t = self.get_transport()
1259
if not isinstance(t, ConnectedTransport):
1260
raise TestSkipped("not a connected transport")
1262
c = t.clone('subdir')
1263
# Some transports will create the connection only when needed
1264
t.has('surely_not') # Force connection
1265
self.assertIs(t._get_connection(), c._get_connection())
1267
# Temporary failure, we need to create a new dummy connection
1268
new_connection = object()
1269
t._set_connection(new_connection)
1270
# Check that both transports use the same connection
1271
self.assertIs(new_connection, t._get_connection())
1272
self.assertIs(new_connection, c._get_connection())
1274
def test_reuse_connection_for_various_paths(self):
1275
t = self.get_transport()
1276
if not isinstance(t, ConnectedTransport):
1277
raise TestSkipped("not a connected transport")
1279
t.has('surely_not') # Force connection
1280
self.assertIsNot(None, t._get_connection())
1282
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1283
self.assertIsNot(t, subdir)
1284
self.assertIs(t._get_connection(), subdir._get_connection())
1286
home = subdir._reuse_for(t.base + 'home')
1287
self.assertIs(t._get_connection(), home._get_connection())
1288
self.assertIs(subdir._get_connection(), home._get_connection())
1290
def test_clone(self):
1291
# TODO: Test that clone moves up and down the filesystem
1292
t1 = self.get_transport()
1294
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1296
self.failUnless(t1.has('a'))
1297
self.failUnless(t1.has('b/c'))
1298
self.failIf(t1.has('c'))
1301
self.assertEqual(t1.base + 'b/', t2.base)
1303
self.failUnless(t2.has('c'))
1304
self.failIf(t2.has('a'))
1307
self.failUnless(t3.has('a'))
1308
self.failIf(t3.has('c'))
1310
self.failIf(t1.has('b/d'))
1311
self.failIf(t2.has('d'))
1312
self.failIf(t3.has('b/d'))
1314
if t1.is_readonly():
1315
self.build_tree_contents([('b/d', 'newfile\n')])
1317
t2.put_bytes('d', 'newfile\n')
1319
self.failUnless(t1.has('b/d'))
1320
self.failUnless(t2.has('d'))
1321
self.failUnless(t3.has('b/d'))
1323
def test_clone_to_root(self):
1324
orig_transport = self.get_transport()
1325
# Repeatedly go up to a parent directory until we're at the root
1326
# directory of this transport
1327
root_transport = orig_transport
1328
new_transport = root_transport.clone("..")
1329
# as we are walking up directories, the path must be
1330
# growing less, except at the top
1331
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1332
or new_transport.base == root_transport.base)
1333
while new_transport.base != root_transport.base:
1334
root_transport = new_transport
1335
new_transport = root_transport.clone("..")
1336
# as we are walking up directories, the path must be
1337
# growing less, except at the top
1338
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1339
or new_transport.base == root_transport.base)
1341
# Cloning to "/" should take us to exactly the same location.
1342
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1343
# the abspath of "/" from the original transport should be the same
1344
# as the base at the root:
1345
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1347
# At the root, the URL must still end with / as its a directory
1348
self.assertEqual(root_transport.base[-1], '/')
1350
def test_clone_from_root(self):
1351
"""At the root, cloning to a simple dir should just do string append."""
1352
orig_transport = self.get_transport()
1353
root_transport = orig_transport.clone('/')
1354
self.assertEqual(root_transport.base + '.bzr/',
1355
root_transport.clone('.bzr').base)
1357
def test_base_url(self):
1358
t = self.get_transport()
1359
self.assertEqual('/', t.base[-1])
1361
def test_relpath(self):
1362
t = self.get_transport()
1363
self.assertEqual('', t.relpath(t.base))
1365
self.assertEqual('', t.relpath(t.base[:-1]))
1366
# subdirs which don't exist should still give relpaths.
1367
self.assertEqual('foo', t.relpath(t.base + 'foo'))
1368
# trailing slash should be the same.
1369
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1371
def test_relpath_at_root(self):
1372
t = self.get_transport()
1373
# clone all the way to the top
1374
new_transport = t.clone('..')
1375
while new_transport.base != t.base:
1377
new_transport = t.clone('..')
1378
# we must be able to get a relpath below the root
1379
self.assertEqual('', t.relpath(t.base))
1380
# and a deeper one should work too
1381
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1383
def test_abspath(self):
1384
# smoke test for abspath. Corner cases for backends like unix fs's
1385
# that have aliasing problems like symlinks should go in backend
1386
# specific test cases.
1387
transport = self.get_transport()
1389
self.assertEqual(transport.base + 'relpath',
1390
transport.abspath('relpath'))
1392
# This should work without raising an error.
1393
transport.abspath("/")
1395
# the abspath of "/" and "/foo/.." should result in the same location
1396
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1398
self.assertEqual(transport.clone("/").abspath('foo'),
1399
transport.abspath("/foo"))
1401
def test_win32_abspath(self):
1402
# Note: we tried to set sys.platform='win32' so we could test on
1403
# other platforms too, but then osutils does platform specific
1404
# things at import time which defeated us...
1405
if sys.platform != 'win32':
1407
'Testing drive letters in abspath implemented only for win32')
1409
# smoke test for abspath on win32.
1410
# a transport based on 'file:///' never fully qualifies the drive.
1411
transport = get_transport("file:///")
1412
self.failUnlessEqual(transport.abspath("/"), "file:///")
1414
# but a transport that starts with a drive spec must keep it.
1415
transport = get_transport("file:///C:/")
1416
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1418
def test_local_abspath(self):
1419
transport = self.get_transport()
1421
p = transport.local_abspath('.')
1422
except (errors.NotLocalUrl, TransportNotPossible), e:
1423
# should be formattable
1426
self.assertEqual(getcwd(), p)
1428
def test_abspath_at_root(self):
1429
t = self.get_transport()
1430
# clone all the way to the top
1431
new_transport = t.clone('..')
1432
while new_transport.base != t.base:
1434
new_transport = t.clone('..')
1435
# we must be able to get a abspath of the root when we ask for
1436
# t.abspath('..') - this due to our choice that clone('..')
1437
# should return the root from the root, combined with the desire that
1438
# the url from clone('..') and from abspath('..') should be the same.
1439
self.assertEqual(t.base, t.abspath('..'))
1440
# '' should give us the root
1441
self.assertEqual(t.base, t.abspath(''))
1442
# and a path should append to the url
1443
self.assertEqual(t.base + 'foo', t.abspath('foo'))
1445
def test_iter_files_recursive(self):
1446
transport = self.get_transport()
1447
if not transport.listable():
1448
self.assertRaises(TransportNotPossible,
1449
transport.iter_files_recursive)
1451
self.build_tree(['isolated/',
1455
'isolated/dir/b%25z', # make sure quoting is correct
1457
transport=transport)
1458
paths = set(transport.iter_files_recursive())
1459
# nb the directories are not converted
1460
self.assertEqual(paths,
1461
set(['isolated/dir/foo',
1463
'isolated/dir/b%2525z',
1465
sub_transport = transport.clone('isolated')
1466
paths = set(sub_transport.iter_files_recursive())
1467
self.assertEqual(paths,
1468
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1470
def test_copy_tree(self):
1471
# TODO: test file contents and permissions are preserved. This test was
1472
# added just to ensure that quoting was handled correctly.
1473
# -- David Allouche 2006-08-11
1474
transport = self.get_transport()
1475
if not transport.listable():
1476
self.assertRaises(TransportNotPossible,
1477
transport.iter_files_recursive)
1479
if transport.is_readonly():
1481
self.build_tree(['from/',
1485
'from/dir/b%25z', # make sure quoting is correct
1487
transport=transport)
1488
transport.copy_tree('from', 'to')
1489
paths = set(transport.iter_files_recursive())
1490
self.assertEqual(paths,
1491
set(['from/dir/foo',
1500
def test_copy_tree_to_transport(self):
1501
transport = self.get_transport()
1502
if not transport.listable():
1503
self.assertRaises(TransportNotPossible,
1504
transport.iter_files_recursive)
1506
if transport.is_readonly():
1508
self.build_tree(['from/',
1512
'from/dir/b%25z', # make sure quoting is correct
1514
transport=transport)
1515
from_transport = transport.clone('from')
1516
to_transport = transport.clone('to')
1517
to_transport.ensure_base()
1518
from_transport.copy_tree_to_transport(to_transport)
1519
paths = set(transport.iter_files_recursive())
1520
self.assertEqual(paths,
1521
set(['from/dir/foo',
1530
def test_unicode_paths(self):
1531
"""Test that we can read/write files with Unicode names."""
1532
t = self.get_transport()
1534
# With FAT32 and certain encodings on win32
1535
# '\xe5' and '\xe4' actually map to the same file
1536
# adding a suffix kicks in the 'preserving but insensitive'
1537
# route, and maintains the right files
1538
files = [u'\xe5.1', # a w/ circle iso-8859-1
1539
u'\xe4.2', # a w/ dots iso-8859-1
1540
u'\u017d', # Z with umlat iso-8859-2
1541
u'\u062c', # Arabic j
1542
u'\u0410', # Russian A
1543
u'\u65e5', # Kanji person
1546
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
if no_unicode_support:
1548
raise tests.KnownFailure("test server cannot handle unicode paths")
1551
self.build_tree(files, transport=t, line_endings='binary')
1552
except UnicodeError:
1553
raise TestSkipped("cannot handle unicode paths in current encoding")
1555
# A plain unicode string is not a valid url
1557
self.assertRaises(InvalidURL, t.get, fname)
1560
fname_utf8 = fname.encode('utf-8')
1561
contents = 'contents of %s\n' % (fname_utf8,)
1562
self.check_transport_contents(contents, t, urlutils.escape(fname))
1564
def test_connect_twice_is_same_content(self):
1565
# check that our server (whatever it is) is accessible reliably
1566
# via get_transport and multiple connections share content.
1567
transport = self.get_transport()
1568
if transport.is_readonly():
1570
transport.put_bytes('foo', 'bar')
1571
transport3 = self.get_transport()
1572
self.check_transport_contents('bar', transport3, 'foo')
1574
# now opening at a relative url should give use a sane result:
1575
transport.mkdir('newdir')
1576
transport5 = self.get_transport('newdir')
1577
transport6 = transport5.clone('..')
1578
self.check_transport_contents('bar', transport6, 'foo')
1580
def test_lock_write(self):
1581
"""Test transport-level write locks.
1583
These are deprecated and transports may decline to support them.
1585
transport = self.get_transport()
1586
if transport.is_readonly():
1587
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1589
transport.put_bytes('lock', '')
1591
lock = transport.lock_write('lock')
1592
except TransportNotPossible:
1594
# TODO make this consistent on all platforms:
1595
# self.assertRaises(LockError, transport.lock_write, 'lock')
1598
def test_lock_read(self):
1599
"""Test transport-level read locks.
1601
These are deprecated and transports may decline to support them.
1603
transport = self.get_transport()
1604
if transport.is_readonly():
1605
file('lock', 'w').close()
1607
transport.put_bytes('lock', '')
1609
lock = transport.lock_read('lock')
1610
except TransportNotPossible:
1612
# TODO make this consistent on all platforms:
1613
# self.assertRaises(LockError, transport.lock_read, 'lock')
1616
def test_readv(self):
1617
transport = self.get_transport()
1618
if transport.is_readonly():
1619
file('a', 'w').write('0123456789')
1621
transport.put_bytes('a', '0123456789')
1623
d = list(transport.readv('a', ((0, 1),)))
1624
self.assertEqual(d[0], (0, '0'))
1626
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
self.assertEqual(d[0], (0, '0'))
1628
self.assertEqual(d[1], (1, '1'))
1629
self.assertEqual(d[2], (3, '34'))
1630
self.assertEqual(d[3], (9, '9'))
1632
def test_readv_out_of_order(self):
1633
transport = self.get_transport()
1634
if transport.is_readonly():
1635
file('a', 'w').write('0123456789')
1637
transport.put_bytes('a', '01234567890')
1639
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
self.assertEqual(d[0], (1, '1'))
1641
self.assertEqual(d[1], (9, '9'))
1642
self.assertEqual(d[2], (0, '0'))
1643
self.assertEqual(d[3], (3, '34'))
1645
def test_readv_with_adjust_for_latency(self):
1646
transport = self.get_transport()
1647
# the adjust for latency flag expands the data region returned
1648
# according to a per-transport heuristic, so testing is a little
1649
# tricky as we need more data than the largest combining that our
1650
# transports do. To accomodate this we generate random data and cross
1651
# reference the returned data with the random data. To avoid doing
1652
# multiple large random byte look ups we do several tests on the same
1654
content = osutils.rand_bytes(200*1024)
1655
content_size = len(content)
1656
if transport.is_readonly():
1657
self.build_tree_contents([('a', content)])
1659
transport.put_bytes('a', content)
1660
def check_result_data(result_vector):
1661
for item in result_vector:
1662
data_len = len(item[1])
1663
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1666
result = list(transport.readv('a', ((0, 30),),
1667
adjust_for_latency=True, upper_limit=content_size))
1668
# we expect 1 result, from 0, to something > 30
1669
self.assertEqual(1, len(result))
1670
self.assertEqual(0, result[0][0])
1671
self.assertTrue(len(result[0][1]) >= 30)
1672
check_result_data(result)
1673
# end of file corner case
1674
result = list(transport.readv('a', ((204700, 100),),
1675
adjust_for_latency=True, upper_limit=content_size))
1676
# we expect 1 result, from 204800- its length, to the end
1677
self.assertEqual(1, len(result))
1678
data_len = len(result[0][1])
1679
self.assertEqual(204800-data_len, result[0][0])
1680
self.assertTrue(data_len >= 100)
1681
check_result_data(result)
1682
# out of order ranges are made in order
1683
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1684
adjust_for_latency=True, upper_limit=content_size))
1685
# we expect 2 results, in order, start and end.
1686
self.assertEqual(2, len(result))
1688
data_len = len(result[0][1])
1689
self.assertEqual(0, result[0][0])
1690
self.assertTrue(data_len >= 30)
1692
data_len = len(result[1][1])
1693
self.assertEqual(204800-data_len, result[1][0])
1694
self.assertTrue(data_len >= 100)
1695
check_result_data(result)
1696
# close ranges get combined (even if out of order)
1697
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1698
result = list(transport.readv('a', request_vector,
1699
adjust_for_latency=True, upper_limit=content_size))
1700
self.assertEqual(1, len(result))
1701
data_len = len(result[0][1])
1702
# minimum length is from 400 to 1034 - 634
1703
self.assertTrue(data_len >= 634)
1704
# must contain the region 400 to 1034
1705
self.assertTrue(result[0][0] <= 400)
1706
self.assertTrue(result[0][0] + data_len >= 1034)
1707
check_result_data(result)
1709
def test_readv_with_adjust_for_latency_with_big_file(self):
1710
transport = self.get_transport()
1711
# test from observed failure case.
1712
if transport.is_readonly():
1713
file('a', 'w').write('a'*1024*1024)
1715
transport.put_bytes('a', 'a'*1024*1024)
1716
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
(465373, 800), (947422, 800)]
1719
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1720
found_items = [False]*9
1721
for pos, (start, length) in enumerate(broken_vector):
1722
# check the range is covered by the result
1723
for offset, data in results:
1724
if offset <= start and start + length <= offset + len(data):
1725
found_items[pos] = True
1726
self.assertEqual([True]*9, found_items)
1728
def test_get_with_open_write_stream_sees_all_content(self):
1729
t = self.get_transport()
1732
handle = t.open_write_stream('foo')
1735
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1739
def test_get_smart_medium(self):
1740
"""All transports must either give a smart medium, or know they can't.
1742
transport = self.get_transport()
1744
client_medium = transport.get_smart_medium()
1745
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1746
except errors.NoSmartMedium:
1747
# as long as we got it we're fine
1750
def test_readv_short_read(self):
1751
transport = self.get_transport()
1752
if transport.is_readonly():
1753
file('a', 'w').write('0123456789')
1755
transport.put_bytes('a', '01234567890')
1757
# This is intentionally reading off the end of the file
1758
# since we are sure that it cannot get there
1759
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1760
# Can be raised by paramiko
1762
transport.readv, 'a', [(1,1), (8,10)])
1764
# This is trying to seek past the end of the file, it should
1765
# also raise a special error
1766
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
transport.readv, 'a', [(12,2)])