~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

(vila) Calling super() instead of mentioning the base class in setUp avoid
 mistakes. (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
27
29
 
28
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
29
 
                           LockError,
 
30
from bzrlib import (
 
31
    errors,
 
32
    osutils,
 
33
    pyutils,
 
34
    tests,
 
35
    transport as _mod_transport,
 
36
    urlutils,
 
37
    )
 
38
from bzrlib.errors import (ConnectionError,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           NoSuchFile,
30
42
                           PathError,
31
 
                           TransportNotPossible, ConnectionError)
32
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
33
 
from bzrlib.transport import memory, urlescape
34
 
import bzrlib.transport
35
 
 
36
 
 
37
 
def _append(fn, txt):
38
 
    """Append the given text (file-like object) to the supplied filename."""
39
 
    f = open(fn, 'ab')
40
 
    try:
41
 
        f.write(txt.read())
42
 
    finally:
43
 
        f.close()
44
 
 
45
 
 
46
 
class TestTransportImplementation(TestCaseInTempDir):
47
 
    """Implementation verification for transports.
48
 
    
49
 
    To verify a transport we need a server factory, which is a callable
50
 
    that accepts no parameters and returns an implementation of
51
 
    bzrlib.transport.Server.
52
 
    
53
 
    That Server is then used to construct transport instances and test
54
 
    the transport via loopback activity.
55
 
 
56
 
    Currently this assumes that the Transport object is connected to the 
57
 
    current working directory.  So that whatever is done 
58
 
    through the transport, should show up in the working 
59
 
    directory, and vice-versa. This is a bug, because its possible to have
60
 
    URL schemes which provide access to something that may not be 
61
 
    result in storage on the local disk, i.e. due to file system limits, or 
62
 
    due to it being a database or some other non-filesystem tool.
63
 
 
64
 
    This also tests to make sure that the functions work with both
65
 
    generators and lists (assuming iter(list) is effectively a generator)
66
 
    """
67
 
    
 
43
                           TransportNotPossible,
 
44
                           )
 
45
from bzrlib.osutils import getcwd
 
46
from bzrlib.smart import medium
 
47
from bzrlib.tests import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from bzrlib.tests import test_server
 
53
from bzrlib.tests.test_transport import TestTransportImplementation
 
54
from bzrlib.transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from bzrlib.transport.memory import MemoryTransport
 
60
from bzrlib.transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent, e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(standard_tests, module, loader):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
 
97
 
 
98
 
 
99
class TransportTests(TestTransportImplementation):
 
100
 
68
101
    def setUp(self):
69
 
        super(TestTransportImplementation, self).setUp()
70
 
        self._server = self.transport_server()
71
 
        self._server.setUp()
 
102
        super(TransportTests, self).setUp()
 
103
        self.overrideEnv('BZR_NO_SMART_VFS', None)
72
104
 
73
 
    def tearDown(self):
74
 
        super(TestTransportImplementation, self).tearDown()
75
 
        self._server.tearDown()
76
 
        
77
105
    def check_transport_contents(self, content, transport, relpath):
78
 
        """Check that transport.get(relpath).read() == content."""
79
 
        self.assertEqualDiff(content, transport.get(relpath).read())
80
 
 
81
 
    def get_transport(self):
82
 
        """Return a connected transport to the local directory."""
83
 
        base_url = self._server.get_url()
84
 
        t = bzrlib.transport.get_transport(base_url)
85
 
        if not isinstance(t, self.transport_class):
86
 
            # we want to make sure to construct one particular class, even if
87
 
            # there are several available implementations of this transport;
88
 
            # therefore construct it by hand rather than through the regular
89
 
            # get_transport method
90
 
            t = self.transport_class(base_url)
91
 
        return t
92
 
 
93
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
94
 
        """Fail unless excClass is raised when the iterator from func is used.
95
 
        
96
 
        Many transport functions can return generators this makes sure
97
 
        to wrap them in a list() call to make sure the whole generator
98
 
        is run, and that the proper exception is raised.
99
 
        """
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
 
 
109
    def test_ensure_base_missing(self):
 
110
        """.ensure_base() should create the directory if it doesn't exist"""
 
111
        t = self.get_transport()
 
112
        t_a = t.clone('a')
 
113
        if t_a.is_readonly():
 
114
            self.assertRaises(TransportNotPossible,
 
115
                              t_a.ensure_base)
 
116
            return
 
117
        self.assertTrue(t_a.ensure_base())
 
118
        self.assertTrue(t.has('a'))
 
119
 
 
120
    def test_ensure_base_exists(self):
 
121
        """.ensure_base() should just be happy if it already exists"""
 
122
        t = self.get_transport()
 
123
        if t.is_readonly():
 
124
            return
 
125
 
 
126
        t.mkdir('a')
 
127
        t_a = t.clone('a')
 
128
        # ensure_base returns False if it didn't create the base
 
129
        self.assertFalse(t_a.ensure_base())
 
130
 
 
131
    def test_ensure_base_missing_parent(self):
 
132
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
133
        t = self.get_transport()
 
134
        if t.is_readonly():
 
135
            return
 
136
 
 
137
        t_a = t.clone('a')
 
138
        t_b = t_a.clone('b')
 
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
100
144
        try:
101
 
            list(func(*args, **kwargs))
102
 
        except excClass:
103
 
            return
104
 
        else:
105
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
106
 
            else: excName = str(excClass)
107
 
            raise self.failureException, "%s not raised" % excName
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
108
148
 
109
149
    def test_has(self):
110
150
        t = self.get_transport()
113
153
        self.build_tree(files, transport=t)
114
154
        self.assertEqual(True, t.has('a'))
115
155
        self.assertEqual(False, t.has('c'))
116
 
        self.assertEqual(True, t.has(urlescape('%')))
117
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
118
 
                [True, True, False, False, True, False, True, False])
 
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
157
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
158
                                           'e', 'f', 'g', 'h'])),
 
159
                         [True, True, False, False,
 
160
                          True, False, True, False])
119
161
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
120
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
121
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
122
 
                [True, True, False, False, True, False, True, False])
 
162
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
163
                                           urlutils.escape('%%')]))
 
164
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
165
                                                'e', 'f', 'g', 'h']))),
 
166
                         [True, True, False, False,
 
167
                          True, False, True, False])
123
168
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
124
169
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
125
170
 
 
171
    def test_has_root_works(self):
 
172
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
173
            raise TestNotApplicable(
 
174
                "SmartTCPServer_for_testing intentionally does not allow "
 
175
                "access to /.")
 
176
        current_transport = self.get_transport()
 
177
        self.assertTrue(current_transport.has('/'))
 
178
        root = current_transport.clone('/')
 
179
        self.assertTrue(root.has(''))
 
180
 
126
181
    def test_get(self):
127
182
        t = self.get_transport()
128
183
 
135
190
        self.build_tree(files, transport=t, line_endings='binary')
136
191
        self.check_transport_contents('contents of a\n', t, 'a')
137
192
        content_f = t.get_multi(files)
138
 
        for content, f in zip(contents, content_f):
 
193
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
194
        # evaluate their inputs, the transport requests should be issued and
 
195
        # handled sequentially (we don't want to force transport to buffer).
 
196
        for content, f in itertools.izip(contents, content_f):
139
197
            self.assertEqual(content, f.read())
140
198
 
141
199
        content_f = t.get_multi(iter(files))
142
 
        for content, f in zip(contents, content_f):
 
200
        # Use itertools.izip() for the same reason
 
201
        for content, f in itertools.izip(contents, content_f):
143
202
            self.assertEqual(content, f.read())
144
203
 
 
204
    def test_get_unknown_file(self):
 
205
        t = self.get_transport()
 
206
        files = ['a', 'b']
 
207
        contents = ['contents of a\n',
 
208
                    'contents of b\n',
 
209
                    ]
 
210
        self.build_tree(files, transport=t, line_endings='binary')
145
211
        self.assertRaises(NoSuchFile, t.get, 'c')
146
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
147
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
148
 
 
149
 
    def test_put(self):
150
 
        t = self.get_transport()
151
 
 
152
 
        if t.is_readonly():
153
 
            self.assertRaises(TransportNotPossible,
154
 
                    t.put, 'a', 'some text for a\n')
155
 
            return
156
 
 
157
 
        t.put('a', StringIO('some text for a\n'))
158
 
        self.failUnless(t.has('a'))
159
 
        self.check_transport_contents('some text for a\n', t, 'a')
160
 
        # Make sure 'has' is updated
161
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
162
 
                [True, False, False, False, False])
163
 
        # Put also replaces contents
164
 
        self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
165
 
                                      ('d', StringIO('contents\nfor d\n'))]),
166
 
                         2)
167
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
168
 
                [True, False, False, True, False])
169
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
170
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
171
 
 
172
 
        self.assertEqual(
173
 
            t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
174
 
                              ('d', StringIO('another contents\nfor d\n'))])),
175
 
                        2)
176
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
177
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
178
 
 
179
 
        self.assertRaises(NoSuchFile,
180
 
                          t.put, 'path/doesnt/exist/c', 'contents')
181
 
 
182
 
    def test_put_permissions(self):
183
 
        t = self.get_transport()
184
 
 
185
 
        if t.is_readonly():
186
 
            return
187
 
        t.put('mode644', StringIO('test text\n'), mode=0644)
188
 
        self.assertTransportMode(t, 'mode644', 0644)
189
 
        t.put('mode666', StringIO('test text\n'), mode=0666)
190
 
        self.assertTransportMode(t, 'mode666', 0666)
191
 
        t.put('mode600', StringIO('test text\n'), mode=0600)
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
 
223
 
 
224
    def test_get_directory_read_gives_ReadError(self):
 
225
        """consistent errors for read() on a file returned by get()."""
 
226
        t = self.get_transport()
 
227
        if t.is_readonly():
 
228
            self.build_tree(['a directory/'])
 
229
        else:
 
230
            t.mkdir('a%20directory')
 
231
        # getting the file must either work or fail with a PathError
 
232
        try:
 
233
            a_file = t.get('a%20directory')
 
234
        except (errors.PathError, errors.RedirectRequested):
 
235
            # early failure return immediately.
 
