20
20
TransportTestProviderAdapter.
24
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
28
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
37
from bzrlib.errors import (ConnectionError,
31
TransportNotPossible, ConnectionError)
32
from bzrlib.tests import TestCaseInTempDir, TestSkipped
33
from bzrlib.transport import memory, urlescape
34
import bzrlib.transport
38
"""Append the given text (file-like object) to the supplied filename."""
46
class TestTransportImplementation(TestCaseInTempDir):
47
"""Implementation verification for transports.
49
To verify a transport we need a server factory, which is a callable
50
that accepts no parameters and returns an implementation of
51
bzrlib.transport.Server.
53
That Server is then used to construct transport instances and test
54
the transport via loopback activity.
56
Currently this assumes that the Transport object is connected to the
57
current working directory. So that whatever is done
58
through the transport, should show up in the working
59
directory, and vice-versa. This is a bug, because its possible to have
60
URL schemes which provide access to something that may not be
61
result in storage on the local disk, i.e. due to file system limits, or
62
due to it being a database or some other non-filesystem tool.
64
This also tests to make sure that the functions work with both
65
generators and lists (assuming iter(list) is effectively a generator)
47
from bzrlib.osutils import getcwd
48
from bzrlib.smart import medium
49
from bzrlib.tests import (
55
from bzrlib.tests.test_transport import TestTransportImplementation
56
from bzrlib.transport import (
59
_get_transport_modules,
61
from bzrlib.transport.memory import MemoryTransport
64
def get_transport_test_permutations(module):
65
"""Get the permutations module wants to have tested."""
66
if getattr(module, 'get_test_permutations', None) is None:
68
"transport module %s doesn't provide get_test_permutations()"
71
return module.get_test_permutations()
74
def transport_test_permutations():
75
"""Return a list of the klass, server_factory pairs to test."""
77
for module in _get_transport_modules():
79
permutations = get_transport_test_permutations(
80
reduce(getattr, (module).split('.')[1:], __import__(module)))
81
for (klass, server_factory) in permutations:
82
scenario = (server_factory.__name__,
83
{"transport_class":klass,
84
"transport_server":server_factory})
85
result.append(scenario)
86
except errors.DependencyNotPresent, e:
87
# Continue even if a dependency prevents us
88
# from adding this test
93
def load_tests(standard_tests, module, loader):
94
"""Multiply tests for tranport implementations."""
95
result = loader.suiteClass()
96
scenarios = transport_test_permutations()
97
return multiply_tests(standard_tests, scenarios, result)
100
class TransportTests(TestTransportImplementation):
69
super(TestTransportImplementation, self).setUp()
70
self._server = self.transport_server()
103
super(TransportTests, self).setUp()
104
self._captureVar('BZR_NO_SMART_VFS', None)
74
super(TestTransportImplementation, self).tearDown()
75
self._server.tearDown()
77
106
def check_transport_contents(self, content, transport, relpath):
78
107
"""Check that transport.get(relpath).read() == content."""
79
108
self.assertEqualDiff(content, transport.get(relpath).read())
81
def get_transport(self):
82
"""Return a connected transport to the local directory."""
83
t = bzrlib.transport.get_transport(self._server.get_url())
84
self.failUnless(isinstance(t, self.transport_class),
85
"Got the wrong class from get_transport"
86
"(%r, expected %r)" % (t.__class__,
87
self.transport_class))
90
def assertListRaises(self, excClass, func, *args, **kwargs):
91
"""Fail unless excClass is raised when the iterator from func is used.
93
Many transport functions can return generators this makes sure
94
to wrap them in a list() call to make sure the whole generator
95
is run, and that the proper exception is raised.
110
def test_ensure_base_missing(self):
111
""".ensure_base() should create the directory if it doesn't exist"""
112
t = self.get_transport()
114
if t_a.is_readonly():
115
self.assertRaises(TransportNotPossible,
118
self.assertTrue(t_a.ensure_base())
119
self.assertTrue(t.has('a'))
121
def test_ensure_base_exists(self):
122
""".ensure_base() should just be happy if it already exists"""
123
t = self.get_transport()
129
# ensure_base returns False if it didn't create the base
130
self.assertFalse(t_a.ensure_base())
132
def test_ensure_base_missing_parent(self):
133
""".ensure_base() will fail if the parent dir doesn't exist"""
134
t = self.get_transport()
140
self.assertRaises(NoSuchFile, t_b.ensure_base)
142
def test_external_url(self):
143
""".external_url either works or raises InProcessTransport."""
144
t = self.get_transport()
98
list(func(*args, **kwargs))
102
if hasattr(excClass,'__name__'): excName = excClass.__name__
103
else: excName = str(excClass)
104
raise self.failureException, "%s not raised" % excName
147
except errors.InProcessTransport:
106
150
def test_has(self):
107
151
t = self.get_transport()
129
189
'contents of e\n',
130
190
'contents of g\n',
132
self.build_tree(files, transport=t)
192
self.build_tree(files, transport=t, line_endings='binary')
133
193
self.check_transport_contents('contents of a\n', t, 'a')
134
194
content_f = t.get_multi(files)
135
for content, f in zip(contents, content_f):
195
# Use itertools.izip() instead of use zip() or map(), since they fully
196
# evaluate their inputs, the transport requests should be issued and
197
# handled sequentially (we don't want to force transport to buffer).
198
for content, f in itertools.izip(contents, content_f):
136
199
self.assertEqual(content, f.read())
138
201
content_f = t.get_multi(iter(files))
139
for content, f in zip(contents, content_f):
202
# Use itertools.izip() for the same reason
203
for content, f in itertools.izip(contents, content_f):
140
204
self.assertEqual(content, f.read())
206
def test_get_unknown_file(self):
207
t = self.get_transport()
209
contents = ['contents of a\n',
212
self.build_tree(files, transport=t, line_endings='binary')
142
213
self.assertRaises(NoSuchFile, t.get, 'c')
143
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
144
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
147
t = self.get_transport()
150
self.assertRaises(TransportNotPossible,
151
t.put, 'a', 'some text for a\n')
154
t.put('a', StringIO('some text for a\n'))
155
self.failUnless(t.has('a'))
156
self.check_transport_contents('some text for a\n', t, 'a')
157
# Make sure 'has' is updated
158
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
159
[True, False, False, False, False])
160
# Put also replaces contents
161
self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
162
('d', StringIO('contents\nfor d\n'))]),
164
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
165
[True, False, False, True, False])
166
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
167
self.check_transport_contents('contents\nfor d\n', t, 'd')
170
t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
171
('d', StringIO('another contents\nfor d\n'))])),
173
self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
174
self.check_transport_contents('another contents\nfor d\n', t, 'd')
176
self.assertRaises(NoSuchFile,
177
t.put, 'path/doesnt/exist/c', 'contents')
179
def test_put_permissions(self):
180
t = self.get_transport()
184
t.put('mode644', StringIO('test text\n'), mode=0644)
185
self.assertTransportMode(t, 'mode644', 0644)
186
t.put('mode666', StringIO('test text\n'), mode=0666)
187
self.assertTransportMode(t, 'mode666', 0666)
188
t.put('mode600', StringIO('test text\n'), mode=0600)
217
def test_get_directory_read_gives_ReadError(self):
218
"""consistent errors for read() on a file returned by get()."""
219
t = self.get_transport()
221
self.build_tree(['a directory/'])
223
t.mkdir('a%20directory')
224
# getting the file must either work or fail with a PathError
226
a_file = t.get('a%20directory')
227
except (errors.PathError, errors.RedirectRequested):
228
# early failure return immediately.
230
# having got a file, read() must either work (i.e. http reading a dir
231
# listing) or fail with ReadError
234
except errors.ReadError:
237
def test_get_bytes(self):
238
t = self.get_transport()
240
files = ['a', 'b', 'e', 'g']
241
contents = ['contents of a\n',
246
self.build_tree(files, transport=t, line_endings='binary')
247
self.check_transport_contents('contents of a\n', t, 'a')
249
for content, fname in zip(contents, files):
250
self.assertEqual(content, t.get_bytes(fname))
252
def test_get_bytes_unknown_file(self):
253
t = self.get_transport()
255
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
257
def test_get_with_open_write_stream_sees_all_content(self):
258
t = self.get_transport()
261
handle = t.open_write_stream('foo')
264
self.assertEqual('b', t.get('foo').read())
268
def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
t = self.get_transport()
272
handle = t.open_write_stream('foo')
275
self.assertEqual('b', t.get_bytes('foo'))
276
self.assertEqual('b', t.get('foo').read())
280
def test_put_bytes(self):
281
t = self.get_transport()
284
self.assertRaises(TransportNotPossible,
285
t.put_bytes, 'a', 'some text for a\n')
288
t.put_bytes('a', 'some text for a\n')
289
self.failUnless(t.has('a'))
290
self.check_transport_contents('some text for a\n', t, 'a')
292
# The contents should be overwritten
293
t.put_bytes('a', 'new text for a\n')
294
self.check_transport_contents('new text for a\n', t, 'a')
296
self.assertRaises(NoSuchFile,
297
t.put_bytes, 'path/doesnt/exist/c', 'contents')
299
def test_put_bytes_non_atomic(self):
300
t = self.get_transport()
303
self.assertRaises(TransportNotPossible,
304
t.put_bytes_non_atomic, 'a', 'some text for a\n')
307
self.failIf(t.has('a'))
308
t.put_bytes_non_atomic('a', 'some text for a\n')
309
self.failUnless(t.has('a'))
310
self.check_transport_contents('some text for a\n', t, 'a')
311
# Put also replaces contents
312
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
315
# Make sure we can create another file
316
t.put_bytes_non_atomic('d', 'contents for\nd\n')
317
# And overwrite 'a' with empty contents
318
t.put_bytes_non_atomic('a', '')
319
self.check_transport_contents('contents for\nd\n', t, 'd')
320
self.check_transport_contents('', t, 'a')
322
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
324
# Now test the create_parent flag
325
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
327
self.failIf(t.has('dir/a'))
328
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
create_parent_dir=True)
330
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
332
# But we still get NoSuchFile if we can't make the parent dir
333
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
335
create_parent_dir=True)
337
def test_put_bytes_permissions(self):
338
t = self.get_transport()
342
if not t._can_roundtrip_unix_modebits():
343
# Can't roundtrip, so no need to run this test
345
t.put_bytes('mode644', 'test text\n', mode=0644)
346
self.assertTransportMode(t, 'mode644', 0644)
347
t.put_bytes('mode666', 'test text\n', mode=0666)
348
self.assertTransportMode(t, 'mode666', 0666)
349
t.put_bytes('mode600', 'test text\n', mode=0600)
350
self.assertTransportMode(t, 'mode600', 0600)
351
# Yes, you can put_bytes a file such that it becomes readonly
352
t.put_bytes('mode400', 'test text\n', mode=0400)
353
self.assertTransportMode(t, 'mode400', 0400)
355
# The default permissions should be based on the current umask
356
umask = osutils.get_umask()
357
t.put_bytes('nomode', 'test text\n', mode=None)
358
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
360
def test_put_bytes_non_atomic_permissions(self):
361
t = self.get_transport()
365
if not t._can_roundtrip_unix_modebits():
366
# Can't roundtrip, so no need to run this test
368
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
self.assertTransportMode(t, 'mode644', 0644)
370
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
self.assertTransportMode(t, 'mode666', 0666)
372
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
self.assertTransportMode(t, 'mode600', 0600)
374
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
self.assertTransportMode(t, 'mode400', 0400)
377
# The default permissions should be based on the current umask
378
umask = osutils.get_umask()
379
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
382
# We should also be able to set the mode for a parent directory
384
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
dir_mode=0700, create_parent_dir=True)
386
self.assertTransportMode(t, 'dir700', 0700)
387
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
dir_mode=0770, create_parent_dir=True)
389
self.assertTransportMode(t, 'dir770', 0770)
390
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
dir_mode=0777, create_parent_dir=True)
392
self.assertTransportMode(t, 'dir777', 0777)
394
def test_put_file(self):
395
t = self.get_transport()
398
self.assertRaises(TransportNotPossible,
399
t.put_file, 'a', StringIO('some text for a\n'))
402
result = t.put_file('a', StringIO('some text for a\n'))
403
# put_file returns the length of the data written
404
self.assertEqual(16, result)
405
self.failUnless(t.has('a'))
406
self.check_transport_contents('some text for a\n', t, 'a')
407
# Put also replaces contents
408
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
409
self.assertEqual(19, result)
410
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
self.assertRaises(NoSuchFile,
412
t.put_file, 'path/doesnt/exist/c',
413
StringIO('contents'))
415
def test_put_file_non_atomic(self):
416
t = self.get_transport()
419
self.assertRaises(TransportNotPossible,
420
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
423
self.failIf(t.has('a'))
424
t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
self.failUnless(t.has('a'))
426
self.check_transport_contents('some text for a\n', t, 'a')
427
# Put also replaces contents
428
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
431
# Make sure we can create another file
432
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
433
# And overwrite 'a' with empty contents
434
t.put_file_non_atomic('a', StringIO(''))
435
self.check_transport_contents('contents for\nd\n', t, 'd')
436
self.check_transport_contents('', t, 'a')
438
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
StringIO('contents\n'))
440
# Now test the create_parent flag
441
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
StringIO('contents\n'))
443
self.failIf(t.has('dir/a'))
444
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
create_parent_dir=True)
446
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
448
# But we still get NoSuchFile if we can't make the parent dir
449
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
StringIO('contents\n'),
451
create_parent_dir=True)
453
def test_put_file_permissions(self):
455
t = self.get_transport()
459
if not t._can_roundtrip_unix_modebits():
460
# Can't roundtrip, so no need to run this test
462
t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
self.assertTransportMode(t, 'mode644', 0644)
464
t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
self.assertTransportMode(t, 'mode666', 0666)
466
t.put_file('mode600', StringIO('test text\n'), mode=0600)
189
467
self.assertTransportMode(t, 'mode600', 0600)
190
468
# Yes, you can put a file such that it becomes readonly
191
t.put('mode400', StringIO('test text\n'), mode=0400)
192
self.assertTransportMode(t, 'mode400', 0400)
193
t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
194
self.assertTransportMode(t, 'mmode644', 0644)
469
t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
self.assertTransportMode(t, 'mode400', 0400)
471
# The default permissions should be based on the current umask
472
umask = osutils.get_umask()
473
t.put_file('nomode', StringIO('test text\n'), mode=None)
474
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
476
def test_put_file_non_atomic_permissions(self):
477
t = self.get_transport()
481
if not t._can_roundtrip_unix_modebits():
482
# Can't roundtrip, so no need to run this test
484
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
self.assertTransportMode(t, 'mode644', 0644)
486
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
self.assertTransportMode(t, 'mode666', 0666)
488
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
self.assertTransportMode(t, 'mode600', 0600)
490
# Yes, you can put_file_non_atomic a file such that it becomes readonly
491
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
self.assertTransportMode(t, 'mode400', 0400)
494
# The default permissions should be based on the current umask
495
umask = osutils.get_umask()
496
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
499
# We should also be able to set the mode for a parent directory
502
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
dir_mode=0700, create_parent_dir=True)
504
self.assertTransportMode(t, 'dir700', 0700)
505
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
dir_mode=0770, create_parent_dir=True)
507
self.assertTransportMode(t, 'dir770', 0770)
508
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
dir_mode=0777, create_parent_dir=True)
510
self.assertTransportMode(t, 'dir777', 0777)
512
def test_put_bytes_unicode(self):
513
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
515
# (we don't want to encode unicode here at all, callers should be
516
# strictly passing bytes to put_bytes), but we allow it for backwards
517
# compatibility. At some point we should use a specific exception.
518
# See https://bugs.launchpad.net/bzr/+bug/106898.
519
t = self.get_transport()
522
unicode_string = u'\u1234'
524
(AssertionError, UnicodeEncodeError),
525
t.put_bytes, 'foo', unicode_string)
527
def test_put_file_unicode(self):
528
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
# This situation can happen (and has) if code is careless about the type
530
# of "string" they initialise/write to a StringIO with. We cannot use
531
# cStringIO, because it never returns unicode from read.
532
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
# raise, but we raise it for hysterical raisins.
534
t = self.get_transport()
537
unicode_file = pyStringIO(u'\u1234')
538
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
196
540
def test_mkdir(self):
197
541
t = self.get_transport()
199
543
if t.is_readonly():
200
# cannot mkdir on readonly transports. We're not testing for
544
# cannot mkdir on readonly transports. We're not testing for
201
545
# cache coherency because cache behaviour is not currently
202
546
# defined for the transport interface.
203
547
self.assertRaises(TransportNotPossible, t.mkdir, '.')
304
679
del temp_transport
306
681
for mode in (0666, 0644, 0600, 0400):
307
temp_transport = MemoryTransport("memory:/")
682
temp_transport = MemoryTransport("memory:///")
308
683
t.copy_to(files, temp_transport, mode=mode)
310
685
self.assertTransportMode(temp_transport, f, mode)
312
def test_append(self):
313
t = self.get_transport()
316
open('a', 'wb').write('diff\ncontents for\na\n')
317
open('b', 'wb').write('contents\nfor b\n')
320
('a', StringIO('diff\ncontents for\na\n')),
321
('b', StringIO('contents\nfor b\n'))
325
self.assertRaises(TransportNotPossible,
326
t.append, 'a', 'add\nsome\nmore\ncontents\n')
327
_append('a', StringIO('add\nsome\nmore\ncontents\n'))
329
t.append('a', StringIO('add\nsome\nmore\ncontents\n'))
331
self.check_transport_contents(
332
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
336
self.assertRaises(TransportNotPossible,
338
[('a', 'and\nthen\nsome\nmore\n'),
339
('b', 'some\nmore\nfor\nb\n')])
340
_append('a', StringIO('and\nthen\nsome\nmore\n'))
341
_append('b', StringIO('some\nmore\nfor\nb\n'))
687
def test_create_prefix(self):
688
t = self.get_transport()
689
sub = t.clone('foo').clone('bar')
692
except TransportNotPossible:
693
self.assertTrue(t.is_readonly())
695
self.assertTrue(t.has('foo/bar'))
697
def test_append_file(self):
698
t = self.get_transport()
701
self.assertRaises(TransportNotPossible,
702
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
704
t.put_bytes('a', 'diff\ncontents for\na\n')
705
t.put_bytes('b', 'contents\nfor b\n')
708
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
710
self.check_transport_contents(
711
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
714
# a file with no parent should fail..
715
self.assertRaises(NoSuchFile,
716
t.append_file, 'missing/path', StringIO('content'))
718
# And we can create new files, too
720
t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
self.check_transport_contents('some text\nfor a missing file\n',
724
def test_append_bytes(self):
725
t = self.get_transport()
728
self.assertRaises(TransportNotPossible,
729
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
732
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
736
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
738
self.check_transport_contents(
739
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
742
# a file with no parent should fail..
743
self.assertRaises(NoSuchFile,
744
t.append_bytes, 'missing/path', 'content')
746
def test_append_multi(self):
747
t = self.get_transport()
751
t.put_bytes('a', 'diff\ncontents for\na\n'
752
'add\nsome\nmore\ncontents\n')
753
t.put_bytes('b', 'contents\nfor b\n')
755
self.assertEqual((43, 15),
343
756
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
344
('b', StringIO('some\nmore\nfor\nb\n'))])
757
('b', StringIO('some\nmore\nfor\nb\n'))]))
345
759
self.check_transport_contents(
346
760
'diff\ncontents for\na\n'
347
761
'add\nsome\nmore\ncontents\n'
732
1086
def test_list_dir(self):
733
1087
# TODO: Test list_dir, just try once, and if it throws, stop testing
734
1088
t = self.get_transport()
736
1090
if not t.listable():
737
1091
self.assertRaises(TransportNotPossible, t.list_dir, '.')
741
l = list(t.list_dir(d))
1094
def sorted_list(d, transport):
1095
l = list(transport.list_dir(d))
745
# SftpServer creates control files in the working directory
746
# so lets move down a directory to avoid those.
747
if not t.is_readonly():
753
self.assertEqual([], sorted_list(u'.'))
1099
self.assertEqual([], sorted_list('.', t))
754
1100
# c2 is precisely one letter longer than c here to test that
755
1101
# suffixing is not confused.
1102
# a%25b checks that quoting is done consistently across transports
1103
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
756
1105
if not t.is_readonly():
757
self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
1106
self.build_tree(tree_names, transport=t)
759
self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
761
self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
762
self.assertEqual([u'd', u'e'], sorted_list(u'c'))
1108
self.build_tree(tree_names)
1111
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1113
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1114
self.assertEqual(['d', 'e'], sorted_list('c', t))
1116
# Cloning the transport produces an equivalent listing
1117
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
764
1119
if not t.is_readonly():
771
self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
772
self.assertEqual([u'e'], sorted_list(u'c'))
774
self.assertListRaises(NoSuchFile, t.list_dir, 'q')
775
self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
776
self.assertListRaises(NoSuchFile, t.list_dir, 'a')
1126
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1127
self.assertEqual(['e'], sorted_list('c', t))
1129
self.assertListRaises(PathError, t.list_dir, 'q')
1130
self.assertListRaises(PathError, t.list_dir, 'c/f')
1131
# 'a' is a file, list_dir should raise an error
1132
self.assertListRaises(PathError, t.list_dir, 'a')
1134
def test_list_dir_result_is_url_escaped(self):
1135
t = self.get_transport()
1136
if not t.listable():
1137
raise TestSkipped("transport not listable")
1139
if not t.is_readonly():
1140
self.build_tree(['a/', 'a/%'], transport=t)
1142
self.build_tree(['a/', 'a/%'])
1144
names = list(t.list_dir('a'))
1145
self.assertEqual(['%25'], names)
1146
self.assertIsInstance(names[0], str)
1148
def test_clone_preserve_info(self):
1149
t1 = self.get_transport()
1150
if not isinstance(t1, ConnectedTransport):
1151
raise TestSkipped("not a connected transport")
1153
t2 = t1.clone('subdir')
1154
self.assertEquals(t1._scheme, t2._scheme)
1155
self.assertEquals(t1._user, t2._user)
1156
self.assertEquals(t1._password, t2._password)
1157
self.assertEquals(t1._host, t2._host)
1158
self.assertEquals(t1._port, t2._port)
1160
def test__reuse_for(self):
1161
t = self.get_transport()
1162
if not isinstance(t, ConnectedTransport):
1163
raise TestSkipped("not a connected transport")
1165
def new_url(scheme=None, user=None, password=None,
1166
host=None, port=None, path=None):
1167
"""Build a new url from t.base changing only parts of it.
1169
Only the parameters different from None will be changed.
1171
if scheme is None: scheme = t._scheme
1172
if user is None: user = t._user
1173
if password is None: password = t._password
1174
if user is None: user = t._user
1175
if host is None: host = t._host
1176
if port is None: port = t._port
1177
if path is None: path = t._path
1178
return t._unsplit_url(scheme, user, password, host, port, path)
1180
if t._scheme == 'ftp':
1184
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1189
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1190
# passwords are not taken into account because:
1191
# - it makes no sense to have two different valid passwords for the
1193
# - _password in ConnectedTransport is intended to collect what the
1194
# user specified from the command-line and there are cases where the
1195
# new url can contain no password (if the url was built from an
1196
# existing transport.base for example)
1197
# - password are considered part of the credentials provided at
1198
# connection creation time and as such may not be present in the url
1199
# (they may be typed by the user when prompted for example)
1200
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1201
# We will not connect, we can use a invalid host
1202
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1207
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1208
# No point in trying to reuse a transport for a local URL
1209
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1211
def test_connection_sharing(self):
1212
t = self.get_transport()
1213
if not isinstance(t, ConnectedTransport):
1214
raise TestSkipped("not a connected transport")
1216
c = t.clone('subdir')
1217
# Some transports will create the connection only when needed
1218
t.has('surely_not') # Force connection
1219
self.assertIs(t._get_connection(), c._get_connection())
1221
# Temporary failure, we need to create a new dummy connection
1222
new_connection = object()
1223
t._set_connection(new_connection)
1224
# Check that both transports use the same connection
1225
self.assertIs(new_connection, t._get_connection())
1226
self.assertIs(new_connection, c._get_connection())
1228
def test_reuse_connection_for_various_paths(self):
1229
t = self.get_transport()
1230
if not isinstance(t, ConnectedTransport):
1231
raise TestSkipped("not a connected transport")
1233
t.has('surely_not') # Force connection
1234
self.assertIsNot(None, t._get_connection())
1236
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1237
self.assertIsNot(t, subdir)
1238
self.assertIs(t._get_connection(), subdir._get_connection())
1240
home = subdir._reuse_for(t.base + 'home')
1241
self.assertIs(t._get_connection(), home._get_connection())
1242
self.assertIs(subdir._get_connection(), home._get_connection())
778
1244
def test_clone(self):
779
1245
# TODO: Test that clone moves up and down the filesystem
800
1266
self.failIf(t3.has('b/d'))
802
1268
if t1.is_readonly():
803
open('b/d', 'wb').write('newfile\n')
1269
self.build_tree_contents([('b/d', 'newfile\n')])
805
t2.put('d', StringIO('newfile\n'))
1271
t2.put_bytes('d', 'newfile\n')
807
1273
self.failUnless(t1.has('b/d'))
808
1274
self.failUnless(t2.has('d'))
809
1275
self.failUnless(t3.has('b/d'))
1277
def test_clone_to_root(self):
1278
orig_transport = self.get_transport()
1279
# Repeatedly go up to a parent directory until we're at the root
1280
# directory of this transport
1281
root_transport = orig_transport
1282
new_transport = root_transport.clone("..")
1283
# as we are walking up directories, the path must be
1284
# growing less, except at the top
1285
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1286
or new_transport.base == root_transport.base)
1287
while new_transport.base != root_transport.base:
1288
root_transport = new_transport
1289
new_transport = root_transport.clone("..")
1290
# as we are walking up directories, the path must be
1291
# growing less, except at the top
1292
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1293
or new_transport.base == root_transport.base)
1295
# Cloning to "/" should take us to exactly the same location.
1296
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1297
# the abspath of "/" from the original transport should be the same
1298
# as the base at the root:
1299
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1301
# At the root, the URL must still end with / as its a directory
1302
self.assertEqual(root_transport.base[-1], '/')
1304
def test_clone_from_root(self):
1305
"""At the root, cloning to a simple dir should just do string append."""
1306
orig_transport = self.get_transport()
1307
root_transport = orig_transport.clone('/')
1308
self.assertEqual(root_transport.base + '.bzr/',
1309
root_transport.clone('.bzr').base)
1311
def test_base_url(self):
1312
t = self.get_transport()
1313
self.assertEqual('/', t.base[-1])
811
1315
def test_relpath(self):
812
1316
t = self.get_transport()
813
1317
self.assertEqual('', t.relpath(t.base))
814
1318
# base ends with /
815
1319
self.assertEqual('', t.relpath(t.base[:-1]))
816
# subdirs which dont exist should still give relpaths.
1320
# subdirs which don't exist should still give relpaths.
817
1321
self.assertEqual('foo', t.relpath(t.base + 'foo'))
818
1322
# trailing slash should be the same.
819
1323
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1325
def test_relpath_at_root(self):
1326
t = self.get_transport()
1327
# clone all the way to the top
1328
new_transport = t.clone('..')
1329
while new_transport.base != t.base:
1331
new_transport = t.clone('..')
1332
# we must be able to get a relpath below the root
1333
self.assertEqual('', t.relpath(t.base))
1334
# and a deeper one should work too
1335
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
821
1337
def test_abspath(self):
822
1338
# smoke test for abspath. Corner cases for backends like unix fs's
823
1339
# that have aliasing problems like symlinks should go in backend
824
1340
# specific test cases.
825
1341
transport = self.get_transport()
826
1343
self.assertEqual(transport.base + 'relpath',
827
1344
transport.abspath('relpath'))
1346
# This should work without raising an error.
1347
transport.abspath("/")
1349
# the abspath of "/" and "/foo/.." should result in the same location
1350
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1352
self.assertEqual(transport.clone("/").abspath('foo'),
1353
transport.abspath("/foo"))
1355
def test_win32_abspath(self):
1356
# Note: we tried to set sys.platform='win32' so we could test on
1357
# other platforms too, but then osutils does platform specific
1358
# things at import time which defeated us...
1359
if sys.platform != 'win32':
1361
'Testing drive letters in abspath implemented only for win32')
1363
# smoke test for abspath on win32.
1364
# a transport based on 'file:///' never fully qualifies the drive.
1365
transport = get_transport("file:///")
1366
self.failUnlessEqual(transport.abspath("/"), "file:///")
1368
# but a transport that starts with a drive spec must keep it.
1369
transport = get_transport("file:///C:/")
1370
self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1372
def test_local_abspath(self):
1373
transport = self.get_transport()
1375
p = transport.local_abspath('.')
1376
except (errors.NotLocalUrl, TransportNotPossible), e:
1377
# should be formattable
1380
self.assertEqual(getcwd(), p)
1382
def test_abspath_at_root(self):
1383
t = self.get_transport()
1384
# clone all the way to the top
1385
new_transport = t.clone('..')
1386
while new_transport.base != t.base:
1388
new_transport = t.clone('..')
1389
# we must be able to get a abspath of the root when we ask for
1390
# t.abspath('..') - this due to our choice that clone('..')
1391
# should return the root from the root, combined with the desire that
1392
# the url from clone('..') and from abspath('..') should be the same.
1393
self.assertEqual(t.base, t.abspath('..'))
1394
# '' should give us the root
1395
self.assertEqual(t.base, t.abspath(''))
1396
# and a path should append to the url
1397
self.assertEqual(t.base + 'foo', t.abspath('foo'))
829
1399
def test_iter_files_recursive(self):
830
1400
transport = self.get_transport()
831
1401
if not transport.listable():
843
1414
self.assertEqual(paths,
844
1415
set(['isolated/dir/foo',
845
1416
'isolated/dir/bar',
1417
'isolated/dir/b%2525z',
846
1418
'isolated/bar']))
847
1419
sub_transport = transport.clone('isolated')
848
1420
paths = set(sub_transport.iter_files_recursive())
849
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
1421
self.assertEqual(paths,
1422
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1424
def test_copy_tree(self):
1425
# TODO: test file contents and permissions are preserved. This test was
1426
# added just to ensure that quoting was handled correctly.
1427
# -- David Allouche 2006-08-11
1428
transport = self.get_transport()
1429
if not transport.listable():
1430
self.assertRaises(TransportNotPossible,
1431
transport.iter_files_recursive)
1433
if transport.is_readonly():
1435
self.build_tree(['from/',
1439
'from/dir/b%25z', # make sure quoting is correct
1441
transport=transport)
1442
transport.copy_tree('from', 'to')
1443
paths = set(transport.iter_files_recursive())
1444
self.assertEqual(paths,
1445
set(['from/dir/foo',
1454
def test_copy_tree_to_transport(self):
1455
transport = self.get_transport()
1456
if not transport.listable():
1457
self.assertRaises(TransportNotPossible,
1458
transport.iter_files_recursive)
1460
if transport.is_readonly():
1462
self.build_tree(['from/',
1466
'from/dir/b%25z', # make sure quoting is correct
1468
transport=transport)
1469
from_transport = transport.clone('from')
1470
to_transport = transport.clone('to')
1471
to_transport.ensure_base()
1472
from_transport.copy_tree_to_transport(to_transport)
1473
paths = set(transport.iter_files_recursive())
1474
self.assertEqual(paths,
1475
set(['from/dir/foo',
1484
def test_unicode_paths(self):
1485
"""Test that we can read/write files with Unicode names."""
1486
t = self.get_transport()
1488
# With FAT32 and certain encodings on win32
1489
# '\xe5' and '\xe4' actually map to the same file
1490
# adding a suffix kicks in the 'preserving but insensitive'
1491
# route, and maintains the right files
1492
files = [u'\xe5.1', # a w/ circle iso-8859-1
1493
u'\xe4.2', # a w/ dots iso-8859-1
1494
u'\u017d', # Z with umlat iso-8859-2
1495
u'\u062c', # Arabic j
1496
u'\u0410', # Russian A
1497
u'\u65e5', # Kanji person
1501
self.build_tree(files, transport=t, line_endings='binary')
1502
except UnicodeError:
1503
raise TestSkipped("cannot handle unicode paths in current encoding")
1505
# A plain unicode string is not a valid url
1507
self.assertRaises(InvalidURL, t.get, fname)
1510
fname_utf8 = fname.encode('utf-8')
1511
contents = 'contents of %s\n' % (fname_utf8,)
1512
self.check_transport_contents(contents, t, urlutils.escape(fname))
851
1514
def test_connect_twice_is_same_content(self):
852
# check that our server (whatever it is) is accessable reliably
1515
# check that our server (whatever it is) is accessible reliably
853
1516
# via get_transport and multiple connections share content.
854
1517
transport = self.get_transport()
855
1518
if transport.is_readonly():
857
transport.put('foo', StringIO('bar'))
858
transport2 = self.get_transport()
859
self.check_transport_contents('bar', transport2, 'foo')
860
# its base should be usable.
861
transport2 = bzrlib.transport.get_transport(transport.base)
862
self.check_transport_contents('bar', transport2, 'foo')
1520
transport.put_bytes('foo', 'bar')
1521
transport3 = self.get_transport()
1522
self.check_transport_contents('bar', transport3, 'foo')
864
1524
# now opening at a relative url should give use a sane result:
865
1525
transport.mkdir('newdir')
866
transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
867
transport2 = transport2.clone('..')
868
self.check_transport_contents('bar', transport2, 'foo')
1526
transport5 = self.get_transport('newdir')
1527
transport6 = transport5.clone('..')
1528
self.check_transport_contents('bar', transport6, 'foo')
870
1530
def test_lock_write(self):
1531
"""Test transport-level write locks.
1533
These are deprecated and transports may decline to support them.
871
1535
transport = self.get_transport()
872
1536
if transport.is_readonly():
873
1537
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
875
transport.put('lock', StringIO())
876
lock = transport.lock_write('lock')
1539
transport.put_bytes('lock', '')
1541
lock = transport.lock_write('lock')
1542
except TransportNotPossible:
877
1544
# TODO make this consistent on all platforms:
878
1545
# self.assertRaises(LockError, transport.lock_write, 'lock')
881
1548
def test_lock_read(self):
1549
"""Test transport-level read locks.
1551
These are deprecated and transports may decline to support them.
882
1553
transport = self.get_transport()
883
1554
if transport.is_readonly():
884
1555
file('lock', 'w').close()
886
transport.put('lock', StringIO())
887
lock = transport.lock_read('lock')
1557
transport.put_bytes('lock', '')
1559
lock = transport.lock_read('lock')
1560
except TransportNotPossible:
888
1562
# TODO make this consistent on all platforms:
889
1563
# self.assertRaises(LockError, transport.lock_read, 'lock')
1566
def test_readv(self):
1567
transport = self.get_transport()
1568
if transport.is_readonly():
1569
file('a', 'w').write('0123456789')
1571
transport.put_bytes('a', '0123456789')
1573
d = list(transport.readv('a', ((0, 1),)))
1574
self.assertEqual(d[0], (0, '0'))
1576
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1577
self.assertEqual(d[0], (0, '0'))
1578
self.assertEqual(d[1], (1, '1'))
1579
self.assertEqual(d[2], (3, '34'))
1580
self.assertEqual(d[3], (9, '9'))
1582
def test_readv_out_of_order(self):
1583
transport = self.get_transport()
1584
if transport.is_readonly():
1585
file('a', 'w').write('0123456789')
1587
transport.put_bytes('a', '01234567890')
1589
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1590
self.assertEqual(d[0], (1, '1'))
1591
self.assertEqual(d[1], (9, '9'))
1592
self.assertEqual(d[2], (0, '0'))
1593
self.assertEqual(d[3], (3, '34'))
1595
def test_readv_with_adjust_for_latency(self):
1596
transport = self.get_transport()
1597
# the adjust for latency flag expands the data region returned
1598
# according to a per-transport heuristic, so testing is a little
1599
# tricky as we need more data than the largest combining that our
1600
# transports do. To accomodate this we generate random data and cross
1601
# reference the returned data with the random data. To avoid doing
1602
# multiple large random byte look ups we do several tests on the same
1604
content = osutils.rand_bytes(200*1024)
1605
content_size = len(content)
1606
if transport.is_readonly():
1607
self.build_tree_contents([('a', content)])
1609
transport.put_bytes('a', content)
1610
def check_result_data(result_vector):
1611
for item in result_vector:
1612
data_len = len(item[1])
1613
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1616
result = list(transport.readv('a', ((0, 30),),
1617
adjust_for_latency=True, upper_limit=content_size))
1618
# we expect 1 result, from 0, to something > 30
1619
self.assertEqual(1, len(result))
1620
self.assertEqual(0, result[0][0])
1621
self.assertTrue(len(result[0][1]) >= 30)
1622
check_result_data(result)
1623
# end of file corner case
1624
result = list(transport.readv('a', ((204700, 100),),
1625
adjust_for_latency=True, upper_limit=content_size))
1626
# we expect 1 result, from 204800- its length, to the end
1627
self.assertEqual(1, len(result))
1628
data_len = len(result[0][1])
1629
self.assertEqual(204800-data_len, result[0][0])
1630
self.assertTrue(data_len >= 100)
1631
check_result_data(result)
1632
# out of order ranges are made in order
1633
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1634
adjust_for_latency=True, upper_limit=content_size))
1635
# we expect 2 results, in order, start and end.
1636
self.assertEqual(2, len(result))
1638
data_len = len(result[0][1])
1639
self.assertEqual(0, result[0][0])
1640
self.assertTrue(data_len >= 30)
1642
data_len = len(result[1][1])
1643
self.assertEqual(204800-data_len, result[1][0])
1644
self.assertTrue(data_len >= 100)
1645
check_result_data(result)
1646
# close ranges get combined (even if out of order)
1647
for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1648
result = list(transport.readv('a', request_vector,
1649
adjust_for_latency=True, upper_limit=content_size))
1650
self.assertEqual(1, len(result))
1651
data_len = len(result[0][1])
1652
# minimum length is from 400 to 1034 - 634
1653
self.assertTrue(data_len >= 634)
1654
# must contain the region 400 to 1034
1655
self.assertTrue(result[0][0] <= 400)
1656
self.assertTrue(result[0][0] + data_len >= 1034)
1657
check_result_data(result)
1659
def test_readv_with_adjust_for_latency_with_big_file(self):
1660
transport = self.get_transport()
1661
# test from observed failure case.
1662
if transport.is_readonly():
1663
file('a', 'w').write('a'*1024*1024)
1665
transport.put_bytes('a', 'a'*1024*1024)
1666
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1667
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1668
(465373, 800), (947422, 800)]
1669
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1670
found_items = [False]*9
1671
for pos, (start, length) in enumerate(broken_vector):
1672
# check the range is covered by the result
1673
for offset, data in results:
1674
if offset <= start and start + length <= offset + len(data):
1675
found_items[pos] = True
1676
self.assertEqual([True]*9, found_items)
1678
def test_get_with_open_write_stream_sees_all_content(self):
1679
t = self.get_transport()
1682
handle = t.open_write_stream('foo')
1685
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1689
def test_get_smart_medium(self):
1690
"""All transports must either give a smart medium, or know they can't.
1692
transport = self.get_transport()
1694
client_medium = transport.get_smart_medium()
1695
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1696
except errors.NoSmartMedium:
1697
# as long as we got it we're fine
1700
def test_readv_short_read(self):
1701
transport = self.get_transport()
1702
if transport.is_readonly():
1703
file('a', 'w').write('0123456789')
1705
transport.put_bytes('a', '01234567890')
1707
# This is intentionally reading off the end of the file
1708
# since we are sure that it cannot get there
1709
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1710
# Can be raised by paramiko
1712
transport.readv, 'a', [(1,1), (8,10)])
1714
# This is trying to seek past the end of the file, it should
1715
# also raise a special error
1716
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1717
transport.readv, 'a', [(12,2)])