~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2017-01-17 15:02:46 UTC
  • mto: (6619.1.2 trunk)
  • mto: This revision was merged to the branch mainline in revision 6620.
  • Revision ID: v.ladeuil+lp@free.fr-20170117150246-ajkli2g5lh94s2c4
Fix test failure for recent versions of diff

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import itertools
 
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
 
27
import stat
 
28
import sys
 
29
 
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    transport as _mod_transport,
 
36
    urlutils,
 
37
    )
 
38
from bzrlib.errors import (ConnectionError,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           NoSuchFile,
 
42
                           PathError,
 
43
                           TransportNotPossible,
 
44
                           )
 
45
from bzrlib.osutils import getcwd
 
46
from bzrlib.smart import medium
 
47
from bzrlib.tests import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from bzrlib.tests import test_server
 
53
from bzrlib.tests.test_transport import TestTransportImplementation
 
54
from bzrlib.transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent, e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(standard_tests, module, loader):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
 
97
 
 
98
 
 
99
class TransportTests(TestTransportImplementation):
 
100
 
 
101
    def setUp(self):
 
102
        super(TransportTests, self).setUp()
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
104
 
 
105
    def check_transport_contents(self, content, transport, relpath):
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
 
 
109
    def test_ensure_base_missing(self):
 
110
        """.ensure_base() should create the directory if it doesn't exist"""
 
111
        t = self.get_transport()
 
112
        t_a = t.clone('a')
 
113
        if t_a.is_readonly():
 
114
            self.assertRaises(TransportNotPossible,
 
115
                              t_a.ensure_base)
 
116
            return
 
117
        self.assertTrue(t_a.ensure_base())
 
118
        self.assertTrue(t.has('a'))
 
119
 
 
120
    def test_ensure_base_exists(self):
 
121
        """.ensure_base() should just be happy if it already exists"""
 
122
        t = self.get_transport()
 
123
        if t.is_readonly():
 
124
            return
 
125
 
 
126
        t.mkdir('a')
 
127
        t_a = t.clone('a')
 
128
        # ensure_base returns False if it didn't create the base
 
129
        self.assertFalse(t_a.ensure_base())
 
130
 
 
131
    def test_ensure_base_missing_parent(self):
 
132
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
133
        t = self.get_transport()
 
134
        if t.is_readonly():
 
135
            return
 
136
 
 
137
        t_a = t.clone('a')
 
138
        t_b = t_a.clone('b')
 
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
 
148
 
 
149
    def test_has(self):
 
150
        t = self.get_transport()
 
151
 
 
152
        files = ['a', 'b', 'e', 'g', '%']
 
153
        self.build_tree(files, transport=t)
 
154
        self.assertEqual(True, t.has('a'))
 
155
        self.assertEqual(False, t.has('c'))
 
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
157
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
158
                                           'e', 'f', 'g', 'h'])),
 
159
                         [True, True, False, False,
 
160
                          True, False, True, False])
 
161
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
162
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
163
                                           urlutils.escape('%%')]))
 
164
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
165
                                                'e', 'f', 'g', 'h']))),
 
166
                         [True, True, False, False,
 
167
                          True, False, True, False])
 
168
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
169
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
170
 
 
171
    def test_has_root_works(self):
 
172
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
173
            raise TestNotApplicable(
 
174
                "SmartTCPServer_for_testing intentionally does not allow "
 
175
                "access to /.")
 
176
        current_transport = self.get_transport()
 
177
        self.assertTrue(current_transport.has('/'))
 
178
        root = current_transport.clone('/')
 
179
        self.assertTrue(root.has(''))
 
180
 
 
181
    def test_get(self):
 
182
        t = self.get_transport()
 
183
 
 
184
        files = ['a', 'b', 'e', 'g']
 
185
        contents = ['contents of a\n',
 
186
                    'contents of b\n',
 
187
                    'contents of e\n',
 
188
                    'contents of g\n',
 
189
                    ]
 
190
        self.build_tree(files, transport=t, line_endings='binary')
 
191
        self.check_transport_contents('contents of a\n', t, 'a')
 
192
        content_f = t.get_multi(files)
 
193
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
194
        # evaluate their inputs, the transport requests should be issued and
 
195
        # handled sequentially (we don't want to force transport to buffer).
 
196
        for content, f in itertools.izip(contents, content_f):
 
197
            self.assertEqual(content, f.read())
 
198
 
 
199
        content_f = t.get_multi(iter(files))
 
200
        # Use itertools.izip() for the same reason
 
201
        for content, f in itertools.izip(contents, content_f):
 
202
            self.assertEqual(content, f.read())
 
203
 
 
204
    def test_get_unknown_file(self):
 
205
        t = self.get_transport()
 
206
        files = ['a', 'b']
 
207
        contents = ['contents of a\n',
 
208
                    'contents of b\n',
 
209
                    ]
 
210
        self.build_tree(files, transport=t, line_endings='binary')
 
211
        self.assertRaises(NoSuchFile, t.get, 'c')
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
 
223
 
 
224
    def test_get_directory_read_gives_ReadError(self):
 
225
        """consistent errors for read() on a file returned by get()."""
 
226
        t = self.get_transport()
 
227
        if t.is_readonly():
 
228
            self.build_tree(['a directory/'])
 
229
        else:
 
230
            t.mkdir('a%20directory')
 
231
        # getting the file must either work or fail with a PathError
 
232
        try:
 
233
            a_file = t.get('a%20directory')
 
234
        except (errors.PathError, errors.RedirectRequested):
 
235
            # early failure return immediately.
 
236
            return
 
237
        # having got a file, read() must either work (i.e. http reading a dir
 
238
        # listing) or fail with ReadError
 
239
        try:
 
240
            a_file.read()
 
241
        except errors.ReadError:
 
242
            pass
 
243
 
 
244
    def test_get_bytes(self):
 
245
        t = self.get_transport()
 
246
 
 
247
        files = ['a', 'b', 'e', 'g']
 
248
        contents = ['contents of a\n',
 
249
                    'contents of b\n',
 
250
                    'contents of e\n',
 
251
                    'contents of g\n',
 
252
                    ]
 
253
        self.build_tree(files, transport=t, line_endings='binary')
 
254
        self.check_transport_contents('contents of a\n', t, 'a')
 
255
 
 
256
        for content, fname in zip(contents, files):
 
257
            self.assertEqual(content, t.get_bytes(fname))
 
258
 
 
259
    def test_get_bytes_unknown_file(self):
 
260
        t = self.get_transport()
 
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
262
 
 
263
    def test_get_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
        finally:
 
272
            handle.close()
 
273
 
 
274
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
275
        t = self.get_transport()
 
276
        if t.is_readonly():
 
277
            return
 
278
        handle = t.open_write_stream('foo')
 
279
        try:
 
280
            handle.write('b')
 
281
            self.assertEqual('b', t.get_bytes('foo'))
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
 
287
        finally:
 
288
            handle.close()
 
289
 
 
290
    def test_put_bytes(self):
 
291
        t = self.get_transport()
 
292
 
 
293
        if t.is_readonly():
 
294
            self.assertRaises(TransportNotPossible,
 
295
                    t.put_bytes, 'a', 'some text for a\n')
 
296
            return
 
297
 
 
298
        t.put_bytes('a', 'some text for a\n')
 
299
        self.assertTrue(t.has('a'))
 
300
        self.check_transport_contents('some text for a\n', t, 'a')
 
301
 
 
302
        # The contents should be overwritten
 
303
        t.put_bytes('a', 'new text for a\n')
 
304
        self.check_transport_contents('new text for a\n', t, 'a')
 
305
 
 
306
        self.assertRaises(NoSuchFile,
 
307
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
308
 
 
309
    def test_put_bytes_non_atomic(self):
 
310
        t = self.get_transport()
 
311
 
 
312
        if t.is_readonly():
 
313
            self.assertRaises(TransportNotPossible,
 
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
315
            return
 
316
 
 
317
        self.assertFalse(t.has('a'))
 
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
319
        self.assertTrue(t.has('a'))
 
320
        self.check_transport_contents('some text for a\n', t, 'a')
 
321
        # Put also replaces contents
 
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
323
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
324
 
 
325
        # Make sure we can create another file
 
326
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
327
        # And overwrite 'a' with empty contents
 
328
        t.put_bytes_non_atomic('a', '')
 
329
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
330
        self.check_transport_contents('', t, 'a')
 
331
 
 
332
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
333
                                       'contents\n')
 
334
        # Now test the create_parent flag
 
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
336
                                       'contents\n')
 
