~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

(jelmer) Add ControlDirFormat.supports_transport(). (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import itertools
 
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
 
27
import stat
 
28
import sys
 
29
 
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    transport as _mod_transport,
 
36
    urlutils,
 
37
    )
 
38
from bzrlib.errors import (ConnectionError,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           NoSuchFile,
 
42
                           PathError,
 
43
                           TransportNotPossible,
 
44
                           )
 
45
from bzrlib.osutils import getcwd
 
46
from bzrlib.smart import medium
 
47
from bzrlib.tests import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from bzrlib.tests import test_server
 
53
from bzrlib.tests.test_transport import TestTransportImplementation
 
54
from bzrlib.transport import (
 
55
    ConnectedTransport,
 
56
    _get_transport_modules,
 
57
    )
 
58
from bzrlib.transport.memory import MemoryTransport
 
59
 
 
60
 
 
61
def get_transport_test_permutations(module):
 
62
    """Get the permutations module wants to have tested."""
 
63
    if getattr(module, 'get_test_permutations', None) is None:
 
64
        raise AssertionError(
 
65
            "transport module %s doesn't provide get_test_permutations()"
 
66
            % module.__name__)
 
67
        return []
 
68
    return module.get_test_permutations()
 
69
 
 
70
 
 
71
def transport_test_permutations():
 
72
    """Return a list of the klass, server_factory pairs to test."""
 
73
    result = []
 
74
    for module in _get_transport_modules():
 
75
        try:
 
76
            permutations = get_transport_test_permutations(
 
77
                pyutils.get_named_object(module))
 
78
            for (klass, server_factory) in permutations:
 
79
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
80
                    {"transport_class":klass,
 
81
                     "transport_server":server_factory})
 
82
                result.append(scenario)
 
83
        except errors.DependencyNotPresent, e:
 
84
            # Continue even if a dependency prevents us
 
85
            # from adding this test
 
86
            pass
 
87
    return result
 
88
 
 
89
 
 
90
def load_tests(standard_tests, module, loader):
 
91
    """Multiply tests for tranport implementations."""
 
92
    result = loader.suiteClass()
 
93
    scenarios = transport_test_permutations()
 
94
    return multiply_tests(standard_tests, scenarios, result)
 
95
 
 
96
 
 
97
class TransportTests(TestTransportImplementation):
 
98
 
 
99
    def setUp(self):
 
100
        super(TransportTests, self).setUp()
 
101
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
102
 
 
103
    def check_transport_contents(self, content, transport, relpath):
 
104
        """Check that transport.get_bytes(relpath) == content."""
 
105
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
106
 
 
107
    def test_ensure_base_missing(self):
 
108
        """.ensure_base() should create the directory if it doesn't exist"""
 
109
        t = self.get_transport()
 
110
        t_a = t.clone('a')
 
111
        if t_a.is_readonly():
 
112
            self.assertRaises(TransportNotPossible,
 
113
                              t_a.ensure_base)
 
114
            return
 
115
        self.assertTrue(t_a.ensure_base())
 
116
        self.assertTrue(t.has('a'))
 
117
 
 
118
    def test_ensure_base_exists(self):
 
119
        """.ensure_base() should just be happy if it already exists"""
 
120
        t = self.get_transport()
 
121
        if t.is_readonly():
 
122
            return
 
123
 
 
124
        t.mkdir('a')
 
125
        t_a = t.clone('a')
 
126
        # ensure_base returns False if it didn't create the base
 
127
        self.assertFalse(t_a.ensure_base())
 
128
 
 
129
    def test_ensure_base_missing_parent(self):
 
130
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
131
        t = self.get_transport()
 
132
        if t.is_readonly():
 
133
            return
 
134
 
 
135
        t_a = t.clone('a')
 
136
        t_b = t_a.clone('b')
 
137
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
138
 
 
139
    def test_external_url(self):
 
140
        """.external_url either works or raises InProcessTransport."""
 
141
        t = self.get_transport()
 
142
        try:
 
143
            t.external_url()
 
144
        except errors.InProcessTransport:
 
145
            pass
 
146
 
 
147
    def test_has(self):
 
148
        t = self.get_transport()
 
149
 
 
150
        files = ['a', 'b', 'e', 'g', '%']
 
151
        self.build_tree(files, transport=t)
 
152
        self.assertEqual(True, t.has('a'))
 
153
        self.assertEqual(False, t.has('c'))
 
154
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
155
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
156
                                           'e', 'f', 'g', 'h'])),
 
157
                         [True, True, False, False,
 
158
                          True, False, True, False])
 
159
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
160
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
161
                                           urlutils.escape('%%')]))
 
162
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
163
                                                'e', 'f', 'g', 'h']))),
 
164
                         [True, True, False, False,
 
165
                          True, False, True, False])
 
166
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
167
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
168
 
 
169
    def test_has_root_works(self):
 
170
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
171
            raise TestNotApplicable(
 
172
                "SmartTCPServer_for_testing intentionally does not allow "
 
173
                "access to /.")
 
174
        current_transport = self.get_transport()
 
175
        self.assertTrue(current_transport.has('/'))
 
176
        root = current_transport.clone('/')
 
177
        self.assertTrue(root.has(''))
 
178
 
 
179
    def test_get(self):
 
180
        t = self.get_transport()
 
181
 
 
182
        files = ['a', 'b', 'e', 'g']
 
183
        contents = ['contents of a\n',
 
184
                    'contents of b\n',
 
185
                    'contents of e\n',
 
186
                    'contents of g\n',
 
187
                    ]
 
188
        self.build_tree(files, transport=t, line_endings='binary')
 
189
        self.check_transport_contents('contents of a\n', t, 'a')
 
190
        content_f = t.get_multi(files)
 
191
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
192
        # evaluate their inputs, the transport requests should be issued and
 
193
        # handled sequentially (we don't want to force transport to buffer).
 
194
        for content, f in itertools.izip(contents, content_f):
 
195
            self.assertEqual(content, f.read())
 
196
 
 
197
        content_f = t.get_multi(iter(files))
 
198
        # Use itertools.izip() for the same reason
 
199
        for content, f in itertools.izip(contents, content_f):
 
200
            self.assertEqual(content, f.read())
 
201
 
 
202
    def test_get_unknown_file(self):
 
203
        t = self.get_transport()
 
204
        files = ['a', 'b']
 
205
        contents = ['contents of a\n',
 
206
                    'contents of b\n',
 
207
                    ]
 
208
        self.build_tree(files, transport=t, line_endings='binary')
 
209
        self.assertRaises(NoSuchFile, t.get, 'c')
 
210
        def iterate_and_close(func, *args):
 
211
            for f in func(*args):
 
212
                # We call f.read() here because things like paramiko actually
 
213
                # spawn a thread to prefetch the content, which we want to
 
214
                # consume before we close the handle.
 
215
                content = f.read()
 
216
                f.close()
 
217
        self.assertRaises(NoSuchFile, iterate_and_close,
 
218
                          t.get_multi, ['a', 'b', 'c'])
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, iter(['a', 'b', 'c']))
 
221
 
 
222
    def test_get_directory_read_gives_ReadError(self):
 
223
        """consistent errors for read() on a file returned by get()."""
 
224
        t = self.get_transport()
 
225
        if t.is_readonly():
 
226
            self.build_tree(['a directory/'])
 
227
        else:
 
228
            t.mkdir('a%20directory')
 
229
        # getting the file must either work or fail with a PathError
 
230
        try:
 
231
            a_file = t.get('a%20directory')
 
232
        except (errors.PathError, errors.RedirectRequested):
 
233
            # early failure return immediately.
 
234
            return
 
235
        # having got a file, read() must either work (i.e. http reading a dir
 
236
        # listing) or fail with ReadError
 
237
        try:
 
238
            a_file.read()
 
239
        except errors.ReadError:
 
240
            pass
 
241
 
 
242
    def test_get_bytes(self):
 
243
        t = self.get_transport()
 
244
 
 
245
        files = ['a', 'b', 'e', 'g']
 
246
        contents = ['contents of a\n',
 
247
                    'contents of b\n',
 
248
                    'contents of e\n',
 
249
                    'contents of g\n',
 
250
                    ]
 
251
        self.build_tree(files, transport=t, line_endings='binary')
 
252
        self.check_transport_contents('contents of a\n', t, 'a')
 
253
 
 
254
        for content, fname in zip(contents, files):
 
255
            self.assertEqual(content, t.get_bytes(fname))
 
256
 
 
257
    def test_get_bytes_unknown_file(self):
 
258
        t = self.get_transport()
 
