~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Martin Packman
  • Date: 2011-12-23 19:38:22 UTC
  • mto: This revision was merged to the branch mainline in revision 6405.
  • Revision ID: martin.packman@canonical.com-20111223193822-hesheea4o8aqwexv
Accept and document passing the medium rather than transport for smart connections

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
25
26
from StringIO import StringIO as pyStringIO
26
27
import stat
27
28
import sys
28
 
import unittest
29
29
 
30
30
from bzrlib import (
31
31
    errors,
32
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    transport as _mod_transport,
33
36
    urlutils,
34
37
    )
35
38
from bzrlib.errors import (ConnectionError,
36
 
                           DirectoryNotEmpty,
37
39
                           FileExists,
38
40
                           InvalidURL,
39
 
                           LockError,
40
 
                           NoSmartServer,
41
41
                           NoSuchFile,
42
 
                           NotLocalUrl,
43
42
                           PathError,
44
43
                           TransportNotPossible,
45
44
                           )
46
45
from bzrlib.osutils import getcwd
47
46
from bzrlib.smart import medium
48
 
from bzrlib.symbol_versioning import zero_eleven
49
 
from bzrlib.tests import TestCaseInTempDir, TestScenarioApplier, TestSkipped
 
47
from bzrlib.tests import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from bzrlib.tests import test_server
50
53
from bzrlib.tests.test_transport import TestTransportImplementation
51
 
from bzrlib.transport import memory, remote, _get_transport_modules
52
 
import bzrlib.transport
53
 
 
54
 
 
55
 
class TransportTestProviderAdapter(TestScenarioApplier):
56
 
    """A tool to generate a suite testing all transports for a single test.
57
 
 
58
 
    This is done by copying the test once for each transport and injecting
59
 
    the transport_class and transport_server classes into each copy. Each copy
60
 
    is also given a new id() to make it easy to identify.
61
 
    """
62
 
 
63
 
    def __init__(self):
64
 
        self.scenarios = self._test_permutations()
65
 
 
66
 
    def get_transport_test_permutations(self, module):
67
 
        """Get the permutations module wants to have tested."""
68
 
        if getattr(module, 'get_test_permutations', None) is None:
69
 
            raise AssertionError("transport module %s doesn't provide get_test_permutations()"
70
 
                    % module.__name__)
71
 
            ##warning("transport module %s doesn't provide get_test_permutations()"
72
 
            ##       % module.__name__)
73
 
            return []
74
 
        return module.get_test_permutations()
75
 
 
76
 
    def _test_permutations(self):
77
 
        """Return a list of the klass, server_factory pairs to test."""
78
 
        result = []
79
 
        for module in _get_transport_modules():
80
 
            try:
81
 
                permutations = self.get_transport_test_permutations(
82
 
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
83
 
                for (klass, server_factory) in permutations:
84
 
                    scenario = (server_factory.__name__,
85
 
                        {"transport_class":klass,
86
 
                         "transport_server":server_factory})
87
 
                    result.append(scenario)
88
 
            except errors.DependencyNotPresent, e:
89
 
                # Continue even if a dependency prevents us 
90
 
                # from running this test
91
 
                pass
92
 
        return result
93
 
 
 
54
from bzrlib.transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent, e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(standard_tests, module, loader):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
94
97
 
95
98
 
96
99
class TransportTests(TestTransportImplementation):
97
100
 
98
101
    def setUp(self):
99
102
        super(TransportTests, self).setUp()
100
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
101
104
 
102
105
    def check_transport_contents(self, content, transport, relpath):
103
 
        """Check that transport.get(relpath).read() == content."""
104
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
105
108
 
106
109
    def test_ensure_base_missing(self):
107
110
        """.ensure_base() should create the directory if it doesn't exist"""
135
138
        t_b = t_a.clone('b')
136
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
137
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
 
148
 
138
149
    def test_has(self):
139
150
        t = self.get_transport()
140
151
 
143
154
        self.assertEqual(True, t.has('a'))
144
155
        self.assertEqual(False, t.has('c'))
145
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
146
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
147
 
                [True, True, False, False, True, False, True, False])
 
157
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
158
                                           'e', 'f', 'g', 'h'])),
 
159
                         [True, True, False, False,
 
160
                          True, False, True, False])
148
161
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
149
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
150
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
151
 
                [True, True, False, False, True, False, True, False])
 
162
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
163
                                           urlutils.escape('%%')]))
 
164
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
165
                                                'e', 'f', 'g', 'h']))),
 
166
                         [True, True, False, False,
 
167
                          True, False, True, False])
152
168
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
153
169
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
154
170
 
155
171
    def test_has_root_works(self):
 
172
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
173
            raise TestNotApplicable(
 
174
                "SmartTCPServer_for_testing intentionally does not allow "
 
175
                "access to /.")
156
176
        current_transport = self.get_transport()
157
177
        self.assertTrue(current_transport.has('/'))
158
178
        root = current_transport.clone('/')
170
190
        self.build_tree(files, transport=t, line_endings='binary')
171
191
        self.check_transport_contents('contents of a\n', t, 'a')
172
192
        content_f = t.get_multi(files)
173
 
        for content, f in zip(contents, content_f):
 
193
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
194
        # evaluate their inputs, the transport requests should be issued and
 
195
        # handled sequentially (we don't want to force transport to buffer).
 
196
        for content, f in itertools.izip(contents, content_f):
174
197
            self.assertEqual(content, f.read())
175
198
 
176
199
        content_f = t.get_multi(iter(files))
177
 
        for content, f in zip(contents, content_f):
 
200
        # Use itertools.izip() for the same reason
 
201
        for content, f in itertools.izip(contents, content_f):
178
202
            self.assertEqual(content, f.read())
179
203
 
 
204
    def test_get_unknown_file(self):
 
205
        t = self.get_transport()
 
206
        files = ['a', 'b']
 
207
        contents = ['contents of a\n',
 
208
                    'contents of b\n',
 
209
                    ]
 
210
        self.build_tree(files, transport=t, line_endings='binary')
180
211
        self.assertRaises(NoSuchFile, t.get, 'c')
181
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
182
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
 
223
 
 
224
    def test_get_directory_read_gives_ReadError(self):
 
225
        """consistent errors for read() on a file returned by get()."""
 
226
        t = self.get_transport()
 
227
        if t.is_readonly():
 
228
            self.build_tree(['a directory/'])
 