337
        self.assertFalse(t.has('dir/a'))
 
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
339
                               create_parent_dir=True)
 
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
341
 
 
342
        # But we still get NoSuchFile if we can't make the parent dir
 
343
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
344
                                       'contents\n',
 
345
                                       create_parent_dir=True)
 
346
 
 
347
    def test_put_bytes_permissions(self):
 
348
        t = self.get_transport()
 
349
 
 
350
        if t.is_readonly():
 
351
            return
 
352
        if not t._can_roundtrip_unix_modebits():
 
353
            # Can't roundtrip, so no need to run this test
 
354
            return
 
355
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
356
        self.assertTransportMode(t, 'mode644', 0644)
 
357
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
358
        self.assertTransportMode(t, 'mode666', 0666)
 
359
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
360
        self.assertTransportMode(t, 'mode600', 0600)
 
361
        # Yes, you can put_bytes a file such that it becomes readonly
 
362
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
363
        self.assertTransportMode(t, 'mode400', 0400)
 
364
 
 
365
        # The default permissions should be based on the current umask
 
366
        umask = osutils.get_umask()
 
367
        t.put_bytes('nomode', 'test text\n', mode=None)
 
368
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
369
 
 
370
    def test_put_bytes_non_atomic_permissions(self):
 
371
        t = self.get_transport()
 
372
 
 
373
        if t.is_readonly():
 
374
            return
 
375
        if not t._can_roundtrip_unix_modebits():
 
376
            # Can't roundtrip, so no need to run this test
 
377
            return
 
378
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
379
        self.assertTransportMode(t, 'mode644', 0644)
 
380
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
381
        self.assertTransportMode(t, 'mode666', 0666)
 
382
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
383
        self.assertTransportMode(t, 'mode600', 0600)
 
384
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
385
        self.assertTransportMode(t, 'mode400', 0400)
 
386
 
 
387
        # The default permissions should be based on the current umask
 
388
        umask = osutils.get_umask()
 
389
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
390
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
391
 
 
392
        # We should also be able to set the mode for a parent directory
 
393
        # when it is created
 
394
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
395
                               dir_mode=0700, create_parent_dir=True)
 
396
        self.assertTransportMode(t, 'dir700', 0700)
 
397
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
398
                               dir_mode=0770, create_parent_dir=True)
 
399
        self.assertTransportMode(t, 'dir770', 0770)
 
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
401
                               dir_mode=0777, create_parent_dir=True)
 
402
        self.assertTransportMode(t, 'dir777', 0777)
 
403
 
 
404
    def test_put_file(self):
 
405
        t = self.get_transport()
 
406
 
 
407
        if t.is_readonly():
 
408
            self.assertRaises(TransportNotPossible,
 
409
                    t.put_file, 'a', StringIO('some text for a\n'))
 
410
            return
 
411
 
 
412
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        # put_file returns the length of the data written
 
414
        self.assertEqual(16, result)
 
415
        self.assertTrue(t.has('a'))
 
416
        self.check_transport_contents('some text for a\n', t, 'a')
 
417
        # Put also replaces contents
 
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        self.assertEqual(19, result)
 
420
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
421
        self.assertRaises(NoSuchFile,
 
422
                          t.put_file, 'path/doesnt/exist/c',
 
423
                              StringIO('contents'))
 
424
 
 
425
    def test_put_file_non_atomic(self):
 
426
        t = self.get_transport()
 
427
 
 
428
        if t.is_readonly():
 
429
            self.assertRaises(TransportNotPossible,
 
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
431
            return
 
432
 
 
433
        self.assertFalse(t.has('a'))
 
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
435
        self.assertTrue(t.has('a'))
 
436
        self.check_transport_contents('some text for a\n', t, 'a')
 
437
        # Put also replaces contents
 
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
439
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
440
 
 
441
        # Make sure we can create another file
 
442
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
443
        # And overwrite 'a' with empty contents
 
444
        t.put_file_non_atomic('a', StringIO(''))
 
445
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
446
        self.check_transport_contents('', t, 'a')
 
447
 
 
448
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
449
                                       StringIO('contents\n'))
 
450
        # Now test the create_parent flag
 
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
452
                                       StringIO('contents\n'))
 
453
        self.assertFalse(t.has('dir/a'))
 
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
455
                              create_parent_dir=True)
 
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
457
 
 
458
        # But we still get NoSuchFile if we can't make the parent dir
 
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
460
                                       StringIO('contents\n'),
 
461
                                       create_parent_dir=True)
 
462
 
 
463
    def test_put_file_permissions(self):
 
464
 
 
465
        t = self.get_transport()
 
466
 
 
467
        if t.is_readonly():
 
468
            return
 
469
        if not t._can_roundtrip_unix_modebits():
 
470
            # Can't roundtrip, so no need to run this test
 
471
            return
 
472
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
473
        self.assertTransportMode(t, 'mode644', 0644)
 
474
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
475
        self.assertTransportMode(t, 'mode666', 0666)
 
476
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
477
        self.assertTransportMode(t, 'mode600', 0600)
 
478
        # Yes, you can put a file such that it becomes readonly
 
479
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
480
        self.assertTransportMode(t, 'mode400', 0400)
 
481
        # The default permissions should be based on the current umask
 
482
        umask = osutils.get_umask()
 
483
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
484
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
485
 
 
486
    def test_put_file_non_atomic_permissions(self):
 
487
        t = self.get_transport()
 
488
 
 
489
        if t.is_readonly():
 
490
            return
 
491
        if not t._can_roundtrip_unix_modebits():
 
492
            # Can't roundtrip, so no need to run this test
 
493
            return
 
494
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
495
        self.assertTransportMode(t, 'mode644', 0644)
 
496
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
497
        self.assertTransportMode(t, 'mode666', 0666)
 
498
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
499
        self.assertTransportMode(t, 'mode600', 0600)
 
500
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
501
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
502
        self.assertTransportMode(t, 'mode400', 0400)
 
503
 
 
504
        # The default permissions should be based on the current umask
 
505
        umask = osutils.get_umask()
 
506
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
507
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
508
 
 
509
        # We should also be able to set the mode for a parent directory
 
510
        # when it is created
 
511
        sio = StringIO()
 
512
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
513
                              dir_mode=0700, create_parent_dir=True)
 
514
        self.assertTransportMode(t, 'dir700', 0700)
 
515
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
516
                              dir_mode=0770, create_parent_dir=True)
 
517
        self.assertTransportMode(t, 'dir770', 0770)
 
518
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
519
                              dir_mode=0777, create_parent_dir=True)
 
520
        self.assertTransportMode(t, 'dir777', 0777)
 
521
 
 
522
    def test_put_bytes_unicode(self):
 
523
        t = self.get_transport()
 
524
        if t.is_readonly():
 
525
            return
 
526
        unicode_string = u'\u1234'
 
527
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
528
 
 
529
    def test_mkdir(self):
 
530
        t = self.get_transport()
 
531
 
 
532
        if t.is_readonly():
 
533
            # cannot mkdir on readonly transports. We're not testing for
 
534
            # cache coherency because cache behaviour is not currently
 
535
            # defined for the transport interface.
 
536
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
537
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
538
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
539
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
540
            return
 
541
        # Test mkdir
 
542
        t.mkdir('dir_a')
 
543
        self.assertEqual(t.has('dir_a'), True)
 
544
        self.assertEqual(t.has('dir_b'), False)
 
545
 
 
546
        t.mkdir('dir_b')
 
547
        self.assertEqual(t.has('dir_b'), True)
 
548
 
 
549
        t.mkdir_multi(['dir_c', 'dir_d'])
 