259
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
260
 
 
261
    def test_get_with_open_write_stream_sees_all_content(self):
 
262
        t = self.get_transport()
 
263
        if t.is_readonly():
 
264
            return
 
265
        handle = t.open_write_stream('foo')
 
266
        try:
 
267
            handle.write('b')
 
268
            self.assertEqual('b', t.get_bytes('foo'))
 
269
        finally:
 
270
            handle.close()
 
271
 
 
272
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
273
        t = self.get_transport()
 
274
        if t.is_readonly():
 
275
            return
 
276
        handle = t.open_write_stream('foo')
 
277
        try:
 
278
            handle.write('b')
 
279
            self.assertEqual('b', t.get_bytes('foo'))
 
280
            f = t.get('foo')
 
281
            try:
 
282
                self.assertEqual('b', f.read())
 
283
            finally:
 
284
                f.close()
 
285
        finally:
 
286
            handle.close()
 
287
 
 
288
    def test_put_bytes(self):
 
289
        t = self.get_transport()
 
290
 
 
291
        if t.is_readonly():
 
292
            self.assertRaises(TransportNotPossible,
 
293
                    t.put_bytes, 'a', 'some text for a\n')
 
294
            return
 
295
 
 
296
        t.put_bytes('a', 'some text for a\n')
 
297
        self.assertTrue(t.has('a'))
 
298
        self.check_transport_contents('some text for a\n', t, 'a')
 
299
 
 
300
        # The contents should be overwritten
 
301
        t.put_bytes('a', 'new text for a\n')
 
302
        self.check_transport_contents('new text for a\n', t, 'a')
 
303
 
 
304
        self.assertRaises(NoSuchFile,
 
305
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
306
 
 
307
    def test_put_bytes_non_atomic(self):
 
308
        t = self.get_transport()
 
309
 
 
310
        if t.is_readonly():
 
311
            self.assertRaises(TransportNotPossible,
 
312
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
313
            return
 
314
 
 
315
        self.assertFalse(t.has('a'))
 
316
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
317
        self.assertTrue(t.has('a'))
 
318
        self.check_transport_contents('some text for a\n', t, 'a')
 
319
        # Put also replaces contents
 
320
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
321
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
322
 
 
323
        # Make sure we can create another file
 
324
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
325
        # And overwrite 'a' with empty contents
 
326
        t.put_bytes_non_atomic('a', '')
 
327
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
328
        self.check_transport_contents('', t, 'a')
 
329
 
 
330
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
331
                                       'contents\n')
 
332
        # Now test the create_parent flag
 
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
334
                                       'contents\n')
 
335
        self.assertFalse(t.has('dir/a'))
 
336
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
337
                               create_parent_dir=True)
 
338
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
339
 
 
340
        # But we still get NoSuchFile if we can't make the parent dir
 
341
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
342
                                       'contents\n',
 
343
                                       create_parent_dir=True)
 
344
 
 
345
    def test_put_bytes_permissions(self):
 
346
        t = self.get_transport()
 
347
 
 
348
        if t.is_readonly():
 
349
            return
 
350
        if not t._can_roundtrip_unix_modebits():
 
351
            # Can't roundtrip, so no need to run this test
 
352
            return
 
353
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
354
        self.assertTransportMode(t, 'mode644', 0644)
 
355
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
356
        self.assertTransportMode(t, 'mode666', 0666)
 
357
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
358
        self.assertTransportMode(t, 'mode600', 0600)
 
359
        # Yes, you can put_bytes a file such that it becomes readonly
 
360
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
361
        self.assertTransportMode(t, 'mode400', 0400)
 
362
 
 
363
        # The default permissions should be based on the current umask
 
364
        umask = osutils.get_umask()
 
365
        t.put_bytes('nomode', 'test text\n', mode=None)
 
366
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
367
 
 
368
    def test_put_bytes_non_atomic_permissions(self):
 
369
        t = self.get_transport()
 
370
 
 
371
        if t.is_readonly():
 
372
            return
 
373
        if not t._can_roundtrip_unix_modebits():
 
374
            # Can't roundtrip, so no need to run this test
 
375
            return
 
376
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
377
        self.assertTransportMode(t, 'mode644', 0644)
 
378
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
379
        self.assertTransportMode(t, 'mode666', 0666)
 
380
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
381
        self.assertTransportMode(t, 'mode600', 0600)
 
382
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
383
        self.assertTransportMode(t, 'mode400', 0400)
 
384
 
 
385
        # The default permissions should be based on the current umask
 
386
        umask = osutils.get_umask()
 
387
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
388
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
389
 
 
390
        # We should also be able to set the mode for a parent directory
 
391
        # when it is created
 
392
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
393
                               dir_mode=0700, create_parent_dir=True)
 
394
        self.assertTransportMode(t, 'dir700', 0700)
 
395
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
396
                               dir_mode=0770, create_parent_dir=True)
 
397
        self.assertTransportMode(t, 'dir770', 0770)
 
398
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
399
                               dir_mode=0777, create_parent_dir=True)
 
400
        self.assertTransportMode(t, 'dir777', 0777)
 
401
 
 
402
    def test_put_file(self):
 
403
        t = self.get_transport()
 
404
 
 
405
        if t.is_readonly():
 
406
            self.assertRaises(TransportNotPossible,
 
407
                    t.put_file, 'a', StringIO('some text for a\n'))
 
408
            return
 
409
 
 
410
        result = t.put_file('a', StringIO('some text for a\n'))
 
411
        # put_file returns the length of the data written
 
412
        self.assertEqual(16, result)
 
413
        self.assertTrue(t.has('a'))
 
414
        self.check_transport_contents('some text for a\n', t, 'a')
 
415
        # Put also replaces contents
 
416
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
417
        self.assertEqual(19, result)
 
418
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
419
        self.assertRaises(NoSuchFile,
 
420
                          t.put_file, 'path/doesnt/exist/c',
 
421
                              StringIO('contents'))
 
422
 
 
423
    def test_put_file_non_atomic(self):
 
424
        t = self.get_transport()
 
425
 
 
426
        if t.is_readonly():
 
427
            self.assertRaises(TransportNotPossible,
 
428
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
429
            return
 
430
 
 
431
        self.assertFalse(t.has('a'))
 
432
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
433
        self.assertTrue(t.has('a'))
 
434
        self.check_transport_contents('some text for a\n', t, 'a')
 
435
        # Put also replaces contents
 
436
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
437
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
438
 
 
439
        # Make sure we can create another file
 
440
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
441
        # And overwrite 'a' with empty contents
 
442
        t.put_file_non_atomic('a', StringIO(''))
 
443
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
444
        self.check_transport_contents('', t, 'a')
 
445
 
 
446
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
447
                                       StringIO('contents\n'))
 
448
        # Now test the create_parent flag
 
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
450
                                       StringIO('contents\n'))
 
451
        self.assertFalse(t.has('dir/a'))
 
452
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
453
                              create_parent_dir=True)
 
454
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
455
 
 
456
        # But we still get NoSuchFile if we can't make the parent dir
 
457
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
458
                                       StringIO('contents\n'),
 
459
                                       create_parent_dir=True)
 
460
 
 
461
    def test_put_file_permissions(self):
 
462
 
 
463
        t = self.get_transport()
 
464
 
 
465
        if t.is_readonly():
 
466
            return
 
467
        if not t._can_roundtrip_unix_modebits():
 
468
            # Can't roundtrip, so no need to run this test
 
469
            return
 
470
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
471
        self.assertTransportMode(t, 'mode644', 0644)
 
472
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
473
        self.assertTransportMode(t, 'mode666', 0666)
 
474
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
475
        self.assertTransportMode(t, 'mode600', 0600)
 
476
        # Yes, you can put a file such that it becomes readonly
 
477
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
478
        self.assertTransportMode(t, 'mode400', 0400)
 
479
        # The default permissions should be based on the current umask
 
480
        umask = osutils.get_umask()
 
481
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
482
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
483
 
 
484
    def test_put_file_non_atomic_permissions(self):
 
485
        t = self.get_transport()
 
486
 
 
487
        if t.is_readonly():
 
488
            return
 
489
        if not t._can_roundtrip_unix_modebits():
 
490
            # Can't roundtrip, so no need to run this test
 
491
            return
 
492
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
493
        self.assertTransportMode(t, 'mode644', 0644)
 
494
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
495
        self.assertTransportMode(t, 'mode666', 0666)
 
496
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
497
        self.assertTransportMode(t, 'mode600', 0600)
 
498
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
499
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
500
        self.assertTransportMode(t, 'mode400', 0400)
 
