~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Martin Pool
  • Date: 2005-06-24 09:03:20 UTC
  • Revision ID: mbp@sourcefrog.net-20050624090320-c34ea3e5aa81d01d
- write new working inventory using AtomicFile

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for Transport implementations.
18
 
 
19
 
Transport implementations tested here are supplied by
20
 
TransportTestProviderAdapter.
21
 
"""
22
 
 
23
 
import itertools
24
 
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
 
import stat
28
 
import sys
29
 
import unittest
30
 
 
31
 
from bzrlib import (
32
 
    errors,
33
 
    osutils,
34
 
    tests,
35
 
    urlutils,
36
 
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
51
 
    TestSkipped,
52
 
    TestNotApplicable,
53
 
    multiply_tests,
54
 
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
58
 
    ConnectedTransport,
59
 
    get_transport,
60
 
    _get_transport_modules,
61
 
    )
62
 
from bzrlib.transport.memory import MemoryTransport
63
 
 
64
 
 
65
 
def get_transport_test_permutations(module):
66
 
    """Get the permutations module wants to have tested."""
67
 
    if getattr(module, 'get_test_permutations', None) is None:
68
 
        raise AssertionError(
69
 
            "transport module %s doesn't provide get_test_permutations()"
70
 
            % module.__name__)
71
 
        return []
72
 
    return module.get_test_permutations()
73
 
 
74
 
 
75
 
def transport_test_permutations():
76
 
    """Return a list of the klass, server_factory pairs to test."""
77
 
    result = []
78
 
    for module in _get_transport_modules():
79
 
        try:
80
 
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
82
 
            for (klass, server_factory) in permutations:
83
 
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
 
                    {"transport_class":klass,
85
 
                     "transport_server":server_factory})
86
 
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
88
 
            # Continue even if a dependency prevents us
89
 
            # from adding this test
90
 
            pass
91
 
    return result
92
 
 
93
 
 
94
 
def load_tests(standard_tests, module, loader):
95
 
    """Multiply tests for tranport implementations."""
96
 
    result = loader.suiteClass()
97
 
    scenarios = transport_test_permutations()
98
 
    return multiply_tests(standard_tests, scenarios, result)
99
 
 
100
 
 
101
 
class TransportTests(TestTransportImplementation):
102
 
 
103
 
    def setUp(self):
104
 
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
106
 
 
107
 
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
110
 
 
111
 
    def test_ensure_base_missing(self):
112
 
        """.ensure_base() should create the directory if it doesn't exist"""
113
 
        t = self.get_transport()
114
 
        t_a = t.clone('a')
115
 
        if t_a.is_readonly():
116
 
            self.assertRaises(TransportNotPossible,
117
 
                              t_a.ensure_base)
118
 
            return
119
 
        self.assertTrue(t_a.ensure_base())
120
 
        self.assertTrue(t.has('a'))
121
 
 
122
 
    def test_ensure_base_exists(self):
123
 
        """.ensure_base() should just be happy if it already exists"""
124
 
        t = self.get_transport()
125
 
        if t.is_readonly():
126
 
            return
127
 
 
128
 
        t.mkdir('a')
129
 
        t_a = t.clone('a')
130
 
        # ensure_base returns False if it didn't create the base
131
 
        self.assertFalse(t_a.ensure_base())
132
 
 
133
 
    def test_ensure_base_missing_parent(self):
134
 
        """.ensure_base() will fail if the parent dir doesn't exist"""
135
 
        t = self.get_transport()
136
 
        if t.is_readonly():
137
 
            return
138
 
 
139
 
        t_a = t.clone('a')
140
 
        t_b = t_a.clone('b')
141
 
        self.assertRaises(NoSuchFile, t_b.ensure_base)
142
 
 
143
 
    def test_external_url(self):
144
 
        """.external_url either works or raises InProcessTransport."""
145
 
        t = self.get_transport()
146
 
        try:
147
 
            t.external_url()
148
 
        except errors.InProcessTransport:
149
 
            pass
150
 
 
151
 
    def test_has(self):
152
 
        t = self.get_transport()
153
 
 
154
 
        files = ['a', 'b', 'e', 'g', '%']
155
 
        self.build_tree(files, transport=t)
156
 
        self.assertEqual(True, t.has('a'))
157
 
        self.assertEqual(False, t.has('c'))
158
 
        self.assertEqual(True, t.has(urlutils.escape('%')))
159
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
 
                                           'e', 'f', 'g', 'h'])),
161
 
                         [True, True, False, False,
162
 
                          True, False, True, False])
163
 
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
 
                                           urlutils.escape('%%')]))
166
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
 
                                                'e', 'f', 'g', 'h']))),
168
 
                         [True, True, False, False,
169
 
                          True, False, True, False])
170
 
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
 
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
 
 
173
 
    def test_has_root_works(self):
174
 
        if self.transport_server is test_server.SmartTCPServer_for_testing:
175
 
            raise TestNotApplicable(
176
 
                "SmartTCPServer_for_testing intentionally does not allow "
177
 
                "access to /.")
178
 
        current_transport = self.get_transport()
179
 
        self.assertTrue(current_transport.has('/'))
180
 
        root = current_transport.clone('/')
181
 
        self.assertTrue(root.has(''))
182
 
 
183
 
    def test_get(self):
184
 
        t = self.get_transport()
185
 
 
186
 
        files = ['a', 'b', 'e', 'g']
187
 
        contents = ['contents of a\n',
188
 
                    'contents of b\n',
189
 
                    'contents of e\n',
190
 
                    'contents of g\n',
191
 
                    ]
192
 
        self.build_tree(files, transport=t, line_endings='binary')
193
 
        self.check_transport_contents('contents of a\n', t, 'a')
194
 
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
197
 
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
199
 
            self.assertEqual(content, f.read())
200
 
 
201
 
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
204
 
            self.assertEqual(content, f.read())
205
 
 
206
 
    def test_get_unknown_file(self):
207
 
        t = self.get_transport()
208
 
        files = ['a', 'b']
209
 
        contents = ['contents of a\n',
210
 
                    'contents of b\n',
211
 
                    ]
212
 
        self.build_tree(files, transport=t, line_endings='binary')
213
 
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
216
 
 
217
 
    def test_get_directory_read_gives_ReadError(self):
218
 
        """consistent errors for read() on a file returned by get()."""
219
 
        t = self.get_transport()
220
 
        if t.is_readonly():
221
 
            self.build_tree(['a directory/'])
222
 
        else:
223
 
            t.mkdir('a%20directory')
224
 
        # getting the file must either work or fail with a PathError
225
 
        try:
226
 
            a_file = t.get('a%20directory')
227
 
        except (errors.PathError, errors.RedirectRequested):
228
 
            # early failure return immediately.
229
 
            return
230
 
        # having got a file, read() must either work (i.e. http reading a dir
231
 
        # listing) or fail with ReadError
232
 
        try:
233
 
            a_file.read()
234
 
        except errors.ReadError:
235
 
            pass
236
 
 
237
 
    def test_get_bytes(self):
238
 
        t = self.get_transport()
239
 
 
240
 
        files = ['a', 'b', 'e', 'g']
241
 
        contents = ['contents of a\n',
242
 
                    'contents of b\n',
243
 
                    'contents of e\n',
244
 
                    'contents of g\n',
245
 
                    ]