550
 
 
551
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
552
        self.assertEqual(list(t.has_multi(
 
553
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
554
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
555
            [True, True, True, False,
 
556
             True, True, True, True])
 
557
 
 
558
        # we were testing that a local mkdir followed by a transport
 
559
        # mkdir failed thusly, but given that we * in one process * do not
 
560
        # concurrently fiddle with disk dirs and then use transport to do
 
561
        # things, the win here seems marginal compared to the constraint on
 
562
        # the interface. RBC 20051227
 
563
        t.mkdir('dir_g')
 
564
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
565
 
 
566
        # Test get/put in sub-directories
 
567
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
568
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
569
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
570
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
571
 
 
572
        # mkdir of a dir with an absent parent
 
573
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
574
 
 
575
    def test_mkdir_permissions(self):
 
576
        t = self.get_transport()
 
577
        if t.is_readonly():
 
578
            return
 
579
        if not t._can_roundtrip_unix_modebits():
 
580
            # no sense testing on this transport
 
581
            return
 
582
        # Test mkdir with a mode
 
583
        t.mkdir('dmode755', mode=0755)
 
584
        self.assertTransportMode(t, 'dmode755', 0755)
 
585
        t.mkdir('dmode555', mode=0555)
 
586
        self.assertTransportMode(t, 'dmode555', 0555)
 
587
        t.mkdir('dmode777', mode=0777)
 
588
        self.assertTransportMode(t, 'dmode777', 0777)
 
589
        t.mkdir('dmode700', mode=0700)
 
590
        self.assertTransportMode(t, 'dmode700', 0700)
 
591
        t.mkdir_multi(['mdmode755'], mode=0755)
 
592
        self.assertTransportMode(t, 'mdmode755', 0755)
 
593
 
 
594
        # Default mode should be based on umask
 
595
        umask = osutils.get_umask()
 
596
        t.mkdir('dnomode', mode=None)
 
597
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
598
 
 
599
    def test_opening_a_file_stream_creates_file(self):
 
600
        t = self.get_transport()
 
601
        if t.is_readonly():
 
602
            return
 
603
        handle = t.open_write_stream('foo')
 
604
        try:
 
605
            self.assertEqual('', t.get_bytes('foo'))
 
606
        finally:
 
607
            handle.close()
 
608
 
 
609
    def test_opening_a_file_stream_can_set_mode(self):
 
610
        t = self.get_transport()
 
611
        if t.is_readonly():
 
612
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
613
                              t.open_write_stream, 'foo')
 
614
            return
 
615
        if not t._can_roundtrip_unix_modebits():
 
616
            # Can't roundtrip, so no need to run this test
 
617
            return
 
618
 
 
619
        def check_mode(name, mode, expected):
 
620
            handle = t.open_write_stream(name, mode=mode)
 
621
            handle.close()
 
622
            self.assertTransportMode(t, name, expected)
 
623
        check_mode('mode644', 0644, 0644)
 
624
        check_mode('mode666', 0666, 0666)
 
625
        check_mode('mode600', 0600, 0600)
 
626
        # The default permissions should be based on the current umask
 
627
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
628
 
 
629
    def test_copy_to(self):
 
630
        # FIXME: test:   same server to same server (partly done)
 
631
        # same protocol two servers
 
632
        # and    different protocols (done for now except for MemoryTransport.
 
633
        # - RBC 20060122
 
634
 
 
635
        def simple_copy_files(transport_from, transport_to):
 
636
            files = ['a', 'b', 'c', 'd']
 
637
            self.build_tree(files, transport=transport_from)
 
638
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
639
            for f in files:
 
640
                self.check_transport_contents(transport_to.get_bytes(f),
 
641
                                              transport_from, f)
 
642
 
 
643
        t = self.get_transport()
 
644
        temp_transport = MemoryTransport('memory:///')
 
645
        simple_copy_files(t, temp_transport)
 
646
        if not t.is_readonly():
 
647
            t.mkdir('copy_to_simple')
 
648
            t2 = t.clone('copy_to_simple')
 
649
            simple_copy_files(t, t2)
 
650
 
 
651
 
 
652
        # Test that copying into a missing directory raises
 
653
        # NoSuchFile
 
654
        if t.is_readonly():
 
655
            self.build_tree(['e/', 'e/f'])
 
656
        else:
 
657
            t.mkdir('e')
 
658
            t.put_bytes('e/f', 'contents of e')
 
659
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
660
        temp_transport.mkdir('e')
 
661
        t.copy_to(['e/f'], temp_transport)
 
662
 
 
663
        del temp_transport
 
664
        temp_transport = MemoryTransport('memory:///')
 
665
 
 
666
        files = ['a', 'b', 'c', 'd']
 
667
        t.copy_to(iter(files), temp_transport)
 
668
        for f in files:
 
669
            self.check_transport_contents(temp_transport.get_bytes(f),
 
670
                                          t, f)
 
671
        del temp_transport
 
672
 
 
673
        for mode in (0666, 0644, 0600, 0400):
 
674
            temp_transport = MemoryTransport("memory:///")
 
675
            t.copy_to(files, temp_transport, mode=mode)
 
676
            for f in files:
 
677
                self.assertTransportMode(temp_transport, f, mode)
 
678
 
 
679
    def test_create_prefix(self):
 
680
        t = self.get_transport()
 
681
        sub = t.clone('foo').clone('bar')
 
682
        try:
 
683
            sub.create_prefix()
 
684
        except TransportNotPossible:
 
685
            self.assertTrue(t.is_readonly())
 
686
        else:
 
687
            self.assertTrue(t.has('foo/bar'))
 
688
 
 
689
    def test_append_file(self):
 
690
        t = self.get_transport()
 
691
 
 
692
        if t.is_readonly():
 
693
            self.assertRaises(TransportNotPossible,
 
694
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
695
            return
 
696
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
697
        t.put_bytes('b', 'contents\nfor b\n')
 
698
 
 
699
        self.assertEqual(20,
 
700
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
701
 
 
702
        self.check_transport_contents(
 
703
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
704
            t, 'a')
 
705
 
 
706
        # a file with no parent should fail..
 
707
        self.assertRaises(NoSuchFile,
 
708
                          t.append_file, 'missing/path', StringIO('content'))
 
709
 
 
710
        # And we can create new files, too
 
711
        self.assertEqual(0,
 
712
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
713
        self.check_transport_contents('some text\nfor a missing file\n',
 
714
                                      t, 'c')
 
715
 
 
716
    def test_append_bytes(self):
 
717
        t = self.get_transport()
 
718
 
 
719
        if t.is_readonly():
 
720
            self.assertRaises(TransportNotPossible,
 
721
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
722
            return
 
723
 
 
724
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
725
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
726
 
 
727
        self.assertEqual(20,
 
728
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
729
 
 
730
        self.check_transport_contents(
 
731
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
732
            t, 'a')
 
733
 
 
734
        # a file with no parent should fail..
 
735
        self.assertRaises(NoSuchFile,
 
736
                          t.append_bytes, 'missing/path', 'content')
 
737
 
 
738
    def test_append_multi(self):
 
739
        t = self.get_transport()
 
740
 
 
741
        if t.is_readonly():
 
742
            return
 
743
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
744
                         'add\nsome\nmore\ncontents\n')
 
745
        t.put_bytes('b', 'contents\nfor b\n')
 
746
 
 
747
        self.assertEqual((43, 15),
 
748
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
749
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
750
 
 
751
        self.check_transport_contents(
 
752
            'diff\ncontents for\na\n'
 
753
            'add\nsome\nmore\ncontents\n'
 
754
            'and\nthen\nsome\nmore\n',
 
755
            t, 'a')
 
756
        self.check_transport_contents(
 
757
                'contents\nfor b\n'
 
758
                'some\nmore\nfor\nb\n',
 
759
                t, 'b')
 
760
 
 
761
        self.assertEqual((62, 31),
 
762
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
763
                                 ('b', StringIO('from an iterator\n'))])))
 
764
        self.check_transport_contents(
 
765
            'diff\ncontents for\na\n'
 
766
            'add\nsome\nmore\ncontents\n'
 
767
            'and\nthen\nsome\nmore\n'
 
768
            'a little bit more\n',
 
769
            t, 'a')
 
770
        self.check_transport_contents(
 
771
                'contents\nfor b\n'
 
772
                'some\nmore\nfor\nb\n'
 
773
                'from an iterator\n',
 
774
                t, 'b')
 
775
 
 
776
        self.assertEqual((80, 0),
 
777
            t.append_multi([('a', StringIO('some text in a\n')),
 
778
                            ('d', StringIO('missing file r\n'))]))
 
779
 
 
780
        self.check_transport_contents(
 
781
            'diff\ncontents for\na\n'
 
782
            'add\nsome\nmore\ncontents\n'
 
783
            'and\nthen\nsome\nmore\n'
 
784
            'a little bit more\n'
 
785
            'some text in a\n',
 
786
            t, 'a')
 
787
        self.check_transport_contents('missing file r\n', t, 'd')
 
788
 
 
789
    def test_append_file_mode(self):
 
790
        """Check that append accepts a mode parameter"""
 
791
        # check append accepts a mode
 
792
        t = self.get_transport()
 
793
        if t.is_readonly():
 
794
            self.assertRaises(TransportNotPossible,
 
795
                t.append_file, 'f', StringIO('f'), mode=None)
 
796
            return
 
797
        t.append_file('f', StringIO('f'), mode=None)
 
798
 
 
799
    def test_append_bytes_mode(self):
 
800
        # check append_bytes accepts a mode
 
801
        t = self.get_transport()
 
802
        if t.is_readonly():
 
803
            self.assertRaises(TransportNotPossible,
 
804
                t.append_bytes, 'f', 'f', mode=None)
 
805
            return
 
806
        t.append_bytes('f', 'f', mode=None)
 
807
 
 
808
    def test_delete(self):
 
809
        # TODO: Test Transport.delete
 
810
        t = self.get_transport()
 
811
 
 
812
        # Not much to do with a readonly transport
 
813
        if t.is_readonly():
 
814
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
815
            return
 
816
 
 
817
        t.put_bytes('a', 'a little bit of text\n')
 
818
        self.assertTrue(t.has('a'))
 
819
        t.delete('a')
 
820
        self.assertFalse(t.has('a'))
 
821
 
 
822
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
823
 
 
824
        t.put_bytes('a', 'a text\n')
 
825
        t.put_bytes('b', 'b text\n')
 
826
        t.put_bytes('c', 'c text\n')
 
827
        self.assertEqual([True, True, True],
 
828
                list(t.has_multi(['a', 'b', 'c'])))
 
829
        t.delete_multi(['a', 'c'])
 
830
        self.assertEqual([False, True, False],
 
831
                list(t.has_multi(['a', 'b', 'c'])))
 
832
        self.assertFalse(t.has('a'))
 
833
        self.assertTrue(t.has('b'))
 
834
        self.assertFalse(t.has('c'))
 
835
 
 
836
        self.assertRaises(NoSuchFile,
 
837
                t.delete_multi, ['a', 'b', 'c'])
 
838
 
 
839
        self.assertRaises(NoSuchFile,
 
840
                t.delete_multi, iter(['a', 'b', 'c']))
 
841
 
 
842
        t.put_bytes('a', 'another a text\n')
 
843
        t.put_bytes('c', 'another c text\n')
 
844
        t.delete_multi(iter(['a', 'b', 'c']))
 
845
 
 
846
        # We should have deleted everything
 
847
        # SftpServer creates control files in the
 
848
        # working directory, so we can just do a
 
849
        # plain "listdir".
 
850
        # self.assertEqual([], os.listdir('.'))
 
851
 
 
852
    def test_recommended_page_size(self):
 
853
        """Transports recommend a page size for partial access to files."""
 
854
        t = self.get_transport()
 
855
        self.assertIsInstance(t.recommended_page_size(), int)
 
856
 
 
857
    def test_rmdir(self):
 
858
        t = self.get_transport()
 
859
        # Not much to do with a readonly transport
 
860
        if t.is_readonly():
 
861
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
862
            return
 
863
        t.mkdir('adir')
 
864
        t.mkdir('adir/bdir')
 
865
        t.rmdir('adir/bdir')
 
866
        # ftp may not be able to raise NoSuchFile for lack of
 
867
        # details when failing
 
868
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
869
        t.rmdir('adir')
 
870
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
871
 
 
872
    def test_rmdir_not_empty(self):
 
873
        """Deleting a non-empty directory raises an exception
 
874
 
 
875
        sftp (and possibly others) don't give us a specific "directory not
 
876
        empty" exception -- we can just see that the operation failed.
 
877
        """
 
878
        t = self.get_transport()
 
879
        if t.is_readonly():
 
880
            return
 
881
        t.mkdir('adir')
 
882
        t.mkdir('adir/bdir')
 
883
        self.assertRaises(PathError, t.rmdir, 'adir')
 
884
 
 
885
    def test_rmdir_empty_but_similar_prefix(self):
 
886
        """rmdir does not get confused by sibling paths.
 
887
 
 
888
        A naive implementation of MemoryTransport would refuse to rmdir
 
889
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
890
        uses "path.startswith(dir)" on all file paths to determine if directory
 
891
        is empty.
 
892
        """
 
893
        t = self.get_transport()
 
894
        if t.is_readonly():
 
895
            return
 
896
        t.mkdir('foo')
 
897
        t.put_bytes('foo-bar', '')
 
898
        t.mkdir('foo-baz')
 
899
        t.rmdir('foo')
 
900
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
901
        self.assertTrue(t.has('foo-bar'))
 
902
 
 
903
    def test_rename_dir_succeeds(self):
 
904
        t = self.get_transport()
 
905
        if t.is_readonly():
 
906
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
907
                              t.rename, 'foo', 'bar')
 
908
            return
 
909
        t.mkdir('adir')
 
910
        t.mkdir('adir/asubdir')
 
911
        t.rename('adir', 'bdir')
 
912
        self.assertTrue(t.has('bdir/asubdir'))
 
913
        self.assertFalse(t.has('adir'))
 
914
 
 
915
    def test_rename_dir_nonempty(self):
 
916
        """Attempting to replace a nonemtpy directory should fail"""
 
917
        t = self.get_transport()
 
918
        if t.is_readonly():
 
919
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
920
                              t.rename, 'foo', 'bar')
 
921
            return
 
922
        t.mkdir('adir')
 
923
        t.mkdir('adir/asubdir')
 
924
        t.mkdir('bdir')
 
925
        t.mkdir('bdir/bsubdir')
 
926
        # any kind of PathError would be OK, though we normally expect
 
927
        # DirectoryNotEmpty
 
928
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
929
        # nothing was changed so it should still be as before
 
930
        self.assertTrue(t.has('bdir/bsubdir'))
 
931
        self.assertFalse(t.has('adir/bdir'))
 
932
        self.assertFalse(t.has('adir/bsubdir'))
 
933
 
 
934
    def test_rename_across_subdirs(self):
 
935
        t = self.get_transport()
 
936
        if t.is_readonly():
 
937
            raise TestNotApplicable("transport is readonly")
 
938
        t.mkdir('a')
 
939
        t.mkdir('b')
 
940
        ta = t.clone('a')
 
941
        tb = t.clone('b')
 
942
        ta.put_bytes('f', 'aoeu')
 
943
        ta.rename('f', '../b/f')
 
944
        self.assertTrue(tb.has('f'))
 
945
        self.assertFalse(ta.has('f'))
 
946
        self.assertTrue(t.has('b/f'))
 
947
 
 
948
    def test_delete_tree(self):
 
949
        t = self.get_transport()
 
950
 
 
951
        # Not much to do with a readonly transport
 
952
        if t.is_readonly():
 
953
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
954
            return
 
955
 
 
956
        # and does it like listing ?
 
957
        t.mkdir('adir')
 
958
        try:
 
959
            t.delete_tree('adir')
 
960
        except TransportNotPossible:
 
961
            # ok, this transport does not support delete_tree
 
962
            return
 
963
 
 
964
        # did it delete that trivial case?
 
965
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
966
 
 
967
        self.build_tree(['adir/',
 
968
                         'adir/file',
 
969
                         'adir/subdir/',
 
970
                         'adir/subdir/file',
 
971
                         'adir/subdir2/',
 
972
                         'adir/subdir2/file',
 
973
                         ], transport=t)
 
974
 
 
975
        t.delete_tree('adir')
 
976
        # adir should be gone now.
 
977
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
978
 
 
979
    def test_move(self):
 
980
        t = self.get_transport()
 
981
 
 
982
        if t.is_readonly():
 
983
            return
 
984
 
 
985
        # TODO: I would like to use os.listdir() to
 
986
        # make sure there are no extra files, but SftpServer
 
987
        # creates control files in the working directory
 
988
        # perhaps all of this could be done in a subdirectory
 
989
 
 
990
        t.put_bytes('a', 'a first file\n')
 
991
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
 
992
 
 
993
        t.move('a', 'b')
 
994
        self.assertTrue(t.has('b'))
 
995
        self.assertFalse(t.has('a'))
 
996
 
 
997
        self.check_transport_contents('a first file\n', t, 'b')
 
998
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
 
999
 
 
1000
        # Overwrite a file
 
1001
        t.put_bytes('c', 'c this file\n')
 
1002
        t.move('c', 'b')
 
1003
        self.assertFalse(t.has('c'))
 
1004
        self.check_transport_contents('c this file\n', t, 'b')
 
1005
 
 
1006
        # TODO: Try to write a test for atomicity
 
1007
        # TODO: Test moving into a non-existent subdirectory
 
1008
        # TODO: Test Transport.move_multi
 
1009
 
 
1010
    def test_copy(self):
 
1011
        t = self.get_transport()
 
1012
 
 
1013
        if t.is_readonly():
 
1014
            return
 
1015
 
 
1016
        t.put_bytes('a', 'a file\n')
 
1017
        t.copy('a', 'b')
 
1018
        self.check_transport_contents('a file\n', t, 'b')
 
1019
 
 
1020
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
1021
        os.mkdir('c')
 
1022
        # What should the assert be if you try to copy a
 
1023
        # file over a directory?
 
1024
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
1025
        t.put_bytes('d', 'text in d\n')
 
1026
        t.copy('d', 'b')
 
1027
        self.check_transport_contents('text in d\n', t, 'b')
 
1028
 
 
1029
        # TODO: test copy_multi
 
1030
 
 
1031
    def test_connection_error(self):
 
1032
        """ConnectionError is raised when connection is impossible.
 
1033
 
 
1034
        The error should be raised from the first operation on the transport.
 
1035
        """
 
1036
        try:
 
1037
            url = self._server.get_bogus_url()
 
1038
        except NotImplementedError:
 
1039
            raise TestSkipped("Transport %s has no bogus URL support." %
 
1040
                              self._server.__class__)
 
1041
        t = _mod_transport.get_transport_from_url(url)
 
1042
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
1043
 
 
1044
    def test_stat(self):
 
1045
        # TODO: Test stat, just try once, and if it throws, stop testing
 
1046
        from stat import S_ISDIR, S_ISREG
 
1047
 
 
1048
        t = self.get_transport()
 
1049
 
 
1050
        try:
 
1051
            st = t.stat('.')
 
1052
        except TransportNotPossible, e:
 
1053
            # This transport cannot stat
 
1054
            return
 
1055
 
 
1056
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
1057
        sizes = [14, 0, 16, 0, 18]
 
1058
        self.build_tree(paths, transport=t, line_endings='binary')
 
1059
 
 
1060
        for path, size in zip(paths, sizes):
 
1061
            st = t.stat(path)
 
1062
            if path.endswith('/'):
 
1063
                self.assertTrue(S_ISDIR(st.st_mode))
 
1064
                # directory sizes are meaningless
 
1065
            else:
 
1066
                self.assertTrue(S_ISREG(st.st_mode))
 
1067
                self.assertEqual(size, st.st_size)
 
1068
 
 
1069
        remote_stats = list(t.stat_multi(paths))
 
1070
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1071
 
 
1072
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
1073
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
1074
 
 
1075
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1076
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
1077
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
1078
        subdir = t.clone('subdir')
 
1079
        st = subdir.stat('./file')
 
1080
        st = subdir.stat('.')
 
1081
 
 
1082
    def test_hardlink(self):
 
1083
        from stat import ST_NLINK
 
1084
 
 
1085
        t = self.get_transport()
 
1086
 
 
1087
        source_name = "original_target"
 
1088
        link_name = "target_link"
 
1089
 
 
1090
        self.build_tree([source_name], transport=t)
 
1091
 
 
1092
        try:
 
1093
            t.hardlink(source_name, link_name)
 
1094
 
 
1095
            self.assertTrue(t.has(source_name))
 
1096
            self.assertTrue(t.has(link_name))
 
1097
 
 
1098
            st = t.stat(link_name)
 
1099
            self.assertEqual(st[ST_NLINK], 2)
 
1100
        except TransportNotPossible:
 
1101
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1102
                              self._server.__class__)
 