501
 
 
502
        # The default permissions should be based on the current umask
 
503
        umask = osutils.get_umask()
 
504
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
505
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
506
 
 
507
        # We should also be able to set the mode for a parent directory
 
508
        # when it is created
 
509
        sio = StringIO()
 
510
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
511
                              dir_mode=0700, create_parent_dir=True)
 
512
        self.assertTransportMode(t, 'dir700', 0700)
 
513
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
514
                              dir_mode=0770, create_parent_dir=True)
 
515
        self.assertTransportMode(t, 'dir770', 0770)
 
516
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
517
                              dir_mode=0777, create_parent_dir=True)
 
518
        self.assertTransportMode(t, 'dir777', 0777)
 
519
 
 
520
    def test_put_bytes_unicode(self):
 
521
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
522
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
523
        # (we don't want to encode unicode here at all, callers should be
 
524
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
525
        # compatibility.  At some point we should use a specific exception.
 
526
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
527
        t = self.get_transport()
 
528
        if t.is_readonly():
 
529
            return
 
530
        unicode_string = u'\u1234'
 
531
        self.assertRaises(
 
532
            (AssertionError, UnicodeEncodeError),
 
533
            t.put_bytes, 'foo', unicode_string)
 
534
 
 
535
    def test_put_file_unicode(self):
 
536
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
537
        # This situation can happen (and has) if code is careless about the type
 
538
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
539
        # cStringIO, because it never returns unicode from read.
 
540
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
541
        # raise, but we raise it for hysterical raisins.
 
542
        t = self.get_transport()
 
543
        if t.is_readonly():
 
544
            return
 
545
        unicode_file = pyStringIO(u'\u1234')
 
546
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
547
 
 
548
    def test_mkdir(self):
 
549
        t = self.get_transport()
 
550
 
 
551
        if t.is_readonly():
 
552
            # cannot mkdir on readonly transports. We're not testing for
 
553
            # cache coherency because cache behaviour is not currently
 
554
            # defined for the transport interface.
 
555
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
556
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
557
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
558
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
559
            return
 
560
        # Test mkdir
 
561
        t.mkdir('dir_a')
 
562
        self.assertEqual(t.has('dir_a'), True)
 
563
        self.assertEqual(t.has('dir_b'), False)
 
564
 
 
565
        t.mkdir('dir_b')
 
566
        self.assertEqual(t.has('dir_b'), True)
 
567
 
 
568
        t.mkdir_multi(['dir_c', 'dir_d'])
 
