~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Aaron Bentley
  • Date: 2005-07-26 14:06:11 UTC
  • mto: (1092.1.41) (1185.3.4) (974.1.47)
  • mto: This revision was merged to the branch mainline in revision 982.
  • Revision ID: abentley@panoramicfeedback.com-20050726140611-403e366f3c79c1f1
Fixed python invocation

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for Transport implementations.
18
 
 
19
 
Transport implementations tested here are supplied by
20
 
TransportTestProviderAdapter.
21
 
"""
22
 
 
23
 
import itertools
24
 
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
 
import stat
28
 
import sys
29
 
 
30
 
from bzrlib import (
31
 
    errors,
32
 
    osutils,
33
 
    pyutils,
34
 
    tests,
35
 
    transport as _mod_transport,
36
 
    urlutils,
37
 
    )
38
 
from bzrlib.errors import (ConnectionError,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           NoSuchFile,
42
 
                           PathError,
43
 
                           TransportNotPossible,
44
 
                           )
45
 
from bzrlib.osutils import getcwd
46
 
from bzrlib.smart import medium
47
 
from bzrlib.tests import (
48
 
    TestSkipped,
49
 
    TestNotApplicable,
50
 
    multiply_tests,
51
 
    )
52
 
from bzrlib.tests import test_server
53
 
from bzrlib.tests.test_transport import TestTransportImplementation
54
 
from bzrlib.transport import (
55
 
    ConnectedTransport,
56
 
    Transport,
57
 
    _get_transport_modules,
58
 
    )
59
 
from bzrlib.transport.memory import MemoryTransport
60
 
from bzrlib.transport.remote import RemoteTransport
61
 
 
62
 
 
63
 
def get_transport_test_permutations(module):
64
 
    """Get the permutations module wants to have tested."""
65
 
    if getattr(module, 'get_test_permutations', None) is None:
66
 
        raise AssertionError(
67
 
            "transport module %s doesn't provide get_test_permutations()"
68
 
            % module.__name__)
69
 
        return []
70
 
    return module.get_test_permutations()
71
 
 
72
 
 
73
 
def transport_test_permutations():
74
 
    """Return a list of the klass, server_factory pairs to test."""
75
 
    result = []
76
 
    for module in _get_transport_modules():
77
 
        try:
78
 
            permutations = get_transport_test_permutations(
79
 
                pyutils.get_named_object(module))
80
 
            for (klass, server_factory) in permutations:
81
 
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
 
                    {"transport_class":klass,
83
 
                     "transport_server":server_factory})
84
 
                result.append(scenario)
85
 
        except errors.DependencyNotPresent, e:
86
 
            # Continue even if a dependency prevents us
87
 
            # from adding this test
88
 
            pass
89
 
    return result
90
 
 
91
 
 
92
 
def load_tests(standard_tests, module, loader):
93
 
    """Multiply tests for tranport implementations."""
94
 
    result = loader.suiteClass()
95
 
    scenarios = transport_test_permutations()
96
 
    return multiply_tests(standard_tests, scenarios, result)
97
 
 
98
 
 
99
 
class TransportTests(TestTransportImplementation):
100
 
 
101
 
    def setUp(self):
102
 
        super(TransportTests, self).setUp()
103
 
        self.overrideEnv('BZR_NO_SMART_VFS', None)
104
 
 
105
 
    def check_transport_contents(self, content, transport, relpath):
106
 
        """Check that transport.get_bytes(relpath) == content."""
107
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
108
 
 
109
 
    def test_ensure_base_missing(self):
110
 
        """.ensure_base() should create the directory if it doesn't exist"""
111
 
        t = self.get_transport()
112
 
        t_a = t.clone('a')
113
 
        if t_a.is_readonly():
114
 
            self.assertRaises(TransportNotPossible,
115
 
                              t_a.ensure_base)
116
 
            return
117
 
        self.assertTrue(t_a.ensure_base())
118
 
        self.assertTrue(t.has('a'))
119
 
 
120
 
    def test_ensure_base_exists(self):
121
 
        """.ensure_base() should just be happy if it already exists"""
122
 
        t = self.get_transport()
123
 
        if t.is_readonly():
124
 
            return
125
 
 
126
 
        t.mkdir('a')
127
 
        t_a = t.clone('a')
128
 
        # ensure_base returns False if it didn't create the base
129
 
        self.assertFalse(t_a.ensure_base())
130
 
 
131
 
    def test_ensure_base_missing_parent(self):
132
 
        """.ensure_base() will fail if the parent dir doesn't exist"""
133
 
        t = self.get_transport()
134
 
        if t.is_readonly():
135
 
            return
136
 
 
137
 
        t_a = t.clone('a')
138
 
        t_b = t_a.clone('b')
139
 
        self.assertRaises(NoSuchFile, t_b.ensure_base)
140
 
 
141
 
    def test_external_url(self):
142
 
        """.external_url either works or raises InProcessTransport."""
143
 
        t = self.get_transport()
144
 
        try:
145
 
            t.external_url()
146
 
        except errors.InProcessTransport:
147
 
            pass
148
 
 
149
 
    def test_has(self):
150
 
        t = self.get_transport()
151
 
 
152
 
        files = ['a', 'b', 'e', 'g', '%']
153
 
        self.build_tree(files, transport=t)
154
 
        self.assertEqual(True, t.has('a'))
155
 
        self.assertEqual(False, t.has('c'))
156
 
        self.assertEqual(True, t.has(urlutils.escape('%')))
157
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
 
                                           'e', 'f', 'g', 'h'])),
159
 
                         [True, True, False, False,
160
 
                          True, False, True, False])
161
 
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
162
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
 
                                           urlutils.escape('%%')]))
164
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
 
                                                'e', 'f', 'g', 'h']))),
166
 
                         [True, True, False, False,
167
 
                          True, False, True, False])
168
 
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
169
 
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
170
 
 
171
 
    def test_has_root_works(self):
172
 
        if self.transport_server is test_server.SmartTCPServer_for_testing:
173
 
            raise TestNotApplicable(
174
 
                "SmartTCPServer_for_testing intentionally does not allow "
175
 
                "access to /.")
176
 
        current_transport = self.get_transport()
177
 
        self.assertTrue(current_transport.has('/'))
178
 
        root = current_transport.clone('/')
179
 
        self.assertTrue(root.has(''))
180
 
 
181
 
    def test_get(self):
182
 
        t = self.get_transport()
183
 
 
184
 
        files = ['a', 'b', 'e', 'g']
185
 
        contents = ['contents of a\n',
186
 
                    'contents of b\n',
187
 
                    'contents of e\n',
188
 
                    'contents of g\n',
189
 
                    ]
190
 
        self.build_tree(files, transport=t, line_endings='binary')
191
 
        self.check_transport_contents('contents of a\n', t, 'a')
192
 
        content_f = t.get_multi(files)
193
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
194
 
        # evaluate their inputs, the transport requests should be issued and
195
 
        # handled sequentially (we don't want to force transport to buffer).
196
 
        for content, f in itertools.izip(contents, content_f):
197
 
            self.assertEqual(content, f.read())
198
 
 
199
 
        content_f = t.get_multi(iter(files))
200
 
        # Use itertools.izip() for the same reason
201
 
        for content, f in itertools.izip(contents, content_f):
202
 
            self.assertEqual(content, f.read())
203
 
 
204
 
    def test_get_unknown_file(self):
205
 
        t = self.get_transport()
206
 
        files = ['a', 'b']
207
 
        contents = ['contents of a\n',
208
 
                    'contents of b\n',
209
 
                    ]
210
 
        self.build_tree(files, transport=t, line_endings='binary')
211
 
        self.assertRaises(NoSuchFile, t.get, 'c')
212
 
        def iterate_and_close(func, *args):
213
 
            for f in func(*args):
214
 
                # We call f.read() here because things like paramiko actually
215
 
                # spawn a thread to prefetch the content, which we want to
216
 
                # consume before we close the handle.
217
 
                content = f.read()
218
 
                f.close()
219
 
        self.assertRaises(NoSuchFile, iterate_and_close,
220
 
                          t.get_multi, ['a', 'b', 'c'])
221
 
        self.assertRaises(NoSuchFile, iterate_and_close,
222
 
                          t.get_multi, iter(['a', 'b', 'c']))
223
 
 
224
 
    def test_get_directory_read_gives_ReadError(self):
225
 
        """consistent errors for read() on a file returned by get()."""
226
 
        t = self.get_transport()
227
 
        if t.is_readonly():
228
 
            self.build_tree(['a directory/'])
229
 
        else:
230
 
            t.mkdir('a%20directory')
231
 
        # getting the file must either work or fail with a PathError
232
 
        try:
233
 
            a_file = t.get('a%20directory')
234
 
        except (errors.PathError, errors.RedirectRequested):
235
 
            # early failure return immediately.
236
 
            return
237
 
        # having got a file, read() must either work (i.e. http reading a dir
238
 
        # listing) or fail with ReadError
239
 
        try:
240
 
            a_file.read()
241
 
        except errors.ReadError:
242
 
            pass
243
 
 
244
 
    def test_get_bytes(self):
245
 
        t = self.get_transport()
246
 
 
247
 
        files = ['a', 'b', 'e', 'g']
248
 
        contents = ['contents of a\n',
249
 
                    'contents of b\n',
250
 
                    'contents of e\n',
251
 
                    'contents of g\n',
252
 
                    ]