229
        else:
 
230
            t.mkdir('a%20directory')
 
231
        # getting the file must either work or fail with a PathError
 
232
        try:
 
233
            a_file = t.get('a%20directory')
 
234
        except (errors.PathError, errors.RedirectRequested):
 
235
            # early failure return immediately.
 
236
            return
 
237
        # having got a file, read() must either work (i.e. http reading a dir
 
238
        # listing) or fail with ReadError
 
239
        try:
 
240
            a_file.read()
 
241
        except errors.ReadError:
 
242
            pass
183
243
 
184
244
    def test_get_bytes(self):
185
245
        t = self.get_transport()
196
256
        for content, fname in zip(contents, files):
197
257
            self.assertEqual(content, t.get_bytes(fname))
198
258
 
 
259
    def test_get_bytes_unknown_file(self):
 
260
        t = self.get_transport()
199
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
200
262
 
201
 
    def test_put(self):
202
 
        t = self.get_transport()
203
 
 
204
 
        if t.is_readonly():
205
 
            return
206
 
 
207
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
208
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
209
 
 
210
 
        self.applyDeprecated(zero_eleven,
211
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
212
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
213
 
 
214
 
        self.assertRaises(NoSuchFile,
215
 
            self.applyDeprecated,
216
 
            zero_eleven,
217
 
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
263
    def test_get_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
        finally:
 
272
            handle.close()
 
273
 
 
274
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
275
        t = self.get_transport()
 
276
        if t.is_readonly():
 
277
            return
 
278
        handle = t.open_write_stream('foo')
 
279
        try:
 
280
            handle.write('b')
 
281
            self.assertEqual('b', t.get_bytes('foo'))
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
 
287
        finally:
 
288
            handle.close()
218
289
 
219
290
    def test_put_bytes(self):
220
291
        t = self.get_transport()
225
296
            return
226
297
 
227
298
        t.put_bytes('a', 'some text for a\n')
228
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
229
300
        self.check_transport_contents('some text for a\n', t, 'a')
230
301
 
231
302
        # The contents should be overwritten
243
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
244
315
            return
245
316
 
246
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
247
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
248
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
249
320
        self.check_transport_contents('some text for a\n', t, 'a')
250
321
        # Put also replaces contents
251
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
263
334
        # Now test the create_parent flag
264
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
265
336
                                       'contents\n')
266
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
267
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
268
339
                               create_parent_dir=True)
269
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
270
 
        
 
341
 
271
342
        # But we still get NoSuchFile if we can't make the parent dir
272
343
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
273
344
                                       'contents\n',
295
366
        umask = osutils.get_umask()
296
367
        t.put_bytes('nomode', 'test text\n', mode=None)
297
368
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
298
 
        
 
369
 
299
370
    def test_put_bytes_non_atomic_permissions(self):
300
371
        t = self.get_transport()
301
372
 
329
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
330
401
                               dir_mode=0777, create_parent_dir=True)
331
402
        self.assertTransportMode(t, 'dir777', 0777)
332
 
        
 
403
 
333
404
    def test_put_file(self):
334
405
        t = self.get_transport()
335
406
 
338
409
                    t.put_file, 'a', StringIO('some text for a\n'))
339
410
            return
340
411
 
341
 
        t.put_file('a', StringIO('some text for a\n'))
342
 
        self.failUnless(t.has('a'))
 
412
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        # put_file returns the length of the data written
 
414
        self.assertEqual(16, result)
 
415
        self.assertTrue(t.has('a'))
343
416
        self.check_transport_contents('some text for a\n', t, 'a')
344
417
        # Put also replaces contents
345
 
        t.put_file('a', StringIO('new\ncontents for\na\n'))
 
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        self.assertEqual(19, result)
346
420
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
347
421
        self.assertRaises(NoSuchFile,
348
422
                          t.put_file, 'path/doesnt/exist/c',
356
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
357
431
            return
358
432
 
359
 
        self.failIf(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
360
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
361
 
        self.failUnless(t.has('a'))
 
435
        self.assertTrue(t.has('a'))
362
436
        self.check_transport_contents('some text for a\n', t, 'a')
363
437
        # Put also replaces contents
364
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
376
450
        # Now test the create_parent flag
377
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
378
452
                                       StringIO('contents\n'))
379
 
        self.failIf(t.has('dir/a'))
 
453
        self.assertFalse(t.has('dir/a'))
380
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
381
455
                              create_parent_dir=True)
382
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
383
 
        
 
457
 
384
458
        # But we still get NoSuchFile if we can't make the parent dir
385
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
386
460
                                       StringIO('contents\n'),
404
478
        # Yes, you can put a file such that it becomes readonly
405
479
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
406
480
        self.assertTransportMode(t, 'mode400', 0400)
407
 
 
408
 
        # XXX: put_multi is deprecated, so do we really care anymore?
409
 
        self.applyDeprecated(zero_eleven, t.put_multi,
410
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
411
 
        self.assertTransportMode(t, 'mmode644', 0644)
412
 
 
413
481
        # The default permissions should be based on the current umask
414
482
        umask = osutils.get_umask()
415
483
        t.put_file('nomode', StringIO('test text\n'), mode=None)
416
484
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
417
 
        
 
485
 
418
486
    def test_put_file_non_atomic_permissions(self):
419
487
        t = self.get_transport()
420
488
 
437
505
        umask = osutils.get_umask()
438
506
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
439
507
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
440
 
        
 
508
 
441
509
        # We should also be able to set the mode for a parent directory
442
510
        # when it is created
443
511
        sio = StringIO()
479
547
        unicode_file = pyStringIO(u'\u1234')
480
548
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
481
549
 
482
 
    def test_put_multi(self):
483
 
        t = self.get_transport()
484
 
 
485
 
        if t.is_readonly():
486
 
            return
487
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
488
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
489
 
                          ('d', StringIO('contents\nfor d\n'))]
490
 
            ))
491
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
492
 
                [True, False, False, True])
493
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
494
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
495
 
 
496
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
497
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
498
 
                              ('d', StringIO('another contents\nfor d\n'))])
499
 
            ))
500
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
501
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
502
 
 
503
 
    def test_put_permissions(self):
504
 
        t = self.get_transport()
505
 
 
506
 
        if t.is_readonly():
507
 
            return
508
 
        if not t._can_roundtrip_unix_modebits():
509
 
            # Can't roundtrip, so no need to run this test