246
 
        self.build_tree(files, transport=t, line_endings='binary')
247
 
        self.check_transport_contents('contents of a\n', t, 'a')
248
 
 
249
 
        for content, fname in zip(contents, files):
250
 
            self.assertEqual(content, t.get_bytes(fname))
251
 
 
252
 
    def test_get_bytes_unknown_file(self):
253
 
        t = self.get_transport()
254
 
 
255
 
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
 
 
257
 
    def test_get_with_open_write_stream_sees_all_content(self):
258
 
        t = self.get_transport()
259
 
        if t.is_readonly():
260
 
            return
261
 
        handle = t.open_write_stream('foo')
262
 
        try:
263
 
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
265
 
        finally:
266
 
            handle.close()
267
 
 
268
 
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
 
        t = self.get_transport()
270
 
        if t.is_readonly():
271
 
            return
272
 
        handle = t.open_write_stream('foo')
273
 
        try:
274
 
            handle.write('b')
275
 
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
277
 
        finally:
278
 
            handle.close()
279
 
 
280
 
    def test_put_bytes(self):
281
 
        t = self.get_transport()
282
 
 
283
 
        if t.is_readonly():
284
 
            self.assertRaises(TransportNotPossible,
285
 
                    t.put_bytes, 'a', 'some text for a\n')
286
 
            return
287
 
 
288
 
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
290
 
        self.check_transport_contents('some text for a\n', t, 'a')
291
 
 
292
 
        # The contents should be overwritten
293
 
        t.put_bytes('a', 'new text for a\n')
294
 
        self.check_transport_contents('new text for a\n', t, 'a')
295
 
 
296
 
        self.assertRaises(NoSuchFile,
297
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
298
 
 
299
 
    def test_put_bytes_non_atomic(self):
300
 
        t = self.get_transport()
301
 
 
302
 
        if t.is_readonly():
303
 
            self.assertRaises(TransportNotPossible,
304
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
305
 
            return
306
 
 
307
 
        self.failIf(t.has('a'))
308
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
310
 
        self.check_transport_contents('some text for a\n', t, 'a')
311
 
        # Put also replaces contents
312
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
314
 
 
315
 
        # Make sure we can create another file
316
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
317
 
        # And overwrite 'a' with empty contents
318
 
        t.put_bytes_non_atomic('a', '')
319
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
320
 
        self.check_transport_contents('', t, 'a')
321
 
 
322
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
323
 
                                       'contents\n')
324
 
        # Now test the create_parent flag
325
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
 
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
328
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
 
                               create_parent_dir=True)
330
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
331
 
 
332
 
        # But we still get NoSuchFile if we can't make the parent dir
333
 
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
334
 
                                       'contents\n',
335
 
                                       create_parent_dir=True)
336
 
 
337
 
    def test_put_bytes_permissions(self):
338
 
        t = self.get_transport()
339
 
 
340
 
        if t.is_readonly():
341
 
            return
342
 
        if not t._can_roundtrip_unix_modebits():
343
 
            # Can't roundtrip, so no need to run this test
344
 
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
351
 
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
354
 
 
355
 
        # The default permissions should be based on the current umask
356
 
        umask = osutils.get_umask()
357
 
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
359
 
 
360
 
    def test_put_bytes_non_atomic_permissions(self):
361
 
        t = self.get_transport()
362
 
 
363
 
        if t.is_readonly():
364
 
            return
365
 
        if not t._can_roundtrip_unix_modebits():
366
 
            # Can't roundtrip, so no need to run this test
367
 
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
376
 
 
377
 
        # The default permissions should be based on the current umask
378
 
        umask = osutils.get_umask()
379
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
381
 
 
382
 
        # We should also be able to set the mode for a parent directory
383
 
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
393
 
 
394
 
    def test_put_file(self):
395
 
        t = self.get_transport()
396
 
 
397
 
        if t.is_readonly():
398
 
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
400
 
            return
401
 
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
403
 
        # put_file returns the length of the data written
404
 
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
406
 
        self.check_transport_contents('some text for a\n', t, 'a')
407
 
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
409
 
        self.assertEqual(19, result)
410
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
 
        self.assertRaises(NoSuchFile,
412
 
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
414
 
 
415
 
    def test_put_file_non_atomic(self):
416
 
        t = self.get_transport()
417
 
 
418
 
        if t.is_readonly():
419
 
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
421
 
            return
422
 
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
426
 
        self.check_transport_contents('some text for a\n', t, 'a')
427
 
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
430
 
 
431
 
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
433
 
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
435
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
 
        self.check_transport_contents('', t, 'a')
437
 
 
438
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
440
 
        # Now test the create_parent flag
441
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
445
 
                              create_parent_dir=True)
446
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
447
 
 
448
 
        # But we still get NoSuchFile if we can't make the parent dir
449
 
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
451
 
                                       create_parent_dir=True)
452
 
 
453
 
    def test_put_file_permissions(self):
454
 
 
455
 
        t = self.get_transport()
456
 
 
457
 
        if t.is_readonly():
458
 
            return
459
 
        if not t._can_roundtrip_unix_modebits():
460
 
            # Can't roundtrip, so no need to run this test
461
 
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
468
 
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
471
 
        # The default permissions should be based on the current umask
472
 
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
475
 
 
476
 
    def test_put_file_non_atomic_permissions(self):
477
 
        t = self.get_transport()
478
 
 
479
 
        if t.is_readonly():
480
 
            return
481
 
        if not t._can_roundtrip_unix_modebits():
482
 
            # Can't roundtrip, so no need to run this test
483
 
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
490
 
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
493
 
 
494
 
        # The default permissions should be based on the current umask
495
 
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
498
 
 
499
 
        # We should also be able to set the mode for a parent directory
500
 
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
511
 
 
512
 
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
 
        t = self.get_transport()
520
 
        if t.is_readonly():
521
 
            return
522
 
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
539
 
 
540
 
    def test_mkdir(self):
541
 
        t = self.get_transport()
542
 
 
543
 
        if t.is_readonly():
544
 
            # cannot mkdir on readonly transports. We're not testing for
545
 
            # cache coherency because cache behaviour is not currently
546
 
            # defined for the transport interface.
547
 
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
551
 
            return
552
 
        # Test mkdir
553
 
        t.mkdir('dir_a')
554
 
        self.assertEqual(t.has('dir_a'), True)
555
 
        self.assertEqual(t.has('dir_b'), False)
556
 
 
557
 
        t.mkdir('dir_b')
558
 
        self.assertEqual(t.has('dir_b'), True)
559
 
 
560
 
        t.mkdir_multi(['dir_c', 'dir_d'])