236
            return
 
237
        # having got a file, read() must either work (i.e. http reading a dir
 
238
        # listing) or fail with ReadError
 
239
        try:
 
240
            a_file.read()
 
241
        except errors.ReadError:
 
242
            pass
 
243
 
 
244
    def test_get_bytes(self):
 
245
        t = self.get_transport()
 
246
 
 
247
        files = ['a', 'b', 'e', 'g']
 
248
        contents = ['contents of a\n',
 
249
                    'contents of b\n',
 
250
                    'contents of e\n',
 
251
                    'contents of g\n',
 
252
                    ]
 
253
        self.build_tree(files, transport=t, line_endings='binary')
 
254
        self.check_transport_contents('contents of a\n', t, 'a')
 
255
 
 
256
        for content, fname in zip(contents, files):
 
257
            self.assertEqual(content, t.get_bytes(fname))
 
258
 
 
259
    def test_get_bytes_unknown_file(self):
 
260
        t = self.get_transport()
 
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
262
 
 
263
    def test_get_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
        finally:
 
272
            handle.close()
 
273
 
 
274
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
275
        t = self.get_transport()
 
276
        if t.is_readonly():
 
277
            return
 
278
        handle = t.open_write_stream('foo')
 
279
        try:
 
280
            handle.write('b')
 
281
            self.assertEqual('b', t.get_bytes('foo'))
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
 
287
        finally:
 
288
            handle.close()
 
289
 
 
290
    def test_put_bytes(self):
 
291
        t = self.get_transport()
 
292
 
 
293
        if t.is_readonly():
 
294
            self.assertRaises(TransportNotPossible,
 
295
                    t.put_bytes, 'a', 'some text for a\n')
 
296
            return
 
297
 
 
298
        t.put_bytes('a', 'some text for a\n')
 
299
        self.assertTrue(t.has('a'))
 
300
        self.check_transport_contents('some text for a\n', t, 'a')
 
301
 
 
302
        # The contents should be overwritten
 
303
        t.put_bytes('a', 'new text for a\n')
 
304
        self.check_transport_contents('new text for a\n', t, 'a')
 
305
 
 
306
        self.assertRaises(NoSuchFile,
 
307
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
308
 
 
309
    def test_put_bytes_non_atomic(self):
 
310
        t = self.get_transport()
 
311
 
 
312
        if t.is_readonly():
 
313
            self.assertRaises(TransportNotPossible,
 
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
315
            return
 
316
 
 
317
        self.assertFalse(t.has('a'))
 
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
319
        self.assertTrue(t.has('a'))
 
320
        self.check_transport_contents('some text for a\n', t, 'a')
 
321
        # Put also replaces contents
 
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
323
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
324
 
 
325
        # Make sure we can create another file
 
326
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
327
        # And overwrite 'a' with empty contents
 
328
        t.put_bytes_non_atomic('a', '')
 
329
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
330
        self.check_transport_contents('', t, 'a')
 
331
 
 
332
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
333
                                       'contents\n')
 
334
        # Now test the create_parent flag
 
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
336
                                       'contents\n')
 
337
        self.assertFalse(t.has('dir/a'))
 
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
339
                               create_parent_dir=True)
 
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
341
 
 
342
        # But we still get NoSuchFile if we can't make the parent dir
 
343
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
344
                                       'contents\n',
 
345
                                       create_parent_dir=True)
 
346
 
 
347
    def test_put_bytes_permissions(self):
 
348
        t = self.get_transport()
 
349
 
 
350
        if t.is_readonly():
 
351
            return
 
352
        if not t._can_roundtrip_unix_modebits():
 
353
            # Can't roundtrip, so no need to run this test
 
354
            return
 
355
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
356
        self.assertTransportMode(t, 'mode644', 0644)
 
357
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
358
        self.assertTransportMode(t, 'mode666', 0666)
 
359
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
360
        self.assertTransportMode(t, 'mode600', 0600)
 
361
        # Yes, you can put_bytes a file such that it becomes readonly
 
362
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
363
        self.assertTransportMode(t, 'mode400', 0400)
 
364
 
 
365
        # The default permissions should be based on the current umask
 
366
        umask = osutils.get_umask()
 
367
        t.put_bytes('nomode', 'test text\n', mode=None)
 
368
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
369
 
 
370
    def test_put_bytes_non_atomic_permissions(self):
 
371
        t = self.get_transport()
 
372
 
 
373
        if t.is_readonly():
 
374
            return
 
375
        if not t._can_roundtrip_unix_modebits():
 
376
            # Can't roundtrip, so no need to run this test
 
377
            return
 
378
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
379
        self.assertTransportMode(t, 'mode644', 0644)
 
380
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
381
        self.assertTransportMode(t, 'mode666', 0666)
 
382
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
383
        self.assertTransportMode(t, 'mode600', 0600)
 
384
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
385
        self.assertTransportMode(t, 'mode400', 0400)
 
386
 
 
387
        # The default permissions should be based on the current umask
 
388
        umask = osutils.get_umask()
 
389
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
390
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
391
 
 
392
        # We should also be able to set the mode for a parent directory
 
393
        # when it is created
 
394
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
395
                               dir_mode=0700, create_parent_dir=True)
 
396
        self.assertTransportMode(t, 'dir700', 0700)
 
397
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
398
                               dir_mode=0770, create_parent_dir=True)
 
399
        self.assertTransportMode(t, 'dir770', 0770)
 
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
401
                               dir_mode=0777, create_parent_dir=True)
 
402
        self.assertTransportMode(t, 'dir777', 0777)
 
403
 
 
404
    def test_put_file(self):
 
405
        t = self.get_transport()
 
406
 
 
407
        if t.is_readonly():
 
408
            self.assertRaises(TransportNotPossible,
 
409
                    t.put_file, 'a', StringIO('some text for a\n'))
 
410
            return
 
411
 
 
412
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        # put_file returns the length of the data written
 
414
        self.assertEqual(16, result)
 
415
        self.assertTrue(t.has('a'))
 
416
        self.check_transport_contents('some text for a\n', t, 'a')
 
417
        # Put also replaces contents
 
418
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        self.assertEqual(19, result)
 
420
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
421
        self.assertRaises(NoSuchFile,
 
422
                          t.put_file, 'path/doesnt/exist/c',
 
423
                              StringIO('contents'))
 
424
 
 
425
    def test_put_file_non_atomic(self):
 
426
        t = self.get_transport()
 
427
 
 
428
        if t.is_readonly():
 
429
            self.assertRaises(TransportNotPossible,
 
430
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
431
            return
 
432
 
 
433
        self.assertFalse(t.has('a'))
 
434
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
435
        self.assertTrue(t.has('a'))
 
436
        self.check_transport_contents('some text for a\n', t, 'a')
 
437
        # Put also replaces contents
 
438
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
439
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
440
 
 
441
        # Make sure we can create another file
 
442
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
443
        # And overwrite 'a' with empty contents
 
444
        t.put_file_non_atomic('a', StringIO(''))
 
445
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
446
        self.check_transport_contents('', t, 'a')
 
447
 
 
448
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
449
                                       StringIO('contents\n'))
 
450
        # Now test the create_parent flag
 
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
452
                                       StringIO('contents\n'))
 
453
        self.assertFalse(t.has('dir/a'))
 
454
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
455
                              create_parent_dir=True)
 
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
457
 
 
458
        # But we still get NoSuchFile if we can't make the parent dir
 
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
460
                                       StringIO('contents\n'),
 
461
                                       create_parent_dir=True)
 
462
 
 
463
    def test_put_file_permissions(self):
 
464
 
 
465
        t = self.get_transport()
 
466
 
 
467
        if t.is_readonly():
 
468
            return
 
469
        if not t._can_roundtrip_unix_modebits():
 
470
            # Can't roundtrip, so no need to run this test
 
471
            return
 
472
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
473
        self.assertTransportMode(t, 'mode644', 0644)
 
474
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
475
        self.assertTransportMode(t, 'mode666', 0666)
 
476
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
192
477
        self.assertTransportMode(t, 'mode600', 0600)
193
478
        # Yes, you can put a file such that it becomes readonly
194
 
        t.put('mode400', StringIO('test text\n'), mode=0400)
195
 
        self.assertTransportMode(t, 'mode400', 0400)
196
 
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
197
 
        self.assertTransportMode(t, 'mmode644', 0644)
198
 
        
 
479
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
480
        self.assertTransportMode(t, 'mode400', 0400)
 
481
        # The default permissions should be based on the current umask
 
482
        umask = osutils.get_umask()
 
483
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
484
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
485
 
 
486
    def test_put_file_non_atomic_permissions(self):
 
487
        t = self.get_transport()
 
488
 
 
489
        if t.is_readonly():
 
490
            return
 
491
        if not t._can_roundtrip_unix_modebits():
 
492
            # Can't roundtrip, so no need to run this test
 
493
            return
 
494
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
495
        self.assertTransportMode(t, 'mode644', 0644)
 
496
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
497
        self.assertTransportMode(t, 'mode666', 0666)
 
498
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
499
        self.assertTransportMode(t, 'mode600', 0600)
 
500
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
501
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
502
        self.assertTransportMode(t, 'mode400', 0400)
 
503
 
 
504
        # The default permissions should be based on the current umask
 
505
        umask = osutils.get_umask()
 
506
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
507
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
508
 
 
509
        # We should also be able to set the mode for a parent directory
 
510
        # when it is created
 
511
        sio = StringIO()
 
512
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
513
                              dir_mode=0700, create_parent_dir=True)
 
514
        self.assertTransportMode(t, 'dir700', 0700)
 
515
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
516
                              dir_mode=0770, create_parent_dir=True)
 
517
        self.assertTransportMode(t, 'dir770', 0770)
 
518
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
519
                              dir_mode=0777, create_parent_dir=True)
 
520
        self.assertTransportMode(t, 'dir777', 0777)
 
521
 
 
522
    def test_put_bytes_unicode(self):
 
523
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
524
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
525
        # (we don't want to encode unicode here at all, callers should be
 
526
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
527
        # compatibility.  At some point we should use a specific exception.
 
528
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
529
        t = self.get_transport()
 
