~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Robert Collins
  • Date: 2009-09-07 03:08:30 UTC
  • mto: This revision was merged to the branch mainline in revision 4690.
  • Revision ID: robertc@robertcollins.net-20090907030830-rf59kt28d550eauj
Milestones language tightning, internal consistency.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
 
29
import unittest
27
30
 
28
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
 
31
from bzrlib import (
 
32
    errors,
 
33
    osutils,
 
34
    tests,
 
35
    urlutils,
 
36
    )
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
29
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
30
44
                           PathError,
31
 
                           TransportNotPossible, ConnectionError)
32
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
33
 
from bzrlib.transport import memory, urlescape
34
 
import bzrlib.transport
35
 
 
36
 
 
37
 
def _append(fn, txt):
38
 
    """Append the given text (file-like object) to the supplied filename."""
39
 
    f = open(fn, 'ab')
40
 
    try:
41
 
        f.write(txt.read())
42
 
    finally:
43
 
        f.close()
44
 
 
45
 
 
46
 
class TestTransportImplementation(TestCaseInTempDir):
47
 
    """Implementation verification for transports.
48
 
    
49
 
    To verify a transport we need a server factory, which is a callable
50
 
    that accepts no parameters and returns an implementation of
51
 
    bzrlib.transport.Server.
52
 
    
53
 
    That Server is then used to construct transport instances and test
54
 
    the transport via loopback activity.
55
 
 
56
 
    Currently this assumes that the Transport object is connected to the 
57
 
    current working directory.  So that whatever is done 
58
 
    through the transport, should show up in the working 
59
 
    directory, and vice-versa. This is a bug, because its possible to have
60
 
    URL schemes which provide access to something that may not be 
61
 
    result in storage on the local disk, i.e. due to file system limits, or 
62
 
    due to it being a database or some other non-filesystem tool.
63
 
 
64
 
    This also tests to make sure that the functions work with both
65
 
    generators and lists (assuming iter(list) is effectively a generator)
66
 
    """
67
 
    
 
45
                           TransportNotPossible,
 
46
                           )
 
47
from bzrlib.osutils import getcwd
 
48
from bzrlib.smart import medium
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
 
51
    TestSkipped,
 
52
    TestNotApplicable,
 
53
    multiply_tests,
 
54
    )
 
55
from bzrlib.tests.test_transport import TestTransportImplementation
 
56
from bzrlib.transport import (
 
57
    ConnectedTransport,
 
58
    get_transport,
 
59
    _get_transport_modules,
 
60
    )
 
61
from bzrlib.transport.memory import MemoryTransport
 
62
 
 
63
 
 
64
def get_transport_test_permutations(module):
 
65
    """Get the permutations module wants to have tested."""
 
66
    if getattr(module, 'get_test_permutations', None) is None:
 
67
        raise AssertionError(
 
68
            "transport module %s doesn't provide get_test_permutations()"
 
69
            % module.__name__)
 
70
        return []
 
71
    return module.get_test_permutations()
 
72
 
 
73
 
 
74
def transport_test_permutations():
 
75
    """Return a list of the klass, server_factory pairs to test."""
 
76
    result = []
 
77
    for module in _get_transport_modules():
 
78
        try:
 
79
            permutations = get_transport_test_permutations(
 
80
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
81
            for (klass, server_factory) in permutations:
 
82
                scenario = (server_factory.__name__,
 
83
                    {"transport_class":klass,
 
84
                     "transport_server":server_factory})
 
85
                result.append(scenario)
 
86
        except errors.DependencyNotPresent, e:
 
87
            # Continue even if a dependency prevents us
 
88
            # from adding this test
 
89
            pass
 
90
    return result
 
91
 
 
92
 
 
93
def load_tests(standard_tests, module, loader):
 
94
    """Multiply tests for tranport implementations."""
 
95
    result = loader.suiteClass()
 
96
    scenarios = transport_test_permutations()
 
97
    return multiply_tests(standard_tests, scenarios, result)
 
98
 
 
99
 
 
100
class TransportTests(TestTransportImplementation):
 
101
 
68
102
    def setUp(self):
69
 
        super(TestTransportImplementation, self).setUp()
70
 
        self._server = self.transport_server()
71
 
        self._server.setUp()
 
103
        super(TransportTests, self).setUp()
 
104
        self._captureVar('BZR_NO_SMART_VFS', None)
72
105
 
73
 
    def tearDown(self):
74
 
        super(TestTransportImplementation, self).tearDown()
75
 
        self._server.tearDown()
76
 
        
77
106
    def check_transport_contents(self, content, transport, relpath):
78
107
        """Check that transport.get(relpath).read() == content."""
79
108
        self.assertEqualDiff(content, transport.get(relpath).read())
80
109
 
81
 
    def get_transport(self):
82
 
        """Return a connected transport to the local directory."""
83
 
        t = bzrlib.transport.get_transport(self._server.get_url())
84
 
        self.failUnless(isinstance(t, self.transport_class), 
85
 
                        "Got the wrong class from get_transport"
86
 
                        "(%r, expected %r)" % (t.__class__, 
87
 
                                               self.transport_class))
88
 
        return t
89
 
 
90
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
91
 
        """Fail unless excClass is raised when the iterator from func is used.
92
 
        
93
 
        Many transport functions can return generators this makes sure
94
 
        to wrap them in a list() call to make sure the whole generator
95
 
        is run, and that the proper exception is raised.
96
 
        """
 
110
    def test_ensure_base_missing(self):
 
111
        """.ensure_base() should create the directory if it doesn't exist"""
 
112
        t = self.get_transport()
 
113
        t_a = t.clone('a')
 
114
        if t_a.is_readonly():
 
115
            self.assertRaises(TransportNotPossible,
 
116
                              t_a.ensure_base)
 
117
            return
 
118
        self.assertTrue(t_a.ensure_base())
 
119
        self.assertTrue(t.has('a'))
 
120
 
 
121
    def test_ensure_base_exists(self):
 
122
        """.ensure_base() should just be happy if it already exists"""
 
123
        t = self.get_transport()
 
124
        if t.is_readonly():
 
125
            return
 
126
 
 
127
        t.mkdir('a')
 
128
        t_a = t.clone('a')
 
129
        # ensure_base returns False if it didn't create the base
 
130
        self.assertFalse(t_a.ensure_base())
 
131
 
 
132
    def test_ensure_base_missing_parent(self):
 
133
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
134
        t = self.get_transport()
 
135
        if t.is_readonly():
 
136
            return
 
137
 
 
138
        t_a = t.clone('a')
 
139
        t_b = t_a.clone('b')
 
140
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
141
 
 
142
    def test_external_url(self):
 
143
        """.external_url either works or raises InProcessTransport."""
 
144
        t = self.get_transport()
97
145
        try:
98
 
            list(func(*args, **kwargs))
99
 
        except excClass:
100
 
            return
101
 
        else:
102
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
103
 
            else: excName = str(excClass)
104
 
            raise self.failureException, "%s not raised" % excName
 
146
            t.external_url()
 
147
        except errors.InProcessTransport:
 
148
            pass
105
149
 
106
150
    def test_has(self):
107
151
        t = self.get_transport()
110
154
        self.build_tree(files, transport=t)
111
155
        self.assertEqual(True, t.has('a'))
112
156
        self.assertEqual(False, t.has('c'))
113
 
        self.assertEqual(True, t.has(urlescape('%')))
114
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
115
 
                [True, True, False, False, True, False, True, False])
 
157
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
158
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
159
                                           'e', 'f', 'g', 'h'])),
 
160
                         [True, True, False, False,
 
161
                          True, False, True, False])
116
162
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
117
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
118
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
119
 
                [True, True, False, False, True, False, True, False])
 
163
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
164
                                           urlutils.escape('%%')]))
 
165
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
166
                                                'e', 'f', 'g', 'h']))),
 
167
                         [True, True, False, False,
 
168
                          True, False, True, False])
120
169
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
121
170
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
122
171
 
 
172
    def test_has_root_works(self):
 
173
        from bzrlib.smart import server
 
174
        if self.transport_server is server.SmartTCPServer_for_testing:
 
175
            raise TestNotApplicable(
 
176
                "SmartTCPServer_for_testing intentionally does not allow "
 
177
                "access to /.")
 
178
        current_transport = self.get_transport()
 
179
        self.assertTrue(current_transport.has('/'))
 
180
        root = current_transport.clone('/')
 
181
        self.assertTrue(root.has(''))
 
182
 
123
183
    def test_get(self):
124
184
        t = self.get_transport()
125
185
 
129
189
                    'contents of e\n',
130
190
                    'contents of g\n',
131
191
                    ]
132
 
        self.build_tree(files, transport=t)
 
192
        self.build_tree(files, transport=t, line_endings='binary')
133
193
        self.check_transport_contents('contents of a\n', t, 'a')
134
194
        content_f = t.get_multi(files)
135
 
        for content, f in zip(contents, content_f):
 
195
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
196
        # evaluate their inputs, the transport requests should be issued and
 
197
        # handled sequentially (we don't want to force transport to buffer).
 
198
        for content, f in itertools.izip(contents, content_f):
136
199
            self.assertEqual(content, f.read())
137
200
 
138
201
        content_f = t.get_multi(iter(files))
139
 
        for content, f in zip(contents, content_f):
 
202
        # Use itertools.izip() for the same reason
 
203
        for content, f in itertools.izip(contents, content_f):