253
 
        self.build_tree(files, transport=t, line_endings='binary')
254
 
        self.check_transport_contents('contents of a\n', t, 'a')
255
 
 
256
 
        for content, fname in zip(contents, files):
257
 
            self.assertEqual(content, t.get_bytes(fname))
258
 
 
259
 
    def test_get_bytes_unknown_file(self):
260
 
        t = self.get_transport()
261
 
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
262
 
 
263
 
    def test_get_with_open_write_stream_sees_all_content(self):
264
 
        t = self.get_transport()
265
 
        if t.is_readonly():
266
 
            return
267
 
        handle = t.open_write_stream('foo')
268
 
        try:
269
 
            handle.write('b')
270
 
            self.assertEqual('b', t.get_bytes('foo'))
271
 
        finally:
272
 
            handle.close()
273
 
 
274
 
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
 
        t = self.get_transport()
276
 
        if t.is_readonly():
277
 
            return
278
 
        handle = t.open_write_stream('foo')
279
 
        try:
280
 
            handle.write('b')
281
 
            self.assertEqual('b', t.get_bytes('foo'))
282
 
            f = t.get('foo')
283
 
            try:
284
 
                self.assertEqual('b', f.read())
285
 
            finally:
286
 
                f.close()
287
 
        finally:
288
 
            handle.close()
289
 
 
290
 
    def test_put_bytes(self):
291
 
        t = self.get_transport()
292
 
 
293
 
        if t.is_readonly():
294
 
            self.assertRaises(TransportNotPossible,
295
 
                    t.put_bytes, 'a', 'some text for a\n')
296
 
            return
297
 
 
298
 
        t.put_bytes('a', 'some text for a\n')
299
 
        self.assertTrue(t.has('a'))
300
 
        self.check_transport_contents('some text for a\n', t, 'a')
301
 
 
302
 
        # The contents should be overwritten
303
 
        t.put_bytes('a', 'new text for a\n')
304
 
        self.check_transport_contents('new text for a\n', t, 'a')
305
 
 
306
 
        self.assertRaises(NoSuchFile,
307
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
308
 
 
309
 
    def test_put_bytes_non_atomic(self):
310
 
        t = self.get_transport()
311
 
 
312
 
        if t.is_readonly():
313
 
            self.assertRaises(TransportNotPossible,
314
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
315
 
            return
316
 
 
317
 
        self.assertFalse(t.has('a'))
318
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
319
 
        self.assertTrue(t.has('a'))
320
 
        self.check_transport_contents('some text for a\n', t, 'a')
321
 
        # Put also replaces contents
322
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
324
 
 
325
 
        # Make sure we can create another file
326
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
327
 
        # And overwrite 'a' with empty contents
328
 
        t.put_bytes_non_atomic('a', '')
329
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
330
 
        self.check_transport_contents('', t, 'a')
331
 
 
332
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
333
 
                                       'contents\n')
334
 
        # Now test the create_parent flag
335
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
336
 
                                       'contents\n')
337
 
        self.assertFalse(t.has('dir/a'))
338
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
339
 
                               create_parent_dir=True)
340
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
341
 
 
342
 
        # But we still get NoSuchFile if we can't make the parent dir
343
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
344
 
                                       'contents\n',
345
 
                                       create_parent_dir=True)
346
 
 
347
 
    def test_put_bytes_permissions(self):
348
 
        t = self.get_transport()
349
 
 
350
 
        if t.is_readonly():
351
 
            return
352
 
        if not t._can_roundtrip_unix_modebits():
353
 
            # Can't roundtrip, so no need to run this test
354
 
            return
355
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
356
 
        self.assertTransportMode(t, 'mode644', 0644)
357
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
358
 
        self.assertTransportMode(t, 'mode666', 0666)
359
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
360
 
        self.assertTransportMode(t, 'mode600', 0600)
361
 
        # Yes, you can put_bytes a file such that it becomes readonly
362
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
363
 
        self.assertTransportMode(t, 'mode400', 0400)
364
 
 
365
 
        # The default permissions should be based on the current umask
366
 
        umask = osutils.get_umask()
367
 
        t.put_bytes('nomode', 'test text\n', mode=None)
368
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
369
 
 
370
 
    def test_put_bytes_non_atomic_permissions(self):
371
 
        t = self.get_transport()
372
 
 
373
 
        if t.is_readonly():
374
 
            return
375
 
        if not t._can_roundtrip_unix_modebits():
376
 
            # Can't roundtrip, so no need to run this test
377
 
            return
378
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
379
 
        self.assertTransportMode(t, 'mode644', 0644)
380
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
381
 
        self.assertTransportMode(t, 'mode666', 0666)
382
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
383
 
        self.assertTransportMode(t, 'mode600', 0600)
384
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
385
 
        self.assertTransportMode(t, 'mode400', 0400)
386
 
 
387
 
        # The default permissions should be based on the current umask
388
 
        umask = osutils.get_umask()
389
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
390
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
391
 
 
392
 
        # We should also be able to set the mode for a parent directory
393
 
        # when it is created
394
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
395
 
                               dir_mode=0700, create_parent_dir=True)
396
 
        self.assertTransportMode(t, 'dir700', 0700)
397
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
398
 
                               dir_mode=0770, create_parent_dir=True)
399
 
        self.assertTransportMode(t, 'dir770', 0770)
400
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
401
 
                               dir_mode=0777, create_parent_dir=True)
402
 
        self.assertTransportMode(t, 'dir777', 0777)
403
 
 
404
 
    def test_put_file(self):
405
 
        t = self.get_transport()
406
 
 
407
 
        if t.is_readonly():
408
 
            self.assertRaises(TransportNotPossible,
409
 
                    t.put_file, 'a', StringIO('some text for a\n'))
410
 
            return
411
 
 
412
 
        result = t.put_file('a', StringIO('some text for a\n'))
413
 
        # put_file returns the length of the data written
414
 
        self.assertEqual(16, result)
415
 
        self.assertTrue(t.has('a'))
416
 
        self.check_transport_contents('some text for a\n', t, 'a')
417
 
        # Put also replaces contents
418
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
 
        self.assertEqual(19, result)
420
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
421
 
        self.assertRaises(NoSuchFile,
422
 
                          t.put_file, 'path/doesnt/exist/c',
423
 
                              StringIO('contents'))
424
 
 
425
 
    def test_put_file_non_atomic(self):
426
 
        t = self.get_transport()
427
 
 
428
 
        if t.is_readonly():
429
 
            self.assertRaises(TransportNotPossible,
430
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
431
 
            return
432
 
 
433
 
        self.assertFalse(t.has('a'))
434
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
435
 
        self.assertTrue(t.has('a'))
436
 
        self.check_transport_contents('some text for a\n', t, 'a')
437
 
        # Put also replaces contents
438
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
440
 
 
441
 
        # Make sure we can create another file
442
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
443
 
        # And overwrite 'a' with empty contents
444
 
        t.put_file_non_atomic('a', StringIO(''))
445
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
446
 
        self.check_transport_contents('', t, 'a')
447
 
 
448
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
 
                                       StringIO('contents\n'))
450
 
        # Now test the create_parent flag
451
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
 
                                       StringIO('contents\n'))
453
 
        self.assertFalse(t.has('dir/a'))
454
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
455
 
                              create_parent_dir=True)
456
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
457
 
 
458
 
        # But we still get NoSuchFile if we can't make the parent dir
459
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
 
                                       StringIO('contents\n'),
461
 
                                       create_parent_dir=True)
462
 
 
463
 
    def test_put_file_permissions(self):
464
 
 
465
 
        t = self.get_transport()
466
 
 
467
 
        if t.is_readonly():
468
 
            return
469
 
        if not t._can_roundtrip_unix_modebits():
470
 
            # Can't roundtrip, so no need to run this test
471
 
            return
472
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
473
 
        self.assertTransportMode(t, 'mode644', 0644)
474
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
475
 
        self.assertTransportMode(t, 'mode666', 0666)
476
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
477
 
        self.assertTransportMode(t, 'mode600', 0600)
478
 
        # Yes, you can put a file such that it becomes readonly
479
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
480
 
        self.assertTransportMode(t, 'mode400', 0400)
481
 
        # The default permissions should be based on the current umask
482
 
        umask = osutils.get_umask()
483
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
484
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
485
 
 
486
 
    def test_put_file_non_atomic_permissions(self):
487
 
        t = self.get_transport()
488
 
 
489
 
        if t.is_readonly():
490
 
            return
491
 
        if not t._can_roundtrip_unix_modebits():
492
 
            # Can't roundtrip, so no need to run this test
493
 
            return
494
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
495
 
        self.assertTransportMode(t, 'mode644', 0644)
496
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
497
 
        self.assertTransportMode(t, 'mode666', 0666)
498
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
499
 
        self.assertTransportMode(t, 'mode600', 0600)
500
 
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
501
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
502
 
        self.assertTransportMode(t, 'mode400', 0400)
503
 
 
504
 
        # The default permissions should be based on the current umask
505
 
        umask = osutils.get_umask()
506
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
507
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
508
 
 
509
 
        # We should also be able to set the mode for a parent directory
510
 
        # when it is created
511
 
        sio = StringIO()
512
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
513
 
                              dir_mode=0700, create_parent_dir=True)
514
 
        self.assertTransportMode(t, 'dir700', 0700)