1103
 
 
1104
    def test_symlink(self):
 
1105
        from stat import S_ISLNK
 
1106
 
 
1107
        t = self.get_transport()
 
1108
 
 
1109
        source_name = "original_target"
 
1110
        link_name = "target_link"
 
1111
 
 
1112
        self.build_tree([source_name], transport=t)
 
1113
 
 
1114
        try:
 
1115
            t.symlink(source_name, link_name)
 
1116
 
 
1117
            self.assertTrue(t.has(source_name))
 
1118
            self.assertTrue(t.has(link_name))
 
1119
 
 
1120
            st = t.stat(link_name)
 
1121
            self.assertTrue(S_ISLNK(st.st_mode),
 
1122
                "expected symlink, got mode %o" % st.st_mode)
 
1123
        except TransportNotPossible:
 
1124
            raise TestSkipped("Transport %s does not support symlinks." %
 
1125
                              self._server.__class__)
 
1126
        except IOError:
 
1127
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1128
 
 
1129
    def test_list_dir(self):
 
1130
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1131
        t = self.get_transport()
 
1132
 
 
1133
        if not t.listable():
 
1134
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1135
            return
 
1136
 
 
1137
        def sorted_list(d, transport):
 
1138
            l = list(transport.list_dir(d))
 
1139
            l.sort()
 
