~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Patch Queue Manager
  • Date: 2016-02-01 19:56:05 UTC
  • mfrom: (6615.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20160201195605-o7rl92wf6uyum3fk
(vila) Open trunk again as 2.8b1 (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
27
29
 
28
30
from bzrlib import (
29
31
    errors,
30
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    transport as _mod_transport,
31
36
    urlutils,
32
37
    )
33
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
34
 
                           LockError, NoSmartServer, PathError,
35
 
                           TransportNotPossible, ConnectionError,
36
 
                           InvalidURL)
 
38
from bzrlib.errors import (ConnectionError,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           NoSuchFile,
 
42
                           PathError,
 
43
                           TransportNotPossible,
 
44
                           )
37
45
from bzrlib.osutils import getcwd
38
 
from bzrlib.symbol_versioning import zero_eleven
39
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
 
46
from bzrlib.smart import medium
 
47
from bzrlib.tests import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from bzrlib.tests import test_server
40
53
from bzrlib.tests.test_transport import TestTransportImplementation
41
 
from bzrlib.transport import memory, smart, chroot
42
 
import bzrlib.transport
43
 
 
44
 
 
45
 
def _append(fn, txt):
46
 
    """Append the given text (file-like object) to the supplied filename."""
47
 
    f = open(fn, 'ab')
48
 
    try:
49
 
        f.write(txt.read())
50
 
    finally:
51
 
        f.close()
 
54
from bzrlib.transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent, e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(standard_tests, module, loader):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
52
97
 
53
98
 
54
99
class TransportTests(TestTransportImplementation):
55
100
 
 
101
    def setUp(self):
 
102
        super(TransportTests, self).setUp()
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
104
 
56
105
    def check_transport_contents(self, content, transport, relpath):
57
 
        """Check that transport.get(relpath).read() == content."""
58
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
 
 
109
    def test_ensure_base_missing(self):
 
110
        """.ensure_base() should create the directory if it doesn't exist"""
 
111
        t = self.get_transport()
 
112
        t_a = t.clone('a')
 
113
        if t_a.is_readonly():
 
114
            self.assertRaises(TransportNotPossible,
 
115
                              t_a.ensure_base)
 
116
            return
 
117
        self.assertTrue(t_a.ensure_base())
 
118
        self.assertTrue(t.has('a'))
 
119
 
 
120
    def test_ensure_base_exists(self):
 
121
        """.ensure_base() should just be happy if it already exists"""
 
122
        t = self.get_transport()
 
123
        if t.is_readonly():
 
124
            return
 
125
 
 
126
        t.mkdir('a')
 
127
        t_a = t.clone('a')
 
128
        # ensure_base returns False if it didn't create the base
 
129
        self.assertFalse(t_a.ensure_base())
 
130
 
 
131
    def test_ensure_base_missing_parent(self):
 
132
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
133
        t = self.get_transport()
 
134
        if t.is_readonly():
 
135
            return
 
136
 
 
137
        t_a = t.clone('a')
 
138
        t_b = t_a.clone('b')
 
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
59
148
 
60
149
    def test_has(self):
61
150
        t = self.get_transport()
65
154
        self.assertEqual(True, t.has('a'))
66
155
        self.assertEqual(False, t.has('c'))
67
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
68
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
69
 
                [True, True, False, False, True, False, True, False])
 
157
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
158
                                           'e', 'f', 'g', 'h'])),
 
159
                         [True, True, False, False,
 
160
                          True, False, True, False])
70
161
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
71
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
72
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
73
 
                [True, True, False, False, True, False, True, False])
 
162
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
163
                                           urlutils.escape('%%')]))
 
164
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
165
                                                'e', 'f', 'g', 'h']))),
 
166
                         [True, True, False, False,
 
167
                          True, False, True, False])
74
168
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
75
169
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
76
170
 
77
171
    def test_has_root_works(self):
 
172
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
173
            raise TestNotApplicable(
 
174
                "SmartTCPServer_for_testing intentionally does not allow "
 
175
                "access to /.")
78
176
        current_transport = self.get_transport()
79
 
        if isinstance(current_transport, chroot.ChrootTransportDecorator):
80
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
81
177
        self.assertTrue(current_transport.has('/'))
82
178
        root = current_transport.clone('/')
83
179
        self.assertTrue(root.has(''))
94
190
        self.build_tree(files, transport=t, line_endings='binary')
95
191
        self.check_transport_contents('contents of a\n', t, 'a')
96
192
        content_f = t.get_multi(files)
97
 
        for content, f in zip(contents, content_f):
 
193
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
194
        # evaluate their inputs, the transport requests should be issued and
 
195
        # handled sequentially (we don't want to force transport to buffer).
 
196
        for content, f in itertools.izip(contents, content_f):
98
197
            self.assertEqual(content, f.read())
99
198
 
100
199
        content_f = t.get_multi(iter(files))
101
 
        for content, f in zip(contents, content_f):
 
200
        # Use itertools.izip() for the same reason
 
201
        for content, f in itertools.izip(contents, content_f):
102
202
            self.assertEqual(content, f.read())
103
203
 
 
204
    def test_get_unknown_file(self):
 
205
        t = self.get_transport()
 
206
        files = ['a', 'b']
 
207
        contents = ['contents of a\n',
 
208
                    'contents of b\n',
 
209
                    ]
 
210
        self.build_tree(files, transport=t, line_endings='binary')
104
211
        self.assertRaises(NoSuchFile, t.get, 'c')
105
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
106
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
 
223
 
 
224
    def test_get_directory_read_gives_ReadError(self):
 
225
        """consistent errors for read() on a file returned by get()."""
 
226
        t = self.get_transport()
 
227
        if t.is_readonly():
 
228
            self.build_tree(['a directory/'])
 
229
        else:
 
230
            t.mkdir('a%20directory')
 
231
        # getting the file must either work or fail with a PathError
 
232
        try:
 
233
            a_file = t.get('a%20directory')
 
234
        except (errors.PathError, errors.RedirectRequested):
 
235
            # early failure return immediately.
 
236
            return
 
237
        # having got a file, read() must either work (i.e. http reading a dir
 
238
        # listing) or fail with ReadError
 
239
        try:
 
240
            a_file.read()
 
241
        except errors.ReadError:
 
242
            pass
107
243
 
108
244
    def test_get_bytes(self):
109
245
        t = self.get_transport()
120
256
        for content, fname in zip(contents, files):
121
257
            self.assertEqual(content, t.get_bytes(fname))
122
258
 
 
259
    def test_get_bytes_unknown_file(self):
 
260
        t = self.get_transport()
123
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
124
262
 
125
 
    def test_put(self):
126
 
        t = self.get_transport()
