~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: John Arbash Meinel
  • Date: 2010-08-23 21:36:06 UTC
  • mto: This revision was merged to the branch mainline in revision 5390.
  • Revision ID: john@arbash-meinel.com-20100823213606-1pk5w1nyaz9kz9k0
Lots of compatibility changes for python2.4 and mingw32 compilers.

gcc has strtoll available, and does the wrong thing if you use _strtoi64 (it implicitly
defines it to return an 'int' which isn't a 64-bit integer.)

Further, python2.4 doesn't support the %lu syntax, only having %ld and %d. So we go
back to casting everything into real python objects and then stringifying them, but
only for python 2.4.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import itertools
 
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
 
27
import stat
 
28
import sys
 
29
import unittest
 
30
 
 
31
from bzrlib import (
 
32
    errors,
 
33
    osutils,
 
34
    tests,
 
35
    urlutils,
 
36
    )
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
 
44
                           PathError,
 
45
                           TransportNotPossible,
 
46
                           )
 
47
from bzrlib.osutils import getcwd
 
48
from bzrlib.smart import medium
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
 
51
    TestSkipped,
 
52
    TestNotApplicable,
 
53
    multiply_tests,
 
54
    )
 
55
from bzrlib.tests import test_server
 
56
from bzrlib.tests.test_transport import TestTransportImplementation
 
57
from bzrlib.transport import (
 
58
    ConnectedTransport,
 
59
    get_transport,
 
60
    _get_transport_modules,
 
61
    )
 
62
from bzrlib.transport.memory import MemoryTransport
 
63
 
 
64
 
 
65
def get_transport_test_permutations(module):
 
66
    """Get the permutations module wants to have tested."""
 
67
    if getattr(module, 'get_test_permutations', None) is None:
 
68
        raise AssertionError(
 
69
            "transport module %s doesn't provide get_test_permutations()"
 
70
            % module.__name__)
 
71
        return []
 
72
    return module.get_test_permutations()
 
73
 
 
74
 
 
75
def transport_test_permutations():
 
76
    """Return a list of the klass, server_factory pairs to test."""
 
77
    result = []
 
78
    for module in _get_transport_modules():
 
79
        try:
 
80
            permutations = get_transport_test_permutations(
 
81
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
82
            for (klass, server_factory) in permutations:
 
83
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
84
                    {"transport_class":klass,
 
85
                     "transport_server":server_factory})
 
86
                result.append(scenario)
 
87
        except errors.DependencyNotPresent, e:
 
88
            # Continue even if a dependency prevents us
 
89
            # from adding this test
 
90
            pass
 
91
    return result
 
92
 
 
93
 
 
94
def load_tests(standard_tests, module, loader):
 
95
    """Multiply tests for tranport implementations."""
 
96
    result = loader.suiteClass()
 
97
    scenarios = transport_test_permutations()
 
98
    return multiply_tests(standard_tests, scenarios, result)
 
99
 
 
100
 
 
101
class TransportTests(TestTransportImplementation):
 
102
 
 
103
    def setUp(self):
 
104
        super(TransportTests, self).setUp()
 
105
        self._captureVar('BZR_NO_SMART_VFS', None)
 
106
 
 
107
    def check_transport_contents(self, content, transport, relpath):
 
108
        """Check that transport.get(relpath).read() == content."""
 
109
        self.assertEqualDiff(content, transport.get(relpath).read())
 
110
 
 
111
    def test_ensure_base_missing(self):
 
112
        """.ensure_base() should create the directory if it doesn't exist"""
 
113
        t = self.get_transport()
 
114
        t_a = t.clone('a')
 
115
        if t_a.is_readonly():
 
116
            self.assertRaises(TransportNotPossible,
 
117
                              t_a.ensure_base)
 
118
            return
 
119
        self.assertTrue(t_a.ensure_base())
 
120
        self.assertTrue(t.has('a'))
 
121
 
 
122
    def test_ensure_base_exists(self):
 
123
        """.ensure_base() should just be happy if it already exists"""
 
124
        t = self.get_transport()
 
125
        if t.is_readonly():
 
126
            return
 
127
 
 
128
        t.mkdir('a')
 
129
        t_a = t.clone('a')
 
130
        # ensure_base returns False if it didn't create the base
 
131
        self.assertFalse(t_a.ensure_base())
 
132
 
 
133
    def test_ensure_base_missing_parent(self):
 
134
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
135
        t = self.get_transport()
 
136
        if t.is_readonly():
 
137
            return
 
138
 
 
139
        t_a = t.clone('a')
 
140
        t_b = t_a.clone('b')
 
141
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
142
 
 
143
    def test_external_url(self):
 
144
        """.external_url either works or raises InProcessTransport."""
 
145
        t = self.get_transport()
 
146
        try:
 
147
            t.external_url()
 
148
        except errors.InProcessTransport:
 
149
            pass
 
150
 
 
151
    def test_has(self):
 
152
        t = self.get_transport()
 
153
 
 
154
        files = ['a', 'b', 'e', 'g', '%']
 
155
        self.build_tree(files, transport=t)
 
156
        self.assertEqual(True, t.has('a'))
 
157
        self.assertEqual(False, t.has('c'))
 
158
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
159
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
160
                                           'e', 'f', 'g', 'h'])),
 
161
                         [True, True, False, False,
 
162
                          True, False, True, False])
 
163
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
164
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
165
                                           urlutils.escape('%%')]))
 
166
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
167
                                                'e', 'f', 'g', 'h']))),
 
168
                         [True, True, False, False,
 
169
                          True, False, True, False])
 
170
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
171
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
172
 
 
173
    def test_has_root_works(self):
 
174
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
175
            raise TestNotApplicable(
 
176
                "SmartTCPServer_for_testing intentionally does not allow "
 
177
                "access to /.")
 
178
        current_transport = self.get_transport()
 
179
        self.assertTrue(current_transport.has('/'))
 
180
        root = current_transport.clone('/')
 
181
        self.assertTrue(root.has(''))
 
182
 
 
183
    def test_get(self):
 
184
        t = self.get_transport()
 
185
 
 
186
        files = ['a', 'b', 'e', 'g']
 
187
        contents = ['contents of a\n',
 
188
                    'contents of b\n',
 
189
                    'contents of e\n',
 
190
                    'contents of g\n',
 
191
                    ]
 
192
        self.build_tree(files, transport=t, line_endings='binary')
 
193
        self.check_transport_contents('contents of a\n', t, 'a')
 
194
        content_f = t.get_multi(files)
 
195
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
196
        # evaluate their inputs, the transport requests should be issued and
 
197
        # handled sequentially (we don't want to force transport to buffer).
 
198
        for content, f in itertools.izip(contents, content_f):
 
199
            self.assertEqual(content, f.read())
 
200
 
 
201
        content_f = t.get_multi(iter(files))
 
202
        # Use itertools.izip() for the same reason
 
203
        for content, f in itertools.izip(contents, content_f):
 
204
            self.assertEqual(content, f.read())
 
205
 
 
206
    def test_get_unknown_file(self):
 
207
        t = self.get_transport()
 
208
        files = ['a', 'b']
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
 
211
                    ]
 
212
        self.build_tree(files, transport=t, line_endings='binary')
 
213
        self.assertRaises(NoSuchFile, t.get, 'c')
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
216
 
 
217
    def test_get_directory_read_gives_ReadError(self):
 
218
        """consistent errors for read() on a file returned by get()."""
 
219
        t = self.get_transport()
 
220
        if t.is_readonly():
 
221
            self.build_tree(['a directory/'])
 
222
        else:
 
223
            t.mkdir('a%20directory')
 
224
        # getting the file must either work or fail with a PathError
 
225
        try:
 
226
            a_file = t.get('a%20directory')
 
227
        except (errors.PathError, errors.RedirectRequested):
 
228
            # early failure return immediately.
 
229
            return
 
230
        # having got a file, read() must either work (i.e. http reading a dir
 
231
        # listing) or fail with ReadError
 
232
        try:
 
233
            a_file.read()
 
234
        except errors.ReadError:
 
235
            pass
 
236
 
 
237
    def test_get_bytes(self):
 
238
        t = self.get_transport()
 
239
 
 
240
        files = ['a', 'b', 'e', 'g']
 
241
        contents = ['contents of a\n',
 
242
                    'contents of b\n',
 
243
                    'contents of e\n',
 
244
                    'contents of g\n',
 
245
                    ]
 
246
        self.build_tree(files, transport=t, line_endings='binary')
 
247
        self.check_transport_contents('contents of a\n', t, 'a')
 