1140
            return l
 
1141
 
 
1142
        self.assertEqual([], sorted_list('.', t))
 
1143
        # c2 is precisely one letter longer than c here to test that
 
1144
        # suffixing is not confused.
 
1145
        # a%25b checks that quoting is done consistently across transports
 
1146
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1147
 
 
1148
        if not t.is_readonly():
 
1149
            self.build_tree(tree_names, transport=t)
 
1150
        else:
 
1151
            self.build_tree(tree_names)
 
1152
 
 
1153
        self.assertEqual(
 
1154
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1155
        self.assertEqual(
 
1156
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1157
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1158
 
 
1159
        # Cloning the transport produces an equivalent listing
 
1160
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1161
 
 
1162
        if not t.is_readonly():
 
1163
            t.delete('c/d')
 
1164
            t.delete('b')
 
1165
        else:
 
1166
            os.unlink('c/d')
 
1167
            os.unlink('b')
 
1168
 
 
1169
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1170
        self.assertEqual(['e'], sorted_list('c', t))
 
1171
 
 
1172
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1173
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1174
        # 'a' is a file, list_dir should raise an error
 
1175
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1176
 
 
1177
    def test_list_dir_result_is_url_escaped(self):
 
1178
        t = self.get_transport()
 
1179
        if not t.listable():
 
1180
            raise TestSkipped("transport not listable")
 
1181
 
 
1182
        if not t.is_readonly():
 
1183
            self.build_tree(['a/', 'a/%'], transport=t)
 
1184
        else:
 
1185
            self.build_tree(['a/', 'a/%'])
 
1186
 
 
1187
        names = list(t.list_dir('a'))
 
1188
        self.assertEqual(['%25'], names)
 
1189
        self.assertIsInstance(names[0], str)
 
1190
 
 
1191
    def test_clone_preserve_info(self):
 
1192
        t1 = self.get_transport()
 
1193
        if not isinstance(t1, ConnectedTransport):
 
1194
            raise TestSkipped("not a connected transport")
 
1195
 
 
1196
        t2 = t1.clone('subdir')
 
1197
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1198
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1199
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1200
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1201
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1202
 
 
1203
    def test__reuse_for(self):
 
1204
        t = self.get_transport()
 
1205
        if not isinstance(t, ConnectedTransport):
 
1206
            raise TestSkipped("not a connected transport")
 
1207
 
 
1208
        def new_url(scheme=None, user=None, password=None,
 
1209
                    host=None, port=None, path=None):
 
1210
            """Build a new url from t.base changing only parts of it.
 
1211
 
 
1212
            Only the parameters different from None will be changed.
 
1213
            """
 
1214
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1215
            if user     is None: user     = t._parsed_url.user
 
1216
            if password is None: password = t._parsed_url.password
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if host     is None: host     = t._parsed_url.host
 
1219
            if port     is None: port     = t._parsed_url.port
 
1220
            if path     is None: path     = t._parsed_url.path
 
1221
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1222
 
 
1223
        if t._parsed_url.scheme == 'ftp':
 
1224
            scheme = 'sftp'
 
1225
        else:
 
1226
            scheme = 'ftp'
 
1227
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1228
        if t._parsed_url.user == 'me':
 
1229
            user = 'you'
 
1230
        else:
 
1231
            user = 'me'
 
1232
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1233
        # passwords are not taken into account because:
 
1234
        # - it makes no sense to have two different valid passwords for the
 
1235
        #   same user
 
1236
        # - _password in ConnectedTransport is intended to collect what the
 
1237
        #   user specified from the command-line and there are cases where the
 
1238
        #   new url can contain no password (if the url was built from an
 
1239
        #   existing transport.base for example)
 
1240
        # - password are considered part of the credentials provided at
 
1241
        #   connection creation time and as such may not be present in the url
 
1242
        #   (they may be typed by the user when prompted for example)
 
1243
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1244
        # We will not connect, we can use a invalid host
 
1245
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1246
        if t._parsed_url.port == 1234:
 
1247
            port = 4321
 
1248
        else:
 
1249
            port = 1234
 
1250
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1251
        # No point in trying to reuse a transport for a local URL
 
1252
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1253
 
 
1254
    def test_connection_sharing(self):
 
1255
        t = self.get_transport()
 
1256
        if not isinstance(t, ConnectedTransport):
 
1257
            raise TestSkipped("not a connected transport")
 
1258
 
 
1259
        c = t.clone('subdir')
 
1260
        # Some transports will create the connection  only when needed
 
1261
        t.has('surely_not') # Force connection
 
1262
        self.assertIs(t._get_connection(), c._get_connection())
 
1263
 
 
1264
        # Temporary failure, we need to create a new dummy connection
 
1265
        new_connection = None
 
1266
        t._set_connection(new_connection)
 
1267
        # Check that both transports use the same connection
 
1268
        self.assertIs(new_connection, t._get_connection())
 
1269
        self.assertIs(new_connection, c._get_connection())
 
1270
 
 
1271
    def test_reuse_connection_for_various_paths(self):
 
1272
        t = self.get_transport()
 
1273
        if not isinstance(t, ConnectedTransport):
 
1274
            raise TestSkipped("not a connected transport")
 
1275
 
 
1276
        t.has('surely_not') # Force connection
 
1277
        self.assertIsNot(None, t._get_connection())
 
1278
 
 
1279
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1280
        self.assertIsNot(t, subdir)
 
1281
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1282
 
 
1283
        home = subdir._reuse_for(t.base + 'home')
 
1284
        self.assertIs(t._get_connection(), home._get_connection())
 
1285
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1286
 
 
1287
    def test_clone(self):
 
1288
        # TODO: Test that clone moves up and down the filesystem
 
1289
        t1 = self.get_transport()
 
1290
 
 
1291
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1292
 
 
1293
        self.assertTrue(t1.has('a'))
 
1294
        self.assertTrue(t1.has('b/c'))
 
1295
        self.assertFalse(t1.has('c'))
 
1296
 
 
1297
        t2 = t1.clone('b')
 
1298
        self.assertEqual(t1.base + 'b/', t2.base)
 
1299
 
 
1300
        self.assertTrue(t2.has('c'))
 
1301
        self.assertFalse(t2.has('a'))
 
1302
 
 
1303
        t3 = t2.clone('..')
 
1304
        self.assertTrue(t3.has('a'))
 
1305
        self.assertFalse(t3.has('c'))
 
1306
 
 
1307
        self.assertFalse(t1.has('b/d'))
 
1308
        self.assertFalse(t2.has('d'))
 
1309
        self.assertFalse(t3.has('b/d'))
 
1310
 
 
1311
        if t1.is_readonly():
 
1312
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1313
        else:
 
1314
            t2.put_bytes('d', 'newfile\n')
 
1315
 
 
1316
        self.assertTrue(t1.has('b/d'))
 
1317
        self.assertTrue(t2.has('d'))
 
1318
        self.assertTrue(t3.has('b/d'))
 
1319
 
 
1320
    def test_clone_to_root(self):
 
1321
        orig_transport = self.get_transport()
 
1322
        # Repeatedly go up to a parent directory until we're at the root
 
1323
        # directory of this transport
 
1324
        root_transport = orig_transport
 
1325
        new_transport = root_transport.clone("..")
 
1326
        # as we are walking up directories, the path must be
 
1327
        # growing less, except at the top
 
1328
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1329
            or new_transport.base == root_transport.base)
 
1330
        while new_transport.base != root_transport.base:
 
1331
            root_transport = new_transport
 
1332
            new_transport = root_transport.clone("..")
 
1333
            # as we are walking up directories, the path must be
 
1334
            # growing less, except at the top
 
1335
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1336
                or new_transport.base == root_transport.base)
 