140
204
            self.assertEqual(content, f.read())
141
205
 
 
206
    def test_get_unknown_file(self):
 
207
        t = self.get_transport()
 
208
        files = ['a', 'b']
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
 
211
                    ]
 
212
        self.build_tree(files, transport=t, line_endings='binary')
142
213
        self.assertRaises(NoSuchFile, t.get, 'c')
143
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
144
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
145
216
 
146
 
    def test_put(self):
147
 
        t = self.get_transport()
148
 
 
149
 
        if t.is_readonly():
150
 
            self.assertRaises(TransportNotPossible,
151
 
                    t.put, 'a', 'some text for a\n')
152
 
            return
153
 
 
154
 
        t.put('a', StringIO('some text for a\n'))
155
 
        self.failUnless(t.has('a'))
156
 
        self.check_transport_contents('some text for a\n', t, 'a')
157
 
        # Make sure 'has' is updated
158
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
159
 
                [True, False, False, False, False])
160
 
        # Put also replaces contents
161
 
        self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
162
 
                                      ('d', StringIO('contents\nfor d\n'))]),
163
 
                         2)
164
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
165
 
                [True, False, False, True, False])
166
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
167
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
168
 
 
169
 
        self.assertEqual(
170
 
            t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
171
 
                              ('d', StringIO('another contents\nfor d\n'))])),
172
 
                        2)
173
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
174
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
175
 
 
176
 
        self.assertRaises(NoSuchFile,
177
 
                          t.put, 'path/doesnt/exist/c', 'contents')
178
 
 
179
 
    def test_put_permissions(self):
180
 
        t = self.get_transport()
181
 
 
182
 
        if t.is_readonly():
183
 
            return
184
 
        t.put('mode644', StringIO('test text\n'), mode=0644)
185
 
        self.assertTransportMode(t, 'mode644', 0644)
186
 
        t.put('mode666', StringIO('test text\n'), mode=0666)
187
 
        self.assertTransportMode(t, 'mode666', 0666)
188
 
        t.put('mode600', StringIO('test text\n'), mode=0600)
 
217
    def test_get_directory_read_gives_ReadError(self):
 
218
        """consistent errors for read() on a file returned by get()."""
 
219
        t = self.get_transport()
 
220
        if t.is_readonly():
 
221
            self.build_tree(['a directory/'])
 
222
        else:
 
223
            t.mkdir('a%20directory')
 
224
        # getting the file must either work or fail with a PathError
 
225
        try:
 
226
            a_file = t.get('a%20directory')
 
227
        except (errors.PathError, errors.RedirectRequested):
 
228
            # early failure return immediately.
 
229
            return
 
230
        # having got a file, read() must either work (i.e. http reading a dir
 
231
        # listing) or fail with ReadError
 
232
        try:
 
233
            a_file.read()
 
234
        except errors.ReadError:
 
235
            pass
 
236
 
 
237
    def test_get_bytes(self):
 
238
        t = self.get_transport()
 
239
 
 
240
        files = ['a', 'b', 'e', 'g']
 
241
        contents = ['contents of a\n',
 
242
                    'contents of b\n',
 
243
                    'contents of e\n',
 
244
                    'contents of g\n',
 
245
                    ]
 
246
        self.build_tree(files, transport=t, line_endings='binary')
 
247
        self.check_transport_contents('contents of a\n', t, 'a')
 
248
 
 
249
        for content, fname in zip(contents, files):
 
250
            self.assertEqual(content, t.get_bytes(fname))
 
251
 
 
252
    def test_get_bytes_unknown_file(self):
 
253
        t = self.get_transport()
 
254
 
 
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
256
 
 
257
    def test_get_with_open_write_stream_sees_all_content(self):
 
258
        t = self.get_transport()
 
259
        if t.is_readonly():
 
260
            return
 
261
        handle = t.open_write_stream('foo')
 
262
        try:
 
263
            handle.write('b')
 
264
            self.assertEqual('b', t.get('foo').read())
 
265
        finally:
 
266
            handle.close()
 
267
 
 
268
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
269
        t = self.get_transport()
 
270
        if t.is_readonly():
 
271
            return
 
272
        handle = t.open_write_stream('foo')
 
273
        try:
 
274
            handle.write('b')
 
275
            self.assertEqual('b', t.get_bytes('foo'))
 
276
            self.assertEqual('b', t.get('foo').read())
 
277
        finally:
 
278
            handle.close()
 
279
 
 
280
    def test_put_bytes(self):
 
281
        t = self.get_transport()
 
282
 
 
283
        if t.is_readonly():
 
284
            self.assertRaises(TransportNotPossible,
 
285
                    t.put_bytes, 'a', 'some text for a\n')
 
286
            return
 
287
 
 
288
        t.put_bytes('a', 'some text for a\n')
 
289
        self.failUnless(t.has('a'))
 
290
        self.check_transport_contents('some text for a\n', t, 'a')
 
291
 
 
292
        # The contents should be overwritten
 
293
        t.put_bytes('a', 'new text for a\n')
 
294
        self.check_transport_contents('new text for a\n', t, 'a')
 
295
 
 
296
        self.assertRaises(NoSuchFile,
 
297
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
298
 
 
299
    def test_put_bytes_non_atomic(self):
 
300
        t = self.get_transport()
 
301
 
 
302
        if t.is_readonly():
 
303
            self.assertRaises(TransportNotPossible,
 
304
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
305
            return
 
306
 
 
307
        self.failIf(t.has('a'))
 
308
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
309
        self.failUnless(t.has('a'))
 
310
        self.check_transport_contents('some text for a\n', t, 'a')
 
311
        # Put also replaces contents
 
312
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
313
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
314
 
 
315
        # Make sure we can create another file
 
316
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
317
        # And overwrite 'a' with empty contents
 
318
        t.put_bytes_non_atomic('a', '')
 
319
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
320
        self.check_transport_contents('', t, 'a')
 
321
 
 
322
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
323
                                       'contents\n')
 
324
        # Now test the create_parent flag
 
325
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
326
                                       'contents\n')
 
327
        self.failIf(t.has('dir/a'))
 
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
329
                               create_parent_dir=True)
 
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
331
 
 
332
        # But we still get NoSuchFile if we can't make the parent dir
 
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
334
                                       'contents\n',
 
335
                                       create_parent_dir=True)
 
336
 
 
337
    def test_put_bytes_permissions(self):
 
338
        t = self.get_transport()
 
339
 
 
340
        if t.is_readonly():
 
341
            return
 
342
        if not t._can_roundtrip_unix_modebits():
 
343
            # Can't roundtrip, so no need to run this test
 
344
            return
 
345
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
346
        self.assertTransportMode(t, 'mode644', 0644)
 
347
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
348
        self.assertTransportMode(t, 'mode666', 0666)
 
349
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
350
        self.assertTransportMode(t, 'mode600', 0600)
 
351
        # Yes, you can put_bytes a file such that it becomes readonly
 
352
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
353
        self.assertTransportMode(t, 'mode400', 0400)
 
354
 
 
355
        # The default permissions should be based on the current umask
 
356
        umask = osutils.get_umask()
 
357
        t.put_bytes('nomode', 'test text\n', mode=None)
 
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
359
 
 
360
    def test_put_bytes_non_atomic_permissions(self):
 
361
        t = self.get_transport()
 
362
 
 
363
        if t.is_readonly():
 
364
            return
 
365
        if not t._can_roundtrip_unix_modebits():
 
366
            # Can't roundtrip, so no need to run this test
 
367
            return
 
368
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
369
        self.assertTransportMode(t, 'mode644', 0644)
 
370
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
371
        self.assertTransportMode(t, 'mode666', 0666)
 
372
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
373
        self.assertTransportMode(t, 'mode600', 0600)
 
374
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
375
        self.assertTransportMode(t, 'mode400', 0400)
 
376
 
 
377
        # The default permissions should be based on the current umask
 
378
        umask = osutils.get_umask()
 
379
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
380
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
381
 
 
382
        # We should also be able to set the mode for a parent directory
 
383
        # when it is created
 
384
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
385
                               dir_mode=0700, create_parent_dir=True)
 
386
        self.assertTransportMode(t, 'dir700', 0700)
 
387
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
388
                               dir_mode=0770, create_parent_dir=True)
 
389
        self.assertTransportMode(t, 'dir770', 0770)
 
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
391
                               dir_mode=0777, create_parent_dir=True)
 
392
        self.assertTransportMode(t, 'dir777', 0777)
 
393
 
 
394
    def test_put_file(self):
 
395
        t = self.get_transport()
 
396
 
 
397
        if t.is_readonly():
 
398
            self.assertRaises(TransportNotPossible,
 
399
                    t.put_file, 'a', StringIO('some text for a\n'))
 
400
            return
 
401
 
 
402
        result = t.put_file('a', StringIO('some text for a\n'))
 
403
        # put_file returns the length of the data written
 
404
        self.assertEqual(16, result)
 
405
        self.failUnless(t.has('a'))
 
406
        self.check_transport_contents('some text for a\n', t, 'a')
 
407
        # Put also replaces contents
 
408
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
409
        self.assertEqual(19, result)
 
410
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
411
        self.assertRaises(NoSuchFile,
 
412
                          t.put_file, 'path/doesnt/exist/c',
 
413
                              StringIO('contents'))
 
414
 
 
415
    def test_put_file_non_atomic(self):
 