510
 
            return
511
 
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
512
 
                             StringIO('test text\n'), mode=0644)
513
 
        self.assertTransportMode(t, 'mode644', 0644)
514
 
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
515
 
                             StringIO('test text\n'), mode=0666)
516
 
        self.assertTransportMode(t, 'mode666', 0666)
517
 
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
518
 
                             StringIO('test text\n'), mode=0600)
519
 
        self.assertTransportMode(t, 'mode600', 0600)
520
 
        # Yes, you can put a file such that it becomes readonly
521
 
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
522
 
                             StringIO('test text\n'), mode=0400)
523
 
        self.assertTransportMode(t, 'mode400', 0400)
524
 
        self.applyDeprecated(zero_eleven, t.put_multi,
525
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
526
 
        self.assertTransportMode(t, 'mmode644', 0644)
527
 
 
528
 
        # The default permissions should be based on the current umask
529
 
        umask = osutils.get_umask()
530
 
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
531
 
                             StringIO('test text\n'), mode=None)
532
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
533
 
        
534
550
    def test_mkdir(self):
535
551
        t = self.get_transport()
536
552
 
537
553
        if t.is_readonly():
538
 
            # cannot mkdir on readonly transports. We're not testing for 
 
554
            # cannot mkdir on readonly transports. We're not testing for
539
555
            # cache coherency because cache behaviour is not currently
540
556
            # defined for the transport interface.
541
557
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
562
578
 
563
579
        # we were testing that a local mkdir followed by a transport
564
580
        # mkdir failed thusly, but given that we * in one process * do not
565
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
581
        # concurrently fiddle with disk dirs and then use transport to do
566
582
        # things, the win here seems marginal compared to the constraint on
567
583
        # the interface. RBC 20051227
568
584
        t.mkdir('dir_g')
601
617
        t.mkdir('dnomode', mode=None)
602
618
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
603
619
 
 
620
    def test_opening_a_file_stream_creates_file(self):
 
621
        t = self.get_transport()
 
622
        if t.is_readonly():
 
623
            return
 
624
        handle = t.open_write_stream('foo')
 
625
        try:
 
626
            self.assertEqual('', t.get_bytes('foo'))
 
627
        finally:
 
628
            handle.close()
 
629
 
 
630
    def test_opening_a_file_stream_can_set_mode(self):
 
631
        t = self.get_transport()
 
632
        if t.is_readonly():
 
633
            return
 
634
        if not t._can_roundtrip_unix_modebits():
 
635
            # Can't roundtrip, so no need to run this test
 
636
            return
 
637
        def check_mode(name, mode, expected):
 
638
            handle = t.open_write_stream(name, mode=mode)
 
639
            handle.close()
 
640
            self.assertTransportMode(t, name, expected)
 
641
        check_mode('mode644', 0644, 0644)
 
642
        check_mode('mode666', 0666, 0666)
 
643
        check_mode('mode600', 0600, 0600)
 
644
        # The default permissions should be based on the current umask
 
645
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
646
 
604
647
    def test_copy_to(self):
605
648
        # FIXME: test:   same server to same server (partly done)
606
649
        # same protocol two servers
607
650
        # and    different protocols (done for now except for MemoryTransport.
608
651
        # - RBC 20060122
609
 
        from bzrlib.transport.memory import MemoryTransport
610
652
 
611
653
        def simple_copy_files(transport_from, transport_to):
612
654
            files = ['a', 'b', 'c', 'd']
613
655
            self.build_tree(files, transport=transport_from)
614
656
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
615
657
            for f in files:
616
 
                self.check_transport_contents(transport_to.get(f).read(),
 
658
                self.check_transport_contents(transport_to.get_bytes(f),
617
659
                                              transport_from, f)
618
660
 
619
661
        t = self.get_transport()
642
684
        files = ['a', 'b', 'c', 'd']
643
685
        t.copy_to(iter(files), temp_transport)
644
686
        for f in files:
645
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
687
            self.check_transport_contents(temp_transport.get_bytes(f),
646
688
                                          t, f)
647
689
        del temp_transport
648
690
 
652
694
            for f in files:
653
695
                self.assertTransportMode(temp_transport, f, mode)
654
696
 
655
 
    def test_append(self):
 
697
    def test_create_prefix(self):
656
698
        t = self.get_transport()
657
 
 
658
 
        if t.is_readonly():
659
 
            return
660
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
661
 
        t.put_bytes('b', 'contents\nfor b\n')
662
 
 
663
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
664
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
665
 
 
666
 
        self.check_transport_contents(
667
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
668
 
            t, 'a')
669
 
 
670
 
        # And we can create new files, too
671
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
672
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
673
 
        self.check_transport_contents('some text\nfor a missing file\n',
674
 
                                      t, 'c')
 
699
        sub = t.clone('foo').clone('bar')
 
700
        try:
 
701
            sub.create_prefix()
 
702
        except TransportNotPossible:
 
703
            self.assertTrue(t.is_readonly())
 
704
        else:
 
705
            self.assertTrue(t.has('foo/bar'))
 
706
 
675
707
    def test_append_file(self):
676
708
        t = self.get_transport()
677
709
 
781
813
                t.append_file, 'f', StringIO('f'), mode=None)
782
814
            return
783
815
        t.append_file('f', StringIO('f'), mode=None)
784
 
        
 
816
 
785
817
    def test_append_bytes_mode(self):
786
818
        # check append_bytes accepts a mode
787
819
        t = self.get_transport()
790
822
                t.append_bytes, 'f', 'f', mode=None)
791
823
            return
792
824
        t.append_bytes('f', 'f', mode=None)
793
 
        
 
825
 
794
826
    def test_delete(self):
795
827
        # TODO: Test Transport.delete
796
828
        t = self.get_transport()
801
833
            return
802
834
 
803
835
        t.put_bytes('a', 'a little bit of text\n')
804
 
        self.failUnless(t.has('a'))
 
836
        self.assertTrue(t.has('a'))
805
837
        t.delete('a')
806
 
        self.failIf(t.has('a'))
 
838
        self.assertFalse(t.has('a'))
807
839
 
808
840
        self.assertRaises(NoSuchFile, t.delete, 'a')
809
841
 
815
847
        t.delete_multi(['a', 'c'])
816
848
        self.assertEqual([False, True, False],
817
849
                list(t.has_multi(['a', 'b', 'c'])))
818
 
        self.failIf(t.has('a'))