515
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
516
 
                              dir_mode=0770, create_parent_dir=True)
517
 
        self.assertTransportMode(t, 'dir770', 0770)
518
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
519
 
                              dir_mode=0777, create_parent_dir=True)
520
 
        self.assertTransportMode(t, 'dir777', 0777)
521
 
 
522
 
    def test_put_bytes_unicode(self):
523
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
524
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
525
 
        # (we don't want to encode unicode here at all, callers should be
526
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
527
 
        # compatibility.  At some point we should use a specific exception.
528
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
529
 
        t = self.get_transport()
530
 
        if t.is_readonly():
531
 
            return
532
 
        unicode_string = u'\u1234'
533
 
        self.assertRaises(
534
 
            (AssertionError, UnicodeEncodeError),
535
 
            t.put_bytes, 'foo', unicode_string)
536
 
 
537
 
    def test_put_file_unicode(self):
538
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
539
 
        # This situation can happen (and has) if code is careless about the type
540
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
541
 
        # cStringIO, because it never returns unicode from read.
542
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
543
 
        # raise, but we raise it for hysterical raisins.
544
 
        t = self.get_transport()
545
 
        if t.is_readonly():
546
 
            return
547
 
        unicode_file = pyStringIO(u'\u1234')
548
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
549
 
 
550
 
    def test_mkdir(self):
551
 
        t = self.get_transport()
552
 
 
553
 
        if t.is_readonly():
554
 
            # cannot mkdir on readonly transports. We're not testing for
555
 
            # cache coherency because cache behaviour is not currently
556
 
            # defined for the transport interface.
557
 
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
558
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
559
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
560
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
561
 
            return
562
 
        # Test mkdir
563
 
        t.mkdir('dir_a')
564
 
        self.assertEqual(t.has('dir_a'), True)
565
 
        self.assertEqual(t.has('dir_b'), False)
566
 
 
567
 
        t.mkdir('dir_b')
568
 
        self.assertEqual(t.has('dir_b'), True)
569
 
 
570
 
        t.mkdir_multi(['dir_c', 'dir_d'])
