1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
37
from bzrlib.errors import (ConnectionError,
47
from bzrlib.osutils import getcwd
48
from bzrlib.smart import medium
49
from bzrlib.tests import (
55
from bzrlib.tests import test_server
56
from bzrlib.tests.test_transport import TestTransportImplementation
57
from bzrlib.transport import (
60
_get_transport_modules,
62
from bzrlib.transport.memory import MemoryTransport
65
def get_transport_test_permutations(module):
66
"""Get the permutations module wants to have tested."""
67
if getattr(module, 'get_test_permutations', None) is None:
69
"transport module %s doesn't provide get_test_permutations()"
72
return module.get_test_permutations()
75
def transport_test_permutations():
76
"""Return a list of the klass, server_factory pairs to test."""
78
for module in _get_transport_modules():
80
permutations = get_transport_test_permutations(
81
reduce(getattr, (module).split('.')[1:], __import__(module)))
82
for (klass, server_factory) in permutations:
83
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
{"transport_class":klass,
85
"transport_server":server_factory})
86
result.append(scenario)
87
except errors.DependencyNotPresent, e:
88
# Continue even if a dependency prevents us
89
# from adding this test
94
def load_tests(standard_tests, module, loader):
    """Multiply tests for transport implementations.

    Hands standard_tests, the scenarios produced by
    transport_test_permutations(), and a freshly created
    loader.suiteClass() instance to multiply_tests, and returns whatever
    multiply_tests returns.  The module argument is not used here.
    """
    # Create the target suite before computing the scenarios so the order
    # of the two calls stays the same as it has always been.
    suite = loader.suiteClass()
    return multiply_tests(
        standard_tests, transport_test_permutations(), suite)
101
class TransportTests(TestTransportImplementation):
104
super(TransportTests, self).setUp()
105
self._captureVar('BZR_NO_SMART_VFS', None)
107
def check_transport_contents(self, content, transport, relpath):
    """Check that transport.get(relpath).read() == content.

    :param content: The expected contents of the file.
    :param transport: The object whose get() is used to open the file.
    :param relpath: The path handed to transport.get().

    The comparison is delegated to self.assertEqualDiff.  The stream
    returned by transport.get() is always closed before comparing, even
    if read() raises, so the check does not leave an open handle behind.
    """
    stream = transport.get(relpath)
    try:
        actual = stream.read()
    finally:
        # Close explicitly instead of relying on garbage collection to
        # release the underlying file or connection.
        stream.close()
    self.assertEqualDiff(content, actual)
111
def test_ensure_base_missing(self):
112
""".ensure_base() should create the directory if it doesn't exist"""
113
t = self.get_transport()
115
if t_a.is_readonly():
116
self.assertRaises(TransportNotPossible,
119
self.assertTrue(t_a.ensure_base())
120
self.assertTrue(t.has('a'))
122
def test_ensure_base_exists(self):
123
""".ensure_base() should just be happy if it already exists"""
124
t = self.get_transport()
130
# ensure_base returns False if it didn't create the base
131
self.assertFalse(t_a.ensure_base())
133
def test_ensure_base_missing_parent(self):
134
""".ensure_base() will fail if the parent dir doesn't exist"""
135
t = self.get_transport()
141
self.assertRaises(NoSuchFile, t_b.ensure_base)
143
def test_external_url(self):
144
""".external_url either works or raises InProcessTransport."""
145
t = self.get_transport()
148
except errors.InProcessTransport:
152
t = self.get_transport()
154
files = ['a', 'b', 'e', 'g', '%']
155
self.build_tree(files, transport=t)
156
self.assertEqual(True, t.has('a'))
157
self.assertEqual(False, t.has('c'))
158
self.assertEqual(True, t.has(urlutils.escape('%')))
159
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
'e', 'f', 'g', 'h'])),
161
[True, True, False, False,
162
True, False, True, False])
163
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
urlutils.escape('%%')]))
166
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
'e', 'f', 'g', 'h']))),
168
[True, True, False, False,
169
True, False, True, False])
170
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
173
def test_has_root_works(self):
174
if self.transport_server is test_server.SmartTCPServer_for_testing:
175
raise TestNotApplicable(
176
"SmartTCPServer_for_testing intentionally does not allow "
178
current_transport = self.get_transport()
179
self.assertTrue(current_transport.has('/'))
180
root = current_transport.clone('/')
181
self.assertTrue(root.has(''))
184
t = self.get_transport()
186
files = ['a', 'b', 'e', 'g']
187
contents = ['contents of a\n',
192
self.build_tree(files, transport=t, line_endings='binary')
193
self.check_transport_contents('contents of a\n', t, 'a')
194
content_f = t.get_multi(files)
195
# Use itertools.izip() instead of use zip() or map(), since they fully
196
# evaluate their inputs, the transport requests should be issued and
197
# handled sequentially (we don't want to force transport to buffer).
198
for content, f in itertools.izip(contents, content_f):
199
self.assertEqual(content, f.read())
201
content_f = t.get_multi(iter(files))
202
# Use itertools.izip() for the same reason
203
for content, f in itertools.izip(contents, content_f):
204
self.assertEqual(content, f.read())
206
def test_get_unknown_file(self):
207
t = self.get_transport()
209
contents = ['contents of a\n',
212
self.build_tree(files, transport=t, line_endings='binary')
213
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
217
def test_get_directory_read_gives_ReadError(self):
218
"""consistent errors for read() on a file returned by get()."""
219
t = self.get_transport()
221
self.build_tree(['a directory/'])
223
t.mkdir('a%20directory')
224
# getting the file must either work or fail with a PathError
226
a_file = t.get('a%20directory')
227
except (errors.PathError, errors.RedirectRequested):
228
# early failure return immediately.
230
# having got a file, read() must either work (i.e. http reading a dir
231
# listing) or fail with ReadError
234
except errors.ReadError:
237
def test_get_bytes(self):
238
t = self.get_transport()
240
files = ['a', 'b', 'e', 'g']
241
contents = ['contents of a\n',
246
self.build_tree(files, transport=t, line_endings='binary')
247
self.check_transport_contents('contents of a\n', t, 'a')
249
for content, fname in zip(contents, files):
250
self.assertEqual(content, t.get_bytes(fname))
252
def test_get_bytes_unknown_file(self):
253
t = self.get_transport()
255
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
257
def test_get_with_open_write_stream_sees_all_content(self):
258
t = self.get_transport()
261
handle = t.open_write_stream('foo')
264
self.assertEqual('b', t.get('foo').read())
268
def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
t = self.get_transport()
272
handle = t.open_write_stream('foo')
275
self.assertEqual('b', t.get_bytes('foo'))
276
self.assertEqual('b', t.get('foo').read())
280
def test_put_bytes(self):
281
t = self.get_transport()
284
self.assertRaises(TransportNotPossible,
285
t.put_bytes, 'a', 'some text for a\n')
288
t.put_bytes('a', 'some text for a\n')
289
self.failUnless(t.has('a'))
290
self.check_transport_contents('some text for a\n', t, 'a')
292
# The contents should be overwritten
293
t.put_bytes('a', 'new text for a\n')
294
self.check_transport_contents('new text for a\n', t, 'a')
296
self.assertRaises(NoSuchFile,
297
t.put_bytes, 'path/doesnt/exist/c', 'contents')
299
def test_put_bytes_non_atomic(self):
300
t = self.get_transport()
303
self.assertRaises(TransportNotPossible,
304
t.put_bytes_non_atomic, 'a', 'some text for a\n')
307
self.failIf(t.has('a'))
308
t.put_bytes_non_atomic('a', 'some text for a\n')
309
self.failUnless(t.has('a'))
310
self.check_transport_contents('some text for a\n', t, 'a')
311
# Put also replaces contents
312
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
315
# Make sure we can create another file
316
t.put_bytes_non_atomic('d', 'contents for\nd\n')
317
# And overwrite 'a' with empty contents
318
t.put_bytes_non_atomic('a', '')
319
self.check_transport_contents('contents for\nd\n', t, 'd')
320
self.check_transport_contents('', t, 'a')
322
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
324
# Now test the create_parent flag
325
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
327
self.failIf(t.has('dir/a'))
328
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
create_parent_dir=True)
330
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
332
# But we still get NoSuchFile if we can't make the parent dir
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
335
create_parent_dir=True)
337
def test_put_bytes_permissions(self):
338
t = self.get_transport()
342
if not t._can_roundtrip_unix_modebits():
343
# Can't roundtrip, so no need to run this test
345
t.put_bytes('mode644', 'test text\n', mode=0644)
346
self.assertTransportMode(t, 'mode644', 0644)
347
t.put_bytes('mode666', 'test text\n', mode=0666)
348
self.assertTransportMode(t, 'mode666', 0666)
349
t.put_bytes('mode600', 'test text\n', mode=0600)
350
self.assertTransportMode(t, 'mode600', 0600)
351
# Yes, you can put_bytes a file such that it becomes readonly
352
t.put_bytes('mode400', 'test text\n', mode=0400)
353
self.assertTransportMode(t, 'mode400', 0400)
355
# The default permissions should be based on the current umask
356
umask = osutils.get_umask()
357
t.put_bytes('nomode', 'test text\n', mode=None)
358
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
360
def test_put_bytes_non_atomic_permissions(self):
361
t = self.get_transport()
365
if not t._can_roundtrip_unix_modebits():
366
# Can't roundtrip, so no need to run this test
368
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
self.assertTransportMode(t, 'mode644', 0644)
370
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
self.assertTransportMode(t, 'mode666', 0666)
372
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
self.assertTransportMode(t, 'mode600', 0600)
374
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
self.assertTransportMode(t, 'mode400', 0400)
377
# The default permissions should be based on the current umask
378
umask = osutils.get_umask()
379
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
382
# We should also be able to set the mode for a parent directory
384
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
dir_mode=0700, create_parent_dir=True)
386
self.assertTransportMode(t, 'dir700', 0700)
387
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
dir_mode=0770, create_parent_dir=True)
389
self.assertTransportMode(t, 'dir770', 0770)
390
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
dir_mode=0777, create_parent_dir=True)
392
self.assertTransportMode(t, 'dir777', 0777)
394
def test_put_file(self):
395
t = self.get_transport()
398
self.assertRaises(TransportNotPossible,
399
t.put_file, 'a', StringIO('some text for a\n'))
402
result = t.put_file('a', StringIO('some text for a\n'))
403
# put_file returns the length of the data written
404
self.assertEqual(16, result)
405
self.failUnless(t.has('a'))
406
self.check_transport_contents('some text for a\n', t, 'a')
407
# Put also replaces contents
408
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
409
self.assertEqual(19, result)
410
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
self.assertRaises(NoSuchFile,
412
t.put_file, 'path/doesnt/exist/c',
413
StringIO('contents'))
415
def test_put_file_non_atomic(self):
416
t = self.get_transport()
419
self.assertRaises(TransportNotPossible,
420
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
423
self.failIf(t.has('a'))
424
t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
self.failUnless(t.has('a'))
426
self.check_transport_contents('some text for a\n', t, 'a')
427
# Put also replaces contents
428
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
431
# Make sure we can create another file
432
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
433
# And overwrite 'a' with empty contents
434
t.put_file_non_atomic('a', StringIO(''))
435
self.check_transport_contents('contents for\nd\n', t, 'd')
436
self.check_transport_contents('', t, 'a')
438
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
StringIO('contents\n'))
440
# Now test the create_parent flag
441
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
StringIO('contents\n'))
443
self.failIf(t.has('dir/a'))
444
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
create_parent_dir=True)
446
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
448
# But we still get NoSuchFile if we can't make the parent dir
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
StringIO('contents\n'),
451
create_parent_dir=True)
453
def test_put_file_permissions(self):
455
t = self.get_transport()
459
if not t._can_roundtrip_unix_modebits():
460
# Can't roundtrip, so no need to run this test
462
t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
self.assertTransportMode(t, 'mode644', 0644)
464
t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
self.assertTransportMode(t, 'mode666', 0666)
466
t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
self.assertTransportMode(t, 'mode600', 0600)
468
# Yes, you can put a file such that it becomes readonly
469
t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
self.assertTransportMode(t, 'mode400', 0400)
471
# The default permissions should be based on the current umask
472
umask = osutils.get_umask()
473
t.put_file('nomode', StringIO('test text\n'), mode=None)
474
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
476
def test_put_file_non_atomic_permissions(self):
477
t = self.get_transport()
481
if not t._can_roundtrip_unix_modebits():
482
# Can't roundtrip, so no need to run this test
484
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
self.assertTransportMode(t, 'mode644', 0644)
486
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
self.assertTransportMode(t, 'mode666', 0666)
488
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
self.assertTransportMode(t, 'mode600', 0600)
490
# Yes, you can put_file_non_atomic a file such that it becomes readonly
491
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
self.assertTransportMode(t, 'mode400', 0400)
494
# The default permissions should be based on the current umask
495
umask = osutils.get_umask()
496
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
499
# We should also be able to set the mode for a parent directory
502
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
dir_mode=0700, create_parent_dir=True)
504
self.assertTransportMode(t, 'dir700', 0700)
505
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
dir_mode=0770, create_parent_dir=True)
507
self.assertTransportMode(t, 'dir770', 0770)
508
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
dir_mode=0777, create_parent_dir=True)
510
self.assertTransportMode(t, 'dir777', 0777)
512
def test_put_bytes_unicode(self):
513
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
515
# (we don't want to encode unicode here at all, callers should be
516
# strictly passing bytes to put_bytes), but we allow it for backwards
517
# compatibility. At some point we should use a specific exception.
518
# See https://bugs.launchpad.net/bzr/+bug/106898.
519
t = self.get_transport()
522
unicode_string = u'\u1234'
524
(AssertionError, UnicodeEncodeError),
525
t.put_bytes, 'foo', unicode_string)
527
def test_put_file_unicode(self):
528
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
# This situation can happen (and has) if code is careless about the type
530
# of "string" they initialise/write to a StringIO with. We cannot use
531
# cStringIO, because it never returns unicode from read.
532
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
# raise, but we raise it for hysterical raisins.
534
t = self.get_transport()
537
unicode_file = pyStringIO(u'\u1234')
538
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
540
def test_mkdir(self):
541
t = self.get_transport()
544
# cannot mkdir on readonly transports. We're not testing for
545
# cache coherency because cache behaviour is not currently
546
# defined for the transport interface.
547
self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
554
self.assertEqual(t.has('dir_a'), True)
555
self.assertEqual(t.has('dir_b'), False)
558
self.assertEqual(t.has('dir_b'), True)
560
t.mkdir_multi(['dir_c', 'dir_d'])
562
t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
self.assertEqual(list(t.has_multi(
564
['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
[True, True, True, False,
567
True, True, True, True])
569
# we were testing that a local mkdir followed by a transport
570
# mkdir failed thusly, but given that we * in one process * do not
571
# concurrently fiddle with disk dirs and then use transport to do
572
# things, the win here seems marginal compared to the constraint on
573
# the interface. RBC 20051227
575
self.assertRaises(FileExists, t.mkdir, 'dir_g')
577
# Test get/put in sub-directories
578
t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
583
# mkdir of a dir with an absent parent
584
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
586
def test_mkdir_permissions(self):
587
t = self.get_transport()
590
if not t._can_roundtrip_unix_modebits():
591
# no sense testing on this transport
593
# Test mkdir with a mode
594
t.mkdir('dmode755', mode=0755)
595
self.assertTransportMode(t, 'dmode755', 0755)
596
t.mkdir('dmode555', mode=0555)
597
self.assertTransportMode(t, 'dmode555', 0555)
598
t.mkdir('dmode777', mode=0777)
599
self.assertTransportMode(t, 'dmode777', 0777)
600
t.mkdir('dmode700', mode=0700)
601
self.assertTransportMode(t, 'dmode700', 0700)
602
t.mkdir_multi(['mdmode755'], mode=0755)
603
self.assertTransportMode(t, 'mdmode755', 0755)
605
# Default mode should be based on umask
606
umask = osutils.get_umask()
607
t.mkdir('dnomode', mode=None)
608
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
610
def test_opening_a_file_stream_creates_file(self):
611
t = self.get_transport()
614
handle = t.open_write_stream('foo')
616
self.assertEqual('', t.get_bytes('foo'))
620
def test_opening_a_file_stream_can_set_mode(self):
621
t = self.get_transport()
624
if not t._can_roundtrip_unix_modebits():
625
# Can't roundtrip, so no need to run this test
627
def check_mode(name, mode, expected):
628
handle = t.open_write_stream(name, mode=mode)
630
self.assertTransportMode(t, name, expected)
631
check_mode('mode644', 0644, 0644)
632
check_mode('mode666', 0666, 0666)
633
check_mode('mode600', 0600, 0600)
634
# The default permissions should be based on the current umask
635
check_mode('nomode', None, 0666 & ~osutils.get_umask())
637
def test_copy_to(self):
638
# FIXME: test: same server to same server (partly done)
639
# same protocol two servers
640
# and different protocols (done for now except for MemoryTransport.
643
def simple_copy_files(transport_from, transport_to):
644
files = ['a', 'b', 'c', 'd']
645
self.build_tree(files, transport=transport_from)
646
self.assertEqual(4, transport_from.copy_to(files, transport_to))
648
self.check_transport_contents(transport_to.get(f).read(),
651
t = self.get_transport()
652
temp_transport = MemoryTransport('memory:///')
653
simple_copy_files(t, temp_transport)
654
if not t.is_readonly():
655
t.mkdir('copy_to_simple')
656
t2 = t.clone('copy_to_simple')
657
simple_copy_files(t, t2)
660
# Test that copying into a missing directory raises
663
self.build_tree(['e/', 'e/f'])
666
t.put_bytes('e/f', 'contents of e')
667
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
temp_transport.mkdir('e')
669
t.copy_to(['e/f'], temp_transport)
672
temp_transport = MemoryTransport('memory:///')
674
files = ['a', 'b', 'c', 'd']
675
t.copy_to(iter(files), temp_transport)
677
self.check_transport_contents(temp_transport.get(f).read(),
681
for mode in (0666, 0644, 0600, 0400):
682
temp_transport = MemoryTransport("memory:///")
683
t.copy_to(files, temp_transport, mode=mode)
685
self.assertTransportMode(temp_transport, f, mode)
687
def test_create_prefix(self):
688
t = self.get_transport()
689
sub = t.clone('foo').clone('bar')
692
except TransportNotPossible:
693
self.assertTrue(t.is_readonly())
695
self.assertTrue(t.has('foo/bar'))
697
def test_append_file(self):
698
t = self.get_transport()
701
self.assertRaises(TransportNotPossible,
702
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
704
t.put_bytes('a', 'diff\ncontents for\na\n')
705
t.put_bytes('b', 'contents\nfor b\n')
708
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
710
self.check_transport_contents(
711
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
714
# a file with no parent should fail..
715
self.assertRaises(NoSuchFile,
716
t.append_file, 'missing/path', StringIO('content'))
718
# And we can create new files, too
720
t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
self.check_transport_contents('some text\nfor a missing file\n',
724
def test_append_bytes(self):
725
t = self.get_transport()
728
self.assertRaises(TransportNotPossible,
729
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
732
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
736
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
738
self.check_transport_contents(
739
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
742
# a file with no parent should fail..
743
self.assertRaises(NoSuchFile,
744
t.append_bytes, 'missing/path', 'content')
746
def test_append_multi(self):
747
t = self.get_transport()
751
t.put_bytes('a', 'diff\ncontents for\na\n'
752
'add\nsome\nmore\ncontents\n')
753
t.put_bytes('b', 'contents\nfor b\n')
755
self.assertEqual((43, 15),
756
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
('b', StringIO('some\nmore\nfor\nb\n'))]))
759
self.check_transport_contents(
760
'diff\ncontents for\na\n'
761
'add\nsome\nmore\ncontents\n'
762
'and\nthen\nsome\nmore\n',
764
self.check_transport_contents(
766
'some\nmore\nfor\nb\n',
769
self.assertEqual((62, 31),
770
t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
('b', StringIO('from an iterator\n'))])))
772
self.check_transport_contents(
773
'diff\ncontents for\na\n'
774
'add\nsome\nmore\ncontents\n'
775
'and\nthen\nsome\nmore\n'
776
'a little bit more\n',
778
self.check_transport_contents(
780
'some\nmore\nfor\nb\n'
781
'from an iterator\n',
784
self.assertEqual((80, 0),
785
t.append_multi([('a', StringIO('some text in a\n')),
786
('d', StringIO('missing file r\n'))]))
788
self.check_transport_contents(
789
'diff\ncontents for\na\n'
790
'add\nsome\nmore\ncontents\n'
791
'and\nthen\nsome\nmore\n'
792
'a little bit more\n'
795
self.check_transport_contents('missing file r\n', t, 'd')
797
def test_append_file_mode(self):
798
"""Check that append accepts a mode parameter"""
799
# check append accepts a mode
800
t = self.get_transport()
802
self.assertRaises(TransportNotPossible,
803
t.append_file, 'f', StringIO('f'), mode=None)
805
t.append_file('f', StringIO('f'), mode=None)
807
def test_append_bytes_mode(self):
808
# check append_bytes accepts a mode
809
t = self.get_transport()
811
self.assertRaises(TransportNotPossible,
812
t.append_bytes, 'f', 'f', mode=None)
814
t.append_bytes('f', 'f', mode=None)
816
def test_delete(self):
817
# TODO: Test Transport.delete
818
t = self.get_transport()
820
# Not much to do with a readonly transport
822
self.assertRaises(TransportNotPossible, t.delete, 'missing')
825
t.put_bytes('a', 'a little bit of text\n')
826
self.failUnless(t.has('a'))
828
self.failIf(t.has('a'))
830
self.assertRaises(NoSuchFile, t.delete, 'a')
832
t.put_bytes('a', 'a text\n')
833
t.put_bytes('b', 'b text\n')
834
t.put_bytes('c', 'c text\n')
835
self.assertEqual([True, True, True],
836
list(t.has_multi(['a', 'b', 'c'])))
837
t.delete_multi(['a', 'c'])
838
self.assertEqual([False, True, False],
839
list(t.has_multi(['a', 'b', 'c'])))
840
self.failIf(t.has('a'))
841
self.failUnless(t.has('b'))
842
self.failIf(t.has('c'))
844
self.assertRaises(NoSuchFile,
845
t.delete_multi, ['a', 'b', 'c'])
847
self.assertRaises(NoSuchFile,
848
t.delete_multi, iter(['a', 'b', 'c']))
850
t.put_bytes('a', 'another a text\n')
851
t.put_bytes('c', 'another c text\n')
852
t.delete_multi(iter(['a', 'b', 'c']))
854
# We should have deleted everything
855
# SftpServer creates control files in the
856
# working directory, so we can just do a
858
# self.assertEqual([], os.listdir('.'))
860
def test_recommended_page_size(self):
    """Transports recommend a page size for partial access to files."""
    # Only the type of the recommendation is checked, not its value.
    page_size = self.get_transport().recommended_page_size()
    self.assertIsInstance(page_size, int)
865
def test_rmdir(self):
866
t = self.get_transport()
867
# Not much to do with a readonly transport
869
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
874
# ftp may not be able to raise NoSuchFile for lack of
875
# details when failing
876
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
878
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
880
def test_rmdir_not_empty(self):
881
"""Deleting a non-empty directory raises an exception
883
sftp (and possibly others) don't give us a specific "directory not
884
empty" exception -- we can just see that the operation failed.
886
t = self.get_transport()
891
self.assertRaises(PathError, t.rmdir, 'adir')
893
def test_rmdir_empty_but_similar_prefix(self):
894
"""rmdir does not get confused by sibling paths.
896
A naive implementation of MemoryTransport would refuse to rmdir
897
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
898
uses "path.startswith(dir)" on all file paths to determine if directory
901
t = self.get_transport()
905
t.put_bytes('foo-bar', '')
908
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
self.failUnless(t.has('foo-bar'))
911
def test_rename_dir_succeeds(self):
912
t = self.get_transport()
914
raise TestSkipped("transport is readonly")
916
t.mkdir('adir/asubdir')
917
t.rename('adir', 'bdir')
918
self.assertTrue(t.has('bdir/asubdir'))
919
self.assertFalse(t.has('adir'))
921
def test_rename_dir_nonempty(self):
922
"""Attempting to replace a nonemtpy directory should fail"""
923
t = self.get_transport()
925
raise TestSkipped("transport is readonly")
927
t.mkdir('adir/asubdir')
929
t.mkdir('bdir/bsubdir')
930
# any kind of PathError would be OK, though we normally expect
932
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
933
# nothing was changed so it should still be as before
934
self.assertTrue(t.has('bdir/bsubdir'))
935
self.assertFalse(t.has('adir/bdir'))
936
self.assertFalse(t.has('adir/bsubdir'))
938
def test_rename_across_subdirs(self):
939
t = self.get_transport()
941
raise TestNotApplicable("transport is readonly")
946
ta.put_bytes('f', 'aoeu')
947
ta.rename('f', '../b/f')
948
self.assertTrue(tb.has('f'))
949
self.assertFalse(ta.has('f'))
950
self.assertTrue(t.has('b/f'))
952
def test_delete_tree(self):
953
t = self.get_transport()
955
# Not much to do with a readonly transport
957
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
960
# and does it like listing ?
963
t.delete_tree('adir')
964
except TransportNotPossible:
965
# ok, this transport does not support delete_tree
968
# did it delete that trivial case?
969
self.assertRaises(NoSuchFile, t.stat, 'adir')
971
self.build_tree(['adir/',
979
t.delete_tree('adir')
980
# adir should be gone now.
981
self.assertRaises(NoSuchFile, t.stat, 'adir')
984
t = self.get_transport()
989
# TODO: I would like to use os.listdir() to
990
# make sure there are no extra files, but SftpServer
991
# creates control files in the working directory
992
# perhaps all of this could be done in a subdirectory
994
t.put_bytes('a', 'a first file\n')
995
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
998
self.failUnless(t.has('b'))
999
self.failIf(t.has('a'))
1001
self.check_transport_contents('a first file\n', t, 'b')
1002
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1005
t.put_bytes('c', 'c this file\n')
1007
self.failIf(t.has('c'))
1008
self.check_transport_contents('c this file\n', t, 'b')
1010
# TODO: Try to write a test for atomicity
1011
# TODO: Test moving into a non-existent subdirectory
1012
# TODO: Test Transport.move_multi
1014
def test_copy(self):
1015
t = self.get_transport()
1020
t.put_bytes('a', 'a file\n')
1022
self.check_transport_contents('a file\n', t, 'b')
1024
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1026
# What should the assert be if you try to copy a
1027
# file over a directory?
1028
#self.assertRaises(Something, t.copy, 'a', 'c')
1029
t.put_bytes('d', 'text in d\n')
1031
self.check_transport_contents('text in d\n', t, 'b')
1033
# TODO: test copy_multi
1035
def test_connection_error(self):
1036
"""ConnectionError is raised when connection is impossible.
1038
The error should be raised from the first operation on the transport.
1041
url = self._server.get_bogus_url()
1042
except NotImplementedError:
1043
raise TestSkipped("Transport %s has no bogus URL support." %
1044
self._server.__class__)
1045
t = get_transport(url)
1046
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1048
def test_stat(self):
1049
# TODO: Test stat, just try once, and if it throws, stop testing
1050
from stat import S_ISDIR, S_ISREG
1052
t = self.get_transport()
1056
except TransportNotPossible, e:
1057
# This transport cannot stat
1060
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1061
sizes = [14, 0, 16, 0, 18]
1062
self.build_tree(paths, transport=t, line_endings='binary')
1064
for path, size in zip(paths, sizes):
1066
if path.endswith('/'):
1067
self.failUnless(S_ISDIR(st.st_mode))
1068
# directory sizes are meaningless
1070
self.failUnless(S_ISREG(st.st_mode))
1071
self.assertEqual(size, st.st_size)
1073
remote_stats = list(t.stat_multi(paths))
1074
remote_iter_stats = list(t.stat_multi(iter(paths)))
1076
self.assertRaises(NoSuchFile, t.stat, 'q')
1077
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1079
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
subdir = t.clone('subdir')
1083
subdir.stat('./file')
1086
def test_list_dir(self):
# Checks list_dir() on an empty transport, on a populated tree (including a
# name that needs url-quoting), through a clone, and for paths that must
# raise PathError.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1091 -> 1094, 1095 -> 1099, 1106 -> 1108, 1108 -> 1111, 1111 -> 1113,
# 1119 -> 1126): the sorted_list helper has no visible sort/return, two
# expected-list lines have lost the head of their call, and the body of the
# second `if not t.is_readonly():` is not shown. Confirm against upstream.
1087
# TODO: Test list_dir, just try once, and if it throws, stop testing
1088
t = self.get_transport()
1090
if not t.listable():
1091
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1094
def sorted_list(d, transport):
1095
l = list(transport.list_dir(d))
1099
self.assertEqual([], sorted_list('.', t))
1100
# c2 is precisely one letter longer than c here to test that
1101
# suffixing is not confused.
1102
# a%25b checks that quoting is done consistently across transports
1103
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1105
if not t.is_readonly():
1106
self.build_tree(tree_names, transport=t)
1108
self.build_tree(tree_names)
1111
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1113
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1114
self.assertEqual(['d', 'e'], sorted_list('c', t))
1116
# Cloning the transport produces an equivalent listing
1117
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1119
if not t.is_readonly():
1126
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1127
self.assertEqual(['e'], sorted_list('c', t))
1129
self.assertListRaises(PathError, t.list_dir, 'q')
1130
self.assertListRaises(PathError, t.list_dir, 'c/f')
1131
# 'a' is a file, list_dir should raise an error
1132
self.assertListRaises(PathError, t.list_dir, 'a')
1134
def test_list_dir_result_is_url_escaped(self):
    """list_dir() returns url-escaped names as plain str objects.

    A file literally named '%' must be listed as '%25'.  The test is
    skipped for transports that are not listable.
    """
    t = self.get_transport()
    if not t.listable():
        raise TestSkipped("transport not listable")

    if not t.is_readonly():
        self.build_tree(['a/', 'a/%'], transport=t)
    else:
        # A read-only transport cannot be written through, so the tree is
        # built without passing the transport.
        self.build_tree(['a/', 'a/%'])

    names = list(t.list_dir('a'))
    self.assertEqual(['%25'], names)
    self.assertIsInstance(names[0], str)
1148
def test_clone_preserve_info(self):
    """A clone of a connected transport keeps its connection parameters.

    Scheme, user, password, host and port must be equal on the original
    and on the clone.  Skipped for transports that are not
    ConnectedTransport instances.
    """
    t1 = self.get_transport()
    if not isinstance(t1, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    t2 = t1.clone('subdir')
    # assertEqual is used instead of the deprecated assertEquals alias, in
    # line with the other tests in this file.
    self.assertEqual(t1._scheme, t2._scheme)
    self.assertEqual(t1._user, t2._user)
    self.assertEqual(t1._password, t2._password)
    self.assertEqual(t1._host, t2._host)
    self.assertEqual(t1._port, t2._port)
1160
def test__reuse_for(self):
# Checks which URL differences make ConnectedTransport._reuse_for() hand
# back a different object: scheme, user, host and port do; password does
# not; a local path yields None.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1169 -> 1171, 1180 -> 1184, 1184 -> 1189, 1191 -> 1193, 1202 -> 1207):
# the helper's docstring is never closed, and `scheme`, `user` and `port`
# are used below without a visible assignment. Confirm against upstream.
1161
t = self.get_transport()
1162
if not isinstance(t, ConnectedTransport):
1163
raise TestSkipped("not a connected transport")
1165
def new_url(scheme=None, user=None, password=None,
1166
host=None, port=None, path=None):
1167
"""Build a new url from t.base changing only parts of it.
1169
Only the parameters different from None will be changed.
1171
if scheme is None: scheme = t._scheme
1172
if user is None: user = t._user
1173
if password is None: password = t._password
1174
# NOTE(review): this repeats the `user` default from two lines above; it is
# redundant as written.
if user is None: user = t._user
1175
if host is None: host = t._host
1176
if port is None: port = t._port
1177
if path is None: path = t._path
1178
return t._unsplit_url(scheme, user, password, host, port, path)
1180
if t._scheme == 'ftp':
1184
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1189
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1190
# passwords are not taken into account because:
1191
# - it makes no sense to have two different valid passwords for the
1193
# - _password in ConnectedTransport is intended to collect what the
1194
# user specified from the command-line and there are cases where the
1195
# new url can contain no password (if the url was built from an
1196
# existing transport.base for example)
1197
# - password are considered part of the credentials provided at
1198
# connection creation time and as such may not be present in the url
1199
# (they may be typed by the user when prompted for example)
1200
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1201
# We will not connect, we can use a invalid host
1202
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1207
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1208
# No point in trying to reuse a transport for a local URL
1209
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1211
def test_connection_sharing(self):
    """A connected transport and its clone share a single connection.

    Replacing the connection on the original must be visible through the
    clone as well.  Skipped for transports that are not connected.
    """
    parent = self.get_transport()
    if not isinstance(parent, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    child = parent.clone('subdir')
    # Connections may be created lazily, so issue a request to force one.
    parent.has('surely_not')
    self.assertIs(parent._get_connection(), child._get_connection())

    # Stand-in for the connection created after a temporary failure.
    replacement = object()
    parent._set_connection(replacement)
    # Both transports must now report the replacement connection.
    for transport in (parent, child):
        self.assertIs(replacement, transport._get_connection())
1228
def test_reuse_connection_for_various_paths(self):
    """_reuse_for() shares the connection whatever path is requested.

    Transports obtained for a deep path and for a sibling path must all
    use the connection of the original transport.
    """
    origin = self.get_transport()
    if not isinstance(origin, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Issue a request so that a connection actually exists.
    origin.has('surely_not')
    self.assertIsNot(None, origin._get_connection())

    deep_url = origin.base + 'whatever/but/deep/down/the/path'
    deep = origin._reuse_for(deep_url)
    self.assertIsNot(origin, deep)
    self.assertIs(origin._get_connection(), deep._get_connection())

    home_url = origin.base + 'home'
    home = deep._reuse_for(home_url)
    for other in (origin, deep):
        self.assertIs(other._get_connection(), home._get_connection())
1244
def test_clone(self):
# Checks, via has(), that three related transports (t1, t2, t3) agree on
# what exists before and after a new file 'b/d' is added.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1252 -> 1255, 1258 -> 1261, 1269 -> 1271): t2 and t3 are used without a
# visible assignment, and the put_bytes call presumably sits in an `else:`
# branch of the read-only check. Confirm against the upstream file.
1245
# TODO: Test that clone moves up and down the filesystem
1246
t1 = self.get_transport()
1248
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1250
self.failUnless(t1.has('a'))
1251
self.failUnless(t1.has('b/c'))
1252
self.failIf(t1.has('c'))
1255
self.assertEqual(t1.base + 'b/', t2.base)
1257
self.failUnless(t2.has('c'))
1258
self.failIf(t2.has('a'))
1261
self.failUnless(t3.has('a'))
1262
self.failIf(t3.has('c'))
1264
self.failIf(t1.has('b/d'))
1265
self.failIf(t2.has('d'))
1266
self.failIf(t3.has('b/d'))
1268
if t1.is_readonly():
1269
self.build_tree_contents([('b/d', 'newfile\n')])
1271
t2.put_bytes('d', 'newfile\n')
1273
self.failUnless(t1.has('b/d'))
1274
self.failUnless(t2.has('d'))
1275
self.failUnless(t3.has('b/d'))
1277
def test_clone_to_root(self):
    """Walking up with clone('..') ends at the location clone('/') gives.

    Each step up must shorten the base URL, except at the top where the
    base stays the same.  The root found that way must match both
    clone('/').base and abspath('/') of the original transport, and must
    end with a slash.
    """
    orig_transport = self.get_transport()
    # Climb one directory at a time until cloning '..' changes nothing.
    root_transport = orig_transport
    while True:
        parent = root_transport.clone("..")
        # as we are walking up directories, the path must be
        # growing less, except at the top
        self.assertTrue(len(parent.base) < len(root_transport.base)
                        or parent.base == root_transport.base)
        if parent.base == root_transport.base:
            break
        root_transport = parent

    # Cloning to "/" should take us to exactly the same location.
    self.assertEqual(root_transport.base, orig_transport.clone("/").base)
    # the abspath of "/" from the original transport should be the same
    # as the base at the root:
    self.assertEqual(orig_transport.abspath("/"), root_transport.base)

    # At the root, the URL must still end with / as its a directory
    self.assertEqual(root_transport.base[-1], '/')
1304
def test_clone_from_root(self):
    """At the root, cloning to a simple dir should just do string append."""
    root = self.get_transport().clone('/')
    expected_base = root.base + '.bzr/'
    self.assertEqual(expected_base, root.clone('.bzr').base)
1311
def test_base_url(self):
    """The base URL of a transport ends with a slash."""
    final_char = self.get_transport().base[-1]
    self.assertEqual('/', final_char)
1315
def test_relpath(self):
    """relpath() turns URLs under the base into base-relative paths."""
    t = self.get_transport()
    base = t.base
    expectations = [
        ('', base),
        # the base without its final character gives the same result
        ('', base[:-1]),
        # subdirs which don't exist should still give relpaths.
        ('foo', base + 'foo'),
        # trailing slash should be the same.
        ('foo', base + 'foo/'),
    ]
    for expected, url in expectations:
        self.assertEqual(expected, t.relpath(url))
1325
def test_relpath_at_root(self):
    """relpath() works at the root, for the root itself and below it."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # Step up one level.  Without rebinding t the loop condition
        # could never change and the walk would not terminate.
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a relpath below the root
    self.assertEqual('', t.relpath(t.base))
    # and a deeper one should work too
    self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1337
def test_abspath(self):
    """Smoke test for abspath().

    Corner cases for backends like unix fs's that have aliasing problems
    like symlinks should go in backend specific test cases.
    """
    t = self.get_transport()

    expected = t.base + 'relpath'
    self.assertEqual(expected, t.abspath('relpath'))

    # This should work without raising an error.
    t.abspath("/")

    # the abspath of "/" and "/foo/.." should result in the same location
    root_url = t.abspath("/")
    self.assertEqual(root_url, t.abspath("/foo/.."))

    via_root_clone = t.clone("/").abspath('foo')
    self.assertEqual(via_root_clone, t.abspath("/foo"))
1355
def test_win32_abspath(self):
# Checks abspath("/") for file:/// URLs with and without a drive letter.
# NOTE(review): a line appears to be missing after the platform check
# (number gap 1359 -> 1361): only the message argument and closing paren of
# a call are shown. Presumably the test is skipped on non-win32 platforms;
# confirm against the upstream file.
1356
# Note: we tried to set sys.platform='win32' so we could test on
1357
# other platforms too, but then osutils does platform specific
1358
# things at import time which defeated us...
1359
if sys.platform != 'win32':
1361
'Testing drive letters in abspath implemented only for win32')
1363
# smoke test for abspath on win32.
1364
# a transport based on 'file:///' never fully qualifies the drive.
1365
transport = get_transport("file:///")
1366
self.failUnlessEqual(transport.abspath("/"), "file:///")
1368
# but a transport that starts with a drive spec must keep it.
1369
transport = get_transport("file:///C:/")
1370
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1372
def test_local_abspath(self):
# Compares transport.local_abspath('.') with getcwd(); NotLocalUrl and
# TransportNotPossible are caught.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1373 -> 1375, 1377 -> 1380): there is no visible `try:`, the except
# handler has no body beyond a comment, and the final assertion presumably
# belongs to an `else:` branch. Confirm against the upstream file.
1373
transport = self.get_transport()
1375
p = transport.local_abspath('.')
1376
except (errors.NotLocalUrl, TransportNotPossible), e:
1377
# should be formattable
1380
self.assertEqual(getcwd(), p)
1382
def test_abspath_at_root(self):
    """abspath() at the root: '..' and '' give the base, names append."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # Step up one level.  Without rebinding t the loop condition
        # could never change and the walk would not terminate.
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a abspath of the root when we ask for
    # t.abspath('..') - this due to our choice that clone('..')
    # should return the root from the root, combined with the desire that
    # the url from clone('..') and from abspath('..') should be the same.
    self.assertEqual(t.base, t.abspath('..'))
    # '' should give us the root
    self.assertEqual(t.base, t.abspath(''))
    # and a path should append to the url
    self.assertEqual(t.base + 'foo', t.abspath('foo'))
1399
def test_iter_files_recursive(self):
# Builds a tree under 'isolated/' and compares the set of paths yielded by
# iter_files_recursive() on the transport, and on a clone rooted at
# 'isolated', with the expected file names (url-quoted).
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1403 -> 1405, 1405 -> 1409, 1409 -> 1411, 1415 -> 1417, 1417 -> 1419):
# both list literals are truncated and nothing visible ends the
# not-listable branch. Confirm against the upstream file.
1400
transport = self.get_transport()
1401
if not transport.listable():
1402
self.assertRaises(TransportNotPossible,
1403
transport.iter_files_recursive)
1405
self.build_tree(['isolated/',
1409
'isolated/dir/b%25z', # make sure quoting is correct
1411
transport=transport)
1412
paths = set(transport.iter_files_recursive())
1413
# nb the directories are not converted
1414
self.assertEqual(paths,
1415
set(['isolated/dir/foo',
1417
'isolated/dir/b%2525z',
1419
sub_transport = transport.clone('isolated')
1420
paths = set(sub_transport.iter_files_recursive())
1421
self.assertEqual(paths,
1422
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1424
def test_copy_tree(self):
# Builds a tree under 'from/', calls copy_tree('from', 'to') and compares
# the set of paths from iter_files_recursive() with an expected set.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1431 -> 1433, 1433 -> 1435, 1435 -> 1439, 1439 -> 1441, and everything
# after 1445): the bodies of the two guard branches and most of both list
# literals are not shown. Confirm against the upstream file.
1425
# TODO: test file contents and permissions are preserved. This test was
1426
# added just to ensure that quoting was handled correctly.
1427
# -- David Allouche 2006-08-11
1428
transport = self.get_transport()
1429
if not transport.listable():
1430
self.assertRaises(TransportNotPossible,
1431
transport.iter_files_recursive)
1433
if transport.is_readonly():
1435
self.build_tree(['from/',
1439
'from/dir/b%25z', # make sure quoting is correct
1441
transport=transport)
1442
transport.copy_tree('from', 'to')
1443
paths = set(transport.iter_files_recursive())
1444
self.assertEqual(paths,
1445
set(['from/dir/foo',
1454
def test_copy_tree_to_transport(self):
# Builds a tree under 'from/', copies it with copy_tree_to_transport() from
# a clone at 'from' to a clone at 'to' (after ensure_base() on the target),
# then compares the set of paths from iter_files_recursive().
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1458 -> 1460, 1460 -> 1462, 1462 -> 1466, 1466 -> 1468, and everything
# after 1475): the bodies of the two guard branches and most of both list
# literals are not shown. Confirm against the upstream file.
1455
transport = self.get_transport()
1456
if not transport.listable():
1457
self.assertRaises(TransportNotPossible,
1458
transport.iter_files_recursive)
1460
if transport.is_readonly():
1462
self.build_tree(['from/',
1466
'from/dir/b%25z', # make sure quoting is correct
1468
transport=transport)
1469
from_transport = transport.clone('from')
1470
to_transport = transport.clone('to')
1471
to_transport.ensure_base()
1472
from_transport.copy_tree_to_transport(to_transport)
1473
paths = set(transport.iter_files_recursive())
1474
self.assertEqual(paths,
1475
set(['from/dir/foo',
1484
def test_unicode_paths(self):
# Raises KnownFailure when the test server sets no_unicode_support, and
# TestSkipped when building the tree raises UnicodeError. Otherwise checks
# that a raw unicode name is rejected with InvalidURL and that the escaped
# name gives back the expected contents.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1497 -> 1500, 1502 -> 1505, 1509 -> 1511, 1511 -> 1514): the `files` list
# is never closed, the `except` has no visible `try:`, and `fname` is used
# without a visible loop header. Confirm against the upstream file.
1485
"""Test that we can read/write files with Unicode names."""
1486
t = self.get_transport()
1488
# With FAT32 and certain encodings on win32
1489
# '\xe5' and '\xe4' actually map to the same file
1490
# adding a suffix kicks in the 'preserving but insensitive'
1491
# route, and maintains the right files
1492
files = [u'\xe5.1', # a w/ circle iso-8859-1
1493
u'\xe4.2', # a w/ dots iso-8859-1
1494
u'\u017d', # Z with umlat iso-8859-2
1495
u'\u062c', # Arabic j
1496
u'\u0410', # Russian A
1497
u'\u65e5', # Kanji person
1500
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1501
if no_unicode_support:
1502
raise tests.KnownFailure("test server cannot handle unicode paths")
1505
self.build_tree(files, transport=t, line_endings='binary')
1506
except UnicodeError:
1507
raise TestSkipped("cannot handle unicode paths in current encoding")
1509
# A plain unicode string is not a valid url
1511
self.assertRaises(InvalidURL, t.get, fname)
1514
fname_utf8 = fname.encode('utf-8')
1515
contents = 'contents of %s\n' % (fname_utf8,)
1516
self.check_transport_contents(contents, t, urlutils.escape(fname))
1518
def test_connect_twice_is_same_content(self):
# Writes 'foo' through one transport and reads it back through a second
# one from get_transport(), then through the parent of a transport opened
# at 'newdir'.
# NOTE(review): a line appears to be missing after the read-only check
# (number gap 1522 -> 1524); its branch body is not shown. Confirm against
# the upstream file.
1519
# check that our server (whatever it is) is accessible reliably
1520
# via get_transport and multiple connections share content.
1521
transport = self.get_transport()
1522
if transport.is_readonly():
1524
transport.put_bytes('foo', 'bar')
1525
transport3 = self.get_transport()
1526
self.check_transport_contents('bar', transport3, 'foo')
1528
# now opening at a relative url should give us a sane result:
1529
transport.mkdir('newdir')
1530
transport5 = self.get_transport('newdir')
1531
transport6 = transport5.clone('..')
1532
self.check_transport_contents('bar', transport6, 'foo')
1534
def test_lock_write(self):
# On a read-only transport lock_write must raise TransportNotPossible;
# otherwise an empty 'lock' file is written and lock_write('lock') is
# attempted, with TransportNotPossible caught.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1537 -> 1539, 1541 -> 1543, 1543 -> 1545, 1546 -> 1548, and after 1549):
# the docstring is never closed, the `except` has no visible `try:`, and
# nothing visible releases the lock. Confirm against the upstream file.
1535
"""Test transport-level write locks.
1537
These are deprecated and transports may decline to support them.
1539
transport = self.get_transport()
1540
if transport.is_readonly():
1541
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1543
transport.put_bytes('lock', '')
1545
lock = transport.lock_write('lock')
1546
except TransportNotPossible:
1548
# TODO make this consistent on all platforms:
1549
# self.assertRaises(LockError, transport.lock_write, 'lock')
1552
def test_lock_read(self):
# Creates an empty 'lock' file (directly on disk for read-only transports,
# via put_bytes otherwise) and attempts lock_read('lock'), with
# TransportNotPossible caught.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1555 -> 1557, 1559 -> 1561, 1561 -> 1563, 1564 -> 1566, and after 1567):
# the docstring is never closed, the put_bytes call presumably sits in an
# `else:` branch, the `except` has no visible `try:`, and nothing visible
# releases the lock. Confirm against the upstream file.
1553
"""Test transport-level read locks.
1555
These are deprecated and transports may decline to support them.
1557
transport = self.get_transport()
1558
if transport.is_readonly():
1559
file('lock', 'w').close()
1561
transport.put_bytes('lock', '')
1563
lock = transport.lock_read('lock')
1564
except TransportNotPossible:
1566
# TODO make this consistent on all platforms:
1567
# self.assertRaises(LockError, transport.lock_read, 'lock')
1570
def test_readv(self):
    """readv() yields (offset, data) pairs for the requested ranges."""
    transport = self.get_transport()
    if transport.is_readonly():
        # A read-only transport cannot be written through, so the file is
        # created directly.  It is closed explicitly so the data is
        # flushed before being read back.
        f = open('a', 'w')
        try:
            f.write('0123456789')
        finally:
            f.close()
    else:
        transport.put_bytes('a', '0123456789')

    d = list(transport.readv('a', ((0, 1),)))
    self.assertEqual(d[0], (0, '0'))

    d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
    self.assertEqual(d[0], (0, '0'))
    self.assertEqual(d[1], (1, '1'))
    self.assertEqual(d[2], (3, '34'))
    self.assertEqual(d[3], (9, '9'))
1586
def test_readv_out_of_order(self):
    """readv() returns results in the order the ranges were requested."""
    transport = self.get_transport()
    if transport.is_readonly():
        # A read-only transport cannot be written through, so the file is
        # created directly.  It is closed explicitly so the data is
        # flushed before being read back.
        f = open('a', 'w')
        try:
            f.write('0123456789')
        finally:
            f.close()
    else:
        # NOTE(review): this content is one byte longer than the content
        # written in the read-only branch; the ranges read below only
        # touch the first ten bytes.
        transport.put_bytes('a', '01234567890')

    d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
    self.assertEqual(d[0], (1, '1'))
    self.assertEqual(d[1], (9, '9'))
    self.assertEqual(d[2], (0, '0'))
    self.assertEqual(d[3], (3, '34'))
1599
def test_readv_with_adjust_for_latency(self):
    """readv(adjust_for_latency=True) may widen but must cover requests.

    Every returned (offset, data) pair is checked against the original
    content, whatever expansion the transport applied.
    """
    transport = self.get_transport()
    # the adjust for latency flag expands the data region returned
    # according to a per-transport heuristic, so testing is a little
    # tricky as we need more data than the largest combining that our
    # transports do. To accommodate this we generate random data and cross
    # reference the returned data with the random data. To avoid doing
    # multiple large random byte look ups we do several tests on the same
    # content.
    content = osutils.rand_bytes(200*1024)
    content_size = len(content)
    if transport.is_readonly():
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)

    def check_result_data(result_vector):
        # Each returned chunk must equal the matching slice of content.
        for offset, data in result_vector:
            self.assertEqual(content[offset:offset + len(data)], data)

    # Start of the last 100 bytes, derived from the real size instead of
    # being hard-coded.
    tail_start = content_size - 100

    # start of file corner case
    result = list(transport.readv('a', ((0, 30),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from 0, to something > 30
    self.assertEqual(1, len(result))
    self.assertEqual(0, result[0][0])
    self.assertTrue(len(result[0][1]) >= 30)
    check_result_data(result)

    # end of file corner case
    result = list(transport.readv('a', ((tail_start, 100),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from content_size - its length, to the end
    self.assertEqual(1, len(result))
    data_len = len(result[0][1])
    self.assertEqual(content_size - data_len, result[0][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)

    # out of order ranges are made in order
    result = list(transport.readv('a', ((tail_start, 100), (0, 50)),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 2 results, in order, start and end.
    self.assertEqual(2, len(result))
    # start: 50 bytes were requested, so at least 50 must come back
    data_len = len(result[0][1])
    self.assertEqual(0, result[0][0])
    self.assertTrue(data_len >= 50)
    # end
    data_len = len(result[1][1])
    self.assertEqual(content_size - data_len, result[1][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)

    # close ranges get combined (even if out of order)
    for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
        result = list(transport.readv('a', request_vector,
            adjust_for_latency=True, upper_limit=content_size))
        self.assertEqual(1, len(result))
        data_len = len(result[0][1])
        # minimum length is from 400 to 1034 - 634
        self.assertTrue(data_len >= 634)
        # must contain the region 400 to 1034
        self.assertTrue(result[0][0] <= 400)
        self.assertTrue(result[0][0] + data_len >= 1034)
        check_result_data(result)
1663
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range is covered by some returned chunk.

    Regression test built from an observed failure case on a 1 MiB file.
    """
    transport = self.get_transport()
    # test from observed failure case.
    if transport.is_readonly():
        # A read-only transport cannot be written through, so the file is
        # created directly.  It is closed explicitly so the data is
        # flushed before being read back.
        f = open('a', 'w')
        try:
            f.write('a'*1024*1024)
        finally:
            f.close()
    else:
        transport.put_bytes('a', 'a'*1024*1024)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
        (225037, 800), (221357, 800), (437077, 800), (947670, 800),
        (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector, True, 1024*1024))
    # One flag per requested range: is it fully inside some returned chunk?
    # The expected list is sized from the vector, not a hard-coded count.
    found_items = [
        any(offset <= start and start + length <= offset + len(data)
            for offset, data in results)
        for start, length in broken_vector]
    self.assertEqual([True] * len(broken_vector), found_items)
1682
def test_get_with_open_write_stream_sees_all_content(self):
# Opens a write stream on 'foo' and checks that readv() on the same path
# returns 'b' at offset 0 and 'd' at offset 2.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1683 -> 1686, 1686 -> 1689, and after 1689): nothing visible writes to or
# closes `handle`. Confirm against the upstream file.
1683
t = self.get_transport()
1686
handle = t.open_write_stream('foo')
1689
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1693
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't.

    A transport that returns a medium must return a SmartClientMedium;
    one that cannot must raise NoSmartMedium.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
    except errors.NoSmartMedium:
        # as long as we got it we're fine
        pass
    else:
        # Checked outside the try so that only get_smart_medium() itself
        # is covered by the NoSmartMedium handler.
        self.assertIsInstance(client_medium, medium.SmartClientMedium)
1704
def test_readv_short_read(self):
# Checks that readv() raises one of the accepted errors when a range runs
# off the end of the file, and when a range starts past the end.
# NOTE(review): lines appear to be missing from this listing (number gaps
# 1707 -> 1709, 1714 -> 1716): the put_bytes call presumably sits in an
# `else:` branch, and the first exception tuple is never closed (the entry
# announced by the paramiko comment is not shown). Confirm against the
# upstream file.
1705
transport = self.get_transport()
1706
if transport.is_readonly():
1707
file('a', 'w').write('0123456789')
1709
transport.put_bytes('a', '01234567890')
1711
# This is intentionally reading off the end of the file
1712
# since we are sure that it cannot get there
1713
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1714
# Can be raised by paramiko
1716
transport.readv, 'a', [(1,1), (8,10)])
1718
# This is trying to seek past the end of the file, it should
1719
# also raise a special error
1720
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1721
transport.readv, 'a', [(12,2)])