1337
 
 
1338
        # Cloning to "/" should take us to exactly the same location.
 
1339
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1340
        # the abspath of "/" from the original transport should be the same
 
1341
        # as the base at the root:
 
1342
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1343
 
 
1344
        # At the root, the URL must still end with / as its a directory
 
1345
        self.assertEqual(root_transport.base[-1], '/')
 
1346
 
 
1347
    def test_clone_from_root(self):
 
1348
        """At the root, cloning to a simple dir should just do string append."""
 
1349
        orig_transport = self.get_transport()
 
1350
        root_transport = orig_transport.clone('/')
 
1351
        self.assertEqual(root_transport.base + '.bzr/',
 
1352
            root_transport.clone('.bzr').base)
 
1353
 
 
1354
    def test_base_url(self):
 
1355
        t = self.get_transport()
 
1356
        self.assertEqual('/', t.base[-1])
 
1357
 
 
1358
    def test_relpath(self):
 
1359
        t = self.get_transport()
 
1360
        self.assertEqual('', t.relpath(t.base))
 
1361
        # base ends with /
 
1362
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1363
        # subdirs which don't exist should still give relpaths.
 
1364
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1365
        # trailing slash should be the same.
 
1366
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1367
 
 
1368
    def test_relpath_at_root(self):
 
1369
        t = self.get_transport()
 
1370
        # clone all the way to the top
 
1371
        new_transport = t.clone('..')
 
1372
        while new_transport.base != t.base:
 
1373
            t = new_transport
 
1374
            new_transport = t.clone('..')
 
1375
        # we must be able to get a relpath below the root
 
1376
        self.assertEqual('', t.relpath(t.base))
 
1377
        # and a deeper one should work too
 
1378
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1379
 
 
1380
    def test_abspath(self):
 
1381
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1382
        # that have aliasing problems like symlinks should go in backend
 
1383
        # specific test cases.
 
1384
        transport = self.get_transport()
 
1385
 
 
1386
        self.assertEqual(transport.base + 'relpath',
 
1387
                         transport.abspath('relpath'))
 
1388
 
 
1389
        # This should work without raising an error.
 
1390
        transport.abspath("/")
 
1391
 
 
1392
        # the abspath of "/" and "/foo/.." should result in the same location
 
1393
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1394
 
 
1395
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1396
                         transport.abspath("/foo"))
 
1397
 
 
1398
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1399
    def test_win32_abspath(self):
 
1400
        # Note: we tried to set sys.platform='win32' so we could test on
 
1401
        # other platforms too, but then osutils does platform specific
 
1402
        # things at import time which defeated us...
 
1403
        if sys.platform != 'win32':
 
1404
            raise TestSkipped(
 
1405
                'Testing drive letters in abspath implemented only for win32')
 
1406
 
 
1407
        # smoke test for abspath on win32.
 
1408
        # a transport based on 'file:///' never fully qualifies the drive.
 
1409
        transport = _mod_transport.get_transport_from_url("file:///")
 
1410
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
 
 
1412
        # but a transport that starts with a drive spec must keep it.
 
1413
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1414
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
 
 
1416
    def test_local_abspath(self):
 
1417
        transport = self.get_transport()
 
1418
        try:
 
1419
            p = transport.local_abspath('.')
 
1420
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1421
            # should be formattable
 
1422
            s = str(e)
 
1423
        else:
 
1424
            self.assertEqual(getcwd(), p)
 
1425
 
 
1426
    def test_abspath_at_root(self):
 
1427
        t = self.get_transport()
 
1428
        # clone all the way to the top
 
1429
        new_transport = t.clone('..')
 
1430
        while new_transport.base != t.base:
 
1431
            t = new_transport
 
1432
            new_transport = t.clone('..')
 
1433
        # we must be able to get a abspath of the root when we ask for
 
1434
        # t.abspath('..') - this due to our choice that clone('..')
 
1435
        # should return the root from the root, combined with the desire that
 
1436
        # the url from clone('..') and from abspath('..') should be the same.
 
1437
        self.assertEqual(t.base, t.abspath('..'))
 
1438
        # '' should give us the root
 
1439
        self.assertEqual(t.base, t.abspath(''))
 
1440
        # and a path should append to the url
 
1441
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1442
 
 
1443
    def test_iter_files_recursive(self):
 
1444
        transport = self.get_transport()
 
