~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

(vila) Forbid more operations on ReadonlyTransportDecorator (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2015 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import itertools
 
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
 
27
import stat
 
28
import sys
 
29
 
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    transport as _mod_transport,
 
36
    urlutils,
 
37
    )
 
38
from bzrlib.errors import (ConnectionError,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           NoSuchFile,
 
42
                           PathError,
 
43
                           TransportNotPossible,
 
44
                           )
 
45
from bzrlib.osutils import getcwd
 
46
from bzrlib.smart import medium
 
47
from bzrlib.tests import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from bzrlib.tests import test_server
 
53
from bzrlib.tests.test_transport import TestTransportImplementation
 
54
from bzrlib.transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent, e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(standard_tests, module, loader):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
 
97
 
 
98
 
 
99
class TransportTests(TestTransportImplementation):
 
100
 
 
101
    def setUp(self):
 
102
        super(TransportTests, self).setUp()
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
104
 
 
105
    def check_transport_contents(self, content, transport, relpath):
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
 
 
109
    def test_ensure_base_missing(self):
 
110
        """.ensure_base() should create the directory if it doesn't exist"""
 
111
        t = self.get_transport()
 
112
        t_a = t.clone('a')
 
113
        if t_a.is_readonly():
 
114
            self.assertRaises(TransportNotPossible,
 
115
                              t_a.ensure_base)
 
116
            return
 
117
        self.assertTrue(t_a.ensure_base())
 
118
        self.assertTrue(t.has('a'))
 
119
 
 
120
    def test_ensure_base_exists(self):
 
121
        """.ensure_base() should just be happy if it already exists"""
 
122
        t = self.get_transport()
 
123
        if t.is_readonly():
 
124
            return
 
125
 
 
126
        t.mkdir('a')
 
127
        t_a = t.clone('a')
 
128
        # ensure_base returns False if it didn't create the base
 
129
        self.assertFalse(t_a.ensure_base())
 
130
 
 
131
    def test_ensure_base_missing_parent(self):
 
132
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
133
        t = self.get_transport()
 
134
        if t.is_readonly():
 
135
            return
 
136
 
 
137
        t_a = t.clone('a')
 
138
        t_b = t_a.clone('b')
 
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
 
148
 
 
149
    def test_has(self):
 
150
        t = self.get_transport()
 
151
 
 
152
        files = ['a', 'b', 'e', 'g', '%']
 
153
        self.build_tree(files, transport=t)
 
154
        self.assertEqual(True, t.has('a'))
 
155
        self.assertEqual(False, t.has('c'))
 
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
157
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
158
                                           'e', 'f', 'g', 'h'])),
 
159
                         [True, True, False, False,
 
160
                          True, False, True, False])
 
161
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
162
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
163
                                           urlutils.escape('%%')]))
 
164
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
165
                                                'e', 'f', 'g', 'h']))),
 
166
                         [True, True, False, False,
 
167
                          True, False, True, False])
 
168
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
169
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
170
 
 
171
    def test_has_root_works(self):
 
172
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
173
            raise TestNotApplicable(
 
174
                "SmartTCPServer_for_testing intentionally does not allow "
 
175
                "access to /.")
 
176
        current_transport = self.get_transport()
 
177
        self.assertTrue(current_transport.has('/'))
 
178
        root = current_transport.clone('/')
 
179
        self.assertTrue(root.has(''))
 
180
 
 
181
    def test_get(self):
 
182
        t = self.get_transport()
 
183
 
 
184
        files = ['a', 'b', 'e', 'g']
 
185
        contents = ['contents of a\n',
 
186
                    'contents of b\n',
 
187
                    'contents of e\n',
 
188
                    'contents of g\n',
 
189
                    ]
 
190
        self.build_tree(files, transport=t, line_endings='binary')
 
191
        self.check_transport_contents('contents of a\n', t, 'a')
 
192
        content_f = t.get_multi(files)
 
193
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
194
        # evaluate their inputs, the transport requests should be issued and
 
195
        # handled sequentially (we don't want to force transport to buffer).
 
196
        for content, f in itertools.izip(contents, content_f):
 
197
            self.assertEqual(content, f.read())
 
198
 
 
199
        content_f = t.get_multi(iter(files))
 
200
        # Use itertools.izip() for the same reason
 
201
        for content, f in itertools.izip(contents, content_f):
 
202
            self.assertEqual(content, f.read())
 
203
 
 
204
    def test_get_unknown_file(self):
 
205
        t = self.get_transport()
 
206
        files = ['a', 'b']
 
207
        contents = ['contents of a\n',
 
208
                    'contents of b\n',
 
209
                    ]
 
210
        self.build_tree(files, transport=t, line_endings='binary')
 
211
        self.assertRaises(NoSuchFile, t.get, 'c')
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
 
223
 
 
224
    def test_get_directory_read_gives_ReadError(self):
 
225
        """consistent errors for read() on a file returned by get()."""
 
226
        t = self.get_transport()
 
227
        if t.is_readonly():
 
228
            self.build_tree(['a directory/'])
 
229
        else:
 
230
            t.mkdir('a%20directory')
 
231
        # getting the file must either work or fail with a PathError
 
232
        try:
 
233
            a_file = t.get('a%20directory')
 
234
        except (errors.PathError, errors.RedirectRequested):
 
235
            # early failure return immediately.
 
236
            return
 
237
        # having got a file, read() must either work (i.e. http reading a dir
 
238
        # listing) or fail with ReadError
 
239
        try:
 
240
            a_file.read()
 
241
        except errors.ReadError:
 
242
            pass
 
243
 
 
244
    def test_get_bytes(self):
 
245
        t = self.get_transport()
 
246
 
 
247
        files = ['a', 'b', 'e', 'g']
 
248
        contents = ['contents of a\n',
 
249
                    'contents of b\n',
 
250
                    'contents of e\n',
 
251
                    'contents of g\n',
 
252
                    ]
 
253
        self.build_tree(files, transport=t, line_endings='binary')
 
254
        self.check_transport_contents('contents of a\n', t, 'a')
 
255
 
 
256
        for content, fname in zip(contents, files):
 
257
            self.assertEqual(content, t.get_bytes(fname))
 
258
 
 
259
    def test_get_bytes_unknown_file(self):
 
260
        t = self.get_transport()
 
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
262
 
 
263
    def test_get_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
        finally:
 
272
            handle.close()
 
273
 
 
274
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
275
        t = self.get_transport()
 
276
        if t.is_readonly():
 
277
            return
 
278
        handle = t.open_write_stream('foo')
 
279
        try:
 
280
            handle.write('b')
 
281
            self.assertEqual('b', t.get_bytes('foo'))
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
 
287
        finally:
 
288
            handle.close()
 
289
 
 
290
    def test_put_bytes(self):
 
291
        t = self.get_transport()
 
292
 
 
293
        if t.is_readonly():
 
294
            self.assertRaises(TransportNotPossible,
 
295
                    t.put_bytes, 'a', 'some text for a\n')
 
296
            return
 
297
 
 
298
        t.put_bytes('a', 'some text for a\n')
 
299
        self.assertTrue(t.has('a'))
 
300
        self.check_transport_contents('some text for a\n', t, 'a')
 
301
 
 
302
        # The contents should be overwritten
 
303
        t.put_bytes('a', 'new text for a\n')
 
304
        self.check_transport_contents('new text for a\n', t, 'a')
 
305
 
 
306
        self.assertRaises(NoSuchFile,
 
307
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
308
 
 
309
    def test_put_bytes_non_atomic(self):
 
310
        t = self.get_transport()
 
311
 
 
312
        if t.is_readonly():
 
313
            self.assertRaises(TransportNotPossible,
 
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
315
            return
 
316
 
 
317
        self.assertFalse(t.has('a'))
 
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
319
        self.assertTrue(t.has('a'))
 
320
        self.check_transport_contents('some text for a\n', t, 'a')
 
321
        # Put also replaces contents
 
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
323
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
324
 
 
325
        # Make sure we can create another file
 
326
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
327
        # And overwrite 'a' with empty contents
 
328
        t.put_bytes_non_atomic('a', '')
 
329
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
330
        self.check_transport_contents('', t, 'a')
 
331
 
 
332
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
333
                                       'contents\n')
 
334
        # Now test the create_parent flag
 
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
336
                                       'contents\n')
 
337
        self.assertFalse(t.has('dir/a'))
 
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
339
                               create_parent_dir=True)
 
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
341
 
 
342
        # But we still get NoSuchFile if we can't make the parent dir
 
343
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
344
                                       'contents\n',
 
345
                                       create_parent_dir=True)
 