530
        if t.is_readonly():
 
531
            return
 
532
        unicode_string = u'\u1234'
 
533
        self.assertRaises(
 
534
            (AssertionError, UnicodeEncodeError),
 
535
            t.put_bytes, 'foo', unicode_string)
 
536
 
 
537
    def test_put_file_unicode(self):
 
538
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
539
        # This situation can happen (and has) if code is careless about the type
 
540
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
541
        # cStringIO, because it never returns unicode from read.
 
542
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
543
        # raise, but we raise it for hysterical raisins.
 
544
        t = self.get_transport()
 
545
        if t.is_readonly():
 
546
            return
 
547
        unicode_file = pyStringIO(u'\u1234')
 
548
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
549
 
199
550
    def test_mkdir(self):
200
551
        t = self.get_transport()
201
552
 
202
553
        if t.is_readonly():
203
 
            # cannot mkdir on readonly transports. We're not testing for 
 
554
            # cannot mkdir on readonly transports. We're not testing for
204
555
            # cache coherency because cache behaviour is not currently
205
556
            # defined for the transport interface.
206
557
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
227
578
 
228
579
        # we were testing that a local mkdir followed by a transport
229
580
        # mkdir failed thusly, but given that we * in one process * do not
230
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
581
        # concurrently fiddle with disk dirs and then use transport to do
231
582
        # things, the win here seems marginal compared to the constraint on
232
583
        # the interface. RBC 20051227
233
584
        t.mkdir('dir_g')
234
585
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
235
586
 
236
587
        # Test get/put in sub-directories
237
 
        self.assertEqual(2, 
238
 
            t.put_multi([('dir_a/a', StringIO('contents of dir_a/a')),
239
 
                         ('dir_b/b', StringIO('contents of dir_b/b'))]))
 
588
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
589
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
240
590
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
241
591
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
242
592
 
247
597
        t = self.get_transport()
248
598
        if t.is_readonly():
249
599
            return
 
600
        if not t._can_roundtrip_unix_modebits():
 
601
            # no sense testing on this transport
 
602
            return
250
603
        # Test mkdir with a mode
251
604
        t.mkdir('dmode755', mode=0755)
252
605
        self.assertTransportMode(t, 'dmode755', 0755)
256
609
        self.assertTransportMode(t, 'dmode777', 0777)
257
610
        t.mkdir('dmode700', mode=0700)
258
611
        self.assertTransportMode(t, 'dmode700', 0700)
259
 
        # TODO: jam 20051215 test mkdir_multi with a mode
260
612
        t.mkdir_multi(['mdmode755'], mode=0755)
261
613
        self.assertTransportMode(t, 'mdmode755', 0755)
262
614
 
 
615
        # Default mode should be based on umask
 
616
        umask = osutils.get_umask()
 
617
        t.mkdir('dnomode', mode=None)
 
618
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
619
 
 
620
    def test_opening_a_file_stream_creates_file(self):
 
621
        t = self.get_transport()
 
622
        if t.is_readonly():
 
623
            return
 
624
        handle = t.open_write_stream('foo')
 
625
        try:
 
626
            self.assertEqual('', t.get_bytes('foo'))
 
627
        finally:
 
628
            handle.close()
 
629
 
 
630
    def test_opening_a_file_stream_can_set_mode(self):
 
631
        t = self.get_transport()
 
632
        if t.is_readonly():
 
633
            return
 
634
        if not t._can_roundtrip_unix_modebits():
 
635
            # Can't roundtrip, so no need to run this test
 
636
            return
 
637
        def check_mode(name, mode, expected):
 
638
            handle = t.open_write_stream(name, mode=mode)
 
639
            handle.close()
 
640
            self.assertTransportMode(t, name, expected)
 
641
        check_mode('mode644', 0644, 0644)
 
642
        check_mode('mode666', 0666, 0666)
 
643
        check_mode('mode600', 0600, 0600)
 
644
        # The default permissions should be based on the current umask
 
645
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
646
 
263
647
    def test_copy_to(self):
264
648
        # FIXME: test:   same server to same server (partly done)
265
649
        # same protocol two servers
266
650
        # and    different protocols (done for now except for MemoryTransport.
267
651
        # - RBC 20060122
268
 
        from bzrlib.transport.memory import MemoryTransport
269
652
 
270
653
        def simple_copy_files(transport_from, transport_to):
271
654
            files = ['a', 'b', 'c', 'd']
272
655
            self.build_tree(files, transport=transport_from)
273
656
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
274
657
            for f in files:
275
 
                self.check_transport_contents(transport_to.get(f).read(),
 
658
                self.check_transport_contents(transport_to.get_bytes(f),
276
659
                                              transport_from, f)
277
660
 
278
661
        t = self.get_transport()
279
 
        temp_transport = MemoryTransport('memory:/')
 
662
        temp_transport = MemoryTransport('memory:///')
280
663
        simple_copy_files(t, temp_transport)
281
664
        if not t.is_readonly():
282
665
            t.mkdir('copy_to_simple')
290
673
            self.build_tree(['e/', 'e/f'])
291
674
        else:
292
675
            t.mkdir('e')
293
 
            t.put('e/f', StringIO('contents of e'))
 
676
            t.put_bytes('e/f', 'contents of e')
294
677
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
295
678
        temp_transport.mkdir('e')
296
679
        t.copy_to(['e/f'], temp_transport)
297
680
 
298
681
        del temp_transport
299
 
        temp_transport = MemoryTransport('memory:/')
 
682
        temp_transport = MemoryTransport('memory:///')
300
683
 
301
684
        files = ['a', 'b', 'c', 'd']
302
685
        t.copy_to(iter(files), temp_transport)
303
686
        for f in files:
304
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
687
            self.check_transport_contents(temp_transport.get_bytes(f),
305
688
                                          t, f)
306
689
        del temp_transport
307
690
 
308
691
        for mode in (0666, 0644, 0600, 0400):
309
 
            temp_transport = MemoryTransport("memory:/")
 
692
            temp_transport = MemoryTransport("memory:///")
310
693
            t.copy_to(files, temp_transport, mode=mode)
311
694
            for f in files:
312
695
                self.assertTransportMode(temp_transport, f, mode)
313
696
 
314
 
    def test_append(self):
315
 
        t = self.get_transport()
316
 
 
317
 
        if t.is_readonly():
318
 
            open('a', 'wb').write('diff\ncontents for\na\n')
319
 
            open('b', 'wb').write('contents\nfor b\n')
320
 
        else:
321
 
            t.put_multi([
322
 
                    ('a', StringIO('diff\ncontents for\na\n')),
323
 
                    ('b', StringIO('contents\nfor b\n'))
324
 
                    ])
325
 
 
326
 
        if t.is_readonly():
327
 
            self.assertRaises(TransportNotPossible,
328
 
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
329
 
            _append('a', StringIO('add\nsome\nmore\ncontents\n'))
330
 
        else:
331
 
            self.assertEqual(20,
332
 
                t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
333
 
 
334
 
        self.check_transport_contents(
335
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
336
 
            t, 'a')
337
 
 
338
 
        if t.is_readonly():
339
 
            self.assertRaises(TransportNotPossible,
340
 
                    t.append_multi,
341
 
                        [('a', 'and\nthen\nsome\nmore\n'),
342
 
                         ('b', 'some\nmore\nfor\nb\n')])
343
 
            _append('a', StringIO('and\nthen\nsome\nmore\n'))
344
 
            _append('b', StringIO('some\nmore\nfor\nb\n'))
345
 
        else:
346
 
            self.assertEqual((43, 15), 
347
 
                t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
348
 
                                ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
697
    def test_create_prefix(self):
 
698
        t = self.get_transport()
 
699
        sub = t.clone('foo').clone('bar')
 
700
        try:
 
701
            sub.create_prefix()
 
702
        except TransportNotPossible:
 
703
            self.assertTrue(t.is_readonly())
 
704
        else:
 
705
            self.assertTrue(t.has('foo/bar'))
 
706
 
 
707
    def test_append_file(self):
 
708
        t = self.get_transport()
 
709
 
 
710
        if t.is_readonly():
 
711
            self.assertRaises(TransportNotPossible,
 
712
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
713
            return
 
714
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
715
        t.put_bytes('b', 'contents\nfor b\n')
 
716
 
 
717
        self.assertEqual(20,
 
718
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
719
 
 
720
        self.check_transport_contents(
 
721
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
722
            t, 'a')
 
723
 
 
724
        # a file with no parent should fail..
 
725
        self.assertRaises(NoSuchFile,
 
726
                          t.append_file, 'missing/path', StringIO('content'))
 
727
 
 
728
        # And we can create new files, too
 
729
        self.assertEqual(0,
 
730
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
731
        self.check_transport_contents('some text\nfor a missing file\n',
 
732
                                      t, 'c')
 
733
 
 
734
    def test_append_bytes(self):
 
735
        t = self.get_transport()
 
736
 
 
737
        if t.is_readonly():
 
738
            self.assertRaises(TransportNotPossible,
 
739
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
740
            return
 
741
 
 
742
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
743
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
744
 
 
745
        self.assertEqual(20,
 
746
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
747
 
 
748
        self.check_transport_contents(
 
749
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
750
            t, 'a')
 
751
 
 
752
        # a file with no parent should fail..
 
753
        self.assertRaises(NoSuchFile,
 
754
                          t.append_bytes, 'missing/path', 'content')
 
755
 
 
756
    def test_append_multi(self):
 
757
        t = self.get_transport()
 
758
 
 
759
        if t.is_readonly():
 
760
            return
 
761
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
762
                         'add\nsome\nmore\ncontents\n')
 
763
        t.put_bytes('b', 'contents\nfor b\n')
 
764
 
 
765
        self.assertEqual((43, 15),
 
766
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
767
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
768
 
349
769
        self.check_transport_contents(
350
770
            'diff\ncontents for\na\n'
351
771
            'add\nsome\nmore\ncontents\n'
356
776
                'some\nmore\nfor\nb\n',
357
777
                t, 'b')
358
778
 
359
 
        if t.is_readonly():
360
 
            _append('a', StringIO('a little bit more\n'))
361
 
            _append('b', StringIO('from an iterator\n'))
362
 
        else:
363
 
            self.assertEqual((62, 31),
364
 
                t.append_multi(iter([('a', StringIO('a little bit more\n')),
365
 
                                     ('b', StringIO('from an iterator\n'))])))
 
779
        self.assertEqual((62, 31),
 
780
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
781
                                 ('b', StringIO('from an iterator\n'))])))
366
782
        self.check_transport_contents(
367
783
            'diff\ncontents for\na\n'
368
784
            'add\nsome\nmore\ncontents\n'
375
791
                'from an iterator\n',
376
792
                t, 'b')
377
793
 
378
 
        if t.is_readonly():
379
 
            _append('c', StringIO('some text\nfor a missing file\n'))
380
 
            _append('a', StringIO('some text in a\n'))
381
 
            _append('d', StringIO('missing file r\n'))
382
 
        else:
383
 
            self.assertEqual(0,
384
 
                t.append('c', StringIO('some text\nfor a missing file\n')))
385
 
            self.assertEqual((80, 0),
386
 
                t.append_multi([('a', StringIO('some text in a\n')),
387
 
                                ('d', StringIO('missing file r\n'))]))
 
794
        self.assertEqual((80, 0),
 
795
            t.append_multi([('a', StringIO('some text in a\n')),
 
796
                            ('d', StringIO('missing file r\n'))]))
 
797
 
388
798
        self.check_transport_contents(
389
799
            'diff\ncontents for\na\n'
390
800
            'add\nsome\nmore\ncontents\n'
392
802
            'a little bit more\n'
393
803
            'some text in a\n',
394
804
            t, 'a')
395
 
        self.check_transport_contents('some text\nfor a missing file\n',
396
 
                                      t, 'c')
397
805
        self.check_transport_contents('missing file r\n', t, 'd')
398
 
        
399
 
        # a file with no parent should fail..
400
 
        if not t.is_readonly():
401
 
            self.assertRaises(NoSuchFile,
402
 
                              t.append, 'missing/path', 
403
 
                              StringIO('content'))
404
 
 
405
 
    def test_append_file(self):
406
 
        t = self.get_transport()
407
 
 
408
 
        contents = [
409
 
            ('f1', StringIO('this is a string\nand some more stuff\n')),
410
 
            ('f2', StringIO('here is some text\nand a bit more\n')),
411
 
            ('f3', StringIO('some text for the\nthird file created\n')),
412
 
            ('f4', StringIO('this is a string\nand some more stuff\n')),
413
 
            ('f5', StringIO('here is some text\nand a bit more\n')),
414
 
            ('f6', StringIO('some text for the\nthird file created\n'))
415
 
        ]
416
 
        
417
 
        if t.is_readonly():
418
 
            for f, val in contents:
419
 
                open(f, 'wb').write(val.read())
420
 
        else:
421
 
            t.put_multi(contents)
422
 
 
423
 
        a1 = StringIO('appending to\none\n')
424
 
        if t.is_readonly():
425
 
            _append('f1', a1)
426
 
        else:
427
 
            t.append('f1', a1)
428
 
 
429
 
        del a1
430
 
 
431
 
        self.check_transport_contents(
432
 
                'this is a string\nand some more stuff\n'
433
 
                'appending to\none\n',
434
 
                t, 'f1')
435
 
 
436
 
        a2 = StringIO('adding more\ntext to two\n')
437
 
        a3 = StringIO('some garbage\nto put in three\n')
438
 
 
439
 
        if t.is_readonly():
440
 
            _append('f2', a2)
441
 
            _append('f3', a3)
442
 
        else:
443
 
            t.append_multi([('f2', a2), ('f3', a3)])
444
 
 
445
 
        del a2, a3
446
 
 
447
 
        self.check_transport_contents(
448
 
                'here is some text\nand a bit more\n'
449
 
                'adding more\ntext to two\n',
450
 
                t, 'f2')
451
 
        self.check_transport_contents( 
452
 
                'some text for the\nthird file created\n'
453
 
                'some garbage\nto put in three\n',
454
 
                t, 'f3')
455
 
 
456
 
        # Test that an actual file object can be used with put
457
 
        a4 = t.get('f1')
458
 
        if t.is_readonly():
459
 
            _append('f4', a4)
460
 
        else:
461
 
            t.append('f4', a4)
462
 
 
463
 
        del a4
464
 
 
465
 
        self.check_transport_contents(
466
 
                'this is a string\nand some more stuff\n'
467
 
                'this is a string\nand some more stuff\n'
468
 
                'appending to\none\n',
469
 
                t, 'f4')
470
 
 
471
 
        a5 = t.get('f2')
472
 
        a6 = t.get('f3')
473
 
        if t.is_readonly():
474
 
            _append('f5', a5)
475
 
            _append('f6', a6)
476
 
        else:
477
 
            t.append_multi([('f5', a5), ('f6', a6)])
478
 
 
479
 
        del a5, a6
480
 
 
481
 
        self.check_transport_contents(
482
 
                'here is some text\nand a bit more\n'
483
 
                'here is some text\nand a bit more\n'
484
 
                'adding more\ntext to two\n',
485
 
                t, 'f5')
486
 
        self.check_transport_contents(
487
 
                'some text for the\nthird file created\n'
488
 
                'some text for the\nthird file created\n'
489
 
                'some garbage\nto put in three\n',
490
 
                t, 'f6')
491
 
 
492
 
        a5 = t.get('f2')
493
 
        a6 = t.get('f2')
494
 
        a7 = t.get('f3')
495
 
        if t.is_readonly():
496
 
            _append('c', a5)
497
 
            _append('a', a6)
498
 
            _append('d', a7)
499
 
        else:
500
 
            t.append('c', a5)
501
 
            t.append_multi([('a', a6), ('d', a7)])
502
 
        del a5, a6, a7
503
 
        self.check_transport_contents(t.get('f2').read(), t, 'c')
504
 
        self.check_transport_contents(t.get('f3').read(), t, 'd')
 
806
 
 
807
    def test_append_file_mode(self):
 
808
        """Check that append accepts a mode parameter"""
 
809
        # check append accepts a mode
 
810
        t = self.get_transport()
 
811
        if t.is_readonly():
 
812
            self.assertRaises(TransportNotPossible,
 
813
                t.append_file, 'f', StringIO('f'), mode=None)
 
814
            return
 
815
        t.append_file('f', StringIO('f'), mode=None)
 
816
 
 
817
    def test_append_bytes_mode(self):
 
818
        # check append_bytes accepts a mode
 
819
        t = self.get_transport()
 
820
        if t.is_readonly():
 
821
            self.assertRaises(TransportNotPossible,
 
822
                t.append_bytes, 'f', 'f', mode=None)
 
823
            return
 
824
        t.append_bytes('f', 'f', mode=None)
505
825
 
506
826
    def test_delete(self):
507
827
        # TODO: Test Transport.delete
512
832
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
513
833
            return
514
834
 
515
 
        t.put('a', StringIO('a little bit of text\n'))
516
 
        self.failUnless(t.has('a'))
 
835
        t.put_bytes('a', 'a little bit of text\n')
 
836
        self.assertTrue(t.has('a'))
517
837
        t.delete('a')
518
 
        self.failIf(t.has('a'))
 
838
        self.assertFalse(t.has('a'))
519
839
 
520
840
        self.assertRaises(NoSuchFile, t.delete, 'a')
521
841
 
522
 
        t.put('a', StringIO('a text\n'))
523
 
        t.put('b', StringIO('b text\n'))
524
 
        t.put('c', StringIO('c text\n'))
 
842
        t.put_bytes('a', 'a text\n')
 
843
        t.put_bytes('b', 'b text\n')
 
844
        t.put_bytes('c', 'c text\n')
525
845
        self.assertEqual([True, True, True],
526
846
                list(t.has_multi(['a', 'b', 'c'])))
527
847
        t.delete_multi(['a', 'c'])
528
848
        self.assertEqual([False, True, False],
529
849
                list(t.has_multi(['a', 'b', 'c'])))
530
 
        self.failIf(t.has('a'))
531
 
        self.failUnless(t.has('b'))
532
 
        self.failIf(t.has('c'))
 
850
        self.assertFalse(t.has('a'))
 
851
        self.assertTrue(t.has('b'))
 
852
        self.assertFalse(t.has('c'))
533
853
 
534
854
        self.assertRaises(NoSuchFile,
535
855
                t.delete_multi, ['a', 'b', 'c'])
537
857
        self.assertRaises(NoSuchFile,
538
858
                t.delete_multi, iter(['a', 'b', 'c']))
539
859
 
540
 
        t.put('a', StringIO('another a text\n'))
541
 
        t.put('c', StringIO('another c text\n'))
 
860
        t.put_bytes('a', 'another a text\n')
 
861
        t.put_bytes('c', 'another c text\n')
542
862
        t.delete_multi(iter(['a', 'b', 'c']))
543
863
 
544
864
        # We should have deleted everything
547
867
        # plain "listdir".
548
868
        # self.assertEqual([], os.listdir('.'))
549
869
 
 
870
    def test_recommended_page_size(self):
 
871
        """Transports recommend a page size for partial access to files."""
 
872
        t = self.get_transport()
 
873
        self.assertIsInstance(t.recommended_page_size(), int)
 
874
 
550
875
    def test_rmdir(self):
551
876
        t = self.get_transport()
552
877
        # Not much to do with a readonly transport
556
881
        t.mkdir('adir')
557
882
        t.mkdir('adir/bdir')
558
883
        t.rmdir('adir/bdir')
559
 
        self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
 
884
        # ftp may not be able to raise NoSuchFile for lack of
 
885
        # details when failing
 
886
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
560
887
        t.rmdir('adir')
561
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
888
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
562
889
 
563
890
    def test_rmdir_not_empty(self):
564
891
        """Deleting a non-empty directory raises an exception
565
 
        
 
892
 
566
893
        sftp (and possibly others) don't give us a specific "directory not
567
894
        empty" exception -- we can just see that the operation failed.
568
895
        """
573
900
        t.mkdir('adir/bdir')
574
901
        self.assertRaises(PathError, t.rmdir, 'adir')
575
902
 
 
903
    def test_rmdir_empty_but_similar_prefix(self):
 
904
        """rmdir does not get confused by sibling paths.
 
905
 
 
906
        A naive implementation of MemoryTransport would refuse to rmdir
 
907
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
908
        uses "path.startswith(dir)" on all file paths to determine if directory
 
909
        is empty.
 
910
        """
 
911
        t = self.get_transport()
 
912
        if t.is_readonly():
 
913
            return
 
914
        t.mkdir('foo')
 
915
        t.put_bytes('foo-bar', '')
 
916
        t.mkdir('foo-baz')
 
917
        t.rmdir('foo')
 
918
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
919
        self.assertTrue(t.has('foo-bar'))
 
920
 
576
921
    def test_rename_dir_succeeds(self):
577
922
        t = self.get_transport()
578
923
        if t.is_readonly():
592
937
        t.mkdir('adir/asubdir')
593
938
        t.mkdir('bdir')
594
939
        t.mkdir('bdir/bsubdir')
 
940
        # any kind of PathError would be OK, though we normally expect
 
941
        # DirectoryNotEmpty
595
942
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
596
943
        # nothing was changed so it should still be as before
597
944
        self.assertTrue(t.has('bdir/bsubdir'))
598
945
        self.assertFalse(t.has('adir/bdir'))
599
946
        self.assertFalse(t.has('adir/bsubdir'))
600
947
 
 
948
    def test_rename_across_subdirs(self):
 
949
        t = self.get_transport()
 
950
        if t.is_readonly():
 
951
            raise TestNotApplicable("transport is readonly")
 
952
        t.mkdir('a')
 
953
        t.mkdir('b')
 
954
        ta = t.clone('a')
 
955
        tb = t.clone('b')
 
956
        ta.put_bytes('f', 'aoeu')
 
957
        ta.rename('f', '../b/f')
 
958
        self.assertTrue(tb.has('f'))
 
959
        self.assertFalse(ta.has('f'))
 
960
        self.assertTrue(t.has('b/f'))
 
961
 
601
962
    def test_delete_tree(self):
602
963
        t = self.get_transport()
603
964
 
613
974
        except TransportNotPossible:
614
975
            # ok, this transport does not support delete_tree
615
976
            return
616
 
        
 
977
 
617
978
        # did it delete that trivial case?
618
979
        self.assertRaises(NoSuchFile, t.stat, 'adir')
619
980
 
620
981
        self.build_tree(['adir/',
621
 
                         'adir/file', 
622
 
                         'adir/subdir/', 
623
 
                         'adir/subdir/file', 
 
982
                         'adir/file',
 
983
                         'adir/subdir/',
 
984
                         'adir/subdir/file',
624
985
                         'adir/subdir2/',
625
986
                         'adir/subdir2/file',
626
987
                         ], transport=t)
640
1001
        # creates control files in the working directory
641
1002
        # perhaps all of this could be done in a subdirectory
642
1003
 
643
 
        t.put('a', StringIO('a first file\n'))
 
1004
        t.put_bytes('a', 'a first file\n')
644
1005
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
645
1006
 
646
1007
        t.move('a', 'b')
647
 
        self.failUnless(t.has('b'))
648
 
        self.failIf(t.has('a'))
 
1008
        self.assertTrue(t.has('b'))
 
1009
        self.assertFalse(t.has('a'))
649
1010
 
650
1011
        self.check_transport_contents('a first file\n', t, 'b')
651
1012
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
652
1013
 
653
1014
        # Overwrite a file
654
 
        t.put('c', StringIO('c this file\n'))
 
1015
        t.put_bytes('c', 'c this file\n')
655
1016
        t.move('c', 'b')
656
 
        self.failIf(t.has('c'))
 
1017
        self.assertFalse(t.has('c'))
657
1018
        self.check_transport_contents('c this file\n', t, 'b')
658
1019
 
659
1020
        # TODO: Try to write a test for atomicity
660
 
        # TODO: Test moving into a non-existant subdirectory
 
1021
        # TODO: Test moving into a non-existent subdirectory
661
1022
        # TODO: Test Transport.move_multi
662
1023
 
663
1024
    def test_copy(self):
666
1027
        if t.is_readonly():
667
1028
            return
668
1029
 
669
 
        t.put('a', StringIO('a file\n'))
 
1030
        t.put_bytes('a', 'a file\n')
670
1031
        t.copy('a', 'b')
671
1032
        self.check_transport_contents('a file\n', t, 'b')
672
1033
 
675
1036
        # What should the assert be if you try to copy a
676
1037
        # file over a directory?
677
1038
        #self.assertRaises(Something, t.copy, 'a', 'c')
678
 
        t.put('d', StringIO('text in d\n'))
 
1039
        t.put_bytes('d', 'text in d\n')
679
1040
        t.copy('d', 'b')
680
1041
        self.check_transport_contents('text in d\n', t, 'b')
681
1042
 
682
1043
        # TODO: test copy_multi
683
1044
 
684
1045
    def test_connection_error(self):
685
 
        """ConnectionError is raised when connection is impossible"""
 
1046
        """ConnectionError is raised when connection is impossible.
 
1047
 
 
1048
        The error should be raised from the first operation on the transport.
 
1049
        """
686
1050
        try:
687
1051
            url = self._server.get_bogus_url()
688
1052
        except NotImplementedError:
689
1053
            raise TestSkipped("Transport %s has no bogus URL support." %
690
1054
                              self._server.__class__)
691
 
        t = bzrlib.transport.get_transport(url)
692
 
        try:
693
 
            t.get('.bzr/branch')
694
 
        except (ConnectionError, NoSuchFile), e:
695
 
            pass
696
 
        except (Exception), e:
697
 
            self.failIf(True, 'Wrong exception thrown: %s' % e)
698
 
        else:
699
 
            self.failIf(True, 'Did not get the expected exception.')
 
1055
        t = _mod_transport.get_transport_from_url(url)
 
1056
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
700
1057
 
701
1058
    def test_stat(self):
702
1059
        # TODO: Test stat, just try once, and if it throws, stop testing
711
1068
            return
712
1069
 
713
1070
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
714
 
        sizes = [14, 0, 16, 0, 18] 
 
1071
        sizes = [14, 0, 16, 0, 18]
715
1072
        self.build_tree(paths, transport=t, line_endings='binary')
716
1073
 
717
1074
        for path, size in zip(paths, sizes):
718
1075
            st = t.stat(path)
719
1076
            if path.endswith('/'):
720
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1077
                self.assertTrue(S_ISDIR(st.st_mode))
721
1078
                # directory sizes are meaningless
722
1079
            else:
723
 
                self.failUnless(S_ISREG(st.st_mode))
 
1080
                self.assertTrue(S_ISREG(st.st_mode))
724
1081
                self.assertEqual(size, st.st_size)
725
1082
 
726
1083
        remote_stats = list(t.stat_multi(paths))
733
1090
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
734
1091
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
735
1092
        subdir = t.clone('subdir')
736
 
        subdir.stat('./file')
737
 
        subdir.stat('.')
 
1093
        st = subdir.stat('./file')
 
1094
        st = subdir.stat('.')
 
1095
 
 
1096
    def test_hardlink(self):
 
1097
        from stat import ST_NLINK
 
1098
 
 
1099
        t = self.get_transport()
 
1100
 
 
1101
        source_name = "original_target"
 
1102
        link_name = "target_link"
 
1103
 
 
1104
        self.build_tree([source_name], transport=t)
 
1105
 
 
1106
        try:
 
1107
            t.hardlink(source_name, link_name)
 
1108
 
 
1109
            self.assertTrue(t.has(source_name))
 
1110
            self.assertTrue(t.has(link_name))
 
1111
 
 
1112
            st = t.stat(link_name)
 
1113
            self.assertEqual(st[ST_NLINK], 2)
 
1114
        except TransportNotPossible:
 
1115
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1116
                              self._server.__class__)
 
1117
 
 
1118
    def test_symlink(self):
 
1119
        from stat import S_ISLNK
 
1120
 
 
1121
        t = self.get_transport()
 
1122
 
 
1123
        source_name = "original_target"
 
1124
        link_name = "target_link"
 
1125
 
 
1126
        self.build_tree([source_name], transport=t)
 
1127
 
 
1128
        try:
 
1129
            t.symlink(source_name, link_name)
 
1130
 
 
1131
            self.assertTrue(t.has(source_name))
 
1132
            self.assertTrue(t.has(link_name))
 
1133
 
 
1134
            st = t.stat(link_name)
 
1135
            self.assertTrue(S_ISLNK(st.st_mode),
 
1136
                "expected symlink, got mode %o" % st.st_mode)
 
1137
        except TransportNotPossible:
 
1138
            raise TestSkipped("Transport %s does not support symlinks." %
 
1139
                              self._server.__class__)
 
1140
        except IOError:
 
1141
            self.knownFailure("Paramiko fails to create symlinks during tests")
738
1142
 
739
1143
    def test_list_dir(self):
740
1144
        # TODO: Test list_dir, just try once, and if it throws, stop testing
741
1145
        t = self.get_transport()
742
 
        
 
1146
 
743
1147
        if not t.listable():
744
1148
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
745
1149
            return
746
1150
 
747
 
        def sorted_list(d):
748
 
            l = list(t.list_dir(d))
 
1151
        def sorted_list(d, transport):
 
1152
            l = list(transport.list_dir(d))
749
1153
            l.sort()
750
1154
            return l
751
1155
 
752
 
        # SftpServer creates control files in the working directory
753
 
        # so lets move down a directory to avoid those.
754
 
        if not t.is_readonly():
755
 
            t.mkdir('wd')
756
 
        else:
757
 
            os.mkdir('wd')
758
 
        t = t.clone('wd')
759
 
 
760
 
        self.assertEqual([], sorted_list(u'.'))
 
1156
        self.assertEqual([], sorted_list('.', t))
761
1157
        # c2 is precisely one letter longer than c here to test that
762
1158
        # suffixing is not confused.
 
1159
        # a%25b checks that quoting is done consistently across transports
 
1160
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1161
 
763
1162
        if not t.is_readonly():
764
 
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
 
1163
            self.build_tree(tree_names, transport=t)
765
1164
        else:
766
 
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
767
 
 
768
 
        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
769
 
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
1165
            self.build_tree(tree_names)
 
1166
 
 
1167
        self.assertEqual(
 
1168
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1169
        self.assertEqual(
 
1170
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1171
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1172
 
 
1173
        # Cloning the transport produces an equivalent listing
 
1174
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
770
1175
 
771
1176
        if not t.is_readonly():
772
1177
            t.delete('c/d')
773
1178
            t.delete('b')
774
1179
        else:
775
 
            os.unlink('wd/c/d')
776
 
            os.unlink('wd/b')
777
 
            
778
 
        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
779
 
        self.assertEqual([u'e'], sorted_list(u'c'))
780
 
 
781
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
782
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
783
 
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
1180
            os.unlink('c/d')
 
1181
            os.unlink('b')
 
1182
 
 
1183
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1184
        self.assertEqual(['e'], sorted_list('c', t))
 
1185
 
 
1186
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1187
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1188
        # 'a' is a file, list_dir should raise an error
 
1189
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1190
 
 
1191
    def test_list_dir_result_is_url_escaped(self):
 
1192
        t = self.get_transport()
 
1193
        if not t.listable():
 
1194
            raise TestSkipped("transport not listable")
 
1195
 
 
1196
        if not t.is_readonly():
 
1197
            self.build_tree(['a/', 'a/%'], transport=t)
 
1198
        else:
 
1199
            self.build_tree(['a/', 'a/%'])
 
1200
 
 
1201
        names = list(t.list_dir('a'))
 
1202
        self.assertEqual(['%25'], names)
 
1203
        self.assertIsInstance(names[0], str)
 
1204
 
 
1205
    def test_clone_preserve_info(self):
 
1206
        t1 = self.get_transport()
 
1207
        if not isinstance(t1, ConnectedTransport):
 
1208
            raise TestSkipped("not a connected transport")
 
1209
 
 
1210
        t2 = t1.clone('subdir')
 
1211
        self.assertEquals(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1212
        self.assertEquals(t1._parsed_url.user, t2._parsed_url.user)
 
1213
        self.assertEquals(t1._parsed_url.password, t2._parsed_url.password)
 
1214
        self.assertEquals(t1._parsed_url.host, t2._parsed_url.host)
 
1215
        self.assertEquals(t1._parsed_url.port, t2._parsed_url.port)
 
1216
 
 
1217
    def test__reuse_for(self):
 
1218
        t = self.get_transport()
 
1219
        if not isinstance(t, ConnectedTransport):
 
1220
            raise TestSkipped("not a connected transport")
 
1221
 
 
1222
        def new_url(scheme=None, user=None, password=None,
 
1223
                    host=None, port=None, path=None):
 
1224
            """Build a new url from t.base changing only parts of it.
 
1225
 
 
1226
            Only the parameters different from None will be changed.
 
1227
            """
 
1228
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1229
            if user     is None: user     = t._parsed_url.user
 
1230
            if password is None: password = t._parsed_url.password
 
1231
            if user     is None: user     = t._parsed_url.user
 
1232
            if host     is None: host     = t._parsed_url.host
 
1233
            if port     is None: port     = t._parsed_url.port
 
1234
            if path     is None: path     = t._parsed_url.path
 
1235
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1236
 
 
1237
        if t._parsed_url.scheme == 'ftp':
 
1238
            scheme = 'sftp'
 
1239
        else:
 
1240
            scheme = 'ftp'
 
1241
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1242
        if t._parsed_url.user == 'me':
 
1243
            user = 'you'
 
1244
        else:
 
1245
            user = 'me'
 
1246
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1247
        # passwords are not taken into account because:
 
1248
        # - it makes no sense to have two different valid passwords for the
 
1249
        #   same user
 
1250
        # - _password in ConnectedTransport is intended to collect what the
 
1251
        #   user specified from the command-line and there are cases where the
 
1252
        #   new url can contain no password (if the url was built from an
 
1253
        #   existing transport.base for example)
 
1254
        # - password are considered part of the credentials provided at
 
1255
        #   connection creation time and as such may not be present in the url
 
1256
        #   (they may be typed by the user when prompted for example)
 
1257
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1258
        # We will not connect, we can use a invalid host
 
1259
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1260
        if t._parsed_url.port == 1234:
 
1261
            port = 4321
 
1262
        else:
 
1263
            port = 1234
 
1264
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1265
        # No point in trying to reuse a transport for a local URL
 
1266
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1267
 
 
1268
    def test_connection_sharing(self):
 
1269
        t = self.get_transport()
 
1270
        if not isinstance(t, ConnectedTransport):
 
1271
            raise TestSkipped("not a connected transport")
 
1272
 
 
1273
        c = t.clone('subdir')
 
1274
        # Some transports will create the connection  only when needed
 
1275
        t.has('surely_not') # Force connection
 
1276
        self.assertIs(t._get_connection(), c._get_connection())
 
1277
 
 
1278
        # Temporary failure, we need to create a new dummy connection
 
1279
        new_connection = None
 
1280
        t._set_connection(new_connection)
 
1281
        # Check that both transports use the same connection
 
1282
        self.assertIs(new_connection, t._get_connection())
 
1283
        self.assertIs(new_connection, c._get_connection())
 
1284
 
 
1285
    def test_reuse_connection_for_various_paths(self):
 
1286
        t = self.get_transport()
 
1287
        if not isinstance(t, ConnectedTransport):
 
1288
            raise TestSkipped("not a connected transport")
 
1289
 
 
1290
        t.has('surely_not') # Force connection
 
1291
        self.assertIsNot(None, t._get_connection())
 
1292
 
 
1293
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1294
        self.assertIsNot(t, subdir)
 
1295
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1296
 
 
1297
        home = subdir._reuse_for(t.base + 'home')
 
1298
        self.assertIs(t._get_connection(), home._get_connection())
 
1299
        self.assertIs(subdir._get_connection(), home._get_connection())
784
1300
 
785
1301
    def test_clone(self):
786
1302
        # TODO: Test that clone moves up and down the filesystem
788
1304
 
789
1305
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
790
1306
 
791
 
        self.failUnless(t1.has('a'))
792
 
        self.failUnless(t1.has('b/c'))
793
 
        self.failIf(t1.has('c'))
 
1307
        self.assertTrue(t1.has('a'))
 
1308
        self.assertTrue(t1.has('b/c'))
 
1309
        self.assertFalse(t1.has('c'))
794
1310
 
795
1311
        t2 = t1.clone('b')
796
1312
        self.assertEqual(t1.base + 'b/', t2.base)
797
1313
 
798
 
        self.failUnless(t2.has('c'))
799
 
        self.failIf(t2.has('a'))
 
1314
        self.assertTrue(t2.has('c'))
 
1315
        self.assertFalse(t2.has('a'))
800
1316
 
801
1317
        t3 = t2.clone('..')
802
 
        self.failUnless(t3.has('a'))
803
 
        self.failIf(t3.has('c'))
 
1318
        self.assertTrue(t3.has('a'))
 
1319
        self.assertFalse(t3.has('c'))
804
1320
 
805
 
        self.failIf(t1.has('b/d'))
806
 
        self.failIf(t2.has('d'))
807
 
        self.failIf(t3.has('b/d'))
 
1321
        self.assertFalse(t1.has('b/d'))
 
1322
        self.assertFalse(t2.has('d'))
 
1323
        self.assertFalse(t3.has('b/d'))
808
1324
 
809
1325
        if t1.is_readonly():
810
 
            open('b/d', 'wb').write('newfile\n')
 
1326
            self.build_tree_contents([('b/d', 'newfile\n')])
811
1327
        else:
812
 
            t2.put('d', StringIO('newfile\n'))
813
 
 
814
 
        self.failUnless(t1.has('b/d'))
815
 
        self.failUnless(t2.has('d'))
816
 
        self.failUnless(t3.has('b/d'))
 
1328
            t2.put_bytes('d', 'newfile\n')
 
1329
 
 
1330
        self.assertTrue(t1.has('b/d'))
 
1331
        self.assertTrue(t2.has('d'))
 
1332
        self.assertTrue(t3.has('b/d'))
 
1333
 
 
1334
    def test_clone_to_root(self):
 
1335
        orig_transport = self.get_transport()
 
1336
        # Repeatedly go up to a parent directory until we're at the root
 
1337
        # directory of this transport
 
1338
        root_transport = orig_transport
 
1339
        new_transport = root_transport.clone("..")
 
1340
        # as we are walking up directories, the path must be
 
1341
        # growing less, except at the top
 
1342
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1343
            or new_transport.base == root_transport.base)
 
1344
        while new_transport.base != root_transport.base:
 
1345
            root_transport = new_transport
 
1346
            new_transport = root_transport.clone("..")
 
1347
            # as we are walking up directories, the path must be
 
1348
            # growing less, except at the top
 
1349
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1350
                or new_transport.base == root_transport.base)
 
1351
 
 
1352
        # Cloning to "/" should take us to exactly the same location.
 
1353
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1354
        # the abspath of "/" from the original transport should be the same
 
1355
        # as the base at the root:
 
1356
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1357
 
 
1358
        # At the root, the URL must still end with / as its a directory
 
1359
        self.assertEqual(root_transport.base[-1], '/')
 
1360
 
 
1361
    def test_clone_from_root(self):
 
1362
        """At the root, cloning to a simple dir should just do string append."""
 
1363
        orig_transport = self.get_transport()
 
1364
        root_transport = orig_transport.clone('/')
 
1365
        self.assertEqual(root_transport.base + '.bzr/',
 
1366
            root_transport.clone('.bzr').base)
 
1367
 
 
1368
    def test_base_url(self):
 
1369
        t = self.get_transport()
 
1370
        self.assertEqual('/', t.base[-1])
817
1371
 
818
1372
    def test_relpath(self):
819
1373
        t = self.get_transport()
820
1374
        self.assertEqual('', t.relpath(t.base))
821
1375
        # base ends with /
822
1376
        self.assertEqual('', t.relpath(t.base[:-1]))
823
 
        # subdirs which dont exist should still give relpaths.
 
1377
        # subdirs which don't exist should still give relpaths.
824
1378
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
825
1379
        # trailing slash should be the same.
826
1380
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
842
1396
        # that have aliasing problems like symlinks should go in backend
843
1397
        # specific test cases.
844
1398
        transport = self.get_transport()
845
 
        
846
 
        # disabled because some transports might normalize urls in generating
847
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
 
1399
 
848
1400
        self.assertEqual(transport.base + 'relpath',
849
1401
                         transport.abspath('relpath'))
850
1402
 
 
1403
        # This should work without raising an error.
 
1404
        transport.abspath("/")
 
1405
 
 
1406
        # the abspath of "/" and "/foo/.." should result in the same location
 
1407
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1408
 
 
1409
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1410
                         transport.abspath("/foo"))
 
1411
 
 
1412
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1413
    def test_win32_abspath(self):
 
1414
        # Note: we tried to set sys.platform='win32' so we could test on
 
1415
        # other platforms too, but then osutils does platform specific
 
1416
        # things at import time which defeated us...
 
1417
        if sys.platform != 'win32':
 
1418
            raise TestSkipped(
 
1419
                'Testing drive letters in abspath implemented only for win32')
 
1420
 
 
1421
        # smoke test for abspath on win32.
 
1422
        # a transport based on 'file:///' never fully qualifies the drive.
 
1423
        transport = _mod_transport.get_transport_from_url("file:///")
 
1424
        self.assertEqual(transport.abspath("/"), "file:///")
 
1425
 
 
1426
        # but a transport that starts with a drive spec must keep it.
 
1427
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1428
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1429
 
 
1430
    def test_local_abspath(self):
 
1431
        transport = self.get_transport()
 
1432
        try:
 
1433
            p = transport.local_abspath('.')
 
1434
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1435
            # should be formattable
 
1436
            s = str(e)
 
1437
        else:
 
1438
            self.assertEqual(getcwd(), p)
 
1439
 
851
1440
    def test_abspath_at_root(self):
852
1441
        t = self.get_transport()
853
1442
        # clone all the way to the top
875
1464
                         'isolated/dir/',
876
1465
                         'isolated/dir/foo',
877
1466
                         'isolated/dir/bar',
 
1467
                         'isolated/dir/b%25z', # make sure quoting is correct
878
1468
                         'isolated/bar'],
879
1469
                        transport=transport)
880
1470
        paths = set(transport.iter_files_recursive())
882
1472
        self.assertEqual(paths,
883
1473
                    set(['isolated/dir/foo',
884
1474
                         'isolated/dir/bar',
 
1475
                         'isolated/dir/b%2525z',
885
1476
                         'isolated/bar']))
886
1477
        sub_transport = transport.clone('isolated')
887
1478
        paths = set(sub_transport.iter_files_recursive())
888
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
1479
        self.assertEqual(paths,
 
1480
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1481
 
 
1482
    def test_copy_tree(self):
 
1483
        # TODO: test file contents and permissions are preserved. This test was
 
1484
        # added just to ensure that quoting was handled correctly.
 
1485
        # -- David Allouche 2006-08-11
 
1486
        transport = self.get_transport()
 
1487
        if not transport.listable():
 
1488
            self.assertRaises(TransportNotPossible,
 
1489
                              transport.iter_files_recursive)
 
1490
            return
 
1491
        if transport.is_readonly():
 
1492
            return
 
1493
        self.build_tree(['from/',
 
1494
                         'from/dir/',
 
1495
                         'from/dir/foo',
 
1496
                         'from/dir/bar',
 
1497
                         'from/dir/b%25z', # make sure quoting is correct
 
1498
                         'from/bar'],
 
1499
                        transport=transport)
 
1500
        transport.copy_tree('from', 'to')
 
1501
        paths = set(transport.iter_files_recursive())
 
1502
        self.assertEqual(paths,
 
1503
                    set(['from/dir/foo',
 
1504
                         'from/dir/bar',
 
1505
                         'from/dir/b%2525z',
 
1506
                         'from/bar',
 
1507
                         'to/dir/foo',
 
1508
                         'to/dir/bar',
 
1509
                         'to/dir/b%2525z',
 
1510
                         'to/bar',]))
 
1511
 
 
1512
    def test_copy_tree_to_transport(self):
 
1513
        transport = self.get_transport()
 
1514
        if not transport.listable():
 
1515
            self.assertRaises(TransportNotPossible,
 
1516
                              transport.iter_files_recursive)
 
1517
            return
 
1518
        if transport.is_readonly():
 
1519
            return
 
1520
        self.build_tree(['from/',
 
1521
                         'from/dir/',
 
1522
                         'from/dir/foo',
 
1523
                         'from/dir/bar',
 
1524
                         'from/dir/b%25z', # make sure quoting is correct
 
1525
                         'from/bar'],
 
1526
                        transport=transport)
 
1527
        from_transport = transport.clone('from')
 
1528
        to_transport = transport.clone('to')
 
1529
        to_transport.ensure_base()
 
1530
        from_transport.copy_tree_to_transport(to_transport)
 
1531
        paths = set(transport.iter_files_recursive())
 
1532
        self.assertEqual(paths,
 
1533
                    set(['from/dir/foo',
 
1534
                         'from/dir/bar',
 
1535
                         'from/dir/b%2525z',
 
1536
                         'from/bar',
 
1537
                         'to/dir/foo',
 
1538
                         'to/dir/bar',
 
1539
                         'to/dir/b%2525z',
 
1540
                         'to/bar',]))
 
1541
 
 
1542
    def test_unicode_paths(self):
 
1543
        """Test that we can read/write files with Unicode names."""
 
1544
        t = self.get_transport()
 
1545
 
 
1546
        # With FAT32 and certain encodings on win32
 
1547
        # '\xe5' and '\xe4' actually map to the same file
 
1548
        # adding a suffix kicks in the 'preserving but insensitive'
 
1549
        # route, and maintains the right files
 
1550
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1551
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1552
                 u'\u017d', # Z with umlat iso-8859-2
 
1553
                 u'\u062c', # Arabic j
 
1554
                 u'\u0410', # Russian A
 
1555
                 u'\u65e5', # Kanji person
 
1556
                ]
 
1557
 
 
1558
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1559
        if no_unicode_support:
 
1560
            self.knownFailure("test server cannot handle unicode paths")
 
1561
 
 
1562
        try:
 
1563
            self.build_tree(files, transport=t, line_endings='binary')
 
1564
        except UnicodeError:
 
1565
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1566
 
 
1567
        # A plain unicode string is not a valid url
 
1568
        for fname in files:
 
1569
            self.assertRaises(InvalidURL, t.get, fname)
 
1570
 
 
1571
        for fname in files:
 
1572
            fname_utf8 = fname.encode('utf-8')
 
1573
            contents = 'contents of %s\n' % (fname_utf8,)
 
1574
            self.check_transport_contents(contents, t, urlutils.escape(fname))
889
1575
 
890
1576
    def test_connect_twice_is_same_content(self):
891
 
        # check that our server (whatever it is) is accessable reliably
 
1577
        # check that our server (whatever it is) is accessible reliably
892
1578
        # via get_transport and multiple connections share content.
893
1579
        transport = self.get_transport()
894
1580
        if transport.is_readonly():
895
1581
            return
896
 
        transport.put('foo', StringIO('bar'))
897
 
        transport2 = self.get_transport()
898
 
        self.check_transport_contents('bar', transport2, 'foo')
899
 
        # its base should be usable.
900
 
        transport2 = bzrlib.transport.get_transport(transport.base)
901
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1582
        transport.put_bytes('foo', 'bar')
 
1583
        transport3 = self.get_transport()
 
1584
        self.check_transport_contents('bar', transport3, 'foo')
902
1585
 
903
1586
        # now opening at a relative url should give use a sane result:
904
1587
        transport.mkdir('newdir')
905
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
906
 
        transport2 = transport2.clone('..')
907
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1588
        transport5 = self.get_transport('newdir')
 
1589
        transport6 = transport5.clone('..')
 
1590
        self.check_transport_contents('bar', transport6, 'foo')
908
1591
 
909
1592
    def test_lock_write(self):
 
1593
        """Test transport-level write locks.
 
1594
 
 
1595
        These are deprecated and transports may decline to support them.
 
1596
        """
910
1597
        transport = self.get_transport()
911
1598
        if transport.is_readonly():
912
1599
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
913
1600
            return
914
 
        transport.put('lock', StringIO())
915
 
        lock = transport.lock_write('lock')
 
1601
        transport.put_bytes('lock', '')
 
1602
        try:
 
1603
            lock = transport.lock_write('lock')
 
1604
        except TransportNotPossible:
 
1605
            return
916
1606
        # TODO make this consistent on all platforms:
917
1607
        # self.assertRaises(LockError, transport.lock_write, 'lock')
918
1608
        lock.unlock()
919
1609
 
920
1610
    def test_lock_read(self):
 
1611
        """Test transport-level read locks.
 
1612
 
 
1613
        These are deprecated and transports may decline to support them.
 
1614
        """
921
1615
        transport = self.get_transport()
922
1616
        if transport.is_readonly():
923
1617
            file('lock', 'w').close()
924
1618
        else:
925
 
            transport.put('lock', StringIO())
926
 
        lock = transport.lock_read('lock')
 
1619
            transport.put_bytes('lock', '')
 
1620
        try:
 
1621
            lock = transport.lock_read('lock')
 
1622
        except TransportNotPossible:
 
1623
            return
927
1624
        # TODO make this consistent on all platforms:
928
1625
        # self.assertRaises(LockError, transport.lock_read, 'lock')
929
1626
        lock.unlock()
931
1628
    def test_readv(self):
932
1629
        transport = self.get_transport()
933
1630
        if transport.is_readonly():
934
 
            file('a', 'w').write('0123456789')
 
1631
            with file('a', 'w') as f: f.write('0123456789')
935
1632
        else:
936
 
            transport.put('a', StringIO('01234567890'))
 
1633
            transport.put_bytes('a', '0123456789')
 
1634
 
 
1635
        d = list(transport.readv('a', ((0, 1),)))
 
1636
        self.assertEqual(d[0], (0, '0'))
937
1637
 
938
1638
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
939
1639
        self.assertEqual(d[0], (0, '0'))
940
1640
        self.assertEqual(d[1], (1, '1'))
941
1641
        self.assertEqual(d[2], (3, '34'))
942
1642
        self.assertEqual(d[3], (9, '9'))
 
1643
 
 
1644
    def test_readv_out_of_order(self):
 
1645
        transport = self.get_transport()
 
1646
        if transport.is_readonly():
 
1647
            with file('a', 'w') as f: f.write('0123456789')
 
1648
        else:
 
1649
            transport.put_bytes('a', '01234567890')
 
1650
 
 
1651
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1652
        self.assertEqual(d[0], (1, '1'))
 
1653
        self.assertEqual(d[1], (9, '9'))
 
1654
        self.assertEqual(d[2], (0, '0'))
 
1655
        self.assertEqual(d[3], (3, '34'))
 
1656
 
 
1657
    def test_readv_with_adjust_for_latency(self):
 
1658
        transport = self.get_transport()
 
1659
        # the adjust for latency flag expands the data region returned
 
1660
        # according to a per-transport heuristic, so testing is a little
 
1661
        # tricky as we need more data than the largest combining that our
 
1662
        # transports do. To accomodate this we generate random data and cross
 
1663
        # reference the returned data with the random data. To avoid doing
 
1664
        # multiple large random byte look ups we do several tests on the same
 
1665
        # backing data.
 
1666
        content = osutils.rand_bytes(200*1024)
 
1667
        content_size = len(content)
 
1668
        if transport.is_readonly():
 
1669
            self.build_tree_contents([('a', content)])
 
1670
        else:
 
1671
            transport.put_bytes('a', content)
 
1672
        def check_result_data(result_vector):
 
1673
            for item in result_vector:
 
1674
                data_len = len(item[1])
 
1675
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1676
 
 
1677
        # start corner case
 
1678
        result = list(transport.readv('a', ((0, 30),),
 
1679
            adjust_for_latency=True, upper_limit=content_size))
 
1680
        # we expect 1 result, from 0, to something > 30
 
1681
        self.assertEqual(1, len(result))
 
1682
        self.assertEqual(0, result[0][0])
 
1683
        self.assertTrue(len(result[0][1]) >= 30)
 
1684
        check_result_data(result)
 
1685
        # end of file corner case
 
1686
        result = list(transport.readv('a', ((204700, 100),),
 
1687
            adjust_for_latency=True, upper_limit=content_size))
 
1688
        # we expect 1 result, from 204800- its length, to the end
 
1689
        self.assertEqual(1, len(result))
 
1690
        data_len = len(result[0][1])
 
1691
        self.assertEqual(204800-data_len, result[0][0])
 
1692
        self.assertTrue(data_len >= 100)
 
1693
        check_result_data(result)
 
1694
        # out of order ranges are made in order
 
1695
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1696
            adjust_for_latency=True, upper_limit=content_size))
 
1697
        # we expect 2 results, in order, start and end.
 
1698
        self.assertEqual(2, len(result))
 
1699
        # start
 
1700
        data_len = len(result[0][1])
 
1701
        self.assertEqual(0, result[0][0])
 
1702
        self.assertTrue(data_len >= 30)
 
1703
        # end
 
1704
        data_len = len(result[1][1])
 
1705
        self.assertEqual(204800-data_len, result[1][0])
 
1706
        self.assertTrue(data_len >= 100)
 
1707
        check_result_data(result)
 
1708
        # close ranges get combined (even if out of order)
 
1709
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1710
            result = list(transport.readv('a', request_vector,
 
1711
                adjust_for_latency=True, upper_limit=content_size))
 
1712
            self.assertEqual(1, len(result))
 
1713
            data_len = len(result[0][1])
 
1714
            # minimum length is from 400 to 1034 - 634
 
1715
            self.assertTrue(data_len >= 634)
 
1716
            # must contain the region 400 to 1034
 
1717
            self.assertTrue(result[0][0] <= 400)
 
1718
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1719
            check_result_data(result)
 
1720
 
 
1721
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1722
        transport = self.get_transport()
 
1723
        # test from observed failure case.
 
1724
        if transport.is_readonly():
 
1725
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1726
        else:
 
1727
            transport.put_bytes('a', 'a'*1024*1024)
 
1728
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1729
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1730
            (465373, 800), (947422, 800)]
 