571
 
 
572
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
573
 
        self.assertEqual(list(t.has_multi(
574
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
575
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
576
 
            [True, True, True, False,
577
 
             True, True, True, True])
578
 
 
579
 
        # we were testing that a local mkdir followed by a transport
580
 
        # mkdir failed thusly, but given that we * in one process * do not
581
 
        # concurrently fiddle with disk dirs and then use transport to do
582
 
        # things, the win here seems marginal compared to the constraint on
583
 
        # the interface. RBC 20051227
584
 
        t.mkdir('dir_g')
585
 
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
586
 
 
587
 
        # Test get/put in sub-directories
588
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
589
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
590
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
591
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
592
 
 
593
 
        # mkdir of a dir with an absent parent
594
 
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
595
 
 
596
 
    def test_mkdir_permissions(self):
597
 
        t = self.get_transport()
598
 
        if t.is_readonly():
599
 
            return
600
 
        if not t._can_roundtrip_unix_modebits():
601
 
            # no sense testing on this transport
602
 
            return
603
 
        # Test mkdir with a mode
604
 
        t.mkdir('dmode755', mode=0755)
605
 
        self.assertTransportMode(t, 'dmode755', 0755)
606
 
        t.mkdir('dmode555', mode=0555)
607
 
        self.assertTransportMode(t, 'dmode555', 0555)
608
 
        t.mkdir('dmode777', mode=0777)
609
 
        self.assertTransportMode(t, 'dmode777', 0777)
610
 
        t.mkdir('dmode700', mode=0700)
611
 
        self.assertTransportMode(t, 'dmode700', 0700)
612
 
        t.mkdir_multi(['mdmode755'], mode=0755)
613
 
        self.assertTransportMode(t, 'mdmode755', 0755)
614
 
 
615
 
        # Default mode should be based on umask
616
 
        umask = osutils.get_umask()
617
 
        t.mkdir('dnomode', mode=None)
618
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
619
 
 
620
 
    def test_opening_a_file_stream_creates_file(self):
621
 
        t = self.get_transport()
622
 
        if t.is_readonly():
623
 
            return
624
 
        handle = t.open_write_stream('foo')
625
 
        try:
626
 
            self.assertEqual('', t.get_bytes('foo'))
627
 
        finally:
628
 
            handle.close()
629
 
 
630
 
    def test_opening_a_file_stream_can_set_mode(self):
631
 
        t = self.get_transport()
632
 
        if t.is_readonly():
633
 
            return
634
 
        if not t._can_roundtrip_unix_modebits():
635
 
            # Can't roundtrip, so no need to run this test
636
 
            return
637
 
        def check_mode(name, mode, expected):
638
 
            handle = t.open_write_stream(name, mode=mode)
639
 
            handle.close()
640
 
            self.assertTransportMode(t, name, expected)
641
 
        check_mode('mode644', 0644, 0644)
642
 
        check_mode('mode666', 0666, 0666)
643
 
        check_mode('mode600', 0600, 0600)
644
 
        # The default permissions should be based on the current umask
645
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
646
 
 
647
 
    def test_copy_to(self):
648
 
        # FIXME: test:   same server to same server (partly done)
649
 
        # same protocol two servers
650
 
        # and    different protocols (done for now except for MemoryTransport.
651
 
        # - RBC 20060122
652
 
 
653
 
        def simple_copy_files(transport_from, transport_to):
654
 
            files = ['a', 'b', 'c', 'd']
655
 
            self.build_tree(files, transport=transport_from)
656
 
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
657
 
            for f in files:
658
 
                self.check_transport_contents(transport_to.get_bytes(f),
659
 
                                              transport_from, f)
660
 
 
661
 
        t = self.get_transport()
662
 
        temp_transport = MemoryTransport('memory:///')
663
 
        simple_copy_files(t, temp_transport)
664
 
        if not t.is_readonly():
665
 
            t.mkdir('copy_to_simple')
666
 
            t2 = t.clone('copy_to_simple')
667
 
            simple_copy_files(t, t2)
668
 
 
669
 
 
670
 
        # Test that copying into a missing directory raises
671
 
        # NoSuchFile
672
 
        if t.is_readonly():
673
 
            self.build_tree(['e/', 'e/f'])
674
 
        else:
675
 
            t.mkdir('e')
676
 
            t.put_bytes('e/f', 'contents of e')
677
 
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
678
 
        temp_transport.mkdir('e')
679
 
        t.copy_to(['e/f'], temp_transport)
680
 
 
681
 
        del temp_transport
682
 
        temp_transport = MemoryTransport('memory:///')
683
 
 
684
 
        files = ['a', 'b', 'c', 'd']
685
 
        t.copy_to(iter(files), temp_transport)
686
 
        for f in files:
687
 
            self.check_transport_contents(temp_transport.get_bytes(f),
688
 
                                          t, f)
689
 
        del temp_transport
690
 
 
691
 
        for mode in (0666, 0644, 0600, 0400):
692
 
            temp_transport = MemoryTransport("memory:///")
693
 
            t.copy_to(files, temp_transport, mode=mode)
694
 
            for f in files:
695
 
                self.assertTransportMode(temp_transport, f, mode)
696
 
 
697
 
    def test_create_prefix(self):
698
 
        t = self.get_transport()
699
 
        sub = t.clone('foo').clone('bar')
700
 
        try:
701
 
            sub.create_prefix()
702
 
        except TransportNotPossible:
703
 
            self.assertTrue(t.is_readonly())
704
 
        else:
705
 
            self.assertTrue(t.has('foo/bar'))
706
 
 
707
 
    def test_append_file(self):
708
 
        t = self.get_transport()
709
 
 
710
 
        if t.is_readonly():
711
 
            self.assertRaises(TransportNotPossible,
712
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
713
 
            return
714
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
715
 
        t.put_bytes('b', 'contents\nfor b\n')
716
 
 
717
 
        self.assertEqual(20,
718
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
719
 
 
720
 
        self.check_transport_contents(
721
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
722
 
            t, 'a')
723
 
 
724
 
        # a file with no parent should fail..
725
 
        self.assertRaises(NoSuchFile,
726
 
                          t.append_file, 'missing/path', StringIO('content'))
727
 
 
728
 
        # And we can create new files, too
729
 
        self.assertEqual(0,
730
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
731
 
        self.check_transport_contents('some text\nfor a missing file\n',
732
 
                                      t, 'c')
733
 
 
734
 
    def test_append_bytes(self):
735
 
        t = self.get_transport()
736
 
 
737
 
        if t.is_readonly():
738
 
            self.assertRaises(TransportNotPossible,
739
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
740
 
            return
741
 
 
742
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
743
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
744
 
 
745
 
        self.assertEqual(20,
746
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
747
 
 
748
 
        self.check_transport_contents(
749
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
750
 
            t, 'a')
751
 
 
752
 
        # a file with no parent should fail..
753
 
        self.assertRaises(NoSuchFile,
754
 
                          t.append_bytes, 'missing/path', 'content')
755
 
 
756
 
    def test_append_multi(self):
757
 
        t = self.get_transport()
758
 
 
759
 
        if t.is_readonly():
760
 
            return
761
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
762
 
                         'add\nsome\nmore\ncontents\n')
763
 
        t.put_bytes('b', 'contents\nfor b\n')
764
 
 
765
 
        self.assertEqual((43, 15),
766
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
767
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
768
 
 
769
 
        self.check_transport_contents(
770
 
            'diff\ncontents for\na\n'
771
 
            'add\nsome\nmore\ncontents\n'
772
 
            'and\nthen\nsome\nmore\n',
773
 
            t, 'a')
774
 
        self.check_transport_contents(
775
 
                'contents\nfor b\n'
776
 
                'some\nmore\nfor\nb\n',
777
 
                t, 'b')
778
 
 
779
 
        self.assertEqual((62, 31),
780
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
781
 
                                 ('b', StringIO('from an iterator\n'))])))
782
 
        self.check_transport_contents(
783
 
            'diff\ncontents for\na\n'
784
 
            'add\nsome\nmore\ncontents\n'
785
 
            'and\nthen\nsome\nmore\n'
786
 
            'a little bit more\n',
787
 
            t, 'a')
788
 
        self.check_transport_contents(
789
 
                'contents\nfor b\n'
790
 
                'some\nmore\nfor\nb\n'
791
 
                'from an iterator\n',
792
 
                t, 'b')
793
 
 
794
 
        self.assertEqual((80, 0),
795
 
            t.append_multi([('a', StringIO('some text in a\n')),
796
 
                            ('d', StringIO('missing file r\n'))]))
797
 
 
798
 
        self.check_transport_contents(
799
 
            'diff\ncontents for\na\n'
800
 
            'add\nsome\nmore\ncontents\n'
801
 
            'and\nthen\nsome\nmore\n'
802
 
            'a little bit more\n'
803
 
            'some text in a\n',
804
 
            t, 'a')
805
 
        self.check_transport_contents('missing file r\n', t, 'd')
806
 
 
807
 
    def test_append_file_mode(self):
808
 
        """Check that append accepts a mode parameter"""
809
 
        # check append accepts a mode
810
 
        t = self.get_transport()
811
 
        if t.is_readonly():
812
 
            self.assertRaises(TransportNotPossible,
813
 
                t.append_file, 'f', StringIO('f'), mode=None)
814
 
            return
815
 
        t.append_file('f', StringIO('f'), mode=None)
816
 
 
817
 
    def test_append_bytes_mode(self):
818
 
        # check append_bytes accepts a mode
819
 
        t = self.get_transport()
820
 
        if t.is_readonly():
821
 
            self.assertRaises(TransportNotPossible,
822
 
                t.append_bytes, 'f', 'f', mode=None)
823
 
            return
824
 
        t.append_bytes('f', 'f', mode=None)
825
 
 
826
 
    def test_delete(self):
827
 
        # TODO: Test Transport.delete
828
 
        t = self.get_transport()
829
 
 
830
 
        # Not much to do with a readonly transport
831
 
        if t.is_readonly():
832
 
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
833
 
            return
834
 
 
835
 
        t.put_bytes('a', 'a little bit of text\n')
836
 
        self.assertTrue(t.has('a'))
837
 
        t.delete('a')
838
 
        self.assertFalse(t.has('a'))
839
 
 
840
 
        self.assertRaises(NoSuchFile, t.delete, 'a')
841
 
 
842
 
        t.put_bytes('a', 'a text\n')
843
 
        t.put_bytes('b', 'b text\n')
844
 
        t.put_bytes('c', 'c text\n')
845
 
        self.assertEqual([True, True, True],
846
 
                list(t.has_multi(['a', 'b', 'c'])))
847
 
        t.delete_multi(['a', 'c'])
848
 
        self.assertEqual([False, True, False],
849
 
                list(t.has_multi(['a', 'b', 'c'])))
850
 
        self.assertFalse(t.has('a'))
851
 
        self.assertTrue(t.has('b'))
852
 
        self.assertFalse(t.has('c'))
853
 
 
854
 
        self.assertRaises(NoSuchFile,
855
 
                t.delete_multi, ['a', 'b', 'c'])
856
 
 
857
 
        self.assertRaises(NoSuchFile,
858
 
                t.delete_multi, iter(['a', 'b', 'c']))
859
 
 
860
 
        t.put_bytes('a', 'another a text\n')
861
 
        t.put_bytes('c', 'another c text\n')
862
 
        t.delete_multi(iter(['a', 'b', 'c']))
863
 
 
864
 
        # We should have deleted everything
865
 
        # SftpServer creates control files in the
866
 
        # working directory, so we can just do a
867
 
        # plain "listdir".
868
 
        # self.assertEqual([], os.listdir('.'))
869
 
 
870
 
    def test_recommended_page_size(self):
871
 
        """Transports recommend a page size for partial access to files."""
872
 
        t = self.get_transport()
873
 
        self.assertIsInstance(t.recommended_page_size(), int)
874
 
 
875
 
    def test_rmdir(self):
876
 
        t = self.get_transport()
877
 
        # Not much to do with a readonly transport
878
 
        if t.is_readonly():
879
 
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
880
 
            return
881
 
        t.mkdir('adir')
882
 
        t.mkdir('adir/bdir')
883
 
        t.rmdir('adir/bdir')
884
 
        # ftp may not be able to raise NoSuchFile for lack of
885
 
        # details when failing
886
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
887
 
        t.rmdir('adir')
888
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
889
 
 
890
 
    def test_rmdir_not_empty(self):
891
 
        """Deleting a non-empty directory raises an exception
892
 
 
893
 
        sftp (and possibly others) don't give us a specific "directory not
894
 
        empty" exception -- we can just see that the operation failed.
895
 
        """
896
 
        t = self.get_transport()
897
 
        if t.is_readonly():
898
 
            return
899
 
        t.mkdir('adir')
900
 
        t.mkdir('adir/bdir')
901
 
        self.assertRaises(PathError, t.rmdir, 'adir')
902
 
 
903
 
    def test_rmdir_empty_but_similar_prefix(self):
904
 
        """rmdir does not get confused by sibling paths.
905
 
 
906
 
        A naive implementation of MemoryTransport would refuse to rmdir
907
 
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
908
 
        uses "path.startswith(dir)" on all file paths to determine if directory
909
 
        is empty.
910
 
        """
911
 
        t = self.get_transport()
912
 
        if t.is_readonly():
913
 
            return
914
 
        t.mkdir('foo')
915
 
        t.put_bytes('foo-bar', '')
916
 
        t.mkdir('foo-baz')
917
 
        t.rmdir('foo')
918
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
919
 
        self.assertTrue(t.has('foo-bar'))
920
 
 
921
 
    def test_rename_dir_succeeds(self):
922
 
        t = self.get_transport()
923
 
        if t.is_readonly():
924
 
            raise TestSkipped("transport is readonly")
925
 
        t.mkdir('adir')
926
 
        t.mkdir('adir/asubdir')
927
 
        t.rename('adir', 'bdir')
928
 
        self.assertTrue(t.has('bdir/asubdir'))
929
 
        self.assertFalse(t.has('adir'))
930
 
 
931
 
    def test_rename_dir_nonempty(self):
932
 
        """Attempting to replace a nonemtpy directory should fail"""
933
 
        t = self.get_transport()
934
 
        if t.is_readonly():
935
 
            raise TestSkipped("transport is readonly")
936
 
        t.mkdir('adir')
937
 
        t.mkdir('adir/asubdir')
938
 
        t.mkdir('bdir')
939
 
        t.mkdir('bdir/bsubdir')
940
 
        # any kind of PathError would be OK, though we normally expect
941
 
        # DirectoryNotEmpty
942
 
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
943
 
        # nothing was changed so it should still be as before
944
 
        self.assertTrue(t.has('bdir/bsubdir'))
945
 
        self.assertFalse(t.has('adir/bdir'))
946
 
        self.assertFalse(t.has('adir/bsubdir'))
947
 
 
948
 
    def test_rename_across_subdirs(self):
949
 
        t = self.get_transport()
950
 
        if t.is_readonly():
951
 
            raise TestNotApplicable("transport is readonly")
952
 
        t.mkdir('a')
953
 
        t.mkdir('b')
954
 
        ta = t.clone('a')
955
 
        tb = t.clone('b')
956
 
        ta.put_bytes('f', 'aoeu')
957
 
        ta.rename('f', '../b/f')
958
 
        self.assertTrue(tb.has('f'))
959
 
        self.assertFalse(ta.has('f'))
960
 
        self.assertTrue(t.has('b/f'))
961
 
 
962
 
    def test_delete_tree(self):
963
 
        t = self.get_transport()
964
 
 
965
 
        # Not much to do with a readonly transport
966
 
        if t.is_readonly():
967
 
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
968
 
            return
969
 
 
970
 
        # and does it like listing ?
971
 
        t.mkdir('adir')
972
 
        try:
973
 
            t.delete_tree('adir')
974
 
        except TransportNotPossible:
975
 
            # ok, this transport does not support delete_tree
976
 
            return
977
 
 
978
 
        # did it delete that trivial case?
979
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
980
 
 
981
 
        self.build_tree(['adir/',
982
 
                         'adir/file',
983
 
                         'adir/subdir/',
984
 
                         'adir/subdir/file',
985
 
                         'adir/subdir2/',
986
 
                         'adir/subdir2/file',
987
 
                         ], transport=t)
988
 
 
989
 
        t.delete_tree('adir')
990
 
        # adir should be gone now.
991
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
992
 
 
993
 
    def test_move(self):
994
 
        t = self.get_transport()
995
 
 
996
 
        if t.is_readonly():
997
 
            return
998
 
 
999
 
        # TODO: I would like to use os.listdir() to
1000
 
        # make sure there are no extra files, but SftpServer
1001
 
        # creates control files in the working directory
1002
 
        # perhaps all of this could be done in a subdirectory
1003
 
 
1004
 
        t.put_bytes('a', 'a first file\n')
1005
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
1006
 
 
1007
 
        t.move('a', 'b')
1008
 
        self.assertTrue(t.has('b'))
1009
 
        self.assertFalse(t.has('a'))
1010
 
 
1011
 
        self.check_transport_contents('a first file\n', t, 'b')
1012
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1013
 
 
1014
 
        # Overwrite a file
1015
 
        t.put_bytes('c', 'c this file\n')
1016
 
        t.move('c', 'b')
1017
 
        self.assertFalse(t.has('c'))
1018
 
        self.check_transport_contents('c this file\n', t, 'b')
1019
 
 
1020
 
        # TODO: Try to write a test for atomicity
1021
 
        # TODO: Test moving into a non-existent subdirectory
1022
 
        # TODO: Test Transport.move_multi
1023
 
 
1024
 
    def test_copy(self):
1025
 
        t = self.get_transport()
1026
 
 
1027
 
        if t.is_readonly():
1028
 
            return
1029
 
 
1030
 
        t.put_bytes('a', 'a file\n')
1031
 
        t.copy('a', 'b')
1032
 
        self.check_transport_contents('a file\n', t, 'b')
1033
 
 
1034
 
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1035
 
        os.mkdir('c')
1036
 
        # What should the assert be if you try to copy a
1037
 
        # file over a directory?
1038
 
        #self.assertRaises(Something, t.copy, 'a', 'c')
1039
 
        t.put_bytes('d', 'text in d\n')
1040
 
        t.copy('d', 'b')
1041
 
        self.check_transport_contents('text in d\n', t, 'b')
1042
 
 
1043
 
        # TODO: test copy_multi
1044
 
 
1045
 
    def test_connection_error(self):
1046
 
        """ConnectionError is raised when connection is impossible.
1047
 
 
1048
 
        The error should be raised from the first operation on the transport.
1049
 
        """
1050
 
        try:
1051
 
            url = self._server.get_bogus_url()
1052
 
        except NotImplementedError:
1053
 
            raise TestSkipped("Transport %s has no bogus URL support." %
1054
 
                              self._server.__class__)
1055
 
        t = _mod_transport.get_transport_from_url(url)
1056
 
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1057
 
 
1058
 
    def test_stat(self):
1059
 
        # TODO: Test stat, just try once, and if it throws, stop testing
1060
 
        from stat import S_ISDIR, S_ISREG
1061
 
 
1062
 
        t = self.get_transport()
1063
 
 
1064
 
        try:
1065
 
            st = t.stat('.')
1066
 
        except TransportNotPossible, e:
1067
 
            # This transport cannot stat
1068
 
            return
1069
 
 
1070
 
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1071
 
        sizes = [14, 0, 16, 0, 18]
1072
 
        self.build_tree(paths, transport=t, line_endings='binary')
1073
 
 
1074
 
        for path, size in zip(paths, sizes):
1075
 
            st = t.stat(path)
1076
 
            if path.endswith('/'):
1077
 
                self.assertTrue(S_ISDIR(st.st_mode))
1078
 
                # directory sizes are meaningless
1079
 
            else:
1080
 
                self.assertTrue(S_ISREG(st.st_mode))
1081
 
                self.assertEqual(size, st.st_size)
1082
 
 
1083
 
        remote_stats = list(t.stat_multi(paths))
1084
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1085
 
 
1086
 
        self.assertRaises(NoSuchFile, t.stat, 'q')
1087
 
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1088
 
 
1089
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1090
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1091
 
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1092
 
        subdir = t.clone('subdir')
1093
 
        st = subdir.stat('./file')
1094
 
        st = subdir.stat('.')
1095
 
 
1096
 
    def test_hardlink(self):
1097
 
        from stat import ST_NLINK
1098
 
 
1099
 
        t = self.get_transport()
1100
 
 
1101
 
        source_name = "original_target"
1102
 
        link_name = "target_link"
1103
 
 
1104
 
        self.build_tree([source_name], transport=t)
1105
 
 
1106
 
        try:
1107
 
            t.hardlink(source_name, link_name)
1108
 
 
1109
 
            self.assertTrue(t.has(source_name))
1110
 
            self.assertTrue(t.has(link_name))
1111
 
 
1112
 
            st = t.stat(link_name)
1113
 
            self.assertEqual(st[ST_NLINK], 2)
1114
 
        except TransportNotPossible:
1115
 
            raise TestSkipped("Transport %s does not support hardlinks." %
1116
 
                              self._server.__class__)
1117
 
 
1118
 
    def test_symlink(self):
1119
 
        from stat import S_ISLNK
1120
 
 
1121
 
        t = self.get_transport()
1122
 
 
1123
 
        source_name = "original_target"
1124
 
        link_name = "target_link"
1125
 
 
1126
 
        self.build_tree([source_name], transport=t)
1127
 
 
1128
 
        try:
1129
 
            t.symlink(source_name, link_name)
1130
 
 
1131
 
            self.assertTrue(t.has(source_name))
1132
 
            self.assertTrue(t.has(link_name))
1133
 
 
1134
 
            st = t.stat(link_name)
1135
 
            self.assertTrue(S_ISLNK(st.st_mode),
1136
 
                "expected symlink, got mode %o" % st.st_mode)
1137
 
        except TransportNotPossible:
1138
 
            raise TestSkipped("Transport %s does not support symlinks." %
1139
 
                              self._server.__class__)
1140
 
        except IOError:
1141
 
            self.knownFailure("Paramiko fails to create symlinks during tests")
1142
 
 
1143
 
    def test_list_dir(self):
1144
 
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1145
 
        t = self.get_transport()
1146
 
 
1147
 
        if not t.listable():
1148
 
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1149
 
            return
1150
 
 
1151
 
        def sorted_list(d, transport):
1152
 
            l = list(transport.list_dir(d))
1153
 
            l.sort()
1154
 
            return l
1155
 
 
1156
 
        self.assertEqual([], sorted_list('.', t))
1157
 
        # c2 is precisely one letter longer than c here to test that
1158
 
        # suffixing is not confused.
1159
 
        # a%25b checks that quoting is done consistently across transports
1160
 
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1161
 
 
1162
 
        if not t.is_readonly():
1163
 
            self.build_tree(tree_names, transport=t)
1164
 
        else:
1165
 
            self.build_tree(tree_names)
1166
 
 
1167
 
        self.assertEqual(
1168
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1169
 
        self.assertEqual(
1170
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1171
 
        self.assertEqual(['d', 'e'], sorted_list('c', t))
1172
 
 
1173
 
        # Cloning the transport produces an equivalent listing
1174
 
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1175
 
 
1176
 
        if not t.is_readonly():
1177
 
            t.delete('c/d')
1178
 
            t.delete('b')
1179
 
        else:
1180
 
            os.unlink('c/d')
1181
 
            os.unlink('b')
1182
 
 
1183
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1184
 
        self.assertEqual(['e'], sorted_list('c', t))
1185
 
 
1186
 
        self.assertListRaises(PathError, t.list_dir, 'q')
1187
 
        self.assertListRaises(PathError, t.list_dir, 'c/f')
1188
 
        # 'a' is a file, list_dir should raise an error
1189
 
        self.assertListRaises(PathError, t.list_dir, 'a')
1190
 
 
1191
 
    def test_list_dir_result_is_url_escaped(self):
1192
 
        t = self.get_transport()
1193
 
        if not t.listable():
1194
 
            raise TestSkipped("transport not listable")
1195
 
 
1196
 
        if not t.is_readonly():
1197
 
            self.build_tree(['a/', 'a/%'], transport=t)
1198
 
        else:
1199
 
            self.build_tree(['a/', 'a/%'])
1200
 
 
1201
 
        names = list(t.list_dir('a'))
1202
 
        self.assertEqual(['%25'], names)
1203
 
        self.assertIsInstance(names[0], str)
1204
 
 
1205
 
    def test_clone_preserve_info(self):
1206
 
        t1 = self.get_transport()
1207
 
        if not isinstance(t1, ConnectedTransport):
1208
 
            raise TestSkipped("not a connected transport")
1209
 
 
1210
 
        t2 = t1.clone('subdir')
1211
 
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
1212
 
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
1213
 
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
1214
 
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
1215
 
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
1216
 
 
1217
 
    def test__reuse_for(self):
1218
 
        t = self.get_transport()
1219
 
        if not isinstance(t, ConnectedTransport):
1220
 
            raise TestSkipped("not a connected transport")
1221
 
 
1222
 
        def new_url(scheme=None, user=None, password=None,
1223
 
                    host=None, port=None, path=None):
1224
 
            """Build a new url from t.base changing only parts of it.
1225
 
 
1226
 
            Only the parameters different from None will be changed.
1227
 
            """
1228
 
            if scheme   is None: scheme   = t._parsed_url.scheme
1229
 
            if user     is None: user     = t._parsed_url.user
1230
 
            if password is None: password = t._parsed_url.password
1231
 
            if user     is None: user     = t._parsed_url.user
1232
 
            if host     is None: host     = t._parsed_url.host
1233
 
            if port     is None: port     = t._parsed_url.port
1234
 
            if path     is None: path     = t._parsed_url.path
1235
 
            return str(urlutils.URL(scheme, user, password, host, port, path))
1236
 
 
1237
 
        if t._parsed_url.scheme == 'ftp':
1238
 
            scheme = 'sftp'
1239
 
        else:
1240
 
            scheme = 'ftp'
1241
 
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1242
 
        if t._parsed_url.user == 'me':
1243
 
            user = 'you'
1244
 
        else:
1245
 
            user = 'me'
1246
 
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1247
 
        # passwords are not taken into account because:
1248
 
        # - it makes no sense to have two different valid passwords for the
1249
 
        #   same user
1250
 
        # - _password in ConnectedTransport is intended to collect what the
1251
 
        #   user specified from the command-line and there are cases where the
1252
 
        #   new url can contain no password (if the url was built from an
1253
 
        #   existing transport.base for example)
1254
 
        # - password are considered part of the credentials provided at
1255
 
        #   connection creation time and as such may not be present in the url
1256
 
        #   (they may be typed by the user when prompted for example)
1257
 
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1258
 
        # We will not connect, we can use a invalid host
1259
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
1260
 
        if t._parsed_url.port == 1234:
1261
 
            port = 4321
1262
 
        else:
1263
 
            port = 1234
1264
 
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1265
 
        # No point in trying to reuse a transport for a local URL
1266
 
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1267
 
 
1268
 
    def test_connection_sharing(self):
1269
 
        t = self.get_transport()
1270
 
        if not isinstance(t, ConnectedTransport):
1271
 
            raise TestSkipped("not a connected transport")
1272
 
 
1273
 
        c = t.clone('subdir')
1274
 
        # Some transports will create the connection  only when needed
1275
 
        t.has('surely_not') # Force connection
1276
 
        self.assertIs(t._get_connection(), c._get_connection())
1277
 
 
1278
 
        # Temporary failure, we need to create a new dummy connection
1279
 
        new_connection = None
1280
 
        t._set_connection(new_connection)
1281
 
        # Check that both transports use the same connection
1282
 
        self.assertIs(new_connection, t._get_connection())
1283
 
        self.assertIs(new_connection, c._get_connection())
1284
 
 
1285
 
    def test_reuse_connection_for_various_paths(self):
1286
 
        t = self.get_transport()
1287
 
        if not isinstance(t, ConnectedTransport):
1288
 
            raise TestSkipped("not a connected transport")
1289
 
 
1290
 
        t.has('surely_not') # Force connection
1291
 
        self.assertIsNot(None, t._get_connection())
1292
 
 
1293
 
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1294
 
        self.assertIsNot(t, subdir)
1295
 
        self.assertIs(t._get_connection(), subdir._get_connection())
1296
 
 
1297
 
        home = subdir._reuse_for(t.base + 'home')
1298
 
        self.assertIs(t._get_connection(), home._get_connection())
1299
 
        self.assertIs(subdir._get_connection(), home._get_connection())
1300
 
 
1301
 
    def test_clone(self):
1302
 
        # TODO: Test that clone moves up and down the filesystem
1303
 
        t1 = self.get_transport()
1304
 
 
1305
 
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1306
 
 
1307
 
        self.assertTrue(t1.has('a'))
1308
 
        self.assertTrue(t1.has('b/c'))
1309
 
        self.assertFalse(t1.has('c'))
1310
 
 
1311
 
        t2 = t1.clone('b')
1312
 
        self.assertEqual(t1.base + 'b/', t2.base)
1313
 
 
1314
 
        self.assertTrue(t2.has('c'))
1315
 
        self.assertFalse(t2.has('a'))
1316
 
 
1317
 
        t3 = t2.clone('..')
1318
 
        self.assertTrue(t3.has('a'))
1319
 
        self.assertFalse(t3.has('c'))
1320
 
 
1321
 
        self.assertFalse(t1.has('b/d'))
1322
 
        self.assertFalse(t2.has('d'))
1323
 
        self.assertFalse(t3.has('b/d'))
1324
 
 
1325
 
        if t1.is_readonly():
1326
 
            self.build_tree_contents([('b/d', 'newfile\n')])
1327
 
        else:
1328
 
            t2.put_bytes('d', 'newfile\n')
1329
 
 
1330
 
        self.assertTrue(t1.has('b/d'))
1331
 
        self.assertTrue(t2.has('d'))
1332
 
        self.assertTrue(t3.has('b/d'))
1333
 
 
1334
 
    def test_clone_to_root(self):
1335
 
        orig_transport = self.get_transport()
1336
 
        # Repeatedly go up to a parent directory until we're at the root
1337
 
        # directory of this transport
1338
 
        root_transport = orig_transport
1339
 
        new_transport = root_transport.clone("..")
1340
 
        # as we are walking up directories, the path must be
1341
 
        # growing less, except at the top
1342
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1343
 
            or new_transport.base == root_transport.base)
1344
 
        while new_transport.base != root_transport.base:
1345
 
            root_transport = new_transport
1346
 
            new_transport = root_transport.clone("..")
1347
 
            # as we are walking up directories, the path must be
1348
 
            # growing less, except at the top
1349
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1350
 
                or new_transport.base == root_transport.base)
1351
 
 
1352
 
        # Cloning to "/" should take us to exactly the same location.
1353
 
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1354
 
        # the abspath of "/" from the original transport should be the same
1355
 
        # as the base at the root:
1356
 
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1357
 
 
1358
 
        # At the root, the URL must still end with / as its a directory
1359
 
        self.assertEqual(root_transport.base[-1], '/')
1360
 
 
1361
 
    def test_clone_from_root(self):
1362
 
        """At the root, cloning to a simple dir should just do string append."""
1363
 
        orig_transport = self.get_transport()
1364
 
        root_transport = orig_transport.clone('/')
1365
 
        self.assertEqual(root_transport.base + '.bzr/',
1366
 
            root_transport.clone('.bzr').base)
1367
 
 
1368
 
    def test_base_url(self):
1369
 
        t = self.get_transport()
1370
 
        self.assertEqual('/', t.base[-1])
1371
 
 
1372
 
    def test_relpath(self):
1373
 
        t = self.get_transport()
1374
 
        self.assertEqual('', t.relpath(t.base))
1375
 
        # base ends with /
1376
 
        self.assertEqual('', t.relpath(t.base[:-1]))
1377
 
        # subdirs which don't exist should still give relpaths.
1378
 
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1379
 
        # trailing slash should be the same.
1380
 
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1381
 
 
1382
 
    def test_relpath_at_root(self):
1383
 
        t = self.get_transport()
1384
 
        # clone all the way to the top
1385
 
        new_transport = t.clone('..')
1386
 
        while new_transport.base != t.base:
1387
 
            t = new_transport
1388
 
            new_transport = t.clone('..')
1389
 
        # we must be able to get a relpath below the root
1390
 
        self.assertEqual('', t.relpath(t.base))
1391
 
        # and a deeper one should work too
1392
 
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1393
 
 
1394
 
    def test_abspath(self):
1395
 
        # smoke test for abspath. Corner cases for backends like unix fs's
1396
 
        # that have aliasing problems like symlinks should go in backend
1397
 
        # specific test cases.
1398
 
        transport = self.get_transport()
1399
 
 
1400
 
        self.assertEqual(transport.base + 'relpath',
1401
 
                         transport.abspath('relpath'))
1402
 
 
1403
 
        # This should work without raising an error.
1404
 
        transport.abspath("/")
1405
 
 
1406
 
        # the abspath of "/" and "/foo/.." should result in the same location
1407
 
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1408
 
 
1409
 
        self.assertEqual(transport.clone("/").abspath('foo'),
1410
 
                         transport.abspath("/foo"))
1411
 
 
1412
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1413
 
    def test_win32_abspath(self):
1414
 
        # Note: we tried to set sys.platform='win32' so we could test on
1415
 
        # other platforms too, but then osutils does platform specific
1416
 
        # things at import time which defeated us...
1417
 
        if sys.platform != 'win32':
1418
 
            raise TestSkipped(
1419
 
                'Testing drive letters in abspath implemented only for win32')
1420
 
 
1421
 
        # smoke test for abspath on win32.
1422
 
        # a transport based on 'file:///' never fully qualifies the drive.
1423
 
        transport = _mod_transport.get_transport_from_url("file:///")
1424
 
        self.assertEqual(transport.abspath("/"), "file:///")
1425
 
 
1426
 
        # but a transport that starts with a drive spec must keep it.
1427
 
        transport = _mod_transport.get_transport_from_url("file:///C:/")
1428
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1429
 
 
1430
 
    def test_local_abspath(self):
1431
 
        transport = self.get_transport()
1432
 
        try:
1433
 
            p = transport.local_abspath('.')
1434
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
1435
 
            # should be formattable
1436
 
            s = str(e)
1437
 
        else:
1438
 
            self.assertEqual(getcwd(), p)
1439
 
 
1440
 
    def test_abspath_at_root(self):
1441
 
        t = self.get_transport()
1442
 
        # clone all the way to the top
1443
 
        new_transport = t.clone('..')
1444
 
        while new_transport.base != t.base:
1445
 
            t = new_transport
1446
 
            new_transport = t.clone('..')
1447
 
        # we must be able to get a abspath of the root when we ask for
1448
 
        # t.abspath('..') - this due to our choice that clone('..')
1449
 
        # should return the root from the root, combined with the desire that
1450
 
        # the url from clone('..') and from abspath('..') should be the same.
1451
 
        self.assertEqual(t.base, t.abspath('..'))
1452
 
        # '' should give us the root
1453
 
        self.assertEqual(t.base, t.abspath(''))
1454
 
        # and a path should append to the url
1455
 
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
1456
 
 
1457
 
    def test_iter_files_recursive(self):
1458
 
        transport = self.get_transport()
1459
 
        if not transport.listable():
1460
 
            self.assertRaises(TransportNotPossible,
1461
 
                              transport.iter_files_recursive)
1462
 
            return
1463
 
        self.build_tree(['isolated/',
1464
 
                         'isolated/dir/',
1465
 
                         'isolated/dir/foo',
1466
 
                         'isolated/dir/bar',
1467
 
                         'isolated/dir/b%25z', # make sure quoting is correct
1468
 
                         'isolated/bar'],
1469
 
                        transport=transport)
1470
 
        paths = set(transport.iter_files_recursive())
1471
 
        # nb the directories are not converted
1472
 
        self.assertEqual(paths,
1473
 
                    set(['isolated/dir/foo',
1474
 
                         'isolated/dir/bar',
1475
 
                         'isolated/dir/b%2525z',
1476
 
                         'isolated/bar']))
1477
 
        sub_transport = transport.clone('isolated')
1478
 
        paths = set(sub_transport.iter_files_recursive())
1479
 
        self.assertEqual(paths,
1480
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1481
 
 
1482
 
    def test_copy_tree(self):
1483
 
        # TODO: test file contents and permissions are preserved. This test was
1484
 
        # added just to ensure that quoting was handled correctly.
1485
 
        # -- David Allouche 2006-08-11
1486
 
        transport = self.get_transport()
1487
 
        if not transport.listable():
1488
 
            self.assertRaises(TransportNotPossible,
1489
 
                              transport.iter_files_recursive)
1490
 
            return
1491
 
        if transport.is_readonly():
1492
 
            return
1493
 
        self.build_tree(['from/',
1494
 
                         'from/dir/',
1495
 
                         'from/dir/foo',
1496
 
                         'from/dir/bar',
1497
 
                         'from/dir/b%25z', # make sure quoting is correct
1498
 
                         'from/bar'],
1499
 
                        transport=transport)
1500
 
        transport.copy_tree('from', 'to')
1501
 
        paths = set(transport.iter_files_recursive())
1502
 
        self.assertEqual(paths,
1503
 
                    set(['from/dir/foo',
1504
 
                         'from/dir/bar',
1505
 
                         'from/dir/b%2525z',
1506
 
                         'from/bar',
1507
 
                         'to/dir/foo',
1508
 
                         'to/dir/bar',
1509
 
                         'to/dir/b%2525z',
1510
 
                         'to/bar',]))
1511
 
 
1512
 
    def test_copy_tree_to_transport(self):
1513
 
        transport = self.get_transport()
1514
 
        if not transport.listable():
1515
 
            self.assertRaises(TransportNotPossible,
1516
 
                              transport.iter_files_recursive)
1517
 
            return
1518
 
        if transport.is_readonly():
1519
 
            return
1520
 
        self.build_tree(['from/',
1521
 
                         'from/dir/',
1522
 
                         'from/dir/foo',
1523
 
                         'from/dir/bar',
1524
 
                         'from/dir/b%25z', # make sure quoting is correct
1525
 
                         'from/bar'],
1526
 
                        transport=transport)
1527
 
        from_transport = transport.clone('from')
1528
 
        to_transport = transport.clone('to')
1529
 
        to_transport.ensure_base()
1530
 
        from_transport.copy_tree_to_transport(to_transport)
1531
 
        paths = set(transport.iter_files_recursive())
1532
 
        self.assertEqual(paths,
1533
 
                    set(['from/dir/foo',
1534
 
                         'from/dir/bar',
1535
 
                         'from/dir/b%2525z',
1536
 
                         'from/bar',
1537
 
                         'to/dir/foo',
1538
 
                         'to/dir/bar',
1539
 
                         'to/dir/b%2525z',
1540
 
                         'to/bar',]))
1541
 
 
1542
 
    def test_unicode_paths(self):
1543
 
        """Test that we can read/write files with Unicode names."""
1544
 
        t = self.get_transport()
1545
 
 
1546
 
        # With FAT32 and certain encodings on win32
1547
 
        # '\xe5' and '\xe4' actually map to the same file
1548
 
        # adding a suffix kicks in the 'preserving but insensitive'
1549
 
        # route, and maintains the right files
1550
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1551
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1552
 
                 u'\u017d', # Z with umlat iso-8859-2
1553
 
                 u'\u062c', # Arabic j
1554
 
                 u'\u0410', # Russian A
1555
 
                 u'\u65e5', # Kanji person
1556
 
                ]
1557
 
 
1558
 
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1559
 
        if no_unicode_support:
1560
 
            self.knownFailure("test server cannot handle unicode paths")
1561
 
 
1562
 
        try:
1563
 
            self.build_tree(files, transport=t, line_endings='binary')
1564
 
        except UnicodeError:
1565
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
1566
 
 
1567
 
        # A plain unicode string is not a valid url
1568
 
        for fname in files:
1569
 
            self.assertRaises(InvalidURL, t.get, fname)
1570
 
 
1571
 
        for fname in files:
1572
 
            fname_utf8 = fname.encode('utf-8')
1573
 
            contents = 'contents of %s\n' % (fname_utf8,)
1574
 
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1575
 
 
1576
 
    def test_connect_twice_is_same_content(self):
1577
 
        # check that our server (whatever it is) is accessible reliably
1578
 
        # via get_transport and multiple connections share content.
1579
 
        transport = self.get_transport()
1580
 
        if transport.is_readonly():
1581
 
            return
1582
 
        transport.put_bytes('foo', 'bar')
1583
 
        transport3 = self.get_transport()
1584
 
        self.check_transport_contents('bar', transport3, 'foo')
1585
 
 
1586
 
        # now opening at a relative url should give use a sane result:
1587
 
        transport.mkdir('newdir')
1588
 
        transport5 = self.get_transport('newdir')
1589
 
        transport6 = transport5.clone('..')
1590
 
        self.check_transport_contents('bar', transport6, 'foo')
1591
 
 
1592
 
    def test_lock_write(self):
1593
 
        """Test transport-level write locks.
1594
 
 
1595
 
        These are deprecated and transports may decline to support them.
1596
 
        """
1597
 
        transport = self.get_transport()
1598
 
        if transport.is_readonly():
1599
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1600
 
            return
1601
 
        transport.put_bytes('lock', '')
1602
 
        try:
1603
 
            lock = transport.lock_write('lock')
1604
 
        except TransportNotPossible:
1605
 
            return
1606
 
        # TODO make this consistent on all platforms:
1607
 
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1608
 
        lock.unlock()
1609
 
 
1610
 
    def test_lock_read(self):
1611
 
        """Test transport-level read locks.
1612
 
 
1613
 
        These are deprecated and transports may decline to support them.
1614
 
        """
1615
 
        transport = self.get_transport()
1616
 
        if transport.is_readonly():
1617
 
            file('lock', 'w').close()
1618
 
        else:
1619
 
            transport.put_bytes('lock', '')
1620
 
        try:
1621
 
            lock = transport.lock_read('lock')
1622
 
        except TransportNotPossible:
1623
 
            return
1624
 
        # TODO make this consistent on all platforms:
1625
 
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1626
 
        lock.unlock()
1627
 
 
1628
 
    def test_readv(self):
1629
 
        transport = self.get_transport()
1630
 
        if transport.is_readonly():
1631
 
            with file('a', 'w') as f: f.write('0123456789')
1632
 
        else:
1633
 
            transport.put_bytes('a', '0123456789')
1634
 
 
1635
 
        d = list(transport.readv('a', ((0, 1),)))
1636
 
        self.assertEqual(d[0], (0, '0'))
1637
 
 
1638
 
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1639
 
        self.assertEqual(d[0], (0, '0'))
1640
 
        self.assertEqual(d[1], (1, '1'))
1641
 
        self.assertEqual(d[2], (3, '34'))
1642
 
        self.assertEqual(d[3], (9, '9'))
1643
 
 
1644
 
    def test_readv_out_of_order(self):
1645
 
        transport = self.get_transport()
1646
 
        if transport.is_readonly():
1647
 
            with file('a', 'w') as f: f.write('0123456789')
1648
 
        else:
1649
 
            transport.put_bytes('a', '01234567890')
1650
 
 
1651
 
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1652
 
        self.assertEqual(d[0], (1, '1'))
1653
 
        self.assertEqual(d[1], (9, '9'))
1654
 
        self.assertEqual(d[2], (0, '0'))
1655
 
        self.assertEqual(d[3], (3, '34'))
1656
 
 
1657
 
    def test_readv_with_adjust_for_latency(self):
1658
 
        transport = self.get_transport()
1659
 
        # the adjust for latency flag expands the data region returned
1660
 
        # according to a per-transport heuristic, so testing is a little
1661
 
        # tricky as we need more data than the largest combining that our
1662
 
        # transports do. To accomodate this we generate random data and cross
1663
 
        # reference the returned data with the random data. To avoid doing
1664
 
        # multiple large random byte look ups we do several tests on the same
1665
 
        # backing data.
1666
 
        content = osutils.rand_bytes(200*1024)
1667
 
        content_size = len(content)
1668
 
        if transport.is_readonly():
1669
 
            self.build_tree_contents([('a', content)])
1670
 
        else:
1671
 
            transport.put_bytes('a', content)
1672
 
        def check_result_data(result_vector):
1673
 
            for item in result_vector:
1674
 
                data_len = len(item[1])
1675
 
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1676
 
 
1677
 
        # start corner case
1678
 
        result = list(transport.readv('a', ((0, 30),),
1679
 
            adjust_for_latency=True, upper_limit=content_size))
1680
 
        # we expect 1 result, from 0, to something > 30
1681
 
        self.assertEqual(1, len(result))
1682
 
        self.assertEqual(0, result[0][0])
1683
 
        self.assertTrue(len(result[0][1]) >= 30)
1684
 
        check_result_data(result)
1685
 
        # end of file corner case
1686
 
        result = list(transport.readv('a', ((204700, 100),),
1687
 
            adjust_for_latency=True, upper_limit=content_size))
1688
 
        # we expect 1 result, from 204800- its length, to the end
1689
 
        self.assertEqual(1, len(result))
1690
 
        data_len = len(result[0][1])
1691
 
        self.assertEqual(204800-data_len, result[0][0])
1692
 
        self.assertTrue(data_len >= 100)
1693
 
        check_result_data(result)
1694
 
        # out of order ranges are made in order
1695
 
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1696
 
            adjust_for_latency=True, upper_limit=content_size))
1697
 
        # we expect 2 results, in order, start and end.
1698
 
        self.assertEqual(2, len(result))
1699
 
        # start
1700
 
        data_len = len(result[0][1])
1701
 
        self.assertEqual(0, result[0][0])
1702
 
        self.assertTrue(data_len >= 30)
1703
 
        # end
1704
 
        data_len = len(result[1][1])
1705
 
        self.assertEqual(204800-data_len, result[1][0])
1706
 
        self.assertTrue(data_len >= 100)
1707
 
        check_result_data(result)
1708
 
        # close ranges get combined (even if out of order)
1709
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1710
 
            result = list(transport.readv('a', request_vector,
1711
 
                adjust_for_latency=True, upper_limit=content_size))
1712
 
            self.assertEqual(1, len(result))
1713
 
            data_len = len(result[0][1])
1714
 
            # minimum length is from 400 to 1034 - 634
1715
 
            self.assertTrue(data_len >= 634)
1716
 
            # must contain the region 400 to 1034
1717
 
            self.assertTrue(result[0][0] <= 400)
1718
 
            self.assertTrue(result[0][0] + data_len >= 1034)
1719
 
            check_result_data(result)
1720
 
 
1721
 
    def test_readv_with_adjust_for_latency_with_big_file(self):
1722
 
        transport = self.get_transport()
1723
 
        # test from observed failure case.
1724
 
        if transport.is_readonly():
1725
 
            with file('a', 'w') as f: f.write('a'*1024*1024)
1726
 
        else:
1727
 
            transport.put_bytes('a', 'a'*1024*1024)
1728
 
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1729
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1730
 
            (465373, 800), (947422, 800)]
1731
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1732
 
        found_items = [False]*9
1733
 
        for pos, (start, length) in enumerate(broken_vector):
1734
 
            # check the range is covered by the result
1735
 
            for offset, data in results:
1736
 
                if offset <= start and start + length <= offset + len(data):
1737
 
                    found_items[pos] = True
1738
 
        self.assertEqual([True]*9, found_items)
1739
 
 
1740
 
    def test_get_with_open_write_stream_sees_all_content(self):
1741
 
        t = self.get_transport()
1742
 
        if t.is_readonly():
1743
 
            return
1744
 
        handle = t.open_write_stream('foo')
1745
 
        try:
1746
 
            handle.write('bcd')
1747
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1748
 
        finally:
1749
 
            handle.close()
1750
 
 
1751
 
    def test_get_smart_medium(self):
1752
 
        """All transports must either give a smart medium, or know they can't.
1753
 
        """
1754
 
        transport = self.get_transport()
1755
 
        try:
1756
 
            client_medium = transport.get_smart_medium()
1757
 
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1758
 
        except errors.NoSmartMedium:
1759
 
            # as long as we got it we're fine
1760
 
            pass
1761
 
 
1762
 
    def test_readv_short_read(self):
1763
 
        transport = self.get_transport()
1764
 
        if transport.is_readonly():
1765
 
            with file('a', 'w') as f: f.write('0123456789')
1766
 
        else:
1767
 
            transport.put_bytes('a', '01234567890')
1768
 
 
1769
 
        # This is intentionally reading off the end of the file
1770
 
        # since we are sure that it cannot get there
1771
 
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1772
 
                               # Can be raised by paramiko
1773
 
                               AssertionError),