569
 
 
570
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
571
        self.assertEqual(list(t.has_multi(
 
572
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
573
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
574
            [True, True, True, False,
 
575
             True, True, True, True])
 
576
 
 
577
        # we were testing that a local mkdir followed by a transport
 
578
        # mkdir failed thusly, but given that we * in one process * do not
 
579
        # concurrently fiddle with disk dirs and then use transport to do
 
580
        # things, the win here seems marginal compared to the constraint on
 
581
        # the interface. RBC 20051227
 
582
        t.mkdir('dir_g')
 
583
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
584
 
 
585
        # Test get/put in sub-directories
 
586
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
587
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
588
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
589
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
590
 
 
591
        # mkdir of a dir with an absent parent
 
592
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
593
 
 
594
    def test_mkdir_permissions(self):
 
595
        t = self.get_transport()
 
596
        if t.is_readonly():
 
597
            return
 
598
        if not t._can_roundtrip_unix_modebits():
 
599
            # no sense testing on this transport
 
600
            return
 
601
        # Test mkdir with a mode
 
602
        t.mkdir('dmode755', mode=0755)
 
603
        self.assertTransportMode(t, 'dmode755', 0755)
 
604
        t.mkdir('dmode555', mode=0555)
 
605
        self.assertTransportMode(t, 'dmode555', 0555)
 
606
        t.mkdir('dmode777', mode=0777)
 
607
        self.assertTransportMode(t, 'dmode777', 0777)
 
608
        t.mkdir('dmode700', mode=0700)
 
609
        self.assertTransportMode(t, 'dmode700', 0700)
 
610
        t.mkdir_multi(['mdmode755'], mode=0755)
 
611
        self.assertTransportMode(t, 'mdmode755', 0755)
 
612
 
 
613
        # Default mode should be based on umask
 
614
        umask = osutils.get_umask()
 
615
        t.mkdir('dnomode', mode=None)
 
616
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
617
 
 
618
    def test_opening_a_file_stream_creates_file(self):
 
619
        t = self.get_transport()
 
620
        if t.is_readonly():
 
621
            return
 
622
        handle = t.open_write_stream('foo')
 
623
        try:
 
624
            self.assertEqual('', t.get_bytes('foo'))
 
625
        finally:
 
626
            handle.close()
 
627
 
 
628
    def test_opening_a_file_stream_can_set_mode(self):
 
629
        t = self.get_transport()
 
630
        if t.is_readonly():
 
631
            return
 
632
        if not t._can_roundtrip_unix_modebits():
 
633
            # Can't roundtrip, so no need to run this test
 
634
            return
 
635
        def check_mode(name, mode, expected):
 
636
            handle = t.open_write_stream(name, mode=mode)
 
637
            handle.close()
 
638
            self.assertTransportMode(t, name, expected)
 
639
        check_mode('mode644', 0644, 0644)
 
640
        check_mode('mode666', 0666, 0666)
 
641
        check_mode('mode600', 0600, 0600)
 
642
        # The default permissions should be based on the current umask
 
643
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
644
 
 
645
    def test_copy_to(self):
 
646
        # FIXME: test:   same server to same server (partly done)
 
647
        # same protocol two servers
 
648
        # and    different protocols (done for now except for MemoryTransport.
 
649
        # - RBC 20060122
 
650
 
 
651
        def simple_copy_files(transport_from, transport_to):
 
652
            files = ['a', 'b', 'c', 'd']
 
653
            self.build_tree(files, transport=transport_from)
 
654
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
655
            for f in files:
 
656
                self.check_transport_contents(transport_to.get_bytes(f),
 
657
                                              transport_from, f)
 
658
 
 
659
        t = self.get_transport()
 
660
        temp_transport = MemoryTransport('memory:///')
 
661
        simple_copy_files(t, temp_transport)
 
662
        if not t.is_readonly():
 
663
            t.mkdir('copy_to_simple')
 
664
            t2 = t.clone('copy_to_simple')
 
665
            simple_copy_files(t, t2)
 
666
 
 
667
 
 
668
        # Test that copying into a missing directory raises
 
669
        # NoSuchFile
 
670
        if t.is_readonly():
 
671
            self.build_tree(['e/', 'e/f'])
 
672
        else:
 
673
            t.mkdir('e')
 
674
            t.put_bytes('e/f', 'contents of e')
 
675
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
676
        temp_transport.mkdir('e')
 
677
        t.copy_to(['e/f'], temp_transport)
 
678
 
 
679
        del temp_transport
 
680
        temp_transport = MemoryTransport('memory:///')
 
681
 
 
682
        files = ['a', 'b', 'c', 'd']
 
683
        t.copy_to(iter(files), temp_transport)
 
684
        for f in files:
 
685
            self.check_transport_contents(temp_transport.get_bytes(f),
 
686
                                          t, f)
 
687
        del temp_transport
 
688
 
 
689
        for mode in (0666, 0644, 0600, 0400):
 
690
            temp_transport = MemoryTransport("memory:///")
 
691
            t.copy_to(files, temp_transport, mode=mode)
 
692
            for f in files:
 
693
                self.assertTransportMode(temp_transport, f, mode)
 
694
 
 
695
    def test_create_prefix(self):
 
696
        t = self.get_transport()
 
697
        sub = t.clone('foo').clone('bar')
 
698
        try:
 
699
            sub.create_prefix()
 
700
        except TransportNotPossible:
 
701
            self.assertTrue(t.is_readonly())
 
702
        else:
 
703
            self.assertTrue(t.has('foo/bar'))
 
704
 
 
705
    def test_append_file(self):
 
706
        t = self.get_transport()
 
707
 
 
708
        if t.is_readonly():
 
709
            self.assertRaises(TransportNotPossible,
 
710
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
711
            return
 
712
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
713
        t.put_bytes('b', 'contents\nfor b\n')
 
714
 
 
715
        self.assertEqual(20,
 
716
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
717
 
 
718
        self.check_transport_contents(
 
719
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
720
            t, 'a')
 
721
 
 
722
        # a file with no parent should fail..
 
723
        self.assertRaises(NoSuchFile,
 
724
                          t.append_file, 'missing/path', StringIO('content'))
 
725
 
 
726
        # And we can create new files, too
 
727
        self.assertEqual(0,
 
728
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
729
        self.check_transport_contents('some text\nfor a missing file\n',
 
730
                                      t, 'c')
 
731
 
 
732
    def test_append_bytes(self):
 
733
        t = self.get_transport()
 
734
 
 
735
        if t.is_readonly():
 
736
            self.assertRaises(TransportNotPossible,
 
737
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
738
            return
 
739
 
 
740
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
741
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
742
 
 
743
        self.assertEqual(20,
 
744
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
745
 
 
746
        self.check_transport_contents(
 
747
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
748
            t, 'a')
 
749
 
 
750
        # a file with no parent should fail..
 
751
        self.assertRaises(NoSuchFile,
 
752
                          t.append_bytes, 'missing/path', 'content')
 
753
 
 
754
    def test_append_multi(self):
 
755
        t = self.get_transport()
 
756
 
 
757
        if t.is_readonly():
 
758
            return
 
759
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
760
                         'add\nsome\nmore\ncontents\n')
 
761
        t.put_bytes('b', 'contents\nfor b\n')
 
762
 
 
763
        self.assertEqual((43, 15),
 
764
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
765
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
766
 
 
767
        self.check_transport_contents(
 
768
            'diff\ncontents for\na\n'
 
769
            'add\nsome\nmore\ncontents\n'
 
770
            'and\nthen\nsome\nmore\n',
 
771
            t, 'a')
 
772
        self.check_transport_contents(
 
773
                'contents\nfor b\n'
 
774
                'some\nmore\nfor\nb\n',
 
775
                t, 'b')
 
776
 
 
777
        self.assertEqual((62, 31),
 
778
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
779
                                 ('b', StringIO('from an iterator\n'))])))
 
780
        self.check_transport_contents(
 
781
            'diff\ncontents for\na\n'
 
782
            'add\nsome\nmore\ncontents\n'
 
783
            'and\nthen\nsome\nmore\n'
 
784
            'a little bit more\n',
 
785
            t, 'a')
 
786
        self.check_transport_contents(
 
787
                'contents\nfor b\n'
 
788
                'some\nmore\nfor\nb\n'
 
789
                'from an iterator\n',
 
790
                t, 'b')
 
791
 
 
792
        self.assertEqual((80, 0),
 
793
            t.append_multi([('a', StringIO('some text in a\n')),
 
794
                            ('d', StringIO('missing file r\n'))]))
 
795
 
 
796
        self.check_transport_contents(
 
797
            'diff\ncontents for\na\n'
 
798
            'add\nsome\nmore\ncontents\n'
 
799
            'and\nthen\nsome\nmore\n'
 
800
            'a little bit more\n'
 
801
            'some text in a\n',
 
802
            t, 'a')
 
803
        self.check_transport_contents('missing file r\n', t, 'd')
 
804
 
 
805
    def test_append_file_mode(self):
 
806
        """Check that append accepts a mode parameter"""
 
807
        # check append accepts a mode
 
808
        t = self.get_transport()
 
809
        if t.is_readonly():
 
810
            self.assertRaises(TransportNotPossible,
 
811
                t.append_file, 'f', StringIO('f'), mode=None)
 
812
            return
 
813
        t.append_file('f', StringIO('f'), mode=None)
 
814
 
 
815
    def test_append_bytes_mode(self):
 
816
        # check append_bytes accepts a mode
 
817
        t = self.get_transport()
 
818
        if t.is_readonly():
 
819
            self.assertRaises(TransportNotPossible,
 
820
                t.append_bytes, 'f', 'f', mode=None)
 
821
            return
 
822
        t.append_bytes('f', 'f', mode=None)
 
823
 
 
824
    def test_delete(self):
 
825
        # TODO: Test Transport.delete
 
826
        t = self.get_transport()
 
827
 
 
828
        # Not much to do with a readonly transport
 
829
        if t.is_readonly():
 
830
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
831
            return
 
832
 
 
833
        t.put_bytes('a', 'a little bit of text\n')
 
834
        self.assertTrue(t.has('a'))
 
835
        t.delete('a')
 
836
        self.assertFalse(t.has('a'))
 
837
 
 
838
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
839
 
 
840
        t.put_bytes('a', 'a text\n')
 
841
        t.put_bytes('b', 'b text\n')
 
842
        t.put_bytes('c', 'c text\n')
 
843
        self.assertEqual([True, True, True],
 
844
                list(t.has_multi(['a', 'b', 'c'])))
 
845
        t.delete_multi(['a', 'c'])
 
846
        self.assertEqual([False, True, False],
 
847
                list(t.has_multi(['a', 'b', 'c'])))
 
848
        self.assertFalse(t.has('a'))
 
849
        self.assertTrue(t.has('b'))
 
850
        self.assertFalse(t.has('c'))
 
851
 
 
852
        self.assertRaises(NoSuchFile,
 
853
                t.delete_multi, ['a', 'b', 'c'])
 
854
 
 
855
        self.assertRaises(NoSuchFile,
 
856
                t.delete_multi, iter(['a', 'b', 'c']))
 
857
 
 
858
        t.put_bytes('a', 'another a text\n')
 
859
        t.put_bytes('c', 'another c text\n')
 
860
        t.delete_multi(iter(['a', 'b', 'c']))
 
861
 
 
862
        # We should have deleted everything
 
863
        # SftpServer creates control files in the
 
864
        # working directory, so we can just do a
 
865
        # plain "listdir".
 
866
        # self.assertEqual([], os.listdir('.'))
 
867
 
 
868
    def test_recommended_page_size(self):
 
869
        """Transports recommend a page size for partial access to files."""
 
870
        t = self.get_transport()
 
871
        self.assertIsInstance(t.recommended_page_size(), int)
 
872
 
 
873
    def test_rmdir(self):
 
874
        t = self.get_transport()
 
875
        # Not much to do with a readonly transport
 
876
        if t.is_readonly():
 
877
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
878
            return
 
879
        t.mkdir('adir')
 
880
        t.mkdir('adir/bdir')
 
881
        t.rmdir('adir/bdir')
 
882
        # ftp may not be able to raise NoSuchFile for lack of
 
883
        # details when failing
 
884
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
885
        t.rmdir('adir')
 
886
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
887
 
 
888
    def test_rmdir_not_empty(self):
 
889
        """Deleting a non-empty directory raises an exception
 
890
 
 
891
        sftp (and possibly others) don't give us a specific "directory not
 
892
        empty" exception -- we can just see that the operation failed.
 
893
        """
 
894
        t = self.get_transport()
 
895
        if t.is_readonly():
 
896
            return
 
897
        t.mkdir('adir')
 
898
        t.mkdir('adir/bdir')
 
899
        self.assertRaises(PathError, t.rmdir, 'adir')
 
900
 
 
901
    def test_rmdir_empty_but_similar_prefix(self):
 
902
        """rmdir does not get confused by sibling paths.
 
903
 
 
904
        A naive implementation of MemoryTransport would refuse to rmdir
 
905
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
906
        uses "path.startswith(dir)" on all file paths to determine if directory
 
907
        is empty.
 
908
        """
 
909
        t = self.get_transport()
 
910
        if t.is_readonly():
 
911
            return
 
912
        t.mkdir('foo')
 
913
        t.put_bytes('foo-bar', '')
 
914
        t.mkdir('foo-baz')
 
915
        t.rmdir('foo')
 
916
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
917
        self.assertTrue(t.has('foo-bar'))
 
918
 
 
919
    def test_rename_dir_succeeds(self):
 
920
        t = self.get_transport()
 
921
        if t.is_readonly():
 
922
            raise TestSkipped("transport is readonly")
 
923
        t.mkdir('adir')
 
924
        t.mkdir('adir/asubdir')
 
925
        t.rename('adir', 'bdir')
 
926
        self.assertTrue(t.has('bdir/asubdir'))
 
927
        self.assertFalse(t.has('adir'))
 
928
 
 
929
    def test_rename_dir_nonempty(self):
 
930
        """Attempting to replace a nonemtpy directory should fail"""
 
931
        t = self.get_transport()
 
932
        if t.is_readonly():
 
933
            raise TestSkipped("transport is readonly")
 
934
        t.mkdir('adir')
 
935
        t.mkdir('adir/asubdir')
 
936
        t.mkdir('bdir')
 
937
        t.mkdir('bdir/bsubdir')
 
938
        # any kind of PathError would be OK, though we normally expect
 
939
        # DirectoryNotEmpty
 
940
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
941
        # nothing was changed so it should still be as before
 
942
        self.assertTrue(t.has('bdir/bsubdir'))
 
943
        self.assertFalse(t.has('adir/bdir'))
 
944
        self.assertFalse(t.has('adir/bsubdir'))
 
945
 
 
946
    def test_rename_across_subdirs(self):
 
947
        t = self.get_transport()
 
948
        if t.is_readonly():
 
949
            raise TestNotApplicable("transport is readonly")
 
950
        t.mkdir('a')
 
951
        t.mkdir('b')
 
952
        ta = t.clone('a')
 
953
        tb = t.clone('b')
 
954
        ta.put_bytes('f', 'aoeu')
 
955
        ta.rename('f', '../b/f')
 
956
        self.assertTrue(tb.has('f'))
 
957
        self.assertFalse(ta.has('f'))
 
958
        self.assertTrue(t.has('b/f'))
 
959
 
 
960
    def test_delete_tree(self):
 
961
        t = self.get_transport()
 
962
 
 
963
        # Not much to do with a readonly transport
 
964
        if t.is_readonly():
 
965
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
966
            return
 
967
 
 
968
        # and does it like listing ?
 
969
        t.mkdir('adir')
 
970
        try:
 
971
            t.delete_tree('adir')
 
972
        except TransportNotPossible:
 
973
            # ok, this transport does not support delete_tree
 
974
            return
 
975
 
 
976
        # did it delete that trivial case?
 
977
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
978
 
 
979
        self.build_tree(['adir/',
 
980
                         'adir/file',
 
981
                         'adir/subdir/',
 
982
                         'adir/subdir/file',
 
983
                         'adir/subdir2/',
 
984
                         'adir/subdir2/file',
 
985
                         ], transport=t)
 
986
 
 
987
        t.delete_tree('adir')
 
988
        # adir should be gone now.
 
989
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
990
 
 
991
    def test_move(self):
 
992
        t = self.get_transport()
 
993
 
 
994
        if t.is_readonly():
 
995
            return
 
996
 
 
997
        # TODO: I would like to use os.listdir() to
 
998
        # make sure there are no extra files, but SftpServer
 
999
        # creates control files in the working directory
 
1000
        # perhaps all of this could be done in a subdirectory
 
1001
 
 
1002
        t.put_bytes('a', 'a first file\n')
 
1003
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
1004
 
 
1005
        t.move('a', 'b')
 
1006
        self.assertTrue(t.has('b'))
 
1007
        self.assertFalse(t.has('a'))
 
1008
 
 
1009
        self.check_transport_contents('a first file\n', t, 'b')
 
1010
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
1011
 
 
1012
        # Overwrite a file
 
1013
        t.put_bytes('c', 'c this file\n')
 
1014
        t.move('c', 'b')
 
1015
        self.assertFalse(t.has('c'))
 
1016
        self.check_transport_contents('c this file\n', t, 'b')
 
1017
 
 
1018
        # TODO: Try to write a test for atomicity
 
1019
        # TODO: Test moving into a non-existent subdirectory
 
1020
        # TODO: Test Transport.move_multi
 
1021
 
 
1022
    def test_copy(self):
 
1023
        t = self.get_transport()
 
1024
 
 
1025
        if t.is_readonly():
 
1026
            return
 
1027
 
 
1028
        t.put_bytes('a', 'a file\n')
 
1029
        t.copy('a', 'b')
 
1030
        self.check_transport_contents('a file\n', t, 'b')
 
1031
 
 
1032
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
1033
        os.mkdir('c')
 
1034
        # What should the assert be if you try to copy a
 
1035
        # file over a directory?
 
1036
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
1037
        t.put_bytes('d', 'text in d\n')
 
1038
        t.copy('d', 'b')
 
1039
        self.check_transport_contents('text in d\n', t, 'b')
 
1040
 
 
1041
        # TODO: test copy_multi
 
1042
 
 
1043
    def test_connection_error(self):
 
1044
        """ConnectionError is raised when connection is impossible.
 
1045
 
 
1046
        The error should be raised from the first operation on the transport.
 
1047
        """
 
1048
        try:
 
1049
            url = self._server.get_bogus_url()
 
1050
        except NotImplementedError:
 
1051
            raise TestSkipped("Transport %s has no bogus URL support." %
 
1052
                              self._server.__class__)
 
1053
        t = _mod_transport.get_transport_from_url(url)
 
1054
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
1055
 
 
1056
    def test_stat(self):
 
1057
        # TODO: Test stat, just try once, and if it throws, stop testing
 
1058
        from stat import S_ISDIR, S_ISREG
 
1059
 
 
1060
        t = self.get_transport()
 
1061
 
 
1062
        try:
 
1063
            st = t.stat('.')
 
1064
        except TransportNotPossible, e:
 
1065
            # This transport cannot stat
 
1066
            return
 
1067
 
 
1068
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
1069
        sizes = [14, 0, 16, 0, 18]
 
1070
        self.build_tree(paths, transport=t, line_endings='binary')
 
1071
 
 
1072
        for path, size in zip(paths, sizes):
 
1073
            st = t.stat(path)
 
1074
            if path.endswith('/'):
 
1075
                self.assertTrue(S_ISDIR(st.st_mode))
 
1076
                # directory sizes are meaningless
 
1077
            else:
 
1078
                self.assertTrue(S_ISREG(st.st_mode))
 
1079
                self.assertEqual(size, st.st_size)
 
1080
 
 
1081
        remote_stats = list(t.stat_multi(paths))
 
1082
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1083
 
 
1084
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
1085
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
1086
 
 
1087
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1088
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
1089
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
1090
        subdir = t.clone('subdir')
 
1091
        st = subdir.stat('./file')
 
1092
        st = subdir.stat('.')
 
1093
 
 
1094
    def test_hardlink(self):
 
1095
        from stat import ST_NLINK
 
1096
 
 
1097
        t = self.get_transport()
 
1098
 
 
1099
        source_name = "original_target"
 
1100
        link_name = "target_link"
 
1101
 
 
1102
        self.build_tree([source_name], transport=t)
 
1103
 
 
1104
        try:
 
1105
            t.hardlink(source_name, link_name)
 
1106
 
 
1107
            self.assertTrue(t.has(source_name))
 
1108
            self.assertTrue(t.has(link_name))
 
1109
 
 
1110
            st = t.stat(link_name)
 
1111
            self.assertEqual(st[ST_NLINK], 2)
 
1112
        except TransportNotPossible:
 
1113
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1114
                              self._server.__class__)
 
1115
 
 
1116
    def test_symlink(self):
 
1117
        from stat import S_ISLNK
 
1118
 
 
1119
        t = self.get_transport()
 
1120
 
 
1121
        source_name = "original_target"
 
1122
        link_name = "target_link"
 
1123
 
 
1124
        self.build_tree([source_name], transport=t)
 
1125
 
 
1126
        try:
 
1127
            t.symlink(source_name, link_name)
 
1128
 
 
1129
            self.assertTrue(t.has(source_name))
 
1130
            self.assertTrue(t.has(link_name))
 
1131
 
 
1132
            st = t.stat(link_name)
 
1133
            self.assertTrue(S_ISLNK(st.st_mode),
 
1134
                "expected symlink, got mode %o" % st.st_mode)
 
1135
        except TransportNotPossible:
 
1136
            raise TestSkipped("Transport %s does not support symlinks." %
 
1137
                              self._server.__class__)
 
1138
        except IOError:
 
1139
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1140
 
 
1141
    def test_list_dir(self):
 
1142
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1143
        t = self.get_transport()
 
1144
 
 
1145
        if not t.listable():
 
1146
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1147
            return
 
1148
 
 
1149
        def sorted_list(d, transport):
 
1150
            l = list(transport.list_dir(d))
 
1151
            l.sort()
 
1152
            return l
 
1153
 
 
1154
        self.assertEqual([], sorted_list('.', t))
 
1155
        # c2 is precisely one letter longer than c here to test that
 
1156
        # suffixing is not confused.
 
1157
        # a%25b checks that quoting is done consistently across transports
 
1158
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1159
 
 
1160
        if not t.is_readonly():
 
1161
            self.build_tree(tree_names, transport=t)
 
1162
        else:
 
1163
            self.build_tree(tree_names)
 
1164
 
 
1165
        self.assertEqual(
 
1166
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1167
        self.assertEqual(
 
1168
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1169
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1170
 
 
1171
        # Cloning the transport produces an equivalent listing
 
1172
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1173
 
 
1174
        if not t.is_readonly():
 
1175
            t.delete('c/d')
 
1176
            t.delete('b')
 
1177
        else:
 
1178
            os.unlink('c/d')
 
1179
            os.unlink('b')
 
1180
 
 
1181
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1182
        self.assertEqual(['e'], sorted_list('c', t))
 
1183
 
 
1184
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1185
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1186
        # 'a' is a file, list_dir should raise an error
 
1187
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1188
 
 
1189
    def test_list_dir_result_is_url_escaped(self):
 
1190
        t = self.get_transport()
 
1191
        if not t.listable():
 
1192
            raise TestSkipped("transport not listable")
 
1193
 
 
1194
        if not t.is_readonly():
 
1195
            self.build_tree(['a/', 'a/%'], transport=t)
 
1196
        else:
 
1197
            self.build_tree(['a/', 'a/%'])
 
1198
 
 
1199
        names = list(t.list_dir('a'))
 
1200
        self.assertEqual(['%25'], names)
 
1201
        self.assertIsInstance(names[0], str)
 
1202
 
 
1203
    def test_clone_preserve_info(self):
 
1204
        t1 = self.get_transport()
 
1205
        if not isinstance(t1, ConnectedTransport):
 
1206
            raise TestSkipped("not a connected transport")
 
1207
 
 
1208
        t2 = t1.clone('subdir')
 
1209
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1210
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1211
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1212
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1213
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
 
1214
 
 
1215
    def test__reuse_for(self):
 
1216
        t = self.get_transport()
 
1217
        if not isinstance(t, ConnectedTransport):
 
1218
            raise TestSkipped("not a connected transport")
 
1219
 
 
1220
        def new_url(scheme=None, user=None, password=None,
 
1221
                    host=None, port=None, path=None):
 
1222
            """Build a new url from t.base changing only parts of it.
 
1223
 
 
1224
            Only the parameters different from None will be changed.
 
1225
            """
 
1226
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1227
            if user     is None: user     = t._parsed_url.user
 
1228
            if password is None: password = t._parsed_url.password
 
1229
            if user     is None: user     = t._parsed_url.user
 
1230
            if host     is None: host     = t._parsed_url.host
 
1231
            if port     is None: port     = t._parsed_url.port
 
1232
            if path     is None: path     = t._parsed_url.path
 
1233
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1234
 
 
1235
        if t._parsed_url.scheme == 'ftp':
 
1236
            scheme = 'sftp'
 
1237
        else:
 
1238
            scheme = 'ftp'
 
1239
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1240
        if t._parsed_url.user == 'me':
 
1241
            user = 'you'
 
1242
        else:
 
1243
            user = 'me'
 
1244
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1245
        # passwords are not taken into account because:
 
1246
        # - it makes no sense to have two different valid passwords for the
 
1247
        #   same user
 
1248
        # - _password in ConnectedTransport is intended to collect what the
 
1249
        #   user specified from the command-line and there are cases where the
 
1250
        #   new url can contain no password (if the url was built from an
 
1251
        #   existing transport.base for example)
 
1252
        # - password are considered part of the credentials provided at
 
1253
        #   connection creation time and as such may not be present in the url
 
1254
        #   (they may be typed by the user when prompted for example)
 
1255
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1256
        # We will not connect, we can use a invalid host
 
1257
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1258
        if t._parsed_url.port == 1234:
 
1259
            port = 4321
 
1260
        else:
 
1261
            port = 1234
 
1262
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1263
        # No point in trying to reuse a transport for a local URL
 
1264
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1265
 
 
1266
    def test_connection_sharing(self):
 
1267
        t = self.get_transport()
 
1268
        if not isinstance(t, ConnectedTransport):
 
1269
            raise TestSkipped("not a connected transport")
 
1270
 
 
1271
        c = t.clone('subdir')
 
1272
        # Some transports will create the connection  only when needed
 
1273
        t.has('surely_not') # Force connection
 
1274
        self.assertIs(t._get_connection(), c._get_connection())
 
1275
 
 
1276
        # Temporary failure, we need to create a new dummy connection
 
1277
        new_connection = None
 
1278
        t._set_connection(new_connection)
 
1279
        # Check that both transports use the same connection
 
1280
        self.assertIs(new_connection, t._get_connection())
 
1281
        self.assertIs(new_connection, c._get_connection())
 
1282
 
 
1283
    def test_reuse_connection_for_various_paths(self):
 
1284
        t = self.get_transport()
 
1285
        if not isinstance(t, ConnectedTransport):
 
1286
            raise TestSkipped("not a connected transport")
 
1287
 
 
1288
        t.has('surely_not') # Force connection
 
1289
        self.assertIsNot(None, t._get_connection())
 
1290
 
 
1291
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1292
        self.assertIsNot(t, subdir)
 
1293
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1294
 
 
1295
        home = subdir._reuse_for(t.base + 'home')
 
1296
        self.assertIs(t._get_connection(), home._get_connection())
 
1297
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1298
 
 
1299
    def test_clone(self):
 
1300
        # TODO: Test that clone moves up and down the filesystem
 
1301
        t1 = self.get_transport()
 
1302
 
 
1303
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1304
 
 
1305
        self.assertTrue(t1.has('a'))
 
1306
        self.assertTrue(t1.has('b/c'))
 
1307
        self.assertFalse(t1.has('c'))
 
1308
 
 
1309
        t2 = t1.clone('b')
 
1310
        self.assertEqual(t1.base + 'b/', t2.base)
 
1311
 
 
1312
        self.assertTrue(t2.has('c'))
 
1313
        self.assertFalse(t2.has('a'))
 
1314
 
 
1315
        t3 = t2.clone('..')
 
1316
        self.assertTrue(t3.has('a'))
 
1317
        self.assertFalse(t3.has('c'))
 
1318
 
 
1319
        self.assertFalse(t1.has('b/d'))
 
1320
        self.assertFalse(t2.has('d'))
 
1321
        self.assertFalse(t3.has('b/d'))
 
1322
 
 
1323
        if t1.is_readonly():
 
1324
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1325
        else:
 
1326
            t2.put_bytes('d', 'newfile\n')
 
1327
 
 
1328
        self.assertTrue(t1.has('b/d'))
 
1329
        self.assertTrue(t2.has('d'))
 
1330
        self.assertTrue(t3.has('b/d'))
 
1331
 
 
1332
    def test_clone_to_root(self):
 
1333
        orig_transport = self.get_transport()
 
1334
        # Repeatedly go up to a parent directory until we're at the root
 
1335
        # directory of this transport
 
1336
        root_transport = orig_transport
 
1337
        new_transport = root_transport.clone("..")
 
1338
        # as we are walking up directories, the path must be
 
1339
        # growing less, except at the top
 
1340
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1341
            or new_transport.base == root_transport.base)
 
1342
        while new_transport.base != root_transport.base:
 
1343
            root_transport = new_transport
 
1344
            new_transport = root_transport.clone("..")
 
1345
            # as we are walking up directories, the path must be
 
1346
            # growing less, except at the top
 
1347
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1348
                or new_transport.base == root_transport.base)
 
1349
 
 
1350
        # Cloning to "/" should take us to exactly the same location.
 
1351
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1352
        # the abspath of "/" from the original transport should be the same
 
1353
        # as the base at the root:
 
1354
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1355
 
 
1356
        # At the root, the URL must still end with / as its a directory
 
1357
        self.assertEqual(root_transport.base[-1], '/')
 
1358
 
 
1359
    def test_clone_from_root(self):
 
1360
        """At the root, cloning to a simple dir should just do string append."""
 
1361
        orig_transport = self.get_transport()
 
1362
        root_transport = orig_transport.clone('/')
 
1363
        self.assertEqual(root_transport.base + '.bzr/',
 
1364
            root_transport.clone('.bzr').base)
 
1365
 
 
1366
    def test_base_url(self):
 
1367
        t = self.get_transport()
 
1368
        self.assertEqual('/', t.base[-1])
 
1369
 
 
1370
    def test_relpath(self):
 
1371
        t = self.get_transport()
 
1372
        self.assertEqual('', t.relpath(t.base))
 
1373
        # base ends with /
 
1374
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1375
        # subdirs which don't exist should still give relpaths.
 
1376
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1377
        # trailing slash should be the same.
 
1378
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1379
 
 
1380
    def test_relpath_at_root(self):
 
1381
        t = self.get_transport()
 
1382
        # clone all the way to the top
 
1383
        new_transport = t.clone('..')
 
1384
        while new_transport.base != t.base:
 
1385
            t = new_transport
 
1386
            new_transport = t.clone('..')
 
1387
        # we must be able to get a relpath below the root
 
1388
        self.assertEqual('', t.relpath(t.base))
 
1389
        # and a deeper one should work too
 
1390
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1391
 
 
1392
    def test_abspath(self):
 
1393
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1394
        # that have aliasing problems like symlinks should go in backend
 
1395
        # specific test cases.
 
1396
        transport = self.get_transport()
 
1397
 
 
1398
        self.assertEqual(transport.base + 'relpath',
 
1399
                         transport.abspath('relpath'))
 
1400
 
 
1401
        # This should work without raising an error.
 
1402
        transport.abspath("/")
 
1403
 
 
1404
        # the abspath of "/" and "/foo/.." should result in the same location
 
1405
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1406
 
 
1407
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1408
                         transport.abspath("/foo"))
 
1409
 
 
1410
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1411
    def test_win32_abspath(self):
 
1412
        # Note: we tried to set sys.platform='win32' so we could test on
 
1413
        # other platforms too, but then osutils does platform specific
 
1414
        # things at import time which defeated us...
 
1415
        if sys.platform != 'win32':
 
1416
            raise TestSkipped(
 
1417
                'Testing drive letters in abspath implemented only for win32')
 
1418
 
 
1419
        # smoke test for abspath on win32.
 
1420
        # a transport based on 'file:///' never fully qualifies the drive.
 
1421
        transport = _mod_transport.get_transport_from_url("file:///")
 
1422
        self.assertEqual(transport.abspath("/"), "file:///")
 
1423
 
 
1424
        # but a transport that starts with a drive spec must keep it.
 
1425
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1426
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1427
 
 
1428
    def test_local_abspath(self):
 
1429
        transport = self.get_transport()
 
1430
        try:
 
1431
            p = transport.local_abspath('.')
 
1432
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1433
            # should be formattable
 
1434
            s = str(e)
 
1435
        else:
 
1436
            self.assertEqual(getcwd(), p)
 
1437
 
 
1438
    def test_abspath_at_root(self):
 
1439
        t = self.get_transport()
 
1440
        # clone all the way to the top
 
1441
        new_transport = t.clone('..')
 
1442
        while new_transport.base != t.base:
 
1443
            t = new_transport
 
1444
            new_transport = t.clone('..')
 
1445
        # we must be able to get a abspath of the root when we ask for
 
1446
        # t.abspath('..') - this due to our choice that clone('..')
 
1447
        # should return the root from the root, combined with the desire that
 
1448
        # the url from clone('..') and from abspath('..') should be the same.
 
1449
        self.assertEqual(t.base, t.abspath('..'))
 
1450
        # '' should give us the root
 
1451
        self.assertEqual(t.base, t.abspath(''))
 
1452
        # and a path should append to the url
 
1453
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1454
 
 
1455
    def test_iter_files_recursive(self):
 
1456
        transport = self.get_transport()
 
1457
        if not transport.listable():
 
1458
            self.assertRaises(TransportNotPossible,
 
1459
                              transport.iter_files_recursive)
 
1460
            return
 
1461
        self.build_tree(['isolated/',
 
1462
                         'isolated/dir/',
 
1463
                         'isolated/dir/foo',
 
1464
                         'isolated/dir/bar',
 
1465
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1466
                         'isolated/bar'],
 
1467
                        transport=transport)
 
1468
        paths = set(transport.iter_files_recursive())
 
1469
        # nb the directories are not converted
 
1470
        self.assertEqual(paths,
 
1471
                    set(['isolated/dir/foo',
 
1472
                         'isolated/dir/bar',
 
1473
                         'isolated/dir/b%2525z',
 
1474
                         'isolated/bar']))
 
1475
        sub_transport = transport.clone('isolated')
 
1476
        paths = set(sub_transport.iter_files_recursive())
 
1477
        self.assertEqual(paths,
 
1478
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1479
 
 
1480
    def test_copy_tree(self):
 
1481
        # TODO: test file contents and permissions are preserved. This test was
 
1482
        # added just to ensure that quoting was handled correctly.
 
1483
        # -- David Allouche 2006-08-11
 
1484
        transport = self.get_transport()
 
1485
        if not transport.listable():
 
1486
            self.assertRaises(TransportNotPossible,
 
1487
                              transport.iter_files_recursive)
 
1488
            return
 
1489
        if transport.is_readonly():
 
1490
            return
 
1491
        self.build_tree(['from/',
 
1492
                         'from/dir/',
 
1493
                         'from/dir/foo',
 
1494
                         'from/dir/bar',
 
1495
                         'from/dir/b%25z', # make sure quoting is correct
 
1496
                         'from/bar'],
 
1497
                        transport=transport)
 
1498
        transport.copy_tree('from', 'to')
 
1499
        paths = set(transport.iter_files_recursive())
 
1500
        self.assertEqual(paths,
 
1501
                    set(['from/dir/foo',
 
1502
                         'from/dir/bar',
 
1503
                         'from/dir/b%2525z',
 
1504
                         'from/bar',
 
1505
                         'to/dir/foo',
 
1506
                         'to/dir/bar',
 
1507
                         'to/dir/b%2525z',
 
1508
                         'to/bar',]))
 
1509
 
 
1510
    def test_copy_tree_to_transport(self):
 
1511
        transport = self.get_transport()
 
1512
        if not transport.listable():
 
1513
            self.assertRaises(TransportNotPossible,
 
1514
                              transport.iter_files_recursive)
 
1515
            return
 
1516
        if transport.is_readonly():
 
1517
            return
 
1518
        self.build_tree(['from/',
 
1519
                         'from/dir/',
 
1520
                         'from/dir/foo',
 
1521
                         'from/dir/bar',
 
1522
                         'from/dir/b%25z', # make sure quoting is correct
 
1523
                         'from/bar'],
 
1524
                        transport=transport)
 
1525
        from_transport = transport.clone('from')
 
1526
        to_transport = transport.clone('to')
 
1527
        to_transport.ensure_base()
 
1528
        from_transport.copy_tree_to_transport(to_transport)
 
1529
        paths = set(transport.iter_files_recursive())
 
1530
        self.assertEqual(paths,
 
1531
                    set(['from/dir/foo',
 
1532
                         'from/dir/bar',
 
1533
                         'from/dir/b%2525z',
 
1534
                         'from/bar',
 
1535
                         'to/dir/foo',
 
1536
                         'to/dir/bar',
 
1537
                         'to/dir/b%2525z',
 
1538
                         'to/bar',]))
 
1539
 
 
1540
    def test_unicode_paths(self):
 
1541
        """Test that we can read/write files with Unicode names."""
 
1542
        t = self.get_transport()
 
1543
 
 
1544
        # With FAT32 and certain encodings on win32
 
1545
        # '\xe5' and '\xe4' actually map to the same file
 
1546
        # adding a suffix kicks in the 'preserving but insensitive'
 
1547
        # route, and maintains the right files
 
1548
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1549
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1550
                 u'\u017d', # Z with umlat iso-8859-2
 
1551
                 u'\u062c', # Arabic j
 
1552
                 u'\u0410', # Russian A
 
1553
                 u'\u65e5', # Kanji person
 
1554
                ]
 
1555
 
 
1556
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1557
        if no_unicode_support:
 
1558
            self.knownFailure("test server cannot handle unicode paths")
 
1559
 
 
1560
        try:
 
1561
            self.build_tree(files, transport=t, line_endings='binary')
 
1562
        except UnicodeError:
 
1563
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1564
 
 
1565
        # A plain unicode string is not a valid url
 
1566
        for fname in files:
 
1567
            self.assertRaises(InvalidURL, t.get, fname)
 
1568
 
 
1569
        for fname in files:
 
1570
            fname_utf8 = fname.encode('utf-8')
 
1571
            contents = 'contents of %s\n' % (fname_utf8,)
 
1572
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1573
 
 
1574
    def test_connect_twice_is_same_content(self):
 
1575
        # check that our server (whatever it is) is accessible reliably
 
1576
        # via get_transport and multiple connections share content.
 
1577
        transport = self.get_transport()
 
1578
        if transport.is_readonly():
 
1579
            return
 
1580
        transport.put_bytes('foo', 'bar')
 
1581
        transport3 = self.get_transport()
 
1582
        self.check_transport_contents('bar', transport3, 'foo')
 
1583
 
 
1584
        # now opening at a relative url should give use a sane result:
 
1585
        transport.mkdir('newdir')
 
1586
        transport5 = self.get_transport('newdir')
 
1587
        transport6 = transport5.clone('..')
 
1588
        self.check_transport_contents('bar', transport6, 'foo')
 
1589
 
 
1590
    def test_lock_write(self):
 
1591
        """Test transport-level write locks.
 
1592
 
 
1593
        These are deprecated and transports may decline to support them.
 
1594
        """
 
1595
        transport = self.get_transport()
 
1596
        if transport.is_readonly():
 
1597
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1598
            return
 
1599
        transport.put_bytes('lock', '')
 
1600
        try:
 
1601
            lock = transport.lock_write('lock')
 
1602
        except TransportNotPossible:
 
1603
            return
 
1604
        # TODO make this consistent on all platforms:
 
1605
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1606
        lock.unlock()
 
1607
 
 
1608
    def test_lock_read(self):
 
1609
        """Test transport-level read locks.
 
1610
 
 
1611
        These are deprecated and transports may decline to support them.
 
1612
        """
 
1613
        transport = self.get_transport()
 
1614
        if transport.is_readonly():
 
1615
            file('lock', 'w').close()
 
1616
        else:
 
1617
            transport.put_bytes('lock', '')
 
1618
        try:
 
1619
            lock = transport.lock_read('lock')
 
1620
        except TransportNotPossible:
 
1621
            return
 
1622
        # TODO make this consistent on all platforms:
 
1623
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1624
        lock.unlock()
 
1625
 
 
1626
    def test_readv(self):
 
1627
        transport = self.get_transport()
 
1628
        if transport.is_readonly():
 
1629
            with file('a', 'w') as f: f.write('0123456789')
 
1630
        else:
 
1631
            transport.put_bytes('a', '0123456789')
 
1632
 
 
1633
        d = list(transport.readv('a', ((0, 1),)))
 
1634
        self.assertEqual(d[0], (0, '0'))
 
1635
 
 
1636
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1637
        self.assertEqual(d[0], (0, '0'))
 
1638
        self.assertEqual(d[1], (1, '1'))
 
1639
        self.assertEqual(d[2], (3, '34'))
 
1640
        self.assertEqual(d[3], (9, '9'))
 
1641
 
 
1642
    def test_readv_out_of_order(self):
 
1643
        transport = self.get_transport()
 
1644
        if transport.is_readonly():
 
1645
            with file('a', 'w') as f: f.write('0123456789')
 
1646
        else:
 
1647
            transport.put_bytes('a', '01234567890')
 
1648
 
 
1649
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1650
        self.assertEqual(d[0], (1, '1'))
 
1651
        self.assertEqual(d[1], (9, '9'))
 
1652
        self.assertEqual(d[2], (0, '0'))
 
1653
        self.assertEqual(d[3], (3, '34'))
 
1654
 
 
1655
    def test_readv_with_adjust_for_latency(self):
 
1656
        transport = self.get_transport()
 
1657
        # the adjust for latency flag expands the data region returned
 
1658
        # according to a per-transport heuristic, so testing is a little
 
1659
        # tricky as we need more data than the largest combining that our
 
1660
        # transports do. To accomodate this we generate random data and cross
 
1661
        # reference the returned data with the random data. To avoid doing
 
1662
        # multiple large random byte look ups we do several tests on the same
 
1663
        # backing data.
 
1664
        content = osutils.rand_bytes(200*1024)
 
1665
        content_size = len(content)
 
1666
        if transport.is_readonly():
 
1667
            self.build_tree_contents([('a', content)])
 
1668
        else:
 
1669
            transport.put_bytes('a', content)
 
1670
        def check_result_data(result_vector):
 
1671
            for item in result_vector:
 
1672
                data_len = len(item[1])
 
1673
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1674
 
 
1675
        # start corner case
 
1676
        result = list(transport.readv('a', ((0, 30),),
 
1677
            adjust_for_latency=True, upper_limit=content_size))
 
1678
        # we expect 1 result, from 0, to something > 30
 
1679
        self.assertEqual(1, len(result))
 
1680
        self.assertEqual(0, result[0][0])
 
1681
        self.assertTrue(len(result[0][1]) >= 30)
 
1682
        check_result_data(result)
 
1683
        # end of file corner case
 
1684
        result = list(transport.readv('a', ((204700, 100),),
 
1685
            adjust_for_latency=True, upper_limit=content_size))
 
1686
        # we expect 1 result, from 204800- its length, to the end
 
1687
        self.assertEqual(1, len(result))
 
1688
        data_len = len(result[0][1])
 
1689
        self.assertEqual(204800-data_len, result[0][0])
 
1690
        self.assertTrue(data_len >= 100)
 
1691
        check_result_data(result)
 
1692
        # out of order ranges are made in order
 
1693
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1694
            adjust_for_latency=True, upper_limit=content_size))
 