1731
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1732
        found_items = [False]*9
 
1733
        for pos, (start, length) in enumerate(broken_vector):
 
1734
            # check the range is covered by the result
 
1735
            for offset, data in results:
 
1736
                if offset <= start and start + length <= offset + len(data):
 
1737
                    found_items[pos] = True
 
1738
        self.assertEqual([True]*9, found_items)
 
1739
 
 
1740
    def test_get_with_open_write_stream_sees_all_content(self):
 
1741
        t = self.get_transport()
 
1742
        if t.is_readonly():
 
1743
            return
 
1744
        handle = t.open_write_stream('foo')
 
1745
        try:
 
1746
            handle.write('bcd')
 
1747
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1748
        finally:
 
1749
            handle.close()
 
1750
 
 
1751
    def test_get_smart_medium(self):
 
1752
        """All transports must either give a smart medium, or know they can't.
 
1753
        """
 
1754
        transport = self.get_transport()
 
1755
        try:
 
1756
            client_medium = transport.get_smart_medium()
 
1757
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1758
        except errors.NoSmartMedium:
 
1759
            # as long as we got it we're fine
 
1760
            pass
 
1761
 
 
1762
    def test_readv_short_read(self):
 
1763
        transport = self.get_transport()
 
1764
        if transport.is_readonly():
 