346
 
 
347
    def test_put_bytes_permissions(self):
 
348
        t = self.get_transport()
 
349
 
 
350
        if t.is_readonly():
 
351
            return
 
352
        if not t._can_roundtrip_unix_modebits():
 
353
            # Can't roundtrip, so no need to run this test
 
354
            return
 
355
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
356
        self.assertTransportMode(t, 'mode644', 0644)
 
357
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
358
        self.assertTransportMode(t, 'mode666', 0666)
 
359
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
360
        self.assertTransportMode(t, 'mode600', 0600)
 
361
        # Yes, you can put_bytes a file such that it becomes readonly
 
362
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
363
        self.assertTransportMode(t, 'mode400', 0400)
 
364
 
 
365
        # The default permissions should be based on the current umask
 
366
        umask = osutils.get_umask()
 
367
        t.put_bytes('nomode', 'test text\n', mode=None)
 
368
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
369
 
 
370
    def test_put_bytes_non_atomic_permissions(self):
 
371
        t = self.get_transport()
 
372
 
 
373
        if t.is_readonly():
 
374
            return
 
375
        if not t._can_roundtrip_unix_modebits():
 
376
            # Can't roundtrip, so no need to run this test
 
377
            return
 
378
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
379
        self.assertTransportMode(t, 'mode644', 0644)
 
380
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
381
        self.assertTransportMode(t, 'mode666', 0666)
 
382
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
383
        self.assertTransportMode(t, 'mode600', 0600)
 
384
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
385
        self.assertTransportMode(t, 'mode400', 0400)
 
386
 
 
387
        # The default permissions should be based on the current umask
 
388
        umask = osutils.get_umask()
 
389
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
390
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
391
 
 
392
        # We should also be able to set the mode for a parent directory
 
393
        # when it is created
 
394
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
395
                               dir_mode=0700, create_parent_dir=True)
 
396
        self.assertTransportMode(t, 'dir700', 0700)
 
397
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
398
                               dir_mode=0770, create_parent_dir=True)
 
399
        self.assertTransportMode(t, 'dir770', 0770)
 
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
401
                               dir_mode=0777, create_parent_dir=True)
 
402
        self.assertTransportMode(t, 'dir777', 0777)
 
403
 
 
404
    def test_put_file(self):
 
405
        t = self.get_transport()
 
406
 
 
407
        if t.is_readonly():
 
408
            self.assertRaises(TransportNotPossible,
 
409
                    t.put_file, 'a', StringIO('some text for a\n'))
 
410
            return
 
411
 
 
412
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        # put_file returns the length of the data written
 
414
        self.assertEqual(16, result)
 
415
        self.assertTrue(t.has('a'))
 
416
        self.check_transport_contents('some text for a\n', t, 'a')
 
417
        # Put also replaces contents
 
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        self.assertEqual(19, result)
 
420
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
421
        self.assertRaises(NoSuchFile,
 
422
                          t.put_file, 'path/doesnt/exist/c',
 
423
                              StringIO('contents'))
 
424
 
 
425
    def test_put_file_non_atomic(self):
 
426
        t = self.get_transport()
 
427
 
 
428
        if t.is_readonly():
 
429
            self.assertRaises(TransportNotPossible,
 
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
431
            return
 
432
 
 
433
        self.assertFalse(t.has('a'))
 
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
435
        self.assertTrue(t.has('a'))
 
436
        self.check_transport_contents('some text for a\n', t, 'a')
 
437
        # Put also replaces contents
 
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
439
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
440
 
 
441
        # Make sure we can create another file
 
442
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
443
        # And overwrite 'a' with empty contents
 
444
        t.put_file_non_atomic('a', StringIO(''))
 
445
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
446
        self.check_transport_contents('', t, 'a')
 
447
 
 
448
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
449
                                       StringIO('contents\n'))
 
450
        # Now test the create_parent flag
 
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
452
                                       StringIO('contents\n'))
 
453
        self.assertFalse(t.has('dir/a'))
 
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
455
                              create_parent_dir=True)
 
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
457
 
 
458
        # But we still get NoSuchFile if we can't make the parent dir
 
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
460
                                       StringIO('contents\n'),
 
461
                                       create_parent_dir=True)
 
462
 
 
463
    def test_put_file_permissions(self):
 
464
 
 
465
        t = self.get_transport()
 
466
 
 
467
        if t.is_readonly():
 
468
            return
 
469
        if not t._can_roundtrip_unix_modebits():
 
470
            # Can't roundtrip, so no need to run this test
 
471
            return
 
472
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
473
        self.assertTransportMode(t, 'mode644', 0644)
 
474
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
475
        self.assertTransportMode(t, 'mode666', 0666)
 
476
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
477
        self.assertTransportMode(t, 'mode600', 0600)
 
478
        # Yes, you can put a file such that it becomes readonly
 
479
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
480
        self.assertTransportMode(t, 'mode400', 0400)
 
481
        # The default permissions should be based on the current umask
 
482
        umask = osutils.get_umask()
 
483
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
484
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
485
 
 
486
    def test_put_file_non_atomic_permissions(self):
 
487
        t = self.get_transport()
 
488
 
 
489
        if t.is_readonly():
 
490
            return
 
491
        if not t._can_roundtrip_unix_modebits():
 
492
            # Can't roundtrip, so no need to run this test
 
493
            return
 
494
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
495
        self.assertTransportMode(t, 'mode644', 0644)
 
496
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
497
        self.assertTransportMode(t, 'mode666', 0666)
 
498
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
499
        self.assertTransportMode(t, 'mode600', 0600)
 
500
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
501
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
502
        self.assertTransportMode(t, 'mode400', 0400)
 
503
 
 
504
        # The default permissions should be based on the current umask
 
505
        umask = osutils.get_umask()
 
506
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
507
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
508
 
 
509
        # We should also be able to set the mode for a parent directory
 
510
        # when it is created
 
511
        sio = StringIO()
 
512
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
513
                              dir_mode=0700, create_parent_dir=True)
 
514
        self.assertTransportMode(t, 'dir700', 0700)
 
515
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
516
                              dir_mode=0770, create_parent_dir=True)
 
517
        self.assertTransportMode(t, 'dir770', 0770)
 
518
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
519
                              dir_mode=0777, create_parent_dir=True)
 
520
        self.assertTransportMode(t, 'dir777', 0777)
 
521
 
 
522
    def test_put_bytes_unicode(self):
 
523
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
524
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
525
        # (we don't want to encode unicode here at all, callers should be
 
526
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
527
        # compatibility.  At some point we should use a specific exception.
 
528
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
529
        t = self.get_transport()
 
530
        if t.is_readonly():
 
531
            return
 
532
        unicode_string = u'\u1234'
 
533
        self.assertRaises(
 
534
            (AssertionError, UnicodeEncodeError),
 
535
            t.put_bytes, 'foo', unicode_string)
 
536
 
 
537
    def test_put_file_unicode(self):
 
538
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
539
        # This situation can happen (and has) if code is careless about the type
 
540
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
541
        # cStringIO, because it never returns unicode from read.
 
542
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
543
        # raise, but we raise it for hysterical raisins.
 
544
        t = self.get_transport()
 
545
        if t.is_readonly():
 
546
            return
 
547
        unicode_file = pyStringIO(u'\u1234')
 
548
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
549
 
 
550
    def test_mkdir(self):
 
551
        t = self.get_transport()
 
552
 
 
553
        if t.is_readonly():
 
554
            # cannot mkdir on readonly transports. We're not testing for
 
555
            # cache coherency because cache behaviour is not currently
 
556
            # defined for the transport interface.
 
557
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
558
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
559
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
560
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
561
            return
 
562
        # Test mkdir
 
563
        t.mkdir('dir_a')
 
564
        self.assertEqual(t.has('dir_a'), True)
 
565
        self.assertEqual(t.has('dir_b'), False)
 
566
 
 
567
        t.mkdir('dir_b')
 
568
        self.assertEqual(t.has('dir_b'), True)
 
569
 
 
570
        t.mkdir_multi(['dir_c', 'dir_d'])
 