819
 
        self.failUnless(t.has('b'))
820
 
        self.failIf(t.has('c'))
 
850
        self.assertFalse(t.has('a'))
 
851
        self.assertTrue(t.has('b'))
 
852
        self.assertFalse(t.has('c'))
821
853
 
822
854
        self.assertRaises(NoSuchFile,
823
855
                t.delete_multi, ['a', 'b', 'c'])
835
867
        # plain "listdir".
836
868
        # self.assertEqual([], os.listdir('.'))
837
869
 
 
870
    def test_recommended_page_size(self):
 
871
        """Transports recommend a page size for partial access to files."""
 
872
        t = self.get_transport()
 
873
        self.assertIsInstance(t.recommended_page_size(), int)
 
874
 
838
875
    def test_rmdir(self):
839
876
        t = self.get_transport()
840
877
        # Not much to do with a readonly transport
852
889
 
853
890
    def test_rmdir_not_empty(self):
854
891
        """Deleting a non-empty directory raises an exception
855
 
        
 
892
 
856
893
        sftp (and possibly others) don't give us a specific "directory not
857
894
        empty" exception -- we can just see that the operation failed.
858
895
        """
865
902
 
866
903
    def test_rmdir_empty_but_similar_prefix(self):
867
904
        """rmdir does not get confused by sibling paths.
868
 
        
 
905
 
869
906
        A naive implementation of MemoryTransport would refuse to rmdir
870
907
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
871
908
        uses "path.startswith(dir)" on all file paths to determine if directory
879
916
        t.mkdir('foo-baz')
880
917
        t.rmdir('foo')
881
918
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
882
 
        self.failUnless(t.has('foo-bar'))
 
919
        self.assertTrue(t.has('foo-bar'))
883
920
 
884
921
    def test_rename_dir_succeeds(self):
885
922
        t = self.get_transport()
908
945
        self.assertFalse(t.has('adir/bdir'))
909
946
        self.assertFalse(t.has('adir/bsubdir'))
910
947
 
 
948
    def test_rename_across_subdirs(self):
 
949
        t = self.get_transport()
 
950
        if t.is_readonly():
 
951
            raise TestNotApplicable("transport is readonly")
 
952
        t.mkdir('a')
 
953
        t.mkdir('b')
 
954
        ta = t.clone('a')
 
955
        tb = t.clone('b')
 
956
        ta.put_bytes('f', 'aoeu')
 
957
        ta.rename('f', '../b/f')
 
958
        self.assertTrue(tb.has('f'))
 
959
        self.assertFalse(ta.has('f'))
 
960
        self.assertTrue(t.has('b/f'))
 
961
 
911
962
    def test_delete_tree(self):
912
963
        t = self.get_transport()
913
964
 
923
974
        except TransportNotPossible:
924
975
            # ok, this transport does not support delete_tree
925
976
            return
926
 
        
 
977
 
927
978
        # did it delete that trivial case?
928
979
        self.assertRaises(NoSuchFile, t.stat, 'adir')
929
980
 
930
981
        self.build_tree(['adir/',
931
 
                         'adir/file', 
932
 
                         'adir/subdir/', 
933
 
                         'adir/subdir/file', 
 
982
                         'adir/file',
 
983
                         'adir/subdir/',
 
984
                         'adir/subdir/file',
934
985
                         'adir/subdir2/',
935
986
                         'adir/subdir2/file',
936
987
                         ], transport=t)
954
1005
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
955
1006
 
956
1007
        t.move('a', 'b')
957
 
        self.failUnless(t.has('b'))
958
 
        self.failIf(t.has('a'))
 
1008
        self.assertTrue(t.has('b'))
 
1009
        self.assertFalse(t.has('a'))
959
1010
 
960
1011
        self.check_transport_contents('a first file\n', t, 'b')
961
1012
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
963
1014
        # Overwrite a file
964
1015
        t.put_bytes('c', 'c this file\n')
965
1016
        t.move('c', 'b')
966
 
        self.failIf(t.has('c'))
 
1017
        self.assertFalse(t.has('c'))
967
1018
        self.check_transport_contents('c this file\n', t, 'b')
968
1019
 
969
1020
        # TODO: Try to write a test for atomicity
970
 
        # TODO: Test moving into a non-existant subdirectory
 
1021
        # TODO: Test moving into a non-existent subdirectory
971
1022
        # TODO: Test Transport.move_multi
972
1023
 
973
1024
    def test_copy(self):
993
1044
 
994
1045
    def test_connection_error(self):
995
1046
        """ConnectionError is raised when connection is impossible.
996
 
        
997
 
        The error may be raised from either the constructor or the first
998
 
        operation on the transport.
 
1047
 
 
1048
        The error should be raised from the first operation on the transport.