248
 
 
249
        for content, fname in zip(contents, files):
 
250
            self.assertEqual(content, t.get_bytes(fname))
 
251
 
 
252
    def test_get_bytes_unknown_file(self):
 
253
        t = self.get_transport()
 
254
 
 
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
256
 
 
257
    def test_get_with_open_write_stream_sees_all_content(self):
 
258
        t = self.get_transport()
 
259
        if t.is_readonly():
 
260
            return
 
261
        handle = t.open_write_stream('foo')
 
262
        try:
 
263
            handle.write('b')
 
264
            self.assertEqual('b', t.get('foo').read())
 
265
        finally:
 
266
            handle.close()
 
267
 
 
268
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
269
        t = self.get_transport()
 
270
        if t.is_readonly():
 
271
            return
 
272
        handle = t.open_write_stream('foo')
 
273
        try:
 
274
            handle.write('b')
 
275
            self.assertEqual('b', t.get_bytes('foo'))
 
276
            self.assertEqual('b', t.get('foo').read())
 
277
        finally:
 
278
            handle.close()
 
279
 
 
280
    def test_put_bytes(self):
 
281
        t = self.get_transport()
 
282
 
 
283
        if t.is_readonly():
 
284
            self.assertRaises(TransportNotPossible,
 
285
                    t.put_bytes, 'a', 'some text for a\n')
 
286
            return
 
287
 
 
288
        t.put_bytes('a', 'some text for a\n')
 
289
        self.failUnless(t.has('a'))
 
290
        self.check_transport_contents('some text for a\n', t, 'a')
 
291
 
 
292
        # The contents should be overwritten
 
293
        t.put_bytes('a', 'new text for a\n')
 
294
        self.check_transport_contents('new text for a\n', t, 'a')
 
295
 
 
296
        self.assertRaises(NoSuchFile,
 
297
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
298
 
 
299
    def test_put_bytes_non_atomic(self):
 
300
        t = self.get_transport()
 
301
 
 
302
        if t.is_readonly():
 
303
            self.assertRaises(TransportNotPossible,
 
304
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
305
            return
 
306
 
 
307
        self.failIf(t.has('a'))
 
308
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
309
        self.failUnless(t.has('a'))
 
310
        self.check_transport_contents('some text for a\n', t, 'a')
 
311
        # Put also replaces contents
 
312
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
313
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
314
 
 
315
        # Make sure we can create another file
 
316
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
317
        # And overwrite 'a' with empty contents
 
318
        t.put_bytes_non_atomic('a', '')
 
319
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
320
        self.check_transport_contents('', t, 'a')
 
321
 
 
322
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
323
                                       'contents\n')
 
324
        # Now test the create_parent flag
 
325
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
326
                                       'contents\n')
 
327
        self.failIf(t.has('dir/a'))
 
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
329
                               create_parent_dir=True)
 
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
331
 
 
332
        # But we still get NoSuchFile if we can't make the parent dir
 
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
334
                                       'contents\n',
 
335
                                       create_parent_dir=True)
 
336
 
 
337
    def test_put_bytes_permissions(self):
 
338
        t = self.get_transport()
 
339
 
 
340
        if t.is_readonly():
 
341
            return
 
342
        if not t._can_roundtrip_unix_modebits():
 
343
            # Can't roundtrip, so no need to run this test
 
344
            return
 
345
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
346
        self.assertTransportMode(t, 'mode644', 0644)
 
347
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
348
        self.assertTransportMode(t, 'mode666', 0666)
 
349
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
350
        self.assertTransportMode(t, 'mode600', 0600)
 
351
        # Yes, you can put_bytes a file such that it becomes readonly
 
352
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
353
        self.assertTransportMode(t, 'mode400', 0400)
 
354
 
 
355
        # The default permissions should be based on the current umask
 
356
        umask = osutils.get_umask()
 
357
        t.put_bytes('nomode', 'test text\n', mode=None)
 
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
359
 
 
360
    def test_put_bytes_non_atomic_permissions(self):
 
361
        t = self.get_transport()
 
362
 
 
363
        if t.is_readonly():
 
364
            return
 
365
        if not t._can_roundtrip_unix_modebits():
 
366
            # Can't roundtrip, so no need to run this test
 
367
            return
 
368
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
369
        self.assertTransportMode(t, 'mode644', 0644)
 
370
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
371
        self.assertTransportMode(t, 'mode666', 0666)
 
372
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
373
        self.assertTransportMode(t, 'mode600', 0600)
 
374
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
375
        self.assertTransportMode(t, 'mode400', 0400)
 
376
 
 
377
        # The default permissions should be based on the current umask
 
378
        umask = osutils.get_umask()
 
379
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
380
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
381
 
 
382
        # We should also be able to set the mode for a parent directory
 
383
        # when it is created
 
384
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
385
                               dir_mode=0700, create_parent_dir=True)
 
386
        self.assertTransportMode(t, 'dir700', 0700)
 
387
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
388
                               dir_mode=0770, create_parent_dir=True)
 
389
        self.assertTransportMode(t, 'dir770', 0770)
 
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
391
                               dir_mode=0777, create_parent_dir=True)
 
392
        self.assertTransportMode(t, 'dir777', 0777)
 
393
 
 
394
    def test_put_file(self):
 
395
        t = self.get_transport()
 
396
 
 
397
        if t.is_readonly():
 
398
            self.assertRaises(TransportNotPossible,
 
399
                    t.put_file, 'a', StringIO('some text for a\n'))
 
400
            return
 
401
 
 
402
        result = t.put_file('a', StringIO('some text for a\n'))
 
403
        # put_file returns the length of the data written
 
404
        self.assertEqual(16, result)
 
405
        self.failUnless(t.has('a'))
 
406
        self.check_transport_contents('some text for a\n', t, 'a')
 
407
        # Put also replaces contents
 
408
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
409
        self.assertEqual(19, result)
 
410
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
411
        self.assertRaises(NoSuchFile,
 
412
                          t.put_file, 'path/doesnt/exist/c',
 
413
                              StringIO('contents'))
 
414
 
 
415
    def test_put_file_non_atomic(self):
 
416
        t = self.get_transport()
 
417
 
 
418
        if t.is_readonly():
 
419
            self.assertRaises(TransportNotPossible,
 
420
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
421
            return
 
422
 
 
423
        self.failIf(t.has('a'))
 
424
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
425
        self.failUnless(t.has('a'))
 
426
        self.check_transport_contents('some text for a\n', t, 'a')
 
427
        # Put also replaces contents
 
428
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
429
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
430
 
 
431
        # Make sure we can create another file
 
432
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
433
        # And overwrite 'a' with empty contents
 
434
        t.put_file_non_atomic('a', StringIO(''))
 
435
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
436
        self.check_transport_contents('', t, 'a')
 
437
 
 
438
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
439
                                       StringIO('contents\n'))
 
440
        # Now test the create_parent flag
 
441
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
442
                                       StringIO('contents\n'))
 
443
        self.failIf(t.has('dir/a'))
 
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
445
                              create_parent_dir=True)
 
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
447
 
 
448
        # But we still get NoSuchFile if we can't make the parent dir
 
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
450
                                       StringIO('contents\n'),
 
451
                                       create_parent_dir=True)
 
452
 
 
453
    def test_put_file_permissions(self):
 
454
 
 
455
        t = self.get_transport()
 
456
 
 
457
        if t.is_readonly():
 
458
            return
 
459
        if not t._can_roundtrip_unix_modebits():
 
460
            # Can't roundtrip, so no need to run this test
 
461
            return
 
462
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
463
        self.assertTransportMode(t, 'mode644', 0644)
 
464
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
465
        self.assertTransportMode(t, 'mode666', 0666)
 
466
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
467
        self.assertTransportMode(t, 'mode600', 0600)
 
468
        # Yes, you can put a file such that it becomes readonly
 
469
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
470
        self.assertTransportMode(t, 'mode400', 0400)
 
471
        # The default permissions should be based on the current umask
 
472
        umask = osutils.get_umask()
 
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
475
 
 
476
    def test_put_file_non_atomic_permissions(self):
 
477
        t = self.get_transport()
 
478
 
 
479
        if t.is_readonly():
 
480
            return
 
481
        if not t._can_roundtrip_unix_modebits():
 
482
            # Can't roundtrip, so no need to run this test
 
483
            return
 
484
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
485
        self.assertTransportMode(t, 'mode644', 0644)
 
486
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
487
        self.assertTransportMode(t, 'mode666', 0666)
 
