~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Martin Pool
  • Date: 2005-08-25 05:58:05 UTC
  • mfrom: (974.1.36)
  • Revision ID: mbp@sourcefrog.net-20050825055805-8c892bc3c2d75131
- merge aaron's merge improvements:

  * When merging, pull in all missing revisions from the source
    branch. 

  * Detect common ancestors by looking at the whole ancestry graph, 
    rather than just mainline history.

  Some changes to reconcile this with parallel updates to the test and
  trace code.

aaron.bentley@utoronto.ca-20050823052551-f3401a8b57d9126f

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for Transport implementations.
18
 
 
19
 
Transport implementations tested here are supplied by
20
 
TransportTestProviderAdapter.
21
 
"""
22
 
 
23
 
import itertools
24
 
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
 
import stat
28
 
import sys
29
 
import unittest
30
 
 
31
 
from bzrlib import (
32
 
    errors,
33
 
    osutils,
34
 
    tests,
35
 
    urlutils,
36
 
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
51
 
    TestSkipped,
52
 
    TestNotApplicable,
53
 
    multiply_tests,
54
 
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
58
 
    ConnectedTransport,
59
 
    get_transport,
60
 
    _get_transport_modules,
61
 
    )
62
 
from bzrlib.transport.memory import MemoryTransport
63
 
 
64
 
 
65
 
def get_transport_test_permutations(module):
66
 
    """Get the permutations module wants to have tested."""
67
 
    if getattr(module, 'get_test_permutations', None) is None:
68
 
        raise AssertionError(
69
 
            "transport module %s doesn't provide get_test_permutations()"
70
 
            % module.__name__)
71
 
        return []
72
 
    return module.get_test_permutations()
73
 
 
74
 
 
75
 
def transport_test_permutations():
76
 
    """Return a list of the klass, server_factory pairs to test."""
77
 
    result = []
78
 
    for module in _get_transport_modules():
79
 
        try:
80
 
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
82
 
            for (klass, server_factory) in permutations:
83
 
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
 
                    {"transport_class":klass,
85
 
                     "transport_server":server_factory})
86
 
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
88
 
            # Continue even if a dependency prevents us
89
 
            # from adding this test
90
 
            pass
91
 
    return result
92
 
 
93
 
 
94
 
def load_tests(standard_tests, module, loader):
95
 
    """Multiply tests for tranport implementations."""
96
 
    result = loader.suiteClass()
97
 
    scenarios = transport_test_permutations()
98
 
    return multiply_tests(standard_tests, scenarios, result)
99
 
 
100
 
 
101
 
class TransportTests(TestTransportImplementation):
102
 
 
103
 
    def setUp(self):
104
 
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
106
 
 
107
 
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
110
 
 
111
 
    def test_ensure_base_missing(self):
112
 
        """.ensure_base() should create the directory if it doesn't exist"""
113
 
        t = self.get_transport()
114
 
        t_a = t.clone('a')
115
 
        if t_a.is_readonly():
116
 
            self.assertRaises(TransportNotPossible,
117
 
                              t_a.ensure_base)
118
 
            return
119
 
        self.assertTrue(t_a.ensure_base())
120
 
        self.assertTrue(t.has('a'))
121
 
 
122
 
    def test_ensure_base_exists(self):
123
 
        """.ensure_base() should just be happy if it already exists"""
124
 
        t = self.get_transport()
125
 
        if t.is_readonly():
126
 
            return
127
 
 
128
 
        t.mkdir('a')
129
 
        t_a = t.clone('a')
130
 
        # ensure_base returns False if it didn't create the base
131
 
        self.assertFalse(t_a.ensure_base())
132
 
 
133
 
    def test_ensure_base_missing_parent(self):
134
 
        """.ensure_base() will fail if the parent dir doesn't exist"""
135
 
        t = self.get_transport()
136
 
        if t.is_readonly():
137
 
            return
138
 
 
139
 
        t_a = t.clone('a')
140
 
        t_b = t_a.clone('b')
141
 
        self.assertRaises(NoSuchFile, t_b.ensure_base)
142
 
 
143
 
    def test_external_url(self):
144
 
        """.external_url either works or raises InProcessTransport."""
145
 
        t = self.get_transport()
146
 
        try:
147
 
            t.external_url()
148
 
        except errors.InProcessTransport:
149
 
            pass
150
 
 
151
 
    def test_has(self):
152
 
        t = self.get_transport()
153
 
 
154
 
        files = ['a', 'b', 'e', 'g', '%']
155
 
        self.build_tree(files, transport=t)
156
 
        self.assertEqual(True, t.has('a'))
157
 
        self.assertEqual(False, t.has('c'))
158
 
        self.assertEqual(True, t.has(urlutils.escape('%')))
159
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
 
                                           'e', 'f', 'g', 'h'])),
161
 
                         [True, True, False, False,
162
 
                          True, False, True, False])
163
 
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
 
                                           urlutils.escape('%%')]))
166
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
 
                                                'e', 'f', 'g', 'h']))),
168
 
                         [True, True, False, False,
169
 
                          True, False, True, False])
170
 
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
 
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
 
 
173
 
    def test_has_root_works(self):
174
 
        if self.transport_server is test_server.SmartTCPServer_for_testing:
175
 
            raise TestNotApplicable(
176
 
                "SmartTCPServer_for_testing intentionally does not allow "
177
 
                "access to /.")
178
 
        current_transport = self.get_transport()
179
 
        self.assertTrue(current_transport.has('/'))
180
 
        root = current_transport.clone('/')
181
 
        self.assertTrue(root.has(''))
182
 
 
183
 
    def test_get(self):
184
 
        t = self.get_transport()
185
 
 
186
 
        files = ['a', 'b', 'e', 'g']
187
 
        contents = ['contents of a\n',
188
 
                    'contents of b\n',
189
 
                    'contents of e\n',
190
 
                    'contents of g\n',
191
 
                    ]
192
 
        self.build_tree(files, transport=t, line_endings='binary')
193
 
        self.check_transport_contents('contents of a\n', t, 'a')
194
 
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
197
 
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
199
 
            self.assertEqual(content, f.read())
200
 
 
201
 
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
204
 
            self.assertEqual(content, f.read())
205
 
 
206
 
    def test_get_unknown_file(self):
207
 
        t = self.get_transport()
208
 
        files = ['a', 'b']
209
 
        contents = ['contents of a\n',
210
 
                    'contents of b\n',
211
 
                    ]
212
 
        self.build_tree(files, transport=t, line_endings='binary')
213
 
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
216
 
 
217
 
    def test_get_directory_read_gives_ReadError(self):
218
 
        """consistent errors for read() on a file returned by get()."""
219
 
        t = self.get_transport()
220
 
        if t.is_readonly():
221
 
            self.build_tree(['a directory/'])
222
 
        else:
223
 
            t.mkdir('a%20directory')
224
 
        # getting the file must either work or fail with a PathError
225
 
        try:
226
 
            a_file = t.get('a%20directory')
227
 
        except (errors.PathError, errors.RedirectRequested):
228
 
            # early failure return immediately.
229
 
            return
230
 
        # having got a file, read() must either work (i.e. http reading a dir
231
 
        # listing) or fail with ReadError
232
 
        try:
233
 
            a_file.read()
234
 
        except errors.ReadError:
235
 
            pass
236
 
 
237
 
    def test_get_bytes(self):
238
 
        t = self.get_transport()
239
 
 
240
 
        files = ['a', 'b', 'e', 'g']
241
 
        contents = ['contents of a\n',
242
 
                    'contents of b\n',
243
 
                    'contents of e\n',
244
 
                    'contents of g\n',
245
 
                    ]
246
 
        self.build_tree(files, transport=t, line_endings='binary')
247
 
        self.check_transport_contents('contents of a\n', t, 'a')
248
 
 
249
 
        for content, fname in zip(contents, files):
250
 
            self.assertEqual(content, t.get_bytes(fname))
251
 
 
252
 
    def test_get_bytes_unknown_file(self):
253
 
        t = self.get_transport()
254
 
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
255
 
 
256
 
    def test_get_with_open_write_stream_sees_all_content(self):
257
 
        t = self.get_transport()
258
 
        if t.is_readonly():
259
 
            return
260
 
        handle = t.open_write_stream('foo')