571
 
 
572
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
573
        self.assertEqual(list(t.has_multi(
 
574
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
575
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
576
            [True, True, True, False,
 
577
             True, True, True, True])
 
578
 
 
579
        # we were testing that a local mkdir followed by a transport
 
580
        # mkdir failed thusly, but given that we * in one process * do not
 
581
        # concurrently fiddle with disk dirs and then use transport to do
 
582
        # things, the win here seems marginal compared to the constraint on
 
583
        # the interface. RBC 20051227
 
584
        t.mkdir('dir_g')
 
585
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
586
 
 
587
        # Test get/put in sub-directories
 
588
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
589
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
590
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
591
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
592
 
 
593
        # mkdir of a dir with an absent parent
 
594
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
595
 
 
596
    def test_mkdir_permissions(self):
 
597
        t = self.get_transport()
 
598
        if t.is_readonly():
 
599
            return
 
600
        if not t._can_roundtrip_unix_modebits():
 
601
            # no sense testing on this transport
 
602
            return
 
603
        # Test mkdir with a mode
 
604
        t.mkdir('dmode755', mode=0755)
 
605
        self.assertTransportMode(t, 'dmode755', 0755)
 
606
        t.mkdir('dmode555', mode=0555)
 
607
        self.assertTransportMode(t, 'dmode555', 0555)
 
608
        t.mkdir('dmode777', mode=0777)
 
609
        self.assertTransportMode(t, 'dmode777', 0777)
 
610
        t.mkdir('dmode700', mode=0700)
 
611
        self.assertTransportMode(t, 'dmode700', 0700)
 
612
        t.mkdir_multi(['mdmode755'], mode=0755)
 
613
        self.assertTransportMode(t, 'mdmode755', 0755)
 
614
 
 
615
        # Default mode should be based on umask
 
616
        umask = osutils.get_umask()
 
617
        t.mkdir('dnomode', mode=None)
 
618
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
619
 
 
620
    def test_opening_a_file_stream_creates_file(self):
 
621
        t = self.get_transport()
 
622
        if t.is_readonly():
 
623
            return
 
624
        handle = t.open_write_stream('foo')
 
625
        try:
 
626
            self.assertEqual('', t.get_bytes('foo'))
 
627
        finally:
 
628
            handle.close()
 
629
 
 
630
    def test_opening_a_file_stream_can_set_mode(self):
 
631
        t = self.get_transport()
 
632
        if t.is_readonly():
 
633
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
634
                              t.open_write_stream, 'foo')
 
635
            return
 
636
        if not t._can_roundtrip_unix_modebits():
 
637
            # Can't roundtrip, so no need to run this test
 
638
            return
 
639
 
 
640
        def check_mode(name, mode, expected):
 
641
            handle = t.open_write_stream(name, mode=mode)
 
642
            handle.close()
 
643
            self.assertTransportMode(t, name, expected)
 
644
        check_mode('mode644', 0644, 0644)
 
645
        check_mode('mode666', 0666, 0666)
 
646
        check_mode('mode600', 0600, 0600)
 
647
        # The default permissions should be based on the current umask
 
648
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
649
 
 
650
    def test_copy_to(self):
 
651
        # FIXME: test:   same server to same server (partly done)
 
652
        # same protocol two servers
 
653
        # and    different protocols (done for now except for MemoryTransport.
 
654
        # - RBC 20060122
 
655
 
 
656
        def simple_copy_files(transport_from, transport_to):
 
657
            files = ['a', 'b', 'c', 'd']
 
658
            self.build_tree(files, transport=transport_from)
 
659
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
660
            for f in files:
 
661
                self.check_transport_contents(transport_to.get_bytes(f),
 
662
                                              transport_from, f)
 
663
 
 
664
        t = self.get_transport()
 
665
        temp_transport = MemoryTransport('memory:///')
 
666
        simple_copy_files(t, temp_transport)
 
667
        if not t.is_readonly():
 
668
            t.mkdir('copy_to_simple')
 
669
            t2 = t.clone('copy_to_simple')
 
670
            simple_copy_files(t, t2)
 
671
 
 
672
 
 
673
        # Test that copying into a missing directory raises
 
674
        # NoSuchFile
 
675
        if t.is_readonly():
 
676
            self.build_tree(['e/', 'e/f'])
 
677
        else:
 
678
            t.mkdir('e')
 
679
            t.put_bytes('e/f', 'contents of e')
 
680
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
681
        temp_transport.mkdir('e')
 
682
        t.copy_to(['e/f'], temp_transport)
 
683
 
 
684
        del temp_transport
 
685
        temp_transport = MemoryTransport('memory:///')
 
686
 
 
687
        files = ['a', 'b', 'c', 'd']
 
688
        t.copy_to(iter(files), temp_transport)
 
689
        for f in files:
 
690
            self.check_transport_contents(temp_transport.get_bytes(f),
 
691
                                          t, f)
 
692
        del temp_transport
 
693
 
 
694
        for mode in (0666, 0644, 0600, 0400):
 
695
            temp_transport = MemoryTransport("memory:///")
 
696
            t.copy_to(files, temp_transport, mode=mode)
 
697
            for f in files:
 
698
                self.assertTransportMode(temp_transport, f, mode)
 
699
 
 
700
    def test_create_prefix(self):
 
701
        t = self.get_transport()
 
702
        sub = t.clone('foo').clone('bar')
 
703
        try:
 
704
            sub.create_prefix()
 
705
        except TransportNotPossible:
 
706
            self.assertTrue(t.is_readonly())
 
707
        else:
 
708
            self.assertTrue(t.has('foo/bar'))
 
709
 
 
710
    def test_append_file(self):
 
711
        t = self.get_transport()
 
712
 
 
713
        if t.is_readonly():
 
714
            self.assertRaises(TransportNotPossible,
 
715
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
716
            return
 
717
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
718
        t.put_bytes('b', 'contents\nfor b\n')
 
719
 
 
720
        self.assertEqual(20,
 
721
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
722
 
 
723
        self.check_transport_contents(
 
724
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
725
            t, 'a')
 
726
 
 
727
        # a file with no parent should fail..
 
728
        self.assertRaises(NoSuchFile,
 
729
                          t.append_file, 'missing/path', StringIO('content'))
 
730
 
 
731
        # And we can create new files, too
 
732
        self.assertEqual(0,
 
733
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
734
        self.check_transport_contents('some text\nfor a missing file\n',
 
735
                                      t, 'c')
 
736
 
 
737
    def test_append_bytes(self):
 
738
        t = self.get_transport()
 
739
 
 
740
        if t.is_readonly():
 
741
            self.assertRaises(TransportNotPossible,
 
742
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
743
            return
 
744
 
 
745
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
746
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
747
 
 
748
        self.assertEqual(20,
 
749
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
750
 
 
751
        self.check_transport_contents(
 
752
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
753
            t, 'a')
 
754
 
 
755
        # a file with no parent should fail..
 
756
        self.assertRaises(NoSuchFile,
 
757
                          t.append_bytes, 'missing/path', 'content')
 
758
 
 
759
    def test_append_multi(self):
 
760
        t = self.get_transport()
 
761
 
 
762
        if t.is_readonly():
 
763
            return
 
764
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
765
                         'add\nsome\nmore\ncontents\n')
 
766
        t.put_bytes('b', 'contents\nfor b\n')
 
767
 
 
768
        self.assertEqual((43, 15),
 
769
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
770
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
771
 
 
772
        self.check_transport_contents(
 
773
            'diff\ncontents for\na\n'
 
774
            'add\nsome\nmore\ncontents\n'
 
775
            'and\nthen\nsome\nmore\n',
 
776
            t, 'a')
 
777
        self.check_transport_contents(
 
778
                'contents\nfor b\n'
 
779
                'some\nmore\nfor\nb\n',
 
780
                t, 'b')
 
781
 
 
782
        self.assertEqual((62, 31),
 
783
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
784
                                 ('b', StringIO('from an iterator\n'))])))
 
785
        self.check_transport_contents(
 
786
            'diff\ncontents for\na\n'
 
787
            'add\nsome\nmore\ncontents\n'
 
788
            'and\nthen\nsome\nmore\n'
 
789
            'a little bit more\n',
 
790
            t, 'a')
 
791
        self.check_transport_contents(
 
792
                'contents\nfor b\n'
 
793
                'some\nmore\nfor\nb\n'
 
794
                'from an iterator\n',
 
795
                t, 'b')
 
796
 
 
797
        self.assertEqual((80, 0),
 
798
            t.append_multi([('a', StringIO('some text in a\n')),
 
799
                            ('d', StringIO('missing file r\n'))]))
 
800
 
 
801
        self.check_transport_contents(
 
802
            'diff\ncontents for\na\n'
 
803
            'add\nsome\nmore\ncontents\n'
 
804
            'and\nthen\nsome\nmore\n'
 
805
            'a little bit more\n'
 
806
            'some text in a\n',
 
807
            t, 'a')
 
808
        self.check_transport_contents('missing file r\n', t, 'd')
 
809
 
 
810
    def test_append_file_mode(self):
 
811
        """Check that append accepts a mode parameter"""
 
812
        # check append accepts a mode
 
813
        t = self.get_transport()
 
814
        if t.is_readonly():
 
815
            self.assertRaises(TransportNotPossible,
 
816
                t.append_file, 'f', StringIO('f'), mode=None)
 
817
            return
 
818
        t.append_file('f', StringIO('f'), mode=None)
 
819
 
 
820
    def test_append_bytes_mode(self):
 
821
        # check append_bytes accepts a mode
 
822
        t = self.get_transport()
 
823
        if t.is_readonly():
 
824
            self.assertRaises(TransportNotPossible,
 
825
                t.append_bytes, 'f', 'f', mode=None)
 
826
            return
 
827
        t.append_bytes('f', 'f', mode=None)
 
828
 
 
829
    def test_delete(self):
 
830
        # TODO: Test Transport.delete
 
831
        t = self.get_transport()
 
832
 
 
833
        # Not much to do with a readonly transport
 
834
        if t.is_readonly():
 
835
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
836
            return
 
837
 
 
838
        t.put_bytes('a', 'a little bit of text\n')
 
839
        self.assertTrue(t.has('a'))
 
840
        t.delete('a')
 
841
        self.assertFalse(t.has('a'))
 
842
 
 
843
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
844
 
 
845
        t.put_bytes('a', 'a text\n')
 
846
        t.put_bytes('b', 'b text\n')
 
847
        t.put_bytes('c', 'c text\n')
 
848
        self.assertEqual([True, True, True],
 
849
                list(t.has_multi(['a', 'b', 'c'])))
 
850
        t.delete_multi(['a', 'c'])
 
851
        self.assertEqual([False, True, False],
 
852
                list(t.has_multi(['a', 'b', 'c'])))
 
853
        self.assertFalse(t.has('a'))
 
854
        self.assertTrue(t.has('b'))
 
855
        self.assertFalse(t.has('c'))
 
856
 
 
857
        self.assertRaises(NoSuchFile,
 
858
                t.delete_multi, ['a', 'b', 'c'])
 
859
 
 
860
        self.assertRaises(NoSuchFile,
 
861
                t.delete_multi, iter(['a', 'b', 'c']))
 
862
 
 
863
        t.put_bytes('a', 'another a text\n')
 
864
        t.put_bytes('c', 'another c text\n')
 
865
        t.delete_multi(iter(['a', 'b', 'c']))
 
866
 
 
867
        # We should have deleted everything
 
868
        # SftpServer creates control files in the
 
869
        # working directory, so we can just do a
 
870
        # plain "listdir".
 
871
        # self.assertEqual([], os.listdir('.'))
 
872
 
 
873
    def test_recommended_page_size(self):
 
874
        """Transports recommend a page size for partial access to files."""
 
875
        t = self.get_transport()
 
876
        self.assertIsInstance(t.recommended_page_size(), int)
 
877
 
 
878
    def test_rmdir(self):
 
879
        t = self.get_transport()
 
880
        # Not much to do with a readonly transport
 
881
        if t.is_readonly():
 
882
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
883
            return
 
884
        t.mkdir('adir')
 
885
        t.mkdir('adir/bdir')
 
886
        t.rmdir('adir/bdir')
 
887
        # ftp may not be able to raise NoSuchFile for lack of
 
888
        # details when failing
 
889
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
890
        t.rmdir('adir')
 
891
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
892
 
 
893
    def test_rmdir_not_empty(self):
 
894
        """Deleting a non-empty directory raises an exception
 
895
 
 
896
        sftp (and possibly others) don't give us a specific "directory not
 
897
        empty" exception -- we can just see that the operation failed.
 
898
        """
 
899
        t = self.get_transport()
 
900
        if t.is_readonly():
 
901
            return
 
902
        t.mkdir('adir')
 
903
        t.mkdir('adir/bdir')
 
904
        self.assertRaises(PathError, t.rmdir, 'adir')
 
905
 
 
906
    def test_rmdir_empty_but_similar_prefix(self):
 
907
        """rmdir does not get confused by sibling paths.
 
908
 
 
909
        A naive implementation of MemoryTransport would refuse to rmdir
 
910
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
911
        uses "path.startswith(dir)" on all file paths to determine if directory
 
912
        is empty.
 
913
        """
 
914
        t = self.get_transport()
 
915
        if t.is_readonly():
 
916
            return
 
917
        t.mkdir('foo')
 
918
        t.put_bytes('foo-bar', '')
 
919
        t.mkdir('foo-baz')
 
920
        t.rmdir('foo')
 
921
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
922
        self.assertTrue(t.has('foo-bar'))
 
923
 
 
924
    def test_rename_dir_succeeds(self):
 
925
        t = self.get_transport()
 
926
        if t.is_readonly():
 
927
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
928
                              t.rename, 'foo', 'bar')
 
929
            return
 
930
        t.mkdir('adir')
 
931
        t.mkdir('adir/asubdir')
 
932
        t.rename('adir', 'bdir')
 
933
        self.assertTrue(t.has('bdir/asubdir'))
 
934
        self.assertFalse(t.has('adir'))
 
935
 
 
936
    def test_rename_dir_nonempty(self):
 
937
        """Attempting to replace a nonemtpy directory should fail"""
 
938
        t = self.get_transport()
 
939
        if t.is_readonly():
 
940
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
941
                              t.rename, 'foo', 'bar')
 
942
            return
 
943
        t.mkdir('adir')
 
944
        t.mkdir('adir/asubdir')
 
945
        t.mkdir('bdir')
 
946
        t.mkdir('bdir/bsubdir')
 
947
        # any kind of PathError would be OK, though we normally expect
 
948
        # DirectoryNotEmpty
 
949
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
950
        # nothing was changed so it should still be as before
 
951
        self.assertTrue(t.has('bdir/bsubdir'))
 
952
        self.assertFalse(t.has('adir/bdir'))
 
953
        self.assertFalse(t.has('adir/bsubdir'))
 
954
 
 
955
    def test_rename_across_subdirs(self):
 
956
        t = self.get_transport()
 
957
        if t.is_readonly():
 
958
            raise TestNotApplicable("transport is readonly")
 
959
        t.mkdir('a')
 
960
        t.mkdir('b')
 
961
        ta = t.clone('a')
 
962
        tb = t.clone('b')
 
963
        ta.put_bytes('f', 'aoeu')
 
964
        ta.rename('f', '../b/f')
 
965
        self.assertTrue(tb.has('f'))
 
966
        self.assertFalse(ta.has('f'))
 
967
        self.assertTrue(t.has('b/f'))
 
968
 
 
969
    def test_delete_tree(self):
 
970
        t = self.get_transport()
 
971
 
 
972
        # Not much to do with a readonly transport
 
973
        if t.is_readonly():
 
974
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
975
            return
 
976
 
 
977
        # and does it like listing ?
 
978
        t.mkdir('adir')
 
979
        try:
 
980
            t.delete_tree('adir')
 
981
        except TransportNotPossible:
 
982
            # ok, this transport does not support delete_tree
 
983
            return
 
984
 
 
985
        # did it delete that trivial case?
 
986
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
987
 
 
988
        self.build_tree(['adir/',
 
989
                         'adir/file',
 
990
                         'adir/subdir/',
 
991
                         'adir/subdir/file',
 
992
                         'adir/subdir2/',
 
993
                         'adir/subdir2/file',
 
994
                         ], transport=t)
 
995
 
 
996
        t.delete_tree('adir')
 
997
        # adir should be gone now.
 
998
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
999
 
 
1000
    def test_move(self):
 
1001
        t = self.get_transport()
 
1002
 
 
1003
        if t.is_readonly():
 
1004
            return
 
1005
 
 
1006
        # TODO: I would like to use os.listdir() to
 
1007
        # make sure there are no extra files, but SftpServer
 
1008
        # creates control files in the working directory
 
1009
        # perhaps all of this could be done in a subdirectory
 
1010
 
 
1011
        t.put_bytes('a', 'a first file\n')
 
1012
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
1013
 
 
1014
        t.move('a', 'b')
 
1015
        self.assertTrue(t.has('b'))
 
1016
        self.assertFalse(t.has('a'))
 
1017
 
 
1018
        self.check_transport_contents('a first file\n', t, 'b')
 
1019
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
1020
 
 
1021
        # Overwrite a file
 
1022
        t.put_bytes('c', 'c this file\n')
 
1023
        t.move('c', 'b')
 
1024
        self.assertFalse(t.has('c'))
 
1025
        self.check_transport_contents('c this file\n', t, 'b')
 
1026
 
 
1027
        # TODO: Try to write a test for atomicity
 
1028
        # TODO: Test moving into a non-existent subdirectory
 
1029
        # TODO: Test Transport.move_multi
 
1030
 
 
1031
    def test_copy(self):
 
1032
        t = self.get_transport()
 
1033
 
 
1034
        if t.is_readonly():
 
1035
            return
 
1036
 
 
1037
        t.put_bytes('a', 'a file\n')
 
1038
        t.copy('a', 'b')
 
1039
        self.check_transport_contents('a file\n', t, 'b')
 
1040
 
 
1041
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
1042
        os.mkdir('c')
 
1043
        # What should the assert be if you try to copy a
 
1044
        # file over a directory?
 
1045
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
1046
        t.put_bytes('d', 'text in d\n')
 
1047
        t.copy('d', 'b')
 
1048
        self.check_transport_contents('text in d\n', t, 'b')
 
1049
 
 
1050
        # TODO: test copy_multi
 
1051
 
 
1052
    def test_connection_error(self):
 
1053
        """ConnectionError is raised when connection is impossible.
 
1054
 
 
1055
        The error should be raised from the first operation on the transport.
 
1056
        """
 
1057
        try:
 
1058
            url = self._server.get_bogus_url()
 
1059
        except NotImplementedError:
 
1060
            raise TestSkipped("Transport %s has no bogus URL support." %
 
1061
                              self._server.__class__)
 
1062
        t = _mod_transport.get_transport_from_url(url)
 
1063
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
1064
 
 
1065
    def test_stat(self):
 
1066
        # TODO: Test stat, just try once, and if it throws, stop testing
 
1067
        from stat import S_ISDIR, S_ISREG
 
1068
 
 
1069
        t = self.get_transport()
 
1070
 
 
1071
        try:
 
1072
            st = t.stat('.')
 
1073
        except TransportNotPossible, e:
 
1074
            # This transport cannot stat
 
1075
            return
 
1076
 
 
1077
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
1078
        sizes = [14, 0, 16, 0, 18]
 
1079
        self.build_tree(paths, transport=t, line_endings='binary')
 
1080
 
 
1081
        for path, size in zip(paths, sizes):
 
1082
            st = t.stat(path)
 
1083
            if path.endswith('/'):
 
1084
                self.assertTrue(S_ISDIR(st.st_mode))
 
1085
                # directory sizes are meaningless
 
1086
            else:
 
1087
                self.assertTrue(S_ISREG(st.st_mode))
 
1088
                self.assertEqual(size, st.st_size)
 
1089
 
 
1090
        remote_stats = list(t.stat_multi(paths))
 
1091
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1092
 
 
1093
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
1094
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
1095
 
 
1096
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1097
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
1098
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
1099
        subdir = t.clone('subdir')
 
1100
        st = subdir.stat('./file')
 
1101
        st = subdir.stat('.')
 
1102
 
 
1103
    def test_hardlink(self):
 
1104
        from stat import ST_NLINK
 
1105
 
 
1106
        t = self.get_transport()
 
1107
 
 
1108
        source_name = "original_target"
 
1109
        link_name = "target_link"
 
1110
 
 
1111
        self.build_tree([source_name], transport=t)
 
1112
 
 
1113
        try:
 
1114
            t.hardlink(source_name, link_name)
 
1115
 
 
1116
            self.assertTrue(t.has(source_name))
 
1117
            self.assertTrue(t.has(link_name))
 
1118
 
 
1119
            st = t.stat(link_name)
 
1120
            self.assertEqual(st[ST_NLINK], 2)
 
1121
        except TransportNotPossible:
 
1122
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1123
                              self._server.__class__)
 
