~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2010-10-26 08:08:23 UTC
  • mfrom: (5514.1.1 665100-content-type)
  • mto: This revision was merged to the branch mainline in revision 5516.
  • Revision ID: v.ladeuil+lp@free.fr-20101026080823-3wggo03b7cpn9908
Correctly set the Content-Type header when POSTing http requests

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
27
29
 
28
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
29
 
                           LockError, PathError,
30
 
                           TransportNotPossible, ConnectionError,
31
 
                           InvalidURL)
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    urlutils,
 
36
    )
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           FileExists,
 
39
                           InvalidURL,
 
40
                           NoSuchFile,
 
41
                           PathError,
 
42
                           TransportNotPossible,
 
43
                           )
32
44
from bzrlib.osutils import getcwd
33
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
34
 
from bzrlib.transport import memory
35
 
import bzrlib.transport
36
 
import bzrlib.urlutils as urlutils
37
 
 
38
 
 
39
 
def _append(fn, txt):
40
 
    """Append the given text (file-like object) to the supplied filename."""
41
 
    f = open(fn, 'ab')
42
 
    try:
43
 
        f.write(txt.read())
44
 
    finally:
45
 
        f.close()
46
 
 
47
 
 
48
 
class TestTransportImplementation(TestCaseInTempDir):
49
 
    """Implementation verification for transports.
50
 
    
51
 
    To verify a transport we need a server factory, which is a callable
52
 
    that accepts no parameters and returns an implementation of
53
 
    bzrlib.transport.Server.
54
 
    
55
 
    That Server is then used to construct transport instances and test
56
 
    the transport via loopback activity.
57
 
 
58
 
    Currently this assumes that the Transport object is connected to the 
59
 
    current working directory.  So that whatever is done 
60
 
    through the transport, should show up in the working 
61
 
    directory, and vice-versa. This is a bug, because its possible to have
62
 
    URL schemes which provide access to something that may not be 
63
 
    result in storage on the local disk, i.e. due to file system limits, or 
64
 
    due to it being a database or some other non-filesystem tool.
65
 
 
66
 
    This also tests to make sure that the functions work with both
67
 
    generators and lists (assuming iter(list) is effectively a generator)
68
 
    """
69
 
    
 
45
from bzrlib.smart import medium
 
46
from bzrlib.tests import (
 
47
    TestSkipped,
 
48
    TestNotApplicable,
 
49
    multiply_tests,
 
50
    )
 
51
from bzrlib.tests import test_server
 
52
from bzrlib.tests.test_transport import TestTransportImplementation
 
53
from bzrlib.transport import (
 
54
    ConnectedTransport,
 
55
    get_transport,
 
56
    _get_transport_modules,
 
57
    )
 
58
from bzrlib.transport.memory import MemoryTransport
 
59
 
 
60
 
 
61
def get_transport_test_permutations(module):
 
62
    """Get the permutations module wants to have tested."""
 
63
    if getattr(module, 'get_test_permutations', None) is None:
 
64
        raise AssertionError(
 
65
            "transport module %s doesn't provide get_test_permutations()"
 
66
            % module.__name__)
 
67
        return []
 
68
    return module.get_test_permutations()
 
69
 
 
70
 
 
71
def transport_test_permutations():
 
72
    """Return a list of the klass, server_factory pairs to test."""
 
73
    result = []
 
74
    for module in _get_transport_modules():
 
75
        try:
 
76
            permutations = get_transport_test_permutations(
 
77
                pyutils.get_named_object(module))
 
78
            for (klass, server_factory) in permutations:
 
79
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
80
                    {"transport_class":klass,
 
81
                     "transport_server":server_factory})
 
82
                result.append(scenario)
 
83
        except errors.DependencyNotPresent, e:
 
84
            # Continue even if a dependency prevents us
 
85
            # from adding this test
 
86
            pass
 
87
    return result
 
88
 
 
89
 
 
90
def load_tests(standard_tests, module, loader):
 
91
    """Multiply tests for tranport implementations."""
 
92
    result = loader.suiteClass()
 
93
    scenarios = transport_test_permutations()
 
94
    return multiply_tests(standard_tests, scenarios, result)
 
95
 
 
96
 
 
97
class TransportTests(TestTransportImplementation):
 
98
 
70
99
    def setUp(self):
71
 
        super(TestTransportImplementation, self).setUp()
72
 
        self._server = self.transport_server()
73
 
        self._server.setUp()
 
100
        super(TransportTests, self).setUp()
 
101
        self._captureVar('BZR_NO_SMART_VFS', None)
74
102
 
75
 
    def tearDown(self):
76
 
        super(TestTransportImplementation, self).tearDown()
77
 
        self._server.tearDown()
78
 
        
79
103
    def check_transport_contents(self, content, transport, relpath):
80
104
        """Check that transport.get(relpath).read() == content."""
81
105
        self.assertEqualDiff(content, transport.get(relpath).read())
82
106
 
83
 
    def get_transport(self):
84
 
        """Return a connected transport to the local directory."""
85
 
        base_url = self._server.get_url()
86
 
        t = bzrlib.transport.get_transport(base_url)
87
 
        if not isinstance(t, self.transport_class):
88
 
            # we want to make sure to construct one particular class, even if
89
 
            # there are several available implementations of this transport;
90
 
            # therefore construct it by hand rather than through the regular
91
 
            # get_transport method
92
 
            t = self.transport_class(base_url)
93
 
        return t
94
 
 
95
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
96
 
        """Fail unless excClass is raised when the iterator from func is used.
97
 
        
98
 
        Many transport functions can return generators this makes sure
99
 
        to wrap them in a list() call to make sure the whole generator
100
 
        is run, and that the proper exception is raised.
101
 
        """
 
107
    def test_ensure_base_missing(self):
 
108
        """.ensure_base() should create the directory if it doesn't exist"""
 
109
        t = self.get_transport()
 
110
        t_a = t.clone('a')
 
111
        if t_a.is_readonly():
 
112
            self.assertRaises(TransportNotPossible,
 
113
                              t_a.ensure_base)
 
114
            return
 
115
        self.assertTrue(t_a.ensure_base())
 
116
        self.assertTrue(t.has('a'))
 
117
 
 
118
    def test_ensure_base_exists(self):
 
119
        """.ensure_base() should just be happy if it already exists"""
 
120
        t = self.get_transport()
 
121
        if t.is_readonly():
 
122
            return
 
123
 
 
124
        t.mkdir('a')
 
125
        t_a = t.clone('a')
 
126
        # ensure_base returns False if it didn't create the base
 
127
        self.assertFalse(t_a.ensure_base())
 
128
 
 
129
    def test_ensure_base_missing_parent(self):
 
130
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
131
        t = self.get_transport()
 
132
        if t.is_readonly():
 
133
            return
 
134
 
 
135
        t_a = t.clone('a')
 
136
        t_b = t_a.clone('b')
 
137
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
138
 
 
139
    def test_external_url(self):
 
140
        """.external_url either works or raises InProcessTransport."""
 
141
        t = self.get_transport()
102
142
        try:
103
 
            list(func(*args, **kwargs))
104
 
        except excClass:
105
 
            return
106
 
        else:
107
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
108
 
            else: excName = str(excClass)
109
 
            raise self.failureException, "%s not raised" % excName
 
143
            t.external_url()
 
144
        except errors.InProcessTransport:
 
145
            pass
110
146
 
111
147
    def test_has(self):
112
148
        t = self.get_transport()
116
152
        self.assertEqual(True, t.has('a'))
117
153
        self.assertEqual(False, t.has('c'))
118
154
        self.assertEqual(True, t.has(urlutils.escape('%')))
119
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
120
 
                [True, True, False, False, True, False, True, False])
 
155
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
156
                                           'e', 'f', 'g', 'h'])),
 
157
                         [True, True, False, False,
 
158
                          True, False, True, False])
121
159
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
122
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
123
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
124
 
                [True, True, False, False, True, False, True, False])
 
160
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
161
                                           urlutils.escape('%%')]))
 
162
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
163
                                                'e', 'f', 'g', 'h']))),
 
164
                         [True, True, False, False,
 
165
                          True, False, True, False])
125
166
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
126
167
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
127
168
 
 
169
    def test_has_root_works(self):
 
170
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
171
            raise TestNotApplicable(
 
172
                "SmartTCPServer_for_testing intentionally does not allow "
 
173
                "access to /.")
 
174
        current_transport = self.get_transport()
 
175
        self.assertTrue(current_transport.has('/'))
 
176
        root = current_transport.clone('/')
 
177
        self.assertTrue(root.has(''))
 
178
 
128
179
    def test_get(self):
129
180
        t = self.get_transport()
130
181
 
137
188
        self.build_tree(files, transport=t, line_endings='binary')
138
189
        self.check_transport_contents('contents of a\n', t, 'a')
139
190
        content_f = t.get_multi(files)
140
 
        for content, f in zip(contents, content_f):
 
191
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
192
        # evaluate their inputs, the transport requests should be issued and
 
193
        # handled sequentially (we don't want to force transport to buffer).
 
194
        for content, f in itertools.izip(contents, content_f):
141
195
            self.assertEqual(content, f.read())
142
196
 
143
197
        content_f = t.get_multi(iter(files))