1695
        # we expect 2 results, in order, start and end.
 
1696
        self.assertEqual(2, len(result))
 
1697
        # start
 
1698
        data_len = len(result[0][1])
 
1699
        self.assertEqual(0, result[0][0])
 
1700
        self.assertTrue(data_len >= 30)
 
1701
        # end
 
1702
        data_len = len(result[1][1])
 
1703
        self.assertEqual(204800-data_len, result[1][0])
 
1704
        self.assertTrue(data_len >= 100)
 
1705
        check_result_data(result)
 
1706
        # close ranges get combined (even if out of order)
 
1707
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1708
            result = list(transport.readv('a', request_vector,
 
1709
                adjust_for_latency=True, upper_limit=content_size))
 
1710
            self.assertEqual(1, len(result))
 
1711
            data_len = len(result[0][1])
 
1712
            # minimum length is from 400 to 1034 - 634
 
1713
            self.assertTrue(data_len >= 634)
 
1714
            # must contain the region 400 to 1034
 
1715
            self.assertTrue(result[0][0] <= 400)
 
1716
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1717
            check_result_data(result)
 
1718
 
 
1719
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1720
        transport = self.get_transport()
 
1721
        # test from observed failure case.
 
1722
        if transport.is_readonly():
 
1723
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1724
        else:
 
