1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
35
transport as _mod_transport,
38
from bzrlib.errors import (ConnectionError,
45
from bzrlib.osutils import getcwd
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
from bzrlib.tests.test_transport import TestTransportImplementation
54
from bzrlib.transport import (
57
_get_transport_modules,
59
from bzrlib.transport.memory import MemoryTransport
60
from bzrlib.transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent, e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(standard_tests, module, loader):
93
"""Multiply tests for tranport implementations."""
94
result = loader.suiteClass()
95
scenarios = transport_test_permutations()
96
return multiply_tests(standard_tests, scenarios, result)
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BZR_NO_SMART_VFS', None)
105
def check_transport_contents(self, content, transport, relpath):
106
"""Check that transport.get_bytes(relpath) == content."""
107
self.assertEqualDiff(content, transport.get_bytes(relpath))
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
146
except errors.InProcessTransport:
150
t = self.get_transport()
152
files = ['a', 'b', 'e', 'g', '%']
153
self.build_tree(files, transport=t)
154
self.assertEqual(True, t.has('a'))
155
self.assertEqual(False, t.has('c'))
156
self.assertEqual(True, t.has(urlutils.escape('%')))
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
161
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
162
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
168
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
169
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
171
def test_has_root_works(self):
172
if self.transport_server is test_server.SmartTCPServer_for_testing:
173
raise TestNotApplicable(
174
"SmartTCPServer_for_testing intentionally does not allow "
176
current_transport = self.get_transport()
177
self.assertTrue(current_transport.has('/'))
178
root = current_transport.clone('/')
179
self.assertTrue(root.has(''))
182
t = self.get_transport()
184
files = ['a', 'b', 'e', 'g']
185
contents = ['contents of a\n',
190
self.build_tree(files, transport=t, line_endings='binary')
191
self.check_transport_contents('contents of a\n', t, 'a')
192
content_f = t.get_multi(files)
193
# Use itertools.izip() instead of use zip() or map(), since they fully
194
# evaluate their inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in itertools.izip(contents, content_f):
197
self.assertEqual(content, f.read())
199
content_f = t.get_multi(iter(files))
200
# Use itertools.izip() for the same reason
201
for content, f in itertools.izip(contents, content_f):
202
self.assertEqual(content, f.read())
204
def test_get_unknown_file(self):
205
t = self.get_transport()
207
contents = ['contents of a\n',
210
self.build_tree(files, transport=t, line_endings='binary')
211
self.assertRaises(NoSuchFile, t.get, 'c')
212
def iterate_and_close(func, *args):
213
for f in func(*args):
214
# We call f.read() here because things like paramiko actually
215
# spawn a thread to prefetch the content, which we want to
216
# consume before we close the handle.
219
self.assertRaises(NoSuchFile, iterate_and_close,
220
t.get_multi, ['a', 'b', 'c'])
221
self.assertRaises(NoSuchFile, iterate_and_close,
222
t.get_multi, iter(['a', 'b', 'c']))
224
def test_get_directory_read_gives_ReadError(self):
225
"""consistent errors for read() on a file returned by get()."""
226
t = self.get_transport()
228
self.build_tree(['a directory/'])
230
t.mkdir('a%20directory')
231
# getting the file must either work or fail with a PathError
233
a_file = t.get('a%20directory')
234
except (errors.PathError, errors.RedirectRequested):
235
# early failure return immediately.
237
# having got a file, read() must either work (i.e. http reading a dir
238
# listing) or fail with ReadError
241
except errors.ReadError:
244
def test_get_bytes(self):
245
t = self.get_transport()
247
files = ['a', 'b', 'e', 'g']
248
contents = ['contents of a\n',
253
self.build_tree(files, transport=t, line_endings='binary')
254
self.check_transport_contents('contents of a\n', t, 'a')
256
for content, fname in zip(contents, files):
257
self.assertEqual(content, t.get_bytes(fname))
259
def test_get_bytes_unknown_file(self):
260
t = self.get_transport()
261
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
263
def test_get_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
274
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
t = self.get_transport()
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
290
def test_put_bytes(self):
291
t = self.get_transport()
294
self.assertRaises(TransportNotPossible,
295
t.put_bytes, 'a', 'some text for a\n')
298
t.put_bytes('a', 'some text for a\n')
299
self.assertTrue(t.has('a'))
300
self.check_transport_contents('some text for a\n', t, 'a')
302
# The contents should be overwritten
303
t.put_bytes('a', 'new text for a\n')
304
self.check_transport_contents('new text for a\n', t, 'a')
306
self.assertRaises(NoSuchFile,
307
t.put_bytes, 'path/doesnt/exist/c', 'contents')
309
def test_put_bytes_non_atomic(self):
310
t = self.get_transport()
313
self.assertRaises(TransportNotPossible,
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
317
self.assertFalse(t.has('a'))
318
t.put_bytes_non_atomic('a', 'some text for a\n')
319
self.assertTrue(t.has('a'))
320
self.check_transport_contents('some text for a\n', t, 'a')
321
# Put also replaces contents
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
325
# Make sure we can create another file
326
t.put_bytes_non_atomic('d', 'contents for\nd\n')
327
# And overwrite 'a' with empty contents
328
t.put_bytes_non_atomic('a', '')
329
self.check_transport_contents('contents for\nd\n', t, 'd')
330
self.check_transport_contents('', t, 'a')
332
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
334
# Now test the create_parent flag
335
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
337
self.assertFalse(t.has('dir/a'))
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
create_parent_dir=True)
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
# But we still get NoSuchFile if we can't make the parent dir
343
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
345
create_parent_dir=True)
347
def test_put_bytes_permissions(self):
348
t = self.get_transport()
352
if not t._can_roundtrip_unix_modebits():
353
# Can't roundtrip, so no need to run this test
355
t.put_bytes('mode644', 'test text\n', mode=0644)
356
self.assertTransportMode(t, 'mode644', 0644)
357
t.put_bytes('mode666', 'test text\n', mode=0666)
358
self.assertTransportMode(t, 'mode666', 0666)
359
t.put_bytes('mode600', 'test text\n', mode=0600)
360
self.assertTransportMode(t, 'mode600', 0600)
361
# Yes, you can put_bytes a file such that it becomes readonly
362
t.put_bytes('mode400', 'test text\n', mode=0400)
363
self.assertTransportMode(t, 'mode400', 0400)
365
# The default permissions should be based on the current umask
366
umask = osutils.get_umask()
367
t.put_bytes('nomode', 'test text\n', mode=None)
368
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
370
def test_put_bytes_non_atomic_permissions(self):
371
t = self.get_transport()
375
if not t._can_roundtrip_unix_modebits():
376
# Can't roundtrip, so no need to run this test
378
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
379
self.assertTransportMode(t, 'mode644', 0644)
380
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
381
self.assertTransportMode(t, 'mode666', 0666)
382
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
383
self.assertTransportMode(t, 'mode600', 0600)
384
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
385
self.assertTransportMode(t, 'mode400', 0400)
387
# The default permissions should be based on the current umask
388
umask = osutils.get_umask()
389
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
390
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
392
# We should also be able to set the mode for a parent directory
394
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
395
dir_mode=0700, create_parent_dir=True)
396
self.assertTransportMode(t, 'dir700', 0700)
397
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
398
dir_mode=0770, create_parent_dir=True)
399
self.assertTransportMode(t, 'dir770', 0770)
400
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
401
dir_mode=0777, create_parent_dir=True)
402
self.assertTransportMode(t, 'dir777', 0777)
404
def test_put_file(self):
405
t = self.get_transport()
408
self.assertRaises(TransportNotPossible,
409
t.put_file, 'a', StringIO('some text for a\n'))
412
result = t.put_file('a', StringIO('some text for a\n'))
413
# put_file returns the length of the data written
414
self.assertEqual(16, result)
415
self.assertTrue(t.has('a'))
416
self.check_transport_contents('some text for a\n', t, 'a')
417
# Put also replaces contents
418
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
self.assertEqual(19, result)
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
421
self.assertRaises(NoSuchFile,
422
t.put_file, 'path/doesnt/exist/c',
423
StringIO('contents'))
425
def test_put_file_non_atomic(self):
426
t = self.get_transport()
429
self.assertRaises(TransportNotPossible,
430
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
433
self.assertFalse(t.has('a'))
434
t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
self.assertTrue(t.has('a'))
436
self.check_transport_contents('some text for a\n', t, 'a')
437
# Put also replaces contents
438
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
441
# Make sure we can create another file
442
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
443
# And overwrite 'a' with empty contents
444
t.put_file_non_atomic('a', StringIO(''))
445
self.check_transport_contents('contents for\nd\n', t, 'd')
446
self.check_transport_contents('', t, 'a')
448
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
StringIO('contents\n'))
450
# Now test the create_parent flag
451
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
StringIO('contents\n'))
453
self.assertFalse(t.has('dir/a'))
454
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
create_parent_dir=True)
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
458
# But we still get NoSuchFile if we can't make the parent dir
459
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
StringIO('contents\n'),
461
create_parent_dir=True)
463
def test_put_file_permissions(self):
465
t = self.get_transport()
469
if not t._can_roundtrip_unix_modebits():
470
# Can't roundtrip, so no need to run this test
472
t.put_file('mode644', StringIO('test text\n'), mode=0644)
473
self.assertTransportMode(t, 'mode644', 0644)
474
t.put_file('mode666', StringIO('test text\n'), mode=0666)
475
self.assertTransportMode(t, 'mode666', 0666)
476
t.put_file('mode600', StringIO('test text\n'), mode=0600)
477
self.assertTransportMode(t, 'mode600', 0600)
478
# Yes, you can put a file such that it becomes readonly
479
t.put_file('mode400', StringIO('test text\n'), mode=0400)
480
self.assertTransportMode(t, 'mode400', 0400)
481
# The default permissions should be based on the current umask
482
umask = osutils.get_umask()
483
t.put_file('nomode', StringIO('test text\n'), mode=None)
484
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
486
def test_put_file_non_atomic_permissions(self):
487
t = self.get_transport()
491
if not t._can_roundtrip_unix_modebits():
492
# Can't roundtrip, so no need to run this test
494
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
495
self.assertTransportMode(t, 'mode644', 0644)
496
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
497
self.assertTransportMode(t, 'mode666', 0666)
498
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
499
self.assertTransportMode(t, 'mode600', 0600)
500
# Yes, you can put_file_non_atomic a file such that it becomes readonly
501
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
502
self.assertTransportMode(t, 'mode400', 0400)
504
# The default permissions should be based on the current umask
505
umask = osutils.get_umask()
506
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
507
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
509
# We should also be able to set the mode for a parent directory
512
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
513
dir_mode=0700, create_parent_dir=True)
514
self.assertTransportMode(t, 'dir700', 0700)
515
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
516
dir_mode=0770, create_parent_dir=True)
517
self.assertTransportMode(t, 'dir770', 0770)
518
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
519
dir_mode=0777, create_parent_dir=True)
520
self.assertTransportMode(t, 'dir777', 0777)
522
def test_put_bytes_unicode(self):
523
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
524
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
525
# (we don't want to encode unicode here at all, callers should be
526
# strictly passing bytes to put_bytes), but we allow it for backwards
527
# compatibility. At some point we should use a specific exception.
528
# See https://bugs.launchpad.net/bzr/+bug/106898.
529
t = self.get_transport()
532
unicode_string = u'\u1234'
534
(AssertionError, UnicodeEncodeError),
535
t.put_bytes, 'foo', unicode_string)
537
def test_put_file_unicode(self):
538
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
539
# This situation can happen (and has) if code is careless about the type
540
# of "string" they initialise/write to a StringIO with. We cannot use
541
# cStringIO, because it never returns unicode from read.
542
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
543
# raise, but we raise it for hysterical raisins.
544
t = self.get_transport()
547
unicode_file = pyStringIO(u'\u1234')
548
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
550
def test_mkdir(self):
551
t = self.get_transport()
554
# cannot mkdir on readonly transports. We're not testing for
555
# cache coherency because cache behaviour is not currently
556
# defined for the transport interface.
557
self.assertRaises(TransportNotPossible, t.mkdir, '.')
558
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
559
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
560
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
564
self.assertEqual(t.has('dir_a'), True)
565
self.assertEqual(t.has('dir_b'), False)
568
self.assertEqual(t.has('dir_b'), True)
570
t.mkdir_multi(['dir_c', 'dir_d'])
572
t.mkdir_multi(iter(['dir_e', 'dir_f']))
573
self.assertEqual(list(t.has_multi(
574
['dir_a', 'dir_b', 'dir_c', 'dir_q',
575
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
576
[True, True, True, False,
577
True, True, True, True])
579
# we were testing that a local mkdir followed by a transport
580
# mkdir failed thusly, but given that we * in one process * do not
581
# concurrently fiddle with disk dirs and then use transport to do
582
# things, the win here seems marginal compared to the constraint on
583
# the interface. RBC 20051227
585
self.assertRaises(FileExists, t.mkdir, 'dir_g')
587
# Test get/put in sub-directories
588
t.put_bytes('dir_a/a', 'contents of dir_a/a')
589
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
590
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
591
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
593
# mkdir of a dir with an absent parent
594
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
596
def test_mkdir_permissions(self):
597
t = self.get_transport()
600
if not t._can_roundtrip_unix_modebits():
601
# no sense testing on this transport
603
# Test mkdir with a mode
604
t.mkdir('dmode755', mode=0755)
605
self.assertTransportMode(t, 'dmode755', 0755)
606
t.mkdir('dmode555', mode=0555)
607
self.assertTransportMode(t, 'dmode555', 0555)
608
t.mkdir('dmode777', mode=0777)
609
self.assertTransportMode(t, 'dmode777', 0777)
610
t.mkdir('dmode700', mode=0700)
611
self.assertTransportMode(t, 'dmode700', 0700)
612
t.mkdir_multi(['mdmode755'], mode=0755)
613
self.assertTransportMode(t, 'mdmode755', 0755)
615
# Default mode should be based on umask
616
umask = osutils.get_umask()
617
t.mkdir('dnomode', mode=None)
618
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
620
def test_opening_a_file_stream_creates_file(self):
621
t = self.get_transport()
624
handle = t.open_write_stream('foo')
626
self.assertEqual('', t.get_bytes('foo'))
630
def test_opening_a_file_stream_can_set_mode(self):
631
t = self.get_transport()
634
if not t._can_roundtrip_unix_modebits():
635
# Can't roundtrip, so no need to run this test
637
def check_mode(name, mode, expected):
638
handle = t.open_write_stream(name, mode=mode)
640
self.assertTransportMode(t, name, expected)
641
check_mode('mode644', 0644, 0644)
642
check_mode('mode666', 0666, 0666)
643
check_mode('mode600', 0600, 0600)
644
# The default permissions should be based on the current umask
645
check_mode('nomode', None, 0666 & ~osutils.get_umask())
647
def test_copy_to(self):
648
# FIXME: test: same server to same server (partly done)
649
# same protocol two servers
650
# and different protocols (done for now except for MemoryTransport.
653
def simple_copy_files(transport_from, transport_to):
654
files = ['a', 'b', 'c', 'd']
655
self.build_tree(files, transport=transport_from)
656
self.assertEqual(4, transport_from.copy_to(files, transport_to))
658
self.check_transport_contents(transport_to.get_bytes(f),
661
t = self.get_transport()
662
temp_transport = MemoryTransport('memory:///')
663
simple_copy_files(t, temp_transport)
664
if not t.is_readonly():
665
t.mkdir('copy_to_simple')
666
t2 = t.clone('copy_to_simple')
667
simple_copy_files(t, t2)
670
# Test that copying into a missing directory raises
673
self.build_tree(['e/', 'e/f'])
676
t.put_bytes('e/f', 'contents of e')
677
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
678
temp_transport.mkdir('e')
679
t.copy_to(['e/f'], temp_transport)
682
temp_transport = MemoryTransport('memory:///')
684
files = ['a', 'b', 'c', 'd']
685
t.copy_to(iter(files), temp_transport)
687
self.check_transport_contents(temp_transport.get_bytes(f),
691
for mode in (0666, 0644, 0600, 0400):
692
temp_transport = MemoryTransport("memory:///")
693
t.copy_to(files, temp_transport, mode=mode)
695
self.assertTransportMode(temp_transport, f, mode)
697
def test_create_prefix(self):
698
t = self.get_transport()
699
sub = t.clone('foo').clone('bar')
702
except TransportNotPossible:
703
self.assertTrue(t.is_readonly())
705
self.assertTrue(t.has('foo/bar'))
707
def test_append_file(self):
708
t = self.get_transport()
711
self.assertRaises(TransportNotPossible,
712
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
714
t.put_bytes('a', 'diff\ncontents for\na\n')
715
t.put_bytes('b', 'contents\nfor b\n')
718
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
720
self.check_transport_contents(
721
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
724
# a file with no parent should fail..
725
self.assertRaises(NoSuchFile,
726
t.append_file, 'missing/path', StringIO('content'))
728
# And we can create new files, too
730
t.append_file('c', StringIO('some text\nfor a missing file\n')))
731
self.check_transport_contents('some text\nfor a missing file\n',
734
def test_append_bytes(self):
735
t = self.get_transport()
738
self.assertRaises(TransportNotPossible,
739
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
742
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
743
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
746
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
748
self.check_transport_contents(
749
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
752
# a file with no parent should fail..
753
self.assertRaises(NoSuchFile,
754
t.append_bytes, 'missing/path', 'content')
756
def test_append_multi(self):
757
t = self.get_transport()
761
t.put_bytes('a', 'diff\ncontents for\na\n'
762
'add\nsome\nmore\ncontents\n')
763
t.put_bytes('b', 'contents\nfor b\n')
765
self.assertEqual((43, 15),
766
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
767
('b', StringIO('some\nmore\nfor\nb\n'))]))
769
self.check_transport_contents(
770
'diff\ncontents for\na\n'
771
'add\nsome\nmore\ncontents\n'
772
'and\nthen\nsome\nmore\n',
774
self.check_transport_contents(
776
'some\nmore\nfor\nb\n',
779
self.assertEqual((62, 31),
780
t.append_multi(iter([('a', StringIO('a little bit more\n')),
781
('b', StringIO('from an iterator\n'))])))
782
self.check_transport_contents(
783
'diff\ncontents for\na\n'
784
'add\nsome\nmore\ncontents\n'
785
'and\nthen\nsome\nmore\n'
786
'a little bit more\n',
788
self.check_transport_contents(
790
'some\nmore\nfor\nb\n'
791
'from an iterator\n',
794
self.assertEqual((80, 0),
795
t.append_multi([('a', StringIO('some text in a\n')),
796
('d', StringIO('missing file r\n'))]))
798
self.check_transport_contents(
799
'diff\ncontents for\na\n'
800
'add\nsome\nmore\ncontents\n'
801
'and\nthen\nsome\nmore\n'
802
'a little bit more\n'
805
self.check_transport_contents('missing file r\n', t, 'd')
807
def test_append_file_mode(self):
808
"""Check that append accepts a mode parameter"""
809
# check append accepts a mode
810
t = self.get_transport()
812
self.assertRaises(TransportNotPossible,
813
t.append_file, 'f', StringIO('f'), mode=None)
815
t.append_file('f', StringIO('f'), mode=None)
817
def test_append_bytes_mode(self):
818
# check append_bytes accepts a mode
819
t = self.get_transport()
821
self.assertRaises(TransportNotPossible,
822
t.append_bytes, 'f', 'f', mode=None)
824
t.append_bytes('f', 'f', mode=None)
826
def test_delete(self):
827
# TODO: Test Transport.delete
828
t = self.get_transport()
830
# Not much to do with a readonly transport
832
self.assertRaises(TransportNotPossible, t.delete, 'missing')
835
t.put_bytes('a', 'a little bit of text\n')
836
self.assertTrue(t.has('a'))
838
self.assertFalse(t.has('a'))
840
self.assertRaises(NoSuchFile, t.delete, 'a')
842
t.put_bytes('a', 'a text\n')
843
t.put_bytes('b', 'b text\n')
844
t.put_bytes('c', 'c text\n')
845
self.assertEqual([True, True, True],
846
list(t.has_multi(['a', 'b', 'c'])))
847
t.delete_multi(['a', 'c'])
848
self.assertEqual([False, True, False],
849
list(t.has_multi(['a', 'b', 'c'])))
850
self.assertFalse(t.has('a'))
851
self.assertTrue(t.has('b'))
852
self.assertFalse(t.has('c'))
854
self.assertRaises(NoSuchFile,
855
t.delete_multi, ['a', 'b', 'c'])
857
self.assertRaises(NoSuchFile,
858
t.delete_multi, iter(['a', 'b', 'c']))
860
t.put_bytes('a', 'another a text\n')
861
t.put_bytes('c', 'another c text\n')
862
t.delete_multi(iter(['a', 'b', 'c']))
864
# We should have deleted everything
865
# SftpServer creates control files in the
866
# working directory, so we can just do a
868
# self.assertEqual([], os.listdir('.'))
870
def test_recommended_page_size(self):
871
"""Transports recommend a page size for partial access to files."""
872
t = self.get_transport()
873
self.assertIsInstance(t.recommended_page_size(), int)
875
def test_rmdir(self):
876
t = self.get_transport()
877
# Not much to do with a readonly transport
879
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
884
# ftp may not be able to raise NoSuchFile for lack of
885
# details when failing
886
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
888
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
890
def test_rmdir_not_empty(self):
891
"""Deleting a non-empty directory raises an exception
893
sftp (and possibly others) don't give us a specific "directory not
894
empty" exception -- we can just see that the operation failed.
896
t = self.get_transport()
901
self.assertRaises(PathError, t.rmdir, 'adir')
903
def test_rmdir_empty_but_similar_prefix(self):
904
"""rmdir does not get confused by sibling paths.
906
A naive implementation of MemoryTransport would refuse to rmdir
907
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
908
uses "path.startswith(dir)" on all file paths to determine if directory
911
t = self.get_transport()
915
t.put_bytes('foo-bar', '')
918
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
919
self.assertTrue(t.has('foo-bar'))
921
def test_rename_dir_succeeds(self):
922
t = self.get_transport()
924
raise TestSkipped("transport is readonly")
926
t.mkdir('adir/asubdir')
927
t.rename('adir', 'bdir')
928
self.assertTrue(t.has('bdir/asubdir'))
929
self.assertFalse(t.has('adir'))
931
def test_rename_dir_nonempty(self):
932
"""Attempting to replace a nonemtpy directory should fail"""
933
t = self.get_transport()
935
raise TestSkipped("transport is readonly")
937
t.mkdir('adir/asubdir')
939
t.mkdir('bdir/bsubdir')
940
# any kind of PathError would be OK, though we normally expect
942
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
943
# nothing was changed so it should still be as before
944
self.assertTrue(t.has('bdir/bsubdir'))
945
self.assertFalse(t.has('adir/bdir'))
946
self.assertFalse(t.has('adir/bsubdir'))
948
def test_rename_across_subdirs(self):
949
t = self.get_transport()
951
raise TestNotApplicable("transport is readonly")
956
ta.put_bytes('f', 'aoeu')
957
ta.rename('f', '../b/f')
958
self.assertTrue(tb.has('f'))
959
self.assertFalse(ta.has('f'))
960
self.assertTrue(t.has('b/f'))
962
def test_delete_tree(self):
963
t = self.get_transport()
965
# Not much to do with a readonly transport
967
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
970
# and does it like listing ?
973
t.delete_tree('adir')
974
except TransportNotPossible:
975
# ok, this transport does not support delete_tree
978
# did it delete that trivial case?
979
self.assertRaises(NoSuchFile, t.stat, 'adir')
981
self.build_tree(['adir/',
989
t.delete_tree('adir')
990
# adir should be gone now.
991
self.assertRaises(NoSuchFile, t.stat, 'adir')
994
t = self.get_transport()
999
# TODO: I would like to use os.listdir() to
1000
# make sure there are no extra files, but SftpServer
1001
# creates control files in the working directory
1002
# perhaps all of this could be done in a subdirectory
1004
t.put_bytes('a', 'a first file\n')
1005
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
1008
self.assertTrue(t.has('b'))
1009
self.assertFalse(t.has('a'))
1011
self.check_transport_contents('a first file\n', t, 'b')
1012
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1015
t.put_bytes('c', 'c this file\n')
1017
self.assertFalse(t.has('c'))
1018
self.check_transport_contents('c this file\n', t, 'b')
1020
# TODO: Try to write a test for atomicity
1021
# TODO: Test moving into a non-existent subdirectory
1022
# TODO: Test Transport.move_multi
1024
def test_copy(self):
1025
t = self.get_transport()
1030
t.put_bytes('a', 'a file\n')
1032
self.check_transport_contents('a file\n', t, 'b')
1034
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1036
# What should the assert be if you try to copy a
1037
# file over a directory?
1038
#self.assertRaises(Something, t.copy, 'a', 'c')
1039
t.put_bytes('d', 'text in d\n')
1041
self.check_transport_contents('text in d\n', t, 'b')
1043
# TODO: test copy_multi
1045
def test_connection_error(self):
1046
"""ConnectionError is raised when connection is impossible.
1048
The error should be raised from the first operation on the transport.
1051
url = self._server.get_bogus_url()
1052
except NotImplementedError:
1053
raise TestSkipped("Transport %s has no bogus URL support." %
1054
self._server.__class__)
1055
t = _mod_transport.get_transport_from_url(url)
1056
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1058
def test_stat(self):
1059
# TODO: Test stat, just try once, and if it throws, stop testing
1060
from stat import S_ISDIR, S_ISREG
1062
t = self.get_transport()
1066
except TransportNotPossible, e:
1067
# This transport cannot stat
1070
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1071
sizes = [14, 0, 16, 0, 18]
1072
self.build_tree(paths, transport=t, line_endings='binary')
1074
for path, size in zip(paths, sizes):
1076
if path.endswith('/'):
1077
self.assertTrue(S_ISDIR(st.st_mode))
1078
# directory sizes are meaningless
1080
self.assertTrue(S_ISREG(st.st_mode))
1081
self.assertEqual(size, st.st_size)
1083
remote_stats = list(t.stat_multi(paths))
1084
remote_iter_stats = list(t.stat_multi(iter(paths)))
1086
self.assertRaises(NoSuchFile, t.stat, 'q')
1087
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1089
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1090
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1091
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1092
subdir = t.clone('subdir')
1093
st = subdir.stat('./file')
1094
st = subdir.stat('.')
1096
def test_hardlink(self):
1097
from stat import ST_NLINK
1099
t = self.get_transport()
1101
source_name = "original_target"
1102
link_name = "target_link"
1104
self.build_tree([source_name], transport=t)
1107
t.hardlink(source_name, link_name)
1109
self.assertTrue(t.has(source_name))
1110
self.assertTrue(t.has(link_name))
1112
st = t.stat(link_name)
1113
self.assertEqual(st[ST_NLINK], 2)
1114
except TransportNotPossible:
1115
raise TestSkipped("Transport %s does not support hardlinks." %
1116
self._server.__class__)
1118
def test_symlink(self):
1119
from stat import S_ISLNK
1121
t = self.get_transport()
1123
source_name = "original_target"
1124
link_name = "target_link"
1126
self.build_tree([source_name], transport=t)
1129
t.symlink(source_name, link_name)
1131
self.assertTrue(t.has(source_name))
1132
self.assertTrue(t.has(link_name))
1134
st = t.stat(link_name)
1135
self.assertTrue(S_ISLNK(st.st_mode),
1136
"expected symlink, got mode %o" % st.st_mode)
1137
except TransportNotPossible:
1138
raise TestSkipped("Transport %s does not support symlinks." %
1139
self._server.__class__)
1141
self.knownFailure("Paramiko fails to create symlinks during tests")
1143
def test_list_dir(self):
1144
# TODO: Test list_dir, just try once, and if it throws, stop testing
1145
t = self.get_transport()
1147
if not t.listable():
1148
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1151
def sorted_list(d, transport):
1152
l = list(transport.list_dir(d))
1156
self.assertEqual([], sorted_list('.', t))
1157
# c2 is precisely one letter longer than c here to test that
1158
# suffixing is not confused.
1159
# a%25b checks that quoting is done consistently across transports
1160
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1162
if not t.is_readonly():
1163
self.build_tree(tree_names, transport=t)
1165
self.build_tree(tree_names)
1168
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1170
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1171
self.assertEqual(['d', 'e'], sorted_list('c', t))
1173
# Cloning the transport produces an equivalent listing
1174
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1176
if not t.is_readonly():
1183
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1184
self.assertEqual(['e'], sorted_list('c', t))
1186
self.assertListRaises(PathError, t.list_dir, 'q')
1187
self.assertListRaises(PathError, t.list_dir, 'c/f')
1188
# 'a' is a file, list_dir should raise an error
1189
self.assertListRaises(PathError, t.list_dir, 'a')
1191
def test_list_dir_result_is_url_escaped(self):
1192
t = self.get_transport()
1193
if not t.listable():
1194
raise TestSkipped("transport not listable")
1196
if not t.is_readonly():
1197
self.build_tree(['a/', 'a/%'], transport=t)
1199
self.build_tree(['a/', 'a/%'])
1201
names = list(t.list_dir('a'))
1202
self.assertEqual(['%25'], names)
1203
self.assertIsInstance(names[0], str)
1205
def test_clone_preserve_info(self):
1206
t1 = self.get_transport()
1207
if not isinstance(t1, ConnectedTransport):
1208
raise TestSkipped("not a connected transport")
1210
t2 = t1.clone('subdir')
1211
self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1212
self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1213
self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1214
self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1215
self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1217
def test__reuse_for(self):
1218
t = self.get_transport()
1219
if not isinstance(t, ConnectedTransport):
1220
raise TestSkipped("not a connected transport")
1222
def new_url(scheme=None, user=None, password=None,
1223
host=None, port=None, path=None):
1224
"""Build a new url from t.base changing only parts of it.
1226
Only the parameters different from None will be changed.
1228
if scheme is None: scheme = t._parsed_url.scheme
1229
if user is None: user = t._parsed_url.user
1230
if password is None: password = t._parsed_url.password
1231
if user is None: user = t._parsed_url.user
1232
if host is None: host = t._parsed_url.host
1233
if port is None: port = t._parsed_url.port
1234
if path is None: path = t._parsed_url.path
1235
return str(urlutils.URL(scheme, user, password, host, port, path))
1237
if t._parsed_url.scheme == 'ftp':
1241
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
if t._parsed_url.user == 'me':
1246
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1247
# passwords are not taken into account because:
1248
# - it makes no sense to have two different valid passwords for the
1250
# - _password in ConnectedTransport is intended to collect what the
1251
# user specified from the command-line and there are cases where the
1252
# new url can contain no password (if the url was built from an
1253
# existing transport.base for example)
1254
# - password are considered part of the credentials provided at
1255
# connection creation time and as such may not be present in the url
1256
# (they may be typed by the user when prompted for example)
1257
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1258
# We will not connect, we can use a invalid host
1259
self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
if t._parsed_url.port == 1234:
1264
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1265
# No point in trying to reuse a transport for a local URL
1266
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1268
def test_connection_sharing(self):
1269
t = self.get_transport()
1270
if not isinstance(t, ConnectedTransport):
1271
raise TestSkipped("not a connected transport")
1273
c = t.clone('subdir')
1274
# Some transports will create the connection only when needed
1275
t.has('surely_not') # Force connection
1276
self.assertIs(t._get_connection(), c._get_connection())
1278
# Temporary failure, we need to create a new dummy connection
1279
new_connection = None
1280
t._set_connection(new_connection)
1281
# Check that both transports use the same connection
1282
self.assertIs(new_connection, t._get_connection())
1283
self.assertIs(new_connection, c._get_connection())
1285
def test_reuse_connection_for_various_paths(self):
1286
t = self.get_transport()
1287
if not isinstance(t, ConnectedTransport):
1288
raise TestSkipped("not a connected transport")
1290
t.has('surely_not') # Force connection
1291
self.assertIsNot(None, t._get_connection())
1293
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1294
self.assertIsNot(t, subdir)
1295
self.assertIs(t._get_connection(), subdir._get_connection())
1297
home = subdir._reuse_for(t.base + 'home')
1298
self.assertIs(t._get_connection(), home._get_connection())
1299
self.assertIs(subdir._get_connection(), home._get_connection())
1301
def test_clone(self):
1302
# TODO: Test that clone moves up and down the filesystem
1303
t1 = self.get_transport()
1305
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1307
self.assertTrue(t1.has('a'))
1308
self.assertTrue(t1.has('b/c'))
1309
self.assertFalse(t1.has('c'))
1312
self.assertEqual(t1.base + 'b/', t2.base)
1314
self.assertTrue(t2.has('c'))
1315
self.assertFalse(t2.has('a'))
1318
self.assertTrue(t3.has('a'))
1319
self.assertFalse(t3.has('c'))
1321
self.assertFalse(t1.has('b/d'))
1322
self.assertFalse(t2.has('d'))
1323
self.assertFalse(t3.has('b/d'))
1325
if t1.is_readonly():
1326
self.build_tree_contents([('b/d', 'newfile\n')])
1328
t2.put_bytes('d', 'newfile\n')
1330
self.assertTrue(t1.has('b/d'))
1331
self.assertTrue(t2.has('d'))
1332
self.assertTrue(t3.has('b/d'))
1334
def test_clone_to_root(self):
1335
orig_transport = self.get_transport()
1336
# Repeatedly go up to a parent directory until we're at the root
1337
# directory of this transport
1338
root_transport = orig_transport
1339
new_transport = root_transport.clone("..")
1340
# as we are walking up directories, the path must be
1341
# growing less, except at the top
1342
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1343
or new_transport.base == root_transport.base)
1344
while new_transport.base != root_transport.base:
1345
root_transport = new_transport
1346
new_transport = root_transport.clone("..")
1347
# as we are walking up directories, the path must be
1348
# growing less, except at the top
1349
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1350
or new_transport.base == root_transport.base)
1352
# Cloning to "/" should take us to exactly the same location.
1353
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1354
# the abspath of "/" from the original transport should be the same
1355
# as the base at the root:
1356
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1358
# At the root, the URL must still end with / as its a directory
1359
self.assertEqual(root_transport.base[-1], '/')
1361
def test_clone_from_root(self):
1362
"""At the root, cloning to a simple dir should just do string append."""
1363
orig_transport = self.get_transport()
1364
root_transport = orig_transport.clone('/')
1365
self.assertEqual(root_transport.base + '.bzr/',
1366
root_transport.clone('.bzr').base)
1368
def test_base_url(self):
1369
t = self.get_transport()
1370
self.assertEqual('/', t.base[-1])
1372
def test_relpath(self):
1373
t = self.get_transport()
1374
self.assertEqual('', t.relpath(t.base))
1376
self.assertEqual('', t.relpath(t.base[:-1]))
1377
# subdirs which don't exist should still give relpaths.
1378
self.assertEqual('foo', t.relpath(t.base + 'foo'))
1379
# trailing slash should be the same.
1380
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1382
def test_relpath_at_root(self):
1383
t = self.get_transport()
1384
# clone all the way to the top
1385
new_transport = t.clone('..')
1386
while new_transport.base != t.base:
1388
new_transport = t.clone('..')
1389
# we must be able to get a relpath below the root
1390
self.assertEqual('', t.relpath(t.base))
1391
# and a deeper one should work too
1392
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1394
def test_abspath(self):
1395
# smoke test for abspath. Corner cases for backends like unix fs's
1396
# that have aliasing problems like symlinks should go in backend
1397
# specific test cases.
1398
transport = self.get_transport()
1400
self.assertEqual(transport.base + 'relpath',
1401
transport.abspath('relpath'))
1403
# This should work without raising an error.
1404
transport.abspath("/")
1406
# the abspath of "/" and "/foo/.." should result in the same location
1407
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1409
self.assertEqual(transport.clone("/").abspath('foo'),
1410
transport.abspath("/foo"))
1412
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1413
def test_win32_abspath(self):
1414
# Note: we tried to set sys.platform='win32' so we could test on
1415
# other platforms too, but then osutils does platform specific
1416
# things at import time which defeated us...
1417
if sys.platform != 'win32':
1419
'Testing drive letters in abspath implemented only for win32')
1421
# smoke test for abspath on win32.
1422
# a transport based on 'file:///' never fully qualifies the drive.
1423
transport = _mod_transport.get_transport_from_url("file:///")
1424
self.assertEqual(transport.abspath("/"), "file:///")
1426
# but a transport that starts with a drive spec must keep it.
1427
transport = _mod_transport.get_transport_from_url("file:///C:/")
1428
self.assertEqual(transport.abspath("/"), "file:///C:/")
1430
def test_local_abspath(self):
1431
transport = self.get_transport()
1433
p = transport.local_abspath('.')
1434
except (errors.NotLocalUrl, TransportNotPossible), e:
1435
# should be formattable
1438
self.assertEqual(getcwd(), p)
1440
def test_abspath_at_root(self):
1441
t = self.get_transport()
1442
# clone all the way to the top
1443
new_transport = t.clone('..')
1444
while new_transport.base != t.base:
1446
new_transport = t.clone('..')
1447
# we must be able to get a abspath of the root when we ask for
1448
# t.abspath('..') - this due to our choice that clone('..')
1449
# should return the root from the root, combined with the desire that
1450
# the url from clone('..') and from abspath('..') should be the same.
1451
self.assertEqual(t.base, t.abspath('..'))
1452
# '' should give us the root
1453
self.assertEqual(t.base, t.abspath(''))
1454
# and a path should append to the url
1455
self.assertEqual(t.base + 'foo', t.abspath('foo'))
1457
def test_iter_files_recursive(self):
1458
transport = self.get_transport()
1459
if not transport.listable():
1460
self.assertRaises(TransportNotPossible,
1461
transport.iter_files_recursive)
1463
self.build_tree(['isolated/',
1467
'isolated/dir/b%25z', # make sure quoting is correct
1469
transport=transport)
1470
paths = set(transport.iter_files_recursive())
1471
# nb the directories are not converted
1472
self.assertEqual(paths,
1473
set(['isolated/dir/foo',
1475
'isolated/dir/b%2525z',
1477
sub_transport = transport.clone('isolated')
1478
paths = set(sub_transport.iter_files_recursive())
1479
self.assertEqual(paths,
1480
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1482
def test_copy_tree(self):
1483
# TODO: test file contents and permissions are preserved. This test was
1484
# added just to ensure that quoting was handled correctly.
1485
# -- David Allouche 2006-08-11
1486
transport = self.get_transport()
1487
if not transport.listable():
1488
self.assertRaises(TransportNotPossible,
1489
transport.iter_files_recursive)
1491
if transport.is_readonly():
1493
self.build_tree(['from/',
1497
'from/dir/b%25z', # make sure quoting is correct
1499
transport=transport)
1500
transport.copy_tree('from', 'to')
1501
paths = set(transport.iter_files_recursive())
1502
self.assertEqual(paths,
1503
set(['from/dir/foo',
1512
def test_copy_tree_to_transport(self):
1513
transport = self.get_transport()
1514
if not transport.listable():
1515
self.assertRaises(TransportNotPossible,
1516
transport.iter_files_recursive)
1518
if transport.is_readonly():
1520
self.build_tree(['from/',
1524
'from/dir/b%25z', # make sure quoting is correct
1526
transport=transport)
1527
from_transport = transport.clone('from')
1528
to_transport = transport.clone('to')
1529
to_transport.ensure_base()
1530
from_transport.copy_tree_to_transport(to_transport)
1531
paths = set(transport.iter_files_recursive())
1532
self.assertEqual(paths,
1533
set(['from/dir/foo',
1542
def test_unicode_paths(self):
1543
"""Test that we can read/write files with Unicode names."""
1544
t = self.get_transport()
1546
# With FAT32 and certain encodings on win32
1547
# '\xe5' and '\xe4' actually map to the same file
1548
# adding a suffix kicks in the 'preserving but insensitive'
1549
# route, and maintains the right files
1550
files = [u'\xe5.1', # a w/ circle iso-8859-1
1551
u'\xe4.2', # a w/ dots iso-8859-1
1552
u'\u017d', # Z with umlat iso-8859-2
1553
u'\u062c', # Arabic j
1554
u'\u0410', # Russian A
1555
u'\u65e5', # Kanji person
1558
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1559
if no_unicode_support:
1560
self.knownFailure("test server cannot handle unicode paths")
1563
self.build_tree(files, transport=t, line_endings='binary')
1564
except UnicodeError:
1565
raise TestSkipped("cannot handle unicode paths in current encoding")
1567
# A plain unicode string is not a valid url
1569
self.assertRaises(InvalidURL, t.get, fname)
1572
fname_utf8 = fname.encode('utf-8')
1573
contents = 'contents of %s\n' % (fname_utf8,)
1574
self.check_transport_contents(contents, t, urlutils.escape(fname))
1576
def test_connect_twice_is_same_content(self):
1577
# check that our server (whatever it is) is accessible reliably
1578
# via get_transport and multiple connections share content.
1579
transport = self.get_transport()
1580
if transport.is_readonly():
1582
transport.put_bytes('foo', 'bar')
1583
transport3 = self.get_transport()
1584
self.check_transport_contents('bar', transport3, 'foo')
1586
# now opening at a relative url should give use a sane result:
1587
transport.mkdir('newdir')
1588
transport5 = self.get_transport('newdir')
1589
transport6 = transport5.clone('..')
1590
self.check_transport_contents('bar', transport6, 'foo')
1592
def test_lock_write(self):
1593
"""Test transport-level write locks.
1595
These are deprecated and transports may decline to support them.
1597
transport = self.get_transport()
1598
if transport.is_readonly():
1599
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1601
transport.put_bytes('lock', '')
1603
lock = transport.lock_write('lock')
1604
except TransportNotPossible:
1606
# TODO make this consistent on all platforms:
1607
# self.assertRaises(LockError, transport.lock_write, 'lock')
1610
def test_lock_read(self):
1611
"""Test transport-level read locks.
1613
These are deprecated and transports may decline to support them.
1615
transport = self.get_transport()
1616
if transport.is_readonly():
1617
file('lock', 'w').close()
1619
transport.put_bytes('lock', '')
1621
lock = transport.lock_read('lock')
1622
except TransportNotPossible:
1624
# TODO make this consistent on all platforms:
1625
# self.assertRaises(LockError, transport.lock_read, 'lock')
1628
def test_readv(self):
1629
transport = self.get_transport()
1630
if transport.is_readonly():
1631
with file('a', 'w') as f: f.write('0123456789')
1633
transport.put_bytes('a', '0123456789')
1635
d = list(transport.readv('a', ((0, 1),)))
1636
self.assertEqual(d[0], (0, '0'))
1638
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1639
self.assertEqual(d[0], (0, '0'))
1640
self.assertEqual(d[1], (1, '1'))
1641
self.assertEqual(d[2], (3, '34'))
1642
self.assertEqual(d[3], (9, '9'))
1644
def test_readv_out_of_order(self):
1645
transport = self.get_transport()
1646
if transport.is_readonly():
1647
with file('a', 'w') as f: f.write('0123456789')
1649
transport.put_bytes('a', '01234567890')
1651
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1652
self.assertEqual(d[0], (1, '1'))
1653
self.assertEqual(d[1], (9, '9'))
1654
self.assertEqual(d[2], (0, '0'))
1655
self.assertEqual(d[3], (3, '34'))
1657
def test_readv_with_adjust_for_latency(self):
1658
transport = self.get_transport()
1659
# the adjust for latency flag expands the data region returned
1660
# according to a per-transport heuristic, so testing is a little
1661
# tricky as we need more data than the largest combining that our
1662
# transports do. To accomodate this we generate random data and cross
1663
# reference the returned data with the random data. To avoid doing
1664
# multiple large random byte look ups we do several tests on the same
1666
content = osutils.rand_bytes(200*1024)
1667
content_size = len(content)
1668
if transport.is_readonly():
1669
self.build_tree_contents([('a', content)])
1671
transport.put_bytes('a', content)
1672
def check_result_data(result_vector):
1673
for item in result_vector:
1674
data_len = len(item[1])
1675
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1678
result = list(transport.readv('a', ((0, 30),),
1679
adjust_for_latency=True, upper_limit=content_size))
1680
# we expect 1 result, from 0, to something > 30
1681
self.assertEqual(1, len(result))
1682
self.assertEqual(0, result[0][0])
1683
self.assertTrue(len(result[0][1]) >= 30)
1684
check_result_data(result)
1685
# end of file corner case
1686
result = list(transport.readv('a', ((204700, 100),),
1687
adjust_for_latency=True, upper_limit=content_size))
1688
# we expect 1 result, from 204800- its length, to the end
1689
self.assertEqual(1, len(result))
1690
data_len = len(result[0][1])
1691
self.assertEqual(204800-data_len, result[0][0])
1692
self.assertTrue(data_len >= 100)
1693
check_result_data(result)
1694
# out of order ranges are made in order
1695
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1696
adjust_for_latency=True, upper_limit=content_size))
1697
# we expect 2 results, in order, start and end.
1698
self.assertEqual(2, len(result))
1700
data_len = len(result[0][1])
1701
self.assertEqual(0, result[0][0])
1702
self.assertTrue(data_len >= 30)
1704
data_len = len(result[1][1])
1705
self.assertEqual(204800-data_len, result[1][0])
1706
self.assertTrue(data_len >= 100)
1707
check_result_data(result)
1708
# close ranges get combined (even if out of order)
1709
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1710
result = list(transport.readv('a', request_vector,
1711
adjust_for_latency=True, upper_limit=content_size))
1712
self.assertEqual(1, len(result))
1713
data_len = len(result[0][1])
1714
# minimum length is from 400 to 1034 - 634
1715
self.assertTrue(data_len >= 634)
1716
# must contain the region 400 to 1034
1717
self.assertTrue(result[0][0] <= 400)
1718
self.assertTrue(result[0][0] + data_len >= 1034)
1719
check_result_data(result)
1721
def test_readv_with_adjust_for_latency_with_big_file(self):
1722
transport = self.get_transport()
1723
# test from observed failure case.
1724
if transport.is_readonly():
1725
with file('a', 'w') as f: f.write('a'*1024*1024)
1727
transport.put_bytes('a', 'a'*1024*1024)
1728
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1729
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1730
(465373, 800), (947422, 800)]
1731
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1732
found_items = [False]*9
1733
for pos, (start, length) in enumerate(broken_vector):
1734
# check the range is covered by the result
1735
for offset, data in results:
1736
if offset <= start and start + length <= offset + len(data):
1737
found_items[pos] = True
1738
self.assertEqual([True]*9, found_items)
1740
def test_get_with_open_write_stream_sees_all_content(self):
1741
t = self.get_transport()
1744
handle = t.open_write_stream('foo')
1747
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1751
def test_get_smart_medium(self):
1752
"""All transports must either give a smart medium, or know they can't.
1754
transport = self.get_transport()
1756
client_medium = transport.get_smart_medium()
1757
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1758
except errors.NoSmartMedium:
1759
# as long as we got it we're fine
1762
def test_readv_short_read(self):
1763
transport = self.get_transport()
1764
if transport.is_readonly():
1765
with file('a', 'w') as f: f.write('0123456789')
1767
transport.put_bytes('a', '01234567890')
1769
# This is intentionally reading off the end of the file
1770
# since we are sure that it cannot get there
1771
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1772
# Can be raised by paramiko
1774
transport.readv, 'a', [(1,1), (8,10)])
1776
# This is trying to seek past the end of the file, it should
1777
# also raise a special error
1778
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1779
transport.readv, 'a', [(12,2)])
1781
def test_no_segment_parameters(self):
1782
"""Segment parameters should be stripped and stored in
1783
transport.segment_parameters."""
1784
transport = self.get_transport("foo")
1785
self.assertEquals({}, transport.get_segment_parameters())
1787
def test_segment_parameters(self):
1788
"""Segment parameters should be stripped and stored in
1789
transport.get_segment_parameters()."""
1790
base_url = self._server.get_url()
1791
parameters = {"key1": "val1", "key2": "val2"}
1792
url = urlutils.join_segment_parameters(base_url, parameters)
1793
transport = _mod_transport.get_transport_from_url(url)
1794
self.assertEquals(parameters, transport.get_segment_parameters())
1796
def test_set_segment_parameters(self):
1797
"""Segment parameters can be set and show up in base."""
1798
transport = self.get_transport("foo")
1799
orig_base = transport.base
1800
transport.set_segment_parameter("arm", "board")
1801
self.assertEquals("%s,arm=board" % orig_base, transport.base)
1802
self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1803
transport.set_segment_parameter("arm", None)
1804
transport.set_segment_parameter("nonexistant", None)
1805
self.assertEquals({}, transport.get_segment_parameters())
1806
self.assertEquals(orig_base, transport.base)
1808
def test_stat_symlink(self):
1809
# if a transport points directly to a symlink (and supports symlinks
1810
# at all) you can tell this. helps with bug 32669.
1811
t = self.get_transport()
1813
t.symlink('target', 'link')
1814
except TransportNotPossible:
1815
raise TestSkipped("symlinks not supported")
1816
t2 = t.clone('link')
1818
self.assertTrue(stat.S_ISLNK(st.st_mode))
1820
def test_abspath_url_unquote_unreserved(self):
1821
"""URLs from abspath should have unreserved characters unquoted
1823
Need consistent quoting notably for tildes, see lp:842223 for more.
1825
t = self.get_transport()
1826
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1827
self.assertEqual(t.base + "-.09AZ_az~",
1828
t.abspath(needlessly_escaped_dir))
1830
def test_clone_url_unquote_unreserved(self):
1831
"""Base URL of a cloned branch needs unreserved characters unquoted
1833
Cloned transports should be prefix comparable for things like the
1834
isolation checking of tests, see lp:842223 for more.
1836
t1 = self.get_transport()
1837
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1838
self.build_tree([needlessly_escaped_dir], transport=t1)
1839
t2 = t1.clone(needlessly_escaped_dir)
1840
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1842
def test_hook_post_connection_one(self):
1843
"""Fire post_connect hook after a ConnectedTransport is first used"""
1845
Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
t = self.get_transport()
1847
self.assertEqual([], log)
1848
t.has("non-existant")
1849
if isinstance(t, RemoteTransport):
1850
self.assertEqual([t.get_smart_medium()], log)
1851
elif isinstance(t, ConnectedTransport):
1852
self.assertEqual([t], log)
1854
self.assertEqual([], log)
1856
def test_hook_post_connection_multi(self):
1857
"""Fire post_connect hook once per unshared underlying connection"""
1859
Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
t1 = self.get_transport()
1862
t3 = self.get_transport()
1863
self.assertEqual([], log)
1867
if isinstance(t1, RemoteTransport):
1868
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
elif isinstance(t1, ConnectedTransport):
1870
self.assertEqual([t1, t3], log)
1872
self.assertEqual([], log)