488
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
489
        self.assertTransportMode(t, 'mode600', 0600)
 
490
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
491
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
492
        self.assertTransportMode(t, 'mode400', 0400)
 
493
 
 
494
        # The default permissions should be based on the current umask
 
495
        umask = osutils.get_umask()
 
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
498
 
 
499
        # We should also be able to set the mode for a parent directory
 
500
        # when it is created
 
501
        sio = StringIO()
 
502
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
503
                              dir_mode=0700, create_parent_dir=True)
 
504
        self.assertTransportMode(t, 'dir700', 0700)
 
505
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
506
                              dir_mode=0770, create_parent_dir=True)
 
507
        self.assertTransportMode(t, 'dir770', 0770)
 
508
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
509
                              dir_mode=0777, create_parent_dir=True)
 
510
        self.assertTransportMode(t, 'dir777', 0777)
 
511
 
 
512
    def test_put_bytes_unicode(self):
 
513
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
514
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
515
        # (we don't want to encode unicode here at all, callers should be
 
516
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
517
        # compatibility.  At some point we should use a specific exception.
 
518
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
519
        t = self.get_transport()
 
520
        if t.is_readonly():
 
521
            return
 
522
        unicode_string = u'\u1234'
 
523
        self.assertRaises(
 
524
            (AssertionError, UnicodeEncodeError),
 
525
            t.put_bytes, 'foo', unicode_string)
 
526
 
 
527
    def test_put_file_unicode(self):
 
528
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
529
        # This situation can happen (and has) if code is careless about the type
 
530
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
531
        # cStringIO, because it never returns unicode from read.
 
532
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
533
        # raise, but we raise it for hysterical raisins.
 
534
        t = self.get_transport()
 
535
        if t.is_readonly():
 
536
            return
 
537
        unicode_file = pyStringIO(u'\u1234')
 
538
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
539
 
 
540
    def test_mkdir(self):
 
541
        t = self.get_transport()
 
542
 
 
543
        if t.is_readonly():
 
544
            # cannot mkdir on readonly transports. We're not testing for
 
545
            # cache coherency because cache behaviour is not currently
 
546
            # defined for the transport interface.
 
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
548
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
549
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
550
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
551
            return
 
552
        # Test mkdir
 
553
        t.mkdir('dir_a')
 
554
        self.assertEqual(t.has('dir_a'), True)
 
555
        self.assertEqual(t.has('dir_b'), False)
 
556
 
 
557
        t.mkdir('dir_b')
 
558
        self.assertEqual(t.has('dir_b'), True)
 
559
 
 
560
        t.mkdir_multi(['dir_c', 'dir_d'])
 