127
 
 
128
 
        if t.is_readonly():
129
 
            return
130
 
 
131
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
132
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
133
 
 
134
 
        self.applyDeprecated(zero_eleven,
135
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
136
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
137
 
 
138
 
        self.assertRaises(NoSuchFile,
139
 
            self.applyDeprecated,
140
 
            zero_eleven,
141
 
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
263
    def test_get_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
        finally:
 
272
            handle.close()
 
273
 
 
274
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
275
        t = self.get_transport()
 
276
        if t.is_readonly():
 
277
            return
 
278
        handle = t.open_write_stream('foo')
 
279
        try:
 
280
            handle.write('b')
 
281
            self.assertEqual('b', t.get_bytes('foo'))
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
 
287
        finally:
 
288
            handle.close()
142
289
 
143
290
    def test_put_bytes(self):
144
291
        t = self.get_transport()
149
296
            return
150
297
 
151
298
        t.put_bytes('a', 'some text for a\n')
152
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
153
300
        self.check_transport_contents('some text for a\n', t, 'a')
154
301
 
155
302
        # The contents should be overwritten
167
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
168
315
            return
169
316
 
170
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
171
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
172
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
173
320
        self.check_transport_contents('some text for a\n', t, 'a')
174
321
        # Put also replaces contents
175
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
187
334
        # Now test the create_parent flag
188
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
189
336
                                       'contents\n')
190
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
191
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
192
339
                               create_parent_dir=True)
193
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
194
 
        
 
341
 
195
342
        # But we still get NoSuchFile if we can't make the parent dir
196
343
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
197
344
                                       'contents\n',
219
366
        umask = osutils.get_umask()
220
367
        t.put_bytes('nomode', 'test text\n', mode=None)
221
368
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
222
 
        
 
369
 
223
370
    def test_put_bytes_non_atomic_permissions(self):
224
371
        t = self.get_transport()
225
372
 
253
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
254
401
                               dir_mode=0777, create_parent_dir=True)
255
402
        self.assertTransportMode(t, 'dir777', 0777)
256
 
        
 
403
 
257
404
    def test_put_file(self):
258
405
        t = self.get_transport()
259
406
 
262
409
                    t.put_file, 'a', StringIO('some text for a\n'))
263
410
            return
264
411
 
265
 
        t.put_file('a', StringIO('some text for a\n'))
266
 
        self.failUnless(t.has('a'))
 
412
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        # put_file returns the length of the data written
 
414
        self.assertEqual(16, result)
 
415
        self.assertTrue(t.has('a'))
267
416
        self.check_transport_contents('some text for a\n', t, 'a')
268
417
        # Put also replaces contents
269
 
        t.put_file('a', StringIO('new\ncontents for\na\n'))
 
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        self.assertEqual(19, result)
270
420
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
271
421
        self.assertRaises(NoSuchFile,
272
422
                          t.put_file, 'path/doesnt/exist/c',
280
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
281
431
            return
282
432
 
283
 
        self.failIf(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
284
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
285
 
        self.failUnless(t.has('a'))
 
435
        self.assertTrue(t.has('a'))
286
436
        self.check_transport_contents('some text for a\n', t, 'a')
287
437
        # Put also replaces contents
288
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
300
450
        # Now test the create_parent flag
301
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
302
452
                                       StringIO('contents\n'))
303
 
        self.failIf(t.has('dir/a'))
 
453
        self.assertFalse(t.has('dir/a'))
304
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
305
455
                              create_parent_dir=True)
306
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
307
 
        
 
457
 
308
458
        # But we still get NoSuchFile if we can't make the parent dir
309
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
310
460
                                       StringIO('contents\n'),
328
478
        # Yes, you can put a file such that it becomes readonly
329
479
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
330
480
        self.assertTransportMode(t, 'mode400', 0400)
331
 
 
332
 
        # XXX: put_multi is deprecated, so do we really care anymore?
333
 
        self.applyDeprecated(zero_eleven, t.put_multi,
334
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
335
 
        self.assertTransportMode(t, 'mmode644', 0644)
336
 
 
337
481
        # The default permissions should be based on the current umask
338
482
        umask = osutils.get_umask()
339
483
        t.put_file('nomode', StringIO('test text\n'), mode=None)
340
484
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
341
 
        
 
485
 
342
486
    def test_put_file_non_atomic_permissions(self):
343
487
        t = self.get_transport()
344
488
 
361
505
        umask = osutils.get_umask()
362
506
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
363
507
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
364
 
        
 
508
 
365
509
        # We should also be able to set the mode for a parent directory
366
510
        # when it is created
367
511
        sio = StringIO()
375
519
                              dir_mode=0777, create_parent_dir=True)
376
520
        self.assertTransportMode(t, 'dir777', 0777)
377
521
 
378
 
    def test_put_multi(self):
379
 
        t = self.get_transport()
380
 
 
381
 
        if t.is_readonly():
382
 
            return
383
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
384
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
385
 
                          ('d', StringIO('contents\nfor d\n'))]
386
 
            ))
387
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
388
 
                [True, False, False, True])
389
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
390
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
391
 
 
392
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
393
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
394
 
                              ('d', StringIO('another contents\nfor d\n'))])
395
 
            ))
396
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
397
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
398
 
 
399
 
    def test_put_permissions(self):
400
 
        t = self.get_transport()
401
 
 
402
 
        if t.is_readonly():
403
 
            return
404
 
        if not t._can_roundtrip_unix_modebits():
405
 
            # Can't roundtrip, so no need to run this test
406
 
            return
407
 
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
408
 
                             StringIO('test text\n'), mode=0644)
409
 
        self.assertTransportMode(t, 'mode644', 0644)
410
 
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
411
 
                             StringIO('test text\n'), mode=0666)
412
 
        self.assertTransportMode(t, 'mode666', 0666)
413
 
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
414
 
                             StringIO('test text\n'), mode=0600)
415
 
        self.assertTransportMode(t, 'mode600', 0600)
416
 
        # Yes, you can put a file such that it becomes readonly
417
 
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
418
 
                             StringIO('test text\n'), mode=0400)
419
 
        self.assertTransportMode(t, 'mode400', 0400)
420
 
        self.applyDeprecated(zero_eleven, t.put_multi,
421
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
422
 
        self.assertTransportMode(t, 'mmode644', 0644)
423
 
 
424
 
        # The default permissions should be based on the current umask
425
 
        umask = osutils.get_umask()
426
 
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
427
 
                             StringIO('test text\n'), mode=None)
428
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
429
 
        
 
522
    def test_put_bytes_unicode(self):
 
523
        t = self.get_transport()
 
524
        if t.is_readonly():
 
525
            return
 
526
        unicode_string = u'\u1234'
 
