1
# Copyright (C) 2005-2011, 2015 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
35
transport as _mod_transport,
38
from bzrlib.errors import (ConnectionError,
45
from bzrlib.osutils import getcwd
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
from bzrlib.tests.test_transport import TestTransportImplementation
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
    """Multiply the standard tests across the transport implementations.

    An empty suite is created from ``loader.suiteClass()``, the scenarios
    are obtained from ``transport_test_permutations()``, and both are
    handed to ``multiply_tests`` together with ``standard_tests``.

    :param standard_tests: the tests to be multiplied.
    :param module: accepted as part of the signature; not used here.
    :param loader: test loader supplying ``suiteClass``.
    :return: whatever ``multiply_tests`` returns for these arguments.
    """
    suite = loader.suiteClass()
    permutations = transport_test_permutations()
    return multiply_tests(standard_tests, permutations, suite)
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
105
def check_transport_contents(self, content, transport, relpath):
    """Assert that the bytes stored at relpath on transport equal content.

    :param content: the expected bytes.
    :param transport: object providing ``get_bytes(relpath)``.
    :param relpath: path, relative to the transport, to read.
    """
    # Read first, then compare with assertEqualDiff (expected value first).
    actual = transport.get_bytes(relpath)
    self.assertEqualDiff(content, actual)
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
146
except errors.InProcessTransport:
150
t = self.get_transport()
152
files = ['a', 'b', 'e', 'g', '%']
153
self.build_tree(files, transport=t)
154
self.assertEqual(True, t.has('a'))
155
self.assertEqual(False, t.has('c'))
156
self.assertEqual(True, t.has(urlutils.escape('%')))
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
171
def test_has_root_works(self):
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
raise TestNotApplicable(
174
"SmartTCPServer_for_testing intentionally does not allow "
176
current_transport = self.get_transport()
177
self.assertTrue(current_transport.has('/'))
178
root = current_transport.clone('/')
179
self.assertTrue(root.has(''))
182
t = self.get_transport()
184
files = ['a', 'b', 'e', 'g']
185
contents = ['contents of a\n',
190
self.build_tree(files, transport=t, line_endings='binary')
191
self.check_transport_contents('contents of a\n', t, 'a')
192
content_f = t.get_multi(files)
193
# Use itertools.izip() instead of zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
197
self.assertEqual(content, f.read())
199
content_f = t.get_multi(iter(files))
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
211
self.assertRaises(NoSuchFile, t.get, 'c')
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
244
def test_get_bytes(self):
245
t = self.get_transport()
247
files = ['a', 'b', 'e', 'g']
248
contents = ['contents of a\n',
253
self.build_tree(files, transport=t, line_endings='binary')
254
self.check_transport_contents('contents of a\n', t, 'a')
256
for content, fname in zip(contents, files):
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
    # Asking get_bytes() for 'c', which this test never creates, is
    # expected to raise NoSuchFile.
    transport = self.get_transport()
    fetch = transport.get_bytes
    self.assertRaises(NoSuchFile, fetch, 'c')
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
290
def test_put_bytes(self):
291
t = self.get_transport()
294
self.assertRaises(TransportNotPossible,
295
t.put_bytes, 'a', 'some text for a\n')
298
t.put_bytes('a', 'some text for a\n')
299
self.assertTrue(t.has('a'))
300
self.check_transport_contents('some text for a\n', t, 'a')
302
# The contents should be overwritten
303
t.put_bytes('a', 'new text for a\n')
304
self.check_transport_contents('new text for a\n', t, 'a')
306
self.assertRaises(NoSuchFile,
307
t.put_bytes, 'path/doesnt/exist/c', 'contents')
309
def test_put_bytes_non_atomic(self):
310
t = self.get_transport()
313
self.assertRaises(TransportNotPossible,
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
317
self.assertFalse(t.has('a'))
318
t.put_bytes_non_atomic('a', 'some text for a\n')
319
self.assertTrue(t.has('a'))
320
self.check_transport_contents('some text for a\n', t, 'a')
321
# Put also replaces contents
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
325
# Make sure we can create another file
326
t.put_bytes_non_atomic('d', 'contents for\nd\n')
327
# And overwrite 'a' with empty contents
328
t.put_bytes_non_atomic('a', '')
329
self.check_transport_contents('contents for\nd\n', t, 'd')
330
self.check_transport_contents('', t, 'a')
332
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
334
# Now test the create_parent flag
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
337
self.assertFalse(t.has('dir/a'))
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
create_parent_dir=True)
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
# But we still get NoSuchFile if we can't make the parent dir
343
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
345
create_parent_dir=True)
347
def test_put_bytes_permissions(self):
348
t = self.get_transport()
352
if not t._can_roundtrip_unix_modebits():
353
# Can't roundtrip, so no need to run this test
355
t.put_bytes('mode644', 'test text\n', mode=0644)
356
self.assertTransportMode(t, 'mode644', 0644)
357
t.put_bytes('mode666', 'test text\n', mode=0666)
358
self.assertTransportMode(t, 'mode666', 0666)
359
t.put_bytes('mode600', 'test text\n', mode=0600)
360
self.assertTransportMode(t, 'mode600', 0600)
361
# Yes, you can put_bytes a file such that it becomes readonly
362
t.put_bytes('mode400', 'test text\n', mode=0400)
363
self.assertTransportMode(t, 'mode400', 0400)
365
# The default permissions should be based on the current umask
366
umask = osutils.get_umask()
367
t.put_bytes('nomode', 'test text\n', mode=None)
368
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
370
def test_put_bytes_non_atomic_permissions(self):
371
t = self.get_transport()
375
if not t._can_roundtrip_unix_modebits():
376
# Can't roundtrip, so no need to run this test
378
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
379
self.assertTransportMode(t, 'mode644', 0644)
380
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
381
self.assertTransportMode(t, 'mode666', 0666)
382
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
383
self.assertTransportMode(t, 'mode600', 0600)
384
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
385
self.assertTransportMode(t, 'mode400', 0400)
387
# The default permissions should be based on the current umask
388
umask = osutils.get_umask()
389
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
390
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
392
# We should also be able to set the mode for a parent directory
394
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
395
dir_mode=0700, create_parent_dir=True)
396
self.assertTransportMode(t, 'dir700', 0700)
397
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
398
dir_mode=0770, create_parent_dir=True)
399
self.assertTransportMode(t, 'dir770', 0770)
400
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
401
dir_mode=0777, create_parent_dir=True)
402
self.assertTransportMode(t, 'dir777', 0777)
404
def test_put_file(self):
405
t = self.get_transport()
408
self.assertRaises(TransportNotPossible,
409
t.put_file, 'a', StringIO('some text for a\n'))
412
result = t.put_file('a', StringIO('some text for a\n'))
413
# put_file returns the length of the data written
414
self.assertEqual(16, result)
415
self.assertTrue(t.has('a'))
416
self.check_transport_contents('some text for a\n', t, 'a')
417
# Put also replaces contents
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
self.assertEqual(19, result)
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
421
self.assertRaises(NoSuchFile,
422
t.put_file, 'path/doesnt/exist/c',
423
StringIO('contents'))
425
def test_put_file_non_atomic(self):
426
t = self.get_transport()
429
self.assertRaises(TransportNotPossible,
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
433
self.assertFalse(t.has('a'))
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
self.assertTrue(t.has('a'))
436
self.check_transport_contents('some text for a\n', t, 'a')
437
# Put also replaces contents
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
441
# Make sure we can create another file
442
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
443
# And overwrite 'a' with empty contents
444
t.put_file_non_atomic('a', StringIO(''))
445
self.check_transport_contents('contents for\nd\n', t, 'd')
446
self.check_transport_contents('', t, 'a')
448
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
StringIO('contents\n'))
450
# Now test the create_parent flag
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
StringIO('contents\n'))
453
self.assertFalse(t.has('dir/a'))
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
create_parent_dir=True)
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
458
# But we still get NoSuchFile if we can't make the parent dir
459
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
StringIO('contents\n'),
461
create_parent_dir=True)
463
def test_put_file_permissions(self):
465
t = self.get_transport()
469
if not t._can_roundtrip_unix_modebits():
470
# Can't roundtrip, so no need to run this test
472
t.put_file('mode644', StringIO('test text\n'), mode=0644)
473
self.assertTransportMode(t, 'mode644', 0644)
474
t.put_file('mode666', StringIO('test text\n'), mode=0666)
475
self.assertTransportMode(t, 'mode666', 0666)
476
t.put_file('mode600', StringIO('test text\n'), mode=0600)
477
self.assertTransportMode(t, 'mode600', 0600)
478
# Yes, you can put a file such that it becomes readonly
479
t.put_file('mode400', StringIO('test text\n'), mode=0400)
480
self.assertTransportMode(t, 'mode400', 0400)
481
# The default permissions should be based on the current umask
482
umask = osutils.get_umask()
483
t.put_file('nomode', StringIO('test text\n'), mode=None)
484
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
486
def test_put_file_non_atomic_permissions(self):
487
t = self.get_transport()
491
if not t._can_roundtrip_unix_modebits():
492
# Can't roundtrip, so no need to run this test
494
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
495
self.assertTransportMode(t, 'mode644', 0644)
496
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
497
self.assertTransportMode(t, 'mode666', 0666)
498
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
499
self.assertTransportMode(t, 'mode600', 0600)
500
# Yes, you can put_file_non_atomic a file such that it becomes readonly
501
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
502
self.assertTransportMode(t, 'mode400', 0400)
504
# The default permissions should be based on the current umask
505
umask = osutils.get_umask()
506
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
507
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
509
# We should also be able to set the mode for a parent directory
512
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
513
dir_mode=0700, create_parent_dir=True)
514
self.assertTransportMode(t, 'dir700', 0700)
515
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
516
dir_mode=0770, create_parent_dir=True)
517
self.assertTransportMode(t, 'dir770', 0770)
518
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
519
dir_mode=0777, create_parent_dir=True)
520
self.assertTransportMode(t, 'dir777', 0777)
522
def test_put_bytes_unicode(self):
523
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
524
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
525
# (we don't want to encode unicode here at all, callers should be
526
# strictly passing bytes to put_bytes), but we allow it for backwards
527
# compatibility. At some point we should use a specific exception.
528
# See https://bugs.launchpad.net/bzr/+bug/106898.
529
t = self.get_transport()
532
unicode_string = u'\u1234'
534
(AssertionError, UnicodeEncodeError),
535
t.put_bytes, 'foo', unicode_string)
537
def test_put_file_unicode(self):
538
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
539
# This situation can happen (and has) if code is careless about the type
540
# of "string" they initialise/write to a StringIO with. We cannot use
541
# cStringIO, because it never returns unicode from read.
542
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
543
# raise, but we raise it for hysterical raisins.
544
t = self.get_transport()
547
unicode_file = pyStringIO(u'\u1234')
548
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
550
def test_mkdir(self):
551
t = self.get_transport()
554
# cannot mkdir on readonly transports. We're not testing for
555
# cache coherency because cache behaviour is not currently
556
# defined for the transport interface.
557
self.assertRaises(TransportNotPossible, t.mkdir, '.')
558
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
559
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
560
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
564
self.assertEqual(t.has('dir_a'), True)
565
self.assertEqual(t.has('dir_b'), False)
568
self.assertEqual(t.has('dir_b'), True)
570
t.mkdir_multi(['dir_c', 'dir_d'])
572
t.mkdir_multi(iter(['dir_e', 'dir_f']))
573
self.assertEqual(list(t.has_multi(
574
['dir_a', 'dir_b', 'dir_c', 'dir_q',
575
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
576
[True, True, True, False,
577
True, True, True, True])
579
# we were testing that a local mkdir followed by a transport
580
# mkdir failed thusly, but given that we * in one process * do not
581
# concurrently fiddle with disk dirs and then use transport to do
582
# things, the win here seems marginal compared to the constraint on
583
# the interface. RBC 20051227
585
self.assertRaises(FileExists, t.mkdir, 'dir_g')
587
# Test get/put in sub-directories
588
t.put_bytes('dir_a/a', 'contents of dir_a/a')
589
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
590
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
591
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
593
# mkdir of a dir with an absent parent
594
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
596
def test_mkdir_permissions(self):
597
t = self.get_transport()
600
if not t._can_roundtrip_unix_modebits():
601
# no sense testing on this transport
603
# Test mkdir with a mode
604
t.mkdir('dmode755', mode=0755)
605
self.assertTransportMode(t, 'dmode755', 0755)
606
t.mkdir('dmode555', mode=0555)
607
self.assertTransportMode(t, 'dmode555', 0555)
608
t.mkdir('dmode777', mode=0777)
609
self.assertTransportMode(t, 'dmode777', 0777)
610
t.mkdir('dmode700', mode=0700)
611
self.assertTransportMode(t, 'dmode700', 0700)
612
t.mkdir_multi(['mdmode755'], mode=0755)
613
self.assertTransportMode(t, 'mdmode755', 0755)
615
# Default mode should be based on umask
616
umask = osutils.get_umask()
617
t.mkdir('dnomode', mode=None)
618
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
620
def test_opening_a_file_stream_creates_file(self):
621
t = self.get_transport()
624
handle = t.open_write_stream('foo')
626
self.assertEqual('', t.get_bytes('foo'))
630
def test_opening_a_file_stream_can_set_mode(self):
631
t = self.get_transport()
633
self.assertRaises((TransportNotPossible, NotImplementedError),
634
t.open_write_stream, 'foo')
636
if not t._can_roundtrip_unix_modebits():
637
# Can't roundtrip, so no need to run this test
640
def check_mode(name, mode, expected):
641
handle = t.open_write_stream(name, mode=mode)
643
self.assertTransportMode(t, name, expected)
644
check_mode('mode644', 0644, 0644)
645
check_mode('mode666', 0666, 0666)
646
check_mode('mode600', 0600, 0600)
647
# The default permissions should be based on the current umask
648
check_mode('nomode', None, 0666 & ~osutils.get_umask())
650
def test_copy_to(self):
651
# FIXME: test: same server to same server (partly done)
652
# same protocol two servers
653
# and different protocols (done for now except for MemoryTransport.
656
def simple_copy_files(transport_from, transport_to):
657
files = ['a', 'b', 'c', 'd']
658
self.build_tree(files, transport=transport_from)
659
self.assertEqual(4, transport_from.copy_to(files, transport_to))
661
self.check_transport_contents(transport_to.get_bytes(f),
664
t = self.get_transport()
665
temp_transport = MemoryTransport('memory:///')
666
simple_copy_files(t, temp_transport)
667
if not t.is_readonly():
668
t.mkdir('copy_to_simple')
669
t2 = t.clone('copy_to_simple')
670
simple_copy_files(t, t2)
673
# Test that copying into a missing directory raises
676
self.build_tree(['e/', 'e/f'])
679
t.put_bytes('e/f', 'contents of e')
680
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
681
temp_transport.mkdir('e')
682
t.copy_to(['e/f'], temp_transport)
685
temp_transport = MemoryTransport('memory:///')
687
files = ['a', 'b', 'c', 'd']
688
t.copy_to(iter(files), temp_transport)
690
self.check_transport_contents(temp_transport.get_bytes(f),
694
for mode in (0666, 0644, 0600, 0400):
695
temp_transport = MemoryTransport("memory:///")
696
t.copy_to(files, temp_transport, mode=mode)
698
self.assertTransportMode(temp_transport, f, mode)
700
def test_create_prefix(self):
701
t = self.get_transport()
702
sub = t.clone('foo').clone('bar')
705
except TransportNotPossible:
706
self.assertTrue(t.is_readonly())
708
self.assertTrue(t.has('foo/bar'))
710
def test_append_file(self):
711
t = self.get_transport()
714
self.assertRaises(TransportNotPossible,
715
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
717
t.put_bytes('a', 'diff\ncontents for\na\n')
718
t.put_bytes('b', 'contents\nfor b\n')
721
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
723
self.check_transport_contents(
724
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
727
# a file with no parent should fail..
728
self.assertRaises(NoSuchFile,
729
t.append_file, 'missing/path', StringIO('content'))
731
# And we can create new files, too
733
t.append_file('c', StringIO('some text\nfor a missing file\n')))
734
self.check_transport_contents('some text\nfor a missing file\n',
737
def test_append_bytes(self):
738
t = self.get_transport()
741
self.assertRaises(TransportNotPossible,
742
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
745
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
746
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
749
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
751
self.check_transport_contents(
752
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
755
# a file with no parent should fail..
756
self.assertRaises(NoSuchFile,
757
t.append_bytes, 'missing/path', 'content')
759
def test_append_multi(self):
760
t = self.get_transport()
764
t.put_bytes('a', 'diff\ncontents for\na\n'
765
'add\nsome\nmore\ncontents\n')
766
t.put_bytes('b', 'contents\nfor b\n')
768
self.assertEqual((43, 15),
769
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
770
('b', StringIO('some\nmore\nfor\nb\n'))]))
772
self.check_transport_contents(
773
'diff\ncontents for\na\n'
774
'add\nsome\nmore\ncontents\n'
775
'and\nthen\nsome\nmore\n',
777
self.check_transport_contents(
779
'some\nmore\nfor\nb\n',
782
self.assertEqual((62, 31),
783
t.append_multi(iter([('a', StringIO('a little bit more\n')),
784
('b', StringIO('from an iterator\n'))])))
785
self.check_transport_contents(
786
'diff\ncontents for\na\n'
787
'add\nsome\nmore\ncontents\n'
788
'and\nthen\nsome\nmore\n'
789
'a little bit more\n',
791
self.check_transport_contents(
793
'some\nmore\nfor\nb\n'
794
'from an iterator\n',
797
self.assertEqual((80, 0),
798
t.append_multi([('a', StringIO('some text in a\n')),
799
('d', StringIO('missing file r\n'))]))
801
self.check_transport_contents(
802
'diff\ncontents for\na\n'
803
'add\nsome\nmore\ncontents\n'
804
'and\nthen\nsome\nmore\n'
805
'a little bit more\n'
808
self.check_transport_contents('missing file r\n', t, 'd')
810
def test_append_file_mode(self):
811
"""Check that append accepts a mode parameter"""
812
# check append accepts a mode
813
t = self.get_transport()
815
self.assertRaises(TransportNotPossible,
816
t.append_file, 'f', StringIO('f'), mode=None)
818
t.append_file('f', StringIO('f'), mode=None)
820
def test_append_bytes_mode(self):
821
# check append_bytes accepts a mode
822
t = self.get_transport()
824
self.assertRaises(TransportNotPossible,
825
t.append_bytes, 'f', 'f', mode=None)
827
t.append_bytes('f', 'f', mode=None)
829
def test_delete(self):
830
# TODO: Test Transport.delete
831
t = self.get_transport()
833
# Not much to do with a readonly transport
835
self.assertRaises(TransportNotPossible, t.delete, 'missing')
838
t.put_bytes('a', 'a little bit of text\n')
839
self.assertTrue(t.has('a'))
841
self.assertFalse(t.has('a'))
843
self.assertRaises(NoSuchFile, t.delete, 'a')
845
t.put_bytes('a', 'a text\n')
846
t.put_bytes('b', 'b text\n')
847
t.put_bytes('c', 'c text\n')
848
self.assertEqual([True, True, True],
849
list(t.has_multi(['a', 'b', 'c'])))
850
t.delete_multi(['a', 'c'])
851
self.assertEqual([False, True, False],
852
list(t.has_multi(['a', 'b', 'c'])))
853
self.assertFalse(t.has('a'))
854
self.assertTrue(t.has('b'))
855
self.assertFalse(t.has('c'))
857
self.assertRaises(NoSuchFile,
858
t.delete_multi, ['a', 'b', 'c'])
860
self.assertRaises(NoSuchFile,
861
t.delete_multi, iter(['a', 'b', 'c']))
863
t.put_bytes('a', 'another a text\n')
864
t.put_bytes('c', 'another c text\n')
865
t.delete_multi(iter(['a', 'b', 'c']))
867
# We should have deleted everything
868
# SftpServer creates control files in the
869
# working directory, so we can't just do a
871
# self.assertEqual([], os.listdir('.'))
873
def test_recommended_page_size(self):
874
"""Transports recommend a page size for partial access to files."""
875
t = self.get_transport()
876
self.assertIsInstance(t.recommended_page_size(), int)
878
def test_rmdir(self):
879
t = self.get_transport()
880
# Not much to do with a readonly transport
882
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
887
# ftp may not be able to raise NoSuchFile for lack of
888
# details when failing
889
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
891
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
893
def test_rmdir_not_empty(self):
894
"""Deleting a non-empty directory raises an exception
896
sftp (and possibly others) don't give us a specific "directory not
897
empty" exception -- we can just see that the operation failed.
899
t = self.get_transport()
904
self.assertRaises(PathError, t.rmdir, 'adir')
906
def test_rmdir_empty_but_similar_prefix(self):
907
"""rmdir does not get confused by sibling paths.
909
A naive implementation of MemoryTransport would refuse to rmdir
910
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
911
uses "path.startswith(dir)" on all file paths to determine if directory
914
t = self.get_transport()
918
t.put_bytes('foo-bar', '')
921
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
922
self.assertTrue(t.has('foo-bar'))
924
def test_rename_dir_succeeds(self):
925
t = self.get_transport()
927
self.assertRaises((TransportNotPossible, NotImplementedError),
928
t.rename, 'foo', 'bar')
931
t.mkdir('adir/asubdir')
932
t.rename('adir', 'bdir')
933
self.assertTrue(t.has('bdir/asubdir'))
934
self.assertFalse(t.has('adir'))
936
def test_rename_dir_nonempty(self):
937
"""Attempting to replace a nonempty directory should fail"""
938
t = self.get_transport()
940
self.assertRaises((TransportNotPossible, NotImplementedError),
941
t.rename, 'foo', 'bar')
944
t.mkdir('adir/asubdir')
946
t.mkdir('bdir/bsubdir')
947
# any kind of PathError would be OK, though we normally expect
949
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
950
# nothing was changed so it should still be as before
951
self.assertTrue(t.has('bdir/bsubdir'))
952
self.assertFalse(t.has('adir/bdir'))
953
self.assertFalse(t.has('adir/bsubdir'))
955
def test_rename_across_subdirs(self):
956
t = self.get_transport()
958
raise TestNotApplicable("transport is readonly")
963
ta.put_bytes('f', 'aoeu')
964
ta.rename('f', '../b/f')
965
self.assertTrue(tb.has('f'))
966
self.assertFalse(ta.has('f'))
967
self.assertTrue(t.has('b/f'))
969
def test_delete_tree(self):
970
t = self.get_transport()
972
# Not much to do with a readonly transport
974
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
977
# and does it like listing ?
980
t.delete_tree('adir')
981
except TransportNotPossible:
982
# ok, this transport does not support delete_tree
985
# did it delete that trivial case?
986
self.assertRaises(NoSuchFile, t.stat, 'adir')
988
self.build_tree(['adir/',
996
t.delete_tree('adir')
997
# adir should be gone now.
998
self.assertRaises(NoSuchFile, t.stat, 'adir')
1000
def test_move(self):
1001
t = self.get_transport()
1006
# TODO: I would like to use os.listdir() to
1007
# make sure there are no extra files, but SftpServer
1008
# creates control files in the working directory
1009
# perhaps all of this could be done in a subdirectory
1011
t.put_bytes('a', 'a first file\n')
1012
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
1015
self.assertTrue(t.has('b'))
1016
self.assertFalse(t.has('a'))
1018
self.check_transport_contents('a first file\n', t, 'b')
1019
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1022
t.put_bytes('c', 'c this file\n')
1024
self.assertFalse(t.has('c'))
1025
self.check_transport_contents('c this file\n', t, 'b')
1027
# TODO: Try to write a test for atomicity
1028
# TODO: Test moving into a non-existent subdirectory
1029
# TODO: Test Transport.move_multi
1031
def test_copy(self):
1032
t = self.get_transport()
1037
t.put_bytes('a', 'a file\n')
1039
self.check_transport_contents('a file\n', t, 'b')
1041
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1043
# What should the assert be if you try to copy a
1044
# file over a directory?
1045
#self.assertRaises(Something, t.copy, 'a', 'c')
1046
t.put_bytes('d', 'text in d\n')
1048
self.check_transport_contents('text in d\n', t, 'b')
1050
# TODO: test copy_multi
1052
def test_connection_error(self):
1053
"""ConnectionError is raised when connection is impossible.
1055
The error should be raised from the first operation on the transport.
1058
url = self._server.get_bogus_url()
1059
except NotImplementedError:
1060
raise TestSkipped("Transport %s has no bogus URL support." %
1061
self._server.__class__)
1062
t = _mod_transport.get_transport_from_url(url)
1063
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1065
def test_stat(self):
1066
# TODO: Test stat, just try once, and if it throws, stop testing
1067
from stat import S_ISDIR, S_ISREG
1069
t = self.get_transport()
1073
except TransportNotPossible, e:
1074
# This transport cannot stat
1077
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1078
sizes = [14, 0, 16, 0, 18]
1079
self.build_tree(paths, transport=t, line_endings='binary')
1081
for path, size in zip(paths, sizes):
1083
if path.endswith('/'):
1084
self.assertTrue(S_ISDIR(st.st_mode))
1085
# directory sizes are meaningless
1087
self.assertTrue(S_ISREG(st.st_mode))
1088
self.assertEqual(size, st.st_size)
1090
remote_stats = list(t.stat_multi(paths))
1091
remote_iter_stats = list(t.stat_multi(iter(paths)))
1093
self.assertRaises(NoSuchFile, t.stat, 'q')
1094
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1096
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1097
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1098
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1099
subdir = t.clone('subdir')
1100
st = subdir.stat('./file')
1101
st = subdir.stat('.')
1103
def test_hardlink(self):
1104
from stat import ST_NLINK
1106
t = self.get_transport()
1108
source_name = "original_target"
1109
link_name = "target_link"
1111
self.build_tree([source_name], transport=t)
1114
t.hardlink(source_name, link_name)
1116
self.assertTrue(t.has(source_name))
1117
self.assertTrue(t.has(link_name))
1119
st = t.stat(link_name)
1120
self.assertEqual(st[ST_NLINK], 2)
1121
except TransportNotPossible:
1122
raise TestSkipped("Transport %s does not support hardlinks." %
1123
self._server.__class__)
1125
def test_symlink(self):
1126
from stat import S_ISLNK
1128
t = self.get_transport()
1130
source_name = "original_target"
1131
link_name = "target_link"
1133
self.build_tree([source_name], transport=t)
1136
t.symlink(source_name, link_name)
1138
self.assertTrue(t.has(source_name))
1139
self.assertTrue(t.has(link_name))
1141
st = t.stat(link_name)
1142
self.assertTrue(S_ISLNK(st.st_mode),
1143
"expected symlink, got mode %o" % st.st_mode)
1144
except TransportNotPossible:
1145
raise TestSkipped("Transport %s does not support symlinks." %
1146
self._server.__class__)
1148
self.knownFailure("Paramiko fails to create symlinks during tests")
1150
def test_list_dir(self):
1151
# TODO: Test list_dir, just try once, and if it throws, stop testing
1152
t = self.get_transport()
1154
if not t.listable():
1155
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1158
def sorted_list(d, transport):
1159
l = list(transport.list_dir(d))
1163
self.assertEqual([], sorted_list('.', t))
1164
# c2 is precisely one letter longer than c here to test that
1165
# suffixing is not confused.
1166
# a%25b checks that quoting is done consistently across transports
1167
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1169
if not t.is_readonly():
1170
self.build_tree(tree_names, transport=t)
1172
self.build_tree(tree_names)
1175
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1177
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1178
self.assertEqual(['d', 'e'], sorted_list('c', t))
1180
# Cloning the transport produces an equivalent listing
1181
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1183
if not t.is_readonly():
1190
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1191
self.assertEqual(['e'], sorted_list('c', t))
1193
self.assertListRaises(PathError, t.list_dir, 'q')
1194
self.assertListRaises(PathError, t.list_dir, 'c/f')
1195
# 'a' is a file, list_dir should raise an error
1196
self.assertListRaises(PathError, t.list_dir, 'a')
1198
def test_list_dir_result_is_url_escaped(self):
1199
t = self.get_transport()
1200
if not t.listable():
1201
raise TestSkipped("transport not listable")
1203
if not t.is_readonly():
1204
self.build_tree(['a/', 'a/%'], transport=t)
1206
self.build_tree(['a/', 'a/%'])
1208
names = list(t.list_dir('a'))
1209
self.assertEqual(['%25'], names)
1210
self.assertIsInstance(names[0], str)
1212
def test_clone_preserve_info(self):
    """A clone of a connected transport keeps the parent's URL details.

    Skipped unless the transport under test is a ConnectedTransport.
    The transport is cloned into 'subdir' and the scheme, user, password,
    host and port of both parsed URLs are compared.
    """
    t1 = self.get_transport()
    if not isinstance(t1, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    t2 = t1.clone('subdir')
    original = t1._parsed_url
    cloned = t2._parsed_url
    # One assertion per field so a failure points at the field that changed.
    self.assertEqual(original.scheme, cloned.scheme)
    self.assertEqual(original.user, cloned.user)
    self.assertEqual(original.password, cloned.password)
    self.assertEqual(original.host, cloned.host)
    self.assertEqual(original.port, cloned.port)
1224
def test__reuse_for(self):
1225
t = self.get_transport()
1226
if not isinstance(t, ConnectedTransport):
1227
raise TestSkipped("not a connected transport")
1229
def new_url(scheme=None, user=None, password=None,
1230
host=None, port=None, path=None):
1231
"""Build a new url from t.base changing only parts of it.
1233
Only the parameters different from None will be changed.
1235
if scheme is None: scheme = t._parsed_url.scheme
1236
if user is None: user = t._parsed_url.user
1237
if password is None: password = t._parsed_url.password
1238
if user is None: user = t._parsed_url.user
1239
if host is None: host = t._parsed_url.host
1240
if port is None: port = t._parsed_url.port
1241
if path is None: path = t._parsed_url.path
1242
return str(urlutils.URL(scheme, user, password, host, port, path))
1244
if t._parsed_url.scheme == 'ftp':
1248
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1249
if t._parsed_url.user == 'me':
1253
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1254
# passwords are not taken into account because:
1255
# - it makes no sense to have two different valid passwords for the
1257
# - _password in ConnectedTransport is intended to collect what the
1258
# user specified from the command-line and there are cases where the
1259
# new url can contain no password (if the url was built from an
1260
# existing transport.base for example)
1261
# - password are considered part of the credentials provided at
1262
# connection creation time and as such may not be present in the url
1263
# (they may be typed by the user when prompted for example)
1264
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1265
# We will not connect, we can use an invalid host
1266
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1267
if t._parsed_url.port == 1234:
1271
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1272
# No point in trying to reuse a transport for a local URL
1273
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1275
def test_connection_sharing(self):
    """A transport and its clone share a single connection object.

    Skipped unless the transport under test is a ConnectedTransport.
    After the connection is replaced on the original transport, the clone
    must report the replacement too.
    """
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    c = t.clone('subdir')
    # Some transports will create the connection only when needed
    t.has('surely_not')  # Force connection
    self.assertIs(t._get_connection(), c._get_connection())

    # Temporary failure, we need to create a new dummy connection
    new_connection = None
    t._set_connection(new_connection)
    # Check that both transports use the same connection
    self.assertIs(new_connection, t._get_connection())
    self.assertIs(new_connection, c._get_connection())
1292
def test_reuse_connection_for_various_paths(self):
    """Transports obtained through _reuse_for share one connection.

    Skipped unless the transport under test is a ConnectedTransport.
    """
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    t.has('surely_not')  # Force connection
    self.assertIsNot(None, t._get_connection())

    # A URL below the base gives a different transport object ...
    subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
    self.assertIsNot(t, subdir)
    # ... which must still use the very same connection.
    self.assertIs(t._get_connection(), subdir._get_connection())

    # Reusing from the derived transport must keep sharing it as well.
    home = subdir._reuse_for(t.base + 'home')
    self.assertIs(t._get_connection(), home._get_connection())
    self.assertIs(subdir._get_connection(), home._get_connection())
1308
def test_clone(self):
1309
# TODO: Test that clone moves up and down the filesystem
1310
t1 = self.get_transport()
1312
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1314
self.assertTrue(t1.has('a'))
1315
self.assertTrue(t1.has('b/c'))
1316
self.assertFalse(t1.has('c'))
1319
self.assertEqual(t1.base + 'b/', t2.base)
1321
self.assertTrue(t2.has('c'))
1322
self.assertFalse(t2.has('a'))
1325
self.assertTrue(t3.has('a'))
1326
self.assertFalse(t3.has('c'))
1328
self.assertFalse(t1.has('b/d'))
1329
self.assertFalse(t2.has('d'))
1330
self.assertFalse(t3.has('b/d'))
1332
if t1.is_readonly():
1333
self.build_tree_contents([('b/d', 'newfile\n')])
1335
t2.put_bytes('d', 'newfile\n')
1337
self.assertTrue(t1.has('b/d'))
1338
self.assertTrue(t2.has('d'))
1339
self.assertTrue(t3.has('b/d'))
1341
def test_clone_to_root(self):
1342
orig_transport = self.get_transport()
1343
# Repeatedly go up to a parent directory until we're at the root
1344
# directory of this transport
1345
root_transport = orig_transport
1346
new_transport = root_transport.clone("..")
1347
# as we are walking up directories, the path must be
1348
# growing less, except at the top
1349
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1350
or new_transport.base == root_transport.base)
1351
while new_transport.base != root_transport.base:
1352
root_transport = new_transport
1353
new_transport = root_transport.clone("..")
1354
# as we are walking up directories, the path must be
1355
# growing less, except at the top
1356
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1357
or new_transport.base == root_transport.base)
1359
# Cloning to "/" should take us to exactly the same location.
1360
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1361
# the abspath of "/" from the original transport should be the same
1362
# as the base at the root:
1363
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1365
# At the root, the URL must still end with / as its a directory
1366
self.assertEqual(root_transport.base[-1], '/')
1368
def test_clone_from_root(self):
1369
"""At the root, cloning to a simple dir should just do string append."""
1370
orig_transport = self.get_transport()
1371
root_transport = orig_transport.clone('/')
1372
self.assertEqual(root_transport.base + '.bzr/',
1373
root_transport.clone('.bzr').base)
1375
def test_base_url(self):
1376
t = self.get_transport()
1377
self.assertEqual('/', t.base[-1])
1379
def test_relpath(self):
1380
t = self.get_transport()
1381
self.assertEqual('', t.relpath(t.base))
1383
self.assertEqual('', t.relpath(t.base[:-1]))
1384
# subdirs which don't exist should still give relpaths.
1385
self.assertEqual('foo', t.relpath(t.base + 'foo'))
1386
# trailing slash should be the same.
1387
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1389
def test_relpath_at_root(self):
1390
t = self.get_transport()
1391
# clone all the way to the top
1392
new_transport = t.clone('..')
1393
while new_transport.base != t.base:
1395
new_transport = t.clone('..')
1396
# we must be able to get a relpath below the root
1397
self.assertEqual('', t.relpath(t.base))
1398
# and a deeper one should work too
1399
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1401
def test_abspath(self):
1402
# smoke test for abspath. Corner cases for backends like unix fs's
1403
# that have aliasing problems like symlinks should go in backend
1404
# specific test cases.
1405
transport = self.get_transport()
1407
self.assertEqual(transport.base + 'relpath',
1408
transport.abspath('relpath'))
1410
# This should work without raising an error.
1411
transport.abspath("/")
1413
# the abspath of "/" and "/foo/.." should result in the same location
1414
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1416
self.assertEqual(transport.clone("/").abspath('foo'),
1417
transport.abspath("/foo"))
1419
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1420
def test_win32_abspath(self):
1421
# Note: we tried to set sys.platform='win32' so we could test on
1422
# other platforms too, but then osutils does platform specific
1423
# things at import time which defeated us...
1424
if sys.platform != 'win32':
1426
'Testing drive letters in abspath implemented only for win32')
1428
# smoke test for abspath on win32.
1429
# a transport based on 'file:///' never fully qualifies the drive.
1430
transport = _mod_transport.get_transport_from_url("file:///")
1431
self.assertEqual(transport.abspath("/"), "file:///")
1433
# but a transport that starts with a drive spec must keep it.
1434
transport = _mod_transport.get_transport_from_url("file:///C:/")
1435
self.assertEqual(transport.abspath("/"), "file:///C:/")
1437
def test_local_abspath(self):
1438
transport = self.get_transport()
1440
p = transport.local_abspath('.')
1441
except (errors.NotLocalUrl, TransportNotPossible), e:
1442
# should be formattable
1445
self.assertEqual(getcwd(), p)
1447
def test_abspath_at_root(self):
1448
t = self.get_transport()
1449
# clone all the way to the top
1450
new_transport = t.clone('..')
1451
while new_transport.base != t.base:
1453
new_transport = t.clone('..')
1454
# we must be able to get an abspath of the root when we ask for
1455
# t.abspath('..') - this due to our choice that clone('..')
1456
# should return the root from the root, combined with the desire that
1457
# the url from clone('..') and from abspath('..') should be the same.
1458
self.assertEqual(t.base, t.abspath('..'))
1459
# '' should give us the root
1460
self.assertEqual(t.base, t.abspath(''))
1461
# and a path should append to the url
1462
self.assertEqual(t.base + 'foo', t.abspath('foo'))
1464
def test_iter_files_recursive(self):
1465
transport = self.get_transport()
1466
if not transport.listable():
1467
self.assertRaises(TransportNotPossible,
1468
transport.iter_files_recursive)
1470
self.build_tree(['isolated/',
1474
'isolated/dir/b%25z', # make sure quoting is correct
1476
transport=transport)
1477
paths = set(transport.iter_files_recursive())
1478
# nb the directories are not converted
1479
self.assertEqual(paths,
1480
set(['isolated/dir/foo',
1482
'isolated/dir/b%2525z',
1484
sub_transport = transport.clone('isolated')
1485
paths = set(sub_transport.iter_files_recursive())
1486
self.assertEqual(paths,
1487
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1489
def test_copy_tree(self):
1490
# TODO: test file contents and permissions are preserved. This test was
1491
# added just to ensure that quoting was handled correctly.
1492
# -- David Allouche 2006-08-11
1493
transport = self.get_transport()
1494
if not transport.listable():
1495
self.assertRaises(TransportNotPossible,
1496
transport.iter_files_recursive)
1498
if transport.is_readonly():
1500
self.build_tree(['from/',
1504
'from/dir/b%25z', # make sure quoting is correct
1506
transport=transport)
1507
transport.copy_tree('from', 'to')
1508
paths = set(transport.iter_files_recursive())
1509
self.assertEqual(paths,
1510
set(['from/dir/foo',
1519
def test_copy_tree_to_transport(self):
1520
transport = self.get_transport()
1521
if not transport.listable():
1522
self.assertRaises(TransportNotPossible,
1523
transport.iter_files_recursive)
1525
if transport.is_readonly():
1527
self.build_tree(['from/',
1531
'from/dir/b%25z', # make sure quoting is correct
1533
transport=transport)
1534
from_transport = transport.clone('from')
1535
to_transport = transport.clone('to')
1536
to_transport.ensure_base()
1537
from_transport.copy_tree_to_transport(to_transport)
1538
paths = set(transport.iter_files_recursive())
1539
self.assertEqual(paths,
1540
set(['from/dir/foo',
1549
def test_unicode_paths(self):
1550
"""Test that we can read/write files with Unicode names."""
1551
t = self.get_transport()
1553
# With FAT32 and certain encodings on win32
1554
# '\xe5' and '\xe4' actually map to the same file
1555
# adding a suffix kicks in the 'preserving but insensitive'
1556
# route, and maintains the right files
1557
files = [u'\xe5.1', # a w/ circle iso-8859-1
1558
u'\xe4.2', # a w/ dots iso-8859-1
1559
u'\u017d', # Z with umlat iso-8859-2
1560
u'\u062c', # Arabic j
1561
u'\u0410', # Russian A
1562
u'\u65e5', # Kanji person
1565
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1566
if no_unicode_support:
1567
self.knownFailure("test server cannot handle unicode paths")
1570
self.build_tree(files, transport=t, line_endings='binary')
1571
except UnicodeError:
1572
raise TestSkipped("cannot handle unicode paths in current encoding")
1574
# A plain unicode string is not a valid url
1576
self.assertRaises(InvalidURL, t.get, fname)
1579
fname_utf8 = fname.encode('utf-8')
1580
contents = 'contents of %s\n' % (fname_utf8,)
1581
self.check_transport_contents(contents, t, urlutils.escape(fname))
1583
def test_connect_twice_is_same_content(self):
1584
# check that our server (whatever it is) is accessible reliably
1585
# via get_transport and multiple connections share content.
1586
transport = self.get_transport()
1587
if transport.is_readonly():
1589
transport.put_bytes('foo', 'bar')
1590
transport3 = self.get_transport()
1591
self.check_transport_contents('bar', transport3, 'foo')
1593
# now opening at a relative url should give us a sane result:
1594
transport.mkdir('newdir')
1595
transport5 = self.get_transport('newdir')
1596
transport6 = transport5.clone('..')
1597
self.check_transport_contents('bar', transport6, 'foo')
1599
def test_lock_write(self):
1600
"""Test transport-level write locks.
1602
These are deprecated and transports may decline to support them.
1604
transport = self.get_transport()
1605
if transport.is_readonly():
1606
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1608
transport.put_bytes('lock', '')
1610
lock = transport.lock_write('lock')
1611
except TransportNotPossible:
1613
# TODO make this consistent on all platforms:
1614
# self.assertRaises(LockError, transport.lock_write, 'lock')
1617
def test_lock_read(self):
1618
"""Test transport-level read locks.
1620
These are deprecated and transports may decline to support them.
1622
transport = self.get_transport()
1623
if transport.is_readonly():
1624
file('lock', 'w').close()
1626
transport.put_bytes('lock', '')
1628
lock = transport.lock_read('lock')
1629
except TransportNotPossible:
1631
# TODO make this consistent on all platforms:
1632
# self.assertRaises(LockError, transport.lock_read, 'lock')
1635
def test_readv(self):
1636
transport = self.get_transport()
1637
if transport.is_readonly():
1638
with file('a', 'w') as f: f.write('0123456789')
1640
transport.put_bytes('a', '0123456789')
1642
d = list(transport.readv('a', ((0, 1),)))
1643
self.assertEqual(d[0], (0, '0'))
1645
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1646
self.assertEqual(d[0], (0, '0'))
1647
self.assertEqual(d[1], (1, '1'))
1648
self.assertEqual(d[2], (3, '34'))
1649
self.assertEqual(d[3], (9, '9'))
1651
def test_readv_out_of_order(self):
1652
transport = self.get_transport()
1653
if transport.is_readonly():
1654
with file('a', 'w') as f: f.write('0123456789')
1656
transport.put_bytes('a', '01234567890')
1658
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1659
self.assertEqual(d[0], (1, '1'))
1660
self.assertEqual(d[1], (9, '9'))
1661
self.assertEqual(d[2], (0, '0'))
1662
self.assertEqual(d[3], (3, '34'))
1664
def test_readv_with_adjust_for_latency(self):
1665
transport = self.get_transport()
1666
# the adjust for latency flag expands the data region returned
1667
# according to a per-transport heuristic, so testing is a little
1668
# tricky as we need more data than the largest combining that our
1669
# transports do. To accommodate this we generate random data and cross
1670
# reference the returned data with the random data. To avoid doing
1671
# multiple large random byte look ups we do several tests on the same
1673
content = osutils.rand_bytes(200*1024)
1674
content_size = len(content)
1675
if transport.is_readonly():
1676
self.build_tree_contents([('a', content)])
1678
transport.put_bytes('a', content)
1679
def check_result_data(result_vector):
1680
for item in result_vector:
1681
data_len = len(item[1])
1682
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1685
result = list(transport.readv('a', ((0, 30),),
1686
adjust_for_latency=True, upper_limit=content_size))
1687
# we expect 1 result, from 0, to something > 30
1688
self.assertEqual(1, len(result))
1689
self.assertEqual(0, result[0][0])
1690
self.assertTrue(len(result[0][1]) >= 30)
1691
check_result_data(result)
1692
# end of file corner case
1693
result = list(transport.readv('a', ((204700, 100),),
1694
adjust_for_latency=True, upper_limit=content_size))
1695
# we expect 1 result, from 204800- its length, to the end
1696
self.assertEqual(1, len(result))
1697
data_len = len(result[0][1])
1698
self.assertEqual(204800-data_len, result[0][0])
1699
self.assertTrue(data_len >= 100)
1700
check_result_data(result)
1701
# out of order ranges are made in order
1702
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1703
adjust_for_latency=True, upper_limit=content_size))
1704
# we expect 2 results, in order, start and end.
1705
self.assertEqual(2, len(result))
1707
data_len = len(result[0][1])
1708
self.assertEqual(0, result[0][0])
1709
self.assertTrue(data_len >= 30)
1711
data_len = len(result[1][1])
1712
self.assertEqual(204800-data_len, result[1][0])
1713
self.assertTrue(data_len >= 100)
1714
check_result_data(result)
1715
# close ranges get combined (even if out of order)
1716
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1717
result = list(transport.readv('a', request_vector,
1718
adjust_for_latency=True, upper_limit=content_size))
1719
self.assertEqual(1, len(result))
1720
data_len = len(result[0][1])
1721
# minimum length is from 400 to 1034 - 634
1722
self.assertTrue(data_len >= 634)
1723
# must contain the region 400 to 1034
1724
self.assertTrue(result[0][0] <= 400)
1725
self.assertTrue(result[0][0] + data_len >= 1034)
1726
check_result_data(result)
1728
def test_readv_with_adjust_for_latency_with_big_file(self):
1729
transport = self.get_transport()
1730
# test from observed failure case.
1731
if transport.is_readonly():
1732
with file('a', 'w') as f: f.write('a'*1024*1024)
1734
transport.put_bytes('a', 'a'*1024*1024)
1735
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1736
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1737
(465373, 800), (947422, 800)]
1738
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1739
found_items = [False]*9
1740
for pos, (start, length) in enumerate(broken_vector):
1741
# check the range is covered by the result
1742
for offset, data in results:
1743
if offset <= start and start + length <= offset + len(data):
1744
found_items[pos] = True
1745
self.assertEqual([True]*9, found_items)
1747
def test_get_with_open_write_stream_sees_all_content(self):
1748
t = self.get_transport()
1751
handle = t.open_write_stream('foo')
1754
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1758
def test_get_smart_medium(self):
1759
"""All transports must either give a smart medium, or know they can't.
1761
transport = self.get_transport()
1763
client_medium = transport.get_smart_medium()
1764
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1765
except errors.NoSmartMedium:
1766
# as long as we got it we're fine
1769
def test_readv_short_read(self):
1770
transport = self.get_transport()
1771
if transport.is_readonly():
1772
with file('a', 'w') as f: f.write('0123456789')
1774
transport.put_bytes('a', '01234567890')
1776
# This is intentionally reading off the end of the file
1777
# since we are sure that it cannot get there
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1779
# Can be raised by paramiko
1781
transport.readv, 'a', [(1,1), (8,10)])
1783
# This is trying to seek past the end of the file, it should
1784
# also raise a special error
1785
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1786
transport.readv, 'a', [(12,2)])
1788
def test_no_segment_parameters(self):
1789
"""Segment parameters should be stripped and stored in
1790
transport.segment_parameters."""
1791
transport = self.get_transport("foo")
1792
self.assertEquals({}, transport.get_segment_parameters())
1794
def test_segment_parameters(self):
    """Segment parameters given in the URL are reported by the transport.

    The server URL is joined with two parameters, a transport is opened
    from the result, and get_segment_parameters() must return exactly
    those parameters.
    """
    base_url = self._server.get_url()
    parameters = {"key1": "val1", "key2": "val2"}
    url = urlutils.join_segment_parameters(base_url, parameters)
    transport = _mod_transport.get_transport_from_url(url)
    self.assertEqual(parameters, transport.get_segment_parameters())
1803
def test_set_segment_parameters(self):
1804
"""Segment parameters can be set and show up in base."""
1805
transport = self.get_transport("foo")
1806
orig_base = transport.base
1807
transport.set_segment_parameter("arm", "board")
1808
self.assertEquals("%s,arm=board" % orig_base, transport.base)
1809
self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1810
transport.set_segment_parameter("arm", None)
1811
transport.set_segment_parameter("nonexistant", None)
1812
self.assertEquals({}, transport.get_segment_parameters())
1813
self.assertEquals(orig_base, transport.base)
1815
def test_stat_symlink(self):
1816
# if a transport points directly to a symlink (and supports symlinks
1817
# at all) you can tell this. helps with bug 32669.
1818
t = self.get_transport()
1820
t.symlink('target', 'link')
1821
except TransportNotPossible:
1822
raise TestSkipped("symlinks not supported")
1823
t2 = t.clone('link')
1825
self.assertTrue(stat.S_ISLNK(st.st_mode))
1827
def test_abspath_url_unquote_unreserved(self):
1828
"""URLs from abspath should have unreserved characters unquoted
1830
Need consistent quoting notably for tildes, see lp:842223 for more.
1832
t = self.get_transport()
1833
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1834
self.assertEqual(t.base + "-.09AZ_az~",
1835
t.abspath(needlessly_escaped_dir))
1837
def test_clone_url_unquote_unreserved(self):
1838
"""Base URL of a cloned branch needs unreserved characters unquoted
1840
Cloned transports should be prefix comparable for things like the
1841
isolation checking of tests, see lp:842223 for more.
1843
t1 = self.get_transport()
1844
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1845
self.build_tree([needlessly_escaped_dir], transport=t1)
1846
t2 = t1.clone(needlessly_escaped_dir)
1847
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1849
def test_hook_post_connection_one(self):
1850
"""Fire post_connect hook after a ConnectedTransport is first used"""
1852
Transport.hooks.install_named_hook("post_connect", log.append, None)
1853
t = self.get_transport()
1854
self.assertEqual([], log)
1855
t.has("non-existant")
1856
if isinstance(t, RemoteTransport):
1857
self.assertEqual([t.get_smart_medium()], log)
1858
elif isinstance(t, ConnectedTransport):
1859
self.assertEqual([t], log)
1861
self.assertEqual([], log)
1863
def test_hook_post_connection_multi(self):
1864
"""Fire post_connect hook once per unshared underlying connection"""
1866
Transport.hooks.install_named_hook("post_connect", log.append, None)
1867
t1 = self.get_transport()
1869
t3 = self.get_transport()
1870
self.assertEqual([], log)
1874
if isinstance(t1, RemoteTransport):
1875
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1876
elif isinstance(t1, ConnectedTransport):
1877
self.assertEqual([t1, t3], log)
1879
self.assertEqual([], log)