1445
        if not transport.listable():
 
1446
            self.assertRaises(TransportNotPossible,
 
1447
                              transport.iter_files_recursive)
 
1448
            return
 
1449
        self.build_tree(['isolated/',
 
1450
                         'isolated/dir/',
 
1451
                         'isolated/dir/foo',
 
1452
                         'isolated/dir/bar',
 
1453
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1454
                         'isolated/bar'],
 
1455
                        transport=transport)
 
1456
        paths = set(transport.iter_files_recursive())
 
1457
        # nb the directories are not converted
 
1458
        self.assertEqual(paths,
 
1459
                    set(['isolated/dir/foo',
 
1460
                         'isolated/dir/bar',
 
1461
                         'isolated/dir/b%2525z',
 
1462
                         'isolated/bar']))
 
1463
        sub_transport = transport.clone('isolated')
 
1464
        paths = set(sub_transport.iter_files_recursive())
 
1465
        self.assertEqual(paths,
 
1466
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1467
 
 
1468
    def test_copy_tree(self):
 
1469
        # TODO: test file contents and permissions are preserved. This test was
 
1470
        # added just to ensure that quoting was handled correctly.
 
1471
        # -- David Allouche 2006-08-11
 
1472
        transport = self.get_transport()
 
1473
        if not transport.listable():
 
1474
            self.assertRaises(TransportNotPossible,
 
1475
                              transport.iter_files_recursive)
 
1476
            return
 
1477
        if transport.is_readonly():
 
1478
            return
 
1479
        self.build_tree(['from/',
 
1480
                         'from/dir/',
 
1481
                         'from/dir/foo',
 
1482
                         'from/dir/bar',
 
1483
                         'from/dir/b%25z', # make sure quoting is correct
 
1484
                         'from/bar'],
 
1485
                        transport=transport)
 
1486
        transport.copy_tree('from', 'to')
 
1487
        paths = set(transport.iter_files_recursive())
 
1488
        self.assertEqual(paths,
 
1489
                    set(['from/dir/foo',
 
1490
                         'from/dir/bar',
 
1491
                         'from/dir/b%2525z',
 
1492
                         'from/bar',
 
1493
                         'to/dir/foo',
 
1494
                         'to/dir/bar',
 
1495
                         'to/dir/b%2525z',
 
1496
                         'to/bar',]))
 
1497
 
 
1498
    def test_copy_tree_to_transport(self):
 
1499
        transport = self.get_transport()
 
1500
        if not transport.listable():
 
1501
            self.assertRaises(TransportNotPossible,
 
1502
                              transport.iter_files_recursive)
 
1503
            return
 
1504
        if transport.is_readonly():
 
1505
            return
 
1506
        self.build_tree(['from/',
 
1507
                         'from/dir/',
 
1508
                         'from/dir/foo',
 
1509
                         'from/dir/bar',
 
1510
                         'from/dir/b%25z', # make sure quoting is correct
 
1511
                         'from/bar'],
 
1512
                        transport=transport)
 
1513
        from_transport = transport.clone('from')
 
1514
        to_transport = transport.clone('to')
 
1515
        to_transport.ensure_base()
 
1516
        from_transport.copy_tree_to_transport(to_transport)
 
1517
        paths = set(transport.iter_files_recursive())
 
1518
        self.assertEqual(paths,
 
1519
                    set(['from/dir/foo',
 
1520
                         'from/dir/bar',
 
1521
                         'from/dir/b%2525z',
 
1522
                         'from/bar',
 
1523
                         'to/dir/foo',
 
1524
                         'to/dir/bar',
 
1525
                         'to/dir/b%2525z',
 
1526
                         'to/bar',]))
 
1527
 
 
1528
    def test_unicode_paths(self):
 
1529
        """Test that we can read/write files with Unicode names."""
 
1530
        t = self.get_transport()
 
1531
 
 
1532
        # With FAT32 and certain encodings on win32
 
1533
        # '\xe5' and '\xe4' actually map to the same file
 
1534
        # adding a suffix kicks in the 'preserving but insensitive'
 
1535
        # route, and maintains the right files
 
1536
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1537
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1538
                 u'\u017d', # Z with umlat iso-8859-2
 
1539
                 u'\u062c', # Arabic j
 
1540
                 u'\u0410', # Russian A
 
1541
                 u'\u65e5', # Kanji person
 
1542
                ]
 
1543
 
 
1544
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1545
        if no_unicode_support:
 
1546
            self.knownFailure("test server cannot handle unicode paths")
 
1547
 
 
1548
        try:
 
1549
            self.build_tree(files, transport=t, line_endings='binary')
 
1550
        except UnicodeError:
 
1551
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1552
 
 
1553
        # A plain unicode string is not a valid url
 
1554
        for fname in files:
 
1555
            self.assertRaises(InvalidURL, t.get, fname)
 
1556
 
 
1557
        for fname in files:
 
1558
            fname_utf8 = fname.encode('utf-8')
 
1559
            contents = 'contents of %s\n' % (fname_utf8,)
 
1560
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1561
 
 
1562
    def test_connect_twice_is_same_content(self):
 
1563
        # check that our server (whatever it is) is accessible reliably
 
1564
        # via get_transport and multiple connections share content.
 
1565
        transport = self.get_transport()
 
1566
        if transport.is_readonly():
 
1567
            return
 
1568
        transport.put_bytes('foo', 'bar')
 
1569
        transport3 = self.get_transport()
 
1570
        self.check_transport_contents('bar', transport3, 'foo')
 
1571
 
 
1572
        # now opening at a relative url should give use a sane result:
 
1573
        transport.mkdir('newdir')
 
1574
        transport5 = self.get_transport('newdir')
 
1575
        transport6 = transport5.clone('..')
 
1576
        self.check_transport_contents('bar', transport6, 'foo')
 
1577
 
 
1578
    def test_lock_write(self):
 
1579
        """Test transport-level write locks.
 
1580
 
 
1581
        These are deprecated and transports may decline to support them.
 
1582
        """
 
1583
        transport = self.get_transport()
 
1584
        if transport.is_readonly():
 
1585
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1586
            return
 
1587
        transport.put_bytes('lock', '')
 
1588
        try:
 
1589
            lock = transport.lock_write('lock')
 
1590
        except TransportNotPossible:
 
1591
            return
 
1592
        # TODO make this consistent on all platforms:
 
1593
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1594
        lock.unlock()
 
1595
 
 
1596
    def test_lock_read(self):
 
1597
        """Test transport-level read locks.
 
1598
 
 
1599
        These are deprecated and transports may decline to support them.
 
1600
        """
 
1601
        transport = self.get_transport()
 
1602
        if transport.is_readonly():
 
1603
            file('lock', 'w').close()
 
1604
        else:
 
1605
            transport.put_bytes('lock', '')
 
1606
        try:
 
1607
            lock = transport.lock_read('lock')
 
1608
        except TransportNotPossible:
 
1609
            return
 
1610
        # TODO make this consistent on all platforms:
 
1611
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1612
        lock.unlock()
 
1613
 
 
1614
    def test_readv(self):
 
1615
        transport = self.get_transport()
 
1616
        if transport.is_readonly():
 
1617
            with file('a', 'w') as f: f.write('0123456789')
 
1618
        else:
 
1619
            transport.put_bytes('a', '0123456789')
 
1620
 
 
1621
        d = list(transport.readv('a', ((0, 1),)))
 
1622
        self.assertEqual(d[0], (0, '0'))
 
1623
 
 
1624
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1625
        self.assertEqual(d[0], (0, '0'))
 
1626
        self.assertEqual(d[1], (1, '1'))
 
1627
        self.assertEqual(d[2], (3, '34'))
 
1628
        self.assertEqual(d[3], (9, '9'))
 
1629
 
 
1630
    def test_readv_out_of_order(self):
 
1631
        transport = self.get_transport()
 
1632
        if transport.is_readonly():
 
1633
            with file('a', 'w') as f: f.write('0123456789')
 
1634
        else:
 
1635
            transport.put_bytes('a', '01234567890')
 
1636
 
 
1637
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1638
        self.assertEqual(d[0], (1, '1'))
 
1639
        self.assertEqual(d[1], (9, '9'))
 
1640
        self.assertEqual(d[2], (0, '0'))
 
1641
        self.assertEqual(d[3], (3, '34'))
 
1642
 
 
1643
    def test_readv_with_adjust_for_latency(self):
 
1644
        transport = self.get_transport()
 