1765
            with file('a', 'w') as f: f.write('0123456789')
 
1766
        else:
 
1767
            transport.put_bytes('a', '01234567890')
 
1768
 
 
1769
        # This is intentionally reading off the end of the file
 
1770
        # since we are sure that it cannot get there
 
1771
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1772
                               # Can be raised by paramiko
 
1773
                               AssertionError),
 
1774
                              transport.readv, 'a', [(1,1), (8,10)])
 
1775
 
 
1776
        # This is trying to seek past the end of the file, it should
 
1777
        # also raise a special error
 
1778
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1779
                              transport.readv, 'a', [(12,2)])
 
1780
 
 
1781
    def test_no_segment_parameters(self):
 
1782
        """Segment parameters should be stripped and stored in
 
1783
        transport.segment_parameters."""
 
1784
        transport = self.get_transport("foo")
 
1785
        self.assertEquals({}, transport.get_segment_parameters())
 
1786
 
 
1787
    def test_segment_parameters(self):
 
1788
        """Segment parameters should be stripped and stored in
 
1789
        transport.get_segment_parameters()."""
 
1790
        base_url = self._server.get_url()
 
1791
        parameters = {"key1": "val1", "key2": "val2"}
 
1792
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1793
        transport = _mod_transport.get_transport_from_url(url)
 