999
1049
        """
1000
1050
        try:
1001
1051
            url = self._server.get_bogus_url()
1002
1052
        except NotImplementedError:
1003
1053
            raise TestSkipped("Transport %s has no bogus URL support." %
1004
1054
                              self._server.__class__)
1005
 
        # This should be:  but SSH still connects on construction. No COOKIE!
1006
 
        # self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1007
 
        try:
1008
 
            t = bzrlib.transport.get_transport(url)
1009
 
            t.get('.bzr/branch')
1010
 
        except (ConnectionError, NoSuchFile), e:
1011
 
            pass
1012
 
        except (Exception), e:
1013
 
            self.fail('Wrong exception thrown (%s.%s): %s' 
1014
 
                        % (e.__class__.__module__, e.__class__.__name__, e))
1015
 
        else:
1016
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
1055
        t = _mod_transport.get_transport_from_url(url)
 
1056
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1017
1057
 
1018
1058
    def test_stat(self):
1019
1059
        # TODO: Test stat, just try once, and if it throws, stop testing
1028
1068
            return
1029
1069
 
1030
1070
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1031
 
        sizes = [14, 0, 16, 0, 18] 
 
1071
        sizes = [14, 0, 16, 0, 18]
1032
1072
        self.build_tree(paths, transport=t, line_endings='binary')
1033
1073
 
1034
1074
        for path, size in zip(paths, sizes):
1035
1075
            st = t.stat(path)
1036
1076
            if path.endswith('/'):
1037
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1077
                self.assertTrue(S_ISDIR(st.st_mode))
1038
1078
                # directory sizes are meaningless
1039
1079
            else:
1040
 
                self.failUnless(S_ISREG(st.st_mode))
 
1080
                self.assertTrue(S_ISREG(st.st_mode))
1041
1081
                self.assertEqual(size, st.st_size)
1042
1082
 
1043
1083
        remote_stats = list(t.stat_multi(paths))
1050
1090
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1051
1091
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1052
1092
        subdir = t.clone('subdir')
1053
 
        subdir.stat('./file')
1054
 
        subdir.stat('.')
 
1093
        st = subdir.stat('./file')
 
1094
        st = subdir.stat('.')
 
1095
 
 
1096
    def test_hardlink(self):
 
1097
        from stat import ST_NLINK
 
1098
 
 
1099
        t = self.get_transport()
 
1100
 
 
1101
        source_name = "original_target"
 
1102
        link_name = "target_link"
 
1103
 
 
1104
        self.build_tree([source_name], transport=t)
 
1105
 
 
1106
        try:
 
1107
            t.hardlink(source_name, link_name)
 
1108
 
 
1109
            self.assertTrue(t.has(source_name))
 
1110
            self.assertTrue(t.has(link_name))
 
1111
 
 
1112
            st = t.stat(link_name)
 
1113
            self.assertEqual(st[ST_NLINK], 2)
 
1114
        except TransportNotPossible:
 
1115
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1116
                              self._server.__class__)
 
1117
 
 
1118
    def test_symlink(self):
 
1119
        from stat import S_ISLNK
 
1120
 
 
1121
        t = self.get_transport()
 
1122
 
 
1123
        source_name = "original_target"
 
1124
        link_name = "target_link"
 
1125
 
 
1126
        self.build_tree([source_name], transport=t)
 
1127
 
 
1128
        try:
 
1129
            t.symlink(source_name, link_name)
 
1130
 
 
1131
            self.assertTrue(t.has(source_name))
 
1132
            self.assertTrue(t.has(link_name))
 
1133
 
 
1134
            st = t.stat(link_name)
 
1135
            self.assertTrue(S_ISLNK(st.st_mode),
 
1136
                "expected symlink, got mode %o" % st.st_mode)
 
1137
        except TransportNotPossible:
 
1138
            raise TestSkipped("Transport %s does not support symlinks." %
 
1139
                              self._server.__class__)
 
1140
        except IOError:
 
1141
            self.knownFailure("Paramiko fails to create symlinks during tests")
1055
1142
 
1056
1143
    def test_list_dir(self):
1057
1144
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1058
1145
        t = self.get_transport()
1059
 
        
 
1146
 
1060
1147
        if not t.listable():
1061
1148
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1062
1149
            return
1063
1150
 
1064
 
        def sorted_list(d):
1065
 
            l = list(t.list_dir(d))
 
1151
        def sorted_list(d, transport):
 
1152
            l = list(transport.list_dir(d))
1066
1153
            l.sort()
1067
1154
            return l
1068
1155
 
1069
 
        self.assertEqual([], sorted_list('.'))
 
1156
        self.assertEqual([], sorted_list('.', t))
1070
1157
        # c2 is precisely one letter longer than c here to test that
1071
1158
        # suffixing is not confused.
1072
1159
        # a%25b checks that quoting is done consistently across transports
1078
1165
            self.build_tree(tree_names)
1079
1166
 
1080
1167
        self.assertEqual(
1081
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
1082
 
        self.assertEqual(['d', 'e'], sorted_list('c'))
 
1168
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1169
        self.assertEqual(
 
1170
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1171
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1172
 
 
1173
        # Cloning the transport produces an equivalent listing
 
1174
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1083
1175
 
1084
1176
        if not t.is_readonly():
1085
1177
            t.delete('c/d')
1087
1179
        else:
1088
1180
            os.unlink('c/d')
1089
1181
            os.unlink('b')
1090
 
            
1091
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
1092
 
        self.assertEqual(['e'], sorted_list('c'))
 
1182
 
 
1183
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1184
        self.assertEqual(['e'], sorted_list('c', t))
1093
1185
 
1094
1186
        self.assertListRaises(PathError, t.list_dir, 'q')
1095
1187
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1188
        # 'a' is a file, list_dir should raise an error
1096
1189
        self.assertListRaises(PathError, t.list_dir, 'a')
1097
1190
 
1098
1191
    def test_list_dir_result_is_url_escaped(self):
1104
1197
            self.build_tree(['a/', 'a/%'], transport=t)
1105
1198
        else:
1106
1199
            self.build_tree(['a/', 'a/%'])
1107
 
        
 
1200
 
1108
1201
        names = list(t.list_dir('a'))
1109
1202
        self.assertEqual(['%25'], names)
1110
1203
        self.assertIsInstance(names[0], str)
1111
1204
 
 
1205
    def test_clone_preserve_info(self):
 
1206
        t1 = self.get_transport()
 
1207
        if not isinstance(t1, ConnectedTransport):
 
1208
            raise TestSkipped("not a connected transport")
 
1209
 
 
1210
        t2 = t1.clone('subdir')
 
1211
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1212
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1213
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1214
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1215
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
 
1216
 
 
1217
    def test__reuse_for(self):
 
1218
        t = self.get_transport()
 
1219
        if not isinstance(t, ConnectedTransport):
 
1220
            raise TestSkipped("not a connected transport")
 
1221
 
 
1222
        def new_url(scheme=None, user=None, password=None,
 
1223
                    host=None, port=None, path=None):
 
1224
            """Build a new url from t.base changing only parts of it.
 
1225
 
 
1226
            Only the parameters different from None will be changed.
 