1774
 
                              transport.readv, 'a', [(1,1), (8,10)])
1775
 
 
1776
 
        # This is trying to seek past the end of the file, it should
1777
 
        # also raise a special error
1778
 
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1779
 
                              transport.readv, 'a', [(12,2)])
1780
 
 
1781
 
    def test_no_segment_parameters(self):
1782
 
        """Segment parameters should be stripped and stored in
1783
 
        transport.segment_parameters."""
1784
 
        transport = self.get_transport("foo")
1785
 
        self.assertEquals({}, transport.get_segment_parameters())
1786
 
 
1787
 
    def test_segment_parameters(self):
1788
 
        """Segment parameters should be stripped and stored in
1789
 
        transport.get_segment_parameters()."""
1790
 
        base_url = self._server.get_url()
1791
 
        parameters = {"key1": "val1", "key2": "val2"}
1792
 
        url = urlutils.join_segment_parameters(base_url, parameters)
1793
 
        transport = _mod_transport.get_transport_from_url(url)
1794
 
        self.assertEquals(parameters, transport.get_segment_parameters())
1795
 
 
1796
 
    def test_set_segment_parameters(self):
1797
 
        """Segment parameters can be set and show up in base."""
1798
 
        transport = self.get_transport("foo")
1799
 
        orig_base = transport.base