416
        t = self.get_transport()
 
417
 
 
418
        if t.is_readonly():
 
419
            self.assertRaises(TransportNotPossible,
 
420
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
421
            return
 
422
 
 
423
        self.failIf(t.has('a'))
 
424
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
425
        self.failUnless(t.has('a'))
 
426
        self.check_transport_contents('some text for a\n', t, 'a')
 
427
        # Put also replaces contents
 
428
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
429
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
430
 
 
431
        # Make sure we can create another file
 
432
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
433
        # And overwrite 'a' with empty contents
 
434
        t.put_file_non_atomic('a', StringIO(''))
 
435
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
436
        self.check_transport_contents('', t, 'a')
 
437
 
 
438
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
439
                                       StringIO('contents\n'))
 
440
        # Now test the create_parent flag
 
441
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
442
                                       StringIO('contents\n'))
 
443
        self.failIf(t.has('dir/a'))
 
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
445
                              create_parent_dir=True)
 
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
447
 
 
448
        # But we still get NoSuchFile if we can't make the parent dir
 
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
450
                                       StringIO('contents\n'),
 
451
                                       create_parent_dir=True)
 
452
 
 
453
    def test_put_file_permissions(self):
 
454
 
 
455
        t = self.get_transport()
 
456
 
 
457
        if t.is_readonly():
 
458
            return
 
459
        if not t._can_roundtrip_unix_modebits():
 
460
            # Can't roundtrip, so no need to run this test
 
461
            return
 
462
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
463
        self.assertTransportMode(t, 'mode644', 0644)
 
464
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
465
        self.assertTransportMode(t, 'mode666', 0666)
 
466
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
189
467
        self.assertTransportMode(t, 'mode600', 0600)
190
468
        # Yes, you can put a file such that it becomes readonly
191
 
        t.put('mode400', StringIO('test text\n'), mode=0400)
192
 
        self.assertTransportMode(t, 'mode400', 0400)
193
 
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
194
 
        self.assertTransportMode(t, 'mmode644', 0644)
195
 
        
 
469
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
470
        self.assertTransportMode(t, 'mode400', 0400)
 
471
        # The default permissions should be based on the current umask
 
472
        umask = osutils.get_umask()
 
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
475
 
 
476
    def test_put_file_non_atomic_permissions(self):
 
477
        t = self.get_transport()
 
478
 
 
479
        if t.is_readonly():
 
480
            return
 
481
        if not t._can_roundtrip_unix_modebits():
 
482
            # Can't roundtrip, so no need to run this test
 
483
            return
 
484
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
485
        self.assertTransportMode(t, 'mode644', 0644)
 
486
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
487
        self.assertTransportMode(t, 'mode666', 0666)
 
488
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
489
        self.assertTransportMode(t, 'mode600', 0600)
 
490
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
491
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
492
        self.assertTransportMode(t, 'mode400', 0400)
 
493
 
 
494
        # The default permissions should be based on the current umask
 
495
        umask = osutils.get_umask()
 
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
498
 
 
499
        # We should also be able to set the mode for a parent directory
 
500
        # when it is created
 
501
        sio = StringIO()
 
502
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
503
                              dir_mode=0700, create_parent_dir=True)
 
504
        self.assertTransportMode(t, 'dir700', 0700)
 
505
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
506
                              dir_mode=0770, create_parent_dir=True)
 
507
        self.assertTransportMode(t, 'dir770', 0770)
 
508
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
509
                              dir_mode=0777, create_parent_dir=True)
 
510
        self.assertTransportMode(t, 'dir777', 0777)
 
511
 
 
512
    def test_put_bytes_unicode(self):
 
513
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
514
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
515
        # (we don't want to encode unicode here at all, callers should be
 
516
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
517
        # compatibility.  At some point we should use a specific exception.
 
518
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
519
        t = self.get_transport()
 
520
        if t.is_readonly():
 
521
            return
 
522
        unicode_string = u'\u1234'
 
523
        self.assertRaises(
 
524
            (AssertionError, UnicodeEncodeError),
 
525
            t.put_bytes, 'foo', unicode_string)
 
526
 
 
527
    def test_put_file_unicode(self):
 
528
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
529
        # This situation can happen (and has) if code is careless about the type
 
530
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
531
        # cStringIO, because it never returns unicode from read.
 
532
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
533
        # raise, but we raise it for hysterical raisins.
 
534
        t = self.get_transport()
 
535
        if t.is_readonly():
 
536
            return
 
537
        unicode_file = pyStringIO(u'\u1234')
 
538
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
539
 
196
540
    def test_mkdir(self):
197
541
        t = self.get_transport()
198
542
 
199
543
        if t.is_readonly():
200
 
            # cannot mkdir on readonly transports. We're not testing for 
 
544
            # cannot mkdir on readonly transports. We're not testing for
201
545
            # cache coherency because cache behaviour is not currently
202
546
            # defined for the transport interface.
203
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
224
568
 
225
569
        # we were testing that a local mkdir followed by a transport
226
570
        # mkdir failed thusly, but given that we * in one process * do not
227
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
571
        # concurrently fiddle with disk dirs and then use transport to do
228
572
        # things, the win here seems marginal compared to the constraint on
229
573
        # the interface. RBC 20051227
230
574
        t.mkdir('dir_g')
231
575
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
232
576
 
233
577
        # Test get/put in sub-directories
234
 
        self.assertEqual(
235
 
            t.put_multi([('dir_a/a', StringIO('contents of dir_a/a')),
236
 
                         ('dir_b/b', StringIO('contents of dir_b/b'))])
237
 
                        , 2)
 
578
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
579
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
238
580
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
239
581
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
240
582
 
245
587
        t = self.get_transport()
246
588
        if t.is_readonly():
247
589
            return
 
590
        if not t._can_roundtrip_unix_modebits():
 
591
            # no sense testing on this transport
 
592
            return
248
593
        # Test mkdir with a mode
249
594
        t.mkdir('dmode755', mode=0755)
250
595
        self.assertTransportMode(t, 'dmode755', 0755)
254
599
        self.assertTransportMode(t, 'dmode777', 0777)
255
600
        t.mkdir('dmode700', mode=0700)
256
601
        self.assertTransportMode(t, 'dmode700', 0700)
257
 
        # TODO: jam 20051215 test mkdir_multi with a mode
258
602
        t.mkdir_multi(['mdmode755'], mode=0755)
259
603
        self.assertTransportMode(t, 'mdmode755', 0755)
260
604
 
 
605
        # Default mode should be based on umask
 
606
        umask = osutils.get_umask()
 
607
        t.mkdir('dnomode', mode=None)
 
608
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
609
 
 
610
    def test_opening_a_file_stream_creates_file(self):
 
611
        t = self.get_transport()
 
612
        if t.is_readonly():
 
613
            return
 
614
        handle = t.open_write_stream('foo')
 
615
        try:
 
616
            self.assertEqual('', t.get_bytes('foo'))
 
617
        finally:
 
618
            handle.close()
 
619
 
 
620
    def test_opening_a_file_stream_can_set_mode(self):
 
621
        t = self.get_transport()
 
622
        if t.is_readonly():
 
623
            return
 
624
        if not t._can_roundtrip_unix_modebits():
 
625
            # Can't roundtrip, so no need to run this test
 
626
            return
 
627
        def check_mode(name, mode, expected):
 
628
            handle = t.open_write_stream(name, mode=mode)
 
629
            handle.close()
 
630
            self.assertTransportMode(t, name, expected)
 
631
        check_mode('mode644', 0644, 0644)
 
632
        check_mode('mode666', 0666, 0666)
 
633
        check_mode('mode600', 0600, 0600)
 
634
        # The default permissions should be based on the current umask
 
635
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
636
 
261
637
    def test_copy_to(self):
262
638
        # FIXME: test:   same server to same server (partly done)
263
639
        # same protocol two servers
264
640
        # and    different protocols (done for now except for MemoryTransport.
265
641
        # - RBC 20060122
266
 
        from bzrlib.transport.memory import MemoryTransport
267
642
 
268
643
        def simple_copy_files(transport_from, transport_to):
269
644
            files = ['a', 'b', 'c', 'd']
270
645
            self.build_tree(files, transport=transport_from)
271
 
            transport_from.copy_to(files, transport_to)
 
646
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
272
647
            for f in files:
273
648
                self.check_transport_contents(transport_to.get(f).read(),
274
649
                                              transport_from, f)
275
650
 
276
651
        t = self.get_transport()
277
 
        temp_transport = MemoryTransport('memory:/')
 
652
        temp_transport = MemoryTransport('memory:///')
278
653
        simple_copy_files(t, temp_transport)
279
654
        if not t.is_readonly():
280
655
            t.mkdir('copy_to_simple')
288
663
            self.build_tree(['e/', 'e/f'])
289
664
        else:
290
665
            t.mkdir('e')
291
 
            t.put('e/f', StringIO('contents of e'))
 
666
            t.put_bytes('e/f', 'contents of e')
292
667
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
293
668
        temp_transport.mkdir('e')
294
669
        t.copy_to(['e/f'], temp_transport)
295
670
 
296
671
        del temp_transport
297
 
        temp_transport = MemoryTransport('memory:/')
 
672
        temp_transport = MemoryTransport('memory:///')
298
673
 
299
674
        files = ['a', 'b', 'c', 'd']
300
675
        t.copy_to(iter(files), temp_transport)