1227
            """
 
1228
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1229
            if user     is None: user     = t._parsed_url.user
 
1230
            if password is None: password = t._parsed_url.password
 
1231
            if user     is None: user     = t._parsed_url.user
 
1232
            if host     is None: host     = t._parsed_url.host
 
1233
            if port     is None: port     = t._parsed_url.port
 
1234
            if path     is None: path     = t._parsed_url.path
 
1235
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1236
 
 
1237
        if t._parsed_url.scheme == 'ftp':
 
1238
            scheme = 'sftp'
 
1239
        else:
 
1240
            scheme = 'ftp'
 
1241
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1242
        if t._parsed_url.user == 'me':
 
1243
            user = 'you'
 
1244
        else:
 
1245
            user = 'me'
 
1246
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1247
        # passwords are not taken into account because:
 
1248
        # - it makes no sense to have two different valid passwords for the
 
1249
        #   same user
 
1250
        # - _password in ConnectedTransport is intended to collect what the
 
1251
        #   user specified from the command-line and there are cases where the
 
1252
        #   new url can contain no password (if the url was built from an
 
1253
        #   existing transport.base for example)
 
1254
        # - password are considered part of the credentials provided at
 
1255
        #   connection creation time and as such may not be present in the url
 
1256
        #   (they may be typed by the user when prompted for example)
 
1257
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1258
        # We will not connect, we can use a invalid host
 
1259
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1260
        if t._parsed_url.port == 1234:
 
1261
            port = 4321
 
1262
        else:
 
1263
            port = 1234
 
1264
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1265
        # No point in trying to reuse a transport for a local URL
 
1266
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1267
 
 
1268
    def test_connection_sharing(self):
 
1269
        t = self.get_transport()
 
1270
        if not isinstance(t, ConnectedTransport):
 
1271
            raise TestSkipped("not a connected transport")
 
1272
 
 
1273
        c = t.clone('subdir')
 
1274
        # Some transports will create the connection  only when needed
 
1275
        t.has('surely_not') # Force connection
 
1276
        self.assertIs(t._get_connection(), c._get_connection())
 
1277
 
 
1278
        # Temporary failure, we need to create a new dummy connection
 
1279
        new_connection = None
 
1280
        t._set_connection(new_connection)
 
1281
        # Check that both transports use the same connection
 
1282
        self.assertIs(new_connection, t._get_connection())
 
1283
        self.assertIs(new_connection, c._get_connection())
 
1284
 
 
1285
    def test_reuse_connection_for_various_paths(self):
 
1286
        t = self.get_transport()
 
1287
        if not isinstance(t, ConnectedTransport):
 
1288
            raise TestSkipped("not a connected transport")
 
1289
 
 
1290
        t.has('surely_not') # Force connection
 
1291
        self.assertIsNot(None, t._get_connection())
 
1292
 
 
1293
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1294
        self.assertIsNot(t, subdir)
 
1295
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1296
 
 
1297
        home = subdir._reuse_for(t.base + 'home')
 
1298
        self.assertIs(t._get_connection(), home._get_connection())
 
1299
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1300
 
1112
1301
    def test_clone(self):
1113
1302
        # TODO: Test that clone moves up and down the filesystem
1114
1303
        t1 = self.get_transport()
1115
1304
 
1116
1305
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1117
1306
 
1118
 
        self.failUnless(t1.has('a'))
1119
 
        self.failUnless(t1.has('b/c'))
1120
 
        self.failIf(t1.has('c'))
 
1307
        self.assertTrue(t1.has('a'))
 
1308
        self.assertTrue(t1.has('b/c'))
 
1309
        self.assertFalse(t1.has('c'))
1121
1310
 
1122
1311
        t2 = t1.clone('b')
1123
1312
        self.assertEqual(t1.base + 'b/', t2.base)
1124
1313
 
1125
 
        self.failUnless(t2.has('c'))
1126
 
        self.failIf(t2.has('a'))
 
1314
        self.assertTrue(t2.has('c'))
 
1315
        self.assertFalse(t2.has('a'))
1127
1316
 
1128
1317
        t3 = t2.clone('..')
1129
 
        self.failUnless(t3.has('a'))
1130
 
        self.failIf(t3.has('c'))
 
1318
        self.assertTrue(t3.has('a'))
 
1319
        self.assertFalse(t3.has('c'))
1131
1320
 
1132
 
        self.failIf(t1.has('b/d'))
1133
 
        self.failIf(t2.has('d'))
1134
 
        self.failIf(t3.has('b/d'))
 
1321
        self.assertFalse(t1.has('b/d'))
 
1322
        self.assertFalse(t2.has('d'))
 
1323
        self.assertFalse(t3.has('b/d'))
1135
1324
 
1136
1325
        if t1.is_readonly():
1137
 
            open('b/d', 'wb').write('newfile\n')
 
1326
            self.build_tree_contents([('b/d', 'newfile\n')])
1138
1327
        else:
1139
1328
            t2.put_bytes('d', 'newfile\n')
1140
1329
 
1141
 
        self.failUnless(t1.has('b/d'))
1142
 
        self.failUnless(t2.has('d'))
1143
 
        self.failUnless(t3.has('b/d'))
 
1330
        self.assertTrue(t1.has('b/d'))
 
1331
        self.assertTrue(t2.has('d'))
 
1332
        self.assertTrue(t3.has('b/d'))
1144
1333
 
1145
1334
    def test_clone_to_root(self):
1146
1335
        orig_transport = self.get_transport()
1185
1374
        self.assertEqual('', t.relpath(t.base))
1186
1375
        # base ends with /
1187
1376
        self.assertEqual('', t.relpath(t.base[:-1]))
1188
 
        # subdirs which dont exist should still give relpaths.
 
1377
        # subdirs which don't exist should still give relpaths.
1189
1378
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1190
1379
        # trailing slash should be the same.
1191
1380
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1207
1396
        # that have aliasing problems like symlinks should go in backend
1208
1397
        # specific test cases.
1209
1398
        transport = self.get_transport()
1210
 
        
 
1399
 
1211
1400
        self.assertEqual(transport.base + 'relpath',
1212
1401
                         transport.abspath('relpath'))
1213
1402
 
1220
1409
        self.assertEqual(transport.clone("/").abspath('foo'),
1221
1410
                         transport.abspath("/foo"))
1222
1411
 
 
1412
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1413
    def test_win32_abspath(self):
 
1414
        # Note: we tried to set sys.platform='win32' so we could test on
 
1415
        # other platforms too, but then osutils does platform specific
 
1416
        # things at import time which defeated us...
 
1417
        if sys.platform != 'win32':
 
1418
            raise TestSkipped(
 
1419
                'Testing drive letters in abspath implemented only for win32')
 
1420
 
 
1421
        # smoke test for abspath on win32.
 
1422
        # a transport based on 'file:///' never fully qualifies the drive.
 
1423
        transport = _mod_transport.get_transport_from_url("file:///")
 
1424
        self.assertEqual(transport.abspath("/"), "file:///")
 
1425
 
 
1426
        # but a transport that starts with a drive spec must keep it.
 
1427
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1428
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1429
 
1223
1430
    def test_local_abspath(self):
1224
1431
        transport = self.get_transport()
1225
1432
        try:
1302
1509
                         'to/dir/b%2525z',
1303
1510
                         'to/bar',]))
1304
1511
 
 
1512
    def test_copy_tree_to_transport(self):
 
1513
        transport = self.get_transport()
 
1514
        if not transport.listable():
 
1515
            self.assertRaises(TransportNotPossible,
 
1516
                              transport.iter_files_recursive)
 
1517
            return
 
1518
        if transport.is_readonly():
 
1519
            return
 
1520
        self.build_tree(['from/',
 
1521
                         'from/dir/',
 
1522
                         'from/dir/foo',
 
1523
                         'from/dir/bar',
 
1524
                         'from/dir/b%25z', # make sure quoting is correct
 
1525
                         'from/bar'],
 
1526
                        transport=transport)
 
1527
        from_transport = transport.clone('from')
 
1528
        to_transport = transport.clone('to')
 
1529
        to_transport.ensure_base()
 
1530
        from_transport.copy_tree_to_transport(to_transport)
 
1531
        paths = set(transport.iter_files_recursive())
 
1532
        self.assertEqual(paths,
 
1533
                    set(['from/dir/foo',
 
1534
                         'from/dir/bar',
 
1535
                         'from/dir/b%2525z',
 
1536
                         'from/bar',
 
1537
                         'to/dir/foo',
 
1538
                         'to/dir/bar',
 
1539
                         'to/dir/b%2525z',
 
1540
                         'to/bar',]))
 
1541
 
1305
1542
    def test_unicode_paths(self):
1306
1543
        """Test that we can read/write files with Unicode names."""
1307
1544
        t = self.get_transport()
1318
1555
                 u'\u65e5', # Kanji person
1319
1556
                ]
1320
1557
 
 
1558
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1559
        if no_unicode_support:
 
1560
            self.knownFailure("test server cannot handle unicode paths")
 
1561
 
1321
1562
        try:
1322
1563
            self.build_tree(files, transport=t, line_endings='binary')
1323
1564
        except UnicodeError:
1333
1574
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1334
1575
 
1335
1576
    def test_connect_twice_is_same_content(self):
1336
 
        # check that our server (whatever it is) is accessable reliably
 
1577
        # check that our server (whatever it is) is accessible reliably
1337
1578
        # via get_transport and multiple connections share content.
1338
1579
        transport = self.get_transport()
1339
1580
        if transport.is_readonly():
1340
1581
            return
1341
1582
        transport.put_bytes('foo', 'bar')
1342
 
        transport2 = self.get_transport()
1343
 
        self.check_transport_contents('bar', transport2, 'foo')
1344
 
        # its base should be usable.
1345
 
        transport2 = bzrlib.transport.get_transport(transport.base)
1346
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1583
        transport3 = self.get_transport()
 
1584
        self.check_transport_contents('bar', transport3, 'foo')
1347
1585
 
1348
1586
        # now opening at a relative url should give use a sane result:
1349
1587
        transport.mkdir('newdir')
1350
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1351
 
        transport2 = transport2.clone('..')
1352
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1588
        transport5 = self.get_transport('newdir')
 
1589
        transport6 = transport5.clone('..')
 
1590
        self.check_transport_contents('bar', transport6, 'foo')
1353
1591
 
1354
1592
    def test_lock_write(self):
1355
1593
        """Test transport-level write locks.
