~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Bugfix WorkingTree.remove to handle subtrees, and non-cwd trees

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from cStringIO import StringIO
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
19
18
import os
20
 
import subprocess
21
19
import sys
22
 
import threading
 
20
import stat
 
21
from cStringIO import StringIO
23
22
 
 
23
import bzrlib
24
24
from bzrlib import (
25
25
    errors,
26
 
    osutils,
27
 
    tests,
28
 
    transport,
29
26
    urlutils,
30
27
    )
31
 
from bzrlib.directory_service import directories
32
 
from bzrlib.transport import (
33
 
    chroot,
34
 
    fakenfs,
35
 
    http,
36
 
    local,
37
 
    location_to_url,
38
 
    memory,
39
 
    pathfilter,
40
 
    readonly,
41
 
    )
42
 
import bzrlib.transport.trace
43
 
from bzrlib.tests import (
44
 
    features,
45
 
    test_server,
46
 
    )
 
28
from bzrlib.errors import (ConnectionError,
 
29
                           DependencyNotPresent,
 
30
                           FileExists,
 
31
                           InvalidURLJoin,
 
32
                           NoSuchFile,
 
33
                           PathNotChild,
 
34
                           TransportNotPossible,
 
35
                           UnsupportedProtocol,
 
36
                           )
 
37
from bzrlib.tests import TestCase, TestCaseInTempDir
 
38
from bzrlib.transport import (_CoalescedOffset,
 
39
                              _get_protocol_handlers,
 
40
                              _set_protocol_handlers,
 
41
                              _get_transport_modules,
 
42
                              get_transport,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              _clear_protocol_handlers,
 
46
                              Transport,
 
47
                              )
 
48
from bzrlib.transport.chroot import ChrootServer
 
49
from bzrlib.transport.memory import MemoryTransport
 
50
from bzrlib.transport.local import (LocalTransport,
 
51
                                    EmulatedWin32LocalTransport)
47
52
 
48
53
 
49
54
# TODO: Should possibly split transport-specific tests into their own files.
50
55
 
51
56
 
52
 
class TestTransport(tests.TestCase):
 
57
class TestTransport(TestCase):
53
58
    """Test the non transport-concrete class functionality."""
54
59
 
55
60
    def test__get_set_protocol_handlers(self):
56
 
        handlers = transport._get_protocol_handlers()
57
 
        self.assertNotEqual([], handlers.keys())
58
 
        transport._clear_protocol_handlers()
59
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
60
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
61
        handlers = _get_protocol_handlers()
 
62
        self.assertNotEqual([], handlers.keys( ))
 
63
        try:
 
64
            _clear_protocol_handlers()
 
65
            self.assertEqual([], _get_protocol_handlers().keys())
 
66
        finally:
 
67
            _set_protocol_handlers(handlers)
61
68
 
62
69
    def test_get_transport_modules(self):
63
 
        handlers = transport._get_protocol_handlers()
64
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
 
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
67
 
 
 
70
        handlers = _get_protocol_handlers()
68
71
        class SampleHandler(object):
69
72
            """I exist, isnt that enough?"""
70
 
        transport._clear_protocol_handlers()
71
 
        transport.register_transport_proto('foo')
72
 
        transport.register_lazy_transport('foo',
73
 
                                            'bzrlib.tests.test_transport',
74
 
                                            'TestTransport.SampleHandler')
75
 
        transport.register_transport_proto('bar')
76
 
        transport.register_lazy_transport('bar',
77
 
                                            'bzrlib.tests.test_transport',
78
 
                                            'TestTransport.SampleHandler')
79
 
        self.assertEqual([SampleHandler.__module__,
80
 
                            'bzrlib.transport.chroot',
81
 
                            'bzrlib.transport.pathfilter'],
82
 
                            transport._get_transport_modules())
 
73
        try:
 
74
            _clear_protocol_handlers()
 
75
            register_transport_proto('foo')
 
76
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
77
            register_transport_proto('bar')
 
78
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
79
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
 
80
                             _get_transport_modules())
 
81
        finally:
 
82
            _set_protocol_handlers(handlers)
83
83
 
84
84
    def test_transport_dependency(self):
85
85
        """Transport with missing dependency causes no error"""
86
 
        saved_handlers = transport._get_protocol_handlers()
87
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
88
 
        # don't pollute the current handlers
89
 
        transport._clear_protocol_handlers()
90
 
        transport.register_transport_proto('foo')
91
 
        transport.register_lazy_transport(
92
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
86
        saved_handlers = _get_protocol_handlers()
93
87
        try:
94
 
            transport.get_transport_from_url('foo://fooserver/foo')
95
 
        except errors.UnsupportedProtocol, e:
96
 
            e_str = str(e)
97
 
            self.assertEquals('Unsupported protocol'
98
 
                                ' for url "foo://fooserver/foo":'
99
 
                                ' Unable to import library "some_lib":'
100
 
                                ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
103
 
 
 
88
            register_transport_proto('foo')
 
89
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
90
                    'BadTransportHandler')
 
91
            try:
 
92
                get_transport('foo://fooserver/foo')
 
93
            except UnsupportedProtocol, e:
 
94
                e_str = str(e)
 
95
                self.assertEquals('Unsupported protocol'
 
96
                                  ' for url "foo://fooserver/foo":'
 
97
                                  ' Unable to import library "some_lib":'
 
98
                                  ' testing missing dependency', str(e))
 
99
            else:
 
100
                self.fail('Did not raise UnsupportedProtocol')
 
101
        finally:
 
102
            # restore original values
 
103
            _set_protocol_handlers(saved_handlers)
 
104
            
104
105
    def test_transport_fallback(self):
105
106
        """Transport with missing dependency causes no error"""
106
 
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
116
 
 
117
 
    def test_ssh_hints(self):
118
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
 
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol, e:
122
 
            e_str = str(e)
123
 
            self.assertEquals('Unsupported protocol'
124
 
                              ' for url "ssh://fooserver/foo":'
125
 
                              ' bzr supports bzr+ssh to operate over ssh,'
126
 
                              ' use "bzr+ssh://fooserver/foo".',
127
 
                              str(e))
128
 
        else:
129
 
            self.fail('Did not raise UnsupportedProtocol')
130
 
 
131
 
    def test_LateReadError(self):
132
 
        """The LateReadError helper should raise on read()."""
133
 
        a_file = transport.LateReadError('a path')
134
 
        try:
135
 
            a_file.read()
136
 
        except errors.ReadError, error:
137
 
            self.assertEqual('a path', error.path)
138
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
139
 
        a_file.close()
 
107
        saved_handlers = _get_protocol_handlers()
 
108
        try:
 
109
            _clear_protocol_handlers()
 
110
            register_transport_proto('foo')
 
111
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
112
                    'BackupTransportHandler')
 