304
679
        del temp_transport
305
680
 
306
681
        for mode in (0666, 0644, 0600, 0400):
307
 
            temp_transport = MemoryTransport("memory:/")
 
682
            temp_transport = MemoryTransport("memory:///")
308
683
            t.copy_to(files, temp_transport, mode=mode)
309
684
            for f in files:
310
685
                self.assertTransportMode(temp_transport, f, mode)
311
686
 
312
 
    def test_append(self):
313
 
        t = self.get_transport()
314
 
 
315
 
        if t.is_readonly():
316
 
            open('a', 'wb').write('diff\ncontents for\na\n')
317
 
            open('b', 'wb').write('contents\nfor b\n')
318
 
        else:
319
 
            t.put_multi([
320
 
                    ('a', StringIO('diff\ncontents for\na\n')),
321
 
                    ('b', StringIO('contents\nfor b\n'))
322
 
                    ])
323
 
 
324
 
        if t.is_readonly():
325
 
            self.assertRaises(TransportNotPossible,
326
 
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
327
 
            _append('a', StringIO('add\nsome\nmore\ncontents\n'))
328
 
        else:
329
 
            t.append('a', StringIO('add\nsome\nmore\ncontents\n'))
330
 
 
331
 
        self.check_transport_contents(
332
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
333
 
            t, 'a')
334
 
 
335
 
        if t.is_readonly():
336
 
            self.assertRaises(TransportNotPossible,
337
 
                    t.append_multi,
338
 
                        [('a', 'and\nthen\nsome\nmore\n'),
339
 
                         ('b', 'some\nmore\nfor\nb\n')])
340
 
            _append('a', StringIO('and\nthen\nsome\nmore\n'))
341
 
            _append('b', StringIO('some\nmore\nfor\nb\n'))
342
 
        else:
 
687
    def test_create_prefix(self):
 
688
        t = self.get_transport()
 
689
        sub = t.clone('foo').clone('bar')
 
690
        try:
 
691
            sub.create_prefix()
 
692
        except TransportNotPossible:
 
693
            self.assertTrue(t.is_readonly())
 
694
        else:
 
695
            self.assertTrue(t.has('foo/bar'))
 
696
 
 
697
    def test_append_file(self):
 
698
        t = self.get_transport()
 
699
 
 
700
        if t.is_readonly():
 
701
            self.assertRaises(TransportNotPossible,
 
702
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
703
            return
 
704
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
705
        t.put_bytes('b', 'contents\nfor b\n')
 
706
 
 
707
        self.assertEqual(20,
 
708
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
709
 
 
710
        self.check_transport_contents(
 
711
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
712
            t, 'a')
 
713
 
 
714
        # a file with no parent should fail..
 
715
        self.assertRaises(NoSuchFile,
 
716
                          t.append_file, 'missing/path', StringIO('content'))
 
717
 
 
718
        # And we can create new files, too
 
719
        self.assertEqual(0,
 
720
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
721
        self.check_transport_contents('some text\nfor a missing file\n',
 
722
                                      t, 'c')
 
723
 
 
724
    def test_append_bytes(self):
 
725
        t = self.get_transport()
 
726
 
 
727
        if t.is_readonly():
 
728
            self.assertRaises(TransportNotPossible,
 
729
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
730
            return
 
731
 
 
732
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
733
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
734
 
 
735
        self.assertEqual(20,
 
736
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
737
 
 
738
        self.check_transport_contents(
 
739
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
740
            t, 'a')
 
741
 
 
742
        # a file with no parent should fail..
 
743
        self.assertRaises(NoSuchFile,
 
744
                          t.append_bytes, 'missing/path', 'content')
 
745
 
 
746
    def test_append_multi(self):
 
747
        t = self.get_transport()
 
748
 
 
749
        if t.is_readonly():
 
750
            return
 
751
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
752
                         'add\nsome\nmore\ncontents\n')
 
753
        t.put_bytes('b', 'contents\nfor b\n')
 
754
 
 
755
        self.assertEqual((43, 15),
343
756
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
344
 
                    ('b', StringIO('some\nmore\nfor\nb\n'))])
 
757
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
758
 
345
759
        self.check_transport_contents(
346
760
            'diff\ncontents for\na\n'
347
761
            'add\nsome\nmore\ncontents\n'
352
766
                'some\nmore\nfor\nb\n',
353
767
                t, 'b')
354
768
 
355
 
        if t.is_readonly():
356
 
            _append('a', StringIO('a little bit more\n'))
357
 
            _append('b', StringIO('from an iterator\n'))
358
 
        else:
 
769
        self.assertEqual((62, 31),
359
770
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
360
 
                    ('b', StringIO('from an iterator\n'))]))
 
771
                                 ('b', StringIO('from an iterator\n'))])))
361
772
        self.check_transport_contents(
362
773
            'diff\ncontents for\na\n'
363
774
            'add\nsome\nmore\ncontents\n'
370
781
                'from an iterator\n',
371
782
                t, 'b')
372
783
 
373
 
        if t.is_readonly():
374
 
            _append('c', StringIO('some text\nfor a missing file\n'))
375
 
            _append('a', StringIO('some text in a\n'))
376
 
            _append('d', StringIO('missing file r\n'))
377
 
        else:
378
 
            t.append('c', StringIO('some text\nfor a missing file\n'))
 
784
        self.assertEqual((80, 0),
379
785
            t.append_multi([('a', StringIO('some text in a\n')),
380
 
                            ('d', StringIO('missing file r\n'))])
 
786
                            ('d', StringIO('missing file r\n'))]))
 
787
 
381
788
        self.check_transport_contents(
382
789
            'diff\ncontents for\na\n'
383
790
            'add\nsome\nmore\ncontents\n'
385
792
            'a little bit more\n'
386
793
            'some text in a\n',
387
794
            t, 'a')
388
 
        self.check_transport_contents('some text\nfor a missing file\n',
389
 
                                      t, 'c')
390
795
        self.check_transport_contents('missing file r\n', t, 'd')
391
 
        
392
 
        # a file with no parent should fail..
393
 
        if not t.is_readonly():
394
 
            self.assertRaises(NoSuchFile,
395
 
                              t.append, 'missing/path', 
396
 
                              StringIO('content'))
397
 
 
398
 
    def test_append_file(self):
399
 
        t = self.get_transport()
400
 
 
401
 
        contents = [
402
 
            ('f1', StringIO('this is a string\nand some more stuff\n')),
403
 
            ('f2', StringIO('here is some text\nand a bit more\n')),
404
 
            ('f3', StringIO('some text for the\nthird file created\n')),
405
 
            ('f4', StringIO('this is a string\nand some more stuff\n')),
406
 
            ('f5', StringIO('here is some text\nand a bit more\n')),
407
 
            ('f6', StringIO('some text for the\nthird file created\n'))
408
 
        ]
409
 
        
410
 
        if t.is_readonly():
411
 
            for f, val in contents:
412
 
                open(f, 'wb').write(val.read())
413
 
        else:
414
 
            t.put_multi(contents)
415
 
 
416
 
        a1 = StringIO('appending to\none\n')
417
 
        if t.is_readonly():
418
 
            _append('f1', a1)
419
 
        else:
420
 
            t.append('f1', a1)
421
 
 
422
 
        del a1
423
 
 
424
 
        self.check_transport_contents(
425
 
                'this is a string\nand some more stuff\n'
426
 
                'appending to\none\n',
427
 
                t, 'f1')
428
 
 
429
 
        a2 = StringIO('adding more\ntext to two\n')
430
 
        a3 = StringIO('some garbage\nto put in three\n')
431
 
 
432
 
        if t.is_readonly():
433
 
            _append('f2', a2)
434
 
            _append('f3', a3)
435
 
        else:
436
 
            t.append_multi([('f2', a2), ('f3', a3)])
437
 
 
438
 
        del a2, a3
439
 
 
440
 
        self.check_transport_contents(
441
 
                'here is some text\nand a bit more\n'
442
 
                'adding more\ntext to two\n',
443
 
                t, 'f2')
444
 
        self.check_transport_contents( 
445
 
                'some text for the\nthird file created\n'
446
 
                'some garbage\nto put in three\n',
447
 
                t, 'f3')
448
 
 
449
 
        # Test that an actual file object can be used with put
450
 
        a4 = t.get('f1')
451
 
        if t.is_readonly():
452
 
            _append('f4', a4)
453
 
        else:
454
 
            t.append('f4', a4)
455
 
 
456
 
        del a4
457
 
 
458
 
        self.check_transport_contents(
459
 
                'this is a string\nand some more stuff\n'
460
 
                'this is a string\nand some more stuff\n'
461
 
                'appending to\none\n',
462
 
                t, 'f4')
463
 
 
464
 
        a5 = t.get('f2')
465
 
        a6 = t.get('f3')
466
 
        if t.is_readonly():
467
 
            _append('f5', a5)
468
 
            _append('f6', a6)
469
 
        else:
470
 
            t.append_multi([('f5', a5), ('f6', a6)])
471
 
 
472
 
        del a5, a6
473
 
 
474
 
        self.check_transport_contents(
475
 
                'here is some text\nand a bit more\n'
476
 
                'here is some text\nand a bit more\n'
477
 
                'adding more\ntext to two\n',
478
 
                t, 'f5')
479
 
        self.check_transport_contents(
480
 
                'some text for the\nthird file created\n'
481
 
                'some text for the\nthird file created\n'
482
 
                'some garbage\nto put in three\n',
483
 
                t, 'f6')