261
 
        try:
262
 
            handle.write('b')
263
 
            self.assertEqual('b', t.get('foo').read())
264
 
        finally:
265
 
            handle.close()
266
 
 
267
 
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
268
 
        t = self.get_transport()
269
 
        if t.is_readonly():
270
 
            return
271
 
        handle = t.open_write_stream('foo')
272
 
        try:
273
 
            handle.write('b')
274
 
            self.assertEqual('b', t.get_bytes('foo'))
275
 
            self.assertEqual('b', t.get('foo').read())
276
 
        finally:
277
 
            handle.close()
278
 
 
279
 
    def test_put_bytes(self):
280
 
        t = self.get_transport()
281
 
 
282
 
        if t.is_readonly():
283
 
            self.assertRaises(TransportNotPossible,
284
 
                    t.put_bytes, 'a', 'some text for a\n')
285
 
            return
286
 
 
287
 
        t.put_bytes('a', 'some text for a\n')
288
 
        self.failUnless(t.has('a'))
289
 
        self.check_transport_contents('some text for a\n', t, 'a')
290
 
 
291
 
        # The contents should be overwritten
292
 
        t.put_bytes('a', 'new text for a\n')
293
 
        self.check_transport_contents('new text for a\n', t, 'a')
294
 
 
295
 
        self.assertRaises(NoSuchFile,
296
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
297
 
 
298
 
    def test_put_bytes_non_atomic(self):
299
 
        t = self.get_transport()
300
 
 
301
 
        if t.is_readonly():
302
 
            self.assertRaises(TransportNotPossible,
303
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
304
 
            return
305
 
 
306
 
        self.failIf(t.has('a'))
307
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
308
 
        self.failUnless(t.has('a'))
309
 
        self.check_transport_contents('some text for a\n', t, 'a')
310
 
        # Put also replaces contents
311
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
312
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
313
 
 
314
 
        # Make sure we can create another file
315
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
316
 
        # And overwrite 'a' with empty contents
317
 
        t.put_bytes_non_atomic('a', '')
318
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
319
 
        self.check_transport_contents('', t, 'a')
320
 
 
321
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
322
 
                                       'contents\n')
323
 
        # Now test the create_parent flag
324
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
325
 
                                       'contents\n')
326
 
        self.failIf(t.has('dir/a'))
327
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
 
                               create_parent_dir=True)
329
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
330
 
 
331
 
        # But we still get NoSuchFile if we can't make the parent dir
332
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
333
 
                                       'contents\n',
334
 
                                       create_parent_dir=True)
335
 
 
336
 
    def test_put_bytes_permissions(self):
337
 
        t = self.get_transport()
338
 
 
339
 
        if t.is_readonly():
340
 
            return
341
 
        if not t._can_roundtrip_unix_modebits():
342
 
            # Can't roundtrip, so no need to run this test
343
 
            return
344
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
345
 
        self.assertTransportMode(t, 'mode644', 0644)
346
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
347
 
        self.assertTransportMode(t, 'mode666', 0666)
348
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
349
 
        self.assertTransportMode(t, 'mode600', 0600)
350
 
        # Yes, you can put_bytes a file such that it becomes readonly
351
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
352
 
        self.assertTransportMode(t, 'mode400', 0400)
353
 
 
354
 
        # The default permissions should be based on the current umask
355
 
        umask = osutils.get_umask()
356
 
        t.put_bytes('nomode', 'test text\n', mode=None)
357
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
358
 
 
359
 
    def test_put_bytes_non_atomic_permissions(self):
360
 
        t = self.get_transport()
361
 
 
362
 
        if t.is_readonly():
363
 
            return
364
 
        if not t._can_roundtrip_unix_modebits():
365
 
            # Can't roundtrip, so no need to run this test
366
 
            return
367
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
368
 
        self.assertTransportMode(t, 'mode644', 0644)
369
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
370
 
        self.assertTransportMode(t, 'mode666', 0666)
371
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
372
 
        self.assertTransportMode(t, 'mode600', 0600)
373
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
374
 
        self.assertTransportMode(t, 'mode400', 0400)
375
 
 
376
 
        # The default permissions should be based on the current umask
377
 
        umask = osutils.get_umask()
378
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
379
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
380
 
 
381
 
        # We should also be able to set the mode for a parent directory
382
 
        # when it is created
383
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
384
 
                               dir_mode=0700, create_parent_dir=True)
385
 
        self.assertTransportMode(t, 'dir700', 0700)
386
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
387
 
                               dir_mode=0770, create_parent_dir=True)
388
 
        self.assertTransportMode(t, 'dir770', 0770)
389
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
390
 
                               dir_mode=0777, create_parent_dir=True)
391
 
        self.assertTransportMode(t, 'dir777', 0777)
392
 
 
393
 
    def test_put_file(self):
394
 
        t = self.get_transport()
395
 
 
396
 
        if t.is_readonly():
397
 
            self.assertRaises(TransportNotPossible,
398
 
                    t.put_file, 'a', StringIO('some text for a\n'))
399
 
            return
400
 
 
401
 
        result = t.put_file('a', StringIO('some text for a\n'))
402
 
        # put_file returns the length of the data written
403
 
        self.assertEqual(16, result)
404
 
        self.failUnless(t.has('a'))
405
 
        self.check_transport_contents('some text for a\n', t, 'a')
406
 
        # Put also replaces contents
407
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
408
 
        self.assertEqual(19, result)
409
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
410
 
        self.assertRaises(NoSuchFile,
411
 
                          t.put_file, 'path/doesnt/exist/c',
412
 
                              StringIO('contents'))
413
 
 
414
 
    def test_put_file_non_atomic(self):
415
 
        t = self.get_transport()
416
 
 
417
 
        if t.is_readonly():
418
 
            self.assertRaises(TransportNotPossible,
419
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
420
 
            return
421
 
 
422
 
        self.failIf(t.has('a'))
423
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
 
        self.failUnless(t.has('a'))
425
 
        self.check_transport_contents('some text for a\n', t, 'a')
426
 
        # Put also replaces contents
427
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
428
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
429
 
 
430
 
        # Make sure we can create another file
431
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
432
 
        # And overwrite 'a' with empty contents
433
 
        t.put_file_non_atomic('a', StringIO(''))
434
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
435
 
        self.check_transport_contents('', t, 'a')
436
 
 
437
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
438
 
                                       StringIO('contents\n'))
439
 
        # Now test the create_parent flag
440
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
 
                                       StringIO('contents\n'))
442
 
        self.failIf(t.has('dir/a'))
443
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
 
                              create_parent_dir=True)
445
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
446
 
 
447
 
        # But we still get NoSuchFile if we can't make the parent dir
448
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
449
 
                                       StringIO('contents\n'),
450
 
                                       create_parent_dir=True)
451
 
 
452
 
    def test_put_file_permissions(self):
453
 
 
454
 
        t = self.get_transport()
455
 
 
456
 
        if t.is_readonly():
457
 
            return
458
 
        if not t._can_roundtrip_unix_modebits():
459
 
            # Can't roundtrip, so no need to run this test
460
 
            return
461
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
462
 
        self.assertTransportMode(t, 'mode644', 0644)
463
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
464
 
        self.assertTransportMode(t, 'mode666', 0666)
465
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
466
 
        self.assertTransportMode(t, 'mode600', 0600)
467
 
        # Yes, you can put a file such that it becomes readonly
468
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
469
 
        self.assertTransportMode(t, 'mode400', 0400)
470
 
        # The default permissions should be based on the current umask
471
 
        umask = osutils.get_umask()
472
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
473
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
474
 
 
475
 
    def test_put_file_non_atomic_permissions(self):
476
 
        t = self.get_transport()
477
 
 
478
 
        if t.is_readonly():
479
 
            return
480
 
        if not t._can_roundtrip_unix_modebits():
481
 
            # Can't roundtrip, so no need to run this test
482
 
            return
483
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
484
 
        self.assertTransportMode(t, 'mode644', 0644)
485
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
486
 
        self.assertTransportMode(t, 'mode666', 0666)
487
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
488
 
        self.assertTransportMode(t, 'mode600', 0600)
489
 
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
490
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
491
 
        self.assertTransportMode(t, 'mode400', 0400)