113
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
114
                    'BadTransportHandler')
 
115
            t = get_transport('foo://fooserver/foo')
 
116
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
117
        finally:
 
118
            _set_protocol_handlers(saved_handlers)
 
119
 
 
120
    def test__combine_paths(self):
 
121
        t = Transport('/')
 
122
        self.assertEqual('/home/sarah/project/foo',
 
123
                         t._combine_paths('/home/sarah', 'project/foo'))
 
124
        self.assertEqual('/etc',
 
125
                         t._combine_paths('/home/sarah', '../../etc'))
 
126
        self.assertEqual('/etc',
 
127
                         t._combine_paths('/home/sarah', '../../../etc'))
 
128
        self.assertEqual('/etc',
 
129
                         t._combine_paths('/home/sarah', '/etc'))
140
130
 
141
131
    def test_local_abspath_non_local_transport(self):
142
132
        # the base implementation should throw
143
 
        t = memory.MemoryTransport()
 
133
        t = MemoryTransport()
144
134
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
145
135
        self.assertEqual('memory:///t is not a local path.', str(e))
146
136
 
147
137
 
148
 
class TestCoalesceOffsets(tests.TestCase):
149
 
 
150
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
151
 
        coalesce = transport.Transport._coalesce_offsets
152
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
153
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
154
 
                            max_size=max_size))
 
138
class TestCoalesceOffsets(TestCase):
 
139
    
 
140
    def check(self, expected, offsets, limit=0, fudge=0):
 
141
        coalesce = Transport._coalesce_offsets
 
142
        exp = [_CoalescedOffset(*x) for x in expected]
 
143
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
155
144
        self.assertEqual(exp, out)
156
145
 
157
146
    def test_coalesce_empty(self):
164
153
        self.check([(0, 10, [(0, 10)]),
165
154
                    (20, 10, [(0, 10)]),
166
155
                   ], [(0, 10), (20, 10)])
167
 
 
 
156
            
168
157
    def test_coalesce_unsorted(self):
169
158
        self.check([(20, 10, [(0, 10)]),
170
159
                    (0, 10, [(0, 10)]),
175
164
                   [(0, 10), (10, 10)])
176
165
 
177
166
    def test_coalesce_overlapped(self):
178
 
        self.assertRaises(ValueError,
179
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
180
 
                        [(0, 10), (5, 10)])
 
167
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
168
                   [(0, 10), (5, 10)])
181
169
 
182
170
    def test_coalesce_limit(self):
183
171
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
200
188
 
201
189
    def test_coalesce_fudge(self):
202
190
        self.check([(10, 30, [(0, 10), (20, 10)]),
203
 
                    (100, 10, [(0, 10)]),
 
191
                    (100, 10, [(0, 10),]),
204
192
                   ], [(10, 10), (30, 10), (100, 10)],
205
 
                   fudge=10)
206
 
 
207
 
    def test_coalesce_max_size(self):
208
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
209
 
                    (30, 50, [(0, 50)]),
210
 
                    # If one range is above max_size, it gets its own coalesced
211
 
                    # offset
212
 
                    (100, 80, [(0, 80)]),],
213
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
214
 
                   max_size=50)
215
 
 
216
 
    def test_coalesce_no_max_size(self):
217
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
218
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
193
                   fudge=10
219
194
                  )
220
195
 
221
 
    def test_coalesce_default_limit(self):
222
 
        # By default we use a 100MB max size.
223
 
        ten_mb = 10 * 1024 * 1024
224
 
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
225
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
226
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
227
 
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
228
 
                   [(i * ten_mb, ten_mb) for i in range(11)],
229
 
                   max_size=1*1024*1024*1024)
230
 
 
231
 
 
232
 
class TestMemoryServer(tests.TestCase):
233
 
 
234
 
    def test_create_server(self):
235
 
        server = memory.MemoryServer()
236
 
        server.start_server()
237
 
        url = server.get_url()
238
 
        self.assertTrue(url in transport.transport_list_registry)
239
 
        t = transport.get_transport_from_url(url)
240
 
        del t
241
 
        server.stop_server()
242
 
        self.assertFalse(url in transport.transport_list_registry)
243
 
        self.assertRaises(errors.UnsupportedProtocol,
244
 
                          transport.get_transport, url)
245
 
 
246
 
 
247
 
class TestMemoryTransport(tests.TestCase):
 
196
 
 
197
class TestMemoryTransport(TestCase):
248
198
 
249
199
    def test_get_transport(self):
250
 
        memory.MemoryTransport()
 
200
        MemoryTransport()
251
201
 
252
202
    def test_clone(self):
253
 
        t = memory.MemoryTransport()
254
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
255
 
        self.assertEqual("memory:///", t.clone("/").base)
 
203
        transport = MemoryTransport()
 
204
        self.assertTrue(isinstance(transport, MemoryTransport))
 
205
        self.assertEqual("memory:///", transport.clone("/").base)
256
206
 
257
207
    def test_abspath(self):
258
 
        t = memory.MemoryTransport()
259
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
208
        transport = MemoryTransport()
 
209
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
260
210
 
261
211
    def test_abspath_of_root(self):
262
 
        t = memory.MemoryTransport()
263
 
        self.assertEqual("memory:///", t.base)
264
 
        self.assertEqual("memory:///", t.abspath('/'))
 
212
        transport = MemoryTransport()
 
213
        self.assertEqual("memory:///", transport.base)
 
214
        self.assertEqual("memory:///", transport.abspath('/'))
265
215
 
266
216
    def test_abspath_of_relpath_starting_at_root(self):
267
 
        t = memory.MemoryTransport()
268
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
217
        transport = MemoryTransport()
 
218
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
269
219
 
270
220
    def test_append_and_get(self):
271
 
        t = memory.MemoryTransport()
272
 
        t.append_bytes('path', 'content')
273
 
        self.assertEqual(t.get('path').read(), 'content')
274
 
        t.append_file('path', StringIO('content'))
275
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
221
        transport = MemoryTransport()
 
222
        transport.append_bytes('path', 'content')
 
223
        self.assertEqual(transport.get('path').read(), 'content')
 
224
        transport.append_file('path', StringIO('content'))
 
225
        self.assertEqual(transport.get('path').read(), 'contentcontent')
276
226
 
277
227
    def test_put_and_get(self):
278
 
        t = memory.MemoryTransport()
279
 
        t.put_file('path', StringIO('content'))
280
 
        self.assertEqual(t.get('path').read(), 'content')