484
 
 
485
 
        a5 = t.get('f2')
486
 
        a6 = t.get('f2')
487
 
        a7 = t.get('f3')
488
 
        if t.is_readonly():
489
 
            _append('c', a5)
490
 
            _append('a', a6)
491
 
            _append('d', a7)
492
 
        else:
493
 
            t.append('c', a5)
494
 
            t.append_multi([('a', a6), ('d', a7)])
495
 
        del a5, a6, a7
496
 
        self.check_transport_contents(t.get('f2').read(), t, 'c')
497
 
        self.check_transport_contents(t.get('f3').read(), t, 'd')
 
796
 
 
797
    def test_append_file_mode(self):
 
798
        """Check that append accepts a mode parameter"""
 
799
        # check append accepts a mode
 
800
        t = self.get_transport()
 
801
        if t.is_readonly():
 
802
            self.assertRaises(TransportNotPossible,
 
803
                t.append_file, 'f', StringIO('f'), mode=None)
 
804
            return
 
805
        t.append_file('f', StringIO('f'), mode=None)
 
806
 
 
807
    def test_append_bytes_mode(self):
 
808
        # check append_bytes accepts a mode
 
809
        t = self.get_transport()
 
810
        if t.is_readonly():
 
811
            self.assertRaises(TransportNotPossible,
 
812
                t.append_bytes, 'f', 'f', mode=None)
 
813
            return
 
814
        t.append_bytes('f', 'f', mode=None)
498
815
 
499
816
    def test_delete(self):
500
817
        # TODO: Test Transport.delete
505
822
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
506
823
            return
507
824
 
508
 
        t.put('a', StringIO('a little bit of text\n'))
 
825
        t.put_bytes('a', 'a little bit of text\n')
509
826
        self.failUnless(t.has('a'))
510
827
        t.delete('a')
511
828
        self.failIf(t.has('a'))
512
829
 
513
830
        self.assertRaises(NoSuchFile, t.delete, 'a')
514
831
 
515
 
        t.put('a', StringIO('a text\n'))
516
 
        t.put('b', StringIO('b text\n'))
517
 
        t.put('c', StringIO('c text\n'))
 
832
        t.put_bytes('a', 'a text\n')
 
833
        t.put_bytes('b', 'b text\n')
 
834
        t.put_bytes('c', 'c text\n')
518
835
        self.assertEqual([True, True, True],
519
836
                list(t.has_multi(['a', 'b', 'c'])))
520
837
        t.delete_multi(['a', 'c'])
530
847
        self.assertRaises(NoSuchFile,
531
848
                t.delete_multi, iter(['a', 'b', 'c']))
532
849
 
533
 
        t.put('a', StringIO('another a text\n'))
534
 
        t.put('c', StringIO('another c text\n'))
 
850
        t.put_bytes('a', 'another a text\n')
 
851
        t.put_bytes('c', 'another c text\n')
535
852
        t.delete_multi(iter(['a', 'b', 'c']))
536
853
 
537
854
        # We should have deleted everything
540
857
        # plain "listdir".
541
858
        # self.assertEqual([], os.listdir('.'))
542
859
 
 
860
    def test_recommended_page_size(self):
 
861
        """Transports recommend a page size for partial access to files."""
 
862
        t = self.get_transport()
 
863
        self.assertIsInstance(t.recommended_page_size(), int)
 
864
 
543
865
    def test_rmdir(self):
544
866
        t = self.get_transport()
545
867
        # Not much to do with a readonly transport
549
871
        t.mkdir('adir')
550
872
        t.mkdir('adir/bdir')
551
873
        t.rmdir('adir/bdir')
552
 
        self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
 
874
        # ftp may not be able to raise NoSuchFile for lack of
 
875
        # details when failing
 
876
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
553
877
        t.rmdir('adir')
554
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
878
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
555
879
 
556
880
    def test_rmdir_not_empty(self):
557
881
        """Deleting a non-empty directory raises an exception
558
 
        
 
882
 
559
883
        sftp (and possibly others) don't give us a specific "directory not
560
884
        empty" exception -- we can just see that the operation failed.
561
885
        """
566
890
        t.mkdir('adir/bdir')
567
891
        self.assertRaises(PathError, t.rmdir, 'adir')
568
892
 
 
893
    def test_rmdir_empty_but_similar_prefix(self):
 
894
        """rmdir does not get confused by sibling paths.
 
895
 
 
896
        A naive implementation of MemoryTransport would refuse to rmdir
 
897
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
898
        uses "path.startswith(dir)" on all file paths to determine if directory
 
899
        is empty.
 
900
        """
 
901
        t = self.get_transport()
 
902
        if t.is_readonly():
 
903
            return
 
904
        t.mkdir('foo')
 
905
        t.put_bytes('foo-bar', '')
 
906
        t.mkdir('foo-baz')
 
907
        t.rmdir('foo')
 
908
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
909
        self.failUnless(t.has('foo-bar'))
 
910
 
569
911
    def test_rename_dir_succeeds(self):
570
912
        t = self.get_transport()
571
913
        if t.is_readonly():
585
927
        t.mkdir('adir/asubdir')
586
928
        t.mkdir('bdir')
587
929
        t.mkdir('bdir/bsubdir')
 
930
        # any kind of PathError would be OK, though we normally expect
 
931
        # DirectoryNotEmpty
588
932
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
589
933
        # nothing was changed so it should still be as before
590
934
        self.assertTrue(t.has('bdir/bsubdir'))
591
935
        self.assertFalse(t.has('adir/bdir'))
592
936
        self.assertFalse(t.has('adir/bsubdir'))
593
937
 
 
938
    def test_rename_across_subdirs(self):
 
939
        t = self.get_transport()
 
940
        if t.is_readonly():
 
941
            raise TestNotApplicable("transport is readonly")
 
942
        t.mkdir('a')
 
943
        t.mkdir('b')
 
944
        ta = t.clone('a')
 
945
        tb = t.clone('b')
 
946
        ta.put_bytes('f', 'aoeu')
 
947
        ta.rename('f', '../b/f')
 
948
        self.assertTrue(tb.has('f'))
 
949
        self.assertFalse(ta.has('f'))
 
950
        self.assertTrue(t.has('b/f'))
 
951
 
594
952
    def test_delete_tree(self):
595
953
        t = self.get_transport()
596
954
 
606
964
        except TransportNotPossible:
607
965
            # ok, this transport does not support delete_tree
608
966
            return
609
 
        
 
967
 
610
968
        # did it delete that trivial case?
611
969
        self.assertRaises(NoSuchFile, t.stat, 'adir')
612
970
 
613
971
        self.build_tree(['adir/',
614
 
                         'adir/file', 
615
 
                         'adir/subdir/', 
616
 
                         'adir/subdir/file', 
 
972
                         'adir/file',
 
973
                         'adir/subdir/',
 
974
                         'adir/subdir/file',
617
975
                         'adir/subdir2/',
618
976
                         'adir/subdir2/file',
619
977
                         ], transport=t)
633
991
        # creates control files in the working directory
634
992
        # perhaps all of this could be done in a subdirectory
635
993
 
636
 
        t.put('a', StringIO('a first file\n'))
 
994
        t.put_bytes('a', 'a first file\n')
637
995
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
638
996
 
639
997
        t.move('a', 'b')
644
1002
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
645
1003
 
646
1004
        # Overwrite a file
647
 
        t.put('c', StringIO('c this file\n'))
 
1005
        t.put_bytes('c', 'c this file\n')
648
1006
        t.move('c', 'b')
649
1007
        self.failIf(t.has('c'))
650
1008
        self.check_transport_contents('c this file\n', t, 'b')
651
1009
 
652
1010
        # TODO: Try to write a test for atomicity
653
 
        # TODO: Test moving into a non-existant subdirectory
 
1011
        # TODO: Test moving into a non-existent subdirectory
654
1012
        # TODO: Test Transport.move_multi
655
1013
 
656
1014
    def test_copy(self):
659
1017
        if t.is_readonly():
660
1018
            return
661
1019
 
662
 
        t.put('a', StringIO('a file\n'))
 
1020
        t.put_bytes('a', 'a file\n')
663
1021
        t.copy('a', 'b')
664
1022
        self.check_transport_contents('a file\n', t, 'b')
665
1023
 
668
1026
        # What should the assert be if you try to copy a
669
1027
        # file over a directory?
670
1028
        #self.assertRaises(Something, t.copy, 'a', 'c')
671
 
        t.put('d', StringIO('text in d\n'))
 
1029
        t.put_bytes('d', 'text in d\n')
672
1030
        t.copy('d', 'b')
673
1031
        self.check_transport_contents('text in d\n', t, 'b')
674
1032
 
675
1033
        # TODO: test copy_multi
676
1034
 
677
1035
    def test_connection_error(self):
678
 
        """ConnectionError is raised when connection is impossible"""
 
1036
        """ConnectionError is raised when connection is impossible.
 
1037
 
 
1038
        The error should be raised from the first operation on the transport.
 
1039
        """
679
1040
        try:
680
1041
            url = self._server.get_bogus_url()
681
1042
        except NotImplementedError:
682
1043
            raise TestSkipped("Transport %s has no bogus URL support." %
683
1044
                              self._server.__class__)
684
 
        t = bzrlib.transport.get_transport(url)