527
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
528
 
430
529
    def test_mkdir(self):
431
530
        t = self.get_transport()
432
531
 
433
532
        if t.is_readonly():
434
 
            # cannot mkdir on readonly transports. We're not testing for 
 
533
            # cannot mkdir on readonly transports. We're not testing for
435
534
            # cache coherency because cache behaviour is not currently
436
535
            # defined for the transport interface.
437
536
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
458
557
 
459
558
        # we were testing that a local mkdir followed by a transport
460
559
        # mkdir failed thusly, but given that we * in one process * do not
461
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
560
        # concurrently fiddle with disk dirs and then use transport to do
462
561
        # things, the win here seems marginal compared to the constraint on
463
562
        # the interface. RBC 20051227
464
563
        t.mkdir('dir_g')
497
596
        t.mkdir('dnomode', mode=None)
498
597
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
499
598
 
 
599
    def test_opening_a_file_stream_creates_file(self):
 
600
        t = self.get_transport()
 
601
        if t.is_readonly():
 
602
            return
 
603
        handle = t.open_write_stream('foo')
 
604
        try:
 
605
            self.assertEqual('', t.get_bytes('foo'))
 
606
        finally:
 
607
            handle.close()
 
608
 
 
609
    def test_opening_a_file_stream_can_set_mode(self):
 
610
        t = self.get_transport()
 
611
        if t.is_readonly():
 
612
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
613
                              t.open_write_stream, 'foo')
 
614
            return
 
615
        if not t._can_roundtrip_unix_modebits():
 
616
            # Can't roundtrip, so no need to run this test
 
617
            return
 
618
 
 
619
        def check_mode(name, mode, expected):
 
620
            handle = t.open_write_stream(name, mode=mode)
 
621
            handle.close()
 
622
            self.assertTransportMode(t, name, expected)
 
623
        check_mode('mode644', 0644, 0644)
 
624
        check_mode('mode666', 0666, 0666)
 
625
        check_mode('mode600', 0600, 0600)
 
626
        # The default permissions should be based on the current umask
 
627
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
628
 
500
629
    def test_copy_to(self):
501
630
        # FIXME: test:   same server to same server (partly done)
502
631
        # same protocol two servers
503
632
        # and    different protocols (done for now except for MemoryTransport.
504
633
        # - RBC 20060122
505
 
        from bzrlib.transport.memory import MemoryTransport
506
634
 
507
635
        def simple_copy_files(transport_from, transport_to):
508
636
            files = ['a', 'b', 'c', 'd']
509
637
            self.build_tree(files, transport=transport_from)
510
638
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
511
639
            for f in files:
512
 
                self.check_transport_contents(transport_to.get(f).read(),
 
640
                self.check_transport_contents(transport_to.get_bytes(f),
513
641
                                              transport_from, f)
514
642
 
515
643
        t = self.get_transport()
538
666
        files = ['a', 'b', 'c', 'd']
539
667
        t.copy_to(iter(files), temp_transport)
540
668
        for f in files:
541
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
669
            self.check_transport_contents(temp_transport.get_bytes(f),
542
670
                                          t, f)
543
671
        del temp_transport
544
672
 
548
676
            for f in files:
549
677
                self.assertTransportMode(temp_transport, f, mode)
550
678
 
551
 
    def test_append(self):
 
679
    def test_create_prefix(self):
552
680
        t = self.get_transport()
553
 
 
554
 
        if t.is_readonly():
555
 
            return
556
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
557
 
        t.put_bytes('b', 'contents\nfor b\n')
558
 
 
559
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
560
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
561
 
 
562
 
        self.check_transport_contents(
563
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
564
 
            t, 'a')
565
 
 
566
 
        # And we can create new files, too
567
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
568
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
569
 
        self.check_transport_contents('some text\nfor a missing file\n',
570
 
                                      t, 'c')
 
681
        sub = t.clone('foo').clone('bar')
 
682
        try:
 
683
            sub.create_prefix()
 
684
        except TransportNotPossible:
 
685
            self.assertTrue(t.is_readonly())
 
686
        else:
 
687
            self.assertTrue(t.has('foo/bar'))
 
688
 
571
689
    def test_append_file(self):
572
690
        t = self.get_transport()
573
691
 
677
795
                t.append_file, 'f', StringIO('f'), mode=None)
678
796
            return
679
797
        t.append_file('f', StringIO('f'), mode=None)
680
 
        
 
798
 
681
799
    def test_append_bytes_mode(self):
682
800
        # check append_bytes accepts a mode
683
801
        t = self.get_transport()
686
804
                t.append_bytes, 'f', 'f', mode=None)
687
805
            return
688
806
        t.append_bytes('f', 'f', mode=None)
689
 
        
 
807
 
690
808
    def test_delete(self):
691
809
        # TODO: Test Transport.delete
692
810
        t = self.get_transport()
697
815
            return
698
816
 
699
817
        t.put_bytes('a', 'a little bit of text\n')
700
 
        self.failUnless(t.has('a'))
 
818
        self.assertTrue(t.has('a'))
701
819
        t.delete('a')
702
 
        self.failIf(t.has('a'))
 
820
        self.assertFalse(t.has('a'))
703
821
 
704
822
        self.assertRaises(NoSuchFile, t.delete, 'a')
705
823
 
711
829
        t.delete_multi(['a', 'c'])
712
830
        self.assertEqual([False, True, False],
713
831
                list(t.has_multi(['a', 'b', 'c'])))
714
 
        self.failIf(t.has('a'))
715
 
        self.failUnless(t.has('b'))
716
 
        self.failIf(t.has('c'))
 
832
        self.assertFalse(t.has('a'))
 
833
        self.assertTrue(t.has('b'))
 
834
        self.assertFalse(t.has('c'))
717
835
 
718
836
        self.assertRaises(NoSuchFile,
719
837
                t.delete_multi, ['a', 'b', 'c'])
731
849
        # plain "listdir".
732
850
        # self.assertEqual([], os.listdir('.'))
733
851
 
 
852
    def test_recommended_page_size(self):
 
853
        """Transports recommend a page size for partial access to files."""
 
854
        t = self.get_transport()
 
855
        self.assertIsInstance(t.recommended_page_size(), int)
 
856
 
734
857
    def test_rmdir(self):
735
858
        t = self.get_transport()
736
859
        # Not much to do with a readonly transport
748
871
 
749
872
    def test_rmdir_not_empty(self):
750
873
        """Deleting a non-empty directory raises an exception
751
 
        
 
874
 
752
875
        sftp (and possibly others) don't give us a specific "directory not
753
876
        empty" exception -- we can just see that the operation failed.
754
877
        """
759
882
        t.mkdir('adir/bdir')