1794
        self.assertEquals(parameters, transport.get_segment_parameters())
 
1795
 
 
1796
    def test_set_segment_parameters(self):
 
1797
        """Segment parameters can be set and show up in base."""
 
1798
        transport = self.get_transport("foo")
 
1799
        orig_base = transport.base
 
1800
        transport.set_segment_parameter("arm", "board")
 
1801
        self.assertEquals("%s,arm=board" % orig_base, transport.base)
 
1802
        self.assertEquals({"arm": "board"}, transport.get_segment_parameters())
 
1803
        transport.set_segment_parameter("arm", None)
 
1804
        transport.set_segment_parameter("nonexistant", None)
 
1805
        self.assertEquals({}, transport.get_segment_parameters())
 
1806
        self.assertEquals(orig_base, transport.base)
 
1807
 
 
1808
    def test_stat_symlink(self):
 
1809
        # if a transport points directly to a symlink (and supports symlinks
 
1810
        # at all) you can tell this.  helps with bug 32669.
 
1811
        t = self.get_transport()
 
1812
        try:
 
1813
            t.symlink('target', 'link')
 
1814
        except TransportNotPossible:
 
1815
            raise TestSkipped("symlinks not supported")
 
1816
        t2 = t.clone('link')
 
1817
        st = t2.stat('')
 