561
 
 
562
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
563
        self.assertEqual(list(t.has_multi(
 
564
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
565
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
566
            [True, True, True, False,
 
567
             True, True, True, True])
 
568
 
 
569
        # we were testing that a local mkdir followed by a transport
 
570
        # mkdir failed thusly, but given that we * in one process * do not
 
571
        # concurrently fiddle with disk dirs and then use transport to do
 
572
        # things, the win here seems marginal compared to the constraint on
 
573
        # the interface. RBC 20051227
 
574
        t.mkdir('dir_g')
 
575
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
576
 
 
577
        # Test get/put in sub-directories
 
578
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
579
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
580
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
581
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
582
 
 
583
        # mkdir of a dir with an absent parent
 
584
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
585
 
 
586
    def test_mkdir_permissions(self):
 
587
        t = self.get_transport()
 
588
        if t.is_readonly():
 
589
            return
 
590
        if not t._can_roundtrip_unix_modebits():
 
591
            # no sense testing on this transport
 
592
            return
 
593
        # Test mkdir with a mode
 
594
        t.mkdir('dmode755', mode=0755)
 
595
        self.assertTransportMode(t, 'dmode755', 0755)
 
596
        t.mkdir('dmode555', mode=0555)
 
597
        self.assertTransportMode(t, 'dmode555', 0555)
 
598
        t.mkdir('dmode777', mode=0777)
 
599
        self.assertTransportMode(t, 'dmode777', 0777)
 
600
        t.mkdir('dmode700', mode=0700)
 
601
        self.assertTransportMode(t, 'dmode700', 0700)
 
602
        t.mkdir_multi(['mdmode755'], mode=0755)
 
603
        self.assertTransportMode(t, 'mdmode755', 0755)
 
604
 
 
605
        # Default mode should be based on umask
 
606
        umask = osutils.get_umask()
 
607
        t.mkdir('dnomode', mode=None)
 
608
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
609
 
 
610
    def test_opening_a_file_stream_creates_file(self):
 
611
        t = self.get_transport()
 
612
        if t.is_readonly():
 
613
            return
 
614
        handle = t.open_write_stream('foo')
 
615
        try:
 
616
            self.assertEqual('', t.get_bytes('foo'))
 
617
        finally:
 
618
            handle.close()
 
619
 
 
620
    def test_opening_a_file_stream_can_set_mode(self):
 
621
        t = self.get_transport()
 
622
        if t.is_readonly():
 
623
            return
 
624
        if not t._can_roundtrip_unix_modebits():
 
625
            # Can't roundtrip, so no need to run this test
 
626
            return
 
627
        def check_mode(name, mode, expected):
 
628
            handle = t.open_write_stream(name, mode=mode)
 
629
            handle.close()
 
630
            self.assertTransportMode(t, name, expected)
 
631
        check_mode('mode644', 0644, 0644)
 
632
        check_mode('mode666', 0666, 0666)
 
633
        check_mode('mode600', 0600, 0600)
 
634
        # The default permissions should be based on the current umask
 
635
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
636
 
 
637
    def test_copy_to(self):
 
638
        # FIXME: test:   same server to same server (partly done)
 
639
        # same protocol two servers
 
640
        # and    different protocols (done for now except for MemoryTransport.
 
641
        # - RBC 20060122
 
642
 
 
643
        def simple_copy_files(transport_from, transport_to):
 
644
            files = ['a', 'b', 'c', 'd']
 
645
            self.build_tree(files, transport=transport_from)
 
646
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
647
            for f in files:
 
648
                self.check_transport_contents(transport_to.get(f).read(),
 
649
                                              transport_from, f)
 
650
 
 
651
        t = self.get_transport()
 
652
        temp_transport = MemoryTransport('memory:///')
 
653
        simple_copy_files(t, temp_transport)
 
654
        if not t.is_readonly():
 
655
            t.mkdir('copy_to_simple')
 
656
            t2 = t.clone('copy_to_simple')
 
657
            simple_copy_files(t, t2)
 
658
 
 
659
 
 
660
        # Test that copying into a missing directory raises
 
661
        # NoSuchFile
 
662
        if t.is_readonly():
 
663
            self.build_tree(['e/', 'e/f'])
 
664
        else:
 
665
            t.mkdir('e')
 
666
            t.put_bytes('e/f', 'contents of e')
 
667
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
668
        temp_transport.mkdir('e')
 
669
        t.copy_to(['e/f'], temp_transport)
 
670
 
 
671
        del temp_transport
 
672
        temp_transport = MemoryTransport('memory:///')
 
673
 
 
674
        files = ['a', 'b', 'c', 'd']
 
675
        t.copy_to(iter(files), temp_transport)
 
676
        for f in files:
 
677
            self.check_transport_contents(temp_transport.get(f).read(),
 
678
                                          t, f)
 
679
        del temp_transport
 
680
 
 
681
        for mode in (0666, 0644, 0600, 0400):
 
682
            temp_transport = MemoryTransport("memory:///")
 
683
            t.copy_to(files, temp_transport, mode=mode)
 
684
            for f in files:
 
685
                self.assertTransportMode(temp_transport, f, mode)
 
686
 
 
687
    def test_create_prefix(self):
 
688
        t = self.get_transport()
 
689
        sub = t.clone('foo').clone('bar')
 
690
        try:
 
691
            sub.create_prefix()
 
692
        except TransportNotPossible:
 
693
            self.assertTrue(t.is_readonly())
 
694
        else:
 
695
            self.assertTrue(t.has('foo/bar'))
 
696
 
 
697
    def test_append_file(self):
 
698
        t = self.get_transport()
 
699
 
 
700
        if t.is_readonly():
 
701
            self.assertRaises(TransportNotPossible,
 
702
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
703
            return
 
704
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
705
        t.put_bytes('b', 'contents\nfor b\n')
 
706
 
 
707
        self.assertEqual(20,
 
708
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
709
 
 
710
        self.check_transport_contents(
 
711
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
712
            t, 'a')
 
713
 
 
714
        # a file with no parent should fail..
 
715
        self.assertRaises(NoSuchFile,
 
716
                          t.append_file, 'missing/path', StringIO('content'))
 
717
 
 
718
        # And we can create new files, too
 
719
        self.assertEqual(0,
 
720
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
721
        self.check_transport_contents('some text\nfor a missing file\n',
 
722
                                      t, 'c')
 
723
 
 
724
    def test_append_bytes(self):
 
725
        t = self.get_transport()
 
726
 
 
727
        if t.is_readonly():
 
728
            self.assertRaises(TransportNotPossible,
 
729
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
730
            return
 
731
 
 
732
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
733
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
734
 
 
735
        self.assertEqual(20,
 
736
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
737
 
 
738
        self.check_transport_contents(
 
739
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
740
            t, 'a')
 
741
 
 
742
        # a file with no parent should fail..
 
743
        self.assertRaises(NoSuchFile,
 
744
                          t.append_bytes, 'missing/path', 'content')
 
745
 
 
746
    def test_append_multi(self):
 
747
        t = self.get_transport()
 
748
 
 
749
        if t.is_readonly():
 
750
            return
 
751
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
752
                         'add\nsome\nmore\ncontents\n')
 
753
        t.put_bytes('b', 'contents\nfor b\n')
 
754
 
 
755
        self.assertEqual((43, 15),
 
756
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
757
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
758
 
 
759
        self.check_transport_contents(
 
760
            'diff\ncontents for\na\n'
 
761
            'add\nsome\nmore\ncontents\n'
 
762
            'and\nthen\nsome\nmore\n',
 
763
            t, 'a')
 
764
        self.check_transport_contents(
 
765
                'contents\nfor b\n'
 
766
                'some\nmore\nfor\nb\n',
 
767
                t, 'b')
 
768
 
 
769
        self.assertEqual((62, 31),
 
770
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
771
                                 ('b', StringIO('from an iterator\n'))])))
 
772
        self.check_transport_contents(
 
773
            'diff\ncontents for\na\n'
 
774
            'add\nsome\nmore\ncontents\n'
 
775
            'and\nthen\nsome\nmore\n'
 
776
            'a little bit more\n',
 
777
            t, 'a')
 
778
        self.check_transport_contents(
 
779
                'contents\nfor b\n'
 
780
                'some\nmore\nfor\nb\n'
 
781
                'from an iterator\n',
 
782
                t, 'b')
 
783
 
 
784
        self.assertEqual((80, 0),
 
785
            t.append_multi([('a', StringIO('some text in a\n')),
 
786
                            ('d', StringIO('missing file r\n'))]))
 
787
 
 
788
        self.check_transport_contents(
 
789
            'diff\ncontents for\na\n'
 
790
            'add\nsome\nmore\ncontents\n'
 
791
            'and\nthen\nsome\nmore\n'
 
792
            'a little bit more\n'
 
793
            'some text in a\n',
 
794
            t, 'a')
 
795
        self.check_transport_contents('missing file r\n', t, 'd')
 
796
 
 
797
    def test_append_file_mode(self):
 
798
        """Check that append accepts a mode parameter"""
 
799
        # check append accepts a mode
 
800
        t = self.get_transport()
 
801
        if t.is_readonly():
 
802
            self.assertRaises(TransportNotPossible,
 
803
                t.append_file, 'f', StringIO('f'), mode=None)
 
804
            return
 
805
        t.append_file('f', StringIO('f'), mode=None)
 
806
 
 
807
    def test_append_bytes_mode(self):
 
808
        # check append_bytes accepts a mode
 
809
        t = self.get_transport()
 
810
        if t.is_readonly():
 
811
            self.assertRaises(TransportNotPossible,
 
812
                t.append_bytes, 'f', 'f', mode=None)
 
813
            return
 
814
        t.append_bytes('f', 'f', mode=None)
 
815
 
 
816
    def test_delete(self):
 
817
        # TODO: Test Transport.delete
 
818
        t = self.get_transport()
 
819
 
 
820
        # Not much to do with a readonly transport
 
821
        if t.is_readonly():
 
822
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
823
            return
 
824
 
 
825
        t.put_bytes('a', 'a little bit of text\n')
 
826
        self.failUnless(t.has('a'))
 
827
        t.delete('a')
 
828
        self.failIf(t.has('a'))
 
829
 
 
830
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
831
 
 
832
        t.put_bytes('a', 'a text\n')
 
833
        t.put_bytes('b', 'b text\n')
 
834
        t.put_bytes('c', 'c text\n')
 
835
        self.assertEqual([True, True, True],
 
836
                list(t.has_multi(['a', 'b', 'c'])))
 
837
        t.delete_multi(['a', 'c'])
 
838
        self.assertEqual([False, True, False],
 
839
                list(t.has_multi(['a', 'b', 'c'])))
 
840
        self.failIf(t.has('a'))
 
841
        self.failUnless(t.has('b'))
 
842
        self.failIf(t.has('c'))
 
843
 
 
844
        self.assertRaises(NoSuchFile,
 
845
                t.delete_multi, ['a', 'b', 'c'])
 
846
 
 
847
        self.assertRaises(NoSuchFile,
 
848
                t.delete_multi, iter(['a', 'b', 'c']))
 
849
 
 
850
        t.put_bytes('a', 'another a text\n')
 
851
        t.put_bytes('c', 'another c text\n')
 
852
        t.delete_multi(iter(['a', 'b', 'c']))
 
853
 
 
854
        # We should have deleted everything
 
855
        # SftpServer creates control files in the
 
856
        # working directory, so we can just do a
 
857
        # plain "listdir".
 
858
        # self.assertEqual([], os.listdir('.'))
 
859
 
 
860
    def test_recommended_page_size(self):
 
861
        """Transports recommend a page size for partial access to files."""
 
862
        t = self.get_transport()
 
863
        self.assertIsInstance(t.recommended_page_size(), int)
 
864
 
 
865
    def test_rmdir(self):
 
866
        t = self.get_transport()
 
867
        # Not much to do with a readonly transport
 
868
        if t.is_readonly():
 
869
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
870
            return
 
871
        t.mkdir('adir')
 
872
        t.mkdir('adir/bdir')
 
873
        t.rmdir('adir/bdir')
 
874
        # ftp may not be able to raise NoSuchFile for lack of
 
875
        # details when failing
 
876
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
877
        t.rmdir('adir')
 
878
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
879
 
 
880
    def test_rmdir_not_empty(self):
 
881
        """Deleting a non-empty directory raises an exception
 
882
 
 
883
        sftp (and possibly others) don't give us a specific "directory not
 
884
        empty" exception -- we can just see that the operation failed.
 
885
        """
 
886
        t = self.get_transport()
 
887
        if t.is_readonly():
 
888
            return
 
889
        t.mkdir('adir')
 
890
        t.mkdir('adir/bdir')
 
891
        self.assertRaises(PathError, t.rmdir, 'adir')
 
892
 
 
893
    def test_rmdir_empty_but_similar_prefix(self):
 
894
        """rmdir does not get confused by sibling paths.
 
895
 
 
896
        A naive implementation of MemoryTransport would refuse to rmdir
 
897
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
898
        uses "path.startswith(dir)" on all file paths to determine if directory
 
899
        is empty.
 
900
        """
 
901
        t = self.get_transport()
 
902
        if t.is_readonly():
 
903
            return
 
904
        t.mkdir('foo')
 
905
        t.put_bytes('foo-bar', '')
 
906
        t.mkdir('foo-baz')
 
907
        t.rmdir('foo')
 
908
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
909
        self.failUnless(t.has('foo-bar'))
 
910
 
 
911
    def test_rename_dir_succeeds(self):
 
912
        t = self.get_transport()
 
913
        if t.is_readonly():
 
914
            raise TestSkipped("transport is readonly")
 
915
        t.mkdir('adir')
 
916
        t.mkdir('adir/asubdir')
 
917
        t.rename('adir', 'bdir')
 
918
        self.assertTrue(t.has('bdir/asubdir'))
 
919
        self.assertFalse(t.has('adir'))
 
920
 
 
921
    def test_rename_dir_nonempty(self):
 
922
        """Attempting to replace a nonemtpy directory should fail"""
 
923
        t = self.get_transport()
 
924
        if t.is_readonly():
 
925
            raise TestSkipped("transport is readonly")
 
926
        t.mkdir('adir')
 
927
        t.mkdir('adir/asubdir')
 
928
        t.mkdir('bdir')
 
929
        t.mkdir('bdir/bsubdir')
 
930
        # any kind of PathError would be OK, though we normally expect
 
931
        # DirectoryNotEmpty
 
932
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
933
        # nothing was changed so it should still be as before
 
934
        self.assertTrue(t.has('bdir/bsubdir'))
 
935
        self.assertFalse(t.has('adir/bdir'))
 
936
        self.assertFalse(t.has('adir/bsubdir'))
 
937
 
 
938
    def test_rename_across_subdirs(self):
 
939
        t = self.get_transport()
 
940
        if t.is_readonly():
 
941
            raise TestNotApplicable("transport is readonly")
 
942
        t.mkdir('a')
 
943
        t.mkdir('b')
 
944
        ta = t.clone('a')
 
945
        tb = t.clone('b')
 
946
        ta.put_bytes('f', 'aoeu')
 
947
        ta.rename('f', '../b/f')
 
948
        self.assertTrue(tb.has('f'))
 
949
        self.assertFalse(ta.has('f'))
 
950
        self.assertTrue(t.has('b/f'))
 
951
 
 
952
    def test_delete_tree(self):
 
953
        t = self.get_transport()
 
954
 
 
955
        # Not much to do with a readonly transport
 
956
        if t.is_readonly():
 
957
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
958
            return
 
959
 
 
960
        # and does it like listing ?
 
961
        t.mkdir('adir')
 
962
        try:
 
963
            t.delete_tree('adir')
 
964
        except TransportNotPossible:
 
965
            # ok, this transport does not support delete_tree
 
966
            return
 
967
 
 
968
        # did it delete that trivial case?
 
969
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
970
 
 
971
        self.build_tree(['adir/',
 
972
                         'adir/file',
 
973
                         'adir/subdir/',
 
974
                         'adir/subdir/file',
 
975
                         'adir/subdir2/',
 
976
                         'adir/subdir2/file',
 
977
                         ], transport=t)
 
978
 
 
979
        t.delete_tree('adir')
 
980
        # adir should be gone now.
 
981
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
982
 
 
983
    def test_move(self):
 
984
        t = self.get_transport()
 
985
 
 
986
        if t.is_readonly():
 
987
            return
 
988
 
 
989
        # TODO: I would like to use os.listdir() to
 
990
        # make sure there are no extra files, but SftpServer
 
991
        # creates control files in the working directory
 
992
        # perhaps all of this could be done in a subdirectory
 
993
 
 
994
        t.put_bytes('a', 'a first file\n')
 
995
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
996
 
 
997
        t.move('a', 'b')
 
998
        self.failUnless(t.has('b'))
 
999
        self.failIf(t.has('a'))
 
1000
 
 
1001
        self.check_transport_contents('a first file\n', t, 'b')
 
1002
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
1003
 
 
1004
        # Overwrite a file
 
1005
        t.put_bytes('c', 'c this file\n')
 
1006
        t.move('c', 'b')
 
1007
        self.failIf(t.has('c'))
 
1008
        self.check_transport_contents('c this file\n', t, 'b')
 
1009
 
 
1010
        # TODO: Try to write a test for atomicity
 
1011
        # TODO: Test moving into a non-existent subdirectory
 
1012
        # TODO: Test Transport.move_multi
 
1013
 
 
1014
    def test_copy(self):
 
1015
        t = self.get_transport()
 
1016
 
 
1017
        if t.is_readonly():
 
1018
            return
 
1019
 
 
1020
        t.put_bytes('a', 'a file\n')
 
1021
        t.copy('a', 'b')
 
1022
        self.check_transport_contents('a file\n', t, 'b')
 
1023
 
 
1024
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
1025
        os.mkdir('c')
 
1026
        # What should the assert be if you try to copy a
 
1027
        # file over a directory?
 
1028
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
1029
        t.put_bytes('d', 'text in d\n')
 
1030
        t.copy('d', 'b')
 
1031
        self.check_transport_contents('text in d\n', t, 'b')
 
1032
 
 
1033
        # TODO: test copy_multi
 
1034
 
 
1035
    def test_connection_error(self):
 
1036
        """ConnectionError is raised when connection is impossible.
 
1037
 
 
1038
        The error should be raised from the first operation on the transport.
 
1039
        """
 
1040
        try:
 
1041
            url = self._server.get_bogus_url()
 
1042
        except NotImplementedError:
 
1043
            raise TestSkipped("Transport %s has no bogus URL support." %
 
1044
                              self._server.__class__)
 
1045
        t = get_transport(url)
 
1046
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
1047
 
 
1048
    def test_stat(self):
 
1049
        # TODO: Test stat, just try once, and if it throws, stop testing
 
1050
        from stat import S_ISDIR, S_ISREG
 
1051
 
 
1052
        t = self.get_transport()
 
1053
 
 
1054
        try:
 
1055
            st = t.stat('.')
 
1056
        except TransportNotPossible, e:
 
1057
            # This transport cannot stat
 
1058
            return
 
1059
 
 
1060
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
1061
        sizes = [14, 0, 16, 0, 18]
 
1062
        self.build_tree(paths, transport=t, line_endings='binary')
 
1063
 
 
1064
        for path, size in zip(paths, sizes):
 
1065
            st = t.stat(path)
 
1066
            if path.endswith('/'):
 
1067
                self.failUnless(S_ISDIR(st.st_mode))
 
1068
                # directory sizes are meaningless
 
1069
            else:
 
1070
                self.failUnless(S_ISREG(st.st_mode))
 
1071
                self.assertEqual(size, st.st_size)
 
1072
 
 
1073
        remote_stats = list(t.stat_multi(paths))
 
1074
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1075
 
 
1076
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
1077
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
1078
 
 
1079
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1080
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
1081
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
1082
        subdir = t.clone('subdir')
 
1083
        subdir.stat('./file')
 
1084
        subdir.stat('.')
 
1085
 
 
1086
    def test_hardlink(self):
 
1087
        from stat import ST_NLINK
 
1088
 
 
1089
        t = self.get_transport()
 
1090
 
 
1091
        source_name = "original_target"
 
1092
        link_name = "target_link"
 
1093
 
 
1094
        self.build_tree([source_name], transport=t)
 
1095
 
 
1096
        try:
 
1097
            t.hardlink(source_name, link_name)
 
1098
 
 
1099
            self.failUnless(t.has(source_name))
 
1100
            self.failUnless(t.has(link_name))
 
1101
 
 
1102
            st = t.stat(link_name)
 
1103
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1104
        except TransportNotPossible:
 
1105
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1106
                              self._server.__class__)
 
1107
 
 
1108
    def test_symlink(self):
 
1109
        from stat import S_ISLNK
 
1110
 
 
1111
        t = self.get_transport()
 
1112
 
 
1113
        source_name = "original_target"
 
1114
        link_name = "target_link"
 
1115
 
 
1116
        self.build_tree([source_name], transport=t)
 
1117
 
 
1118
        try:
 
1119
            t.symlink(source_name, link_name)
 
1120
 
 
1121
            self.failUnless(t.has(source_name))
 
1122
            self.failUnless(t.has(link_name))
 
1123
 
 
1124
            st = t.stat(link_name)
 
1125
            self.failUnless(S_ISLNK(st.st_mode),
 
1126
                "expected symlink, got mode %o" % st.st_mode)
 
1127
        except TransportNotPossible:
 
1128
            raise TestSkipped("Transport %s does not support symlinks." %
 
1129
                              self._server.__class__)
 
1130
        except IOError:
 
1131
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1132
 
 
1133
    def test_list_dir(self):
 
1134
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1135
        t = self.get_transport()
 
1136
 
 
1137
        if not t.listable():
 
1138
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1139
            return
 
1140
 
 
1141
        def sorted_list(d, transport):
 
1142
            l = list(transport.list_dir(d))
 
1143
            l.sort()
 
1144
            return l
 
1145
 
 
1146
        self.assertEqual([], sorted_list('.', t))
 
1147
        # c2 is precisely one letter longer than c here to test that
 
1148
        # suffixing is not confused.
 
1149
        # a%25b checks that quoting is done consistently across transports
 
1150
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1151
 
 
1152
        if not t.is_readonly():
 
1153
            self.build_tree(tree_names, transport=t)
 
1154
        else:
 
1155
            self.build_tree(tree_names)
 
1156
 
 
1157
        self.assertEqual(
 
1158
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1159
        self.assertEqual(
 
1160
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1161
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1162
 
 
1163
        # Cloning the transport produces an equivalent listing
 
1164
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1165
 
 
1166
        if not t.is_readonly():
 
1167
            t.delete('c/d')
 
1168
            t.delete('b')
 
1169
        else:
 
1170
            os.unlink('c/d')
 
1171
            os.unlink('b')
 
1172
 
 
1173
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1174
        self.assertEqual(['e'], sorted_list('c', t))
 
1175
 
 
1176
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1177
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1178
        # 'a' is a file, list_dir should raise an error
 
1179
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1180
 
 
1181
    def test_list_dir_result_is_url_escaped(self):
 
1182
        t = self.get_transport()
 
1183
        if not t.listable():
 
1184
            raise TestSkipped("transport not listable")
 
1185
 
 
1186
        if not t.is_readonly():
 
1187
            self.build_tree(['a/', 'a/%'], transport=t)
 
1188
        else:
 
1189
            self.build_tree(['a/', 'a/%'])
 
1190
 
 
1191
        names = list(t.list_dir('a'))
 
1192
        self.assertEqual(['%25'], names)
 
1193
        self.assertIsInstance(names[0], str)
 
1194
 
 
1195
    def test_clone_preserve_info(self):
 
1196
        t1 = self.get_transport()
 
1197
        if not isinstance(t1, ConnectedTransport):
 
1198
            raise TestSkipped("not a connected transport")
 
1199
 
 
1200
        t2 = t1.clone('subdir')
 
1201
        self.assertEquals(t1._scheme, t2._scheme)
 
1202
        self.assertEquals(t1._user, t2._user)
 
1203
        self.assertEquals(t1._password, t2._password)
 
1204
        self.assertEquals(t1._host, t2._host)
 
1205
        self.assertEquals(t1._port, t2._port)
 
1206
 
 
1207
    def test__reuse_for(self):
 
1208
        t = self.get_transport()
 
1209
        if not isinstance(t, ConnectedTransport):
 
1210
            raise TestSkipped("not a connected transport")
 
1211
 
 
1212
        def new_url(scheme=None, user=None, password=None,
 
1213
                    host=None, port=None, path=None):
 
1214
            """Build a new url from t.base changing only parts of it.
 
1215
 
 
1216
            Only the parameters different from None will be changed.
 
1217
            """
 
1218
            if scheme   is None: scheme   = t._scheme
 
1219
            if user     is None: user     = t._user
 
1220
            if password is None: password = t._password
 
1221
            if user     is None: user     = t._user
 
1222
            if host     is None: host     = t._host
 
1223
            if port     is None: port     = t._port
 
1224
            if path     is None: path     = t._path
 
1225
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1226
 
 
1227
        if t._scheme == 'ftp':
 
1228
            scheme = 'sftp'
 
1229
        else:
 
1230
            scheme = 'ftp'
 
1231
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1232
        if t._user == 'me':
 
1233
            user = 'you'
 
1234
        else:
 
1235
            user = 'me'
 
1236
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1237
        # passwords are not taken into account because:
 
1238
        # - it makes no sense to have two different valid passwords for the
 
1239
        #   same user
 
1240
        # - _password in ConnectedTransport is intended to collect what the
 
1241
        #   user specified from the command-line and there are cases where the
 
1242
        #   new url can contain no password (if the url was built from an
 
1243
        #   existing transport.base for example)
 
1244
        # - password are considered part of the credentials provided at
 
1245
        #   connection creation time and as such may not be present in the url
 
1246
        #   (they may be typed by the user when prompted for example)
 
1247
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1248
        # We will not connect, we can use a invalid host
 
1249
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1250
        if t._port == 1234:
 
1251
            port = 4321
 
1252
        else:
 
1253
            port = 1234
 
1254
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1255
        # No point in trying to reuse a transport for a local URL
 
1256
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1257
 
 
1258
    def test_connection_sharing(self):
 
1259
        t = self.get_transport()
 
1260
        if not isinstance(t, ConnectedTransport):
 
1261
            raise TestSkipped("not a connected transport")
 
1262
 
 
1263
        c = t.clone('subdir')
 
1264
        # Some transports will create the connection  only when needed
 
1265
        t.has('surely_not') # Force connection
 
1266
        self.assertIs(t._get_connection(), c._get_connection())
 
1267
 
 
1268
        # Temporary failure, we need to create a new dummy connection
 
1269
        new_connection = object()
 
1270
        t._set_connection(new_connection)
 
1271
        # Check that both transports use the same connection
 
1272
        self.assertIs(new_connection, t._get_connection())
 
1273
        self.assertIs(new_connection, c._get_connection())
 
1274
 
 
1275
    def test_reuse_connection_for_various_paths(self):
 
1276
        t = self.get_transport()
 
1277
        if not isinstance(t, ConnectedTransport):
 
1278
            raise TestSkipped("not a connected transport")
 
1279
 
 
1280
        t.has('surely_not') # Force connection
 
1281
        self.assertIsNot(None, t._get_connection())
 
1282
 
 
1283
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1284
        self.assertIsNot(t, subdir)
 
1285
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1286
 
 
1287
        home = subdir._reuse_for(t.base + 'home')
 
1288
        self.assertIs(t._get_connection(), home._get_connection())
 
1289
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1290
 
 
1291
    def test_clone(self):
 
1292
        # TODO: Test that clone moves up and down the filesystem
 
1293
        t1 = self.get_transport()
 
1294
 
 
1295
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1296
 
 
1297
        self.failUnless(t1.has('a'))
 
1298
        self.failUnless(t1.has('b/c'))
 
1299
        self.failIf(t1.has('c'))
 
1300
 
 
1301
        t2 = t1.clone('b')
 
1302
        self.assertEqual(t1.base + 'b/', t2.base)
 
1303
 
 
1304
        self.failUnless(t2.has('c'))
 
1305
        self.failIf(t2.has('a'))
 
1306
 
 
1307
        t3 = t2.clone('..')
 
1308
        self.failUnless(t3.has('a'))
 
1309
        self.failIf(t3.has('c'))
 
1310
 
 
1311
        self.failIf(t1.has('b/d'))
 
1312
        self.failIf(t2.has('d'))
 
1313
        self.failIf(t3.has('b/d'))
 
1314
 
 
1315
        if t1.is_readonly():
 
1316
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1317
        else:
 
1318
            t2.put_bytes('d', 'newfile\n')
 
1319
 
 
1320
        self.failUnless(t1.has('b/d'))
 
1321
        self.failUnless(t2.has('d'))
 
1322
        self.failUnless(t3.has('b/d'))
 
1323
 
 
1324
    def test_clone_to_root(self):
 
1325
        orig_transport = self.get_transport()
 
1326
        # Repeatedly go up to a parent directory until we're at the root
 
1327
        # directory of this transport
 
1328
        root_transport = orig_transport
 
1329
        new_transport = root_transport.clone("..")
 
1330
        # as we are walking up directories, the path must be
 
1331
        # growing less, except at the top
 
1332
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1333
            or new_transport.base == root_transport.base)
 
1334
        while new_transport.base != root_transport.base:
 
1335
            root_transport = new_transport
 
1336
            new_transport = root_transport.clone("..")
 
1337
            # as we are walking up directories, the path must be
 
1338
            # growing less, except at the top
 
1339
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1340
                or new_transport.base == root_transport.base)
 
1341
 
 
1342
        # Cloning to "/" should take us to exactly the same location.
 
1343
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1344
        # the abspath of "/" from the original transport should be the same
 
1345
        # as the base at the root:
 
1346
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1347
 
 
1348
        # At the root, the URL must still end with / as its a directory
 
1349
        self.assertEqual(root_transport.base[-1], '/')
 
1350
 
 
1351
    def test_clone_from_root(self):
 
1352
        """At the root, cloning to a simple dir should just do string append."""
 
1353
        orig_transport = self.get_transport()
 
1354
        root_transport = orig_transport.clone('/')
 
1355
        self.assertEqual(root_transport.base + '.bzr/',
 
1356
            root_transport.clone('.bzr').base)
 
1357
 
 
1358
    def test_base_url(self):
 
1359
        t = self.get_transport()
 
1360
        self.assertEqual('/', t.base[-1])
 
1361
 
 
1362
    def test_relpath(self):
 
1363
        t = self.get_transport()
 
1364
        self.assertEqual('', t.relpath(t.base))
 
1365
        # base ends with /
 
1366
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1367
        # subdirs which don't exist should still give relpaths.
 
1368
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1369
        # trailing slash should be the same.
 
1370
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1371
 
 
1372
    def test_relpath_at_root(self):
 
1373
        t = self.get_transport()
 
1374
        # clone all the way to the top
 
1375
        new_transport = t.clone('..')
 
1376
        while new_transport.base != t.base:
 
1377
            t = new_transport
 
1378
            new_transport = t.clone('..')
 
1379
        # we must be able to get a relpath below the root
 
1380
        self.assertEqual('', t.relpath(t.base))
 
1381
        # and a deeper one should work too
 
1382
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1383
 
 
1384
    def test_abspath(self):
 
1385
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1386
        # that have aliasing problems like symlinks should go in backend
 
1387
        # specific test cases.
 
1388
        transport = self.get_transport()
 
1389
 
 
1390
        self.assertEqual(transport.base + 'relpath',
 
1391
                         transport.abspath('relpath'))
 
1392
 
 
1393
        # This should work without raising an error.
 
1394
        transport.abspath("/")
 
1395
 
 
1396
        # the abspath of "/" and "/foo/.." should result in the same location
 
1397
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1398
 
 
1399
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1400
                         transport.abspath("/foo"))
 
1401
 
 
1402
    def test_win32_abspath(self):
 
1403
        # Note: we tried to set sys.platform='win32' so we could test on
 
1404
        # other platforms too, but then osutils does platform specific
 
1405
        # things at import time which defeated us...
 
1406
        if sys.platform != 'win32':
 
1407
            raise TestSkipped(
 
1408
                'Testing drive letters in abspath implemented only for win32')
 
1409
 
 
1410
        # smoke test for abspath on win32.
 
1411
        # a transport based on 'file:///' never fully qualifies the drive.
 
1412
        transport = get_transport("file:///")
 
1413
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1414
 
 
1415
        # but a transport that starts with a drive spec must keep it.
 
1416
        transport = get_transport("file:///C:/")
 
1417
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1418
 
 
1419
    def test_local_abspath(self):
 
1420
        transport = self.get_transport()
 
1421
        try:
 
1422
            p = transport.local_abspath('.')
 
1423
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1424
            # should be formattable
 
1425
            s = str(e)
 
1426
        else:
 
1427
            self.assertEqual(getcwd(), p)
 
1428
 
 
1429
    def test_abspath_at_root(self):
 
1430
        t = self.get_transport()
 
1431
        # clone all the way to the top
 
1432
        new_transport = t.clone('..')
 
1433
        while new_transport.base != t.base:
 
1434
            t = new_transport
 
1435
            new_transport = t.clone('..')
 
1436
        # we must be able to get a abspath of the root when we ask for
 
1437
        # t.abspath('..') - this due to our choice that clone('..')
 
1438
        # should return the root from the root, combined with the desire that
 
1439
        # the url from clone('..') and from abspath('..') should be the same.
 
1440
        self.assertEqual(t.base, t.abspath('..'))
 
1441
        # '' should give us the root
 
1442
        self.assertEqual(t.base, t.abspath(''))
 
1443
        # and a path should append to the url
 
1444
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1445
 
 
1446
    def test_iter_files_recursive(self):
 
1447
        transport = self.get_transport()
 
1448
        if not transport.listable():
 
1449
            self.assertRaises(TransportNotPossible,
 
1450
                              transport.iter_files_recursive)
 
1451
            return
 
1452
        self.build_tree(['isolated/',
 
1453
                         'isolated/dir/',
 
1454
                         'isolated/dir/foo',
 
1455
                         'isolated/dir/bar',
 
1456
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1457
                         'isolated/bar'],
 
1458
                        transport=transport)
 
1459
        paths = set(transport.iter_files_recursive())
 
1460
        # nb the directories are not converted
 
1461
        self.assertEqual(paths,
 
1462
                    set(['isolated/dir/foo',
 
1463
                         'isolated/dir/bar',
 
1464
                         'isolated/dir/b%2525z',
 
1465
                         'isolated/bar']))
 
1466
        sub_transport = transport.clone('isolated')
 
1467
        paths = set(sub_transport.iter_files_recursive())
 
1468
        self.assertEqual(paths,
 
1469
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1470
 
 
1471
    def test_copy_tree(self):
 
1472
        # TODO: test file contents and permissions are preserved. This test was
 
1473
        # added just to ensure that quoting was handled correctly.
 
1474
        # -- David Allouche 2006-08-11
 
1475
        transport = self.get_transport()
 
1476
        if not transport.listable():
 
1477
            self.assertRaises(TransportNotPossible,
 
1478
                              transport.iter_files_recursive)
 
1479
            return
 
1480
        if transport.is_readonly():
 
1481
            return
 
1482
        self.build_tree(['from/',
 
1483
                         'from/dir/',
 
1484
                         'from/dir/foo',
 
1485
                         'from/dir/bar',
 
1486
                         'from/dir/b%25z', # make sure quoting is correct
 
1487
                         'from/bar'],
 
1488
                        transport=transport)
 
1489
        transport.copy_tree('from', 'to')
 
1490
        paths = set(transport.iter_files_recursive())
 
1491
        self.assertEqual(paths,
 
1492
                    set(['from/dir/foo',
 
1493
                         'from/dir/bar',
 
1494
                         'from/dir/b%2525z',
 
1495
                         'from/bar',
 
1496
                         'to/dir/foo',
 
1497
                         'to/dir/bar',
 
1498
                         'to/dir/b%2525z',
 
1499
                         'to/bar',]))
 
1500
 
 
1501
    def test_copy_tree_to_transport(self):
 
1502
        transport = self.get_transport()
 
1503
        if not transport.listable():
 
1504
            self.assertRaises(TransportNotPossible,
 
1505
                              transport.iter_files_recursive)
 
1506
            return
 
1507
        if transport.is_readonly():
 
1508
            return
 
1509
        self.build_tree(['from/',
 
1510
                         'from/dir/',
 
1511
                         'from/dir/foo',
 
1512
                         'from/dir/bar',
 
1513
                         'from/dir/b%25z', # make sure quoting is correct
 
1514
                         'from/bar'],
 
1515
                        transport=transport)
 
1516
        from_transport = transport.clone('from')
 
1517
        to_transport = transport.clone('to')
 
1518
        to_transport.ensure_base()
 
1519
        from_transport.copy_tree_to_transport(to_transport)
 
1520
        paths = set(transport.iter_files_recursive())
 
1521
        self.assertEqual(paths,
 
1522
                    set(['from/dir/foo',
 
1523
                         'from/dir/bar',
 
1524
                         'from/dir/b%2525z',
 
1525
                         'from/bar',
 
1526
                         'to/dir/foo',
 
1527
                         'to/dir/bar',
 
1528
                         'to/dir/b%2525z',
 
1529
                         'to/bar',]))
 
1530
 
 
1531
    def test_unicode_paths(self):
 
1532
        """Test that we can read/write files with Unicode names."""
 
1533
        t = self.get_transport()
 
1534
 
 
1535
        # With FAT32 and certain encodings on win32
 
1536
        # '\xe5' and '\xe4' actually map to the same file
 
1537
        # adding a suffix kicks in the 'preserving but insensitive'
 
1538
        # route, and maintains the right files
 
1539
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1540
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1541
                 u'\u017d', # Z with umlat iso-8859-2
 
1542
                 u'\u062c', # Arabic j
 
1543
                 u'\u0410', # Russian A
 
1544
                 u'\u65e5', # Kanji person
 
1545
                ]
 
1546
 
 
1547
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1548
        if no_unicode_support:
 
1549
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1550
 
 
1551
        try:
 
1552
            self.build_tree(files, transport=t, line_endings='binary')
 
1553
        except UnicodeError:
 
1554
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1555
 
 
1556
        # A plain unicode string is not a valid url
 
1557
        for fname in files:
 
1558
            self.assertRaises(InvalidURL, t.get, fname)
 
1559
 
 
1560
        for fname in files:
 
1561
            fname_utf8 = fname.encode('utf-8')
 
1562
            contents = 'contents of %s\n' % (fname_utf8,)
 
1563
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1564
 
 
1565
    def test_connect_twice_is_same_content(self):
 
1566
        # check that our server (whatever it is) is accessible reliably
 
1567
        # via get_transport and multiple connections share content.
 
1568
        transport = self.get_transport()
 
1569
        if transport.is_readonly():
 
1570
            return
 
1571
        transport.put_bytes('foo', 'bar')
 
1572
        transport3 = self.get_transport()
 
1573
        self.check_transport_contents('bar', transport3, 'foo')
 
1574
 
 
1575
        # now opening at a relative url should give use a sane result:
 
1576
        transport.mkdir('newdir')
 
1577
        transport5 = self.get_transport('newdir')
 
1578
        transport6 = transport5.clone('..')
 
1579
        self.check_transport_contents('bar', transport6, 'foo')
 
1580
 
 
1581
    def test_lock_write(self):
 
1582
        """Test transport-level write locks.
 
1583
 
 
1584
        These are deprecated and transports may decline to support them.
 
1585
        """
 
1586
        transport = self.get_transport()
 
1587
        if transport.is_readonly():
 
1588
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1589
            return
 
1590
        transport.put_bytes('lock', '')
 
1591
        try:
 
1592
            lock = transport.lock_write('lock')
 
1593
        except TransportNotPossible:
 
1594
            return
 
1595
        # TODO make this consistent on all platforms:
 
1596
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1597
        lock.unlock()
 
1598
 
 
1599
    def test_lock_read(self):
 
1600
        """Test transport-level read locks.
 
1601
 
 
1602
        These are deprecated and transports may decline to support them.
 
1603
        """
 
1604
        transport = self.get_transport()
 
1605
        if transport.is_readonly():
 
1606
            file('lock', 'w').close()
 
1607
        else:
 
1608
            transport.put_bytes('lock', '')
 
1609
        try:
 
1610
            lock = transport.lock_read('lock')
 
1611
        except TransportNotPossible:
 
1612
            return
 
1613
        # TODO make this consistent on all platforms:
 
1614
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1615
        lock.unlock()
 
1616
 
 
1617
    def test_readv(self):
 
1618
        transport = self.get_transport()
 
1619
        if transport.is_readonly():
 
1620
            file('a', 'w').write('0123456789')
 
1621
        else:
 
1622
            transport.put_bytes('a', '0123456789')
 
1623
 
 
1624
        d = list(transport.readv('a', ((0, 1),)))
 
1625
        self.assertEqual(d[0], (0, '0'))
 
1626
 
 
1627
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1628
        self.assertEqual(d[0], (0, '0'))
 
1629
        self.assertEqual(d[1], (1, '1'))
 
1630
        self.assertEqual(d[2], (3, '34'))
 
1631
        self.assertEqual(d[3], (9, '9'))
 
1632
 
 
1633
    def test_readv_out_of_order(self):
 
1634
        transport = self.get_transport()
 
1635
        if transport.is_readonly():
 
1636
            file('a', 'w').write('0123456789')
 
1637
        else:
 
1638
            transport.put_bytes('a', '01234567890')
 
1639
 
 
1640
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1641
        self.assertEqual(d[0], (1, '1'))
 
1642
        self.assertEqual(d[1], (9, '9'))
 
1643
        self.assertEqual(d[2], (0, '0'))
 
1644
        self.assertEqual(d[3], (3, '34'))
 
1645
 
 
1646
    def test_readv_with_adjust_for_latency(self):
 
1647
        transport = self.get_transport()
 
1648
        # the adjust for latency flag expands the data region returned
 
1649
        # according to a per-transport heuristic, so testing is a little
 
1650
        # tricky as we need more data than the largest combining that our
 
1651
        # transports do. To accomodate this we generate random data and cross
 
1652
        # reference the returned data with the random data. To avoid doing
 
1653
        # multiple large random byte look ups we do several tests on the same
 
1654
        # backing data.
 
1655
        content = osutils.rand_bytes(200*1024)
 
1656
        content_size = len(content)
 
1657
        if transport.is_readonly():
 
1658
            self.build_tree_contents([('a', content)])
 
1659
        else:
 
1660
            transport.put_bytes('a', content)
 
1661
        def check_result_data(result_vector):
 
1662
            for item in result_vector:
 
1663
                data_len = len(item[1])
 
1664
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1665
 
 
1666
        # start corner case
 
1667
        result = list(transport.readv('a', ((0, 30),),
 
1668
            adjust_for_latency=True, upper_limit=content_size))
 
1669
        # we expect 1 result, from 0, to something > 30
 
1670
        self.assertEqual(1, len(result))
 
1671
        self.assertEqual(0, result[0][0])
 
1672
        self.assertTrue(len(result[0][1]) >= 30)
 
1673
        check_result_data(result)
 
1674
        # end of file corner case
 
1675
        result = list(transport.readv('a', ((204700, 100),),
 
1676
            adjust_for_latency=True, upper_limit=content_size))
 
1677
        # we expect 1 result, from 204800- its length, to the end
 
1678
        self.assertEqual(1, len(result))
 
1679
        data_len = len(result[0][1])
 
1680
        self.assertEqual(204800-data_len, result[0][0])
 
1681
        self.assertTrue(data_len >= 100)
 
1682
        check_result_data(result)
 
1683
        # out of order ranges are made in order
 
1684
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1685
            adjust_for_latency=True, upper_limit=content_size))
 