144
 
        for content, f in zip(contents, content_f):
 
198
        # Use itertools.izip() for the same reason
 
199
        for content, f in itertools.izip(contents, content_f):
145
200
            self.assertEqual(content, f.read())
146
201
 
 
202
    def test_get_unknown_file(self):
 
203
        t = self.get_transport()
 
204
        files = ['a', 'b']
 
205
        contents = ['contents of a\n',
 
206
                    'contents of b\n',
 
207
                    ]
 
208
        self.build_tree(files, transport=t, line_endings='binary')
147
209
        self.assertRaises(NoSuchFile, t.get, 'c')
148
210
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
149
211
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
150
212
 
151
 
    def test_put(self):
152
 
        t = self.get_transport()
153
 
 
154
 
        if t.is_readonly():
155
 
            self.assertRaises(TransportNotPossible,
156
 
                    t.put, 'a', 'some text for a\n')
157
 
            return
158
 
 
159
 
        t.put('a', StringIO('some text for a\n'))
160
 
        self.failUnless(t.has('a'))
161
 
        self.check_transport_contents('some text for a\n', t, 'a')
162
 
        # Make sure 'has' is updated
163
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
164
 
                [True, False, False, False, False])
165
 
        # Put also replaces contents
166
 
        self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
167
 
                                      ('d', StringIO('contents\nfor d\n'))]),
168
 
                         2)
169
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
170
 
                [True, False, False, True, False])
171
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
172
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
173
 
 
174
 
        self.assertEqual(
175
 
            t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
176
 
                              ('d', StringIO('another contents\nfor d\n'))])),
177
 
                        2)
178
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
179
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
180
 
 
181
 
        self.assertRaises(NoSuchFile,
182
 
                          t.put, 'path/doesnt/exist/c', 'contents')
183
 
 
184
 
    def test_put_permissions(self):
185
 
        t = self.get_transport()
186
 
 
187
 
        if t.is_readonly():
188
 
            return
189
 
        if not t._can_roundtrip_unix_modebits():
190
 
            # Can't roundtrip, so no need to run this test
191
 
            return
192
 
        t.put('mode644', StringIO('test text\n'), mode=0644)
193
 
        self.assertTransportMode(t, 'mode644', 0644)
194
 
        t.put('mode666', StringIO('test text\n'), mode=0666)
195
 
        self.assertTransportMode(t, 'mode666', 0666)
196
 
        t.put('mode600', StringIO('test text\n'), mode=0600)
 
213
    def test_get_directory_read_gives_ReadError(self):
 
214
        """consistent errors for read() on a file returned by get()."""
 
215
        t = self.get_transport()
 
216
        if t.is_readonly():
 
217
            self.build_tree(['a directory/'])
 
218
        else:
 
219
            t.mkdir('a%20directory')
 
220
        # getting the file must either work or fail with a PathError
 
221
        try:
 
222
            a_file = t.get('a%20directory')
 
223
        except (errors.PathError, errors.RedirectRequested):
 
224
            # early failure return immediately.
 
225
            return
 
226
        # having got a file, read() must either work (i.e. http reading a dir
 
227
        # listing) or fail with ReadError
 
228
        try:
 
229
            a_file.read()
 
230
        except errors.ReadError:
 
231
            pass
 
232
 
 
233
    def test_get_bytes(self):
 
234
        t = self.get_transport()
 
235
 
 
236
        files = ['a', 'b', 'e', 'g']
 
237
        contents = ['contents of a\n',
 
238
                    'contents of b\n',
 
239
                    'contents of e\n',
 
240
                    'contents of g\n',
 
241
                    ]
 
242
        self.build_tree(files, transport=t, line_endings='binary')
 
243
        self.check_transport_contents('contents of a\n', t, 'a')
 
244
 
 
245
        for content, fname in zip(contents, files):
 
246
            self.assertEqual(content, t.get_bytes(fname))
 
247
 
 
248
    def test_get_bytes_unknown_file(self):
 
249
        t = self.get_transport()
 
250
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
251
 
 
252
    def test_get_with_open_write_stream_sees_all_content(self):
 
253
        t = self.get_transport()
 
254
        if t.is_readonly():
 
255
            return
 
256
        handle = t.open_write_stream('foo')
 
257
        try:
 
258
            handle.write('b')
 
259
            self.assertEqual('b', t.get('foo').read())
 
260
        finally:
 
261
            handle.close()
 
262
 
 
263
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
            self.assertEqual('b', t.get('foo').read())
 
272
        finally:
 
273
            handle.close()
 
274
 
 
275
    def test_put_bytes(self):
 
276
        t = self.get_transport()
 
277
 
 
278
        if t.is_readonly():
 
279
            self.assertRaises(TransportNotPossible,
 
280
                    t.put_bytes, 'a', 'some text for a\n')
 
281
            return
 
282
 
 
283
        t.put_bytes('a', 'some text for a\n')
 
284
        self.failUnless(t.has('a'))
 
285
        self.check_transport_contents('some text for a\n', t, 'a')
 
286
 
 
287
        # The contents should be overwritten
 
288
        t.put_bytes('a', 'new text for a\n')
 
289
        self.check_transport_contents('new text for a\n', t, 'a')
 
290
 
 
291
        self.assertRaises(NoSuchFile,
 
292
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
293
 
 
294
    def test_put_bytes_non_atomic(self):
 
295
        t = self.get_transport()
 
296
 
 
297
        if t.is_readonly():
 
298
            self.assertRaises(TransportNotPossible,
 
299
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
300
            return
 
301
 
 
302
        self.failIf(t.has('a'))
 
303
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
304
        self.failUnless(t.has('a'))
 
305
        self.check_transport_contents('some text for a\n', t, 'a')
 
306
        # Put also replaces contents
 
307
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
308
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
309
 
 
310
        # Make sure we can create another file
 
311
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
312
        # And overwrite 'a' with empty contents
 
313
        t.put_bytes_non_atomic('a', '')
 
314
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
315
        self.check_transport_contents('', t, 'a')
 
316
 
 
317
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
318
                                       'contents\n')
 
319
        # Now test the create_parent flag
 
320
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
321
                                       'contents\n')
 
322
        self.failIf(t.has('dir/a'))
 
323
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
324
                               create_parent_dir=True)
 
325
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
326
 
 
327
        # But we still get NoSuchFile if we can't make the parent dir
 
328
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
329
                                       'contents\n',
 
330
                                       create_parent_dir=True)
 
331
 
 
332
    def test_put_bytes_permissions(self):
 
333
        t = self.get_transport()
 
334
 
 
335
        if t.is_readonly():
 
336
            return
 
337
        if not t._can_roundtrip_unix_modebits():
 
338
            # Can't roundtrip, so no need to run this test
 
339
            return
 
340
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
341
        self.assertTransportMode(t, 'mode644', 0644)
 
342
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
343
        self.assertTransportMode(t, 'mode666', 0666)
 
344
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
345
        self.assertTransportMode(t, 'mode600', 0600)
 
346
        # Yes, you can put_bytes a file such that it becomes readonly
 
347
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
348
        self.assertTransportMode(t, 'mode400', 0400)
 
349
 
 
350
        # The default permissions should be based on the current umask
 
351
        umask = osutils.get_umask()
 
352
        t.put_bytes('nomode', 'test text\n', mode=None)
 
353
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
354
 
 
355
    def test_put_bytes_non_atomic_permissions(self):
 
356
        t = self.get_transport()
 
357
 
 
358
        if t.is_readonly():
 
359
            return
 
360
        if not t._can_roundtrip_unix_modebits():
 
361
            # Can't roundtrip, so no need to run this test
 
362
            return
 
363
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
364
        self.assertTransportMode(t, 'mode644', 0644)
 
365
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
366
        self.assertTransportMode(t, 'mode666', 0666)
 
367
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
368
        self.assertTransportMode(t, 'mode600', 0600)
 
369
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
370
        self.assertTransportMode(t, 'mode400', 0400)
 
371
 
 
372
        # The default permissions should be based on the current umask
 
373
        umask = osutils.get_umask()
 
374
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
375
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
376
 
 
377
        # We should also be able to set the mode for a parent directory
 
378
        # when it is created
 
379
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
380
                               dir_mode=0700, create_parent_dir=True)
 
381
        self.assertTransportMode(t, 'dir700', 0700)
 
382
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
383
                               dir_mode=0770, create_parent_dir=True)
 
384
        self.assertTransportMode(t, 'dir770', 0770)
 
385
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
386
                               dir_mode=0777, create_parent_dir=True)
 
387
        self.assertTransportMode(t, 'dir777', 0777)
 
388
 
 
389
    def test_put_file(self):
 
390
        t = self.get_transport()
 
391
 
 
392
        if t.is_readonly():
 
393
            self.assertRaises(TransportNotPossible,
 
394
                    t.put_file, 'a', StringIO('some text for a\n'))
 
395
            return
 
396
 
 
397
        result = t.put_file('a', StringIO('some text for a\n'))
 
398
        # put_file returns the length of the data written
 
399
        self.assertEqual(16, result)
 
400
        self.failUnless(t.has('a'))
 
401
        self.check_transport_contents('some text for a\n', t, 'a')
 
402
        # Put also replaces contents
 
403
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
404
        self.assertEqual(19, result)
 