760
883
        self.assertRaises(PathError, t.rmdir, 'adir')
761
884
 
 
885
    def test_rmdir_empty_but_similar_prefix(self):
 
886
        """rmdir does not get confused by sibling paths.
 
887
 
 
888
        A naive implementation of MemoryTransport would refuse to rmdir
 
889
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
890
        uses "path.startswith(dir)" on all file paths to determine if directory
 
891
        is empty.
 
892
        """
 
893
        t = self.get_transport()
 
894
        if t.is_readonly():
 
895
            return
 
896
        t.mkdir('foo')
 
897
        t.put_bytes('foo-bar', '')
 
898
        t.mkdir('foo-baz')
 
899
        t.rmdir('foo')
 
900
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
901
        self.assertTrue(t.has('foo-bar'))
 
902
 
762
903
    def test_rename_dir_succeeds(self):
763
904
        t = self.get_transport()
764
905
        if t.is_readonly():
765
 
            raise TestSkipped("transport is readonly")
 
906
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
907
                              t.rename, 'foo', 'bar')
 
908
            return
766
909
        t.mkdir('adir')
767
910
        t.mkdir('adir/asubdir')
768
911
        t.rename('adir', 'bdir')
773
916
        """Attempting to replace a nonemtpy directory should fail"""
774
917
        t = self.get_transport()
775
918
        if t.is_readonly():
776
 
            raise TestSkipped("transport is readonly")
 
919
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
920
                              t.rename, 'foo', 'bar')
 
921
            return
777
922
        t.mkdir('adir')
778
923
        t.mkdir('adir/asubdir')
779
924
        t.mkdir('bdir')
786
931
        self.assertFalse(t.has('adir/bdir'))
787
932
        self.assertFalse(t.has('adir/bsubdir'))
788
933
 
 
934
    def test_rename_across_subdirs(self):
 
935
        t = self.get_transport()
 
936
        if t.is_readonly():
 
937
            raise TestNotApplicable("transport is readonly")
 
938
        t.mkdir('a')
 
939
        t.mkdir('b')
 
940
        ta = t.clone('a')
 
941
        tb = t.clone('b')
 
942
        ta.put_bytes('f', 'aoeu')
 
943
        ta.rename('f', '../b/f')
 
944
        self.assertTrue(tb.has('f'))
 
945
        self.assertFalse(ta.has('f'))
 
946
        self.assertTrue(t.has('b/f'))
 
947
 
789
948
    def test_delete_tree(self):
790
949
        t = self.get_transport()
791
950
 
801
960
        except TransportNotPossible:
802
961
            # ok, this transport does not support delete_tree
803
962
            return
804
 
        
 
963
 
805
964
        # did it delete that trivial case?
806
965
        self.assertRaises(NoSuchFile, t.stat, 'adir')
807
966
 
808
967
        self.build_tree(['adir/',
809
 
                         'adir/file', 
810
 
                         'adir/subdir/', 
811
 
                         'adir/subdir/file', 
 
968
                         'adir/file',
 
969
                         'adir/subdir/',
 
970
                         'adir/subdir/file',
812
971
                         'adir/subdir2/',
813
972
                         'adir/subdir2/file',
814
973
                         ], transport=t)
829
988
        # perhaps all of this could be done in a subdirectory
830
989
 
831
990
        t.put_bytes('a', 'a first file\n')
832
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
991
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
833
992
 
834
993
        t.move('a', 'b')
835
 
        self.failUnless(t.has('b'))
836
 
        self.failIf(t.has('a'))
 
994
        self.assertTrue(t.has('b'))
 
995
        self.assertFalse(t.has('a'))
837
996
 
838
997
        self.check_transport_contents('a first file\n', t, 'b')
839
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
998
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
840
999
 
841
1000
        # Overwrite a file
842
1001
        t.put_bytes('c', 'c this file\n')
843
1002
        t.move('c', 'b')
844
 
        self.failIf(t.has('c'))
 
1003
        self.assertFalse(t.has('c'))
845
1004
        self.check_transport_contents('c this file\n', t, 'b')
846
1005
 
847
1006
        # TODO: Try to write a test for atomicity
848
 
        # TODO: Test moving into a non-existant subdirectory
 
1007
        # TODO: Test moving into a non-existent subdirectory
849
1008
        # TODO: Test Transport.move_multi
850
1009
 
851
1010
    def test_copy(self):
871
1030
 
872
1031
    def test_connection_error(self):
873
1032
        """ConnectionError is raised when connection is impossible.
874
 
        
875
 
        The error may be raised from either the constructor or the first
876
 
        operation on the transport.
 
1033
 
 
1034
        The error should be raised from the first operation on the transport.
877
1035
        """
878
1036
        try:
879
1037
            url = self._server.get_bogus_url()
880
1038
        except NotImplementedError:
881
1039
            raise TestSkipped("Transport %s has no bogus URL support." %
882
1040
                              self._server.__class__)
883
 
        # This should be:  but SSH still connects on construction. No COOKIE!
884
 
        # self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
885
 
        try:
886
 
            t = bzrlib.transport.get_transport(url)
887
 
            t.get('.bzr/branch')
888
 
        except (ConnectionError, NoSuchFile), e:
889
 
            pass
890
 
        except (Exception), e:
891
 
            self.fail('Wrong exception thrown (%s.%s): %s' 
892
 
                        % (e.__class__.__module__, e.__class__.__name__, e))
893
 
        else:
894
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
1041
        t = _mod_transport.get_transport_from_url(url)
 
1042
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
895
1043
 
896
1044
    def test_stat(self):
897
1045
        # TODO: Test stat, just try once, and if it throws, stop testing
906
1054
            return
907
1055
 
908
1056
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
909
 
        sizes = [14, 0, 16, 0, 18] 
 
1057
        sizes = [14, 0, 16, 0, 18]
910
1058
        self.build_tree(paths, transport=t, line_endings='binary')
911
1059
 
912
1060
        for path, size in zip(paths, sizes):
913
1061
            st = t.stat(path)
914
1062
            if path.endswith('/'):
915
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1063
                self.assertTrue(S_ISDIR(st.st_mode))
916
1064
                # directory sizes are meaningless
917
1065
            else:
918
 
                self.failUnless(S_ISREG(st.st_mode))
 
1066
                self.assertTrue(S_ISREG(st.st_mode))
919
1067
                self.assertEqual(size, st.st_size)
920
1068
 
921
1069
        remote_stats = list(t.stat_multi(paths))
928
1076
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
929
1077
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
930
1078
        subdir = t.clone('subdir')
931
 
        subdir.stat('./file')
932
 
        subdir.stat('.')
 
1079
        st = subdir.stat('./file')
 
1080
        st = subdir.stat('.')
 