561
 
 
562
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
 
        self.assertEqual(list(t.has_multi(
564
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
 
            [True, True, True, False,
567
 
             True, True, True, True])
568
 
 
569
 
        # we were testing that a local mkdir followed by a transport
570
 
        # mkdir failed thusly, but given that we * in one process * do not
571
 
        # concurrently fiddle with disk dirs and then use transport to do
572
 
        # things, the win here seems marginal compared to the constraint on
573
 
        # the interface. RBC 20051227
574
 
        t.mkdir('dir_g')
575
 
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
576
 
 
577
 
        # Test get/put in sub-directories
578
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
582
 
 
583
 
        # mkdir of a dir with an absent parent
584
 
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
585
 
 
586
 
    def test_mkdir_permissions(self):
587
 
        t = self.get_transport()
588
 
        if t.is_readonly():
589
 
            return
590
 
        if not t._can_roundtrip_unix_modebits():
591
 
            # no sense testing on this transport
592
 
            return
593
 
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
604
 
 
605
 
        # Default mode should be based on umask
606
 
        umask = osutils.get_umask()
607
 
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
609
 
 
610
 
    def test_opening_a_file_stream_creates_file(self):
611
 
        t = self.get_transport()
612
 
        if t.is_readonly():
613
 
            return
614
 
        handle = t.open_write_stream('foo')
615
 
        try:
616
 
            self.assertEqual('', t.get_bytes('foo'))
617
 
        finally:
618
 
            handle.close()
619
 
 
620
 
    def test_opening_a_file_stream_can_set_mode(self):
621
 
        t = self.get_transport()
622
 
        if t.is_readonly():
623
 
            return
624
 
        if not t._can_roundtrip_unix_modebits():
625
 
            # Can't roundtrip, so no need to run this test
626
 
            return
627
 
        def check_mode(name, mode, expected):
628
 
            handle = t.open_write_stream(name, mode=mode)
629
 
            handle.close()
630
 
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
634
 
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
636
 
 
637
 
    def test_copy_to(self):
638
 
        # FIXME: test:   same server to same server (partly done)
639
 
        # same protocol two servers
640
 
        # and    different protocols (done for now except for MemoryTransport.
641
 
        # - RBC 20060122
642
 
 
643
 
        def simple_copy_files(transport_from, transport_to):
644
 
            files = ['a', 'b', 'c', 'd']
645
 
            self.build_tree(files, transport=transport_from)
646
 
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
 
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
649
 
                                              transport_from, f)
650
 
 
651
 
        t = self.get_transport()
652
 
        temp_transport = MemoryTransport('memory:///')
653
 
        simple_copy_files(t, temp_transport)
654
 
        if not t.is_readonly():
655
 
            t.mkdir('copy_to_simple')
656
 
            t2 = t.clone('copy_to_simple')
657
 
            simple_copy_files(t, t2)
658
 
 
659
 
 
660
 
        # Test that copying into a missing directory raises
661
 
        # NoSuchFile
662
 
        if t.is_readonly():
663
 
            self.build_tree(['e/', 'e/f'])
664
 
        else:
665
 
            t.mkdir('e')
666
 
            t.put_bytes('e/f', 'contents of e')
667
 
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
 
        temp_transport.mkdir('e')
669
 
        t.copy_to(['e/f'], temp_transport)
670
 
 
671
 
        del temp_transport
672
 
        temp_transport = MemoryTransport('memory:///')
673
 
 
674
 
        files = ['a', 'b', 'c', 'd']
675
 
        t.copy_to(iter(files), temp_transport)
676
 
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
678
 
                                          t, f)
679
 
        del temp_transport
680
 
 
681
 
        for mode in (0666, 0644, 0600, 0400):
682
 
            temp_transport = MemoryTransport("memory:///")
683
 
            t.copy_to(files, temp_transport, mode=mode)
684
 
            for f in files:
685
 
                self.assertTransportMode(temp_transport, f, mode)
686
 
 
687
 
    def test_create_prefix(self):
688
 
        t = self.get_transport()
689
 
        sub = t.clone('foo').clone('bar')
690
 
        try:
691
 
            sub.create_prefix()
692
 
        except TransportNotPossible:
693
 
            self.assertTrue(t.is_readonly())
694
 
        else:
695
 
            self.assertTrue(t.has('foo/bar'))
696
 
 
697
 
    def test_append_file(self):
698
 
        t = self.get_transport()
699
 
 
700
 
        if t.is_readonly():