492
 
 
493
 
        # The default permissions should be based on the current umask
494
 
        umask = osutils.get_umask()
495
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
496
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
497
 
 
498
 
        # We should also be able to set the mode for a parent directory
499
 
        # when it is created
500
 
        sio = StringIO()
501
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
502
 
                              dir_mode=0700, create_parent_dir=True)
503
 
        self.assertTransportMode(t, 'dir700', 0700)
504
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
505
 
                              dir_mode=0770, create_parent_dir=True)
506
 
        self.assertTransportMode(t, 'dir770', 0770)
507
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
508
 
                              dir_mode=0777, create_parent_dir=True)
509
 
        self.assertTransportMode(t, 'dir777', 0777)
510
 
 
511
 
    def test_put_bytes_unicode(self):
512
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
513
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
514
 
        # (we don't want to encode unicode here at all, callers should be
515
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
516
 
        # compatibility.  At some point we should use a specific exception.
517
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
518
 
        t = self.get_transport()
519
 
        if t.is_readonly():
520
 
            return
521
 
        unicode_string = u'\u1234'
522
 
        self.assertRaises(
523
 
            (AssertionError, UnicodeEncodeError),
524
 
            t.put_bytes, 'foo', unicode_string)
525
 
 
526
 
    def test_put_file_unicode(self):
527
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
528
 
        # This situation can happen (and has) if code is careless about the type
529
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
530
 
        # cStringIO, because it never returns unicode from read.
531
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
532
 
        # raise, but we raise it for hysterical raisins.
533
 
        t = self.get_transport()
534
 
        if t.is_readonly():
535
 
            return
536
 
        unicode_file = pyStringIO(u'\u1234')
537
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
538
 
 
539
 
    def test_mkdir(self):
540
 
        t = self.get_transport()
541
 
 
542
 
        if t.is_readonly():
543
 
            # cannot mkdir on readonly transports. We're not testing for
544
 
            # cache coherency because cache behaviour is not currently
545
 
            # defined for the transport interface.
546
 
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
547
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
548
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
549
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
550
 
            return
551
 
        # Test mkdir
552
 
        t.mkdir('dir_a')
553
 
        self.assertEqual(t.has('dir_a'), True)
554
 
        self.assertEqual(t.has('dir_b'), False)
555
 
 
556
 
        t.mkdir('dir_b')
557
 
        self.assertEqual(t.has('dir_b'), True)
558
 
 
559
 
        t.mkdir_multi(['dir_c', 'dir_d'])