1818
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1819
 
 
1820
    def test_abspath_url_unquote_unreserved(self):
 
1821
        """URLs from abspath should have unreserved characters unquoted
 
1822
        
 
1823
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1824
        """
 
1825
        t = self.get_transport()
 
1826
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1827
        self.assertEqual(t.base + "-.09AZ_az~",
 
1828
            t.abspath(needlessly_escaped_dir))
 
1829
 
 
1830
    def test_clone_url_unquote_unreserved(self):
 
1831
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1832
        
 
1833
        Cloned transports should be prefix comparable for things like the
 
1834
        isolation checking of tests, see lp:842223 for more.
 
1835
        """
 
1836
        t1 = self.get_transport()
 
1837
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1838
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1839
        t2 = t1.clone(needlessly_escaped_dir)
 
1840
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1841
 
 
1842
    def test_hook_post_connection_one(self):
 
1843
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t = self.get_transport()
 
1847
        self.assertEqual([], log)
 
1848
        t.has("non-existant")
 
1849
        if isinstance(t, RemoteTransport):
 
1850
            self.assertEqual([t.get_smart_medium()], log)
 
1851
        elif isinstance(t, ConnectedTransport):
 
1852
            self.assertEqual([t], log)
 
1853
        else:
 
1854
            self.assertEqual([], log)
 
1855
 
 
1856
    def test_hook_post_connection_multi(self):
 
1857
        """Fire post_connect hook once per unshared underlying connection"""
 
1858
        log = []
 
1859
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1860
        t1 = self.get_transport()
 
1861
        t2 = t1.clone(".")
 
1862
        t3 = self.get_transport()
 
1863
        self.assertEqual([], log)
 
1864
        t1.has("x")
 
1865
        t2.has("x")
 
1866
        t3.has("x")
 
1867
        if isinstance(t1, RemoteTransport):
 
1868
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1869
        elif isinstance(t1, ConnectedTransport):
 
1870
            self.assertEqual([t1, t3], log)
 
1871
        else:
 
1872
            self.assertEqual([], log)