1645
        # the adjust for latency flag expands the data region returned
 
1646
        # according to a per-transport heuristic, so testing is a little
 
1647
        # tricky as we need more data than the largest combining that our
 
1648
        # transports do. To accomodate this we generate random data and cross
 
1649
        # reference the returned data with the random data. To avoid doing
 
1650
        # multiple large random byte look ups we do several tests on the same
 
1651
        # backing data.
 
1652
        content = osutils.rand_bytes(200*1024)
 
1653
        content_size = len(content)
 
1654
        if transport.is_readonly():
 
1655
            self.build_tree_contents([('a', content)])
 
1656
        else:
 
1657
            transport.put_bytes('a', content)
 
1658
        def check_result_data(result_vector):
 
1659
            for item in result_vector:
 
1660
                data_len = len(item[1])
 
1661
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1662
 
 
1663
        # start corner case
 
1664
        result = list(transport.readv('a', ((0, 30),),
 
1665
            adjust_for_latency=True, upper_limit=content_size))
 
1666
        # we expect 1 result, from 0, to something > 30
 
1667
        self.assertEqual(1, len(result))
 
1668
        self.assertEqual(0, result[0][0])
 
1669
        self.assertTrue(len(result[0][1]) >= 30)
 
1670
        check_result_data(result)
 
1671
        # end of file corner case
 
1672
        result = list(transport.readv('a', ((204700, 100),),
 
1673
            adjust_for_latency=True, upper_limit=content_size))
 
1674
        # we expect 1 result, from 204800- its length, to the end
 
1675
        self.assertEqual(1, len(result))
 
1676
        data_len = len(result[0][1])
 
1677
        self.assertEqual(204800-data_len, result[0][0])
 
1678
        self.assertTrue(data_len >= 100)
 
1679
        check_result_data(result)
 
1680
        # out of order ranges are made in order
 
1681
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1682
            adjust_for_latency=True, upper_limit=content_size))
 
1683
        # we expect 2 results, in order, start and end.
 
1684
        self.assertEqual(2, len(result))
 
1685
        # start
 
1686
        data_len = len(result[0][1])
 
1687
        self.assertEqual(0, result[0][0])
 
1688
        self.assertTrue(data_len >= 30)
 
1689
        # end
 
1690
        data_len = len(result[1][1])
 
1691
        self.assertEqual(204800-data_len, result[1][0])
 
1692
        self.assertTrue(data_len >= 100)
 
1693
        check_result_data(result)
 
1694
        # close ranges get combined (even if out of order)
 
1695
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1696
            result = list(transport.readv('a', request_vector,
 
1697
                adjust_for_latency=True, upper_limit=content_size))
 
1698
            self.assertEqual(1, len(result))
 
1699
            data_len = len(result[0][1])
 
1700
            # minimum length is from 400 to 1034 - 634
 
1701
            self.assertTrue(data_len >= 634)
 
1702
            # must contain the region 400 to 1034
 
1703
            self.assertTrue(result[0][0] <= 400)
 
1704
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1705
            check_result_data(result)
 
1706
 
 
1707
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1708
        transport = self.get_transport()
 
1709
        # test from observed failure case.
 
1710
        if transport.is_readonly():
 
1711
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1712
        else:
 
1713
            transport.put_bytes('a', 'a'*1024*1024)
 
1714
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1715
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1716
            (465373, 800), (947422, 800)]
 
1717
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1718
        found_items = [False]*9
 
1719
        for pos, (start, length) in enumerate(broken_vector):
 
1720
            # check the range is covered by the result
 
1721
            for offset, data in results:
 
1722
                if offset <= start and start + length <= offset + len(data):
 
1723
                    found_items[pos] = True
 
1724
        self.assertEqual([True]*9, found_items)
 
1725
 
 
1726
    def test_get_with_open_write_stream_sees_all_content(self):
 
1727
        t = self.get_transport()
 
1728
        if t.is_readonly():
 
1729
            return
 
1730
        handle = t.open_write_stream('foo')
 
1731
        try:
 
1732
            handle.write('bcd')
 
1733
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1734
        finally:
 
1735
            handle.close()
 
1736
 
 
1737
    def test_get_smart_medium(self):
 
1738
        """All transports must either give a smart medium, or know they can't.
 
1739
        """
 
1740
        transport = self.get_transport()
 
1741
        try:
 
1742
            client_medium = transport.get_smart_medium()
 
1743
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1744
        except errors.NoSmartMedium:
 
1745
            # as long as we got it we're fine
 
1746
            pass
 
1747
 
 
1748
    def test_readv_short_read(self):
 
1749
        transport = self.get_transport()
 
1750
        if transport.is_readonly():
 
1751
            with file('a', 'w') as f: f.write('0123456789')
 
1752
        else:
 
1753
            transport.put_bytes('a', '01234567890')
 
1754
 
 
1755
        # This is intentionally reading off the end of the file
 
1756
        # since we are sure that it cannot get there
 
1757
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1758
                               # Can be raised by paramiko
 
1759
                               AssertionError),
 
1760
                              transport.readv, 'a', [(1,1), (8,10)])
 
1761
 
 
1762
        # This is trying to seek past the end of the file, it should
 
1763
        # also raise a special error
 
1764
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1765
                              transport.readv, 'a', [(12,2)])
 
1766
 
 
1767
    def test_no_segment_parameters(self):
 
1768
        """Segment parameters should be stripped and stored in
 
1769
        transport.segment_parameters."""
 
1770
        transport = self.get_transport("foo")
 
1771
        self.assertEqual({}, transport.get_segment_parameters())
 
1772
 
 
1773
    def test_segment_parameters(self):
 
1774
        """Segment parameters should be stripped and stored in
 
1775
        transport.get_segment_parameters()."""
 
1776
        base_url = self._server.get_url()
 
1777
        parameters = {"key1": "val1", "key2": "val2"}
 
1778
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1779
        transport = _mod_transport.get_transport_from_url(url)
 
1780
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1781
 
 
1782
    def test_set_segment_parameters(self):
 
1783
        """Segment parameters can be set and show up in base."""
 
1784
        transport = self.get_transport("foo")
 
1785
        orig_base = transport.base
 
1786
        transport.set_segment_parameter("arm", "board")
 
1787
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1788
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1789
        transport.set_segment_parameter("arm", None)
 
1790
        transport.set_segment_parameter("nonexistant", None)
 
1791
        self.assertEqual({}, transport.get_segment_parameters())
 
1792
        self.assertEqual(orig_base, transport.base)
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1805
 
 
1806
    def test_abspath_url_unquote_unreserved(self):
 
1807
        """URLs from abspath should have unreserved characters unquoted
 
1808
        
 
1809
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1810
        """
 
1811
        t = self.get_transport()
 
1812
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1813
        self.assertEqual(t.base + "-.09AZ_az~",
 
1814
            t.abspath(needlessly_escaped_dir))
 
1815
 
 
1816
    def test_clone_url_unquote_unreserved(self):
 
1817
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1818
        
 
1819
        Cloned transports should be prefix comparable for things like the
 
1820
        isolation checking of tests, see lp:842223 for more.
 
1821
        """
 
1822
        t1 = self.get_transport()
 
1823
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1824
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1825
        t2 = t1.clone(needlessly_escaped_dir)
 
1826
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1827
 
 
1828
    def test_hook_post_connection_one(self):
 
1829
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1830
        log = []
 
1831
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1832
        t = self.get_transport()
 
1833
        self.assertEqual([], log)
 
1834
        t.has("non-existant")
 
1835
        if isinstance(t, RemoteTransport):
 
1836
            self.assertEqual([t.get_smart_medium()], log)
 
1837
        elif isinstance(t, ConnectedTransport):
 
1838
            self.assertEqual([t], log)
 
1839
        else:
 
1840
            self.assertEqual([], log)
 
1841
 
 
1842
    def test_hook_post_connection_multi(self):
 
1843
        """Fire post_connect hook once per unshared underlying connection"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t1 = self.get_transport()
 
1847
        t2 = t1.clone(".")
 
1848
        t3 = self.get_transport()
 
1849
        self.assertEqual([], log)
 
1850
        t1.has("x")
 
1851
        t2.has("x")
 
1852
        t3.has("x")
 
1853
        if isinstance(t1, RemoteTransport):
 
1854
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1855
        elif isinstance(t1, ConnectedTransport):
 
1856
            self.assertEqual([t1, t3], log)
 
1857
        else:
 
1858
            self.assertEqual([], log)