405
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
406
        self.assertRaises(NoSuchFile,
 
407
                          t.put_file, 'path/doesnt/exist/c',
 
408
                              StringIO('contents'))
 
409
 
 
410
    def test_put_file_non_atomic(self):
 
411
        t = self.get_transport()
 
412
 
 
413
        if t.is_readonly():
 
414
            self.assertRaises(TransportNotPossible,
 
415
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
416
            return
 
417
 
 
418
        self.failIf(t.has('a'))
 
419
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
420
        self.failUnless(t.has('a'))
 
421
        self.check_transport_contents('some text for a\n', t, 'a')
 
422
        # Put also replaces contents
 
423
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
424
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
425
 
 
426
        # Make sure we can create another file
 
427
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
428
        # And overwrite 'a' with empty contents
 
429
        t.put_file_non_atomic('a', StringIO(''))
 
430
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
431
        self.check_transport_contents('', t, 'a')
 
432
 
 
433
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
434
                                       StringIO('contents\n'))
 
435
        # Now test the create_parent flag
 
436
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
437
                                       StringIO('contents\n'))
 
438
        self.failIf(t.has('dir/a'))
 
439
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
440
                              create_parent_dir=True)
 
441
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
442
 
 
443
        # But we still get NoSuchFile if we can't make the parent dir
 
444
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
445
                                       StringIO('contents\n'),
 
446
                                       create_parent_dir=True)
 
447
 
 
448
    def test_put_file_permissions(self):
 
449
 
 
450
        t = self.get_transport()
 
451
 
 
452
        if t.is_readonly():
 
453
            return
 
454
        if not t._can_roundtrip_unix_modebits():
 
455
            # Can't roundtrip, so no need to run this test
 
456
            return
 
457
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
458
        self.assertTransportMode(t, 'mode644', 0644)
 
459
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
460
        self.assertTransportMode(t, 'mode666', 0666)
 
461
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
197
462
        self.assertTransportMode(t, 'mode600', 0600)
198
463
        # Yes, you can put a file such that it becomes readonly
199
 
        t.put('mode400', StringIO('test text\n'), mode=0400)
200
 
        self.assertTransportMode(t, 'mode400', 0400)
201
 
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
202
 
        self.assertTransportMode(t, 'mmode644', 0644)
203
 
        
 
464
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
465
        self.assertTransportMode(t, 'mode400', 0400)
 
466
        # The default permissions should be based on the current umask
 
467
        umask = osutils.get_umask()
 
468
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
469
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
470
 
 
471
    def test_put_file_non_atomic_permissions(self):
 
472
        t = self.get_transport()
 
473
 
 
474
        if t.is_readonly():
 
475
            return
 
476
        if not t._can_roundtrip_unix_modebits():
 
477
            # Can't roundtrip, so no need to run this test
 
478
            return
 
479
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
480
        self.assertTransportMode(t, 'mode644', 0644)
 
481
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
482
        self.assertTransportMode(t, 'mode666', 0666)
 
483
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
484
        self.assertTransportMode(t, 'mode600', 0600)
 
485
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
486
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
487
        self.assertTransportMode(t, 'mode400', 0400)
 
488
 
 
489
        # The default permissions should be based on the current umask
 
490
        umask = osutils.get_umask()
 
491
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
492
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
493
 
 
494
        # We should also be able to set the mode for a parent directory
 
495
        # when it is created
 
496
        sio = StringIO()
 
497
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
498
                              dir_mode=0700, create_parent_dir=True)
 
499
        self.assertTransportMode(t, 'dir700', 0700)
 
500
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
501
                              dir_mode=0770, create_parent_dir=True)
 
502
        self.assertTransportMode(t, 'dir770', 0770)
 
503
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
504
                              dir_mode=0777, create_parent_dir=True)
 
505
        self.assertTransportMode(t, 'dir777', 0777)
 
506
 
 
507
    def test_put_bytes_unicode(self):
 
508
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
509
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
510
        # (we don't want to encode unicode here at all, callers should be
 
511
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
512
        # compatibility.  At some point we should use a specific exception.
 
513
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
514
        t = self.get_transport()
 
515
        if t.is_readonly():
 
516
            return
 
517
        unicode_string = u'\u1234'
 
518
        self.assertRaises(
 
519
            (AssertionError, UnicodeEncodeError),
 
520
            t.put_bytes, 'foo', unicode_string)
 
521
 
 
522
    def test_put_file_unicode(self):
 
523
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
524
        # This situation can happen (and has) if code is careless about the type
 
525
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
526
        # cStringIO, because it never returns unicode from read.
 
527
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
528
        # raise, but we raise it for hysterical raisins.
 
529
        t = self.get_transport()
 
530
        if t.is_readonly():
 
531
            return
 
532
        unicode_file = pyStringIO(u'\u1234')
 
533
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
534
 
204
535
    def test_mkdir(self):
205
536
        t = self.get_transport()
206
537
 
207
538
        if t.is_readonly():
208
 
            # cannot mkdir on readonly transports. We're not testing for 
 
539
            # cannot mkdir on readonly transports. We're not testing for
209
540
            # cache coherency because cache behaviour is not currently
210
541
            # defined for the transport interface.
211
542
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
232
563
 
233
564
        # we were testing that a local mkdir followed by a transport
234
565
        # mkdir failed thusly, but given that we * in one process * do not
235
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
566
        # concurrently fiddle with disk dirs and then use transport to do
236
567
        # things, the win here seems marginal compared to the constraint on
237
568
        # the interface. RBC 20051227
238
569
        t.mkdir('dir_g')
239
570
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
240
571
 
241
572
        # Test get/put in sub-directories
242
 
        self.assertEqual(2, 
243
 
            t.put_multi([('dir_a/a', StringIO('contents of dir_a/a')),
244
 
                         ('dir_b/b', StringIO('contents of dir_b/b'))]))
 
573
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
574
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
245
575
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
246
576
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
247
577
 
264
594
        self.assertTransportMode(t, 'dmode777', 0777)
265
595
        t.mkdir('dmode700', mode=0700)
266
596
        self.assertTransportMode(t, 'dmode700', 0700)
267
 
        # TODO: jam 20051215 test mkdir_multi with a mode
268
597
        t.mkdir_multi(['mdmode755'], mode=0755)
269
598
        self.assertTransportMode(t, 'mdmode755', 0755)
270
599
 
 
600
        # Default mode should be based on umask
 
601
        umask = osutils.get_umask()
 
602
        t.mkdir('dnomode', mode=None)
 
603
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
604
 
 
605
    def test_opening_a_file_stream_creates_file(self):
 
606
        t = self.get_transport()
 
607
        if t.is_readonly():
 
608
            return
 
609
        handle = t.open_write_stream('foo')
 
610
        try:
 
611
            self.assertEqual('', t.get_bytes('foo'))
 
612
        finally:
 
613
            handle.close()
 
614
 
 
615
    def test_opening_a_file_stream_can_set_mode(self):
 
616
        t = self.get_transport()
 
617
        if t.is_readonly():
 
618
            return
 
619
        if not t._can_roundtrip_unix_modebits():
 
620
            # Can't roundtrip, so no need to run this test
 
621
            return
 
622
        def check_mode(name, mode, expected):
 
623
            handle = t.open_write_stream(name, mode=mode)
 
624
            handle.close()
 
625
            self.assertTransportMode(t, name, expected)
 
626
        check_mode('mode644', 0644, 0644)
 
627
        check_mode('mode666', 0666, 0666)
 
628
        check_mode('mode600', 0600, 0600)
 
629
        # The default permissions should be based on the current umask
 
630
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
631
 
271
632
    def test_copy_to(self):
272
633
        # FIXME: test:   same server to same server (partly done)
273
634
        # same protocol two servers
274
635
        # and    different protocols (done for now except for MemoryTransport.
275
636
        # - RBC 20060122
276
 
        from bzrlib.transport.memory import MemoryTransport
277
637
 
278
638
        def simple_copy_files(transport_from, transport_to):
279
639
            files = ['a', 'b', 'c', 'd']
298
658
            self.build_tree(['e/', 'e/f'])
299
659
        else:
300
660
            t.mkdir('e')
301
 
            t.put('e/f', StringIO('contents of e'))
 
661
            t.put_bytes('e/f', 'contents of e')
302
662
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
303
663
        temp_transport.mkdir('e')
304
664
        t.copy_to(['e/f'], temp_transport)
319
679
            for f in files:
320
680
                self.assertTransportMode(temp_transport, f, mode)
321
681
 
322
 
    def test_append(self):
323
 
        t = self.get_transport()
324
 
 
325
 
        if t.is_readonly():
326
 
            open('a', 'wb').write('diff\ncontents for\na\n')
327
 
            open('b', 'wb').write('contents\nfor b\n')
328
 
        else:
329
 
            t.put_multi([
330
 
                    ('a', StringIO('diff\ncontents for\na\n')),
331
 
                    ('b', StringIO('contents\nfor b\n'))
332
 
                    ])
333
 
 
334
 
        if t.is_readonly():