685
 
        try:
686
 
            t.get('.bzr/branch')
687
 
        except (ConnectionError, NoSuchFile), e:
688
 
            pass
689
 
        except (Exception), e:
690
 
            self.failIf(True, 'Wrong exception thrown: %s' % e)
691
 
        else:
692
 
            self.failIf(True, 'Did not get the expected exception.')
 
1045
        t = get_transport(url)
 
1046
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
693
1047
 
694
1048
    def test_stat(self):
695
1049
        # TODO: Test stat, just try once, and if it throws, stop testing
704
1058
            return
705
1059
 
706
1060
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
707
 
        sizes = [14, 0, 16, 0, 18] 
708
 
        self.build_tree(paths, transport=t)
 
1061
        sizes = [14, 0, 16, 0, 18]
 
1062
        self.build_tree(paths, transport=t, line_endings='binary')
709
1063
 
710
1064
        for path, size in zip(paths, sizes):
711
1065
            st = t.stat(path)
732
1086
    def test_list_dir(self):
733
1087
        # TODO: Test list_dir, just try once, and if it throws, stop testing
734
1088
        t = self.get_transport()
735
 
        
 
1089
 
736
1090
        if not t.listable():
737
1091
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
738
1092
            return
739
1093
 
740
 
        def sorted_list(d):
741
 
            l = list(t.list_dir(d))
 
1094
        def sorted_list(d, transport):
 
1095
            l = list(transport.list_dir(d))
742
1096
            l.sort()
743
1097
            return l
744
1098
 
745
 
        # SftpServer creates control files in the working directory
746
 
        # so lets move down a directory to avoid those.
747
 
        if not t.is_readonly():
748
 
            t.mkdir('wd')
749
 
        else:
750
 
            os.mkdir('wd')
751
 
        t = t.clone('wd')
752
 
 
753
 
        self.assertEqual([], sorted_list(u'.'))
 
1099
        self.assertEqual([], sorted_list('.', t))
754
1100
        # c2 is precisely one letter longer than c here to test that
755
1101
        # suffixing is not confused.
 
1102
        # a%25b checks that quoting is done consistently across transports
 
1103
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1104
 
756
1105
        if not t.is_readonly():
757
 
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
 
1106
            self.build_tree(tree_names, transport=t)
758
1107
        else:
759
 
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
760
 
 
761
 
        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
762
 
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
1108
            self.build_tree(tree_names)
 
1109
 
 
1110
        self.assertEqual(
 
1111
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1112
        self.assertEqual(
 
1113
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1114
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1115
 
 
1116
        # Cloning the transport produces an equivalent listing
 
1117
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
763
1118
 
764
1119
        if not t.is_readonly():
765
1120
            t.delete('c/d')
766
1121
            t.delete('b')
767
1122
        else:
768
 
            os.unlink('wd/c/d')
769
 
            os.unlink('wd/b')
770
 
            
771
 
        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
772
 
        self.assertEqual([u'e'], sorted_list(u'c'))
773
 
 
774
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
775
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
776
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
1123
            os.unlink('c/d')
 
1124
            os.unlink('b')
 
1125
 
 
1126
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1127
        self.assertEqual(['e'], sorted_list('c', t))
 
1128
 
 
1129
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1130
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1131
        # 'a' is a file, list_dir should raise an error
 
1132
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1133
 
 
1134
    def test_list_dir_result_is_url_escaped(self):
 
1135
        t = self.get_transport()
 
1136
        if not t.listable():
 
1137
            raise TestSkipped("transport not listable")
 
1138
 
 
1139
        if not t.is_readonly():
 
1140
            self.build_tree(['a/', 'a/%'], transport=t)
 
1141
        else:
 
1142
            self.build_tree(['a/', 'a/%'])
 
1143
 
 
1144
        names = list(t.list_dir('a'))
 
1145
        self.assertEqual(['%25'], names)
 
1146
        self.assertIsInstance(names[0], str)
 
1147
 
 
1148
    def test_clone_preserve_info(self):
 
1149
        t1 = self.get_transport()
 
1150
        if not isinstance(t1, ConnectedTransport):
 
1151
            raise TestSkipped("not a connected transport")
 
1152
 
 
1153
        t2 = t1.clone('subdir')
 
1154
        self.assertEquals(t1._scheme, t2._scheme)
 
1155
        self.assertEquals(t1._user, t2._user)
 
1156
        self.assertEquals(t1._password, t2._password)
 
1157
        self.assertEquals(t1._host, t2._host)
 
1158
        self.assertEquals(t1._port, t2._port)
 
1159
 
 
1160
    def test__reuse_for(self):
 
1161
        t = self.get_transport()
 
1162
        if not isinstance(t, ConnectedTransport):
 
1163
            raise TestSkipped("not a connected transport")
 
1164
 
 
1165
        def new_url(scheme=None, user=None, password=None,
 
1166
                    host=None, port=None, path=None):
 
1167
            """Build a new url from t.base changing only parts of it.
 
1168
 
 
1169
            Only the parameters different from None will be changed.
 
1170
            """
 
1171
            if scheme   is None: scheme   = t._scheme
 
1172
            if user     is None: user     = t._user
 
1173
            if password is None: password = t._password
 
1174
            if user     is None: user     = t._user
 
1175
            if host     is None: host     = t._host
 
1176
            if port     is None: port     = t._port
 
1177
            if path     is None: path     = t._path
 
1178
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1179
 
 
1180
        if t._scheme == 'ftp':
 
1181
            scheme = 'sftp'
 
1182
        else:
 
1183
            scheme = 'ftp'
 
1184
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1185
        if t._user == 'me':
 
1186
            user = 'you'
 
1187
        else:
 
1188
            user = 'me'
 
1189
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1190
        # passwords are not taken into account because:
 
1191
        # - it makes no sense to have two different valid passwords for the
 
1192
        #   same user
 
1193
        # - _password in ConnectedTransport is intended to collect what the
 
1194
        #   user specified from the command-line and there are cases where the
 
1195
        #   new url can contain no password (if the url was built from an
 
1196
        #   existing transport.base for example)
 
1197
        # - password are considered part of the credentials provided at
 
1198
        #   connection creation time and as such may not be present in the url
 
1199
        #   (they may be typed by the user when prompted for example)
 
1200
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1201
        # We will not connect, we can use a invalid host
 
1202
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1203
        if t._port == 1234:
 
1204
            port = 4321
 
1205
        else:
 
1206
            port = 1234
 
1207
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1208
        # No point in trying to reuse a transport for a local URL
 
1209
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1210
 
 
1211
    def test_connection_sharing(self):
 
1212
        t = self.get_transport()
 
1213
        if not isinstance(t, ConnectedTransport):
 
1214
            raise TestSkipped("not a connected transport")
 
1215
 
 
1216
        c = t.clone('subdir')
 
1217
        # Some transports will create the connection  only when needed
 
1218
        t.has('surely_not') # Force connection
 
1219
        self.assertIs(t._get_connection(), c._get_connection())
 
1220
 
 
1221
        # Temporary failure, we need to create a new dummy connection
 
1222
        new_connection = object()
 
1223
        t._set_connection(new_connection)
 
1224
        # Check that both transports use the same connection
 
1225
        self.assertIs(new_connection, t._get_connection())
 
1226
        self.assertIs(new_connection, c._get_connection())
 
1227
 
 
1228
    def test_reuse_connection_for_various_paths(self):
 
1229
        t = self.get_transport()
 
1230
        if not isinstance(t, ConnectedTransport):
 
1231
            raise TestSkipped("not a connected transport")
 
1232
 
 
1233
        t.has('surely_not') # Force connection
 
1234
        self.assertIsNot(None, t._get_connection())
 
1235
 
 
1236
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1237
        self.assertIsNot(t, subdir)
 
1238
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1239
 
 
1240
        home = subdir._reuse_for(t.base + 'home')
 
1241
        self.assertIs(t._get_connection(), home._get_connection())
 
1242
        self.assertIs(subdir._get_connection(), home._get_connection())
777
1243
 
778
1244
    def test_clone(self):
779
1245
        # TODO: Test that clone moves up and down the filesystem
800
1266
        self.failIf(t3.has('b/d'))
801
1267
 
802
1268
        if t1.is_readonly():
803
 
            open('b/d', 'wb').write('newfile\n')
 
1269
            self.build_tree_contents([('b/d', 'newfile\n')])
804
1270
        else:
805
 
            t2.put('d', StringIO('newfile\n'))
 
1271
            t2.put_bytes('d', 'newfile\n')
806
1272
 
807
1273
        self.failUnless(t1.has('b/d'))
808
1274
        self.failUnless(t2.has('d'))
809
1275
        self.failUnless(t3.has('b/d'))
810
1276
 
 
1277
    def test_clone_to_root(self):
 
1278
        orig_transport = self.get_transport()
 
1279
        # Repeatedly go up to a parent directory until we're at the root
 
1280
        # directory of this transport
 
1281
        root_transport = orig_transport
 
1282
        new_transport = root_transport.clone("..")
 
1283
        # as we are walking up directories, the path must be
 
1284
        # growing less, except at the top
 
1285
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1286
            or new_transport.base == root_transport.base)
 
1287
        while new_transport.base != root_transport.base:
 
1288
            root_transport = new_transport
 
1289
            new_transport = root_transport.clone("..")
 
1290
            # as we are walking up directories, the path must be
 
1291
            # growing less, except at the top
 
1292
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1293
                or new_transport.base == root_transport.base)
 