1081
 
 
1082
    def test_hardlink(self):
 
1083
        from stat import ST_NLINK
 
1084
 
 
1085
        t = self.get_transport()
 
1086
 
 
1087
        source_name = "original_target"
 
1088
        link_name = "target_link"
 
1089
 
 
1090
        self.build_tree([source_name], transport=t)
 
1091
 
 
1092
        try:
 
1093
            t.hardlink(source_name, link_name)
 
1094
 
 
1095
            self.assertTrue(t.has(source_name))
 
1096
            self.assertTrue(t.has(link_name))
 
1097
 
 
1098
            st = t.stat(link_name)
 
1099
            self.assertEqual(st[ST_NLINK], 2)
 
1100
        except TransportNotPossible:
 
1101
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1102
                              self._server.__class__)
 
1103
 
 
1104
    def test_symlink(self):
 
1105
        from stat import S_ISLNK
 
1106
 
 
1107
        t = self.get_transport()
 
1108
 
 
1109
        source_name = "original_target"
 
1110
        link_name = "target_link"
 
1111
 
 
1112
        self.build_tree([source_name], transport=t)
 
1113
 
 
1114
        try:
 
1115
            t.symlink(source_name, link_name)
 
1116
 
 
1117
            self.assertTrue(t.has(source_name))
 
1118
            self.assertTrue(t.has(link_name))
 
1119
 
 
1120
            st = t.stat(link_name)
 
1121
            self.assertTrue(S_ISLNK(st.st_mode),
 
1122
                "expected symlink, got mode %o" % st.st_mode)
 
1123
        except TransportNotPossible:
 
1124
            raise TestSkipped("Transport %s does not support symlinks." %
 
1125
                              self._server.__class__)
 
1126
        except IOError:
 
1127
            self.knownFailure("Paramiko fails to create symlinks during tests")
933
1128
 
934
1129
    def test_list_dir(self):
935
1130
        # TODO: Test list_dir, just try once, and if it throws, stop testing
936
1131
        t = self.get_transport()
937
 
        
 
1132
 
938
1133
        if not t.listable():
939
1134
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
940
1135
            return
941
1136
 
942
 
        def sorted_list(d):
943
 
            l = list(t.list_dir(d))
 
1137
        def sorted_list(d, transport):
 
1138
            l = list(transport.list_dir(d))
944
1139
            l.sort()
945
1140
            return l
946
1141
 
947
 
        self.assertEqual([], sorted_list('.'))
 
1142
        self.assertEqual([], sorted_list('.', t))
948
1143
        # c2 is precisely one letter longer than c here to test that
949
1144
        # suffixing is not confused.
950
1145
        # a%25b checks that quoting is done consistently across transports
956
1151
            self.build_tree(tree_names)
957
1152
 