335
 
            self.assertRaises(TransportNotPossible,
336
 
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
337
 
            _append('a', StringIO('add\nsome\nmore\ncontents\n'))
338
 
        else:
339
 
            self.assertEqual(20,
340
 
                t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
341
 
 
342
 
        self.check_transport_contents(
343
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
344
 
            t, 'a')
345
 
 
346
 
        if t.is_readonly():
347
 
            self.assertRaises(TransportNotPossible,
348
 
                    t.append_multi,
349
 
                        [('a', 'and\nthen\nsome\nmore\n'),
350
 
                         ('b', 'some\nmore\nfor\nb\n')])
351
 
            _append('a', StringIO('and\nthen\nsome\nmore\n'))
352
 
            _append('b', StringIO('some\nmore\nfor\nb\n'))
353
 
        else:
354
 
            self.assertEqual((43, 15), 
355
 
                t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
356
 
                                ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
682
    def test_create_prefix(self):
 
683
        t = self.get_transport()
 
684
        sub = t.clone('foo').clone('bar')
 
685
        try:
 
686
            sub.create_prefix()
 
687
        except TransportNotPossible:
 
688
            self.assertTrue(t.is_readonly())
 
689
        else:
 
690
            self.assertTrue(t.has('foo/bar'))
 
691
 
 
692
    def test_append_file(self):
 
693
        t = self.get_transport()
 
694
 
 
695
        if t.is_readonly():
 
696
            self.assertRaises(TransportNotPossible,
 
697
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
698
            return
 
699
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
700
        t.put_bytes('b', 'contents\nfor b\n')
 
701
 
 
702
        self.assertEqual(20,
 
703
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
704
 
 
705
        self.check_transport_contents(
 
706
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
707
            t, 'a')
 
708
 
 
709
        # a file with no parent should fail..
 
710
        self.assertRaises(NoSuchFile,
 
711
                          t.append_file, 'missing/path', StringIO('content'))
 
712
 
 
713
        # And we can create new files, too
 
714
        self.assertEqual(0,
 
715
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
716
        self.check_transport_contents('some text\nfor a missing file\n',
 
717
                                      t, 'c')
 
718
 
 
719
    def test_append_bytes(self):
 
720
        t = self.get_transport()
 
721
 
 
722
        if t.is_readonly():
 
723
            self.assertRaises(TransportNotPossible,
 
724
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
725
            return
 
726
 
 
727
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
728
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
729
 
 
730
        self.assertEqual(20,
 
731
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
732
 
 
733
        self.check_transport_contents(
 
734
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
735
            t, 'a')
 
736
 
 
737
        # a file with no parent should fail..
 
738
        self.assertRaises(NoSuchFile,
 
739
                          t.append_bytes, 'missing/path', 'content')
 
740
 
 
741
    def test_append_multi(self):
 
742
        t = self.get_transport()
 
743
 
 
744
        if t.is_readonly():
 
745
            return
 
746
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
747
                         'add\nsome\nmore\ncontents\n')
 
748
        t.put_bytes('b', 'contents\nfor b\n')
 
749
 
 
750
        self.assertEqual((43, 15),
 
751
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
752
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
753
 
357
754
        self.check_transport_contents(
358
755
            'diff\ncontents for\na\n'
359
756
            'add\nsome\nmore\ncontents\n'
364
761
                'some\nmore\nfor\nb\n',
365
762
                t, 'b')
366
763
 
367
 
        if t.is_readonly():
368
 
            _append('a', StringIO('a little bit more\n'))
369
 
            _append('b', StringIO('from an iterator\n'))
370
 
        else:
371
 
            self.assertEqual((62, 31),
372
 
                t.append_multi(iter([('a', StringIO('a little bit more\n')),
373
 
                                     ('b', StringIO('from an iterator\n'))])))
 
764
        self.assertEqual((62, 31),
 
765
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
766
                                 ('b', StringIO('from an iterator\n'))])))
374
767
        self.check_transport_contents(
375
768
            'diff\ncontents for\na\n'
376
769
            'add\nsome\nmore\ncontents\n'
383
776
                'from an iterator\n',
384
777
                t, 'b')
385
778
 
386
 
        if t.is_readonly():
387
 
            _append('c', StringIO('some text\nfor a missing file\n'))
388
 
            _append('a', StringIO('some text in a\n'))
389
 
            _append('d', StringIO('missing file r\n'))
390
 
        else:
391
 
            self.assertEqual(0,
392
 
                t.append('c', StringIO('some text\nfor a missing file\n')))
393
 
            self.assertEqual((80, 0),
394
 
                t.append_multi([('a', StringIO('some text in a\n')),
395
 
                                ('d', StringIO('missing file r\n'))]))
 
779
        self.assertEqual((80, 0),
 
780
            t.append_multi([('a', StringIO('some text in a\n')),
 
781
                            ('d', StringIO('missing file r\n'))]))
 
782
 
396
783
        self.check_transport_contents(
397
784
            'diff\ncontents for\na\n'
398
785
            'add\nsome\nmore\ncontents\n'
400
787
            'a little bit more\n'
401
788
            'some text in a\n',
402
789
            t, 'a')
403
 
        self.check_transport_contents('some text\nfor a missing file\n',
404
 
                                      t, 'c')
405
790
        self.check_transport_contents('missing file r\n', t, 'd')
406
 
        
407
 
        # a file with no parent should fail..
408
 
        if not t.is_readonly():
409
 
            self.assertRaises(NoSuchFile,
410
 
                              t.append, 'missing/path', 
411
 
                              StringIO('content'))
412
 
 
413
 
    def test_append_file(self):
414
 
        t = self.get_transport()
415
 
 
416
 
        contents = [
417
 
            ('f1', StringIO('this is a string\nand some more stuff\n')),
418
 
            ('f2', StringIO('here is some text\nand a bit more\n')),
419
 
            ('f3', StringIO('some text for the\nthird file created\n')),
420
 
            ('f4', StringIO('this is a string\nand some more stuff\n')),
421
 
            ('f5', StringIO('here is some text\nand a bit more\n')),
422
 
            ('f6', StringIO('some text for the\nthird file created\n'))
423
 
        ]
424
 
        
425
 
        if t.is_readonly():
426
 
            for f, val in contents:
427
 
                open(f, 'wb').write(val.read())
428
 
        else:
429
 
            t.put_multi(contents)
430
 
 
431
 
        a1 = StringIO('appending to\none\n')
432
 
        if t.is_readonly():
433
 
            _append('f1', a1)
434
 
        else:
435
 
            t.append('f1', a1)
436
 
 
437
 
        del a1
438
 
 
439
 
        self.check_transport_contents(
440
 
                'this is a string\nand some more stuff\n'
441
 
                'appending to\none\n',
442
 
                t, 'f1')
443
 
 
444
 
        a2 = StringIO('adding more\ntext to two\n')
445
 
        a3 = StringIO('some garbage\nto put in three\n')
446
 
 
447
 
        if t.is_readonly():
448
 
            _append('f2', a2)
449
 
            _append('f3', a3)
450
 
        else:
451
 
            t.append_multi([('f2', a2), ('f3', a3)])
452
 
 
453
 
        del a2, a3
454
 
 
455
 
        self.check_transport_contents(
456
 
                'here is some text\nand a bit more\n'
457
 
                'adding more\ntext to two\n',
458
 
                t, 'f2')
459
 
        self.check_transport_contents( 
460
 
                'some text for the\nthird file created\n'
461
 
                'some garbage\nto put in three\n',
462
 
                t, 'f3')
463
 
 
464
 
        # Test that an actual file object can be used with put
465
 
        a4 = t.get('f1')
466
 
        if t.is_readonly():
467
 
            _append('f4', a4)
468
 
        else:
469
 
            t.append('f4', a4)
470
 
 
471
 
        del a4
472
 
 
473
 
        self.check_transport_contents(
474
 
                'this is a string\nand some more stuff\n'
475
 
                'this is a string\nand some more stuff\n'
476
 
                'appending to\none\n',
477
 
                t, 'f4')
478
 
 
479
 
        a5 = t.get('f2')
480
 
        a6 = t.get('f3')
481
 
        if t.is_readonly():
482
 
            _append('f5', a5)
483
 
            _append('f6', a6)
484
 
        else:
485
 
            t.append_multi([('f5', a5), ('f6', a6)])
486
 
 
487
 
        del a5, a6
488
 
 
489
 
        self.check_transport_contents(
490
 
                'here is some text\nand a bit more\n'
491
 
                'here is some text\nand a bit more\n'
492
 
                'adding more\ntext to two\n',
493
 
                t, 'f5')
494
 
        self.check_transport_contents(
495
 
                'some text for the\nthird file created\n'
496
 
                'some text for the\nthird file created\n'
497
 
                'some garbage\nto put in three\n',
498
 
                t, 'f6')
499
 
 
500
 
        a5 = t.get('f2')
501
 
        a6 = t.get('f2')
502
 
        a7 = t.get('f3')
503
 
        if t.is_readonly():
504
 
            _append('c', a5)
505
 
            _append('a', a6)
506
 
            _append('d', a7)
507
 
        else:
508
 
            t.append('c', a5)
509
 
            t.append_multi([('a', a6), ('d', a7)])
510
 
        del a5, a6, a7
511
 
        self.check_transport_contents(t.get('f2').read(), t, 'c')
512
 
        self.check_transport_contents(t.get('f3').read(), t, 'd')
513
 
 
514
 
    def test_append_mode(self):
 
791
 
 
792
    def test_append_file_mode(self):
 
793
        """Check that append accepts a mode parameter"""
515
794
        # check append accepts a mode
516
795
        t = self.get_transport()
517
796
        if t.is_readonly():
518
 
            return
519
 
        t.append('f', StringIO('f'), mode=None)
520
 
        
 
797
            self.assertRaises(TransportNotPossible,
 
798
                t.append_file, 'f', StringIO('f'), mode=None)
 
799
            return
 
800
        t.append_file('f', StringIO('f'), mode=None)
 
801
 
 
802
    def test_append_bytes_mode(self):
 
803
        # check append_bytes accepts a mode
 
804
        t = self.get_transport()
 
805
        if t.is_readonly():
 
806
            self.assertRaises(TransportNotPossible,
 
807
                t.append_bytes, 'f', 'f', mode=None)
 
808
            return
 
809
        t.append_bytes('f', 'f', mode=None)
 
810
 
521
811
    def test_delete(self):
522
812
        # TODO: Test Transport.delete
523
813
        t = self.get_transport()
527
817
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
528
818
            return
529
819
 
530
 
        t.put('a', StringIO('a little bit of text\n'))
 
820
        t.put_bytes('a', 'a little bit of text\n')
531
821
        self.failUnless(t.has('a'))
532
822
        t.delete('a')
533
823
        self.failIf(t.has('a'))
534
824
 
535
825
        self.assertRaises(NoSuchFile, t.delete, 'a')
536
826
 
537
 
        t.put('a', StringIO('a text\n'))
538
 
        t.put('b', StringIO('b text\n'))
539
 
        t.put('c', StringIO('c text\n'))
 
827
        t.put_bytes('a', 'a text\n')
 
828
        t.put_bytes('b', 'b text\n')
 
829
        t.put_bytes('c', 'c text\n')
540
830
        self.assertEqual([True, True, True],
541
831
                list(t.has_multi(['a', 'b', 'c'])))
542
832
        t.delete_multi(['a', 'c'])
552
842
        self.assertRaises(NoSuchFile,
553
843
                t.delete_multi, iter(['a', 'b', 'c']))
554
844
 
555
 
        t.put('a', StringIO('another a text\n'))
556
 
        t.put('c', StringIO('another c text\n'))
 
845
        t.put_bytes('a', 'another a text\n')
 
846
        t.put_bytes('c', 'another c text\n')
557
847
        t.delete_multi(iter(['a', 'b', 'c']))
558
848
 
559
849
        # We should have deleted everything
562
852
        # plain "listdir".
563
853
        # self.assertEqual([], os.listdir('.'))
564
854
 
 
855
    def test_recommended_page_size(self):
 
856
        """Transports recommend a page size for partial access to files."""
 
857
        t = self.get_transport()
 
858
        self.assertIsInstance(t.recommended_page_size(), int)
 
859
 
565
860
    def test_rmdir(self):
566
861
        t = self.get_transport()
567
862
        # Not much to do with a readonly transport
571
866
        t.mkdir('adir')
572
867
        t.mkdir('adir/bdir')
573
868
        t.rmdir('adir/bdir')
574
 
        self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
 
869
        # ftp may not be able to raise NoSuchFile for lack of
 
870
        # details when failing
 
871
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
575
872
        t.rmdir('adir')
576
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
873
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
577
874
 
578
875
    def test_rmdir_not_empty(self):
579
876
        """Deleting a non-empty directory raises an exception
580
 
        
 
877
 
581
878
        sftp (and possibly others) don't give us a specific "directory not
582
879
        empty" exception -- we can just see that the operation failed.
583
880
        """
588
885
        t.mkdir('adir/bdir')
589
886
        self.assertRaises(PathError, t.rmdir, 'adir')
590
887
 
 
888
    def test_rmdir_empty_but_similar_prefix(self):
 
889
        """rmdir does not get confused by sibling paths.
 
890
 
 
891
        A naive implementation of MemoryTransport would refuse to rmdir
 
892
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
893
        uses "path.startswith(dir)" on all file paths to determine if directory
 
894
        is empty.
 
895
        """
 
896
        t = self.get_transport()
 
897
        if t.is_readonly():
 
898
            return
 
899
        t.mkdir('foo')
 
900
        t.put_bytes('foo-bar', '')
 
901
        t.mkdir('foo-baz')
 
902
        t.rmdir('foo')
 
903
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
904
        self.failUnless(t.has('foo-bar'))
 
905
 
591
906
    def test_rename_dir_succeeds(self):
592
907
        t = self.get_transport()
593
908
        if t.is_readonly():
607
922
        t.mkdir('adir/asubdir')
608
923
        t.mkdir('bdir')
609
924
        t.mkdir('bdir/bsubdir')
 
925
        # any kind of PathError would be OK, though we normally expect
 
926
        # DirectoryNotEmpty
610
927
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
611
928
        # nothing was changed so it should still be as before
612
929
        self.assertTrue(t.has('bdir/bsubdir'))
613
930
        self.assertFalse(t.has('adir/bdir'))
614
931
        self.assertFalse(t.has('adir/bsubdir'))
615
932
 
 
933
    def test_rename_across_subdirs(self):
 
934
        t = self.get_transport()
 
935
        if t.is_readonly():
 
936
            raise TestNotApplicable("transport is readonly")
 
937
        t.mkdir('a')
 
938
        t.mkdir('b')
 
939
        ta = t.clone('a')
 
940
        tb = t.clone('b')
 
941
        ta.put_bytes('f', 'aoeu')
 
942
        ta.rename('f', '../b/f')
 
943
        self.assertTrue(tb.has('f'))
 
944
        self.assertFalse(ta.has('f'))
 
945
        self.assertTrue(t.has('b/f'))
 
946
 
616
947
    def test_delete_tree(self):
617
948
        t = self.get_transport()
618
949
 
628
959
        except TransportNotPossible:
629
960
            # ok, this transport does not support delete_tree
630
961
            return
631
 
        
 
962
 
632
963
        # did it delete that trivial case?
633
964
        self.assertRaises(NoSuchFile, t.stat, 'adir')
634
965
 
635
966
        self.build_tree(['adir/',
636
 
                         'adir/file', 
637
 
                         'adir/subdir/', 
638
 
                         'adir/subdir/file', 
 
967
                         'adir/file',
 
968
                         'adir/subdir/',
 
969
                         'adir/subdir/file',
639
970
                         'adir/subdir2/',
640
971
                         'adir/subdir2/file',
641
972
                         ], transport=t)
655
986
        # creates control files in the working directory
656
987
        # perhaps all of this could be done in a subdirectory
657
988
 
658
 
        t.put('a', StringIO('a first file\n'))
 
989
        t.put_bytes('a', 'a first file\n')
659
990
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
660
991
 
661
992
        t.move('a', 'b')
666
997
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
667
998
 
668
999
        # Overwrite a file
669
 
        t.put('c', StringIO('c this file\n'))
 
1000
        t.put_bytes('c', 'c this file\n')
670
1001
        t.move('c', 'b')
671
1002
        self.failIf(t.has('c'))
672
1003
        self.check_transport_contents('c this file\n', t, 'b')
673
1004
 
674
1005
        # TODO: Try to write a test for atomicity
675
 
        # TODO: Test moving into a non-existant subdirectory
 
1006
        # TODO: Test moving into a non-existent subdirectory
676
1007
        # TODO: Test Transport.move_multi
677
1008
 
678
1009
    def test_copy(self):
681
1012
        if t.is_readonly():
682
1013
            return
683
1014
 
684
 
        t.put('a', StringIO('a file\n'))
 
1015
        t.put_bytes('a', 'a file\n')
685
1016
        t.copy('a', 'b')
686
1017
        self.check_transport_contents('a file\n', t, 'b')
687
1018
 
690
1021
        # What should the assert be if you try to copy a
691
1022
        # file over a directory?
692
1023
        #self.assertRaises(Something, t.copy, 'a', 'c')
693
 
        t.put('d', StringIO('text in d\n'))
 
1024
        t.put_bytes('d', 'text in d\n')
694
1025
        t.copy('d', 'b')
695
1026
        self.check_transport_contents('text in d\n', t, 'b')
696
1027
 
697
1028
        # TODO: test copy_multi
698
1029
 
699
1030
    def test_connection_error(self):
700
 
        """ConnectionError is raised when connection is impossible"""
 
1031
        """ConnectionError is raised when connection is impossible.
 
1032
 
 
1033
        The error should be raised from the first operation on the transport.
 
1034
        """
701
1035
        try:
702
1036
            url = self._server.get_bogus_url()
703
1037
        except NotImplementedError:
704
1038
            raise TestSkipped("Transport %s has no bogus URL support." %
705
1039
                              self._server.__class__)
706
 
        try:
707
 
            t = bzrlib.transport.get_transport(url)
708
 
            t.get('.bzr/branch')
709
 
        except (ConnectionError, NoSuchFile), e:
710
 
            pass
711
 
        except (Exception), e:
712
 
            self.fail('Wrong exception thrown (%s): %s' 
713
 
                        % (e.__class__.__name__, e))
714
 
        else:
715
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
1040
        t = get_transport(url)
 
1041
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
716
1042
 
717
1043
    def test_stat(self):
718
1044
        # TODO: Test stat, just try once, and if it throws, stop testing
727
1053
            return
728
1054
 
729
1055
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
730
 
        sizes = [14, 0, 16, 0, 18] 
 
1056
        sizes = [14, 0, 16, 0, 18]
731
1057
        self.build_tree(paths, transport=t, line_endings='binary')
732
1058
 
733
1059
        for path, size in zip(paths, sizes):
752
1078
        subdir.stat('./file')
753
1079
        subdir.stat('.')
754
1080
 
 
1081
    def test_hardlink(self):
 
1082
        from stat import ST_NLINK
 
1083
 
 
1084
        t = self.get_transport()
 
1085
 
 
1086
        source_name = "original_target"
 
1087
        link_name = "target_link"
 
1088
 
 
1089
        self.build_tree([source_name], transport=t)
 
1090
 
 
1091
        try:
 
1092
            t.hardlink(source_name, link_name)
 
1093
 
 
1094
            self.failUnless(t.has(source_name))
 
1095
            self.failUnless(t.has(link_name))
 
1096
 
 
1097
            st = t.stat(link_name)
 
1098
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1099
        except TransportNotPossible:
 
1100
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1101
                              self._server.__class__)
 
1102
 
 
1103
    def test_symlink(self):
 
1104
        from stat import S_ISLNK
 
1105
 
 
1106
        t = self.get_transport()
 
1107
 
 
1108
        source_name = "original_target"
 
1109
        link_name = "target_link"
 
1110
 
 
1111
        self.build_tree([source_name], transport=t)
 
1112
 
 
1113
        try:
 
1114
            t.symlink(source_name, link_name)
 
1115
 
 
1116
            self.failUnless(t.has(source_name))
 
1117
            self.failUnless(t.has(link_name))
 
1118
 
 
1119
            st = t.stat(link_name)
 
1120
            self.failUnless(S_ISLNK(st.st_mode),
 
1121
                "expected symlink, got mode %o" % st.st_mode)
 
1122
        except TransportNotPossible:
 
1123
            raise TestSkipped("Transport %s does not support symlinks." %
 
1124
                              self._server.__class__)
 
1125
        except IOError:
 
1126
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1127
 
755
1128
    def test_list_dir(self):
756
1129
        # TODO: Test list_dir, just try once, and if it throws, stop testing
757
1130
        t = self.get_transport()
758
 
        
 
1131
 
759
1132
        if not t.listable():
760
1133
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
761
1134
            return
762
1135
 
763
 
        def sorted_list(d):
764
 
            l = list(t.list_dir(d))
 
1136
        def sorted_list(d, transport):
 
1137
            l = list(transport.list_dir(d))
765
1138
            l.sort()
766
1139
            return l
767
1140
 
768
 
        # SftpServer creates control files in the working directory
769
 
        # so lets move down a directory to avoid those.
770
 
        if not t.is_readonly():
771
 
            t.mkdir('wd')
772
 
        else:
773
 
            os.mkdir('wd')
774
 
        t = t.clone('wd')
775
 
 
776
 
        self.assertEqual([], sorted_list(u'.'))
 
1141
        self.assertEqual([], sorted_list('.', t))
777
1142
        # c2 is precisely one letter longer than c here to test that
778
1143
        # suffixing is not confused.
 
1144
        # a%25b checks that quoting is done consistently across transports
 
1145
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1146
 
779
1147
        if not t.is_readonly():
780
 
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
 
1148
            self.build_tree(tree_names, transport=t)
781
1149
        else:
782
 
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
783
 
 
784
 
        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
785
 
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
1150
            self.build_tree(tree_names)
 
1151
 
 
1152
        self.assertEqual(
 
1153
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1154
        self.assertEqual(
 
1155
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1156
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1157
 
 
1158
        # Cloning the transport produces an equivalent listing
 
1159
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
786
1160
 
787
1161
        if not t.is_readonly():
788
1162
            t.delete('c/d')
789
1163
            t.delete('b')
790
1164
        else:
791
 
            os.unlink('wd/c/d')
792
 
            os.unlink('wd/b')
793
 
            
794
 
        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
795
 
        self.assertEqual([u'e'], sorted_list(u'c'))
 
1165
            os.unlink('c/d')
 
1166
            os.unlink('b')
 
1167
 
 
1168
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1169
        self.assertEqual(['e'], sorted_list('c', t))
796
1170
 
797
1171
        self.assertListRaises(PathError, t.list_dir, 'q')
798
1172
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1173
        # 'a' is a file, list_dir should raise an error
799
1174
        self.assertListRaises(PathError, t.list_dir, 'a')
800
1175
 
 
1176
    def test_list_dir_result_is_url_escaped(self):
 
1177
        t = self.get_transport()
 
1178
        if not t.listable():
 
1179
            raise TestSkipped("transport not listable")
 
1180
 
 
1181
        if not t.is_readonly():
 
1182
            self.build_tree(['a/', 'a/%'], transport=t)
 
1183
        else:
 
1184
            self.build_tree(['a/', 'a/%'])
 
1185
 
 
1186
        names = list(t.list_dir('a'))
 
1187
        self.assertEqual(['%25'], names)
 
1188
        self.assertIsInstance(names[0], str)
 
1189
 
 
1190
    def test_clone_preserve_info(self):
 
1191
        t1 = self.get_transport()
 
1192
        if not isinstance(t1, ConnectedTransport):
 
1193
            raise TestSkipped("not a connected transport")
 
1194
 
 
1195
        t2 = t1.clone('subdir')
 
1196
        self.assertEquals(t1._scheme, t2._scheme)
 
1197
        self.assertEquals(t1._user, t2._user)
 
1198
        self.assertEquals(t1._password, t2._password)
 
1199
        self.assertEquals(t1._host, t2._host)
 
1200
        self.assertEquals(t1._port, t2._port)
 
1201
 
 
1202
    def test__reuse_for(self):
 
1203
        t = self.get_transport()
 
1204
        if not isinstance(t, ConnectedTransport):
 
1205
            raise TestSkipped("not a connected transport")
 
1206
 
 
1207
        def new_url(scheme=None, user=None, password=None,
 
1208
                    host=None, port=None, path=None):
 
1209
            """Build a new url from t.base changing only parts of it.
 
1210
 
 
1211
            Only the parameters different from None will be changed.
 
1212
            """
 
1213
            if scheme   is None: scheme   = t._scheme
 
1214
            if user     is None: user     = t._user
 
1215
            if password is None: password = t._password
 
1216
            if user     is None: user     = t._user
 
1217
            if host     is None: host     = t._host
 
1218
            if port     is None: port     = t._port
 
1219
            if path     is None: path     = t._path
 
1220
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1221
 
 
1222
        if t._scheme == 'ftp':
 
1223
            scheme = 'sftp'
 
1224
        else:
 
1225
            scheme = 'ftp'
 
1226
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1227
        if t._user == 'me':
 
1228
            user = 'you'
 
1229
        else:
 
1230
            user = 'me'
 
1231
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1232
        # passwords are not taken into account because:
 
1233
        # - it makes no sense to have two different valid passwords for the
 
1234
        #   same user
 
1235
        # - _password in ConnectedTransport is intended to collect what the
 
1236
        #   user specified from the command-line and there are cases where the
 
1237
        #   new url can contain no password (if the url was built from an
 
1238
        #   existing transport.base for example)
 
1239
        # - password are considered part of the credentials provided at
 
1240
        #   connection creation time and as such may not be present in the url
 
1241
        #   (they may be typed by the user when prompted for example)
 
1242
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1243
        # We will not connect, we can use a invalid host
 
1244
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1245
        if t._port == 1234:
 
1246
            port = 4321
 
1247
        else:
 
1248
            port = 1234
 
1249
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1250
        # No point in trying to reuse a transport for a local URL
 
1251
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1252
 
 
1253
    def test_connection_sharing(self):
 
1254
        t = self.get_transport()
 
1255
        if not isinstance(t, ConnectedTransport):
 
1256
            raise TestSkipped("not a connected transport")
 
1257
 
 
1258
        c = t.clone('subdir')
 
1259
        # Some transports will create the connection  only when needed
 
1260
        t.has('surely_not') # Force connection
 
1261
        self.assertIs(t._get_connection(), c._get_connection())
 
1262
 
 
1263
        # Temporary failure, we need to create a new dummy connection
 
1264
        new_connection = None
 
1265
        t._set_connection(new_connection)
 
1266
        # Check that both transports use the same connection
 
1267
        self.assertIs(new_connection, t._get_connection())
 
1268
        self.assertIs(new_connection, c._get_connection())
 
1269
 
 
1270
    def test_reuse_connection_for_various_paths(self):
 
1271
        t = self.get_transport()
 
1272
        if not isinstance(t, ConnectedTransport):
 
1273
            raise TestSkipped("not a connected transport")
 
1274
 
 
1275
        t.has('surely_not') # Force connection
 
1276
        self.assertIsNot(None, t._get_connection())
 
1277
 
 
1278
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1279
        self.assertIsNot(t, subdir)
 
1280
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1281
 
 
1282
        home = subdir._reuse_for(t.base + 'home')
 
1283
        self.assertIs(t._get_connection(), home._get_connection())
 
1284
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1285
 
801
1286
    def test_clone(self):
802
1287
        # TODO: Test that clone moves up and down the filesystem
803
1288
        t1 = self.get_transport()
823
1308
        self.failIf(t3.has('b/d'))
824
1309
 
825
1310
        if t1.is_readonly():
826
 
            open('b/d', 'wb').write('newfile\n')
 
1311
            self.build_tree_contents([('b/d', 'newfile\n')])
827
1312
        else:
828
 
            t2.put('d', StringIO('newfile\n'))
 
1313
            t2.put_bytes('d', 'newfile\n')
829
1314
 
830
1315
        self.failUnless(t1.has('b/d'))
831
1316
        self.failUnless(t2.has('d'))
832
1317
        self.failUnless(t3.has('b/d'))
833
1318
 
 
1319
    def test_clone_to_root(self):
 
1320
        orig_transport = self.get_transport()
 
1321
        # Repeatedly go up to a parent directory until we're at the root
 
1322
        # directory of this transport
 
1323
        root_transport = orig_transport
 
1324
        new_transport = root_transport.clone("..")
 
1325
        # as we are walking up directories, the path must be
 
1326
        # growing less, except at the top
 
1327
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1328
            or new_transport.base == root_transport.base)
 
1329
        while new_transport.base != root_transport.base:
 
1330
            root_transport = new_transport
 
1331
            new_transport = root_transport.clone("..")
 
1332
            # as we are walking up directories, the path must be
 
1333
            # growing less, except at the top
 
1334
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1335
                or new_transport.base == root_transport.base)
 
1336
 
 
1337
        # Cloning to "/" should take us to exactly the same location.
 
1338
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1339
        # the abspath of "/" from the original transport should be the same
 
1340
        # as the base at the root:
 
1341
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1342
 
 
1343
        # At the root, the URL must still end with / as its a directory
 
1344
        self.assertEqual(root_transport.base[-1], '/')
 
1345
 
 
1346
    def test_clone_from_root(self):
 
1347
        """At the root, cloning to a simple dir should just do string append."""
 
1348
        orig_transport = self.get_transport()
 
1349
        root_transport = orig_transport.clone('/')
 
1350
        self.assertEqual(root_transport.base + '.bzr/',
 
1351
            root_transport.clone('.bzr').base)
 
1352
 
 
1353
    def test_base_url(self):
 
1354
        t = self.get_transport()
 
1355
        self.assertEqual('/', t.base[-1])
 
1356
 
834
1357
    def test_relpath(self):
835
1358
        t = self.get_transport()
836
1359
        self.assertEqual('', t.relpath(t.base))
837
1360
        # base ends with /
838
1361
        self.assertEqual('', t.relpath(t.base[:-1]))
839
 
        # subdirs which dont exist should still give relpaths.
 
1362
        # subdirs which don't exist should still give relpaths.
840
1363
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
841
1364
        # trailing slash should be the same.
842
1365
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
858
1381
        # that have aliasing problems like symlinks should go in backend
859
1382
        # specific test cases.
860
1383
        transport = self.get_transport()
861
 
        
862
 
        # disabled because some transports might normalize urls in generating
863
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
 
1384
 
864
1385
        self.assertEqual(transport.base + 'relpath',
865
1386
                         transport.abspath('relpath'))
866
1387
 
 
1388
        # This should work without raising an error.
 
1389
        transport.abspath("/")
 
1390
 
 
1391
        # the abspath of "/" and "/foo/.." should result in the same location
 
1392
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1393
 
 
1394
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1395
                         transport.abspath("/foo"))
 
1396
 
 
1397
    def test_win32_abspath(self):
 
1398
        # Note: we tried to set sys.platform='win32' so we could test on
 
1399
        # other platforms too, but then osutils does platform specific
 
1400
        # things at import time which defeated us...
 
1401
        if sys.platform != 'win32':
 
1402
            raise TestSkipped(
 
1403
                'Testing drive letters in abspath implemented only for win32')
 
1404
 
 
1405
        # smoke test for abspath on win32.
 
1406
        # a transport based on 'file:///' never fully qualifies the drive.
 
1407
        transport = get_transport("file:///")
 
1408
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1409
 
 
1410
        # but a transport that starts with a drive spec must keep it.
 
1411
        transport = get_transport("file:///C:/")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1413
 
867
1414
    def test_local_abspath(self):
868
1415
        transport = self.get_transport()
869
1416
        try:
870
1417
            p = transport.local_abspath('.')
871
 
        except TransportNotPossible:
872
 
            pass # This is not a local transport
 
1418
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1419
            # should be formattable
 
1420
            s = str(e)
873
1421
        else:
874
1422
            self.assertEqual(getcwd(), p)
875
1423
 
900
1448
                         'isolated/dir/',
901
1449
                         'isolated/dir/foo',
902
1450
                         'isolated/dir/bar',
 
1451
                         'isolated/dir/b%25z', # make sure quoting is correct
903
1452
                         'isolated/bar'],
904
1453
                        transport=transport)
905
1454
        paths = set(transport.iter_files_recursive())
907
1456
        self.assertEqual(paths,
908
1457
                    set(['isolated/dir/foo',
909
1458
                         'isolated/dir/bar',
 
1459
                         'isolated/dir/b%2525z',
910
1460
                         'isolated/bar']))
911
1461
        sub_transport = transport.clone('isolated')
912
1462
        paths = set(sub_transport.iter_files_recursive())
913
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
1463
        self.assertEqual(paths,
 
1464
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1465
 
 
1466
    def test_copy_tree(self):
 
1467
        # TODO: test file contents and permissions are preserved. This test was
 
1468
        # added just to ensure that quoting was handled correctly.
 
1469
        # -- David Allouche 2006-08-11
 
1470
        transport = self.get_transport()
 
1471
        if not transport.listable():
 
1472
            self.assertRaises(TransportNotPossible,
 
1473
                              transport.iter_files_recursive)
 
1474
            return
 
1475
        if transport.is_readonly():
 
1476
            return
 
1477
        self.build_tree(['from/',
 
1478
                         'from/dir/',
 
1479
                         'from/dir/foo',
 
1480
                         'from/dir/bar',
 
1481
                         'from/dir/b%25z', # make sure quoting is correct
 
1482
                         'from/bar'],
 
1483
                        transport=transport)
 
1484
        transport.copy_tree('from', 'to')
 
1485
        paths = set(transport.iter_files_recursive())
 
1486
        self.assertEqual(paths,
 
1487
                    set(['from/dir/foo',
 
1488
                         'from/dir/bar',
 
1489
                         'from/dir/b%2525z',
 
1490
                         'from/bar',
 
1491
                         'to/dir/foo',
 
1492
                         'to/dir/bar',
 
1493
                         'to/dir/b%2525z',
 
1494
                         'to/bar',]))
 
1495
 
 
1496
    def test_copy_tree_to_transport(self):
 
1497
        transport = self.get_transport()
 
1498
        if not transport.listable():
 
1499
            self.assertRaises(TransportNotPossible,
 
1500
                              transport.iter_files_recursive)
 
1501
            return
 
1502
        if transport.is_readonly():
 
1503
            return
 
1504
        self.build_tree(['from/',
 
1505
                         'from/dir/',
 
1506
                         'from/dir/foo',
 
1507
                         'from/dir/bar',
 
1508
                         'from/dir/b%25z', # make sure quoting is correct
 
1509
                         'from/bar'],
 
1510
                        transport=transport)
 
1511
        from_transport = transport.clone('from')
 
1512
        to_transport = transport.clone('to')
 
1513
        to_transport.ensure_base()
 
1514
        from_transport.copy_tree_to_transport(to_transport)
 
1515
        paths = set(transport.iter_files_recursive())
 
1516
        self.assertEqual(paths,
 
1517
                    set(['from/dir/foo',
 
1518
                         'from/dir/bar',
 
1519
                         'from/dir/b%2525z',
 
1520
                         'from/bar',
 
1521
                         'to/dir/foo',
 
1522
                         'to/dir/bar',
 
1523
                         'to/dir/b%2525z',
 
1524
                         'to/bar',]))
914
1525
 
915
1526
    def test_unicode_paths(self):
916
1527
        """Test that we can read/write files with Unicode names."""
928
1539
                 u'\u65e5', # Kanji person
929
1540
                ]
930
1541
 
 
1542
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1543
        if no_unicode_support:
 
1544
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1545
 
931
1546
        try:
932
1547
            self.build_tree(files, transport=t, line_endings='binary')
933
1548
        except UnicodeError:
943
1558
            self.check_transport_contents(contents, t, urlutils.escape(fname))
944
1559
 
945
1560
    def test_connect_twice_is_same_content(self):
946
 
        # check that our server (whatever it is) is accessable reliably
 
1561
        # check that our server (whatever it is) is accessible reliably
947
1562
        # via get_transport and multiple connections share content.
948
1563
        transport = self.get_transport()
949
1564
        if transport.is_readonly():
950
1565
            return
951
 
        transport.put('foo', StringIO('bar'))
952
 
        transport2 = self.get_transport()
953
 
        self.check_transport_contents('bar', transport2, 'foo')
954
 
        # its base should be usable.
955
 
        transport2 = bzrlib.transport.get_transport(transport.base)
956
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1566
        transport.put_bytes('foo', 'bar')
 
1567
        transport3 = self.get_transport()
 
1568
        self.check_transport_contents('bar', transport3, 'foo')
957
1569
 
958
1570
        # now opening at a relative url should give use a sane result:
959
1571
        transport.mkdir('newdir')
960
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
961
 
        transport2 = transport2.clone('..')
962
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1572
        transport5 = self.get_transport('newdir')
 
1573
        transport6 = transport5.clone('..')
 
1574
        self.check_transport_contents('bar', transport6, 'foo')
963
1575
 
964
1576
    def test_lock_write(self):
 
1577
        """Test transport-level write locks.
 
1578
 
 
1579
        These are deprecated and transports may decline to support them.
 
1580
        """
965
1581
        transport = self.get_transport()
966
1582
        if transport.is_readonly():
967
1583
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
968
1584
            return
969
 
        transport.put('lock', StringIO())
970
 
        lock = transport.lock_write('lock')
 
1585
        transport.put_bytes('lock', '')
 
1586
        try:
 
1587
            lock = transport.lock_write('lock')
 
1588
        except TransportNotPossible:
 
1589
            return
971
1590
        # TODO make this consistent on all platforms:
972
1591
        # self.assertRaises(LockError, transport.lock_write, 'lock')
973
1592
        lock.unlock()
974
1593
 
975
1594
    def test_lock_read(self):
 
1595
        """Test transport-level read locks.
 
1596
 
 
1597
        These are deprecated and transports may decline to support them.
 
1598
        """
976
1599
        transport = self.get_transport()
977
1600
        if transport.is_readonly():
978
1601
            file('lock', 'w').close()
979
1602
        else:
980
 
            transport.put('lock', StringIO())
981
 
        lock = transport.lock_read('lock')
 
1603
            transport.put_bytes('lock', '')
 
1604
        try:
 
1605
            lock = transport.lock_read('lock')
 
1606
        except TransportNotPossible:
 
1607
            return
982
1608
        # TODO make this consistent on all platforms:
983
1609
        # self.assertRaises(LockError, transport.lock_read, 'lock')
984
1610
        lock.unlock()
988
1614
        if transport.is_readonly():
989
1615
            file('a', 'w').write('0123456789')
990
1616
        else:
991
 
            transport.put('a', StringIO('01234567890'))
 
1617
            transport.put_bytes('a', '0123456789')
 
1618
 
 
1619
        d = list(transport.readv('a', ((0, 1),)))
 
1620
        self.assertEqual(d[0], (0, '0'))
992
1621
 
993
1622
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
994
1623
        self.assertEqual(d[0], (0, '0'))
995
1624
        self.assertEqual(d[1], (1, '1'))
996
1625
        self.assertEqual(d[2], (3, '34'))
997
1626
        self.assertEqual(d[3], (9, '9'))
 
1627
 
 
1628
    def test_readv_out_of_order(self):
 
1629
        transport = self.get_transport()
 
1630
        if transport.is_readonly():
 
1631
            file('a', 'w').write('0123456789')
 
1632
        else:
 
1633
            transport.put_bytes('a', '01234567890')
 
1634
 
 
1635
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1636
        self.assertEqual(d[0], (1, '1'))
 
1637
        self.assertEqual(d[1], (9, '9'))
 
1638
        self.assertEqual(d[2], (0, '0'))
 
1639
        self.assertEqual(d[3], (3, '34'))
 
1640
 
 
1641
    def test_readv_with_adjust_for_latency(self):
 
1642
        transport = self.get_transport()
 
1643
        # the adjust for latency flag expands the data region returned
 
1644
        # according to a per-transport heuristic, so testing is a little
 
1645
        # tricky as we need more data than the largest combining that our
 
1646
        # transports do. To accomodate this we generate random data and cross
 
1647
        # reference the returned data with the random data. To avoid doing
 
1648
        # multiple large random byte look ups we do several tests on the same
 
1649
        # backing data.
 
1650
        content = osutils.rand_bytes(200*1024)
 
1651
        content_size = len(content)
 
1652
        if transport.is_readonly():
 
1653
            self.build_tree_contents([('a', content)])
 
1654
        else:
 
1655
            transport.put_bytes('a', content)
 
1656
        def check_result_data(result_vector):
 
1657
            for item in result_vector:
 
1658
                data_len = len(item[1])
 
1659
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1660
 
 
1661
        # start corner case
 
1662
        result = list(transport.readv('a', ((0, 30),),
 
1663
            adjust_for_latency=True, upper_limit=content_size))
 
1664
        # we expect 1 result, from 0, to something > 30
 
1665
        self.assertEqual(1, len(result))
 
1666
        self.assertEqual(0, result[0][0])
 
1667
        self.assertTrue(len(result[0][1]) >= 30)
 
1668
        check_result_data(result)
 
1669
        # end of file corner case
 
1670
        result = list(transport.readv('a', ((204700, 100),),
 
1671
            adjust_for_latency=True, upper_limit=content_size))
 
1672
        # we expect 1 result, from 204800- its length, to the end
 
1673
        self.assertEqual(1, len(result))
 
1674
        data_len = len(result[0][1])
 
1675
        self.assertEqual(204800-data_len, result[0][0])
 
1676
        self.assertTrue(data_len >= 100)
 
1677
        check_result_data(result)
 
1678
        # out of order ranges are made in order
 
1679
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1680
            adjust_for_latency=True, upper_limit=content_size))
 