1725
            transport.put_bytes('a', 'a'*1024*1024)
 
1726
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1727
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1728
            (465373, 800), (947422, 800)]
 
1729
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1730
        found_items = [False]*9
 
1731
        for pos, (start, length) in enumerate(broken_vector):
 
1732
            # check the range is covered by the result
 
1733
            for offset, data in results:
 
1734
                if offset <= start and start + length <= offset + len(data):
 
1735
                    found_items[pos] = True
 
1736
        self.assertEqual([True]*9, found_items)
 
1737
 
 
1738
    def test_get_with_open_write_stream_sees_all_content(self):
 
1739
        t = self.get_transport()
 
1740
        if t.is_readonly():
 
1741
            return
 
1742
        handle = t.open_write_stream('foo')
 
1743
        try:
 
1744
            handle.write('bcd')
 
1745
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1746
        finally:
 
1747
            handle.close()
 
1748
 
 
1749
    def test_get_smart_medium(self):
 
1750
        """All transports must either give a smart medium, or know they can't.
 
1751
        """
 
1752
        transport = self.get_transport()
 
1753
        try:
 
1754
            client_medium = transport.get_smart_medium()
 
1755
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1756
        except errors.NoSmartMedium:
 
1757
            # as long as we got it we're fine
 
1758
            pass
 