1800
 
        transport.set_segment_parameter("arm", "board")
1801
 
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
1802
 
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
1803
 
        transport.set_segment_parameter("arm", None)
1804
 
        transport.set_segment_parameter("nonexistant", None)
1805
 
        self.assertEquals({}, transport.get_segment_parameters())
1806
 
        self.assertEquals(orig_base, transport.base)
1807
 
 
1808
 
    def test_stat_symlink(self):
1809
 
        # if a transport points directly to a symlink (and supports symlinks
1810
 
        # at all) you can tell this.  helps with bug 32669.
1811
 
        t = self.get_transport()
1812
 
        try:
1813
 
            t.symlink('target', 'link')
1814
 
        except TransportNotPossible:
1815
 
            raise TestSkipped("symlinks not supported")
1816
 
        t2 = t.clone('link')
1817
 
        st = t2.stat('')
1818
 
        self.assertTrue(stat.S_ISLNK(st.st_mode))
1819
 
 
1820
 
    def test_abspath_url_unquote_unreserved(self):
1821
 
        """URLs from abspath should have unreserved characters unquoted
1822
 
        
1823
 
        Need consistent quoting notably for tildes, see lp:842223 for more.
1824
 
        """
1825
 
        t = self.get_transport()
1826
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1827
 
        self.assertEqual(t.base + "-.09AZ_az~",
1828
 
            t.abspath(needlessly_escaped_dir))