1390
1628
    def test_readv(self):
1391
1629
        transport = self.get_transport()
1392
1630
        if transport.is_readonly():
1393
 
            file('a', 'w').write('0123456789')
 
1631
            with file('a', 'w') as f: f.write('0123456789')
1394
1632
        else:
1395
1633
            transport.put_bytes('a', '0123456789')
1396
1634
 
1406
1644
    def test_readv_out_of_order(self):
1407
1645
        transport = self.get_transport()
1408
1646
        if transport.is_readonly():
1409
 
            file('a', 'w').write('0123456789')
 
1647
            with file('a', 'w') as f: f.write('0123456789')
1410
1648
        else:
1411
1649
            transport.put_bytes('a', '01234567890')
1412
1650
 
1416
1654
        self.assertEqual(d[2], (0, '0'))
1417
1655
        self.assertEqual(d[3], (3, '34'))
1418
1656
 
 
1657
    def test_readv_with_adjust_for_latency(self):
 
1658
        transport = self.get_transport()
 
1659
        # the adjust for latency flag expands the data region returned
 
1660
        # according to a per-transport heuristic, so testing is a little
 
1661
        # tricky as we need more data than the largest combining that our
 
1662
        # transports do. To accomodate this we generate random data and cross
 
1663
        # reference the returned data with the random data. To avoid doing
 
1664
        # multiple large random byte look ups we do several tests on the same
 
1665
        # backing data.
 
1666
        content = osutils.rand_bytes(200*1024)
 
1667
        content_size = len(content)
 
1668
        if transport.is_readonly():
 
1669
            self.build_tree_contents([('a', content)])
 
1670
        else:
 
1671
            transport.put_bytes('a', content)
 
1672
        def check_result_data(result_vector):
 
1673
            for item in result_vector:
 
1674
                data_len = len(item[1])
 
1675
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1676
 
 
1677
        # start corner case
 
1678
        result = list(transport.readv('a', ((0, 30),),
 
1679
            adjust_for_latency=True, upper_limit=content_size))
 
1680
        # we expect 1 result, from 0, to something > 30
 
1681
        self.assertEqual(1, len(result))
 
1682
        self.assertEqual(0, result[0][0])
 
1683
        self.assertTrue(len(result[0][1]) >= 30)
 
1684
        check_result_data(result)
 
1685
        # end of file corner case
 
1686
        result = list(transport.readv('a', ((204700, 100),),
 
1687
            adjust_for_latency=True, upper_limit=content_size))
 
1688
        # we expect 1 result, from 204800- its length, to the end
 
1689
        self.assertEqual(1, len(result))
 
1690
        data_len = len(result[0][1])
 
1691
        self.assertEqual(204800-data_len, result[0][0])
 
1692
        self.assertTrue(data_len >= 100)
 
1693
        check_result_data(result)
 
1694
        # out of order ranges are made in order
 
1695
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1696
            adjust_for_latency=True, upper_limit=content_size))
 
1697
        # we expect 2 results, in order, start and end.
 
1698
        self.assertEqual(2, len(result))
 
1699
        # start
 
1700
        data_len = len(result[0][1])
 
1701
        self.assertEqual(0, result[0][0])
 