1124
 
 
1125
    def test_symlink(self):
 
1126
        from stat import S_ISLNK
 
1127
 
 
1128
        t = self.get_transport()
 
1129
 
 
1130
        source_name = "original_target"
 
1131
        link_name = "target_link"
 
1132
 
 
1133
        self.build_tree([source_name], transport=t)
 
1134
 
 
1135
        try:
 
1136
            t.symlink(source_name, link_name)
 
1137
 
 
1138
            self.assertTrue(t.has(source_name))
 
1139
            self.assertTrue(t.has(link_name))
 
1140
 
 
1141
            st = t.stat(link_name)
 
1142
            self.assertTrue(S_ISLNK(st.st_mode),
 
1143
                "expected symlink, got mode %o" % st.st_mode)
 
1144
        except TransportNotPossible:
 
1145
            raise TestSkipped("Transport %s does not support symlinks." %
 
1146
                              self._server.__class__)
 
1147
        except IOError:
 
1148
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1149
 
 
1150
    def test_list_dir(self):
 
1151
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1152
        t = self.get_transport()
 
1153
 
 
1154
        if not t.listable():
 
1155
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1156
            return
 
1157
 
 
1158
        def sorted_list(d, transport):
 
1159
            l = list(transport.list_dir(d))
 
1160
            l.sort()
 