560
 
 
561
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
562
 
        self.assertEqual(list(t.has_multi(
563
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
564
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
565
 
            [True, True, True, False,
566
 
             True, True, True, True])
567
 
 
568
 
        # we were testing that a local mkdir followed by a transport
569
 
        # mkdir failed thusly, but given that we * in one process * do not
570
 
        # concurrently fiddle with disk dirs and then use transport to do
571
 
        # things, the win here seems marginal compared to the constraint on
572
 
        # the interface. RBC 20051227
573
 
        t.mkdir('dir_g')
574
 
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
575
 
 
576
 
        # Test get/put in sub-directories
577
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
578
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
579
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
580
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
581
 
 
582
 
        # mkdir of a dir with an absent parent
583
 
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
584
 
 
585
 
    def test_mkdir_permissions(self):
586
 
        t = self.get_transport()
587
 
        if t.is_readonly():
588
 
            return
589
 
        if not t._can_roundtrip_unix_modebits():
590
 
            # no sense testing on this transport
591
 
            return
592
 
        # Test mkdir with a mode
593
 
        t.mkdir('dmode755', mode=0755)
594
 
        self.assertTransportMode(t, 'dmode755', 0755)
595
 
        t.mkdir('dmode555', mode=0555)
596
 
        self.assertTransportMode(t, 'dmode555', 0555)
597
 
        t.mkdir('dmode777', mode=0777)
598
 
        self.assertTransportMode(t, 'dmode777', 0777)
599
 
        t.mkdir('dmode700', mode=0700)
600
 
        self.assertTransportMode(t, 'dmode700', 0700)
601
 
        t.mkdir_multi(['mdmode755'], mode=0755)
602
 
        self.assertTransportMode(t, 'mdmode755', 0755)
603
 
 
604
 
        # Default mode should be based on umask
605
 
        umask = osutils.get_umask()
606
 
        t.mkdir('dnomode', mode=None)
607
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
608
 
 
609
 
    def test_opening_a_file_stream_creates_file(self):
610
 
        t = self.get_transport()
611
 
        if t.is_readonly():
612
 
            return
613
 
        handle = t.open_write_stream('foo')
614
 
        try:
615
 
            self.assertEqual('', t.get_bytes('foo'))
616
 
        finally:
617
 
            handle.close()
618
 
 
619
 
    def test_opening_a_file_stream_can_set_mode(self):
620
 
        t = self.get_transport()
621
 
        if t.is_readonly():
622
 
            return
623
 
        if not t._can_roundtrip_unix_modebits():
624
 
            # Can't roundtrip, so no need to run this test
625
 
            return
626
 
        def check_mode(name, mode, expected):
627
 
            handle = t.open_write_stream(name, mode=mode)
628
 
            handle.close()
629
 
            self.assertTransportMode(t, name, expected)
630
 
        check_mode('mode644', 0644, 0644)
631
 
        check_mode('mode666', 0666, 0666)
632
 
        check_mode('mode600', 0600, 0600)
633
 
        # The default permissions should be based on the current umask
634
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
635
 
 
636
 
    def test_copy_to(self):
637
 
        # FIXME: test:   same server to same server (partly done)
638
 
        # same protocol two servers
639
 
        # and    different protocols (done for now except for MemoryTransport.
640
 
        # - RBC 20060122
641
 
 
642
 
        def simple_copy_files(transport_from, transport_to):
643
 
            files = ['a', 'b', 'c', 'd']
644
 
            self.build_tree(files, transport=transport_from)
645
 
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
646
 
            for f in files:
647
 
                self.check_transport_contents(transport_to.get(f).read(),
648
 
                                              transport_from, f)
649
 
 
650
 
        t = self.get_transport()
651
 
        temp_transport = MemoryTransport('memory:///')
652
 
        simple_copy_files(t, temp_transport)
653
 
        if not t.is_readonly():
654
 
            t.mkdir('copy_to_simple')
655
 
            t2 = t.clone('copy_to_simple')
656
 
            simple_copy_files(t, t2)
657
 
 
658
 
 
659
 
        # Test that copying into a missing directory raises
660
 
        # NoSuchFile
661
 
        if t.is_readonly():
662
 
            self.build_tree(['e/', 'e/f'])
663
 
        else:
664
 
            t.mkdir('e')
665
 
            t.put_bytes('e/f', 'contents of e')
666
 
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
667
 
        temp_transport.mkdir('e')
668
 
        t.copy_to(['e/f'], temp_transport)
669
 
 
670
 
        del temp_transport
671
 
        temp_transport = MemoryTransport('memory:///')
672
 
 
673
 
        files = ['a', 'b', 'c', 'd']
674
 
        t.copy_to(iter(files), temp_transport)
675
 
        for f in files:
676
 
            self.check_transport_contents(temp_transport.get(f).read(),
677
 
                                          t, f)
678
 
        del temp_transport
679
 
 
680
 
        for mode in (0666, 0644, 0600, 0400):
681
 
            temp_transport = MemoryTransport("memory:///")
682
 
            t.copy_to(files, temp_transport, mode=mode)
683
 
            for f in files:
684
 
                self.assertTransportMode(temp_transport, f, mode)
685
 
 
686
 
    def test_create_prefix(self):
687
 
        t = self.get_transport()
688
 
        sub = t.clone('foo').clone('bar')
689
 
        try:
690
 
            sub.create_prefix()
691
 
        except TransportNotPossible:
692
 
            self.assertTrue(t.is_readonly())
693
 
        else:
694
 
            self.assertTrue(t.has('foo/bar'))
695
 
 
696
 
    def test_append_file(self):
697
 
        t = self.get_transport()
698
 
 
699
 
        if t.is_readonly():
700
 
            self.assertRaises(TransportNotPossible,
701
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
702
 
            return
703
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
704
 
        t.put_bytes('b', 'contents\nfor b\n')
705
 
 
706
 
        self.assertEqual(20,
707
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
708
 
 
709
 
        self.check_transport_contents(
710
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
711
 
            t, 'a')
712
 
 
713
 
        # a file with no parent should fail..
714
 
        self.assertRaises(NoSuchFile,
715
 
                          t.append_file, 'missing/path', StringIO('content'))
716
 
 
717
 
        # And we can create new files, too
718
 
        self.assertEqual(0,
719
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
720
 
        self.check_transport_contents('some text\nfor a missing file\n',
721
 
                                      t, 'c')
722
 
 
723
 
    def test_append_bytes(self):
724
 
        t = self.get_transport()
725
 
 
726
 
        if t.is_readonly():
727
 
            self.assertRaises(TransportNotPossible,
728
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
729
 
            return
730
 
 
731
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
732
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
733
 
 
734
 
        self.assertEqual(20,
735
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
736
 
 
737
 
        self.check_transport_contents(
738
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
739
 
            t, 'a')
740
 
 
741
 
        # a file with no parent should fail..
742
 
        self.assertRaises(NoSuchFile,
743
 
                          t.append_bytes, 'missing/path', 'content')
744
 
 
745
 
    def test_append_multi(self):
746
 
        t = self.get_transport()
747
 
 
748
 
        if t.is_readonly():
749
 
            return
750
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
751
 
                         'add\nsome\nmore\ncontents\n')
752
 
        t.put_bytes('b', 'contents\nfor b\n')
753
 
 
754
 
        self.assertEqual((43, 15),
755
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
756
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
757
 
 
758
 
        self.check_transport_contents(
759
 
            'diff\ncontents for\na\n'
760
 
            'add\nsome\nmore\ncontents\n'
761
 
            'and\nthen\nsome\nmore\n',
762
 
            t, 'a')
763
 
        self.check_transport_contents(
764
 
                'contents\nfor b\n'
765
 
                'some\nmore\nfor\nb\n',
766
 
                t, 'b')
767
 
 
768
 
        self.assertEqual((62, 31),
769
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
770
 
                                 ('b', StringIO('from an iterator\n'))])))
771
 
        self.check_transport_contents(
772
 
            'diff\ncontents for\na\n'
773
 
            'add\nsome\nmore\ncontents\n'
774
 
            'and\nthen\nsome\nmore\n'
775
 
            'a little bit more\n',
776
 
            t, 'a')
777
 
        self.check_transport_contents(
778
 
                'contents\nfor b\n'
779
 
                'some\nmore\nfor\nb\n'
780
 
                'from an iterator\n',
781
 
                t, 'b')
782
 
 
783
 
        self.assertEqual((80, 0),
784
 
            t.append_multi([('a', StringIO('some text in a\n')),
785
 
                            ('d', StringIO('missing file r\n'))]))
786
 
 
787
 
        self.check_transport_contents(
788
 
            'diff\ncontents for\na\n'
789
 
            'add\nsome\nmore\ncontents\n'
790
 
            'and\nthen\nsome\nmore\n'
791
 
            'a little bit more\n'
792
 
            'some text in a\n',
793
 
            t, 'a')
794
 
        self.check_transport_contents('missing file r\n', t, 'd')
795
 
 
796
 
    def test_append_file_mode(self):
797
 
        """Check that append accepts a mode parameter"""
798
 
        # check append accepts a mode
799
 
        t = self.get_transport()
800
 
        if t.is_readonly():
801
 
            self.assertRaises(TransportNotPossible,
802
 
                t.append_file, 'f', StringIO('f'), mode=None)
803
 
            return
804
 
        t.append_file('f', StringIO('f'), mode=None)
805
 
 
806
 
    def test_append_bytes_mode(self):
807
 
        # check append_bytes accepts a mode
808
 
        t = self.get_transport()
809
 
        if t.is_readonly():
810
 
            self.assertRaises(TransportNotPossible,
811
 
                t.append_bytes, 'f', 'f', mode=None)
812
 
            return
813
 
        t.append_bytes('f', 'f', mode=None)
814
 
 
815
 
    def test_delete(self):
816
 
        # TODO: Test Transport.delete
817
 
        t = self.get_transport()
818
 
 
819
 
        # Not much to do with a readonly transport
820
 
        if t.is_readonly():
821
 
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
822
 
            return
823
 
 
824
 
        t.put_bytes('a', 'a little bit of text\n')
825
 
        self.failUnless(t.has('a'))
826
 
        t.delete('a')
827
 
        self.failIf(t.has('a'))
828
 
 
829
 
        self.assertRaises(NoSuchFile, t.delete, 'a')
830
 
 
831
 
        t.put_bytes('a', 'a text\n')
832
 
        t.put_bytes('b', 'b text\n')
833
 
        t.put_bytes('c', 'c text\n')
834
 
        self.assertEqual([True, True, True],
835
 
                list(t.has_multi(['a', 'b', 'c'])))
836
 
        t.delete_multi(['a', 'c'])
837
 
        self.assertEqual([False, True, False],
838
 
                list(t.has_multi(['a', 'b', 'c'])))
839
 
        self.failIf(t.has('a'))
840
 
        self.failUnless(t.has('b'))
841
 
        self.failIf(t.has('c'))
842
 
 
843
 
        self.assertRaises(NoSuchFile,
844
 
                t.delete_multi, ['a', 'b', 'c'])
845
 
 
846
 
        self.assertRaises(NoSuchFile,
847
 
                t.delete_multi, iter(['a', 'b', 'c']))
848
 
 
849
 
        t.put_bytes('a', 'another a text\n')
850
 
        t.put_bytes('c', 'another c text\n')
851
 
        t.delete_multi(iter(['a', 'b', 'c']))
852
 
 
853
 
        # We should have deleted everything
854
 
        # SftpServer creates control files in the
855
 
        # working directory, so we can just do a
856
 
        # plain "listdir".
857
 
        # self.assertEqual([], os.listdir('.'))
858
 
 
859
 
    def test_recommended_page_size(self):
860
 
        """Transports recommend a page size for partial access to files."""
861
 
        t = self.get_transport()
862
 
        self.assertIsInstance(t.recommended_page_size(), int)
863
 
 
864
 
    def test_rmdir(self):
865
 
        t = self.get_transport()
866
 
        # Not much to do with a readonly transport
867
 
        if t.is_readonly():
868
 
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
869
 
            return
870
 
        t.mkdir('adir')
871
 
        t.mkdir('adir/bdir')
872
 
        t.rmdir('adir/bdir')
873
 
        # ftp may not be able to raise NoSuchFile for lack of
874
 
        # details when failing
875
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
876
 
        t.rmdir('adir')
877
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
878
 
 
879
 
    def test_rmdir_not_empty(self):
880
 
        """Deleting a non-empty directory raises an exception
881
 
 
882
 
        sftp (and possibly others) don't give us a specific "directory not
883
 
        empty" exception -- we can just see that the operation failed.
884
 
        """
885
 
        t = self.get_transport()
886
 
        if t.is_readonly():
887
 
            return
888
 
        t.mkdir('adir')
889
 
        t.mkdir('adir/bdir')
890
 
        self.assertRaises(PathError, t.rmdir, 'adir')
891
 
 
892
 
    def test_rmdir_empty_but_similar_prefix(self):
893
 
        """rmdir does not get confused by sibling paths.
894
 
 
895
 
        A naive implementation of MemoryTransport would refuse to rmdir
896
 
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
897
 
        uses "path.startswith(dir)" on all file paths to determine if directory
898
 
        is empty.
899
 
        """
900
 
        t = self.get_transport()
901
 
        if t.is_readonly():
902
 
            return
903
 
        t.mkdir('foo')
904
 
        t.put_bytes('foo-bar', '')
905
 
        t.mkdir('foo-baz')
906
 
        t.rmdir('foo')
907
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
 
        self.failUnless(t.has('foo-bar'))
909
 
 
910
 
    def test_rename_dir_succeeds(self):
911
 
        t = self.get_transport()
912
 
        if t.is_readonly():
913
 
            raise TestSkipped("transport is readonly")
914
 
        t.mkdir('adir')
915
 
        t.mkdir('adir/asubdir')
916
 
        t.rename('adir', 'bdir')
917
 
        self.assertTrue(t.has('bdir/asubdir'))
918
 
        self.assertFalse(t.has('adir'))
919
 
 
920
 
    def test_rename_dir_nonempty(self):
921
 
        """Attempting to replace a nonemtpy directory should fail"""
922
 
        t = self.get_transport()
923
 
        if t.is_readonly():
924
 
            raise TestSkipped("transport is readonly")
925
 
        t.mkdir('adir')
926
 
        t.mkdir('adir/asubdir')
927
 
        t.mkdir('bdir')
928
 
        t.mkdir('bdir/bsubdir')
929
 
        # any kind of PathError would be OK, though we normally expect
930
 
        # DirectoryNotEmpty
931
 
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
932
 
        # nothing was changed so it should still be as before
933
 
        self.assertTrue(t.has('bdir/bsubdir'))
934
 
        self.assertFalse(t.has('adir/bdir'))
935
 
        self.assertFalse(t.has('adir/bsubdir'))
936
 
 
937
 
    def test_rename_across_subdirs(self):
938
 
        t = self.get_transport()
939
 
        if t.is_readonly():
940
 
            raise TestNotApplicable("transport is readonly")
941
 
        t.mkdir('a')
942
 
        t.mkdir('b')
943
 
        ta = t.clone('a')
944
 
        tb = t.clone('b')
945
 
        ta.put_bytes('f', 'aoeu')
946
 
        ta.rename('f', '../b/f')
947
 
        self.assertTrue(tb.has('f'))
948
 
        self.assertFalse(ta.has('f'))
949
 
        self.assertTrue(t.has('b/f'))
950
 
 
951
 
    def test_delete_tree(self):
952
 
        t = self.get_transport()
953
 
 
954
 
        # Not much to do with a readonly transport
955
 
        if t.is_readonly():
956
 
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
957
 
            return
958
 
 
959
 
        # and does it like listing ?
960
 
        t.mkdir('adir')
961
 
        try:
962
 
            t.delete_tree('adir')
963
 
        except TransportNotPossible:
964
 
            # ok, this transport does not support delete_tree
965
 
            return
966
 
 
967
 
        # did it delete that trivial case?
968
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
969
 
 
970
 
        self.build_tree(['adir/',
971
 
                         'adir/file',
972
 
                         'adir/subdir/',
973
 
                         'adir/subdir/file',
974
 
                         'adir/subdir2/',
975
 
                         'adir/subdir2/file',
976
 
                         ], transport=t)
977
 
 
978
 
        t.delete_tree('adir')
979
 
        # adir should be gone now.
980
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
981
 
 
982
 
    def test_move(self):
983
 
        t = self.get_transport()
984
 
 
985
 
        if t.is_readonly():
986
 
            return
987
 
 
988
 
        # TODO: I would like to use os.listdir() to
989
 
        # make sure there are no extra files, but SftpServer
990
 
        # creates control files in the working directory
991
 
        # perhaps all of this could be done in a subdirectory
992
 
 
993
 
        t.put_bytes('a', 'a first file\n')
994
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
995
 
 
996
 
        t.move('a', 'b')
997
 
        self.failUnless(t.has('b'))
998
 
        self.failIf(t.has('a'))
999
 
 
1000
 
        self.check_transport_contents('a first file\n', t, 'b')
1001
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1002
 
 
1003
 
        # Overwrite a file
1004
 
        t.put_bytes('c', 'c this file\n')
1005
 
        t.move('c', 'b')
1006
 
        self.failIf(t.has('c'))
1007
 
        self.check_transport_contents('c this file\n', t, 'b')
1008
 
 
1009
 
        # TODO: Try to write a test for atomicity
1010
 
        # TODO: Test moving into a non-existent subdirectory
1011
 
        # TODO: Test Transport.move_multi
1012
 
 
1013
 
    def test_copy(self):
1014
 
        t = self.get_transport()
1015
 
 
1016
 
        if t.is_readonly():
1017
 
            return
1018
 
 
1019
 
        t.put_bytes('a', 'a file\n')
1020
 
        t.copy('a', 'b')
1021
 
        self.check_transport_contents('a file\n', t, 'b')
1022
 
 
1023
 
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1024
 
        os.mkdir('c')
1025
 
        # What should the assert be if you try to copy a
1026
 
        # file over a directory?
1027
 
        #self.assertRaises(Something, t.copy, 'a', 'c')
1028
 
        t.put_bytes('d', 'text in d\n')
1029
 
        t.copy('d', 'b')
1030
 
        self.check_transport_contents('text in d\n', t, 'b')
1031
 
 
1032
 
        # TODO: test copy_multi
1033
 
 
1034
 
    def test_connection_error(self):
1035
 
        """ConnectionError is raised when connection is impossible.
1036
 
 
1037
 
        The error should be raised from the first operation on the transport.
1038
 
        """
1039
 
        try:
1040
 
            url = self._server.get_bogus_url()
1041
 
        except NotImplementedError:
1042
 
            raise TestSkipped("Transport %s has no bogus URL support." %
1043
 
                              self._server.__class__)
1044
 
        t = get_transport(url)
1045
 
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1046
 
 
1047
 
    def test_stat(self):
1048
 
        # TODO: Test stat, just try once, and if it throws, stop testing
1049
 
        from stat import S_ISDIR, S_ISREG
1050
 
 
1051
 
        t = self.get_transport()
1052
 
 
1053
 
        try:
1054
 
            st = t.stat('.')
1055
 
        except TransportNotPossible, e:
1056
 
            # This transport cannot stat
1057
 
            return
1058
 
 
1059
 
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1060
 
        sizes = [14, 0, 16, 0, 18]
1061
 
        self.build_tree(paths, transport=t, line_endings='binary')
1062
 
 
1063
 
        for path, size in zip(paths, sizes):
1064
 
            st = t.stat(path)
1065
 
            if path.endswith('/'):
1066
 
                self.failUnless(S_ISDIR(st.st_mode))
1067
 
                # directory sizes are meaningless
1068
 
            else:
1069
 
                self.failUnless(S_ISREG(st.st_mode))
1070
 
                self.assertEqual(size, st.st_size)
1071
 
 
1072
 
        remote_stats = list(t.stat_multi(paths))
1073
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1074
 
 
1075
 
        self.assertRaises(NoSuchFile, t.stat, 'q')
1076
 
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1077
 
 
1078
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1079
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1080
 
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1081
 
        subdir = t.clone('subdir')
1082
 
        subdir.stat('./file')
1083
 
        subdir.stat('.')
1084
 
 
1085
 
    def test_hardlink(self):
1086
 
        from stat import ST_NLINK
1087
 
 
1088
 
        t = self.get_transport()
1089
 
 
1090
 
        source_name = "original_target"
1091
 
        link_name = "target_link"
1092
 
 
1093
 
        self.build_tree([source_name], transport=t)
1094
 
 
1095
 
        try:
1096
 
            t.hardlink(source_name, link_name)
1097
 
 
1098
 
            self.failUnless(t.has(source_name))
1099
 
            self.failUnless(t.has(link_name))
1100
 
 
1101
 
            st = t.stat(link_name)
1102
 
            self.failUnlessEqual(st[ST_NLINK], 2)
1103
 
        except TransportNotPossible:
1104
 
            raise TestSkipped("Transport %s does not support hardlinks." %
1105
 
                              self._server.__class__)
1106
 
 
1107
 
    def test_symlink(self):
1108
 
        from stat import S_ISLNK
1109
 
 
1110
 
        t = self.get_transport()
1111
 
 
1112
 
        source_name = "original_target"
1113
 
        link_name = "target_link"
1114
 
 
1115
 
        self.build_tree([source_name], transport=t)
1116
 
 
1117
 
        try:
1118
 
            t.symlink(source_name, link_name)
1119
 
 
1120
 
            self.failUnless(t.has(source_name))
1121
 
            self.failUnless(t.has(link_name))
1122
 
 
1123
 
            st = t.stat(link_name)
1124
 
            self.failUnless(S_ISLNK(st.st_mode),
1125
 
                "expected symlink, got mode %o" % st.st_mode)
1126
 
        except TransportNotPossible:
1127
 
            raise TestSkipped("Transport %s does not support symlinks." %
1128
 
                              self._server.__class__)
1129
 
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1131
 
 
1132
 
    def test_list_dir(self):
1133
 
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1134
 
        t = self.get_transport()
1135
 
 
1136
 
        if not t.listable():
1137
 
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1138
 
            return
1139
 
 
1140
 
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
1143
 
            return l
1144
 
 
1145
 
        self.assertEqual([], sorted_list('.', t))
1146
 
        # c2 is precisely one letter longer than c here to test that
1147
 
        # suffixing is not confused.
1148
 
        # a%25b checks that quoting is done consistently across transports
1149
 
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1150
 
 
1151
 
        if not t.is_readonly():
1152
 
            self.build_tree(tree_names, transport=t)
1153
 
        else:
1154
 
            self.build_tree(tree_names)
1155
 
 
1156
 
        self.assertEqual(
1157
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1158
 
        self.assertEqual(
1159
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1160
 
        self.assertEqual(['d', 'e'], sorted_list('c', t))
1161
 
 
1162
 
        # Cloning the transport produces an equivalent listing
1163
 
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1164
 
 
1165
 
        if not t.is_readonly():
1166
 
            t.delete('c/d')
1167
 
            t.delete('b')
1168
 
        else:
1169
 
            os.unlink('c/d')
1170
 
            os.unlink('b')
1171
 
 
1172
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1173
 
        self.assertEqual(['e'], sorted_list('c', t))
1174
 
 
1175
 
        self.assertListRaises(PathError, t.list_dir, 'q')
1176
 
        self.assertListRaises(PathError, t.list_dir, 'c/f')
1177
 
        # 'a' is a file, list_dir should raise an error
1178
 
        self.assertListRaises(PathError, t.list_dir, 'a')
1179
 
 
1180
 
    def test_list_dir_result_is_url_escaped(self):
1181
 
        t = self.get_transport()
1182
 
        if not t.listable():
1183
 
            raise TestSkipped("transport not listable")
1184
 
 
1185
 
        if not t.is_readonly():
1186
 
            self.build_tree(['a/', 'a/%'], transport=t)
1187
 
        else:
1188
 
            self.build_tree(['a/', 'a/%'])
1189
 
 
1190
 
        names = list(t.list_dir('a'))
1191
 
        self.assertEqual(['%25'], names)
1192
 
        self.assertIsInstance(names[0], str)
1193
 
 
1194
 
    def test_clone_preserve_info(self):
1195
 
        t1 = self.get_transport()
1196
 
        if not isinstance(t1, ConnectedTransport):
1197
 
            raise TestSkipped("not a connected transport")
1198
 
 
1199
 
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
1205
 
 
1206
 
    def test__reuse_for(self):
1207
 
        t = self.get_transport()
1208
 
        if not isinstance(t, ConnectedTransport):
1209
 
            raise TestSkipped("not a connected transport")
1210
 
 
1211
 
        def new_url(scheme=None, user=None, password=None,
1212
 
                    host=None, port=None, path=None):
1213
 
            """Build a new url from t.base changing only parts of it.
1214
 
 
1215
 
            Only the parameters different from None will be changed.
1216
 
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
1225
 
 
1226
 
        if t._scheme == 'ftp':
1227
 
            scheme = 'sftp'
1228
 
        else:
1229
 
            scheme = 'ftp'
1230
 
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
1232
 
            user = 'you'
1233
 
        else:
1234
 
            user = 'me'
1235
 
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1236
 
        # passwords are not taken into account because:
1237
 
        # - it makes no sense to have two different valid passwords for the
1238
 
        #   same user
1239
 
        # - _password in ConnectedTransport is intended to collect what the
1240
 
        #   user specified from the command-line and there are cases where the
1241
 
        #   new url can contain no password (if the url was built from an
1242
 
        #   existing transport.base for example)
1243
 
        # - password are considered part of the credentials provided at
1244
 
        #   connection creation time and as such may not be present in the url
1245
 
        #   (they may be typed by the user when prompted for example)
1246
 
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
 
        # We will not connect, we can use a invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
1250
 
            port = 4321
1251
 
        else:
1252
 
            port = 1234
1253
 
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1254
 
        # No point in trying to reuse a transport for a local URL
1255
 
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1256
 
 
1257
 
    def test_connection_sharing(self):
1258
 
        t = self.get_transport()
1259
 
        if not isinstance(t, ConnectedTransport):
1260
 
            raise TestSkipped("not a connected transport")
1261
 
 
1262
 
        c = t.clone('subdir')
1263
 
        # Some transports will create the connection  only when needed
1264
 
        t.has('surely_not') # Force connection
1265
 
        self.assertIs(t._get_connection(), c._get_connection())
1266
 
 
1267
 
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = None
1269
 
        t._set_connection(new_connection)
1270
 
        # Check that both transports use the same connection
1271
 
        self.assertIs(new_connection, t._get_connection())
1272
 
        self.assertIs(new_connection, c._get_connection())
1273
 
 
1274
 
    def test_reuse_connection_for_various_paths(self):
1275
 
        t = self.get_transport()
1276
 
        if not isinstance(t, ConnectedTransport):
1277
 
            raise TestSkipped("not a connected transport")
1278
 
 
1279
 
        t.has('surely_not') # Force connection
1280
 
        self.assertIsNot(None, t._get_connection())
1281
 
 
1282
 
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1283
 
        self.assertIsNot(t, subdir)
1284
 
        self.assertIs(t._get_connection(), subdir._get_connection())
1285
 
 
1286
 
        home = subdir._reuse_for(t.base + 'home')
1287
 
        self.assertIs(t._get_connection(), home._get_connection())
1288
 
        self.assertIs(subdir._get_connection(), home._get_connection())
1289
 
 
1290
 
    def test_clone(self):
1291
 
        # TODO: Test that clone moves up and down the filesystem
1292
 
        t1 = self.get_transport()
1293
 
 
1294
 
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
 
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
1299
 
 
1300
 
        t2 = t1.clone('b')
1301
 
        self.assertEqual(t1.base + 'b/', t2.base)
1302
 
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
1305
 
 
1306
 
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
1309
 
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
1313
 
 
1314
 
        if t1.is_readonly():
1315
 
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
 
        else:
1317
 
            t2.put_bytes('d', 'newfile\n')
1318
 
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
1322
 
 
1323
 
    def test_clone_to_root(self):
1324
 
        orig_transport = self.get_transport()
1325
 
        # Repeatedly go up to a parent directory until we're at the root
1326
 
        # directory of this transport
1327
 
        root_transport = orig_transport
1328
 
        new_transport = root_transport.clone("..")
1329
 
        # as we are walking up directories, the path must be
1330
 
        # growing less, except at the top
1331
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1332
 
            or new_transport.base == root_transport.base)
1333
 
        while new_transport.base != root_transport.base:
1334
 
            root_transport = new_transport
1335
 
            new_transport = root_transport.clone("..")
1336
 
            # as we are walking up directories, the path must be
1337
 
            # growing less, except at the top
1338
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1339
 
                or new_transport.base == root_transport.base)
1340
 
 
1341
 
        # Cloning to "/" should take us to exactly the same location.
1342
 
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1343
 
        # the abspath of "/" from the original transport should be the same
1344
 
        # as the base at the root:
1345
 
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1346
 
 
1347
 
        # At the root, the URL must still end with / as its a directory
1348
 
        self.assertEqual(root_transport.base[-1], '/')
1349
 
 
1350
 
    def test_clone_from_root(self):
1351
 
        """At the root, cloning to a simple dir should just do string append."""
1352
 
        orig_transport = self.get_transport()
1353
 
        root_transport = orig_transport.clone('/')
1354
 
        self.assertEqual(root_transport.base + '.bzr/',
1355
 
            root_transport.clone('.bzr').base)
1356
 
 
1357
 
    def test_base_url(self):
1358
 
        t = self.get_transport()
1359
 
        self.assertEqual('/', t.base[-1])
1360
 
 
1361
 
    def test_relpath(self):
1362
 
        t = self.get_transport()
1363
 
        self.assertEqual('', t.relpath(t.base))
1364
 
        # base ends with /
1365
 
        self.assertEqual('', t.relpath(t.base[:-1]))
1366
 
        # subdirs which don't exist should still give relpaths.
1367
 
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1368
 
        # trailing slash should be the same.
1369
 
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1370
 
 
1371
 
    def test_relpath_at_root(self):
1372
 
        t = self.get_transport()
1373
 
        # clone all the way to the top
1374
 
        new_transport = t.clone('..')
1375
 
        while new_transport.base != t.base:
1376
 
            t = new_transport
1377
 
            new_transport = t.clone('..')
1378
 
        # we must be able to get a relpath below the root
1379
 
        self.assertEqual('', t.relpath(t.base))
1380
 
        # and a deeper one should work too
1381
 
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1382
 
 
1383
 
    def test_abspath(self):
1384
 
        # smoke test for abspath. Corner cases for backends like unix fs's
1385
 
        # that have aliasing problems like symlinks should go in backend
1386
 
        # specific test cases.
1387
 
        transport = self.get_transport()
1388
 
 
1389
 
        self.assertEqual(transport.base + 'relpath',
1390
 
                         transport.abspath('relpath'))
1391
 
 
1392
 
        # This should work without raising an error.
1393
 
        transport.abspath("/")
1394
 
 
1395
 
        # the abspath of "/" and "/foo/.." should result in the same location
1396
 
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1397
 
 
1398
 
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
 
                         transport.abspath("/foo"))
1400
 
 
1401
 
    def test_win32_abspath(self):
1402
 
        # Note: we tried to set sys.platform='win32' so we could test on
1403
 
        # other platforms too, but then osutils does platform specific
1404
 
        # things at import time which defeated us...
1405
 
        if sys.platform != 'win32':
1406
 
            raise TestSkipped(
1407
 
                'Testing drive letters in abspath implemented only for win32')
1408
 
 
1409
 
        # smoke test for abspath on win32.
1410
 
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1413
 
 
1414
 
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1417
 
 
1418
 
    def test_local_abspath(self):
1419
 
        transport = self.get_transport()
1420
 
        try:
1421
 
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
1423
 
            # should be formattable
1424
 
            s = str(e)
1425
 
        else:
1426
 
            self.assertEqual(getcwd(), p)
1427
 
 
1428
 
    def test_abspath_at_root(self):
1429
 
        t = self.get_transport()
1430
 
        # clone all the way to the top
1431
 
        new_transport = t.clone('..')
1432
 
        while new_transport.base != t.base:
1433
 
            t = new_transport
1434
 
            new_transport = t.clone('..')
1435
 
        # we must be able to get a abspath of the root when we ask for
1436
 
        # t.abspath('..') - this due to our choice that clone('..')
1437
 
        # should return the root from the root, combined with the desire that
1438
 
        # the url from clone('..') and from abspath('..') should be the same.
1439
 
        self.assertEqual(t.base, t.abspath('..'))
1440
 
        # '' should give us the root
1441
 
        self.assertEqual(t.base, t.abspath(''))
1442
 
        # and a path should append to the url
1443
 
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
1444
 
 
1445
 
    def test_iter_files_recursive(self):
1446
 
        transport = self.get_transport()
1447
 
        if not transport.listable():
1448
 
            self.assertRaises(TransportNotPossible,
1449
 
                              transport.iter_files_recursive)
1450
 
            return
1451
 
        self.build_tree(['isolated/',
1452
 
                         'isolated/dir/',
1453
 
                         'isolated/dir/foo',
1454
 
                         'isolated/dir/bar',
1455
 
                         'isolated/dir/b%25z', # make sure quoting is correct
1456
 
                         'isolated/bar'],
1457
 
                        transport=transport)
1458
 
        paths = set(transport.iter_files_recursive())
1459
 
        # nb the directories are not converted
1460
 
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
1462
 
                         'isolated/dir/bar',
1463
 
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
1465
 
        sub_transport = transport.clone('isolated')
1466
 
        paths = set(sub_transport.iter_files_recursive())
1467
 
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1469
 
 
1470
 
    def test_copy_tree(self):
1471
 
        # TODO: test file contents and permissions are preserved. This test was
1472
 
        # added just to ensure that quoting was handled correctly.
1473
 
        # -- David Allouche 2006-08-11
1474
 
        transport = self.get_transport()
1475
 
        if not transport.listable():
1476
 
            self.assertRaises(TransportNotPossible,
1477
 
                              transport.iter_files_recursive)
1478
 
            return
1479
 
        if transport.is_readonly():
1480
 
            return
1481
 
        self.build_tree(['from/',
1482
 
                         'from/dir/',
1483
 
                         'from/dir/foo',
1484
 
                         'from/dir/bar',
1485
 
                         'from/dir/b%25z', # make sure quoting is correct
1486
 
                         'from/bar'],
1487
 
                        transport=transport)
1488
 
        transport.copy_tree('from', 'to')
1489
 
        paths = set(transport.iter_files_recursive())
1490
 
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
1492
 
                         'from/dir/bar',
1493
 
                         'from/dir/b%2525z',
1494
 
                         'from/bar',
1495
 
                         'to/dir/foo',
1496
 
                         'to/dir/bar',
1497
 
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
1499
 
 
1500
 
    def test_copy_tree_to_transport(self):
1501
 
        transport = self.get_transport()
1502
 
        if not transport.listable():
1503
 
            self.assertRaises(TransportNotPossible,
1504
 
                              transport.iter_files_recursive)
1505
 
            return
1506
 
        if transport.is_readonly():
1507
 
            return
1508
 
        self.build_tree(['from/',
1509
 
                         'from/dir/',
1510
 
                         'from/dir/foo',
1511
 
                         'from/dir/bar',
1512
 
                         'from/dir/b%25z', # make sure quoting is correct
1513
 
                         'from/bar'],
1514
 
                        transport=transport)
1515
 
        from_transport = transport.clone('from')
1516
 
        to_transport = transport.clone('to')
1517
 
        to_transport.ensure_base()
1518
 
        from_transport.copy_tree_to_transport(to_transport)
1519
 
        paths = set(transport.iter_files_recursive())
1520
 
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
1522
 
                         'from/dir/bar',
1523
 
                         'from/dir/b%2525z',
1524
 
                         'from/bar',
1525
 
                         'to/dir/foo',
1526
 
                         'to/dir/bar',
1527
 
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
1529
 
 
1530
 
    def test_unicode_paths(self):
1531
 
        """Test that we can read/write files with Unicode names."""
1532
 
        t = self.get_transport()
1533
 
 
1534
 
        # With FAT32 and certain encodings on win32
1535
 
        # '\xe5' and '\xe4' actually map to the same file
1536
 
        # adding a suffix kicks in the 'preserving but insensitive'
1537
 
        # route, and maintains the right files
1538
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1539
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1540
 
                 u'\u017d', # Z with umlat iso-8859-2
1541
 
                 u'\u062c', # Arabic j
1542
 
                 u'\u0410', # Russian A
1543
 
                 u'\u65e5', # Kanji person
1544
 
                ]
1545
 
 
1546
 
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
 
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
1549
 
 
1550
 
        try:
1551
 
            self.build_tree(files, transport=t, line_endings='binary')
1552
 
        except UnicodeError:
1553
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
1554
 
 
1555
 
        # A plain unicode string is not a valid url
1556
 
        for fname in files:
1557
 
            self.assertRaises(InvalidURL, t.get, fname)
1558
 
 
1559
 
        for fname in files:
1560
 
            fname_utf8 = fname.encode('utf-8')
1561
 
            contents = 'contents of %s\n' % (fname_utf8,)
1562
 
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1563
 
 
1564
 
    def test_connect_twice_is_same_content(self):
1565
 
        # check that our server (whatever it is) is accessible reliably
1566
 
        # via get_transport and multiple connections share content.
1567
 
        transport = self.get_transport()
1568
 
        if transport.is_readonly():
1569
 
            return
1570
 
        transport.put_bytes('foo', 'bar')
1571
 
        transport3 = self.get_transport()
1572
 
        self.check_transport_contents('bar', transport3, 'foo')
1573
 
 
1574
 
        # now opening at a relative url should give use a sane result:
1575
 
        transport.mkdir('newdir')
1576
 
        transport5 = self.get_transport('newdir')
1577
 
        transport6 = transport5.clone('..')
1578
 
        self.check_transport_contents('bar', transport6, 'foo')
1579
 
 
1580
 
    def test_lock_write(self):
1581
 
        """Test transport-level write locks.
1582
 
 
1583
 
        These are deprecated and transports may decline to support them.
1584
 
        """
1585
 
        transport = self.get_transport()
1586
 
        if transport.is_readonly():
1587
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1588
 
            return
1589
 
        transport.put_bytes('lock', '')
1590
 
        try:
1591
 
            lock = transport.lock_write('lock')
1592
 
        except TransportNotPossible:
1593
 
            return
1594
 
        # TODO make this consistent on all platforms:
1595
 
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1596
 
        lock.unlock()
1597
 
 
1598
 
    def test_lock_read(self):
1599
 
        """Test transport-level read locks.
1600
 
 
1601
 
        These are deprecated and transports may decline to support them.
1602
 
        """
1603
 
        transport = self.get_transport()
1604
 
        if transport.is_readonly():
1605
 
            file('lock', 'w').close()
1606
 
        else:
1607
 
            transport.put_bytes('lock', '')
1608
 
        try:
1609
 
            lock = transport.lock_read('lock')
1610
 
        except TransportNotPossible:
1611
 
            return
1612
 
        # TODO make this consistent on all platforms:
1613
 
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1614
 
        lock.unlock()
1615
 
 
1616
 
    def test_readv(self):
1617
 
        transport = self.get_transport()
1618
 
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
1620
 
        else:
1621
 
            transport.put_bytes('a', '0123456789')
1622
 
 
1623
 
        d = list(transport.readv('a', ((0, 1),)))
1624
 
        self.assertEqual(d[0], (0, '0'))
1625
 
 
1626
 
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
 
        self.assertEqual(d[0], (0, '0'))
1628
 
        self.assertEqual(d[1], (1, '1'))
1629
 
        self.assertEqual(d[2], (3, '34'))
1630
 
        self.assertEqual(d[3], (9, '9'))
1631
 
 
1632
 
    def test_readv_out_of_order(self):
1633
 
        transport = self.get_transport()
1634
 
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
1636
 
        else:
1637
 
            transport.put_bytes('a', '01234567890')
1638
 
 
1639
 
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
 
        self.assertEqual(d[0], (1, '1'))
1641
 
        self.assertEqual(d[1], (9, '9'))
1642
 
        self.assertEqual(d[2], (0, '0'))
1643
 
        self.assertEqual(d[3], (3, '34'))
1644
 
 
1645
 
    def test_readv_with_adjust_for_latency(self):
1646
 
        transport = self.get_transport()
1647
 
        # the adjust for latency flag expands the data region returned
1648
 
        # according to a per-transport heuristic, so testing is a little
1649
 
        # tricky as we need more data than the largest combining that our
1650
 
        # transports do. To accomodate this we generate random data and cross
1651
 
        # reference the returned data with the random data. To avoid doing
1652
 
        # multiple large random byte look ups we do several tests on the same
1653
 
        # backing data.
1654
 
        content = osutils.rand_bytes(200*1024)
1655
 
        content_size = len(content)
1656
 
        if transport.is_readonly():
1657
 
            self.build_tree_contents([('a', content)])
1658
 
        else:
1659
 
            transport.put_bytes('a', content)
1660
 
        def check_result_data(result_vector):
1661
 
            for item in result_vector:
1662
 
                data_len = len(item[1])
1663
 
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1664
 
 
1665
 
        # start corner case
1666
 
        result = list(transport.readv('a', ((0, 30),),
1667
 
            adjust_for_latency=True, upper_limit=content_size))
1668
 
        # we expect 1 result, from 0, to something > 30
1669
 
        self.assertEqual(1, len(result))
1670
 
        self.assertEqual(0, result[0][0])
1671
 
        self.assertTrue(len(result[0][1]) >= 30)
1672
 
        check_result_data(result)
1673
 
        # end of file corner case
1674
 
        result = list(transport.readv('a', ((204700, 100),),
1675
 
            adjust_for_latency=True, upper_limit=content_size))
1676
 
        # we expect 1 result, from 204800- its length, to the end
1677
 
        self.assertEqual(1, len(result))
1678
 
        data_len = len(result[0][1])
1679
 
        self.assertEqual(204800-data_len, result[0][0])
1680
 
        self.assertTrue(data_len >= 100)
1681
 
        check_result_data(result)
1682
 
        # out of order ranges are made in order
1683
 
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1684
 
            adjust_for_latency=True, upper_limit=content_size))
1685
 
        # we expect 2 results, in order, start and end.
1686
 
        self.assertEqual(2, len(result))
1687
 
        # start
1688
 
        data_len = len(result[0][1])
1689
 
        self.assertEqual(0, result[0][0])
1690
 
        self.assertTrue(data_len >= 30)
1691
 
        # end
1692
 
        data_len = len(result[1][1])
1693
 
        self.assertEqual(204800-data_len, result[1][0])
1694
 
        self.assertTrue(data_len >= 100)
1695
 
        check_result_data(result)
1696
 
        # close ranges get combined (even if out of order)
1697
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1698
 
            result = list(transport.readv('a', request_vector,
1699
 
                adjust_for_latency=True, upper_limit=content_size))
1700
 
            self.assertEqual(1, len(result))
1701
 
            data_len = len(result[0][1])
1702
 
            # minimum length is from 400 to 1034 - 634
1703
 
            self.assertTrue(data_len >= 634)
1704
 
            # must contain the region 400 to 1034
1705
 
            self.assertTrue(result[0][0] <= 400)
1706
 
            self.assertTrue(result[0][0] + data_len >= 1034)
1707
 
            check_result_data(result)
1708
 
 
1709
 
    def test_readv_with_adjust_for_latency_with_big_file(self):
1710
 
        transport = self.get_transport()
1711
 
        # test from observed failure case.
1712
 
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
1714
 
        else:
1715
 
            transport.put_bytes('a', 'a'*1024*1024)
1716
 
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
 
            (465373, 800), (947422, 800)]
1719
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1720
 
        found_items = [False]*9
1721
 
        for pos, (start, length) in enumerate(broken_vector):
1722
 
            # check the range is covered by the result
1723
 
            for offset, data in results:
1724
 
                if offset <= start and start + length <= offset + len(data):
1725
 
                    found_items[pos] = True
1726
 
        self.assertEqual([True]*9, found_items)
1727
 
 
1728
 
    def test_get_with_open_write_stream_sees_all_content(self):
1729
 
        t = self.get_transport()
1730
 
        if t.is_readonly():
1731
 
            return
1732
 
        handle = t.open_write_stream('foo')
1733
 
        try:
1734
 
            handle.write('bcd')
1735
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1736
 
        finally:
1737
 
            handle.close()
1738
 
 
1739
 
    def test_get_smart_medium(self):
1740
 
        """All transports must either give a smart medium, or know they can't.
1741
 
        """
1742
 
        transport = self.get_transport()
1743
 
        try:
1744
 
            client_medium = transport.get_smart_medium()
1745
 
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1746
 
        except errors.NoSmartMedium:
1747
 
            # as long as we got it we're fine
1748
 
            pass
1749
 
 
1750
 
    def test_readv_short_read(self):
1751
 
        transport = self.get_transport()
1752
 
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
1754
 
        else:
1755
 
            transport.put_bytes('a', '01234567890')
1756
 
 
1757
 
        # This is intentionally reading off the end of the file
1758
 
        # since we are sure that it cannot get there
1759
 
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1760
 
                               # Can be raised by paramiko
1761
 
                               AssertionError),
1762
 
                              transport.readv, 'a', [(1,1), (8,10)])
1763
 
 
1764
 
        # This is trying to seek past the end of the file, it should
1765
 
        # also raise a special error
1766
 
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
 
                              transport.readv, 'a', [(12,2)])
1768
 
 
1769
 
    def test_stat_symlink(self):
1770
 
        # if a transport points directly to a symlink (and supports symlinks
1771
 
        # at all) you can tell this.  helps with bug 32669.
1772
 
        t = self.get_transport()
1773
 
        try:
1774
 
            t.symlink('target', 'link')
1775
 
        except TransportNotPossible:
1776
 
            raise TestSkipped("symlinks not supported")
1777
 
        t2 = t.clone('link')
1778
 
        st = t2.stat('')
1779
 
        self.assertTrue(stat.S_ISLNK(st.st_mode))