701
 
            self.assertRaises(TransportNotPossible,
702
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
703
 
            return
704
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
705
 
        t.put_bytes('b', 'contents\nfor b\n')
706
 
 
707
 
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
709
 
 
710
 
        self.check_transport_contents(
711
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
712
 
            t, 'a')
713
 
 
714
 
        # a file with no parent should fail..
715
 
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
717
 
 
718
 
        # And we can create new files, too
719
 
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
 
        self.check_transport_contents('some text\nfor a missing file\n',
722
 
                                      t, 'c')
723
 
 
724
 
    def test_append_bytes(self):
725
 
        t = self.get_transport()
726
 
 
727
 
        if t.is_readonly():
728
 
            self.assertRaises(TransportNotPossible,
729
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
730
 
            return
731
 
 
732
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
734
 
 
735
 
        self.assertEqual(20,
736
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
737
 
 
738
 
        self.check_transport_contents(
739
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
740
 
            t, 'a')
741
 
 
742
 
        # a file with no parent should fail..
743
 
        self.assertRaises(NoSuchFile,
744
 
                          t.append_bytes, 'missing/path', 'content')
745
 
 
746
 
    def test_append_multi(self):
747
 
        t = self.get_transport()
748
 
 
749
 
        if t.is_readonly():
750
 
            return
751
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
752
 
                         'add\nsome\nmore\ncontents\n')
753
 
        t.put_bytes('b', 'contents\nfor b\n')
754
 
 
755
 
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
758
 
 
759
 
        self.check_transport_contents(
760
 
            'diff\ncontents for\na\n'
761
 
            'add\nsome\nmore\ncontents\n'
762
 
            'and\nthen\nsome\nmore\n',
763
 
            t, 'a')
764
 
        self.check_transport_contents(
765
 
                'contents\nfor b\n'
766
 
                'some\nmore\nfor\nb\n',
767
 
                t, 'b')
768
 
 
769
 
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
772
 
        self.check_transport_contents(
773
 
            'diff\ncontents for\na\n'
774
 
            'add\nsome\nmore\ncontents\n'
775
 
            'and\nthen\nsome\nmore\n'
776
 
            'a little bit more\n',
777
 
            t, 'a')
778
 
        self.check_transport_contents(
779
 
                'contents\nfor b\n'
780
 
                'some\nmore\nfor\nb\n'
781
 
                'from an iterator\n',
782
 
                t, 'b')
783
 
 
784
 
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
787
 
 
788
 
        self.check_transport_contents(
789
 
            'diff\ncontents for\na\n'
790
 
            'add\nsome\nmore\ncontents\n'
791
 
            'and\nthen\nsome\nmore\n'
792
 
            'a little bit more\n'
793
 
            'some text in a\n',
794
 
            t, 'a')
795
 
        self.check_transport_contents('missing file r\n', t, 'd')
796
 
 
797
 
    def test_append_file_mode(self):
798
 
        """Check that append accepts a mode parameter"""
799
 
        # check append accepts a mode
800
 
        t = self.get_transport()
801
 
        if t.is_readonly():
802
 
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
804
 
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
806
 
 
807
 
    def test_append_bytes_mode(self):
808
 
        # check append_bytes accepts a mode
809
 
        t = self.get_transport()
810
 
        if t.is_readonly():
811
 
            self.assertRaises(TransportNotPossible,
812
 
                t.append_bytes, 'f', 'f', mode=None)
813
 
            return
814
 
        t.append_bytes('f', 'f', mode=None)
815
 
 
816
 
    def test_delete(self):
817
 
        # TODO: Test Transport.delete
818
 
        t = self.get_transport()
819
 
 
820
 
        # Not much to do with a readonly transport
821
 
        if t.is_readonly():
822
 
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
823
 
            return
824
 
 
825
 
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
827
 
        t.delete('a')
828
 
        self.failIf(t.has('a'))
829
 
 
830
 
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
 
 
832
 
        t.put_bytes('a', 'a text\n')
833
 
        t.put_bytes('b', 'b text\n')
834
 
        t.put_bytes('c', 'c text\n')
835
 
        self.assertEqual([True, True, True],
836
 
                list(t.has_multi(['a', 'b', 'c'])))
837
 
        t.delete_multi(['a', 'c'])
838
 
        self.assertEqual([False, True, False],
839
 
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
843
 
 
844
 
        self.assertRaises(NoSuchFile,
845
 
                t.delete_multi, ['a', 'b', 'c'])
846
 
 
847
 
        self.assertRaises(NoSuchFile,
848
 
                t.delete_multi, iter(['a', 'b', 'c']))
849
 
 
850
 
        t.put_bytes('a', 'another a text\n')
851
 
        t.put_bytes('c', 'another c text\n')
852
 
        t.delete_multi(iter(['a', 'b', 'c']))
853
 
 
854
 
        # We should have deleted everything
855
 
        # SftpServer creates control files in the
856
 
        # working directory, so we can just do a
857
 
        # plain "listdir".
858
 
        # self.assertEqual([], os.listdir('.'))
859
 
 
860
 
    def test_recommended_page_size(self):
861
 
        """Transports recommend a page size for partial access to files."""
862
 
        t = self.get_transport()
863
 
        self.assertIsInstance(t.recommended_page_size(), int)
864
 
 
865
 
    def test_rmdir(self):
866
 
        t = self.get_transport()
867
 
        # Not much to do with a readonly transport
868
 
        if t.is_readonly():
869
 
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
870
 
            return
871
 
        t.mkdir('adir')
872
 
        t.mkdir('adir/bdir')
873
 
        t.rmdir('adir/bdir')
874
 
        # ftp may not be able to raise NoSuchFile for lack of
875
 
        # details when failing
876
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
877
 
        t.rmdir('adir')
878
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
879
 
 
880
 
    def test_rmdir_not_empty(self):
881
 
        """Deleting a non-empty directory raises an exception
882
 
 
883
 
        sftp (and possibly others) don't give us a specific "directory not
884
 
        empty" exception -- we can just see that the operation failed.
885
 
        """
886
 
        t = self.get_transport()
887
 
        if t.is_readonly():
888
 
            return
889
 
        t.mkdir('adir')
890
 
        t.mkdir('adir/bdir')
891
 
        self.assertRaises(PathError, t.rmdir, 'adir')
892
 
 
893
 
    def test_rmdir_empty_but_similar_prefix(self):
894
 
        """rmdir does not get confused by sibling paths.
895
 
 
896
 
        A naive implementation of MemoryTransport would refuse to rmdir
897
 
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
898
 
        uses "path.startswith(dir)" on all file paths to determine if directory
899
 
        is empty.
900
 
        """
901
 
        t = self.get_transport()
902
 
        if t.is_readonly():
903
 
            return
904
 
        t.mkdir('foo')
905
 
        t.put_bytes('foo-bar', '')
906
 
        t.mkdir('foo-baz')
907
 
        t.rmdir('foo')
908
 
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
910
 
 
911
 
    def test_rename_dir_succeeds(self):
912
 
        t = self.get_transport()
913
 
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
915
 
        t.mkdir('adir')
916
 
        t.mkdir('adir/asubdir')
917
 
        t.rename('adir', 'bdir')
918
 
        self.assertTrue(t.has('bdir/asubdir'))
919
 
        self.assertFalse(t.has('adir'))
920
 
 
921
 
    def test_rename_dir_nonempty(self):
922
 
        """Attempting to replace a nonemtpy directory should fail"""
923
 
        t = self.get_transport()
924
 
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
926
 
        t.mkdir('adir')
927
 
        t.mkdir('adir/asubdir')
928
 
        t.mkdir('bdir')
929
 
        t.mkdir('bdir/bsubdir')
930
 
        # any kind of PathError would be OK, though we normally expect
931
 
        # DirectoryNotEmpty
932
 
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
933
 
        # nothing was changed so it should still be as before
934
 
        self.assertTrue(t.has('bdir/bsubdir'))
935
 
        self.assertFalse(t.has('adir/bdir'))
936
 
        self.assertFalse(t.has('adir/bsubdir'))
937
 
 
938
 
    def test_rename_across_subdirs(self):
939
 
        t = self.get_transport()
940
 
        if t.is_readonly():
941
 
            raise TestNotApplicable("transport is readonly")
942
 
        t.mkdir('a')
943
 
        t.mkdir('b')
944
 
        ta = t.clone('a')
945
 
        tb = t.clone('b')
946
 
        ta.put_bytes('f', 'aoeu')
947
 
        ta.rename('f', '../b/f')
948
 
        self.assertTrue(tb.has('f'))
949
 
        self.assertFalse(ta.has('f'))
950
 
        self.assertTrue(t.has('b/f'))
951
 
 
952
 
    def test_delete_tree(self):
953
 
        t = self.get_transport()
954
 
 
955
 
        # Not much to do with a readonly transport
956
 
        if t.is_readonly():
957
 
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
958
 
            return
959
 
 
960
 
        # and does it like listing ?
961
 
        t.mkdir('adir')
962
 
        try:
963
 
            t.delete_tree('adir')
964
 
        except TransportNotPossible:
965
 
            # ok, this transport does not support delete_tree
966
 
            return
967
 
 
968
 
        # did it delete that trivial case?
969
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
970
 
 
971
 
        self.build_tree(['adir/',
972
 
                         'adir/file',
973
 
                         'adir/subdir/',
974
 
                         'adir/subdir/file',
975
 
                         'adir/subdir2/',
976
 
                         'adir/subdir2/file',
977
 
                         ], transport=t)
978
 
 
979
 
        t.delete_tree('adir')
980
 
        # adir should be gone now.
981
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
982
 
 
983
 
    def test_move(self):
984
 
        t = self.get_transport()
985
 
 
986
 
        if t.is_readonly():
987
 
            return
988
 
 
989
 
        # TODO: I would like to use os.listdir() to
990
 
        # make sure there are no extra files, but SftpServer
991
 
        # creates control files in the working directory
992
 
        # perhaps all of this could be done in a subdirectory
993
 
 
994
 
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
996
 
 
997
 
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
1000
 
 
1001
 
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1003
 
 
1004
 
        # Overwrite a file
1005
 
        t.put_bytes('c', 'c this file\n')
1006
 
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
1008
 
        self.check_transport_contents('c this file\n', t, 'b')
1009
 
 
1010
 
        # TODO: Try to write a test for atomicity
1011
 
        # TODO: Test moving into a non-existent subdirectory
1012
 
        # TODO: Test Transport.move_multi
1013
 
 
1014
 
    def test_copy(self):
1015
 
        t = self.get_transport()
1016
 
 
1017
 
        if t.is_readonly():
1018
 
            return
1019
 
 
1020
 
        t.put_bytes('a', 'a file\n')
1021
 
        t.copy('a', 'b')
1022
 
        self.check_transport_contents('a file\n', t, 'b')
1023
 
 
1024
 
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1025
 
        os.mkdir('c')
1026
 
        # What should the assert be if you try to copy a
1027
 
        # file over a directory?
1028
 
        #self.assertRaises(Something, t.copy, 'a', 'c')
1029
 
        t.put_bytes('d', 'text in d\n')
1030
 
        t.copy('d', 'b')
1031
 
        self.check_transport_contents('text in d\n', t, 'b')
1032
 
 
1033
 
        # TODO: test copy_multi
1034
 
 
1035
 
    def test_connection_error(self):
1036
 
        """ConnectionError is raised when connection is impossible.
1037
 
 
1038
 
        The error should be raised from the first operation on the transport.
1039
 
        """
1040
 
        try:
1041
 
            url = self._server.get_bogus_url()
1042
 
        except NotImplementedError:
1043
 
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
 
                              self._server.__class__)
1045
 
        t = get_transport(url)
1046
 
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
 
 
1048
 
    def test_stat(self):
1049
 
        # TODO: Test stat, just try once, and if it throws, stop testing
1050
 
        from stat import S_ISDIR, S_ISREG
1051
 
 
1052
 
        t = self.get_transport()
1053
 
 
1054
 
        try:
1055
 
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
1057
 
            # This transport cannot stat
1058
 
            return
1059
 
 
1060
 
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1061
 
        sizes = [14, 0, 16, 0, 18]
1062
 
        self.build_tree(paths, transport=t, line_endings='binary')
1063
 
 
1064
 
        for path, size in zip(paths, sizes):
1065
 
            st = t.stat(path)
1066
 
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
1068
 
                # directory sizes are meaningless
1069
 
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
1071
 
                self.assertEqual(size, st.st_size)
1072
 
 
1073
 
        remote_stats = list(t.stat_multi(paths))
1074
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1075
 
 
1076
 
        self.assertRaises(NoSuchFile, t.stat, 'q')
1077
 
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1078
 
 
1079
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
 
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
 
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
1085
 
 
1086
 
    def test_list_dir(self):
1087
 
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1088
 
        t = self.get_transport()
1089
 
 
1090
 
        if not t.listable():
1091
 
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1092
 
            return
1093
 
 
1094
 
        def sorted_list(d, transport):
1095
 
            l = list(transport.list_dir(d))
1096
 
            l.sort()
1097
 
            return l
1098
 
 
1099
 
        self.assertEqual([], sorted_list('.', t))
1100
 
        # c2 is precisely one letter longer than c here to test that
1101
 
        # suffixing is not confused.
1102
 
        # a%25b checks that quoting is done consistently across transports
1103
 
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1104
 
 
1105
 
        if not t.is_readonly():
1106
 
            self.build_tree(tree_names, transport=t)
1107
 
        else:
1108
 
            self.build_tree(tree_names)
1109
 
 
1110
 
        self.assertEqual(
1111
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1112
 
        self.assertEqual(
1113
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1114
 
        self.assertEqual(['d', 'e'], sorted_list('c', t))
1115
 
 
1116
 
        # Cloning the transport produces an equivalent listing
1117
 
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1118
 
 
1119
 
        if not t.is_readonly():
1120
 
            t.delete('c/d')
1121
 
            t.delete('b')
1122
 
        else:
1123
 
            os.unlink('c/d')
1124
 
            os.unlink('b')
1125
 
 
1126
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1127
 
        self.assertEqual(['e'], sorted_list('c', t))
1128
 
 
1129
 
        self.assertListRaises(PathError, t.list_dir, 'q')
1130
 
        self.assertListRaises(PathError, t.list_dir, 'c/f')
1131
 
        # 'a' is a file, list_dir should raise an error
1132
 
        self.assertListRaises(PathError, t.list_dir, 'a')
1133
 
 
1134
 
    def test_list_dir_result_is_url_escaped(self):
1135
 
        t = self.get_transport()
1136
 
        if not t.listable():
1137
 
            raise TestSkipped("transport not listable")
1138
 
 
1139
 
        if not t.is_readonly():
1140
 
            self.build_tree(['a/', 'a/%'], transport=t)
1141
 
        else:
1142
 
            self.build_tree(['a/', 'a/%'])
1143
 
 
1144
 
        names = list(t.list_dir('a'))
1145
 
        self.assertEqual(['%25'], names)
1146
 
        self.assertIsInstance(names[0], str)
1147
 
 
1148
 
    def test_clone_preserve_info(self):
1149
 
        t1 = self.get_transport()
1150
 
        if not isinstance(t1, ConnectedTransport):
1151
 
            raise TestSkipped("not a connected transport")
1152
 
 
1153
 
        t2 = t1.clone('subdir')
1154
 
        self.assertEquals(t1._scheme, t2._scheme)
1155
 
        self.assertEquals(t1._user, t2._user)
1156
 
        self.assertEquals(t1._password, t2._password)
1157
 
        self.assertEquals(t1._host, t2._host)
1158
 
        self.assertEquals(t1._port, t2._port)
1159
 
 
1160
 
    def test__reuse_for(self):
1161
 
        t = self.get_transport()
1162
 
        if not isinstance(t, ConnectedTransport):
1163
 
            raise TestSkipped("not a connected transport")
1164
 
 
1165
 
        def new_url(scheme=None, user=None, password=None,
1166
 
                    host=None, port=None, path=None):
1167
 
            """Build a new url from t.base changing only parts of it.
1168
 
 
1169
 
            Only the parameters different from None will be changed.
1170
 
            """
1171
 
            if scheme   is None: scheme   = t._scheme
1172
 
            if user     is None: user     = t._user
1173
 
            if password is None: password = t._password
1174
 
            if user     is None: user     = t._user
1175
 
            if host     is None: host     = t._host
1176
 
            if port     is None: port     = t._port
1177
 
            if path     is None: path     = t._path
1178
 
            return t._unsplit_url(scheme, user, password, host, port, path)
1179
 
 
1180
 
        if t._scheme == 'ftp':
1181
 
            scheme = 'sftp'
1182
 
        else:
1183
 
            scheme = 'ftp'
1184
 
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1185
 
        if t._user == 'me':
1186
 
            user = 'you'
1187
 
        else:
1188
 
            user = 'me'
1189
 
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1190
 
        # passwords are not taken into account because:
1191
 
        # - it makes no sense to have two different valid passwords for the
1192
 
        #   same user
1193
 
        # - _password in ConnectedTransport is intended to collect what the
1194
 
        #   user specified from the command-line and there are cases where the
1195
 
        #   new url can contain no password (if the url was built from an
1196
 
        #   existing transport.base for example)
1197
 
        # - password are considered part of the credentials provided at
1198
 
        #   connection creation time and as such may not be present in the url
1199
 
        #   (they may be typed by the user when prompted for example)
1200
 
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1201
 
        # We will not connect, we can use a invalid host
1202
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1203
 
        if t._port == 1234:
1204
 
            port = 4321
1205
 
        else:
1206
 
            port = 1234
1207
 
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1208
 
        # No point in trying to reuse a transport for a local URL
1209
 
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1210
 
 
1211
 
    def test_connection_sharing(self):
1212
 
        t = self.get_transport()
1213
 
        if not isinstance(t, ConnectedTransport):
1214
 
            raise TestSkipped("not a connected transport")
1215
 
 
1216
 
        c = t.clone('subdir')
1217
 
        # Some transports will create the connection  only when needed
1218
 
        t.has('surely_not') # Force connection
1219
 
        self.assertIs(t._get_connection(), c._get_connection())
1220
 
 
1221
 
        # Temporary failure, we need to create a new dummy connection
1222
 
        new_connection = object()
1223
 
        t._set_connection(new_connection)
1224
 
        # Check that both transports use the same connection
1225
 
        self.assertIs(new_connection, t._get_connection())
1226
 
        self.assertIs(new_connection, c._get_connection())
1227
 
 
1228
 
    def test_reuse_connection_for_various_paths(self):
1229
 
        t = self.get_transport()
1230
 
        if not isinstance(t, ConnectedTransport):
1231
 
            raise TestSkipped("not a connected transport")
1232
 
 
1233
 
        t.has('surely_not') # Force connection
1234
 
        self.assertIsNot(None, t._get_connection())
1235
 
 
1236
 
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1237
 
        self.assertIsNot(t, subdir)
1238
 
        self.assertIs(t._get_connection(), subdir._get_connection())
1239
 
 
1240
 
        home = subdir._reuse_for(t.base + 'home')
1241
 
        self.assertIs(t._get_connection(), home._get_connection())
1242
 
        self.assertIs(subdir._get_connection(), home._get_connection())
1243
 
 
1244
 
    def test_clone(self):
1245
 
        # TODO: Test that clone moves up and down the filesystem
1246
 
        t1 = self.get_transport()
1247
 
 
1248
 
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1249
 
 
1250
 
        self.failUnless(t1.has('a'))
1251
 
        self.failUnless(t1.has('b/c'))
1252
 
        self.failIf(t1.has('c'))
1253
 
 
1254
 
        t2 = t1.clone('b')
1255
 
        self.assertEqual(t1.base + 'b/', t2.base)
1256
 
 
1257
 
        self.failUnless(t2.has('c'))
1258
 
        self.failIf(t2.has('a'))
1259
 
 
1260
 
        t3 = t2.clone('..')
1261
 
        self.failUnless(t3.has('a'))
1262
 
        self.failIf(t3.has('c'))
1263
 
 
1264
 
        self.failIf(t1.has('b/d'))
1265
 
        self.failIf(t2.has('d'))
1266
 
        self.failIf(t3.has('b/d'))
1267
 
 
1268
 
        if t1.is_readonly():
1269
 
            self.build_tree_contents([('b/d', 'newfile\n')])
1270
 
        else:
1271
 
            t2.put_bytes('d', 'newfile\n')
1272
 
 
1273
 
        self.failUnless(t1.has('b/d'))
1274
 
        self.failUnless(t2.has('d'))
1275
 
        self.failUnless(t3.has('b/d'))
1276
 
 
1277
 
    def test_clone_to_root(self):
1278
 
        orig_transport = self.get_transport()
1279
 
        # Repeatedly go up to a parent directory until we're at the root
1280
 
        # directory of this transport
1281
 
        root_transport = orig_transport
1282
 
        new_transport = root_transport.clone("..")
1283
 
        # as we are walking up directories, the path must be
1284
 
        # growing less, except at the top
1285
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1286
 
            or new_transport.base == root_transport.base)
1287
 
        while new_transport.base != root_transport.base:
1288
 
            root_transport = new_transport
1289
 
            new_transport = root_transport.clone("..")
1290
 
            # as we are walking up directories, the path must be
1291
 
            # growing less, except at the top
1292
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1293
 
                or new_transport.base == root_transport.base)
1294
 
 
1295
 
        # Cloning to "/" should take us to exactly the same location.
1296
 
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1297
 
        # the abspath of "/" from the original transport should be the same
1298
 
        # as the base at the root:
1299
 
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1300
 
 
1301
 
        # At the root, the URL must still end with / as its a directory
1302
 
        self.assertEqual(root_transport.base[-1], '/')
1303
 
 
1304
 
    def test_clone_from_root(self):
1305
 
        """At the root, cloning to a simple dir should just do string append."""
1306
 
        orig_transport = self.get_transport()
1307
 
        root_transport = orig_transport.clone('/')
1308
 
        self.assertEqual(root_transport.base + '.bzr/',
1309
 
            root_transport.clone('.bzr').base)
1310
 
 
1311
 
    def test_base_url(self):
1312
 
        t = self.get_transport()
1313
 
        self.assertEqual('/', t.base[-1])
1314
 
 
1315
 
    def test_relpath(self):
1316
 
        t = self.get_transport()
1317
 
        self.assertEqual('', t.relpath(t.base))
1318
 
        # base ends with /
1319
 
        self.assertEqual('', t.relpath(t.base[:-1]))
1320
 
        # subdirs which don't exist should still give relpaths.
1321
 
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1322
 
        # trailing slash should be the same.
1323
 
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1324
 
 
1325
 
    def test_relpath_at_root(self):
1326
 
        t = self.get_transport()
1327
 
        # clone all the way to the top
1328
 
        new_transport = t.clone('..')
1329
 
        while new_transport.base != t.base:
1330
 
            t = new_transport
1331
 
            new_transport = t.clone('..')
1332
 
        # we must be able to get a relpath below the root
1333
 
        self.assertEqual('', t.relpath(t.base))
1334
 
        # and a deeper one should work too
1335
 
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1336
 
 
1337
 
    def test_abspath(self):
1338
 
        # smoke test for abspath. Corner cases for backends like unix fs's
1339
 
        # that have aliasing problems like symlinks should go in backend
1340
 
        # specific test cases.
1341
 
        transport = self.get_transport()
1342
 
 
1343
 
        self.assertEqual(transport.base + 'relpath',
1344
 
                         transport.abspath('relpath'))
1345
 
 
1346
 
        # This should work without raising an error.
1347
 
        transport.abspath("/")
1348
 
 
1349
 
        # the abspath of "/" and "/foo/.." should result in the same location
1350
 
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1351
 
 
1352
 
        self.assertEqual(transport.clone("/").abspath('foo'),
1353
 
                         transport.abspath("/foo"))
1354
 
 
1355
 
    def test_win32_abspath(self):
1356
 
        # Note: we tried to set sys.platform='win32' so we could test on
1357
 
        # other platforms too, but then osutils does platform specific
1358
 
        # things at import time which defeated us...
1359
 
        if sys.platform != 'win32':
1360
 
            raise TestSkipped(
1361
 
                'Testing drive letters in abspath implemented only for win32')
1362
 
 
1363
 
        # smoke test for abspath on win32.
1364
 
        # a transport based on 'file:///' never fully qualifies the drive.
1365
 
        transport = get_transport("file:///")
1366
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1367
 
 
1368
 
        # but a transport that starts with a drive spec must keep it.
1369
 
        transport = get_transport("file:///C:/")
1370
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1371
 
 
1372
 
    def test_local_abspath(self):
1373
 
        transport = self.get_transport()
1374
 
        try:
1375
 
            p = transport.local_abspath('.')
1376
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
1377
 
            # should be formattable
1378
 
            s = str(e)
1379
 
        else:
1380
 
            self.assertEqual(getcwd(), p)
1381
 
 
1382
 
    def test_abspath_at_root(self):
1383
 
        t = self.get_transport()
1384
 
        # clone all the way to the top
1385
 
        new_transport = t.clone('..')
1386
 
        while new_transport.base != t.base:
1387
 
            t = new_transport
1388
 
            new_transport = t.clone('..')
1389
 
        # we must be able to get a abspath of the root when we ask for
1390
 
        # t.abspath('..') - this due to our choice that clone('..')
1391
 
        # should return the root from the root, combined with the desire that
1392
 
        # the url from clone('..') and from abspath('..') should be the same.
1393
 
        self.assertEqual(t.base, t.abspath('..'))
1394
 
        # '' should give us the root
1395
 
        self.assertEqual(t.base, t.abspath(''))
1396
 
        # and a path should append to the url
1397
 
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
1398
 
 
1399
 
    def test_iter_files_recursive(self):
1400
 
        transport = self.get_transport()
1401
 
        if not transport.listable():
1402
 
            self.assertRaises(TransportNotPossible,
1403
 
                              transport.iter_files_recursive)
1404
 
            return
1405
 
        self.build_tree(['isolated/',
1406
 
                         'isolated/dir/',
1407
 
                         'isolated/dir/foo',
1408
 
                         'isolated/dir/bar',
1409
 
                         'isolated/dir/b%25z', # make sure quoting is correct
1410
 
                         'isolated/bar'],
1411
 
                        transport=transport)
1412
 
        paths = set(transport.iter_files_recursive())
1413
 
        # nb the directories are not converted
1414
 
        self.assertEqual(paths,
1415
 
                    set(['isolated/dir/foo',
1416
 
                         'isolated/dir/bar',
1417
 
                         'isolated/dir/b%2525z',
1418
 
                         'isolated/bar']))
1419
 
        sub_transport = transport.clone('isolated')
1420
 
        paths = set(sub_transport.iter_files_recursive())
1421
 
        self.assertEqual(paths,
1422
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1423
 
 
1424
 
    def test_copy_tree(self):
1425
 
        # TODO: test file contents and permissions are preserved. This test was
1426
 
        # added just to ensure that quoting was handled correctly.
1427
 
        # -- David Allouche 2006-08-11
1428
 
        transport = self.get_transport()
1429
 
        if not transport.listable():
1430
 
            self.assertRaises(TransportNotPossible,
1431
 
                              transport.iter_files_recursive)
1432
 
            return
1433
 
        if transport.is_readonly():
1434
 
            return
1435
 
        self.build_tree(['from/',
1436
 
                         'from/dir/',
1437
 
                         'from/dir/foo',
1438
 
                         'from/dir/bar',
1439
 
                         'from/dir/b%25z', # make sure quoting is correct
1440
 
                         'from/bar'],
1441
 
                        transport=transport)
1442
 
        transport.copy_tree('from', 'to')
1443
 
        paths = set(transport.iter_files_recursive())
1444
 
        self.assertEqual(paths,
1445
 
                    set(['from/dir/foo',
1446
 
                         'from/dir/bar',
1447
 
                         'from/dir/b%2525z',
1448
 
                         'from/bar',
1449
 
                         'to/dir/foo',
1450
 
                         'to/dir/bar',
1451
 
                         'to/dir/b%2525z',
1452
 
                         'to/bar',]))
1453
 
 
1454
 
    def test_copy_tree_to_transport(self):
1455
 
        transport = self.get_transport()
1456
 
        if not transport.listable():
1457
 
            self.assertRaises(TransportNotPossible,
1458
 
                              transport.iter_files_recursive)
1459
 
            return
1460
 
        if transport.is_readonly():
1461
 
            return
1462
 
        self.build_tree(['from/',
1463
 
                         'from/dir/',
1464
 
                         'from/dir/foo',
1465
 
                         'from/dir/bar',
1466
 
                         'from/dir/b%25z', # make sure quoting is correct
1467
 
                         'from/bar'],
1468
 
                        transport=transport)
1469
 
        from_transport = transport.clone('from')
1470
 
        to_transport = transport.clone('to')
1471
 
        to_transport.ensure_base()
1472
 
        from_transport.copy_tree_to_transport(to_transport)
1473
 
        paths = set(transport.iter_files_recursive())
1474
 
        self.assertEqual(paths,
1475
 
                    set(['from/dir/foo',
1476
 
                         'from/dir/bar',
1477
 
                         'from/dir/b%2525z',
1478
 
                         'from/bar',
1479
 
                         'to/dir/foo',
1480
 
                         'to/dir/bar',
1481
 
                         'to/dir/b%2525z',
1482
 
                         'to/bar',]))
1483
 
 
1484
 
    def test_unicode_paths(self):
1485
 
        """Test that we can read/write files with Unicode names."""
1486
 
        t = self.get_transport()
1487
 
 
1488
 
        # With FAT32 and certain encodings on win32
1489
 
        # '\xe5' and '\xe4' actually map to the same file
1490
 
        # adding a suffix kicks in the 'preserving but insensitive'
1491
 
        # route, and maintains the right files
1492
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1493
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1494
 
                 u'\u017d', # Z with umlat iso-8859-2
1495
 
                 u'\u062c', # Arabic j
1496
 
                 u'\u0410', # Russian A
1497
 
                 u'\u65e5', # Kanji person
1498
 
                ]
1499
 
 
1500
 
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1501
 
        if no_unicode_support:
1502
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
1503
 
 
1504
 
        try:
1505
 
            self.build_tree(files, transport=t, line_endings='binary')
1506
 
        except UnicodeError:
1507
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
1508
 
 
1509
 
        # A plain unicode string is not a valid url
1510
 
        for fname in files:
1511
 
            self.assertRaises(InvalidURL, t.get, fname)
1512
 
 
1513
 
        for fname in files:
1514
 
            fname_utf8 = fname.encode('utf-8')
1515
 
            contents = 'contents of %s\n' % (fname_utf8,)
1516
 
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1517
 
 
1518
 
    def test_connect_twice_is_same_content(self):
1519
 
        # check that our server (whatever it is) is accessible reliably
1520
 
        # via get_transport and multiple connections share content.
1521
 
        transport = self.get_transport()
1522
 
        if transport.is_readonly():
1523
 
            return
1524
 
        transport.put_bytes('foo', 'bar')
1525
 
        transport3 = self.get_transport()
1526
 
        self.check_transport_contents('bar', transport3, 'foo')
1527
 
 
1528
 
        # now opening at a relative url should give use a sane result:
1529
 
        transport.mkdir('newdir')
1530
 
        transport5 = self.get_transport('newdir')
1531
 
        transport6 = transport5.clone('..')
1532
 
        self.check_transport_contents('bar', transport6, 'foo')
1533
 
 
1534
 
    def test_lock_write(self):
1535
 
        """Test transport-level write locks.
1536
 
 
1537
 
        These are deprecated and transports may decline to support them.
1538
 
        """
1539
 
        transport = self.get_transport()
1540
 
        if transport.is_readonly():
1541
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1542
 
            return
1543
 
        transport.put_bytes('lock', '')
1544
 
        try:
1545
 
            lock = transport.lock_write('lock')
1546
 
        except TransportNotPossible:
1547
 
            return
1548
 
        # TODO make this consistent on all platforms:
1549
 
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1550
 
        lock.unlock()
1551
 
 
1552
 
    def test_lock_read(self):
1553
 
        """Test transport-level read locks.
1554
 
 
1555
 
        These are deprecated and transports may decline to support them.
1556
 
        """
1557
 
        transport = self.get_transport()
1558
 
        if transport.is_readonly():
1559
 
            file('lock', 'w').close()
1560
 
        else:
1561
 
            transport.put_bytes('lock', '')
1562
 
        try:
1563
 
            lock = transport.lock_read('lock')
1564
 
        except TransportNotPossible:
1565
 
            return
1566
 
        # TODO make this consistent on all platforms:
1567
 
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1568
 
        lock.unlock()
1569
 
 
1570
 
    def test_readv(self):
1571
 
        transport = self.get_transport()
1572
 
        if transport.is_readonly():
1573
 
            file('a', 'w').write('0123456789')
1574
 
        else:
1575
 
            transport.put_bytes('a', '0123456789')
1576
 
 
1577
 
        d = list(transport.readv('a', ((0, 1),)))
1578
 
        self.assertEqual(d[0], (0, '0'))
1579
 
 
1580
 
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1581
 
        self.assertEqual(d[0], (0, '0'))
1582
 
        self.assertEqual(d[1], (1, '1'))
1583
 
        self.assertEqual(d[2], (3, '34'))
1584
 
        self.assertEqual(d[3], (9, '9'))
1585
 
 
1586
 
    def test_readv_out_of_order(self):
1587
 
        transport = self.get_transport()
1588
 
        if transport.is_readonly():
1589
 
            file('a', 'w').write('0123456789')
1590
 
        else:
1591
 
            transport.put_bytes('a', '01234567890')
1592
 
 
1593
 
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1594
 
        self.assertEqual(d[0], (1, '1'))
1595
 
        self.assertEqual(d[1], (9, '9'))
1596
 
        self.assertEqual(d[2], (0, '0'))
1597
 
        self.assertEqual(d[3], (3, '34'))
1598
 
 
1599
 
    def test_readv_with_adjust_for_latency(self):
1600
 
        transport = self.get_transport()
1601
 
        # the adjust for latency flag expands the data region returned
1602
 
        # according to a per-transport heuristic, so testing is a little
1603
 
        # tricky as we need more data than the largest combining that our
1604
 
        # transports do. To accomodate this we generate random data and cross
1605
 
        # reference the returned data with the random data. To avoid doing
1606
 
        # multiple large random byte look ups we do several tests on the same
1607
 
        # backing data.
1608
 
        content = osutils.rand_bytes(200*1024)
1609
 
        content_size = len(content)
1610
 
        if transport.is_readonly():
1611
 
            self.build_tree_contents([('a', content)])
1612
 
        else:
1613
 
            transport.put_bytes('a', content)
1614
 
        def check_result_data(result_vector):
1615
 
            for item in result_vector:
1616
 
                data_len = len(item[1])
1617
 
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1618
 
 
1619
 
        # start corner case
1620
 
        result = list(transport.readv('a', ((0, 30),),
1621
 
            adjust_for_latency=True, upper_limit=content_size))
1622
 
        # we expect 1 result, from 0, to something > 30
1623
 
        self.assertEqual(1, len(result))
1624
 
        self.assertEqual(0, result[0][0])
1625
 
        self.assertTrue(len(result[0][1]) >= 30)
1626
 
        check_result_data(result)
1627
 
        # end of file corner case
1628
 
        result = list(transport.readv('a', ((204700, 100),),
1629
 
            adjust_for_latency=True, upper_limit=content_size))
1630
 
        # we expect 1 result, from 204800- its length, to the end
1631
 
        self.assertEqual(1, len(result))
1632
 
        data_len = len(result[0][1])
1633
 
        self.assertEqual(204800-data_len, result[0][0])
1634
 
        self.assertTrue(data_len >= 100)
1635
 
        check_result_data(result)
1636
 
        # out of order ranges are made in order
1637
 
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1638
 
            adjust_for_latency=True, upper_limit=content_size))