1294
 
 
1295
        # Cloning to "/" should take us to exactly the same location.
 
1296
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1297
        # the abspath of "/" from the original transport should be the same
 
1298
        # as the base at the root:
 
1299
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1300
 
 
1301
        # At the root, the URL must still end with / as its a directory
 
1302
        self.assertEqual(root_transport.base[-1], '/')
 
1303
 
 
1304
    def test_clone_from_root(self):
 
1305
        """At the root, cloning to a simple dir should just do string append."""
 
1306
        orig_transport = self.get_transport()
 
1307
        root_transport = orig_transport.clone('/')
 
1308
        self.assertEqual(root_transport.base + '.bzr/',
 
1309
            root_transport.clone('.bzr').base)
 
1310
 
 
1311
    def test_base_url(self):
 
1312
        t = self.get_transport()
 
1313
        self.assertEqual('/', t.base[-1])
 
1314
 
811
1315
    def test_relpath(self):
812
1316
        t = self.get_transport()
813
1317
        self.assertEqual('', t.relpath(t.base))
814
1318
        # base ends with /
815
1319
        self.assertEqual('', t.relpath(t.base[:-1]))
816
 
        # subdirs which dont exist should still give relpaths.
 
1320
        # subdirs which don't exist should still give relpaths.
817
1321
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
818
1322
        # trailing slash should be the same.
819
1323
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
820
1324
 
 
1325
    def test_relpath_at_root(self):
 
1326
        t = self.get_transport()
 
1327
        # clone all the way to the top
 
1328
        new_transport = t.clone('..')
 
1329
        while new_transport.base != t.base:
 
1330
            t = new_transport
 
1331
            new_transport = t.clone('..')
 
1332
        # we must be able to get a relpath below the root
 
1333
        self.assertEqual('', t.relpath(t.base))
 
1334
        # and a deeper one should work too
 
1335
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1336
 
821
1337
    def test_abspath(self):
822
1338
        # smoke test for abspath. Corner cases for backends like unix fs's
823
1339
        # that have aliasing problems like symlinks should go in backend
824
1340
        # specific test cases.
825
1341
        transport = self.get_transport()
 
1342
 
826
1343
        self.assertEqual(transport.base + 'relpath',
827
1344
                         transport.abspath('relpath'))
828
1345
 
 
1346
        # This should work without raising an error.
 
1347
        transport.abspath("/")
 
1348
 
 
1349
        # the abspath of "/" and "/foo/.." should result in the same location
 
1350
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1351
 
 
1352
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1353
                         transport.abspath("/foo"))
 
1354
 
 
1355
    def test_win32_abspath(self):
 
1356
        # Note: we tried to set sys.platform='win32' so we could test on
 
1357
        # other platforms too, but then osutils does platform specific
 
1358
        # things at import time which defeated us...
 
1359
        if sys.platform != 'win32':
 
1360
            raise TestSkipped(
 
1361
                'Testing drive letters in abspath implemented only for win32')
 
1362
 
 
1363
        # smoke test for abspath on win32.
 
1364
        # a transport based on 'file:///' never fully qualifies the drive.
 
1365
        transport = get_transport("file:///")
 
1366
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1367
 
 
1368
        # but a transport that starts with a drive spec must keep it.
 
1369
        transport = get_transport("file:///C:/")
 
1370
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1371
 
 
1372
    def test_local_abspath(self):
 
1373
        transport = self.get_transport()
 
1374
        try:
 
1375
            p = transport.local_abspath('.')
 
1376
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1377
            # should be formattable
 
1378
            s = str(e)
 
1379
        else:
 
1380
            self.assertEqual(getcwd(), p)
 
1381
 
 
1382
    def test_abspath_at_root(self):
 
1383
        t = self.get_transport()
 
1384
        # clone all the way to the top
 
1385
        new_transport = t.clone('..')
 
1386
        while new_transport.base != t.base:
 
1387
            t = new_transport
 
1388
            new_transport = t.clone('..')
 
1389
        # we must be able to get a abspath of the root when we ask for
 
1390
        # t.abspath('..') - this due to our choice that clone('..')
 
1391
        # should return the root from the root, combined with the desire that
 
1392
        # the url from clone('..') and from abspath('..') should be the same.
 
1393
        self.assertEqual(t.base, t.abspath('..'))
 
1394
        # '' should give us the root
 
1395
        self.assertEqual(t.base, t.abspath(''))
 
1396
        # and a path should append to the url
 
1397
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1398
 
829
1399
    def test_iter_files_recursive(self):
830
1400
        transport = self.get_transport()
831
1401
        if not transport.listable():
836
1406
                         'isolated/dir/',
837
1407
                         'isolated/dir/foo',
838
1408
                         'isolated/dir/bar',
 
1409
                         'isolated/dir/b%25z', # make sure quoting is correct
839
1410
                         'isolated/bar'],
840
1411
                        transport=transport)
841
1412
        paths = set(transport.iter_files_recursive())
843
1414
        self.assertEqual(paths,
844
1415
                    set(['isolated/dir/foo',
845
1416
                         'isolated/dir/bar',
 
1417
                         'isolated/dir/b%2525z',
846
1418
                         'isolated/bar']))
847
1419
        sub_transport = transport.clone('isolated')
848
1420
        paths = set(sub_transport.iter_files_recursive())
849
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
1421
        self.assertEqual(paths,
 
1422
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1423
 
 
1424
    def test_copy_tree(self):
 
1425
        # TODO: test file contents and permissions are preserved. This test was
 
1426
        # added just to ensure that quoting was handled correctly.
 
1427
        # -- David Allouche 2006-08-11
 
1428
        transport = self.get_transport()
 
1429
        if not transport.listable():
 
1430
            self.assertRaises(TransportNotPossible,
 
1431
                              transport.iter_files_recursive)
 
1432
            return
 
1433
        if transport.is_readonly():
 
1434
            return
 
1435
        self.build_tree(['from/',
 
1436
                         'from/dir/',
 
1437
                         'from/dir/foo',
 
1438
                         'from/dir/bar',
 
1439
                         'from/dir/b%25z', # make sure quoting is correct
 
1440
                         'from/bar'],
 
1441
                        transport=transport)
 
1442
        transport.copy_tree('from', 'to')
 
1443
        paths = set(transport.iter_files_recursive())
 
1444
        self.assertEqual(paths,
 
1445
                    set(['from/dir/foo',
 
1446
                         'from/dir/bar',
 
1447
                         'from/dir/b%2525z',
 
1448
                         'from/bar',
 
1449
                         'to/dir/foo',
 
1450
                         'to/dir/bar',
 
1451
                         'to/dir/b%2525z',
 
1452
                         'to/bar',]))
 
1453
 
 
1454
    def test_copy_tree_to_transport(self):
 
1455
        transport = self.get_transport()
 
1456
        if not transport.listable():
 
1457
            self.assertRaises(TransportNotPossible,
 
1458
                              transport.iter_files_recursive)
 
1459
            return
 
1460
        if transport.is_readonly():
 
1461
            return
 
1462
        self.build_tree(['from/',
 
1463
                         'from/dir/',
 
1464
                         'from/dir/foo',
 
1465
                         'from/dir/bar',
 
1466
                         'from/dir/b%25z', # make sure quoting is correct
 
1467
                         'from/bar'],
 
1468
                        transport=transport)
 
1469
        from_transport = transport.clone('from')
 
1470
        to_transport = transport.clone('to')
 
1471
        to_transport.ensure_base()
 
1472
        from_transport.copy_tree_to_transport(to_transport)
 
1473
        paths = set(transport.iter_files_recursive())
 
1474
        self.assertEqual(paths,
 
1475
                    set(['from/dir/foo',
 
1476
                         'from/dir/bar',
 
1477
                         'from/dir/b%2525z',
 
1478
                         'from/bar',
 
1479
                         'to/dir/foo',
 
1480
                         'to/dir/bar',
 
1481
                         'to/dir/b%2525z',
 
1482
                         'to/bar',]))
 
1483
 
 
1484
    def test_unicode_paths(self):
 
1485
        """Test that we can read/write files with Unicode names."""
 
1486
        t = self.get_transport()
 
1487
 
 
1488
        # With FAT32 and certain encodings on win32
 
1489
        # '\xe5' and '\xe4' actually map to the same file
 
1490
        # adding a suffix kicks in the 'preserving but insensitive'
 
1491
        # route, and maintains the right files
 
1492
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1493
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1494
                 u'\u017d', # Z with umlat iso-8859-2
 
1495
                 u'\u062c', # Arabic j
 
1496
                 u'\u0410', # Russian A
 
1497
                 u'\u65e5', # Kanji person
 
1498
                ]
 
1499
 
 
1500
        try:
 
1501
            self.build_tree(files, transport=t, line_endings='binary')
 
1502
        except UnicodeError:
 
1503
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1504
 
 
1505
        # A plain unicode string is not a valid url
 
1506
        for fname in files:
 
1507
            self.assertRaises(InvalidURL, t.get, fname)
 
1508
 
 
1509
        for fname in files:
 
1510
            fname_utf8 = fname.encode('utf-8')
 
1511
            contents = 'contents of %s\n' % (fname_utf8,)
 
1512
            self.check_transport_contents(contents, t, urlutils.escape(fname))
850
1513
 
851
1514
    def test_connect_twice_is_same_content(self):