1681
        # we expect 2 results, in order, start and end.
 
1682
        self.assertEqual(2, len(result))
 
1683
        # start
 
1684
        data_len = len(result[0][1])
 
1685
        self.assertEqual(0, result[0][0])
 
1686
        self.assertTrue(data_len >= 30)
 
1687
        # end
 
1688
        data_len = len(result[1][1])
 
1689
        self.assertEqual(204800-data_len, result[1][0])
 
1690
        self.assertTrue(data_len >= 100)
 
1691
        check_result_data(result)
 
1692
        # close ranges get combined (even if out of order)
 
1693
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1694
            result = list(transport.readv('a', request_vector,
 
1695
                adjust_for_latency=True, upper_limit=content_size))
 
1696
            self.assertEqual(1, len(result))
 
1697
            data_len = len(result[0][1])
 
1698
            # minimum length is from 400 to 1034 - 634
 
1699
            self.assertTrue(data_len >= 634)
 
1700
            # must contain the region 400 to 1034
 
1701
            self.assertTrue(result[0][0] <= 400)
 
1702
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1703
            check_result_data(result)
 
1704
 
 
1705
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1706
        transport = self.get_transport()
 
1707
        # test from observed failure case.
 
1708
        if transport.is_readonly():
 
1709
            file('a', 'w').write('a'*1024*1024)
 