1161
            return l
 
1162
 
 
1163
        self.assertEqual([], sorted_list('.', t))
 
1164
        # c2 is precisely one letter longer than c here to test that
 
1165
        # suffixing is not confused.
 
1166
        # a%25b checks that quoting is done consistently across transports
 
1167
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1168
 
 
1169
        if not t.is_readonly():
 
1170
            self.build_tree(tree_names, transport=t)
 
1171
        else:
 
1172
            self.build_tree(tree_names)
 
1173
 
 
1174
        self.assertEqual(
 
1175
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1176
        self.assertEqual(
 
1177
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1178
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1179
 
 
1180
        # Cloning the transport produces an equivalent listing
 
1181
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1182
 
 
1183
        if not t.is_readonly():
 
1184
            t.delete('c/d')
 
1185
            t.delete('b')
 
1186
        else:
 
1187
            os.unlink('c/d')
 
1188
            os.unlink('b')
 
1189
 
 
1190
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1191
        self.assertEqual(['e'], sorted_list('c', t))
 
1192
 
 
1193
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1194
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1195
        # 'a' is a file, list_dir should raise an error
 
1196
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1197
 
 
1198
    def test_list_dir_result_is_url_escaped(self):
 
1199
        t = self.get_transport()
 
1200
        if not t.listable():
 
1201
            raise TestSkipped("transport not listable")
 
1202
 
 
1203
        if not t.is_readonly():
 
1204
            self.build_tree(['a/', 'a/%'], transport=t)
 
1205
        else:
 
1206
            self.build_tree(['a/', 'a/%'])
 
1207
 
 
1208
        names = list(t.list_dir('a'))
 
1209
        self.assertEqual(['%25'], names)
 
1210
        self.assertIsInstance(names[0], str)
 
1211
 
 
1212
    def test_clone_preserve_info(self):
 
1213
        t1 = self.get_transport()
 
1214
        if not isinstance(t1, ConnectedTransport):
 
1215
            raise TestSkipped("not a connected transport")
 
1216
 
 
1217
        t2 = t1.clone('subdir')
 
1218
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1219
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1220
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1221
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1222
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
 
1223
 
 
1224
    def test__reuse_for(self):
 
1225
        t = self.get_transport()
 
1226
        if not isinstance(t, ConnectedTransport):
 
1227
            raise TestSkipped("not a connected transport")
 
1228
 
 
1229
        def new_url(scheme=None, user=None, password=None,
 
1230
                    host=None, port=None, path=None):
 
1231
            """Build a new url from t.base changing only parts of it.
 
1232
 
 
1233
            Only the parameters different from None will be changed.
 
1234
            """
 
1235
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1236
            if user     is None: user     = t._parsed_url.user
 
1237
            if password is None: password = t._parsed_url.password
 
1238
            if user     is None: user     = t._parsed_url.user
 
1239
            if host     is None: host     = t._parsed_url.host
 
1240
            if port     is None: port     = t._parsed_url.port
 
1241
            if path     is None: path     = t._parsed_url.path
 
1242
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1243
 
 
1244
        if t._parsed_url.scheme == 'ftp':
 
1245
            scheme = 'sftp'
 
1246
        else:
 
1247
            scheme = 'ftp'
 
1248
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1249
        if t._parsed_url.user == 'me':
 
1250
            user = 'you'
 
1251
        else:
 
1252
            user = 'me'
 
1253
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1254
        # passwords are not taken into account because:
 
1255
        # - it makes no sense to have two different valid passwords for the
 
1256
        #   same user
 
1257
        # - _password in ConnectedTransport is intended to collect what the
 
1258
        #   user specified from the command-line and there are cases where the
 
1259
        #   new url can contain no password (if the url was built from an
 
1260
        #   existing transport.base for example)
 
1261
        # - password are considered part of the credentials provided at
 
1262
        #   connection creation time and as such may not be present in the url
 
1263
        #   (they may be typed by the user when prompted for example)
 
1264
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1265
        # We will not connect, we can use a invalid host
 
1266
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1267
        if t._parsed_url.port == 1234:
 
1268
            port = 4321
 
1269
        else:
 
1270
            port = 1234
 
1271
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1272
        # No point in trying to reuse a transport for a local URL
 
1273
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1274
 
 
1275
    def test_connection_sharing(self):
 
1276
        t = self.get_transport()
 
1277
        if not isinstance(t, ConnectedTransport):
 
1278
            raise TestSkipped("not a connected transport")
 
1279
 
 
1280
        c = t.clone('subdir')
 
1281
        # Some transports will create the connection  only when needed
 
1282
        t.has('surely_not') # Force connection
 
1283
        self.assertIs(t._get_connection(), c._get_connection())
 
1284
 
 
1285
        # Temporary failure, we need to create a new dummy connection
 
1286
        new_connection = None
 
1287
        t._set_connection(new_connection)
 
1288
        # Check that both transports use the same connection
 
1289
        self.assertIs(new_connection, t._get_connection())
 
1290
        self.assertIs(new_connection, c._get_connection())
 
1291
 
 
1292
    def test_reuse_connection_for_various_paths(self):
 
1293
        t = self.get_transport()
 
1294
        if not isinstance(t, ConnectedTransport):
 
1295
            raise TestSkipped("not a connected transport")
 
1296
 
 
1297
        t.has('surely_not') # Force connection
 
1298
        self.assertIsNot(None, t._get_connection())
 
1299
 
 
1300
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1301
        self.assertIsNot(t, subdir)
 
1302
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1303
 
 
1304
        home = subdir._reuse_for(t.base + 'home')
 
1305
        self.assertIs(t._get_connection(), home._get_connection())
 
1306
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1307
 
 
1308
    def test_clone(self):
 
1309
        # TODO: Test that clone moves up and down the filesystem
 
1310
        t1 = self.get_transport()
 
1311
 
 
1312
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1313
 
 
1314
        self.assertTrue(t1.has('a'))
 
1315
        self.assertTrue(t1.has('b/c'))
 
1316
        self.assertFalse(t1.has('c'))
 
1317
 
 
1318
        t2 = t1.clone('b')
 
1319
        self.assertEqual(t1.base + 'b/', t2.base)
 
1320
 
 
1321
        self.assertTrue(t2.has('c'))
 
1322
        self.assertFalse(t2.has('a'))
 
1323
 
 
1324
        t3 = t2.clone('..')
 
1325
        self.assertTrue(t3.has('a'))
 
1326
        self.assertFalse(t3.has('c'))
 
1327
 
 
1328
        self.assertFalse(t1.has('b/d'))
 
1329
        self.assertFalse(t2.has('d'))
 
1330
        self.assertFalse(t3.has('b/d'))
 
1331
 
 
1332
        if t1.is_readonly():
 
1333
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1334
        else:
 
1335
            t2.put_bytes('d', 'newfile\n')
 
1336
 
 
1337
        self.assertTrue(t1.has('b/d'))
 
1338
        self.assertTrue(t2.has('d'))
 
1339
        self.assertTrue(t3.has('b/d'))
 
1340
 
 
1341
    def test_clone_to_root(self):
 
1342
        orig_transport = self.get_transport()
 
1343
        # Repeatedly go up to a parent directory until we're at the root
 
1344
        # directory of this transport
 
1345
        root_transport = orig_transport
 
1346
        new_transport = root_transport.clone("..")
 
1347
        # as we are walking up directories, the path must be
 
1348
        # growing less, except at the top
 
1349
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1350
            or new_transport.base == root_transport.base)
 
1351
        while new_transport.base != root_transport.base:
 
1352
            root_transport = new_transport
 
1353
            new_transport = root_transport.clone("..")
 
1354
            # as we are walking up directories, the path must be
 
1355
            # growing less, except at the top
 
1356
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1357
                or new_transport.base == root_transport.base)
 