281
 
        t.put_bytes('path', 'content')
282
 
        self.assertEqual(t.get('path').read(), 'content')
 
228
        transport = MemoryTransport()
 
229
        transport.put_file('path', StringIO('content'))
 
230
        self.assertEqual(transport.get('path').read(), 'content')
 
231
        transport.put_bytes('path', 'content')
 
232
        self.assertEqual(transport.get('path').read(), 'content')
283
233
 
284
234
    def test_append_without_dir_fails(self):
285
 
        t = memory.MemoryTransport()
286
 
        self.assertRaises(errors.NoSuchFile,
287
 
                          t.append_bytes, 'dir/path', 'content')
 
235
        transport = MemoryTransport()
 
236
        self.assertRaises(NoSuchFile,
 
237
                          transport.append_bytes, 'dir/path', 'content')
288
238
 
289
239
    def test_put_without_dir_fails(self):
290
 
        t = memory.MemoryTransport()
291
 
        self.assertRaises(errors.NoSuchFile,
292
 
                          t.put_file, 'dir/path', StringIO('content'))
 
240
        transport = MemoryTransport()
 
241
        self.assertRaises(NoSuchFile,
 
242
                          transport.put_file, 'dir/path', StringIO('content'))
293
243
 
294
244
    def test_get_missing(self):
295
 
        transport = memory.MemoryTransport()
296
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
245
        transport = MemoryTransport()
 
246
        self.assertRaises(NoSuchFile, transport.get, 'foo')
297
247
 
298
248
    def test_has_missing(self):
299
 
        t = memory.MemoryTransport()
300
 
        self.assertEquals(False, t.has('foo'))
 
249
        transport = MemoryTransport()
 
250
        self.assertEquals(False, transport.has('foo'))
301
251
 
302
252
    def test_has_present(self):
303
 
        t = memory.MemoryTransport()
304
 
        t.append_bytes('foo', 'content')
305
 
        self.assertEquals(True, t.has('foo'))
 
253
        transport = MemoryTransport()
 
254
        transport.append_bytes('foo', 'content')
 
255
        self.assertEquals(True, transport.has('foo'))
306
256
 
307
257
    def test_list_dir(self):
308
 
        t = memory.MemoryTransport()
309
 
        t.put_bytes('foo', 'content')
310
 
        t.mkdir('dir')
311
 
        t.put_bytes('dir/subfoo', 'content')
312
 
        t.put_bytes('dirlike', 'content')
 
258
        transport = MemoryTransport()
 
259
        transport.put_bytes('foo', 'content')
 
260
        transport.mkdir('dir')
 
261
        transport.put_bytes('dir/subfoo', 'content')
 
262
        transport.put_bytes('dirlike', 'content')
313
263
 
314
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
315
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
264
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
265
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
316
266
 
317
267
    def test_mkdir(self):
318
 
        t = memory.MemoryTransport()
319
 
        t.mkdir('dir')
320
 
        t.append_bytes('dir/path', 'content')
321
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
268
        transport = MemoryTransport()
 
269
        transport.mkdir('dir')
 
270
        transport.append_bytes('dir/path', 'content')
 
271
        self.assertEqual(transport.get('dir/path').read(), 'content')
322
272
 
323
273
    def test_mkdir_missing_parent(self):
324
 
        t = memory.MemoryTransport()
325
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
274
        transport = MemoryTransport()
 
275
        self.assertRaises(NoSuchFile,
 
276
                          transport.mkdir, 'dir/dir')
326
277
 
327
278
    def test_mkdir_twice(self):
328
 
        t = memory.MemoryTransport()
329
 
        t.mkdir('dir')
330
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
279
        transport = MemoryTransport()
 
280
        transport.mkdir('dir')
 
281
        self.assertRaises(FileExists, transport.mkdir, 'dir')
331
282
 
332
283
    def test_parameters(self):
333
 
        t = memory.MemoryTransport()
334
 
        self.assertEqual(True, t.listable())
335
 
        self.assertEqual(False, t.is_readonly())
 
284
        transport = MemoryTransport()
 
285
        self.assertEqual(True, transport.listable())
 
286
        self.assertEqual(False, transport.should_cache())
 
287
        self.assertEqual(False, transport.is_readonly())
336
288
 
337
289
    def test_iter_files_recursive(self):
338
 
        t = memory.MemoryTransport()
339
 
        t.mkdir('dir')
340
 
        t.put_bytes('dir/foo', 'content')
341
 
        t.put_bytes('dir/bar', 'content')
342
 
        t.put_bytes('bar', 'content')
343
 
        paths = set(t.iter_files_recursive())
 
290
        transport = MemoryTransport()
 
291
        transport.mkdir('dir')
 
292
        transport.put_bytes('dir/foo', 'content')
 
293
        transport.put_bytes('dir/bar', 'content')
 
294
        transport.put_bytes('bar', 'content')
 
295
        paths = set(transport.iter_files_recursive())
344
296
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
345
297
 