1639
 
        # we expect 2 results, in order, start and end.
1640
 
        self.assertEqual(2, len(result))
1641
 
        # start
1642
 
        data_len = len(result[0][1])
1643
 
        self.assertEqual(0, result[0][0])
1644
 
        self.assertTrue(data_len >= 30)
1645
 
        # end
1646
 
        data_len = len(result[1][1])
1647
 
        self.assertEqual(204800-data_len, result[1][0])
1648
 
        self.assertTrue(data_len >= 100)
1649
 
        check_result_data(result)
1650
 
        # close ranges get combined (even if out of order)
1651
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1652
 
            result = list(transport.readv('a', request_vector,
1653
 
                adjust_for_latency=True, upper_limit=content_size))
1654
 
            self.assertEqual(1, len(result))
1655
 
            data_len = len(result[0][1])
1656
 
            # minimum length is from 400 to 1034 - 634
1657
 
            self.assertTrue(data_len >= 634)
1658
 
            # must contain the region 400 to 1034
1659
 
            self.assertTrue(result[0][0] <= 400)
1660
 
            self.assertTrue(result[0][0] + data_len >= 1034)
1661
 
            check_result_data(result)
1662
 
 
1663
 
    def test_readv_with_adjust_for_latency_with_big_file(self):
1664
 
        transport = self.get_transport()