1358
 
 
1359
        # Cloning to "/" should take us to exactly the same location.
 
1360
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1361
        # the abspath of "/" from the original transport should be the same
 
1362
        # as the base at the root:
 
1363
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1364
 
 
1365
        # At the root, the URL must still end with / as its a directory
 
1366
        self.assertEqual(root_transport.base[-1], '/')
 
1367
 
 
1368
    def test_clone_from_root(self):
 
1369
        """At the root, cloning to a simple dir should just do string append."""
 
1370
        orig_transport = self.get_transport()
 
1371
        root_transport = orig_transport.clone('/')
 
1372
        self.assertEqual(root_transport.base + '.bzr/',
 
1373
            root_transport.clone('.bzr').base)
 
1374
 
 
1375
    def test_base_url(self):
 
1376
        t = self.get_transport()
 
1377
        self.assertEqual('/', t.base[-1])
 
1378
 
 
1379
    def test_relpath(self):
 
1380
        t = self.get_transport()
 
1381
        self.assertEqual('', t.relpath(t.base))
 
1382
        # base ends with /
 
1383
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1384
        # subdirs which don't exist should still give relpaths.
 
1385
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1386
        # trailing slash should be the same.
 
1387
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1388
 
 
1389
    def test_relpath_at_root(self):
 
1390
        t = self.get_transport()
 
1391
        # clone all the way to the top
 
1392
        new_transport = t.clone('..')
 
1393
        while new_transport.base != t.base:
 
1394
            t = new_transport
 
1395
            new_transport = t.clone('..')
 
1396
        # we must be able to get a relpath below the root
 
1397
        self.assertEqual('', t.relpath(t.base))
 
1398
        # and a deeper one should work too
 
1399
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1400
 
 
1401
    def test_abspath(self):
 
1402
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1403
        # that have aliasing problems like symlinks should go in backend
 
1404
        # specific test cases.
 
1405
        transport = self.get_transport()
 
1406
 
 
1407
        self.assertEqual(transport.base + 'relpath',
 
1408
                         transport.abspath('relpath'))
 
1409
 
 
1410
        # This should work without raising an error.
 
1411
        transport.abspath("/")
 
1412
 
 
1413
        # the abspath of "/" and "/foo/.." should result in the same location
 
1414
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1415
 
 
1416
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1417
                         transport.abspath("/foo"))
 
1418
 
 
1419
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1420
    def test_win32_abspath(self):
 
1421
        # Note: we tried to set sys.platform='win32' so we could test on
 
1422
        # other platforms too, but then osutils does platform specific
 
1423
        # things at import time which defeated us...
 
1424
        if sys.platform != 'win32':
 
1425
            raise TestSkipped(
 
1426
                'Testing drive letters in abspath implemented only for win32')
 
1427
 
 
1428
        # smoke test for abspath on win32.
 
1429
        # a transport based on 'file:///' never fully qualifies the drive.
 
1430
        transport = _mod_transport.get_transport_from_url("file:///")
 
1431
        self.assertEqual(transport.abspath("/"), "file:///")
 
1432
 
 
1433
        # but a transport that starts with a drive spec must keep it.
 
1434
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1435
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1436
 
 
1437
    def test_local_abspath(self):
 
1438
        transport = self.get_transport()
 
1439
        try:
 
1440
            p = transport.local_abspath('.')
 
1441
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1442
            # should be formattable
 
1443
            s = str(e)
 
1444
        else:
 
1445
            self.assertEqual(getcwd(), p)
 
1446
 
 
1447
    def test_abspath_at_root(self):
 
1448
        t = self.get_transport()
 
1449
        # clone all the way to the top
 
1450
        new_transport = t.clone('..')
 
1451
        while new_transport.base != t.base:
 
1452
            t = new_transport
 
1453
            new_transport = t.clone('..')
 
1454
        # we must be able to get a abspath of the root when we ask for
 
1455
        # t.abspath('..') - this due to our choice that clone('..')
 
1456
        # should return the root from the root, combined with the desire that
 
1457
        # the url from clone('..') and from abspath('..') should be the same.
 
1458
        self.assertEqual(t.base, t.abspath('..'))
 
1459
        # '' should give us the root
 
1460
        self.assertEqual(t.base, t.abspath(''))
 
1461
        # and a path should append to the url
 
1462
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1463
 
 
1464
    def test_iter_files_recursive(self):
 
1465
        transport = self.get_transport()
 
1466
        if not transport.listable():
 
1467
            self.assertRaises(TransportNotPossible,
 
1468
                              transport.iter_files_recursive)
 
1469
            return
 