1702
        self.assertTrue(data_len >= 30)
 
1703
        # end
 
1704
        data_len = len(result[1][1])
 
1705
        self.assertEqual(204800-data_len, result[1][0])
 
1706
        self.assertTrue(data_len >= 100)
 
1707
        check_result_data(result)
 
1708
        # close ranges get combined (even if out of order)
 
1709
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1710
            result = list(transport.readv('a', request_vector,
 
1711
                adjust_for_latency=True, upper_limit=content_size))
 
1712
            self.assertEqual(1, len(result))
 
1713
            data_len = len(result[0][1])
 
1714
            # minimum length is from 400 to 1034 - 634
 
1715
            self.assertTrue(data_len >= 634)
 
1716
            # must contain the region 400 to 1034
 
1717
            self.assertTrue(result[0][0] <= 400)
 
1718
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1719
            check_result_data(result)
 
1720
 
 
1721
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1722
        transport = self.get_transport()
 
1723
        # test from observed failure case.
 
1724
        if transport.is_readonly():
 
1725
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1726
        else:
 
1727
            transport.put_bytes('a', 'a'*1024*1024)
 
1728
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1729
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1730
            (465373, 800), (947422, 800)]
 
1731
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1732
        found_items = [False]*9
 
1733
        for pos, (start, length) in enumerate(broken_vector):
 
1734
            # check the range is covered by the result
 
1735
            for offset, data in results:
 
1736
                if offset <= start and start + length <= offset + len(data):
 
1737
                    found_items[pos] = True
 
1738
        self.assertEqual([True]*9, found_items)
 
1739
 
 
1740
    def test_get_with_open_write_stream_sees_all_content(self):
 
1741
        t = self.get_transport()
 
1742
        if t.is_readonly():
 
1743
            return
 
1744
        handle = t.open_write_stream('foo')
 
1745
        try:
 
1746
            handle.write('bcd')
 
1747
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1748
        finally:
 
1749
            handle.close()
 
1750
 
1419
1751
    def test_get_smart_medium(self):
1420
1752
        """All transports must either give a smart medium, or know they can't.
1421
1753
        """
1430
1762
    def test_readv_short_read(self):
1431
1763
        transport = self.get_transport()
1432
1764
        if transport.is_readonly():
1433
 
            file('a', 'w').write('0123456789')
 
1765
            with file('a', 'w') as f: f.write('0123456789')
1434
1766
        else:
1435
1767
            transport.put_bytes('a', '01234567890')
1436
1768
 
1445
1777
        # also raise a special error
1446
1778
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1447
1779
                              transport.readv, 'a', [(12,2)])
 
1780
 
 
1781
    def test_no_segment_parameters(self):
 
1782
        """Segment parameters should be stripped and stored in
 
1783
        transport.segment_parameters."""
 
1784
        transport = self.get_transport("foo")
 
1785
        self.assertEquals({}, transport.get_segment_parameters())
 
1786
 
 
1787
    def test_segment_parameters(self):
 
1788
        """Segment parameters should be stripped and stored in
 
1789
        transport.get_segment_parameters()."""
 
1790
        base_url = self._server.get_url()
 
1791
        parameters = {"key1": "val1", "key2": "val2"}
 
1792
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1793
        transport = _mod_transport.get_transport_from_url(url)
 
1794
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1795
 
 
1796
    def test_set_segment_parameters(self):
 
1797
        """Segment parameters can be set and show up in base."""
 
1798
        transport = self.get_transport("foo")
 
1799
        orig_base = transport.base
 
1800
        transport.set_segment_parameter("arm", "board")
 
1801
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
 
1802
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
 
1803
        transport.set_segment_parameter("arm", None)
 
1804
        transport.set_segment_parameter("nonexistant", None)
 
1805
        self.assertEquals({}, transport.get_segment_parameters())
 
1806
        self.assertEquals(orig_base, transport.base)
 
1807
 
 
1808
    def test_stat_symlink(self):
 
1809
        # if a transport points directly to a symlink (and supports symlinks
 
1810
        # at all) you can tell this.  helps with bug 32669.
 
1811
        t = self.get_transport()
 
1812
        try:
 
1813
            t.symlink('target', 'link')
 
1814
        except TransportNotPossible:
 
1815
            raise TestSkipped("symlinks not supported")
 
1816
        t2 = t.clone('link')
 
1817
        st = t2.stat('')
 
1818
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1819
 
 
1820
    def test_abspath_url_unquote_unreserved(self):
 
1821
        """URLs from abspath should have unreserved characters unquoted
 
1822
        
 
1823
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1824
        """
 
1825
        t = self.get_transport()
 
1826
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1827
        self.assertEqual(t.base + "-.09AZ_az~",
 
1828
            t.abspath(needlessly_escaped_dir))
 
1829
 
 
1830
    def test_clone_url_unquote_unreserved(self):
 
1831
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1832
        
 
1833
        Cloned transports should be prefix comparable for things like the
 
1834
        isolation checking of tests, see lp:842223 for more.
 
1835
        """
 
1836
        t1 = self.get_transport()
 
1837
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1838
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1839
        t2 = t1.clone(needlessly_escaped_dir)
 
1840
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1841
 
 
1842
    def test_hook_post_connection_one(self):
 
1843
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t = self.get_transport()
 
1847
        self.assertEqual([], log)
 
1848
        t.has("non-existant")
 
1849
        if isinstance(t, RemoteTransport):
 
1850
            self.assertEqual([t.get_smart_medium()], log)
 
1851
        elif isinstance(t, ConnectedTransport):
 
1852
            self.assertEqual([t], log)
 
1853
        else:
 
1854
            self.assertEqual([], log)
 
1855
 
 
1856
    def test_hook_post_connection_multi(self):
 
1857
        """Fire post_connect hook once per unshared underlying connection"""
 
1858
        log = []
 
1859
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1860
        t1 = self.get_transport()
 
1861
        t2 = t1.clone(".")
 
1862
        t3 = self.get_transport()
 
1863
        self.assertEqual([], log)
 
1864
        t1.has("x")
 
1865
        t2.has("x")
 
1866
        t3.has("x")
 
1867
        if isinstance(t1, RemoteTransport):
 
1868
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1869
        elif isinstance(t1, ConnectedTransport):
 
1870
            self.assertEqual([t1, t3], log)
 
1871
        else:
 
1872
            self.assertEqual([], log)