1759
 
 
1760
    def test_readv_short_read(self):
 
1761
        transport = self.get_transport()
 
1762
        if transport.is_readonly():
 
1763
            with file('a', 'w') as f: f.write('0123456789')
 
1764
        else:
 
1765
            transport.put_bytes('a', '01234567890')
 
1766
 
 
1767
        # This is intentionally reading off the end of the file
 
1768
        # since we are sure that it cannot get there
 
1769
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1770
                               # Can be raised by paramiko
 
1771
                               AssertionError),
 
1772
                              transport.readv, 'a', [(1,1), (8,10)])
 
1773
 
 
1774
        # This is trying to seek past the end of the file, it should
 
1775
        # also raise a special error
 
1776
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1777
                              transport.readv, 'a', [(12,2)])
 
1778
 
 
1779
    def test_no_segment_parameters(self):
 
1780
        """Segment parameters should be stripped and stored in
 
1781
        transport.segment_parameters."""
 
1782
        transport = self.get_transport("foo")
 
1783
        self.assertEquals({}, transport.get_segment_parameters())
 
1784
 
 
1785
    def test_segment_parameters(self):
 
1786
        """Segment parameters should be stripped and stored in
 
1787
        transport.get_segment_parameters()."""
 
1788
        base_url = self._server.get_url()
 
1789
        parameters = {"key1": "val1", "key2": "val2"}
 
1790
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1791
        transport = _mod_transport.get_transport_from_url(url)
 
1792
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))