1470
        self.build_tree(['isolated/',
 
1471
                         'isolated/dir/',
 
1472
                         'isolated/dir/foo',
 
1473
                         'isolated/dir/bar',
 
1474
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1475
                         'isolated/bar'],
 
1476
                        transport=transport)
 
1477
        paths = set(transport.iter_files_recursive())
 
1478
        # nb the directories are not converted
 
1479
        self.assertEqual(paths,
 
1480
                    set(['isolated/dir/foo',
 
1481
                         'isolated/dir/bar',
 
1482
                         'isolated/dir/b%2525z',
 
1483
                         'isolated/bar']))
 
1484
        sub_transport = transport.clone('isolated')
 
1485
        paths = set(sub_transport.iter_files_recursive())
 
1486
        self.assertEqual(paths,
 
1487
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1488
 
 
1489
    def test_copy_tree(self):
 
1490
        # TODO: test file contents and permissions are preserved. This test was
 
1491
        # added just to ensure that quoting was handled correctly.
 
1492
        # -- David Allouche 2006-08-11
 
1493
        transport = self.get_transport()
 
1494
        if not transport.listable():
 
1495
            self.assertRaises(TransportNotPossible,
 
1496
                              transport.iter_files_recursive)
 
1497
            return
 
1498
        if transport.is_readonly():
 
1499
            return
 
1500
        self.build_tree(['from/',
 
1501
                         'from/dir/',
 
1502
                         'from/dir/foo',
 
1503
                         'from/dir/bar',
 
1504
                         'from/dir/b%25z', # make sure quoting is correct
 
1505
                         'from/bar'],
 
1506
                        transport=transport)
 
1507
        transport.copy_tree('from', 'to')
 
1508
        paths = set(transport.iter_files_recursive())
 
1509
        self.assertEqual(paths,
 
1510
                    set(['from/dir/foo',
 
1511
                         'from/dir/bar',
 
1512
                         'from/dir/b%2525z',
 
1513
                         'from/bar',
 
1514
                         'to/dir/foo',
 
1515
                         'to/dir/bar',
 
1516
                         'to/dir/b%2525z',
 
1517
                         'to/bar',]))
 
1518
 
 
1519
    def test_copy_tree_to_transport(self):
 
1520
        transport = self.get_transport()
 
1521
        if not transport.listable():
 
1522
            self.assertRaises(TransportNotPossible,
 
1523
                              transport.iter_files_recursive)
 
1524
            return
 
1525
        if transport.is_readonly():
 
1526
            return
 
1527
        self.build_tree(['from/',
 
1528
                         'from/dir/',
 
1529
                         'from/dir/foo',
 
1530
                         'from/dir/bar',
 
1531
                         'from/dir/b%25z', # make sure quoting is correct
 
1532
                         'from/bar'],
 
1533
                        transport=transport)
 
1534
        from_transport = transport.clone('from')
 
1535
        to_transport = transport.clone('to')
 
1536
        to_transport.ensure_base()
 
1537
        from_transport.copy_tree_to_transport(to_transport)
 
1538
        paths = set(transport.iter_files_recursive())
 
1539
        self.assertEqual(paths,
 
1540
                    set(['from/dir/foo',
 
1541
                         'from/dir/bar',
 
1542
                         'from/dir/b%2525z',
 
1543
                         'from/bar',
 
1544
                         'to/dir/foo',
 
1545
                         'to/dir/bar',
 
1546
                         'to/dir/b%2525z',
 
1547
                         'to/bar',]))
 
1548
 
 
1549
    def test_unicode_paths(self):
 
1550
        """Test that we can read/write files with Unicode names."""
 
1551
        t = self.get_transport()
 
1552
 
 
1553
        # With FAT32 and certain encodings on win32
 
1554
        # '\xe5' and '\xe4' actually map to the same file
 
1555
        # adding a suffix kicks in the 'preserving but insensitive'
 
1556
        # route, and maintains the right files
 
1557
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1558
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1559
                 u'\u017d', # Z with umlat iso-8859-2
 
1560
                 u'\u062c', # Arabic j
 
1561
                 u'\u0410', # Russian A
 
1562
                 u'\u65e5', # Kanji person
 
1563
                ]
 
1564
 
 
1565
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1566
        if no_unicode_support:
 
1567
            self.knownFailure("test server cannot handle unicode paths")
 
1568
 
 
1569
        try:
 
1570
            self.build_tree(files, transport=t, line_endings='binary')
 
1571
        except UnicodeError:
 
1572
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1573
 
 
1574
        # A plain unicode string is not a valid url
 
1575
        for fname in files:
 
1576
            self.assertRaises(InvalidURL, t.get, fname)
 
1577
 
 
1578
        for fname in files:
 
1579
            fname_utf8 = fname.encode('utf-8')
 
1580
            contents = 'contents of %s\n' % (fname_utf8,)
 
1581
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1582
 
 
1583
    def test_connect_twice_is_same_content(self):
 
1584
        # check that our server (whatever it is) is accessible reliably
 
1585
        # via get_transport and multiple connections share content.
 
1586
        transport = self.get_transport()
 
1587
        if transport.is_readonly():
 
1588
            return
 
1589
        transport.put_bytes('foo', 'bar')
 
1590
        transport3 = self.get_transport()
 
1591
        self.check_transport_contents('bar', transport3, 'foo')
 
1592
 
 
1593
        # now opening at a relative url should give use a sane result:
 
1594
        transport.mkdir('newdir')
 
1595
        transport5 = self.get_transport('newdir')
 
1596
        transport6 = transport5.clone('..')
 
1597
        self.check_transport_contents('bar', transport6, 'foo')
 
1598
 
 
1599
    def test_lock_write(self):
 
1600
        """Test transport-level write locks.
 
1601
 
 
1602
        These are deprecated and transports may decline to support them.
 
1603
        """
 
1604
        transport = self.get_transport()
 
1605
        if transport.is_readonly():
 
1606
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1607
            return
 
1608
        transport.put_bytes('lock', '')
 
1609
        try:
 
1610
            lock = transport.lock_write('lock')
 
1611
        except TransportNotPossible:
 
1612
            return
 
1613
        # TODO make this consistent on all platforms:
 
1614
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1615
        lock.unlock()
 
1616
 
 
1617
    def test_lock_read(self):
 
1618
        """Test transport-level read locks.
 
1619
 
 
1620
        These are deprecated and transports may decline to support them.
 
1621
        """
 
1622
        transport = self.get_transport()
 
1623
        if transport.is_readonly():
 
1624
            file('lock', 'w').close()
 
1625
        else:
 
1626
            transport.put_bytes('lock', '')
 
1627
        try:
 
1628
            lock = transport.lock_read('lock')
 
1629
        except TransportNotPossible:
 
1630
            return
 
1631
        # TODO make this consistent on all platforms:
 
1632
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1633
        lock.unlock()
 
1634
 
 
1635
    def test_readv(self):
 
1636
        transport = self.get_transport()
 
1637
        if transport.is_readonly():
 
1638
            with file('a', 'w') as f: f.write('0123456789')
 
1639
        else:
 
1640
            transport.put_bytes('a', '0123456789')
 
1641
 
 
1642
        d = list(transport.readv('a', ((0, 1),)))
 
1643
        self.assertEqual(d[0], (0, '0'))
 
1644
 
 
1645
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1646
        self.assertEqual(d[0], (0, '0'))
 
1647
        self.assertEqual(d[1], (1, '1'))
 
1648
        self.assertEqual(d[2], (3, '34'))
 
1649
        self.assertEqual(d[3], (9, '9'))
 
1650
 
 
1651
    def test_readv_out_of_order(self):
 
1652
        transport = self.get_transport()
 
1653
        if transport.is_readonly():
 
1654
            with file('a', 'w') as f: f.write('0123456789')
 
1655
        else:
 
1656
            transport.put_bytes('a', '01234567890')
 
1657
 
 
1658
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1659
        self.assertEqual(d[0], (1, '1'))
 
1660
        self.assertEqual(d[1], (9, '9'))
 
1661
        self.assertEqual(d[2], (0, '0'))
 
1662
        self.assertEqual(d[3], (3, '34'))
 
1663
 
 
1664
    def test_readv_with_adjust_for_latency(self):
 
1665
        transport = self.get_transport()
 
1666
        # the adjust for latency flag expands the data region returned
 
1667
        # according to a per-transport heuristic, so testing is a little
 
1668
        # tricky as we need more data than the largest combining that our
 
1669
        # transports do. To accomodate this we generate random data and cross
 
1670
        # reference the returned data with the random data. To avoid doing
 
1671
        # multiple large random byte look ups we do several tests on the same
 
1672
        # backing data.
 