1710
        else:
 
1711
            transport.put_bytes('a', 'a'*1024*1024)
 
1712
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1713
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1714
            (465373, 800), (947422, 800)]
 
1715
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1716
        found_items = [False]*9
 
1717
        for pos, (start, length) in enumerate(broken_vector):
 
1718
            # check the range is covered by the result
 
1719
            for offset, data in results:
 
1720
                if offset <= start and start + length <= offset + len(data):
 
1721
                    found_items[pos] = True
 
1722
        self.assertEqual([True]*9, found_items)
 
1723
 
 
1724
    def test_get_with_open_write_stream_sees_all_content(self):
 
1725
        t = self.get_transport()
 
1726
        if t.is_readonly():
 
1727
            return
 
1728
        handle = t.open_write_stream('foo')
 
1729
        try:
 
1730
            handle.write('bcd')
 
1731
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1732
        finally:
 
1733
            handle.close()
 
1734
 
 
1735
    def test_get_smart_medium(self):
 
1736
        """All transports must either give a smart medium, or know they can't.
 
1737
        """
 
1738
        transport = self.get_transport()
 
1739
        try:
 
1740
            client_medium = transport.get_smart_medium()
 
1741
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1742
        except errors.NoSmartMedium:
 
1743
            # as long as we got it we're fine
 