346
298
    def test_stat(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.put_bytes('foo', 'content')
349
 
        t.put_bytes('bar', 'phowar')
350
 
        self.assertEqual(7, t.stat('foo').st_size)
351
 
        self.assertEqual(6, t.stat('bar').st_size)
352
 
 
353
 
 
354
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
299
        transport = MemoryTransport()
 
300
        transport.put_bytes('foo', 'content')
 
301
        transport.put_bytes('bar', 'phowar')
 
302
        self.assertEqual(7, transport.stat('foo').st_size)
 
303
        self.assertEqual(6, transport.stat('bar').st_size)
 
304
 
 
305
 
 
306
class ChrootDecoratorTransportTest(TestCase):
355
307
    """Chroot decoration specific tests."""
356
308
 
357
309
    def test_abspath(self):
358
310
        # The abspath is always relative to the chroot_url.
359
 
        server = chroot.ChrootServer(
360
 
            transport.get_transport_from_url('memory:///foo/bar/'))
361
 
        self.start_server(server)
362
 
        t = transport.get_transport_from_url(server.get_url())
363
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
311
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
312
        server.setUp()
 
313
        transport = get_transport(server.get_url())
 
314
        self.assertEqual(server.get_url(), transport.abspath('/'))
364
315
 
365
 
        subdir_t = t.clone('subdir')
366
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
316
        subdir_transport = transport.clone('subdir')
 
317
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
318
        server.tearDown()
367
319
 
368
320
    def test_clone(self):
369
 
        server = chroot.ChrootServer(
370
 
            transport.get_transport_from_url('memory:///foo/bar/'))
371
 
        self.start_server(server)
372
 
        t = transport.get_transport_from_url(server.get_url())
 
321
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
322
        server.setUp()
 
323
        transport = get_transport(server.get_url())
373
324
        # relpath from root and root path are the same
374
 
        relpath_cloned = t.clone('foo')
375
 
        abspath_cloned = t.clone('/foo')
 
325
        relpath_cloned = transport.clone('foo')
 
326
        abspath_cloned = transport.clone('/foo')
376
327
        self.assertEqual(server, relpath_cloned.server)
377
328
        self.assertEqual(server, abspath_cloned.server)
378
 
 
 
329
        server.tearDown()
 
330
    
379
331
    def test_chroot_url_preserves_chroot(self):
380
332
        """Calling get_transport on a chroot transport's base should produce a
381
333
        transport with exactly the same behaviour as the original chroot
384
336
        This is so that it is not possible to escape a chroot by doing::
385
337
            url = chroot_transport.base
386
338
            parent_url = urlutils.join(url, '..')
387
 
            new_t = transport.get_transport_from_url(parent_url)
 
339
            new_transport = get_transport(parent_url)
388
340
        """
389
 
        server = chroot.ChrootServer(
390
 
            transport.get_transport_from_url('memory:///path/subpath'))
391
 
        self.start_server(server)
392
 
        t = transport.get_transport_from_url(server.get_url())
393
 
        new_t = transport.get_transport_from_url(t.base)
394
 
        self.assertEqual(t.server, new_t.server)
395
 
        self.assertEqual(t.base, new_t.base)
396
 
 
 
341
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
342
        server.setUp()
 
343
        transport = get_transport(server.get_url())
 
344
        new_transport = get_transport(transport.base)
 
345
        self.assertEqual(transport.server, new_transport.server)
 
346
        self.assertEqual(transport.base, new_transport.base)
 
347
        server.tearDown()
 
348
        
397
349
    def test_urljoin_preserves_chroot(self):
398
350
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
399
351
        URL that escapes the intended chroot.
401
353
        This is so that it is not possible to escape a chroot by doing::
402
354
            url = chroot_transport.base
403
355
            parent_url = urlutils.join(url, '..')
404
 
            new_t = transport.get_transport_from_url(parent_url)
 
356
            new_transport = get_transport(parent_url)
405
357
        """
406
 
        server = chroot.ChrootServer(
407
 
            transport.get_transport_from_url('memory:///path/'))
408
 
        self.start_server(server)
409
 
        t = transport.get_transport_from_url(server.get_url())
 
358
        server = ChrootServer(get_transport('memory:///path/'))
 
359
        server.setUp()
 
360
        transport = get_transport(server.get_url())
410
361
        self.assertRaises(
411
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
412
 
 
413
 
 
414
 
class TestChrootServer(tests.TestCase):
 
362
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
363
        server.tearDown()
 
364
 
 
365
 
 
366
class ChrootServerTest(TestCase):
415
367
 
416
368
    def test_construct(self):
417
 
        backing_transport = memory.MemoryTransport()
418
 
        server = chroot.ChrootServer(backing_transport)
 
369
        backing_transport = MemoryTransport()
 
370
        server = ChrootServer(backing_transport)
419
371
        self.assertEqual(backing_transport, server.backing_transport)
420
372
 
421
373
    def test_setUp(self):
422
 
        backing_transport = memory.MemoryTransport()
423
 
        server = chroot.ChrootServer(backing_transport)
424
 
        server.start_server()
425
 
        self.addCleanup(server.stop_server)
426
 
        self.assertTrue(server.scheme
427
 
                        in transport._get_protocol_handlers().keys())
 
374
        backing_transport = MemoryTransport()
 
375
        server = ChrootServer(backing_transport)
 
376
        server.setUp()
 
377
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
428
378
 
429
 
    def test_stop_server(self):
430
 
        backing_transport = memory.MemoryTransport()
431
 
        server = chroot.ChrootServer(backing_transport)
432
 
        server.start_server()
433
 
        server.stop_server()
434
 
        self.assertFalse(server.scheme
435
 
                         in transport._get_protocol_handlers().keys())
 
379
    def test_tearDown(self):
 
380
        backing_transport = MemoryTransport()
 
381
        server = ChrootServer(backing_transport)
 
382
        server.setUp()
 
383
        server.tearDown()
 
384
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
436
385
 
437
386
    def test_get_url(self):
438
 
        backing_transport = memory.MemoryTransport()
439
 
        server = chroot.ChrootServer(backing_transport)
440
 
        server.start_server()
441
 
        self.addCleanup(server.stop_server)
 
387
        backing_transport = MemoryTransport()
 
388
        server = ChrootServer(backing_transport)
 
389
        server.setUp()
442
390
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
443
 
 
444
 
 
445
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
446
 
    """Pathfilter decoration specific tests."""
447
 
 
448
 
    def test_abspath(self):
449
 
        # The abspath is always relative to the base of the backing transport.
450
 
        server = pathfilter.PathFilteringServer(
451
 
            transport.get_transport_from_url('memory:///foo/bar/'),
452
 
            lambda x: x)
453
 
        server.start_server()
454
 
        t = transport.get_transport_from_url(server.get_url())
455
 
        self.assertEqual(server.get_url(), t.abspath('/'))
456
 
 
457
 
        subdir_t = t.clone('subdir')
458
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
459
 
        server.stop_server()
460
 
 
461
 
    def make_pf_transport(self, filter_func=None):
462
 
        """Make a PathFilteringTransport backed by a MemoryTransport.
463
 
 
464
 
        :param filter_func: by default this will be a no-op function.  Use this
465
 
            parameter to override it."""
466
 
        if filter_func is None:
467
 
            filter_func = lambda x: x
468
 
        server = pathfilter.PathFilteringServer(
469
 
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
470
 
        server.start_server()
471
 
        self.addCleanup(server.stop_server)
472
 
        return transport.get_transport_from_url(server.get_url())
473
 
 
474
 
    def test__filter(self):
475
 
        # _filter (with an identity func as filter_func) always returns
476
 
        # paths relative to the base of the backing transport.
477
 
        t = self.make_pf_transport()
478
 
        self.assertEqual('foo', t._filter('foo'))
479
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
480
 
        self.assertEqual('', t._filter('..'))
481
 
        self.assertEqual('', t._filter('/'))
482
 
        # The base of the pathfiltering transport is taken into account too.
483
 
        t = t.clone('subdir1/subdir2')
484
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
485
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
486
 
        self.assertEqual('subdir1', t._filter('..'))
487
 
        self.assertEqual('', t._filter('/'))
488
 
 
489
 
    def test_filter_invocation(self):
490
 
        filter_log = []
491
 
 
492
 
        def filter(path):
493
 
            filter_log.append(path)
494
 
            return path
495
 
        t = self.make_pf_transport(filter)
496
 
        t.has('abc')
497
 
        self.assertEqual(['abc'], filter_log)
498
 
        del filter_log[:]
499
 
        t.clone('abc').has('xyz')
500
 
        self.assertEqual(['abc/xyz'], filter_log)
501
 
        del filter_log[:]
502
 
        t.has('/abc')
503
 
        self.assertEqual(['abc'], filter_log)
504
 
 
505
 
    def test_clone(self):
506
 
        t = self.make_pf_transport()
507
 
        # relpath from root and root path are the same
508
 
        relpath_cloned = t.clone('foo')
509
 
        abspath_cloned = t.clone('/foo')
510
 
        self.assertEqual(t.server, relpath_cloned.server)
511
 
        self.assertEqual(t.server, abspath_cloned.server)
512
 
 
513
 
    def test_url_preserves_pathfiltering(self):
514
 
        """Calling get_transport on a pathfiltered transport's base should
515
 
        produce a transport with exactly the same behaviour as the original
516
 
        pathfiltered transport.
517
 
 
518
 
        This is so that it is not possible to escape (accidentally or
519
 
        otherwise) the filtering by doing::
520
 
            url = filtered_transport.base
521
 
            parent_url = urlutils.join(url, '..')
522
 
            new_t = transport.get_transport_from_url(parent_url)
523
 
        """
524
 
        t = self.make_pf_transport()
525
 
        new_t = transport.get_transport_from_url(t.base)
526
 
        self.assertEqual(t.server, new_t.server)
527
 
        self.assertEqual(t.base, new_t.base)
528
 
 
529
 
 
530
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
391
        server.tearDown()
 
392
 
 
393
 
 
394
class ReadonlyDecoratorTransportTest(TestCase):
531
395
    """Readonly decoration specific tests."""
532
396
 
533
397
    def test_local_parameters(self):
 
398
        import bzrlib.transport.readonly as readonly
534
399
        # connect to . in readonly mode
535
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
536
 
        self.assertEqual(True, t.listable())
537
 
        self.assertEqual(True, t.is_readonly())
 
400
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
401
        self.assertEqual(True, transport.listable())
 
402
        self.assertEqual(False, transport.should_cache())
 
403
        self.assertEqual(True, transport.is_readonly())
538
404
 
539
405
    def test_http_parameters(self):
540
 
        from bzrlib.tests.http_server import HttpServer
541
 
        # connect to '.' via http which is not listable
 
406
        from bzrlib.tests.HttpServer import HttpServer
 
407
        import bzrlib.transport.readonly as readonly
 
408
        # connect to . via http which is not listable
542
409
        server = HttpServer()
543
 
        self.start_server(server)
544
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
545
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
546
 
        self.assertEqual(False, t.listable())
547
 
        self.assertEqual(True, t.is_readonly())
548
 
 
549
 
 
550
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
410
        server.setUp()
 
411
        try:
 
412
            transport = get_transport('readonly+' + server.get_url())
 
413
            self.failUnless(isinstance(transport,
 
414
                                       readonly.ReadonlyTransportDecorator))
 
415
            self.assertEqual(False, transport.listable())
 
416
            self.assertEqual(True, transport.should_cache())
 
417
            self.assertEqual(True, transport.is_readonly())
 
418
        finally:
 
419
            server.tearDown()
 
420
 
 
421
 
 
422
class FakeNFSDecoratorTests(TestCaseInTempDir):
551
423
    """NFS decorator specific tests."""
552
424
 
553
425
    def get_nfs_transport(self, url):
 
426
        import bzrlib.transport.fakenfs as fakenfs
554
427
        # connect to url with nfs decoration
555
428
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
556
429
 
557
430
    def test_local_parameters(self):
558
 
        # the listable and is_readonly parameters
 
431
        # the listable, should_cache and is_readonly parameters
559
432
        # are not changed by the fakenfs decorator
560
 
        t = self.get_nfs_transport('.')
561
 
        self.assertEqual(True, t.listable())
562
 
        self.assertEqual(False, t.is_readonly())
 
433
        transport = self.get_nfs_transport('.')
 
434
        self.assertEqual(True, transport.listable())
 
435
        self.assertEqual(False, transport.should_cache())
 
436
        self.assertEqual(False, transport.is_readonly())
563
437
 
564
438
    def test_http_parameters(self):
565
 
        # the listable and is_readonly parameters
 
439
        # the listable, should_cache and is_readonly parameters
566
440
        # are not changed by the fakenfs decorator
567
 
        from bzrlib.tests.http_server import HttpServer
568
 
        # connect to '.' via http which is not listable
 
441
        from bzrlib.tests.HttpServer import HttpServer
 
442
        # connect to . via http which is not listable
569
443
        server = HttpServer()
570
 
        self.start_server(server)
571
 
        t = self.get_nfs_transport(server.get_url())
572
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
573
 
        self.assertEqual(False, t.listable())
574
 
        self.assertEqual(True, t.is_readonly())
 
444
        server.setUp()
 
445
        try:
 
446
            transport = self.get_nfs_transport(server.get_url())
 
447
            self.assertIsInstance(
 
448
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
449
            self.assertEqual(False, transport.listable())
 
450
            self.assertEqual(True, transport.should_cache())
 
451
            self.assertEqual(True, transport.is_readonly())
 
452
        finally:
 
453
            server.tearDown()
575
454
 
576
455
    def test_fakenfs_server_default(self):
577
456
        # a FakeNFSServer() should bring up a local relpath server for itself
578
 
        server = test_server.FakeNFSServer()
579
 
        self.start_server(server)
580
 
        # the url should be decorated appropriately
581
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
582
 
        # and we should be able to get a transport for it
583
 
        t = transport.get_transport_from_url(server.get_url())
584
 
        # which must be a FakeNFSTransportDecorator instance.
585
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
457
        import bzrlib.transport.fakenfs as fakenfs
 
458
        server = fakenfs.FakeNFSServer()
 
459
        server.setUp()
 
460
        try:
 
461
            # the url should be decorated appropriately
 
462
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
463
            # and we should be able to get a transport for it
 
464
            transport = get_transport(server.get_url())
 
465
            # which must be a FakeNFSTransportDecorator instance.
 
466
            self.assertIsInstance(
 
467
                transport, fakenfs.FakeNFSTransportDecorator)
 
468
        finally:
 
469
            server.tearDown()
586
470
 
587
471
    def test_fakenfs_rename_semantics(self):
588
472
        # a FakeNFS transport must mangle the way rename errors occur to
589
473
        # look like NFS problems.
590
 
        t = self.get_nfs_transport('.')
 
474
        transport = self.get_nfs_transport('.')
591
475
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
592
 
                        transport=t)
593
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
594
 
 
595
 
 
596
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
476
                        transport=transport)
 
477
        self.assertRaises(errors.ResourceBusy,
 
478
                          transport.rename, 'from', 'to')
 
479
 
 
480
 
 
481
class FakeVFATDecoratorTests(TestCaseInTempDir):
597
482
    """Tests for simulation of VFAT restrictions"""
598
483
 
599
484
    def get_vfat_transport(self, url):
603
488
 
604
489
    def test_transport_creation(self):
605
490
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
606
 
        t = self.get_vfat_transport('.')
607
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
491
        transport = self.get_vfat_transport('.')
 
492
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
608
493
 
609
494
    def test_transport_mkdir(self):
610
 
        t = self.get_vfat_transport('.')
611
 
        t.mkdir('HELLO')
612
 
        self.assertTrue(t.has('hello'))
613
 
        self.assertTrue(t.has('Hello'))
 
495
        transport = self.get_vfat_transport('.')
 
496
        transport.mkdir('HELLO')
 
497
        self.assertTrue(transport.has('hello'))
 
498
        self.assertTrue(transport.has('Hello'))
614
499
 
615
500
    def test_forbidden_chars(self):
616
 
        t = self.get_vfat_transport('.')
617
 
        self.assertRaises(ValueError, t.has, "<NU>")
618
 
 
619
 
 
620
 
class BadTransportHandler(transport.Transport):
 
501
        transport = self.get_vfat_transport('.')
 
502
        self.assertRaises(ValueError, transport.has, "<NU>")
 
503
 
 
504
 
 
505
class BadTransportHandler(Transport):
621
506
    def __init__(self, base_url):
622
 
        raise errors.DependencyNotPresent('some_lib',
623
 
                                          'testing missing dependency')
624
 
 
625
 
 
626
 
class BackupTransportHandler(transport.Transport):
 
507
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
508
 
 
509
 
 
510
class BackupTransportHandler(Transport):
627
511
    """Test transport that works as a backup for the BadTransportHandler"""
628
512
    pass
629
513
 
630
514
 
631
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
515
class TestTransportImplementation(TestCaseInTempDir):
632
516
    """Implementation verification for transports.
633
 
 
 
517
    
634
518
    To verify a transport we need a server factory, which is a callable
635
519
    that accepts no parameters and returns an implementation of
636
520
    bzrlib.transport.Server.
637
 
 
 
521
    
638
522
    That Server is then used to construct transport instances and test
639
523
    the transport via loopback activity.
640
524
 
641
 
    Currently this assumes that the Transport object is connected to the
642
 
    current working directory.  So that whatever is done
643
 
    through the transport, should show up in the working
 
525
    Currently this assumes that the Transport object is connected to the 
 
526
    current working directory.  So that whatever is done 
 
527
    through the transport, should show up in the working 
644
528
    directory, and vice-versa. This is a bug, because its possible to have
645
 
    URL schemes which provide access to something that may not be
646
 
    result in storage on the local disk, i.e. due to file system limits, or
 
529
    URL schemes which provide access to something that may not be 
 
530
    result in storage on the local disk, i.e. due to file system limits, or 
647
531
    due to it being a database or some other non-filesystem tool.
648
532
 
649
533
    This also tests to make sure that the functions work with both
650
534
    generators and lists (assuming iter(list) is effectively a generator)
651
535
    """
652
 
 
 
536
    
653
537
    def setUp(self):
654
538
        super(TestTransportImplementation, self).setUp()
655
539
        self._server = self.transport_server()
656
 
        self.start_server(self._server)
657
 
 
658
 
    def get_transport(self, relpath=None):
659
 
        """Return a connected transport to the local directory.
660
 
 
661
 
        :param relpath: a path relative to the base url.
662
 
        """
 
540
        self._server.setUp()
 
541
        self.addCleanup(self._server.tearDown)
 
542
 
 
543
    def get_transport(self):
 
544
        """Return a connected transport to the local directory."""
663
545
        base_url = self._server.get_url()
664
 
        url = self._adjust_url(base_url, relpath)
665
546
        # try getting the transport via the regular interface:
666
 
        t = transport.get_transport_from_url(url)
667
 
        # vila--20070607 if the following are commented out the test suite
668
 
        # still pass. Is this really still needed or was it a forgotten
669
 
        # temporary fix ?
 
547
        t = get_transport(base_url)
670
548
        if not isinstance(t, self.transport_class):
671
549
            # we did not get the correct transport class type. Override the
672
550
            # regular connection behaviour by direct construction.
673
 
            t = self.transport_class(url)
 
551
            t = self.transport_class(base_url)
674
552
        return t
675
553
 
676
554
 
677
 
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_path."""

    def test_with_path(self):
        # The test directory path gives a local transport based at its URL.
        path_t = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(path_t, local.LocalTransport)
        actual_base = path_t.base.rstrip("/")
        self.assertEquals(
            actual_base, urlutils.local_path_to_url(self.test_dir))

    def test_with_url(self):
        # "file:" is expected to be treated as a path below the test
        # directory, not as a URL.
        path_t = transport.get_transport_from_path("file:")
        self.assertIsInstance(path_t, local.LocalTransport)
        actual_base = path_t.base.rstrip("/")
        literal_path = os.path.join(self.test_dir, "file:")
        self.assertEquals(
            actual_base, urlutils.local_path_to_url(literal_path))
690
 
 
691
 
 
692
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_url."""

    def test_with_path(self):
        # A bare filesystem path is rejected as an invalid URL.
        self.assertRaises(
            errors.InvalidURL,
            transport.get_transport_from_url, self.test_dir)

    def test_with_url(self):
        dir_url = urlutils.local_path_to_url(self.test_dir)
        url_t = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(url_t, local.LocalTransport)
        self.assertEquals(url_t.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        # The ",branch=foo" suffix is expected to stay in the base while
        # file access still goes to the plain test directory.
        dir_url = urlutils.local_path_to_url(self.test_dir)
        segmented_url = dir_url + ",branch=foo"
        url_t = transport.get_transport_from_url(segmented_url)
        self.assertIsInstance(url_t, local.LocalTransport)
        self.assertEquals(url_t.base.rstrip("/"), segmented_url)
        target = os.path.join(self.test_dir, "afile")
        with open(target, 'w') as out:
            out.write("data")
        self.assertTrue(url_t.has("afile"))
712
 
 
713
 
 
714
 
class TestLocalTransports(tests.TestCase):
 
555
class TestLocalTransports(TestCase):
715
556
 
716
557
    def test_get_transport_from_abspath(self):
717
 
        here = osutils.abspath('.')
718
 
        t = transport.get_transport(here)
719
 
        self.assertIsInstance(t, local.LocalTransport)
 
558
        here = os.path.abspath('.')
 
559
        t = get_transport(here)
 
560
        self.assertIsInstance(t, LocalTransport)
720
561
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
721
562
 
722
563
    def test_get_transport_from_relpath(self):
723
 
        here = osutils.abspath('.')
724
 
        t = transport.get_transport('.')
725
 
        self.assertIsInstance(t, local.LocalTransport)
 
564
        here = os.path.abspath('.')
 
565
        t = get_transport('.')
 
566
        self.assertIsInstance(t, LocalTransport)
726
567
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
727
568
 
728
569
    def test_get_transport_from_local_url(self):
729
 
        here = osutils.abspath('.')
 
570
        here = os.path.abspath('.')
730
571
        here_url = urlutils.local_path_to_url(here) + '/'
731
 
        t = transport.get_transport(here_url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
 
572
        t = get_transport(here_url)
 
573
        self.assertIsInstance(t, LocalTransport)
733
574
        self.assertEquals(t.base, here_url)
734
575
 
735
576
    def test_local_abspath(self):
736
 
        here = osutils.abspath('.')
737
 
        t = transport.get_transport(here)
 
577
        here = os.path.abspath('.')
 
578
        t = get_transport(here)
738
579
        self.assertEquals(t.local_abspath(''), here)
739
580
 
740
581
 
741
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for the stream returned by open_write_stream."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Calling fdatasync on a write stream invokes os.fdatasync.

        The external effect is hard to observe, so the test settles for
        checking that the data is readable and that the call was made once.
        """
        missing = object()
        if getattr(os, 'fdatasync', missing) is missing:
            raise tests.TestNotApplicable('fdatasync not supported')
        local_t = self.get_transport('.')
        recorded = self.recordCalls(os, 'fdatasync')
        stream = local_t.open_write_stream('out')
        stream.write('foo')
        stream.fdatasync()
        with open('out', 'rb') as written:
            # The bytes should already have been flushed to the file.
            self.assertEquals(written.read(), 'foo')
        self.assertEquals(len(recorded), 1, recorded)

    def test_missing_directory(self):
        # Opening a stream below a directory that does not exist fails.
        local_t = self.get_transport('.')
        self.assertRaises(
            errors.NoSuchFile, local_t.open_write_stream, 'dir/foo')
766
 
 
767
 
 
768
 
class TestWin32LocalTransport(tests.TestCase):
 
582
class TestWin32LocalTransport(TestCase):
769
583
 
770
584
    def test_unc_clone_to_root(self):
771
585
        # Win32 UNC path like \\HOST\path
772
586
        # clone to root should stop at least at \\HOST part
773
587
        # not on \\
774
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
588
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
775
589
        for i in xrange(4):
776
590
            t = t.clone('..')
777
591
        self.assertEquals(t.base, 'file://HOST/')
780
594
        self.assertEquals(t.base, 'file://HOST/')
781
595
 
782
596
 
783
 
class TestConnectedTransport(tests.TestCase):
    """Tests for URL handling and connection sharing of ConnectedTransport"""

    def test_parse_url(self):
        # A plain URL: no port, no credentials; the test expects a trailing
        # '/' on both the parsed path and the base.
        conn_t = transport.ConnectedTransport(
            'http://simple.example.com/home/source')
        self.assertEquals(conn_t._parsed_url.host, 'simple.example.com')
        self.assertEquals(conn_t._parsed_url.port, None)
        self.assertEquals(conn_t._parsed_url.path, '/home/source/')
        self.assertTrue(conn_t._parsed_url.user is None)
        self.assertTrue(conn_t._parsed_url.password is None)

        self.assertEquals(
            conn_t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        # Bug 228058
        conn_t = transport.ConnectedTransport(
            'ftp://user@host.com@www.host.com/')
        self.assertEquals(conn_t._parsed_url.user, 'user@host.com')

    def test_parse_quoted_url(self):
        quoted_url = 'http://ro%62ey:h%40t@ex%41mple.com:2222/path'
        conn_t = transport.ConnectedTransport(quoted_url)
        # The parsed pieces are expected unquoted.
        self.assertEquals(conn_t._parsed_url.host, 'exAmple.com')
        self.assertEquals(conn_t._parsed_url.port, 2222)
        self.assertEquals(conn_t._parsed_url.user, 'robey')
        self.assertEquals(conn_t._parsed_url.password, 'h@t')
        self.assertEquals(conn_t._parsed_url.path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(
            conn_t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')

    def test_parse_invalid_url(self):
        bad_url = 'sftp://lily.org:~janneke/public/bzr/gub'
        self.assertRaises(
            errors.InvalidURL, transport.ConnectedTransport, bad_url)

    def test_relpath(self):
        sftp_t = transport.ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(
            sftp_t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # A different scheme, user, host or port is not a child of the base.
        for foreign_url in (
                'http://user@host.com/abs/path/sub',
                'sftp://user2@host.com/abs/path/sub',
                'sftp://user@otherhost.com/abs/path/sub',
                'sftp://user@host.com:33/abs/path/sub',
                ):
            self.assertRaises(
                errors.PathNotChild, sftp_t.relpath, foreign_url)

        # Make sure it works when we don't supply a username
        anonymous_t = transport.ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(
            anonymous_t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        percent_t = transport.ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(
            percent_t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        parent = transport.ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEquals('user', parent._parsed_url.user)
        self.assertEquals('host.com', parent._parsed_url.host)
        self.assertIs(None, parent._get_connection())
        self.assertIs(None, parent._parsed_url.password)
        child = parent.clone('subdir')
        self.assertIs(None, child._get_connection())
        # NOTE(review): this re-checks the parent's password, not the
        # clone's -- confirm whether the clone was intended here.
        self.assertIs(None, parent._parsed_url.password)

        # Simulate the user entering a password
        first_secret = 'secret'
        fake_connection = object()
        parent._set_connection(fake_connection, first_secret)
        # Both the transport and its earlier clone must see the update.
        for shared in (parent, child):
            self.assertIs(fake_connection, shared._get_connection())
            self.assertIs(first_secret, shared._get_credentials())

        # credentials can be updated
        second_secret = 'even more secret'
        child._update_credentials(second_secret)
        for shared in (parent, child):
            self.assertIs(fake_connection, shared._get_connection())
            self.assertIs(second_secret, shared._get_credentials())
866
 
 
867
 
 
868
 
class TestReusedTransports(tests.TestCase):
    """Tests for reusing transports through possible_transports"""

    def test_reuse_same_transport(self):
        candidates = []
        first = transport.get_transport_from_url(
            'http://foo/', possible_transports=candidates)
        # The list passed in is expected to receive the new transport.
        self.assertEqual([first], candidates)
        again = transport.get_transport_from_url(
            'http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly, in both
        # directions.
        for first_url, second_url in [
                ('http://foo/path/', 'http://foo/path'),
                ('http://foo/path', 'http://foo/path/'),
                ]:
            original = transport.get_transport_from_url(first_url)
            reused = transport.get_transport_from_url(
                second_url, possible_transports=[original])
            self.assertIs(original, reused)

    def test_don_t_reuse_different_transport(self):
        # A transport for another host must not be handed back.
        foo_t = transport.get_transport_from_url('http://foo/path')
        bar_t = transport.get_transport_from_url(
            'http://bar/path', possible_transports=[foo_t])
        self.assertIsNot(foo_t, bar_t)
896
 
 
897
 
 
898
 
class TestTransportTrace(tests.TestCase):
    """Tests for the 'trace+' transport decorator."""

    def test_decorator(self):
        traced = transport.get_transport_from_url('trace+memory://')
        decorator_class = bzrlib.transport.trace.TransportTraceDecorator
        self.assertIsInstance(traced, decorator_class)

    def test_clone_preserves_activity(self):
        traced = transport.get_transport_from_url('trace+memory://')
        cloned = traced.clone('.')
        # A distinct transport object, sharing the very same activity log.
        self.assertTrue(traced is not cloned)
        self.assertTrue(traced._activity is cloned._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        traced = transport.get_transport_from_url('trace+memory:///')
        traced.put_bytes('foo', 'barish')
        traced.get('foo')
        expected_activity = [
            # put_bytes records the length of the content rather than the
            # content itself, to avoid memory pressure.
            ('put_bytes', 'foo', 6, None),
            # get records the file name only.
            ('get', 'foo'),
            ]
        self.assertEqual(expected_activity, traced._activity)

    def test_readv(self):
        traced = transport.get_transport_from_url('trace+memory:///')
        traced.put_bytes('foo', 'barish')
        # Consume the result of readv completely.
        list(traced.readv('foo', [(0, 1), (3, 2)],
                          adjust_for_latency=True, upper_limit=6))
        expected_activity = [
            # put_bytes records the length of the content rather than the
            # content itself, to avoid memory pressure.
            ('put_bytes', 'foo', 6, None),
            # readv records the supplied offset request
            ('readv', 'foo', [(0, 1), (3, 2)], True, 6),
            ]
        self.assertEqual(expected_activity, traced._activity)
939
 
 
940
 
 
941
 
class TestSSHConnections(tests.TestCaseWithTransport):
    """Test of a bzr+ssh:// connection against a stub SSH server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """A bzr+ssh:// transport runs a remote bzr smart server over SSH.

        The stub SSH server records the command it is asked to execute; the
        test checks that it is the expected 'bzr serve' invocation.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # Holds the subprocess first, followed by its relay threads.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                child = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes
                # from the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    while True:
                        chunk = read(1)
                        if chunk == '':
                            # Nothing more to read: close the sink and stop.
                            close()
                            return
                        write(chunk)

                # One (read, write, close) triple per relayed stream.
                relays = [
                    (channel.recv, child.stdin.write, child.stdin.close),
                    (child.stdout.read, channel.sendall, channel.close),
                    (child.stderr.read, channel.sendall_stderr,
                     channel.close),
                    ]
                started.append(child)
                for relay in relays:
                    worker = threading.Thread(target=ferry_bytes, args=relay)
                    worker.start()
                    started.append(worker)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        on_windows = sys.platform == 'win32'
        bzr_remote_path = self.get_bzr_path()
        if on_windows:
            bzr_remote_path = sys.executable + ' ' + bzr_remote_path
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if on_windows:
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        remote_t = transport.get_transport(url)
        self.permit_url(remote_t.base)
        remote_t.mkdir('foo')

        expected_command = (
            '%s serve --inet --directory=/ --allow-writes' % bzr_remote_path)
        self.assertEqual([expected_command], self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        remote_t._client._medium.disconnect()
        if started:
            # First wait for the subprocess
            started[0].wait()
            # And the rest are threads
            for worker in started[1:]:
                worker.join()
1042
 
 
1043
 
 
1044
 
class TestUnhtml(tests.TestCase):
    """Tests for http.unhtml_roughly"""

    def test_truncation(self):
        # 1000 repetitions give an input far longer than 1000 characters.
        repeated_markup = "<p>something!\n" * 1000
        plain = http.unhtml_roughly(repeated_markup)
        # The result is expected to be cut to exactly 1000 characters.
        self.assertEquals(len(plain), 1000)
        self.assertStartsWith(plain, " something!")
1053
 
 
1054
 
 
1055
 
class SomeDirectory(object):
    """Stub directory whose every lookup gives the same URL."""

    def look_up(self, name, url):
        # Both arguments are ignored: the answer never changes.
        fixed_target = "http://bar"
        return fixed_target
1059
 
 
1060
 
 
1061
 
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return an absolute path together with the file URL expected for it.

        :return: A (path, url) tuple, where path is
            osutils.abspath('/foo/bar') and url is that same path with a
            'file://' (or 'file:///') prefix.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        # Something that is already a URL comes back unchanged.
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        # A location matching a registered directory prefix is resolved
        # through that directory's look_up().
        directories.register("bar:", SomeDirectory, "Dummy directory")
        # Unregister again so the dummy does not leak into other tests.
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        # A URL containing a non-ascii unicode character is rejected.
        self.assertRaises(errors.InvalidURL, location_to_url,
            "http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        # A unicode local path is converted, with the non-ascii character
        # showing up as its percent-escaped UTF-8 bytes.
        path, url = self.get_base_location()
        location = path + "\xc3\xaf".decode("utf-8")
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        # A plain absolute path maps to the matching file URL.
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        # "file:bar" is interpreted relative to the current directory.
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
            location_to_url("file:bar"))

    def test_absolute_file_url(self):
        # "file:/bar" is expanded to the full "file:///bar" form.
        self.assertEqual("file:///bar", location_to_url("file:/bar"))
 
597
def get_test_permutations():
    """Return the transport permutations this module offers for testing.

    The transports registered by this module exist only to exercise
    registration itself, so none of them should be run through the full
    set of transport tests; the result is therefore always an empty list.
    """
    permutations = []
    return permutations