1665
 
        # test from observed failure case.
1666
 
        if transport.is_readonly():
1667
 
            file('a', 'w').write('a'*1024*1024)
1668
 
        else:
1669
 
            transport.put_bytes('a', 'a'*1024*1024)
1670
 
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1671
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1672
 
            (465373, 800), (947422, 800)]
1673
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1674
 
        found_items = [False]*9
1675
 
        for pos, (start, length) in enumerate(broken_vector):
1676
 
            # check the range is covered by the result
1677
 
            for offset, data in results:
1678
 
                if offset <= start and start + length <= offset + len(data):
1679
 
                    found_items[pos] = True
1680
 
        self.assertEqual([True]*9, found_items)
1681
 
 
1682
 
    def test_get_with_open_write_stream_sees_all_content(self):
1683
 
        t = self.get_transport()
1684
 
        if t.is_readonly():
1685
 
            return
1686
 
        handle = t.open_write_stream('foo')
1687
 
        try:
1688
 
            handle.write('bcd')
1689
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1690
 
        finally:
1691
 
            handle.close()
1692
 
 
1693
 
    def test_get_smart_medium(self):
1694
 
        """All transports must either give a smart medium, or know they can't.
1695
 
        """
1696
 
        transport = self.get_transport()
1697
 
        try:
1698
 
            client_medium = transport.get_smart_medium()
1699
 
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1700
 
        except errors.NoSmartMedium:
1701
 
            # as long as we got it we're fine
1702
 
            pass
1703
 
 
1704
 
    def test_readv_short_read(self):
1705
 
        transport = self.get_transport()
1706
 
        if transport.is_readonly():
1707
 
            file('a', 'w').write('0123456789')
1708
 
        else:
1709
 
            transport.put_bytes('a', '01234567890')
1710
 
 
1711
 
        # This is intentionally reading off the end of the file
1712
 
        # since we are sure that it cannot get there
1713
 
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1714
 
                               # Can be raised by paramiko
1715
 
                               AssertionError),
1716
 
                              transport.readv, 'a', [(1,1), (8,10)])
1717
 
 
1718
 
        # This is trying to seek past the end of the file, it should
1719
 
        # also raise a special error
1720
 
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1721
 
                              transport.readv, 'a', [(12,2)])