1744
            pass
 
1745
 
 
1746
    def test_readv_short_read(self):
 
1747
        transport = self.get_transport()
 
1748
        if transport.is_readonly():
 
1749
            file('a', 'w').write('0123456789')
 
1750
        else:
 
1751
            transport.put_bytes('a', '01234567890')
 
1752
 
 
1753
        # This is intentionally reading off the end of the file
 
1754
        # since we are sure that it cannot get there
 
1755
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1756
                               # Can be raised by paramiko
 
1757
                               AssertionError),
 
1758
                              transport.readv, 'a', [(1,1), (8,10)])
 
1759
 
 
1760
        # This is trying to seek past the end of the file, it should
 
1761
        # also raise a special error
 
1762
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1763
                              transport.readv, 'a', [(12,2)])
 
1764
 
 
1765
    def test_stat_symlink(self):
 
1766
        # if a transport points directly to a symlink (and supports symlinks
 
1767
        # at all) you can tell this.  helps with bug 32669.
 
1768
        t = self.get_transport()
 
1769
        try:
 
1770
            t.symlink('target', 'link')
 
1771
        except TransportNotPossible:
 
1772
            raise TestSkipped("symlinks not supported")
 
1773
        t2 = t.clone('link')
 
1774
        st = t2.stat('')
 
1775
        self.assertTrue(stat.S_ISLNK(st.st_mode))