958
1153
        self.assertEqual(
959
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
960
 
        self.assertEqual(['d', 'e'], sorted_list('c'))
 
1154
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1155
        self.assertEqual(
 
1156
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1157
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1158
 
 
1159
        # Cloning the transport produces an equivalent listing
 
1160
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
961
1161
 
962
1162
        if not t.is_readonly():
963
1163
            t.delete('c/d')
965
1165
        else:
966
1166
            os.unlink('c/d')
967
1167
            os.unlink('b')
968
 
            
969
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
970
 
        self.assertEqual(['e'], sorted_list('c'))
 
1168
 
 
1169
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1170
        self.assertEqual(['e'], sorted_list('c', t))
971
1171
 
972
1172
        self.assertListRaises(PathError, t.list_dir, 'q')
973
1173
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1174
        # 'a' is a file, list_dir should raise an error
974
1175
        self.assertListRaises(PathError, t.list_dir, 'a')
975
1176
 
976
1177
    def test_list_dir_result_is_url_escaped(self):
982
1183
            self.build_tree(['a/', 'a/%'], transport=t)
983
1184
        else:
984
1185
            self.build_tree(['a/', 'a/%'])
985
 
        
 
1186
 
986
1187
        names = list(t.list_dir('a'))
987
1188
        self.assertEqual(['%25'], names)
988
1189
        self.assertIsInstance(names[0], str)
989
1190
 
 
1191
    def test_clone_preserve_info(self):
 
1192
        t1 = self.get_transport()
 
1193
        if not isinstance(t1, ConnectedTransport):
 
1194
            raise TestSkipped("not a connected transport")
 
1195
 
 
1196
        t2 = t1.clone('subdir')
 
1197
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1198
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1199
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1200
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1201
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1202
 
 
1203
    def test__reuse_for(self):
 
1204
        t = self.get_transport()
 
1205
        if not isinstance(t, ConnectedTransport):
 
1206
            raise TestSkipped("not a connected transport")
 
1207
 
 
1208
        def new_url(scheme=None, user=None, password=None,
 
1209
                    host=None, port=None, path=None):
 
1210
            """Build a new url from t.base changing only parts of it.
 
1211
 
 
1212
            Only the parameters different from None will be changed.
 
1213
            """
 
1214
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1215
            if user     is None: user     = t._parsed_url.user
 
1216
            if password is None: password = t._parsed_url.password
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if host     is None: host     = t._parsed_url.host
 
1219
            if port     is None: port     = t._parsed_url.port
 
1220
            if path     is None: path     = t._parsed_url.path
 
1221
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1222
 
 
1223
        if t._parsed_url.scheme == 'ftp':
 
1224
            scheme = 'sftp'
 
1225
        else:
 
1226
            scheme = 'ftp'
 
1227
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1228
        if t._parsed_url.user == 'me':
 
1229
            user = 'you'
 
1230
        else:
 
1231
            user = 'me'
 
1232
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1233
        # passwords are not taken into account because:
 
1234
        # - it makes no sense to have two different valid passwords for the
 
1235
        #   same user
 
1236
        # - _password in ConnectedTransport is intended to collect what the
 
1237
        #   user specified from the command-line and there are cases where the
 
1238
        #   new url can contain no password (if the url was built from an
 
1239
        #   existing transport.base for example)
 
1240
        # - password are considered part of the credentials provided at
 
1241
        #   connection creation time and as such may not be present in the url
 
1242
        #   (they may be typed by the user when prompted for example)
 
1243
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1244
        # We will not connect, we can use a invalid host
 
1245
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1246
        if t._parsed_url.port == 1234:
 
1247
            port = 4321
 
1248
        else:
 
1249
            port = 1234
 
1250
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1251
        # No point in trying to reuse a transport for a local URL
 
1252
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1253
 
 
1254
    def test_connection_sharing(self):
 
1255
        t = self.get_transport()
 
1256
        if not isinstance(t, ConnectedTransport):
 
1257
            raise TestSkipped("not a connected transport")
 
1258
 
 
1259
        c = t.clone('subdir')
 
1260
        # Some transports will create the connection  only when needed
 
1261
        t.has('surely_not') # Force connection
 
1262
        self.assertIs(t._get_connection(), c._get_connection())
 
1263
 
 
1264
        # Temporary failure, we need to create a new dummy connection
 
1265
        new_connection = None
 
1266
        t._set_connection(new_connection)
 
1267
        # Check that both transports use the same connection
 
1268
        self.assertIs(new_connection, t._get_connection())
 
1269
        self.assertIs(new_connection, c._get_connection())
 
1270
 
 
1271
    def test_reuse_connection_for_various_paths(self):
 
1272
        t = self.get_transport()
 
1273
        if not isinstance(t, ConnectedTransport):
 
1274
            raise TestSkipped("not a connected transport")
 
1275
 
 
1276
        t.has('surely_not') # Force connection
 
1277
        self.assertIsNot(None, t._get_connection())
 
1278
 
 
1279
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1280
        self.assertIsNot(t, subdir)
 
1281
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1282
 
 
1283
        home = subdir._reuse_for(t.base + 'home')
 
1284
        self.assertIs(t._get_connection(), home._get_connection())
 
1285
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1286
 
990
1287
    def test_clone(self):
991
1288
        # TODO: Test that clone moves up and down the filesystem
992
1289
        t1 = self.get_transport()
993
 
        if isinstance(t1, chroot.ChrootTransportDecorator):
994
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
995
1290
 
996
1291
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
997
1292
 
998
 
        self.failUnless(t1.has('a'))
999
 
        self.failUnless(t1.has('b/c'))
1000
 
        self.failIf(t1.has('c'))
 
1293
        self.assertTrue(t1.has('a'))
 
1294
        self.assertTrue(t1.has('b/c'))
 
1295
        self.assertFalse(t1.has('c'))
1001
1296
 
1002
1297
        t2 = t1.clone('b')
1003
1298
        self.assertEqual(t1.base + 'b/', t2.base)
1004
1299
 
1005
 
        self.failUnless(t2.has('c'))
1006
 
        self.failIf(t2.has('a'))
 
1300
        self.assertTrue(t2.has('c'))
 
1301
        self.assertFalse(t2.has('a'))
1007
1302
 
1008
1303
        t3 = t2.clone('..')
1009
 
        self.failUnless(t3.has('a'))
1010
 
        self.failIf(t3.has('c'))
 
1304
        self.assertTrue(t3.has('a'))
 
1305
        self.assertFalse(t3.has('c'))
1011
1306
 
1012
 
        self.failIf(t1.has('b/d'))
1013
 
        self.failIf(t2.has('d'))
1014
 
        self.failIf(t3.has('b/d'))
 
1307
        self.assertFalse(t1.has('b/d'))
 
1308
        self.assertFalse(t2.has('d'))
 
1309
        self.assertFalse(t3.has('b/d'))
1015
1310
 
1016
1311
        if t1.is_readonly():
1017
 
            open('b/d', 'wb').write('newfile\n')
 
1312
            self.build_tree_contents([('b/d', 'newfile\n')])
1018
1313
        else:
1019
1314
            t2.put_bytes('d', 'newfile\n')
1020
1315
 
1021
 
        self.failUnless(t1.has('b/d'))
1022
 
        self.failUnless(t2.has('d'))
1023
 
        self.failUnless(t3.has('b/d'))
 
1316
        self.assertTrue(t1.has('b/d'))
 
1317
        self.assertTrue(t2.has('d'))
 
1318
        self.assertTrue(t3.has('b/d'))
1024
1319
 
1025
1320
    def test_clone_to_root(self):
1026
1321
        orig_transport = self.get_transport()
1027
 
        if isinstance(orig_transport, chroot.ChrootTransportDecorator):
1028
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1029
1322
        # Repeatedly go up to a parent directory until we're at the root
1030
1323
        # directory of this transport
1031
1324
        root_transport = orig_transport
1054
1347
    def test_clone_from_root(self):
1055
1348
        """At the root, cloning to a simple dir should just do string append."""
1056
1349
        orig_transport = self.get_transport()
1057
 
        if isinstance(orig_transport, chroot.ChrootTransportDecorator):
1058
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('/')")
1059
1350
        root_transport = orig_transport.clone('/')
1060
1351
        self.assertEqual(root_transport.base + '.bzr/',
1061
1352
            root_transport.clone('.bzr').base)
1069
1360
        self.assertEqual('', t.relpath(t.base))
1070
1361
        # base ends with /
1071
1362
        self.assertEqual('', t.relpath(t.base[:-1]))
1072
 
        # subdirs which dont exist should still give relpaths.
 
1363
        # subdirs which don't exist should still give relpaths.
1073
1364
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1074
1365
        # trailing slash should be the same.
1075
1366
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1076
1367
 
1077
1368
    def test_relpath_at_root(self):
1078
1369
        t = self.get_transport()
1079
 
        if isinstance(t, chroot.ChrootTransportDecorator):
1080
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1081
1370
        # clone all the way to the top
1082
1371
        new_transport = t.clone('..')
1083
1372
        while new_transport.base != t.base:
1093
1382
        # that have aliasing problems like symlinks should go in backend
1094
1383
        # specific test cases.
1095
1384
        transport = self.get_transport()
1096
 
        if isinstance(transport, chroot.ChrootTransportDecorator):
1097
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1098
 
        
 
1385
 
1099
1386
        self.assertEqual(transport.base + 'relpath',
1100
1387
                         transport.abspath('relpath'))
1101
1388
 
1108
1395
        self.assertEqual(transport.clone("/").abspath('foo'),
1109
1396
                         transport.abspath("/foo"))
1110
1397
 
 
1398
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1399
    def test_win32_abspath(self):
 
1400
        # Note: we tried to set sys.platform='win32' so we could test on
 
1401
        # other platforms too, but then osutils does platform specific
 
1402
        # things at import time which defeated us...
 
1403
        if sys.platform != 'win32':
 
1404
            raise TestSkipped(
 
1405
                'Testing drive letters in abspath implemented only for win32')
 
1406
 
 
1407
        # smoke test for abspath on win32.
 
1408
        # a transport based on 'file:///' never fully qualifies the drive.
 
1409
        transport = _mod_transport.get_transport_from_url("file:///")
 
1410
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
 
 
1412
        # but a transport that starts with a drive spec must keep it.
 
1413
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1414
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
 
1111
1416
    def test_local_abspath(self):
1112
1417
        transport = self.get_transport()
1113
1418
        try:
1114
1419
            p = transport.local_abspath('.')
1115
 
        except TransportNotPossible:
1116
 
            pass # This is not a local transport
 
1420
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1421
            # should be formattable
 
1422
            s = str(e)
1117
1423
        else:
1118
1424
            self.assertEqual(getcwd(), p)
1119
1425
 
1120
1426
    def test_abspath_at_root(self):
1121
1427
        t = self.get_transport()
1122
 
        if isinstance(t, chroot.ChrootTransportDecorator):
1123
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1124
1428
        # clone all the way to the top
1125
1429
        new_transport = t.clone('..')
1126
1430
        while new_transport.base != t.base:
1191
1495
                         'to/dir/b%2525z',
1192
1496
                         'to/bar',]))