1829
 
 
1830
 
    def test_clone_url_unquote_unreserved(self):
1831
 
        """Base URL of a cloned branch needs unreserved characters unquoted
1832
 
        
1833
 
        Cloned transports should be prefix comparable for things like the
1834
 
        isolation checking of tests, see lp:842223 for more.
1835
 
        """
1836
 
        t1 = self.get_transport()
1837
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1838
 
        self.build_tree([needlessly_escaped_dir], transport=t1)
1839
 
        t2 = t1.clone(needlessly_escaped_dir)
1840
 
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1841
 
 
1842
 
    def test_hook_post_connection_one(self):
1843
 
        """Fire post_connect hook after a ConnectedTransport is first used"""
1844
 
        log = []
1845
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1846
 
        t = self.get_transport()
1847
 
        self.assertEqual([], log)
1848
 
        t.has("non-existant")
1849
 
        if isinstance(t, RemoteTransport):
1850
 
            self.assertEqual([t.get_smart_medium()], log)
1851
 
        elif isinstance(t, ConnectedTransport):
1852
 
            self.assertEqual([t], log)
1853
 
        else:
1854
 
            self.assertEqual([], log)
1855
 
 
1856
 
    def test_hook_post_connection_multi(self):
1857
 
        """Fire post_connect hook once per unshared underlying connection"""
1858
 
        log = []
1859
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1860
 
        t1 = self.get_transport()
1861
 
        t2 = t1.clone(".")
1862
 
        t3 = self.get_transport()
1863
 
        self.assertEqual([], log)
1864
 
        t1.has("x")
1865
 
        t2.has("x")
1866
 
        t3.has("x")
1867
 
        if isinstance(t1, RemoteTransport):
1868
 
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1869
 
        elif isinstance(t1, ConnectedTransport):
1870
 
            self.assertEqual([t1, t3], log)
1871
 
        else:
1872
 
            self.assertEqual([], log)