1673
        content = osutils.rand_bytes(200*1024)
 
1674
        content_size = len(content)
 
1675
        if transport.is_readonly():
 
1676
            self.build_tree_contents([('a', content)])
 
1677
        else:
 
1678
            transport.put_bytes('a', content)
 
1679
        def check_result_data(result_vector):
 
1680
            for item in result_vector:
 
1681
                data_len = len(item[1])
 
1682
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1683
 
 
1684
        # start corner case
 
1685
        result = list(transport.readv('a', ((0, 30),),
 
1686
            adjust_for_latency=True, upper_limit=content_size))
 
1687
        # we expect 1 result, from 0, to something > 30
 
1688
        self.assertEqual(1, len(result))
 
1689
        self.assertEqual(0, result[0][0])
 
1690
        self.assertTrue(len(result[0][1]) >= 30)
 
1691
        check_result_data(result)
 
1692
        # end of file corner case
 
1693
        result = list(transport.readv('a', ((204700, 100),),
 
1694
            adjust_for_latency=True, upper_limit=content_size))
 
1695
        # we expect 1 result, from 204800- its length, to the end
 
1696
        self.assertEqual(1, len(result))
 
1697
        data_len = len(result[0][1])
 
1698
        self.assertEqual(204800-data_len, result[0][0])
 
1699
        self.assertTrue(data_len >= 100)
 
1700
        check_result_data(result)
 
1701
        # out of order ranges are made in order
 
1702
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1703
            adjust_for_latency=True, upper_limit=content_size))
 
1704
        # we expect 2 results, in order, start and end.
 
1705
        self.assertEqual(2, len(result))
 
1706
        # start
 
1707
        data_len = len(result[0][1])
 
1708
        self.assertEqual(0, result[0][0])
 
1709
        self.assertTrue(data_len >= 30)
 
1710
        # end
 
1711
        data_len = len(result[1][1])
 
1712
        self.assertEqual(204800-data_len, result[1][0])
 
1713
        self.assertTrue(data_len >= 100)
 
1714
        check_result_data(result)
 
1715
        # close ranges get combined (even if out of order)
 
1716
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1717
            result = list(transport.readv('a', request_vector,
 
1718
                adjust_for_latency=True, upper_limit=content_size))
 
1719
            self.assertEqual(1, len(result))
 
1720
            data_len = len(result[0][1])
 
1721
            # minimum length is from 400 to 1034 - 634
 
1722
            self.assertTrue(data_len >= 634)
 
1723
            # must contain the region 400 to 1034
 
1724
            self.assertTrue(result[0][0] <= 400)
 
1725
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1726
            check_result_data(result)
 
1727
 
 
1728
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1729
        transport = self.get_transport()
 
1730
        # test from observed failure case.
 
1731
        if transport.is_readonly():
 
1732
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1733
        else:
 
1734
            transport.put_bytes('a', 'a'*1024*1024)
 
1735
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1736
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1737
            (465373, 800), (947422, 800)]
 
1738
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1739
        found_items = [False]*9
 
1740
        for pos, (start, length) in enumerate(broken_vector):
 
1741
            # check the range is covered by the result
 
1742
            for offset, data in results:
 
1743
                if offset <= start and start + length <= offset + len(data):
 
1744
                    found_items[pos] = True
 
1745
        self.assertEqual([True]*9, found_items)
 
1746
 
 
1747
    def test_get_with_open_write_stream_sees_all_content(self):
 
1748
        t = self.get_transport()
 
1749
        if t.is_readonly():
 
1750
            return
 
1751
        handle = t.open_write_stream('foo')
 
1752
        try:
 
1753
            handle.write('bcd')
 
1754
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1755
        finally:
 
1756
            handle.close()
 
1757
 
 
1758
    def test_get_smart_medium(self):
 
1759
        """All transports must either give a smart medium, or know they can't.
 
1760
        """
 
1761
        transport = self.get_transport()
 
1762
        try:
 
1763
            client_medium = transport.get_smart_medium()
 
1764
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1765
        except errors.NoSmartMedium:
 
1766
            # as long as we got it we're fine
 
1767
            pass
 
1768
 
 
1769
    def test_readv_short_read(self):
 
1770
        transport = self.get_transport()
 
1771
        if transport.is_readonly():
 
1772
            with file('a', 'w') as f: f.write('0123456789')
 
1773
        else:
 
1774
            transport.put_bytes('a', '01234567890')
 
1775
 
 
1776
        # This is intentionally reading off the end of the file
 
1777
        # since we are sure that it cannot get there
 
1778
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1779
                               # Can be raised by paramiko
 
1780
                               AssertionError),
 
1781
                              transport.readv, 'a', [(1,1), (8,10)])
 
1782
 
 
1783
        # This is trying to seek past the end of the file, it should
 
1784
        # also raise a special error
 
1785
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1786
                              transport.readv, 'a', [(12,2)])
 
1787
 
 
1788
    def test_no_segment_parameters(self):
 
1789
        """Segment parameters should be stripped and stored in
 
1790
        transport.segment_parameters."""
 
1791
        transport = self.get_transport("foo")
 
1792
        self.assertEquals({}, transport.get_segment_parameters())
 
1793
 
 
1794
    def test_segment_parameters(self):
 
1795
        """Segment parameters should be stripped and stored in
 
1796
        transport.get_segment_parameters()."""
 
1797
        base_url = self._server.get_url()
 
1798
        parameters = {"key1": "val1", "key2": "val2"}
 
1799
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1800
        transport = _mod_transport.get_transport_from_url(url)
 
1801
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1802
 
 
1803
    def test_set_segment_parameters(self):
 
1804
        """Segment parameters can be set and show up in base."""
 
1805
        transport = self.get_transport("foo")
 
1806
        orig_base = transport.base
 
1807
        transport.set_segment_parameter("arm", "board")
 
1808
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
 
1809
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
 
1810
        transport.set_segment_parameter("arm", None)
 
1811
        transport.set_segment_parameter("nonexistant", None)
 
1812
        self.assertEquals({}, transport.get_segment_parameters())
 
1813
        self.assertEquals(orig_base, transport.base)
 
1814
 
 
1815
    def test_stat_symlink(self):
 
1816
        # if a transport points directly to a symlink (and supports symlinks
 
1817
        # at all) you can tell this.  helps with bug 32669.
 
1818
        t = self.get_transport()
 
1819
        try:
 
1820
            t.symlink('target', 'link')
 
1821
        except TransportNotPossible:
 
1822
            raise TestSkipped("symlinks not supported")
 
1823
        t2 = t.clone('link')
 
1824
        st = t2.stat('')
 
1825
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1826
 
 
1827
    def test_abspath_url_unquote_unreserved(self):
 
1828
        """URLs from abspath should have unreserved characters unquoted
 
1829
        
 
1830
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1831
        """
 
1832
        t = self.get_transport()
 
1833
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1834
        self.assertEqual(t.base + "-.09AZ_az~",
 
1835
            t.abspath(needlessly_escaped_dir))
 
1836
 
 
1837
    def test_clone_url_unquote_unreserved(self):
 
1838
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1839
        
 
1840
        Cloned transports should be prefix comparable for things like the
 
1841
        isolation checking of tests, see lp:842223 for more.
 
1842
        """
 
1843
        t1 = self.get_transport()
 
1844
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1845
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1846
        t2 = t1.clone(needlessly_escaped_dir)
 
1847
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1848
 
 
1849
    def test_hook_post_connection_one(self):
 
1850
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1851
        log = []
 
1852
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1853
        t = self.get_transport()
 
1854
        self.assertEqual([], log)
 
1855
        t.has("non-existant")
 
1856
        if isinstance(t, RemoteTransport):
 
1857
            self.assertEqual([t.get_smart_medium()], log)
 
1858
        elif isinstance(t, ConnectedTransport):
 
1859
            self.assertEqual([t], log)
 
1860
        else:
 
1861
            self.assertEqual([], log)
 
1862
 
 
1863
    def test_hook_post_connection_multi(self):
 
1864
        """Fire post_connect hook once per unshared underlying connection"""
 
1865
        log = []
 
1866
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1867
        t1 = self.get_transport()
 
1868
        t2 = t1.clone(".")
 
1869
        t3 = self.get_transport()
 
1870
        self.assertEqual([], log)
 
1871
        t1.has("x")
 
1872
        t2.has("x")
 
1873
        t3.has("x")
 
1874
        if isinstance(t1, RemoteTransport):
 
1875
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1876
        elif isinstance(t1, ConnectedTransport):
 
1877
            self.assertEqual([t1, t3], log)
 
1878
        else:
 
1879
            self.assertEqual([], log)