1686
        # we expect 2 results, in order, start and end.
 
1687
        self.assertEqual(2, len(result))
 
1688
        # start
 
1689
        data_len = len(result[0][1])
 
1690
        self.assertEqual(0, result[0][0])
 
1691
        self.assertTrue(data_len >= 30)
 
1692
        # end
 
1693
        data_len = len(result[1][1])
 
1694
        self.assertEqual(204800-data_len, result[1][0])
 
1695
        self.assertTrue(data_len >= 100)
 
1696
        check_result_data(result)
 
1697
        # close ranges get combined (even if out of order)
 
1698
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1699
            result = list(transport.readv('a', request_vector,
 
1700
                adjust_for_latency=True, upper_limit=content_size))
 
1701
            self.assertEqual(1, len(result))
 
1702
            data_len = len(result[0][1])
 
1703
            # minimum length is from 400 to 1034 - 634
 
1704
            self.assertTrue(data_len >= 634)
 
1705
            # must contain the region 400 to 1034
 
1706
            self.assertTrue(result[0][0] <= 400)
 
1707
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1708
            check_result_data(result)
 
1709
 
 
1710
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1711
        transport = self.get_transport()
 
1712
        # test from observed failure case.
 
1713
        if transport.is_readonly():
 
1714
            file('a', 'w').write('a'*1024*1024)
 