1193
1497
 
 
1498
    def test_copy_tree_to_transport(self):
 
1499
        transport = self.get_transport()
 
1500
        if not transport.listable():
 
1501
            self.assertRaises(TransportNotPossible,
 
1502
                              transport.iter_files_recursive)
 
1503
            return
 
1504
        if transport.is_readonly():
 
1505
            return
 
1506
        self.build_tree(['from/',
 
1507
                         'from/dir/',
 
1508
                         'from/dir/foo',
 
1509
                         'from/dir/bar',
 
1510
                         'from/dir/b%25z', # make sure quoting is correct
 
1511
                         'from/bar'],
 
1512
                        transport=transport)
 
1513
        from_transport = transport.clone('from')
 
1514
        to_transport = transport.clone('to')
 
1515
        to_transport.ensure_base()
 
1516
        from_transport.copy_tree_to_transport(to_transport)
 
1517
        paths = set(transport.iter_files_recursive())
 
1518
        self.assertEqual(paths,
 
1519
                    set(['from/dir/foo',
 
1520
                         'from/dir/bar',
 
1521
                         'from/dir/b%2525z',
 
1522
                         'from/bar',
 
1523
                         'to/dir/foo',
 
1524
                         'to/dir/bar',
 
1525
                         'to/dir/b%2525z',
 
1526
                         'to/bar',]))
 
1527
 
1194
1528
    def test_unicode_paths(self):
1195
1529
        """Test that we can read/write files with Unicode names."""
1196
1530
        t = self.get_transport()
1207
1541
                 u'\u65e5', # Kanji person
1208
1542
                ]
1209
1543
 
 
1544
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1545
        if no_unicode_support:
 
1546
            self.knownFailure("test server cannot handle unicode paths")
 
1547
 
1210
1548
        try:
1211
1549
            self.build_tree(files, transport=t, line_endings='binary')
1212
1550
        except UnicodeError:
1222
1560
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1223
1561
 
1224
1562
    def test_connect_twice_is_same_content(self):
1225
 
        # check that our server (whatever it is) is accessable reliably
 
1563
        # check that our server (whatever it is) is accessible reliably
1226
1564
        # via get_transport and multiple connections share content.
1227
1565
        transport = self.get_transport()
1228
 
        if isinstance(transport, chroot.ChrootTransportDecorator):
1229
 
            raise TestSkipped("ChrootTransportDecorator disallows clone('..')")
1230
1566
        if transport.is_readonly():
1231
1567
            return
1232
1568
        transport.put_bytes('foo', 'bar')
1233
 
        transport2 = self.get_transport()
1234
 
        self.check_transport_contents('bar', transport2, 'foo')
1235
 
        # its base should be usable.
1236
 
        transport2 = bzrlib.transport.get_transport(transport.base)
1237
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1569
        transport3 = self.get_transport()
 
1570
        self.check_transport_contents('bar', transport3, 'foo')
1238
1571
 
1239
1572
        # now opening at a relative url should give use a sane result:
1240
1573
        transport.mkdir('newdir')
1241
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1242
 
        transport2 = transport2.clone('..')
1243
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1574
        transport5 = self.get_transport('newdir')
 
1575
        transport6 = transport5.clone('..')
 
1576
        self.check_transport_contents('bar', transport6, 'foo')
1244
1577
 
1245
1578
    def test_lock_write(self):
1246
1579
        """Test transport-level write locks.
1281
1614
    def test_readv(self):
1282
1615
        transport = self.get_transport()
1283
1616
        if transport.is_readonly():
1284
 
            file('a', 'w').write('0123456789')
 
1617
            with file('a', 'w') as f: f.write('0123456789')
1285
1618
        else:
1286
1619
            transport.put_bytes('a', '0123456789')
1287
1620
 
1297
1630
    def test_readv_out_of_order(self):
1298
1631
        transport = self.get_transport()
1299
1632
        if transport.is_readonly():
1300
 
            file('a', 'w').write('0123456789')
 
1633
            with file('a', 'w') as f: f.write('0123456789')
1301
1634
        else:
1302
1635
            transport.put_bytes('a', '01234567890')
1303
1636
 
1307
1640
        self.assertEqual(d[2], (0, '0'))
1308
1641
        self.assertEqual(d[3], (3, '34'))
1309
1642
 
 
1643
    def test_readv_with_adjust_for_latency(self):
 
1644
        transport = self.get_transport()
 
1645
        # the adjust for latency flag expands the data region returned
 
1646
        # according to a per-transport heuristic, so testing is a little
 
1647
        # tricky as we need more data than the largest combining that our
 
1648
        # transports do. To accomodate this we generate random data and cross
 
1649
        # reference the returned data with the random data. To avoid doing
 
1650
        # multiple large random byte look ups we do several tests on the same
 
1651
        # backing data.
 
1652
        content = osutils.rand_bytes(200*1024)
 
1653
        content_size = len(content)
 
1654
        if transport.is_readonly():
 
1655
            self.build_tree_contents([('a', content)])
 
1656
        else:
 
1657
            transport.put_bytes('a', content)
 
1658
        def check_result_data(result_vector):
 
1659
            for item in result_vector:
 
1660
                data_len = len(item[1])
 
1661
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1662
 
 
1663
        # start corner case
 
1664
        result = list(transport.readv('a', ((0, 30),),
 
1665
            adjust_for_latency=True, upper_limit=content_size))
 
1666
        # we expect 1 result, from 0, to something > 30
 
1667
        self.assertEqual(1, len(result))
 
1668
        self.assertEqual(0, result[0][0])
 
1669
        self.assertTrue(len(result[0][1]) >= 30)
 
1670
        check_result_data(result)
 
1671
        # end of file corner case
 
1672
        result = list(transport.readv('a', ((204700, 100),),
 
1673
            adjust_for_latency=True, upper_limit=content_size))
 
1674
        # we expect 1 result, from 204800- its length, to the end
 
1675
        self.assertEqual(1, len(result))
 
1676
        data_len = len(result[0][1])
 
1677
        self.assertEqual(204800-data_len, result[0][0])
 
1678
        self.assertTrue(data_len >= 100)
 
1679
        check_result_data(result)
 
1680
        # out of order ranges are made in order
 
1681
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1682
            adjust_for_latency=True, upper_limit=content_size))
 
1683
        # we expect 2 results, in order, start and end.
 
1684
        self.assertEqual(2, len(result))
 
1685
        # start
 
1686
        data_len = len(result[0][1])
 
1687
        self.assertEqual(0, result[0][0])
 
1688
        self.assertTrue(data_len >= 30)
 
1689
        # end
 
1690
        data_len = len(result[1][1])
 
1691
        self.assertEqual(204800-data_len, result[1][0])
 
1692
        self.assertTrue(data_len >= 100)
 
1693
        check_result_data(result)
 
1694
        # close ranges get combined (even if out of order)
 
1695
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1696
            result = list(transport.readv('a', request_vector,
 
1697
                adjust_for_latency=True, upper_limit=content_size))
 
1698
            self.assertEqual(1, len(result))
 
1699
            data_len = len(result[0][1])
 
1700
            # minimum length is from 400 to 1034 - 634
 
1701
            self.assertTrue(data_len >= 634)
 
1702
            # must contain the region 400 to 1034
 
1703
            self.assertTrue(result[0][0] <= 400)
 
1704
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1705
            check_result_data(result)
 
1706
 
 
1707
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1708
        transport = self.get_transport()
 
1709
        # test from observed failure case.
 
1710
        if transport.is_readonly():
 
1711
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1712
        else:
 
1713
            transport.put_bytes('a', 'a'*1024*1024)
 
1714
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1715
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1716
            (465373, 800), (947422, 800)]
 
1717
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1718
        found_items = [False]*9
 
1719
        for pos, (start, length) in enumerate(broken_vector):
 
1720
            # check the range is covered by the result
 
1721
            for offset, data in results:
 
1722
                if offset <= start and start + length <= offset + len(data):
 
1723
                    found_items[pos] = True
 
1724
        self.assertEqual([True]*9, found_items)
 
1725
 
 
1726
    def test_get_with_open_write_stream_sees_all_content(self):
 
1727
        t = self.get_transport()
 
1728
        if t.is_readonly():
 
1729
            return
 
1730
        handle = t.open_write_stream('foo')
 
1731
        try:
 
1732
            handle.write('bcd')
 
1733
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1734
        finally:
 
1735
            handle.close()
 
1736
 
1310
1737
    def test_get_smart_medium(self):
1311
1738
        """All transports must either give a smart medium, or know they can't.