852
 
        # check that our server (whatever it is) is accessable reliably
 
1515
        # check that our server (whatever it is) is accessible reliably
853
1516
        # via get_transport and multiple connections share content.
854
1517
        transport = self.get_transport()
855
1518
        if transport.is_readonly():
856
1519
            return
857
 
        transport.put('foo', StringIO('bar'))
858
 
        transport2 = self.get_transport()
859
 
        self.check_transport_contents('bar', transport2, 'foo')
860
 
        # its base should be usable.
861
 
        transport2 = bzrlib.transport.get_transport(transport.base)
862
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1520
        transport.put_bytes('foo', 'bar')
 
1521
        transport3 = self.get_transport()
 
1522
        self.check_transport_contents('bar', transport3, 'foo')
863
1523
 
864
1524
        # now opening at a relative url should give use a sane result:
865
1525
        transport.mkdir('newdir')
866
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
867
 
        transport2 = transport2.clone('..')
868
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1526
        transport5 = self.get_transport('newdir')
 
1527
        transport6 = transport5.clone('..')
 
1528
        self.check_transport_contents('bar', transport6, 'foo')
869
1529
 
870
1530
    def test_lock_write(self):
 
1531
        """Test transport-level write locks.
 
1532
 
 
1533
        These are deprecated and transports may decline to support them.
 
1534
        """
871
1535
        transport = self.get_transport()
872
1536
        if transport.is_readonly():
873
1537
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
874
1538
            return
875
 
        transport.put('lock', StringIO())
876
 
        lock = transport.lock_write('lock')
 
1539
        transport.put_bytes('lock', '')
 
1540
        try:
 
1541
            lock = transport.lock_write('lock')
 
1542
        except TransportNotPossible:
 
1543
            return
877
1544
        # TODO make this consistent on all platforms:
878
1545
        # self.assertRaises(LockError, transport.lock_write, 'lock')
879
1546
        lock.unlock()
880
1547
 
881
1548
    def test_lock_read(self):
 
1549
        """Test transport-level read locks.
 
1550
 
 
1551
        These are deprecated and transports may decline to support them.
 
1552
        """
882
1553
        transport = self.get_transport()
883
1554
        if transport.is_readonly():
884
1555
            file('lock', 'w').close()
885
1556
        else:
886
 
            transport.put('lock', StringIO())
887
 
        lock = transport.lock_read('lock')
 
1557
            transport.put_bytes('lock', '')
 
1558
        try:
 
1559
            lock = transport.lock_read('lock')
 
1560
        except TransportNotPossible:
 
1561
            return
888
1562
        # TODO make this consistent on all platforms:
889
1563
        # self.assertRaises(LockError, transport.lock_read, 'lock')
890
1564
        lock.unlock()
 
1565
 
 
1566
    def test_readv(self):
 
1567
        transport = self.get_transport()
 
1568
        if transport.is_readonly():
 
1569
            file('a', 'w').write('0123456789')
 
1570
        else:
 
1571
            transport.put_bytes('a', '0123456789')
 
1572
 
 
1573
        d = list(transport.readv('a', ((0, 1),)))
 
1574
        self.assertEqual(d[0], (0, '0'))
 
1575
 
 
1576
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1577
        self.assertEqual(d[0], (0, '0'))
 
1578
        self.assertEqual(d[1], (1, '1'))
 
1579
        self.assertEqual(d[2], (3, '34'))
 
1580
        self.assertEqual(d[3], (9, '9'))
 
1581
 
 
1582
    def test_readv_out_of_order(self):
 
1583
        transport = self.get_transport()
 
1584
        if transport.is_readonly():
 
1585
            file('a', 'w').write('0123456789')
 
1586
        else:
 
1587
            transport.put_bytes('a', '01234567890')
 
1588
 
 
1589
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1590
        self.assertEqual(d[0], (1, '1'))
 
1591
        self.assertEqual(d[1], (9, '9'))
 
1592
        self.assertEqual(d[2], (0, '0'))
 
1593
        self.assertEqual(d[3], (3, '34'))
 
1594
 
 
1595
    def test_readv_with_adjust_for_latency(self):
 
1596
        transport = self.get_transport()
 
1597
        # the adjust for latency flag expands the data region returned
 
1598
        # according to a per-transport heuristic, so testing is a little
 
1599
        # tricky as we need more data than the largest combining that our
 
1600
        # transports do. To accomodate this we generate random data and cross
 
1601
        # reference the returned data with the random data. To avoid doing
 
1602
        # multiple large random byte look ups we do several tests on the same
 
1603
        # backing data.
 
1604
        content = osutils.rand_bytes(200*1024)
 
1605
        content_size = len(content)
 
1606
        if transport.is_readonly():
 
1607
            self.build_tree_contents([('a', content)])
 
1608
        else:
 
1609
            transport.put_bytes('a', content)
 
1610
        def check_result_data(result_vector):
 
1611
            for item in result_vector:
 
1612
                data_len = len(item[1])
 
1613
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1614
 
 
1615
        # start corner case
 
1616
        result = list(transport.readv('a', ((0, 30),),
 
1617
            adjust_for_latency=True, upper_limit=content_size))
 
1618
        # we expect 1 result, from 0, to something > 30
 
1619
        self.assertEqual(1, len(result))
 
1620
        self.assertEqual(0, result[0][0])
 
1621
        self.assertTrue(len(result[0][1]) >= 30)
 
1622
        check_result_data(result)
 
1623
        # end of file corner case
 
1624
        result = list(transport.readv('a', ((204700, 100),),
 
1625
            adjust_for_latency=True, upper_limit=content_size))
 
1626
        # we expect 1 result, from 204800- its length, to the end
 
1627
        self.assertEqual(1, len(result))
 
1628
        data_len = len(result[0][1])
 
1629
        self.assertEqual(204800-data_len, result[0][0])
 
1630
        self.assertTrue(data_len >= 100)
 
1631
        check_result_data(result)
 
1632
        # out of order ranges are made in order
 
1633
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1634
            adjust_for_latency=True, upper_limit=content_size))
 
1635
        # we expect 2 results, in order, start and end.
 
1636
        self.assertEqual(2, len(result))
 
1637
        # start
 
1638
        data_len = len(result[0][1])
 
1639
        self.assertEqual(0, result[0][0])
 
1640
        self.assertTrue(data_len >= 30)
 
1641
        # end
 
1642
        data_len = len(result[1][1])
 
1643
        self.assertEqual(204800-data_len, result[1][0])
 
1644
        self.assertTrue(data_len >= 100)
 
1645
        check_result_data(result)
 
1646
        # close ranges get combined (even if out of order)
 
1647
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1648
            result = list(transport.readv('a', request_vector,
 
1649
                adjust_for_latency=True, upper_limit=content_size))
 
1650
            self.assertEqual(1, len(result))
 
1651
            data_len = len(result[0][1])
 
1652
            # minimum length is from 400 to 1034 - 634
 
1653
            self.assertTrue(data_len >= 634)
 
1654
            # must contain the region 400 to 1034
 
1655
            self.assertTrue(result[0][0] <= 400)
 
1656
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1657
            check_result_data(result)
 
1658
 
 
1659
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1660
        transport = self.get_transport()
 
1661
        # test from observed failure case.
 
1662
        if transport.is_readonly():
 
1663
            file('a', 'w').write('a'*1024*1024)
 
1664
        else:
 
1665
            transport.put_bytes('a', 'a'*1024*1024)
 
1666
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1667
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1668
            (465373, 800), (947422, 800)]
 
1669
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1670
        found_items = [False]*9
 
1671
        for pos, (start, length) in enumerate(broken_vector):
 
1672
            # check the range is covered by the result
 
1673
            for offset, data in results:
 
1674
                if offset <= start and start + length <= offset + len(data):
 
1675
                    found_items[pos] = True
 
1676
        self.assertEqual([True]*9, found_items)
 
1677
 
 
1678
    def test_get_with_open_write_stream_sees_all_content(self):
 
1679
        t = self.get_transport()
 
1680
        if t.is_readonly():
 
1681
            return
 
1682
        handle = t.open_write_stream('foo')
 
1683
        try:
 
1684
            handle.write('bcd')
 
1685
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1686
        finally:
 
1687
            handle.close()
 
1688
 
 
1689
    def test_get_smart_medium(self):
 
1690
        """All transports must either give a smart medium, or know they can't.
 
1691
        """
 
1692
        transport = self.get_transport()
 
1693
        try:
 
1694
            client_medium = transport.get_smart_medium()
 
1695
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1696
        except errors.NoSmartMedium:
 
1697
            # as long as we got it we're fine
 
1698
            pass
 
1699
 
 
1700
    def test_readv_short_read(self):
 
1701
        transport = self.get_transport()
 
1702
        if transport.is_readonly():
 
1703
            file('a', 'w').write('0123456789')
 
1704
        else:
 
1705
            transport.put_bytes('a', '01234567890')
 
1706
 
 
1707
        # This is intentionally reading off the end of the file
 
1708
        # since we are sure that it cannot get there
 
1709
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1710
                               # Can be raised by paramiko
 
1711
                               AssertionError),
 
1712
                              transport.readv, 'a', [(1,1), (8,10)])
 
1713
 
 
1714
        # This is trying to seek past the end of the file, it should
 
1715
        # also raise a special error
 
1716
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1717
                              transport.readv, 'a', [(12,2)])