1715
        else:
 
1716
            transport.put_bytes('a', 'a'*1024*1024)
 
1717
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1718
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1719
            (465373, 800), (947422, 800)]
 
1720
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1721
        found_items = [False]*9
 
1722
        for pos, (start, length) in enumerate(broken_vector):
 
1723
            # check the range is covered by the result
 
1724
            for offset, data in results:
 
1725
                if offset <= start and start + length <= offset + len(data):
 
1726
                    found_items[pos] = True
 
1727
        self.assertEqual([True]*9, found_items)
 
1728
 
 
1729
    def test_get_with_open_write_stream_sees_all_content(self):
 
1730
        t = self.get_transport()
 
1731
        if t.is_readonly():
 
1732
            return
 
1733
        handle = t.open_write_stream('foo')
 
1734
        try:
 
1735
            handle.write('bcd')
 
1736
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1737
        finally:
 
1738
            handle.close()
 
1739
 
 
1740
    def test_get_smart_medium(self):
 
1741
        """All transports must either give a smart medium, or know they can't.
 
1742
        """
 
1743
        transport = self.get_transport()
 
1744
        try:
 
1745
            client_medium = transport.get_smart_medium()
 
1746
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1747
        except errors.NoSmartMedium:
 
1748
            # as long as we got it we're fine
 