1312
1739
        """
1313
1740
        transport = self.get_transport()
1314
1741
        try:
1315
 
            medium = transport.get_smart_medium()
1316
 
            self.assertIsInstance(medium, smart.SmartClientMedium)
 
1742
            client_medium = transport.get_smart_medium()
 
1743
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1317
1744
        except errors.NoSmartMedium:
1318
1745
            # as long as we got it we're fine
1319
1746
            pass
1321
1748
    def test_readv_short_read(self):
1322
1749
        transport = self.get_transport()
1323
1750
        if transport.is_readonly():
1324
 
            file('a', 'w').write('0123456789')
 
1751
            with file('a', 'w') as f: f.write('0123456789')
1325
1752
        else:
1326
1753
            transport.put_bytes('a', '01234567890')
1327
1754
 
1336
1763
        # also raise a special error
1337
1764
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1338
1765
                              transport.readv, 'a', [(12,2)])
 
1766
 
 
1767
    def test_no_segment_parameters(self):
 
1768
        """Segment parameters should be stripped and stored in
 
1769
        transport.segment_parameters."""
 
1770
        transport = self.get_transport("foo")
 
1771
        self.assertEqual({}, transport.get_segment_parameters())
 
1772
 
 
1773
    def test_segment_parameters(self):
 
1774
        """Segment parameters should be stripped and stored in
 
1775
        transport.get_segment_parameters()."""
 
1776
        base_url = self._server.get_url()
 
1777
        parameters = {"key1": "val1", "key2": "val2"}
 
1778
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1779
        transport = _mod_transport.get_transport_from_url(url)
 
1780
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1781
 
 
1782
    def test_set_segment_parameters(self):
 
1783
        """Segment parameters can be set and show up in base."""
 
1784
        transport = self.get_transport("foo")
 
1785
        orig_base = transport.base
 
1786
        transport.set_segment_parameter("arm", "board")
 
1787
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1788
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1789
        transport.set_segment_parameter("arm", None)
 
1790
        transport.set_segment_parameter("nonexistant", None)
 
1791
        self.assertEqual({}, transport.get_segment_parameters())
 
1792
        self.assertEqual(orig_base, transport.base)
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1805
 
 
1806
    def test_abspath_url_unquote_unreserved(self):
 
1807
        """URLs from abspath should have unreserved characters unquoted
 
1808
        
 
1809
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1810
        """
 
1811
        t = self.get_transport()
 
1812
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1813
        self.assertEqual(t.base + "-.09AZ_az~",
 
1814
            t.abspath(needlessly_escaped_dir))
 
1815
 
 
1816
    def test_clone_url_unquote_unreserved(self):
 
1817
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1818
        
 
1819
        Cloned transports should be prefix comparable for things like the
 
1820
        isolation checking of tests, see lp:842223 for more.
 
1821
        """
 
1822
        t1 = self.get_transport()
 
1823
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1824
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1825
        t2 = t1.clone(needlessly_escaped_dir)
 
1826
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1827
 
 
1828
    def test_hook_post_connection_one(self):
 
1829
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1830
        log = []
 
1831
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1832
        t = self.get_transport()
 
1833
        self.assertEqual([], log)
 
1834
        t.has("non-existant")
 
1835
        if isinstance(t, RemoteTransport):
 
1836
            self.assertEqual([t.get_smart_medium()], log)
 
1837
        elif isinstance(t, ConnectedTransport):
 
1838
            self.assertEqual([t], log)
 
1839
        else:
 
1840
            self.assertEqual([], log)
 
1841
 
 
1842
    def test_hook_post_connection_multi(self):
 
1843
        """Fire post_connect hook once per unshared underlying connection"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t1 = self.get_transport()
 
1847
        t2 = t1.clone(".")
 
1848
        t3 = self.get_transport()
 
1849
        self.assertEqual([], log)
 
1850
        t1.has("x")
 
1851
        t2.has("x")
 
1852
        t3.has("x")
 
1853
        if isinstance(t1, RemoteTransport):
 
1854
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1855
        elif isinstance(t1, ConnectedTransport):
 
1856
            self.assertEqual([t1, t3], log)
 
1857
        else:
 
1858
            self.assertEqual([], log)