1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
35
transport as _mod_transport,
38
from bzrlib.errors import (ConnectionError,
45
from bzrlib.osutils import getcwd
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
from bzrlib.tests.test_transport import TestTransportImplementation
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
    """Multiply the standard tests across the transport scenarios.

    :param standard_tests: The tests to be multiplied.
    :param module: Not used by this function.
    :param loader: Test loader; its suiteClass() creates the suite that is
        handed to multiply_tests() as the result container.
    :return: Whatever multiply_tests() returns for the standard tests, the
        scenarios from transport_test_permutations() and the new suite.
    """
    # Create the suite before computing the scenarios, matching the order in
    # which these two calls have always been made.
    suite = loader.suiteClass()
    return multiply_tests(standard_tests, transport_test_permutations(), suite)
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
105
def check_transport_contents(self, content, transport, relpath):
    """Assert that transport.get_bytes(relpath) equals content.

    :param content: The expected file contents.
    :param transport: Object whose get_bytes() is used to read the file.
    :param relpath: Path passed to transport.get_bytes().
    """
    actual = transport.get_bytes(relpath)
    # assertEqualDiff receives the expected value first, then the actual one.
    self.assertEqualDiff(content, actual)
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
146
except errors.InProcessTransport:
150
t = self.get_transport()
152
files = ['a', 'b', 'e', 'g', '%']
153
self.build_tree(files, transport=t)
154
self.assertEqual(True, t.has('a'))
155
self.assertEqual(False, t.has('c'))
156
self.assertEqual(True, t.has(urlutils.escape('%')))
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
171
def test_has_root_works(self):
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
raise TestNotApplicable(
174
"SmartTCPServer_for_testing intentionally does not allow "
176
current_transport = self.get_transport()
177
self.assertTrue(current_transport.has('/'))
178
root = current_transport.clone('/')
179
self.assertTrue(root.has(''))
182
t = self.get_transport()
184
files = ['a', 'b', 'e', 'g']
185
contents = ['contents of a\n',
190
self.build_tree(files, transport=t, line_endings='binary')
191
self.check_transport_contents('contents of a\n', t, 'a')
192
content_f = t.get_multi(files)
193
# Use itertools.izip() instead of zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
197
self.assertEqual(content, f.read())
199
content_f = t.get_multi(iter(files))
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
211
self.assertRaises(NoSuchFile, t.get, 'c')
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
244
def test_get_bytes(self):
245
t = self.get_transport()
247
files = ['a', 'b', 'e', 'g']
248
contents = ['contents of a\n',
253
self.build_tree(files, transport=t, line_endings='binary')
254
self.check_transport_contents('contents of a\n', t, 'a')
256
for content, fname in zip(contents, files):
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
    """get_bytes() for the path 'c' must raise NoSuchFile.

    This test creates no files on the transport before the lookup.
    """
    transport = self.get_transport()
    self.assertRaises(NoSuchFile, transport.get_bytes, 'c')
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
290
def test_put_bytes(self):
291
t = self.get_transport()
294
self.assertRaises(TransportNotPossible,
295
t.put_bytes, 'a', 'some text for a\n')
298
t.put_bytes('a', 'some text for a\n')
299
self.assertTrue(t.has('a'))
300
self.check_transport_contents('some text for a\n', t, 'a')
302
# The contents should be overwritten
303
t.put_bytes('a', 'new text for a\n')
304
self.check_transport_contents('new text for a\n', t, 'a')
306
self.assertRaises(NoSuchFile,
307
t.put_bytes, 'path/doesnt/exist/c', 'contents')
309
def test_put_bytes_non_atomic(self):
310
t = self.get_transport()
313
self.assertRaises(TransportNotPossible,
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
317
self.assertFalse(t.has('a'))
318
t.put_bytes_non_atomic('a', 'some text for a\n')
319
self.assertTrue(t.has('a'))
320
self.check_transport_contents('some text for a\n', t, 'a')
321
# Put also replaces contents
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
325
# Make sure we can create another file
326
t.put_bytes_non_atomic('d', 'contents for\nd\n')
327
# And overwrite 'a' with empty contents
328
t.put_bytes_non_atomic('a', '')
329
self.check_transport_contents('contents for\nd\n', t, 'd')
330
self.check_transport_contents('', t, 'a')
332
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
334
# Now test the create_parent flag
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
337
self.assertFalse(t.has('dir/a'))
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
create_parent_dir=True)
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
# But we still get NoSuchFile if we can't make the parent dir
343
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
345
create_parent_dir=True)
347
def test_put_bytes_permissions(self):
348
t = self.get_transport()
352
if not t._can_roundtrip_unix_modebits():
353
# Can't roundtrip, so no need to run this test
355
t.put_bytes('mode644', 'test text\n', mode=0644)
356
self.assertTransportMode(t, 'mode644', 0644)
357
t.put_bytes('mode666', 'test text\n', mode=0666)
358
self.assertTransportMode(t, 'mode666', 0666)
359
t.put_bytes('mode600', 'test text\n', mode=0600)
360
self.assertTransportMode(t, 'mode600', 0600)
361
# Yes, you can put_bytes a file such that it becomes readonly
362
t.put_bytes('mode400', 'test text\n', mode=0400)
363
self.assertTransportMode(t, 'mode400', 0400)
365
# The default permissions should be based on the current umask
366
umask = osutils.get_umask()
367
t.put_bytes('nomode', 'test text\n', mode=None)
368
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
370
def test_put_bytes_non_atomic_permissions(self):
371
t = self.get_transport()
375
if not t._can_roundtrip_unix_modebits():
376
# Can't roundtrip, so no need to run this test
378
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
379
self.assertTransportMode(t, 'mode644', 0644)
380
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
381
self.assertTransportMode(t, 'mode666', 0666)
382
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
383
self.assertTransportMode(t, 'mode600', 0600)
384
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
385
self.assertTransportMode(t, 'mode400', 0400)
387
# The default permissions should be based on the current umask
388
umask = osutils.get_umask()
389
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
390
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
392
# We should also be able to set the mode for a parent directory
394
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
395
dir_mode=0700, create_parent_dir=True)
396
self.assertTransportMode(t, 'dir700', 0700)
397
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
398
dir_mode=0770, create_parent_dir=True)
399
self.assertTransportMode(t, 'dir770', 0770)
400
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
401
dir_mode=0777, create_parent_dir=True)
402
self.assertTransportMode(t, 'dir777', 0777)
404
def test_put_file(self):
405
t = self.get_transport()
408
self.assertRaises(TransportNotPossible,
409
t.put_file, 'a', StringIO('some text for a\n'))
412
result = t.put_file('a', StringIO('some text for a\n'))
413
# put_file returns the length of the data written
414
self.assertEqual(16, result)
415
self.assertTrue(t.has('a'))
416
self.check_transport_contents('some text for a\n', t, 'a')
417
# Put also replaces contents
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
self.assertEqual(19, result)
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
421
self.assertRaises(NoSuchFile,
422
t.put_file, 'path/doesnt/exist/c',
423
StringIO('contents'))
425
def test_put_file_non_atomic(self):
426
t = self.get_transport()
429
self.assertRaises(TransportNotPossible,
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
433
self.assertFalse(t.has('a'))
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
self.assertTrue(t.has('a'))
436
self.check_transport_contents('some text for a\n', t, 'a')
437
# Put also replaces contents
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
441
# Make sure we can create another file
442
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
443
# And overwrite 'a' with empty contents
444
t.put_file_non_atomic('a', StringIO(''))
445
self.check_transport_contents('contents for\nd\n', t, 'd')
446
self.check_transport_contents('', t, 'a')
448
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
StringIO('contents\n'))
450
# Now test the create_parent flag
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
StringIO('contents\n'))
453
self.assertFalse(t.has('dir/a'))
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
create_parent_dir=True)
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
458
# But we still get NoSuchFile if we can't make the parent dir
459
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
StringIO('contents\n'),
461
create_parent_dir=True)
463
def test_put_file_permissions(self):
465
t = self.get_transport()
469
if not t._can_roundtrip_unix_modebits():
470
# Can't roundtrip, so no need to run this test
472
t.put_file('mode644', StringIO('test text\n'), mode=0644)
473
self.assertTransportMode(t, 'mode644', 0644)
474
t.put_file('mode666', StringIO('test text\n'), mode=0666)
475
self.assertTransportMode(t, 'mode666', 0666)
476
t.put_file('mode600', StringIO('test text\n'), mode=0600)
477
self.assertTransportMode(t, 'mode600', 0600)
478
# Yes, you can put a file such that it becomes readonly
479
t.put_file('mode400', StringIO('test text\n'), mode=0400)
480
self.assertTransportMode(t, 'mode400', 0400)
481
# The default permissions should be based on the current umask
482
umask = osutils.get_umask()
483
t.put_file('nomode', StringIO('test text\n'), mode=None)
484
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
486
def test_put_file_non_atomic_permissions(self):
487
t = self.get_transport()
491
if not t._can_roundtrip_unix_modebits():
492
# Can't roundtrip, so no need to run this test
494
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
495
self.assertTransportMode(t, 'mode644', 0644)
496
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
497
self.assertTransportMode(t, 'mode666', 0666)
498
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
499
self.assertTransportMode(t, 'mode600', 0600)
500
# Yes, you can put_file_non_atomic a file such that it becomes readonly
501
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
502
self.assertTransportMode(t, 'mode400', 0400)
504
# The default permissions should be based on the current umask
505
umask = osutils.get_umask()
506
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
507
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
509
# We should also be able to set the mode for a parent directory
512
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
513
dir_mode=0700, create_parent_dir=True)
514
self.assertTransportMode(t, 'dir700', 0700)
515
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
516
dir_mode=0770, create_parent_dir=True)
517
self.assertTransportMode(t, 'dir770', 0770)
518
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
519
dir_mode=0777, create_parent_dir=True)
520
self.assertTransportMode(t, 'dir777', 0777)
522
def test_put_bytes_unicode(self):
523
t = self.get_transport()
526
unicode_string = u'\u1234'
527
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
529
def test_mkdir(self):
530
t = self.get_transport()
533
# cannot mkdir on readonly transports. We're not testing for
534
# cache coherency because cache behaviour is not currently
535
# defined for the transport interface.
536
self.assertRaises(TransportNotPossible, t.mkdir, '.')
537
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
538
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
539
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
543
self.assertEqual(t.has('dir_a'), True)
544
self.assertEqual(t.has('dir_b'), False)
547
self.assertEqual(t.has('dir_b'), True)
549
t.mkdir_multi(['dir_c', 'dir_d'])
551
t.mkdir_multi(iter(['dir_e', 'dir_f']))
552
self.assertEqual(list(t.has_multi(
553
['dir_a', 'dir_b', 'dir_c', 'dir_q',
554
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
555
[True, True, True, False,
556
True, True, True, True])
558
# we were testing that a local mkdir followed by a transport
559
# mkdir failed thusly, but given that we * in one process * do not
560
# concurrently fiddle with disk dirs and then use transport to do
561
# things, the win here seems marginal compared to the constraint on
562
# the interface. RBC 20051227
564
self.assertRaises(FileExists, t.mkdir, 'dir_g')
566
# Test get/put in sub-directories
567
t.put_bytes('dir_a/a', 'contents of dir_a/a')
568
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
569
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
570
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
572
# mkdir of a dir with an absent parent
573
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
575
def test_mkdir_permissions(self):
576
t = self.get_transport()
579
if not t._can_roundtrip_unix_modebits():
580
# no sense testing on this transport
582
# Test mkdir with a mode
583
t.mkdir('dmode755', mode=0755)
584
self.assertTransportMode(t, 'dmode755', 0755)
585
t.mkdir('dmode555', mode=0555)
586
self.assertTransportMode(t, 'dmode555', 0555)
587
t.mkdir('dmode777', mode=0777)
588
self.assertTransportMode(t, 'dmode777', 0777)
589
t.mkdir('dmode700', mode=0700)
590
self.assertTransportMode(t, 'dmode700', 0700)
591
t.mkdir_multi(['mdmode755'], mode=0755)
592
self.assertTransportMode(t, 'mdmode755', 0755)
594
# Default mode should be based on umask
595
umask = osutils.get_umask()
596
t.mkdir('dnomode', mode=None)
597
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
599
def test_opening_a_file_stream_creates_file(self):
600
t = self.get_transport()
603
handle = t.open_write_stream('foo')
605
self.assertEqual('', t.get_bytes('foo'))
609
def test_opening_a_file_stream_can_set_mode(self):
610
t = self.get_transport()
612
self.assertRaises((TransportNotPossible, NotImplementedError),
613
t.open_write_stream, 'foo')
615
if not t._can_roundtrip_unix_modebits():
616
# Can't roundtrip, so no need to run this test
619
def check_mode(name, mode, expected):
620
handle = t.open_write_stream(name, mode=mode)
622
self.assertTransportMode(t, name, expected)
623
check_mode('mode644', 0644, 0644)
624
check_mode('mode666', 0666, 0666)
625
check_mode('mode600', 0600, 0600)
626
# The default permissions should be based on the current umask
627
check_mode('nomode', None, 0666 & ~osutils.get_umask())
629
def test_copy_to(self):
630
# FIXME: test: same server to same server (partly done)
631
# same protocol two servers
632
# and different protocols (done for now except for MemoryTransport.
635
def simple_copy_files(transport_from, transport_to):
636
files = ['a', 'b', 'c', 'd']
637
self.build_tree(files, transport=transport_from)
638
self.assertEqual(4, transport_from.copy_to(files, transport_to))
640
self.check_transport_contents(transport_to.get_bytes(f),
643
t = self.get_transport()
644
temp_transport = MemoryTransport('memory:///')
645
simple_copy_files(t, temp_transport)
646
if not t.is_readonly():
647
t.mkdir('copy_to_simple')
648
t2 = t.clone('copy_to_simple')
649
simple_copy_files(t, t2)
652
# Test that copying into a missing directory raises
655
self.build_tree(['e/', 'e/f'])
658
t.put_bytes('e/f', 'contents of e')
659
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
660
temp_transport.mkdir('e')
661
t.copy_to(['e/f'], temp_transport)
664
temp_transport = MemoryTransport('memory:///')
666
files = ['a', 'b', 'c', 'd']
667
t.copy_to(iter(files), temp_transport)
669
self.check_transport_contents(temp_transport.get_bytes(f),
673
for mode in (0666, 0644, 0600, 0400):
674
temp_transport = MemoryTransport("memory:///")
675
t.copy_to(files, temp_transport, mode=mode)
677
self.assertTransportMode(temp_transport, f, mode)
679
def test_create_prefix(self):
680
t = self.get_transport()
681
sub = t.clone('foo').clone('bar')
684
except TransportNotPossible:
685
self.assertTrue(t.is_readonly())
687
self.assertTrue(t.has('foo/bar'))
689
def test_append_file(self):
690
t = self.get_transport()
693
self.assertRaises(TransportNotPossible,
694
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
696
t.put_bytes('a', 'diff\ncontents for\na\n')
697
t.put_bytes('b', 'contents\nfor b\n')
700
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
702
self.check_transport_contents(
703
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
706
# a file with no parent should fail..
707
self.assertRaises(NoSuchFile,
708
t.append_file, 'missing/path', StringIO('content'))
710
# And we can create new files, too
712
t.append_file('c', StringIO('some text\nfor a missing file\n')))
713
self.check_transport_contents('some text\nfor a missing file\n',
716
def test_append_bytes(self):
717
t = self.get_transport()
720
self.assertRaises(TransportNotPossible,
721
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
724
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
725
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
728
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
730
self.check_transport_contents(
731
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
734
# a file with no parent should fail..
735
self.assertRaises(NoSuchFile,
736
t.append_bytes, 'missing/path', 'content')
738
def test_append_multi(self):
739
t = self.get_transport()
743
t.put_bytes('a', 'diff\ncontents for\na\n'
744
'add\nsome\nmore\ncontents\n')
745
t.put_bytes('b', 'contents\nfor b\n')
747
self.assertEqual((43, 15),
748
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
749
('b', StringIO('some\nmore\nfor\nb\n'))]))
751
self.check_transport_contents(
752
'diff\ncontents for\na\n'
753
'add\nsome\nmore\ncontents\n'
754
'and\nthen\nsome\nmore\n',
756
self.check_transport_contents(
758
'some\nmore\nfor\nb\n',
761
self.assertEqual((62, 31),
762
t.append_multi(iter([('a', StringIO('a little bit more\n')),
763
('b', StringIO('from an iterator\n'))])))
764
self.check_transport_contents(
765
'diff\ncontents for\na\n'
766
'add\nsome\nmore\ncontents\n'
767
'and\nthen\nsome\nmore\n'
768
'a little bit more\n',
770
self.check_transport_contents(
772
'some\nmore\nfor\nb\n'
773
'from an iterator\n',
776
self.assertEqual((80, 0),
777
t.append_multi([('a', StringIO('some text in a\n')),
778
('d', StringIO('missing file r\n'))]))
780
self.check_transport_contents(
781
'diff\ncontents for\na\n'
782
'add\nsome\nmore\ncontents\n'
783
'and\nthen\nsome\nmore\n'
784
'a little bit more\n'
787
self.check_transport_contents('missing file r\n', t, 'd')
789
def test_append_file_mode(self):
790
"""Check that append accepts a mode parameter"""
791
# check append accepts a mode
792
t = self.get_transport()
794
self.assertRaises(TransportNotPossible,
795
t.append_file, 'f', StringIO('f'), mode=None)
797
t.append_file('f', StringIO('f'), mode=None)
799
def test_append_bytes_mode(self):
800
# check append_bytes accepts a mode
801
t = self.get_transport()
803
self.assertRaises(TransportNotPossible,
804
t.append_bytes, 'f', 'f', mode=None)
806
t.append_bytes('f', 'f', mode=None)
808
def test_delete(self):
809
# TODO: Test Transport.delete
810
t = self.get_transport()
812
# Not much to do with a readonly transport
814
self.assertRaises(TransportNotPossible, t.delete, 'missing')
817
t.put_bytes('a', 'a little bit of text\n')
818
self.assertTrue(t.has('a'))
820
self.assertFalse(t.has('a'))
822
self.assertRaises(NoSuchFile, t.delete, 'a')
824
t.put_bytes('a', 'a text\n')
825
t.put_bytes('b', 'b text\n')
826
t.put_bytes('c', 'c text\n')
827
self.assertEqual([True, True, True],
828
list(t.has_multi(['a', 'b', 'c'])))
829
t.delete_multi(['a', 'c'])
830
self.assertEqual([False, True, False],
831
list(t.has_multi(['a', 'b', 'c'])))
832
self.assertFalse(t.has('a'))
833
self.assertTrue(t.has('b'))
834
self.assertFalse(t.has('c'))
836
self.assertRaises(NoSuchFile,
837
t.delete_multi, ['a', 'b', 'c'])
839
self.assertRaises(NoSuchFile,
840
t.delete_multi, iter(['a', 'b', 'c']))
842
t.put_bytes('a', 'another a text\n')
843
t.put_bytes('c', 'another c text\n')
844
t.delete_multi(iter(['a', 'b', 'c']))
846
# We should have deleted everything
847
# SftpServer creates control files in the
848
# working directory, so we can just do a
850
# self.assertEqual([], os.listdir('.'))
852
def test_recommended_page_size(self):
    """recommended_page_size() on the transport under test returns an int."""
    page_size = self.get_transport().recommended_page_size()
    self.assertIsInstance(page_size, int)
857
def test_rmdir(self):
858
t = self.get_transport()
859
# Not much to do with a readonly transport
861
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
866
# ftp may not be able to raise NoSuchFile for lack of
867
# details when failing
868
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
870
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
872
def test_rmdir_not_empty(self):
873
"""Deleting a non-empty directory raises an exception
875
sftp (and possibly others) don't give us a specific "directory not
876
empty" exception -- we can just see that the operation failed.
878
t = self.get_transport()
883
self.assertRaises(PathError, t.rmdir, 'adir')
885
def test_rmdir_empty_but_similar_prefix(self):
886
"""rmdir does not get confused by sibling paths.
888
A naive implementation of MemoryTransport would refuse to rmdir
889
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
890
uses "path.startswith(dir)" on all file paths to determine if directory
893
t = self.get_transport()
897
t.put_bytes('foo-bar', '')
900
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
901
self.assertTrue(t.has('foo-bar'))
903
def test_rename_dir_succeeds(self):
904
t = self.get_transport()
906
self.assertRaises((TransportNotPossible, NotImplementedError),
907
t.rename, 'foo', 'bar')
910
t.mkdir('adir/asubdir')
911
t.rename('adir', 'bdir')
912
self.assertTrue(t.has('bdir/asubdir'))
913
self.assertFalse(t.has('adir'))
915
def test_rename_dir_nonempty(self):
916
"""Attempting to replace a nonempty directory should fail"""
917
t = self.get_transport()
919
self.assertRaises((TransportNotPossible, NotImplementedError),
920
t.rename, 'foo', 'bar')
923
t.mkdir('adir/asubdir')
925
t.mkdir('bdir/bsubdir')
926
# any kind of PathError would be OK, though we normally expect
928
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
929
# nothing was changed so it should still be as before
930
self.assertTrue(t.has('bdir/bsubdir'))
931
self.assertFalse(t.has('adir/bdir'))
932
self.assertFalse(t.has('adir/bsubdir'))
934
def test_rename_across_subdirs(self):
935
t = self.get_transport()
937
raise TestNotApplicable("transport is readonly")
942
ta.put_bytes('f', 'aoeu')
943
ta.rename('f', '../b/f')
944
self.assertTrue(tb.has('f'))
945
self.assertFalse(ta.has('f'))
946
self.assertTrue(t.has('b/f'))
948
def test_delete_tree(self):
949
t = self.get_transport()
951
# Not much to do with a readonly transport
953
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
956
# and does it like listing ?
959
t.delete_tree('adir')
960
except TransportNotPossible:
961
# ok, this transport does not support delete_tree
964
# did it delete that trivial case?
965
self.assertRaises(NoSuchFile, t.stat, 'adir')
967
self.build_tree(['adir/',
975
t.delete_tree('adir')
976
# adir should be gone now.
977
self.assertRaises(NoSuchFile, t.stat, 'adir')
980
t = self.get_transport()
985
# TODO: I would like to use os.listdir() to
986
# make sure there are no extra files, but SftpServer
987
# creates control files in the working directory
988
# perhaps all of this could be done in a subdirectory
990
t.put_bytes('a', 'a first file\n')
991
self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
994
self.assertTrue(t.has('b'))
995
self.assertFalse(t.has('a'))
997
self.check_transport_contents('a first file\n', t, 'b')
998
self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
1001
t.put_bytes('c', 'c this file\n')
1003
self.assertFalse(t.has('c'))
1004
self.check_transport_contents('c this file\n', t, 'b')
1006
# TODO: Try to write a test for atomicity
1007
# TODO: Test moving into a non-existent subdirectory
1008
# TODO: Test Transport.move_multi
1010
def test_copy(self):
1011
t = self.get_transport()
1016
t.put_bytes('a', 'a file\n')
1018
self.check_transport_contents('a file\n', t, 'b')
1020
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1022
# What should the assert be if you try to copy a
1023
# file over a directory?
1024
#self.assertRaises(Something, t.copy, 'a', 'c')
1025
t.put_bytes('d', 'text in d\n')
1027
self.check_transport_contents('text in d\n', t, 'b')
1029
# TODO: test copy_multi
1031
def test_connection_error(self):
1032
"""ConnectionError is raised when connection is impossible.
1034
The error should be raised from the first operation on the transport.
1037
url = self._server.get_bogus_url()
1038
except NotImplementedError:
1039
raise TestSkipped("Transport %s has no bogus URL support." %
1040
self._server.__class__)
1041
t = _mod_transport.get_transport_from_url(url)
1042
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1044
def test_stat(self):
1045
# TODO: Test stat, just try once, and if it throws, stop testing
1046
from stat import S_ISDIR, S_ISREG
1048
t = self.get_transport()
1052
except TransportNotPossible, e:
1053
# This transport cannot stat
1056
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1057
sizes = [14, 0, 16, 0, 18]
1058
self.build_tree(paths, transport=t, line_endings='binary')
1060
for path, size in zip(paths, sizes):
1062
if path.endswith('/'):
1063
self.assertTrue(S_ISDIR(st.st_mode))
1064
# directory sizes are meaningless
1066
self.assertTrue(S_ISREG(st.st_mode))
1067
self.assertEqual(size, st.st_size)
1069
remote_stats = list(t.stat_multi(paths))
1070
remote_iter_stats = list(t.stat_multi(iter(paths)))
1072
self.assertRaises(NoSuchFile, t.stat, 'q')
1073
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1075
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1076
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1077
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1078
subdir = t.clone('subdir')
1079
st = subdir.stat('./file')
1080
st = subdir.stat('.')
1082
def test_hardlink(self):
1083
from stat import ST_NLINK
1085
t = self.get_transport()
1087
source_name = "original_target"
1088
link_name = "target_link"
1090
self.build_tree([source_name], transport=t)
1093
t.hardlink(source_name, link_name)
1095
self.assertTrue(t.has(source_name))
1096
self.assertTrue(t.has(link_name))
1098
st = t.stat(link_name)
1099
self.assertEqual(st[ST_NLINK], 2)
1100
except TransportNotPossible:
1101
raise TestSkipped("Transport %s does not support hardlinks." %
1102
self._server.__class__)
1104
def test_symlink(self):
    """Create a symlink and check that stat() reports the link itself.

    The test is skipped when the transport raises TransportNotPossible.
    """
    from stat import S_ISLNK

    t = self.get_transport()

    source_name = "original_target"
    link_name = "target_link"

    self.build_tree([source_name], transport=t)

    try:
        t.symlink(source_name, link_name)

        self.assertTrue(t.has(source_name))
        self.assertTrue(t.has(link_name))

        st = t.stat(link_name)
        self.assertTrue(S_ISLNK(st.st_mode),
                        "expected symlink, got mode %o" % st.st_mode)
    except TransportNotPossible:
        raise TestSkipped("Transport %s does not support symlinks." %
                          self._server.__class__)
    # NOTE(review): the exception type guarding this known failure is
    # assumed to be IOError -- confirm against the upstream file.
    except IOError:
        self.knownFailure("Paramiko fails to create symlinks during tests")
1129
def test_list_dir(self):
    """Check list_dir() output, URL escaping, and errors on bad paths.

    For a transport that is not listable, only checks that list_dir raises
    TransportNotPossible.
    """
    # TODO: Test list_dir, just try once, and if it throws, stop testing
    import os

    t = self.get_transport()

    if not t.listable():
        self.assertRaises(TransportNotPossible, t.list_dir, '.')
        return

    def sorted_list(d, transport):
        """Return the entries of directory d on transport, sorted."""
        return sorted(transport.list_dir(d))

    self.assertEqual([], sorted_list('.', t))
    # c2 is precisely one letter longer than c here to test that
    # suffixing is not confused.
    # a%25b checks that quoting is done consistently across transports
    tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']

    if not t.is_readonly():
        self.build_tree(tree_names, transport=t)
    else:
        self.build_tree(tree_names)

    self.assertEqual(
        ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
    self.assertEqual(
        ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
    self.assertEqual(['d', 'e'], sorted_list('c', t))

    # Cloning the transport produces an equivalent listing
    self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))

    # NOTE(review): the removal calls below are reconstructed from the
    # listings asserted afterwards ('b' and 'c/d' gone) -- confirm.
    if not t.is_readonly():
        t.delete('c/d')
        t.delete('b')
    else:
        os.unlink('c/d')
        os.unlink('b')

    self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
    self.assertEqual(['e'], sorted_list('c', t))

    self.assertListRaises(PathError, t.list_dir, 'q')
    self.assertListRaises(PathError, t.list_dir, 'c/f')
    # 'a' is a file, list_dir should raise an error
    self.assertListRaises(PathError, t.list_dir, 'a')
1177
def test_list_dir_result_is_url_escaped(self):
    """list_dir() must return URL-escaped names as plain str objects."""
    t = self.get_transport()
    if not t.listable():
        raise TestSkipped("transport not listable")

    if not t.is_readonly():
        self.build_tree(['a/', 'a/%'], transport=t)
    else:
        self.build_tree(['a/', 'a/%'])

    names = list(t.list_dir('a'))
    # A literal '%' in a file name is listed as its escaped form.
    self.assertEqual(['%25'], names)
    self.assertIsInstance(names[0], str)
1191
def test_clone_preserve_info(self):
    """Cloning a connected transport keeps every connection-related URL part."""
    original = self.get_transport()
    if not isinstance(original, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    cloned = original.clone('subdir')
    # Only the path may differ between the two parsed URLs.
    for part in ('scheme', 'user', 'password', 'host', 'port'):
        self.assertEqual(getattr(original._parsed_url, part),
                         getattr(cloned._parsed_url, part))
1203
def test__reuse_for(self):
    """Check which URL changes allow _reuse_for() to reuse the transport.

    A different scheme, user, host or port must not yield the same
    transport object, a different password must, and a local path must
    yield None.
    """
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    def new_url(scheme=None, user=None, password=None,
                host=None, port=None, path=None):
        """Build a new url from t.base changing only parts of it.

        Only the parameters different from None will be changed.
        """
        if scheme is None:
            scheme = t._parsed_url.scheme
        if user is None:
            user = t._parsed_url.user
        if password is None:
            password = t._parsed_url.password
        if host is None:
            host = t._parsed_url.host
        if port is None:
            port = t._parsed_url.port
        if path is None:
            path = t._parsed_url.path
        return str(urlutils.URL(scheme, user, password, host, port, path))

    # NOTE(review): the alternate values chosen below ('sftp', 'you',
    # 1235) are reconstructed; any value differing from the transport's
    # own satisfies the assertions -- confirm against the upstream file.
    if t._parsed_url.scheme == 'ftp':
        scheme = 'sftp'
    else:
        scheme = 'ftp'
    self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
    if t._parsed_url.user == 'me':
        user = 'you'
    else:
        user = 'me'
    self.assertIsNot(t, t._reuse_for(new_url(user=user)))
    # passwords are not taken into account because:
    # - it makes no sense to have two different valid passwords for the
    #   same user
    # - _password in ConnectedTransport is intended to collect what the
    #   user specified from the command-line and there are cases where the
    #   new url can contain no password (if the url was built from an
    #   existing transport.base for example)
    # - passwords are considered part of the credentials provided at
    #   connection creation time and as such may not be present in the url
    #   (they may be typed by the user when prompted for example)
    self.assertIs(t, t._reuse_for(new_url(password='from space')))
    # We will not connect, so we can use an invalid host
    self.assertIsNot(
        t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
    if t._parsed_url.port == 1234:
        port = 1235
    else:
        port = 1234
    self.assertIsNot(t, t._reuse_for(new_url(port=port)))
    # No point in trying to reuse a transport for a local URL
    self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1254
def test_connection_sharing(self):
    """A transport and its clone share one connection, even once replaced."""
    transport = self.get_transport()
    if not isinstance(transport, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    clone = transport.clone('subdir')
    # Some transports connect lazily, so issue a request to force it.
    transport.has('surely_not')
    self.assertIs(transport._get_connection(), clone._get_connection())

    # Simulate a temporary failure by installing a new dummy connection.
    replacement = None
    transport._set_connection(replacement)
    # Both the transport and its clone must observe the replacement.
    for user in (transport, clone):
        self.assertIs(replacement, user._get_connection())
1271
def test_reuse_connection_for_various_paths(self):
    """Transports from _reuse_for() share the connection whatever the path."""
    base_transport = self.get_transport()
    if not isinstance(base_transport, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Issue a request so that a connection exists.
    base_transport.has('surely_not')
    self.assertIsNot(None, base_transport._get_connection())

    deep_url = base_transport.base + 'whatever/but/deep/down/the/path'
    deep = base_transport._reuse_for(deep_url)
    self.assertIsNot(base_transport, deep)
    self.assertIs(base_transport._get_connection(), deep._get_connection())

    home_url = base_transport.base + 'home'
    home = deep._reuse_for(home_url)
    self.assertIs(base_transport._get_connection(), home._get_connection())
    self.assertIs(deep._get_connection(), home._get_connection())
1287
def test_clone(self):
    """Clone into a subdirectory and back up; all clones see the same files."""
    # TODO: Test that clone moves up and down the filesystem
    t1 = self.get_transport()

    self.build_tree(['a', 'b/', 'b/c'], transport=t1)

    self.assertTrue(t1.has('a'))
    self.assertTrue(t1.has('b/c'))
    self.assertFalse(t1.has('c'))

    # t2 is rooted at 'b'
    t2 = t1.clone('b')
    self.assertEqual(t1.base + 'b/', t2.base)

    self.assertTrue(t2.has('c'))
    self.assertFalse(t2.has('a'))

    # t3 goes back up from t2 and sees what t1 sees
    t3 = t2.clone('..')
    self.assertTrue(t3.has('a'))
    self.assertFalse(t3.has('c'))

    self.assertFalse(t1.has('b/d'))
    self.assertFalse(t2.has('d'))
    self.assertFalse(t3.has('b/d'))

    if t1.is_readonly():
        self.build_tree_contents([('b/d', 'newfile\n')])
    else:
        t2.put_bytes('d', 'newfile\n')

    # The new file must be visible through every clone.
    self.assertTrue(t1.has('b/d'))
    self.assertTrue(t2.has('d'))
    self.assertTrue(t3.has('b/d'))
1320
def test_clone_to_root(self):
    """Walking up with clone('..') reaches the same root as clone('/')."""
    orig_transport = self.get_transport()
    # Repeatedly go up to a parent directory until we're at the root
    # directory of this transport
    root_transport = orig_transport
    while True:
        parent = root_transport.clone("..")
        # as we are walking up directories, the path must be
        # growing less, except at the top
        self.assertTrue(len(parent.base) < len(root_transport.base)
                        or parent.base == root_transport.base)
        if parent.base == root_transport.base:
            break
        root_transport = parent

    # Cloning to "/" should take us to exactly the same location.
    root_base = root_transport.base
    self.assertEqual(root_base, orig_transport.clone("/").base)
    # the abspath of "/" from the original transport should be the same
    # as the base at the root:
    self.assertEqual(orig_transport.abspath("/"), root_base)

    # At the root, the URL must still end with / as it's a directory
    self.assertEqual(root_base[-1], '/')
1347
def test_clone_from_root(self):
    """At the root, cloning to a simple dir should just do string append."""
    root = self.get_transport().clone('/')
    expected_base = root.base + '.bzr/'
    self.assertEqual(expected_base, root.clone('.bzr').base)
1354
def test_base_url(self):
    """The base URL of a transport ends with a slash."""
    base = self.get_transport().base
    self.assertEqual('/', base[-1])
1358
def test_relpath(self):
    """relpath() of URLs at or below the base, with or without a final slash."""
    transport = self.get_transport()
    base = transport.base
    self.assertEqual('', transport.relpath(base))
    # the base without its trailing slash is still the root
    self.assertEqual('', transport.relpath(base[:-1]))
    # subdirs which don't exist should still give relpaths.
    self.assertEqual('foo', transport.relpath(base + 'foo'))
    # trailing slash should be the same.
    self.assertEqual('foo', transport.relpath(base + 'foo/'))
1368
def test_relpath_at_root(self):
    """relpath() must work on a transport cloned all the way to the root."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # step up one level; without this the loop could never terminate
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a relpath below the root
    self.assertEqual('', t.relpath(t.base))
    # and a deeper one should work too
    self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1380
def test_abspath(self):
    """Smoke test for abspath() with relative and rooted paths."""
    # Corner cases for backends like unix fs's that have aliasing problems
    # like symlinks should go in backend specific test cases.
    t = self.get_transport()

    expected = t.base + 'relpath'
    self.assertEqual(expected, t.abspath('relpath'))

    # This should work without raising an error.
    t.abspath("/")

    # the abspath of "/" and "/foo/.." should result in the same location
    self.assertEqual(t.abspath("/"), t.abspath("/foo/.."))

    root_clone = t.clone("/")
    self.assertEqual(root_clone.abspath('foo'), t.abspath("/foo"))
1398
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1399
def test_win32_abspath(self):
    """On win32, abspath("/") keeps a drive letter present in the base URL."""
    # Note: we tried to set sys.platform='win32' so we could test on
    # other platforms too, but then osutils does platform specific
    # things at import time which defeated us...
    if sys.platform != 'win32':
        raise TestSkipped(
            'Testing drive letters in abspath implemented only for win32')

    # smoke test for abspath on win32.
    # a transport based on 'file:///' never fully qualifies the drive.
    transport = _mod_transport.get_transport_from_url("file:///")
    self.assertEqual(transport.abspath("/"), "file:///")

    # but a transport that starts with a drive spec must keep it.
    transport = _mod_transport.get_transport_from_url("file:///C:/")
    self.assertEqual(transport.abspath("/"), "file:///C:/")
1416
def test_local_abspath(self):
    """local_abspath('.') is the cwd, or raises a formattable error."""
    transport = self.get_transport()
    try:
        p = transport.local_abspath('.')
    except (errors.NotLocalUrl, TransportNotPossible) as e:
        # should be formattable
        str(e)
    else:
        self.assertEqual(getcwd(), p)
1426
def test_abspath_at_root(self):
    """abspath() on a root transport never escapes above the root."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # step up one level; without this the loop could never terminate
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a abspath of the root when we ask for
    # t.abspath('..') - this due to our choice that clone('..')
    # should return the root from the root, combined with the desire that
    # the url from clone('..') and from abspath('..') should be the same.
    self.assertEqual(t.base, t.abspath('..'))
    # '' should give us the root
    self.assertEqual(t.base, t.abspath(''))
    # and a path should append to the url
    self.assertEqual(t.base + 'foo', t.abspath('foo'))
1443
def test_iter_files_recursive(self):
    """iter_files_recursive() yields escaped file paths and no directories.

    For a transport that is not listable, only checks that the call raises
    TransportNotPossible.
    """
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    self.build_tree(['isolated/',
                     'isolated/dir/',
                     'isolated/dir/foo',
                     'isolated/dir/bar',
                     'isolated/dir/b%25z',  # make sure quoting is correct
                     'isolated/bar'],
                    transport=transport)
    paths = set(transport.iter_files_recursive())
    # nb the directories are not converted
    self.assertEqual(paths,
                     set(['isolated/dir/foo',
                          'isolated/dir/bar',
                          'isolated/dir/b%2525z',
                          'isolated/bar']))
    # The same files, seen relative to the 'isolated' directory.
    sub_transport = transport.clone('isolated')
    paths = set(sub_transport.iter_files_recursive())
    self.assertEqual(paths,
                     set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1468
def test_copy_tree(self):
    """copy_tree() duplicates a tree, keeping file names quoted correctly.

    Returns early for transports that are not listable or are read-only.
    """
    # TODO: test file contents and permissions are preserved. This test was
    # added just to ensure that quoting was handled correctly.
    # -- David Allouche 2006-08-11
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    if transport.is_readonly():
        return
    self.build_tree(['from/',
                     'from/dir/',
                     'from/dir/foo',
                     'from/dir/bar',
                     'from/dir/b%25z',  # make sure quoting is correct
                     'from/bar'],
                    transport=transport)
    transport.copy_tree('from', 'to')
    paths = set(transport.iter_files_recursive())
    # NOTE(review): all but the first entry of the expected set are
    # reconstructed from the tree built above -- confirm against upstream.
    self.assertEqual(paths,
                     set(['from/dir/foo',
                          'from/dir/bar',
                          'from/dir/b%2525z',
                          'from/bar',
                          'to/dir/foo',
                          'to/dir/bar',
                          'to/dir/b%2525z',
                          'to/bar']))
1498
def test_copy_tree_to_transport(self):
    """copy_tree_to_transport() copies a tree into another transport.

    Returns early for transports that are not listable or are read-only.
    """
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    if transport.is_readonly():
        return
    self.build_tree(['from/',
                     'from/dir/',
                     'from/dir/foo',
                     'from/dir/bar',
                     'from/dir/b%25z',  # make sure quoting is correct
                     'from/bar'],
                    transport=transport)
    from_transport = transport.clone('from')
    to_transport = transport.clone('to')
    to_transport.ensure_base()
    from_transport.copy_tree_to_transport(to_transport)
    paths = set(transport.iter_files_recursive())
    # NOTE(review): all but the first entry of the expected set are
    # reconstructed from the tree built above -- confirm against upstream.
    self.assertEqual(paths,
                     set(['from/dir/foo',
                          'from/dir/bar',
                          'from/dir/b%2525z',
                          'from/bar',
                          'to/dir/foo',
                          'to/dir/bar',
                          'to/dir/b%2525z',
                          'to/bar']))
1528
def test_unicode_paths(self):
    """Test that we can read/write files with Unicode names."""
    t = self.get_transport()

    # With FAT32 and certain encodings on win32
    # '\xe5' and '\xe4' actually map to the same file
    # adding a suffix kicks in the 'preserving but insensitive'
    # route, and maintains the right files
    files = [u'\xe5.1',  # a w/ circle iso-8859-1
             u'\xe4.2',  # a w/ dots iso-8859-1
             u'\u017d',  # Z with caron iso-8859-2
             u'\u062c',  # Arabic j
             u'\u0410',  # Russian A
             u'\u65e5',  # Kanji "day/sun"
             ]

    no_unicode_support = getattr(self._server, 'no_unicode_support', False)
    if no_unicode_support:
        self.knownFailure("test server cannot handle unicode paths")

    try:
        self.build_tree(files, transport=t, line_endings='binary')
    except UnicodeError:
        raise TestSkipped("cannot handle unicode paths in current encoding")

    # A plain unicode string is not a valid url
    for fname in files:
        self.assertRaises(InvalidURL, t.get, fname)

    # The escaped form of each name must give back the expected content.
    for fname in files:
        fname_utf8 = fname.encode('utf-8')
        contents = 'contents of %s\n' % (fname_utf8,)
        self.check_transport_contents(contents, t, urlutils.escape(fname))
1562
def test_connect_twice_is_same_content(self):
    """Separate transports obtained from get_transport() share content.

    Returns early for read-only transports.
    """
    # check that our server (whatever it is) is accessible reliably
    # via get_transport and multiple connections share content.
    transport = self.get_transport()
    if transport.is_readonly():
        return
    transport.put_bytes('foo', 'bar')
    transport3 = self.get_transport()
    self.check_transport_contents('bar', transport3, 'foo')

    # now opening at a relative url should give us a sane result:
    transport.mkdir('newdir')
    transport5 = self.get_transport('newdir')
    transport6 = transport5.clone('..')
    self.check_transport_contents('bar', transport6, 'foo')
1578
def test_lock_write(self):
    """Test transport-level write locks.

    These are deprecated and transports may decline to support them.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
        return
    transport.put_bytes('lock', '')
    try:
        lock = transport.lock_write('lock')
    except TransportNotPossible:
        # This transport declines to provide write locks.
        return
    # TODO make this consistent on all platforms:
    # self.assertRaises(LockError, transport.lock_write, 'lock')
    lock.unlock()
1596
def test_lock_read(self):
    """Test transport-level read locks.

    These are deprecated and transports may decline to support them.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        # Create the file locally since the transport cannot write it.
        open('lock', 'w').close()
    else:
        transport.put_bytes('lock', '')
    try:
        lock = transport.lock_read('lock')
    except TransportNotPossible:
        # This transport declines to provide read locks.
        return
    # TODO make this consistent on all platforms:
    # self.assertRaises(LockError, transport.lock_read, 'lock')
    lock.unlock()
1614
def test_readv(self):
    """readv() returns (offset, data) pairs for in-order ranges."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Create the file locally since the transport cannot write it.
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', '0123456789')

    d = list(transport.readv('a', ((0, 1),)))
    self.assertEqual(d[0], (0, '0'))

    d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
    self.assertEqual(d[0], (0, '0'))
    self.assertEqual(d[1], (1, '1'))
    self.assertEqual(d[2], (3, '34'))
    self.assertEqual(d[3], (9, '9'))
1630
def test_readv_out_of_order(self):
    """readv() returns results in the order the ranges were requested."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Create the file locally since the transport cannot write it.
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', '01234567890')

    d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
    self.assertEqual(d[0], (1, '1'))
    self.assertEqual(d[1], (9, '9'))
    self.assertEqual(d[2], (0, '0'))
    self.assertEqual(d[3], (3, '34'))
1643
def test_readv_with_adjust_for_latency(self):
    """readv(adjust_for_latency=True) may expand, sort and merge ranges.

    Every returned chunk is cross-checked against the random content that
    was written.
    """
    transport = self.get_transport()
    # the adjust for latency flag expands the data region returned
    # according to a per-transport heuristic, so testing is a little
    # tricky as we need more data than the largest combining that our
    # transports do. To accommodate this we generate random data and cross
    # reference the returned data with the random data. To avoid doing
    # multiple large random byte look ups we do several tests on the same
    # content.
    content = osutils.rand_bytes(200*1024)
    content_size = len(content)
    if transport.is_readonly():
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)

    def check_result_data(result_vector):
        """Assert each (offset, data) item matches content at that offset."""
        for item in result_vector:
            data_len = len(item[1])
            self.assertEqual(content[item[0]:item[0] + data_len], item[1])

    # start of file corner case
    result = list(transport.readv('a', ((0, 30),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from 0, to something > 30
    self.assertEqual(1, len(result))
    self.assertEqual(0, result[0][0])
    self.assertTrue(len(result[0][1]) >= 30)
    check_result_data(result)
    # end of file corner case
    result = list(transport.readv('a', ((204700, 100),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from 204800 - its length, to the end
    self.assertEqual(1, len(result))
    data_len = len(result[0][1])
    self.assertEqual(204800-data_len, result[0][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # out of order ranges are made in order
    result = list(transport.readv('a', ((204700, 100), (0, 50)),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 2 results, in order, start and end.
    self.assertEqual(2, len(result))
    # start
    data_len = len(result[0][1])
    self.assertEqual(0, result[0][0])
    self.assertTrue(data_len >= 30)
    # end
    data_len = len(result[1][1])
    self.assertEqual(204800-data_len, result[1][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # close ranges get combined (even if out of order)
    for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
        result = list(transport.readv('a', request_vector,
            adjust_for_latency=True, upper_limit=content_size))
        self.assertEqual(1, len(result))
        data_len = len(result[0][1])
        # minimum length is from 400 to 1034 - 634
        self.assertTrue(data_len >= 634)
        # must contain the region 400 to 1034
        self.assertTrue(result[0][0] <= 400)
        self.assertTrue(result[0][0] + data_len >= 1034)
        check_result_data(result)
1707
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range of a 1MiB file is covered by some result."""
    transport = self.get_transport()
    # test from observed failure case.
    if transport.is_readonly():
        # Create the file locally since the transport cannot write it.
        with open('a', 'w') as f:
            f.write('a'*1024*1024)
    else:
        transport.put_bytes('a', 'a'*1024*1024)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
        (225037, 800), (221357, 800), (437077, 800), (947670, 800),
        (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector, True, 1024*1024))
    found_items = [False] * len(broken_vector)
    for pos, (start, length) in enumerate(broken_vector):
        # check the range is covered by the result
        for offset, data in results:
            if offset <= start and start + length <= offset + len(data):
                found_items[pos] = True
    self.assertEqual([True] * len(broken_vector), found_items)
1726
def test_get_with_open_write_stream_sees_all_content(self):
    """Data written to an open write stream is visible to readv()."""
    t = self.get_transport()
    if t.is_readonly():
        return
    handle = t.open_write_stream('foo')
    try:
        # NOTE(review): the written payload is reconstructed from the
        # assertion below (offset 0 is 'b', offset 2 is 'd') -- confirm.
        handle.write('bcd')
        self.assertEqual([(0, 'b'), (2, 'd')],
                         list(t.readv('foo', ((0, 1), (2, 1)))))
    finally:
        handle.close()
1737
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
        self.assertIsInstance(client_medium, medium.SmartClientMedium)
    except errors.NoSmartMedium:
        # as long as we got it we're fine
        pass
1748
def test_readv_short_read(self):
    """readv() past the end of the file raises a dedicated error."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Create the file locally since the transport cannot write it.
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', '01234567890')

    # This is intentionally reading off the end of the file
    # since we are sure that it cannot get there
    # NOTE(review): the third accepted exception type is reconstructed as
    # AssertionError -- confirm against the upstream file.
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
                           # Can be raised by paramiko
                           AssertionError),
                          transport.readv, 'a', [(1,1), (8,10)])

    # This is trying to seek past the end of the file, it should
    # also raise a special error
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
                          transport.readv, 'a', [(12,2)])
1767
def test_no_segment_parameters(self):
    """A transport opened without segment parameters reports an empty dict."""
    found = self.get_transport("foo").get_segment_parameters()
    self.assertEqual({}, found)
1773
def test_segment_parameters(self):
    """Segment parameters joined onto a URL are reported by
    transport.get_segment_parameters()."""
    parameters = {"key1": "val1", "key2": "val2"}
    url = urlutils.join_segment_parameters(self._server.get_url(),
                                           parameters)
    transport = _mod_transport.get_transport_from_url(url)
    self.assertEqual(parameters, transport.get_segment_parameters())
1782
def test_set_segment_parameters(self):
    """Segment parameters can be set and show up in base."""
    t = self.get_transport("foo")
    base_before = t.base

    t.set_segment_parameter("arm", "board")
    expected_base = "%s,arm=board" % base_before
    self.assertEqual(expected_base, t.base)
    self.assertEqual({"arm": "board"}, t.get_segment_parameters())

    # Setting a parameter to None removes it; an unknown name is accepted.
    for name in ("arm", "nonexistant"):
        t.set_segment_parameter(name, None)
    self.assertEqual({}, t.get_segment_parameters())
    self.assertEqual(base_before, t.base)
1794
def test_stat_symlink(self):
    """A transport rooted at a symlink reports it as a link via stat()."""
    # if a transport points directly to a symlink (and supports symlinks
    # at all) you can tell this. helps with bug 32669.
    t = self.get_transport()
    try:
        t.symlink('target', 'link')
    except TransportNotPossible:
        raise TestSkipped("symlinks not supported")
    t2 = t.clone('link')
    # NOTE(review): the stat call is reconstructed; the path argument ''
    # (the clone's own base) is assumed -- confirm against upstream.
    st = t2.stat('')
    self.assertTrue(stat.S_ISLNK(st.st_mode))
1806
def test_abspath_url_unquote_unreserved(self):
    """URLs from abspath should have unreserved characters unquoted

    Need consistent quoting notably for tildes, see lp:842223 for more.
    """
    t = self.get_transport()
    # Percent-escapes of "-.09AZ_az~", none of which need escaping.
    needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
    self.assertEqual(t.base + "-.09AZ_az~",
                     t.abspath(needlessly_escaped_dir))
1816
def test_clone_url_unquote_unreserved(self):
    """Base URL of a cloned branch needs unreserved characters unquoted

    Cloned transports should be prefix comparable for things like the
    isolation checking of tests, see lp:842223 for more.
    """
    t1 = self.get_transport()
    # Percent-escapes of "-.09AZ_az~", none of which need escaping.
    needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
    self.build_tree([needlessly_escaped_dir], transport=t1)
    t2 = t1.clone(needlessly_escaped_dir)
    self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1828
def test_hook_post_connection_one(self):
    """Fire post_connect hook after a ConnectedTransport is first used"""
    log = []
    Transport.hooks.install_named_hook("post_connect", log.append, None)
    t = self.get_transport()
    # Nothing is logged until the transport is used.
    self.assertEqual([], log)
    t.has("non-existant")
    if isinstance(t, RemoteTransport):
        self.assertEqual([t.get_smart_medium()], log)
    elif isinstance(t, ConnectedTransport):
        self.assertEqual([t], log)
    else:
        self.assertEqual([], log)
1842
def test_hook_post_connection_multi(self):
    """Fire post_connect hook once per unshared underlying connection"""
    log = []
    Transport.hooks.install_named_hook("post_connect", log.append, None)
    t1 = self.get_transport()
    # NOTE(review): t2 and the three has() calls are reconstructed; the
    # assertions below expect entries for t1 and t3 only, so t2 is assumed
    # to be a clone sharing t1's connection -- confirm against upstream.
    t2 = t1.clone(".")
    t3 = self.get_transport()
    self.assertEqual([], log)
    t1.has("x")
    t2.has("x")
    t3.has("x")
    if isinstance(t1, RemoteTransport):
        self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
    elif isinstance(t1, ConnectedTransport):
        self.assertEqual([t1, t3], log)
    else:
        self.assertEqual([], log)