1749
            pass
 
1750
 
 
1751
    def test_readv_short_read(self):
 
1752
        transport = self.get_transport()
 
1753
        if transport.is_readonly():
 
1754
            file('a', 'w').write('0123456789')
 
1755
        else:
 
1756
            transport.put_bytes('a', '01234567890')
 
1757
 
 
1758
        # This is intentionally reading off the end of the file
 
1759
        # since we are sure that it cannot get there
 
1760
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1761
                               # Can be raised by paramiko
 
1762
                               AssertionError),
 
1763
                              transport.readv, 'a', [(1,1), (8,10)])
 
1764
 
 
1765
        # This is trying to seek past the end of the file, it should
 
1766
        # also raise a special error
 
1767
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1768
                              transport.readv, 'a', [(12,2)])
 
1769
 
 
1770
    def test_stat_symlink(self):
 
1771
        # if a transport points directly to a symlink (and supports symlinks
 
1772
        # at all) you can tell this.  helps with bug 32669.
 
1773
        t = self.get_transport()
 
1774
        try:
 
1775
            t.symlink('target', 'link')
 
1776
        except TransportNotPossible:
 
1777
            raise TestSkipped("symlinks not supported")
 
1778
        t2 = t.clone('link')
 
1779
        st = t2.stat('')
 
1780
        self.assertTrue(stat.S_ISLNK(st.st_mode))