~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Patch Queue Manager
  • Date: 2011-09-22 14:12:18 UTC
  • mfrom: (6155.3.1 jam)
  • Revision ID: pqm@pqm.ubuntu.com-20110922141218-86s4uu6nqvourw4f
(jameinel) Cleanup comments bzrlib/smart/__init__.py (John A Meinel)

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
18
19
import os
 
20
import subprocess
19
21
import sys
20
 
import stat
21
 
from cStringIO import StringIO
 
22
import threading
22
23
 
23
 
import bzrlib
24
 
from bzrlib import urlutils
25
 
from bzrlib.errors import (NoSuchFile, FileExists,
26
 
                           TransportNotPossible,
27
 
                           ConnectionError,
28
 
                           DependencyNotPresent,
29
 
                           UnsupportedProtocol,
30
 
                           PathNotChild,
31
 
                           )
32
 
from bzrlib.tests import TestCase, TestCaseInTempDir
33
 
from bzrlib.transport import (_CoalescedOffset,
34
 
                              _get_protocol_handlers,
35
 
                              _get_transport_modules,
36
 
                              get_transport,
37
 
                              register_lazy_transport,
38
 
                              _set_protocol_handlers,
39
 
                              Transport,
40
 
                              )
41
 
from bzrlib.transport.memory import MemoryTransport
42
 
from bzrlib.transport.local import (LocalTransport,
43
 
                                    EmulatedWin32LocalTransport)
 
24
from bzrlib import (
 
25
    errors,
 
26
    osutils,
 
27
    tests,
 
28
    transport,
 
29
    urlutils,
 
30
    )
 
31
from bzrlib.directory_service import directories
 
32
from bzrlib.transport import (
 
33
    chroot,
 
34
    fakenfs,
 
35
    http,
 
36
    local,
 
37
    location_to_url,
 
38
    memory,
 
39
    pathfilter,
 
40
    readonly,
 
41
    )
 
42
import bzrlib.transport.trace
 
43
from bzrlib.tests import (
 
44
    features,
 
45
    test_server,
 
46
    )
44
47
 
45
48
 
46
49
# TODO: Should possibly split transport-specific tests into their own files.
47
50
 
48
51
 
49
 
class TestTransport(TestCase):
 
52
class TestTransport(tests.TestCase):
50
53
    """Test the non transport-concrete class functionality."""
51
54
 
52
55
    def test__get_set_protocol_handlers(self):
53
 
        handlers = _get_protocol_handlers()
54
 
        self.assertNotEqual({}, handlers)
55
 
        try:
56
 
            _set_protocol_handlers({})
57
 
            self.assertEqual({}, _get_protocol_handlers())
58
 
        finally:
59
 
            _set_protocol_handlers(handlers)
 
56
        handlers = transport._get_protocol_handlers()
 
57
        self.assertNotEqual([], handlers.keys())
 
58
        transport._clear_protocol_handlers()
 
59
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
60
        self.assertEqual([], transport._get_protocol_handlers().keys())
60
61
 
61
62
    def test_get_transport_modules(self):
62
 
        handlers = _get_protocol_handlers()
 
63
        handlers = transport._get_protocol_handlers()
 
64
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
65
        # don't pollute the current handlers
 
66
        transport._clear_protocol_handlers()
 
67
 
63
68
        class SampleHandler(object):
64
69
            """I exist, isnt that enough?"""
65
 
        try:
66
 
            my_handlers = {}
67
 
            _set_protocol_handlers(my_handlers)
68
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
69
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
70
 
            self.assertEqual([SampleHandler.__module__],
71
 
                             _get_transport_modules())
72
 
        finally:
73
 
            _set_protocol_handlers(handlers)
 
70
        transport._clear_protocol_handlers()
 
71
        transport.register_transport_proto('foo')
 
72
        transport.register_lazy_transport('foo',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        transport.register_transport_proto('bar')
 
76
        transport.register_lazy_transport('bar',
 
77
                                            'bzrlib.tests.test_transport',
 
78
                                            'TestTransport.SampleHandler')
 
79
        self.assertEqual([SampleHandler.__module__,
 
80
                            'bzrlib.transport.chroot',
 
81
                            'bzrlib.transport.pathfilter'],
 
82
                            transport._get_transport_modules())
74
83
 
75
84
    def test_transport_dependency(self):
76
85
        """Transport with missing dependency causes no error"""
77
 
        saved_handlers = _get_protocol_handlers()
 
86
        saved_handlers = transport._get_protocol_handlers()
 
87
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
88
        # don't pollute the current handlers
 
89
        transport._clear_protocol_handlers()
 
90
        transport.register_transport_proto('foo')
 
91
        transport.register_lazy_transport(
 
92
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
78
93
        try:
79
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
80
 
                    'BadTransportHandler')
81
 
            try:
82
 
                get_transport('foo://fooserver/foo')
83
 
            except UnsupportedProtocol, e:
84
 
                e_str = str(e)
85
 
                self.assertEquals('Unsupported protocol'
86
 
                                  ' for url "foo://fooserver/foo":'
87
 
                                  ' Unable to import library "some_lib":'
88
 
                                  ' testing missing dependency', str(e))
89
 
            else:
90
 
                self.fail('Did not raise UnsupportedProtocol')
91
 
        finally:
92
 
            # restore original values
93
 
            _set_protocol_handlers(saved_handlers)
94
 
            
 
94
            transport.get_transport_from_url('foo://fooserver/foo')
 
95
        except errors.UnsupportedProtocol, e:
 
96
            e_str = str(e)
 
97
            self.assertEquals('Unsupported protocol'
 
98
                                ' for url "foo://fooserver/foo":'
 
99
                                ' Unable to import library "some_lib":'
 
100
                                ' testing missing dependency', str(e))
 
101
        else:
 
102
            self.fail('Did not raise UnsupportedProtocol')
 
103
 
95
104
    def test_transport_fallback(self):
96
105
        """Transport with missing dependency causes no error"""
97
 
        saved_handlers = _get_protocol_handlers()
98
 
        try:
99
 
            _set_protocol_handlers({})
100
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
101
 
                    'BackupTransportHandler')
102
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
103
 
                    'BadTransportHandler')
104
 
            t = get_transport('foo://fooserver/foo')
105
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
106
 
        finally:
107
 
            _set_protocol_handlers(saved_handlers)
108
 
 
109
 
    def test__combine_paths(self):
110
 
        t = Transport('/')
111
 
        self.assertEqual('/home/sarah/project/foo',
112
 
                         t._combine_paths('/home/sarah', 'project/foo'))
113
 
        self.assertEqual('/etc',
114
 
                         t._combine_paths('/home/sarah', '../../etc'))
115
 
        self.assertEqual('/etc',
116
 
                         t._combine_paths('/home/sarah', '../../../etc'))
117
 
        self.assertEqual('/etc',
118
 
                         t._combine_paths('/home/sarah', '/etc'))
119
 
 
120
 
 
121
 
class TestCoalesceOffsets(TestCase):
122
 
    
123
 
    def check(self, expected, offsets, limit=0, fudge=0):
124
 
        coalesce = Transport._coalesce_offsets
125
 
        exp = [_CoalescedOffset(*x) for x in expected]
126
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
106
        saved_handlers = transport._get_protocol_handlers()
 
107
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
108
        transport._clear_protocol_handlers()
 
109
        transport.register_transport_proto('foo')
 
110
        transport.register_lazy_transport(
 
111
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
112
        transport.register_lazy_transport(
 
113
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
114
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
115
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
116
 
 
117
    def test_ssh_hints(self):
 
118
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
119
        try:
 
120
            transport.get_transport_from_url('ssh://fooserver/foo')
 
121
        except errors.UnsupportedProtocol, e:
 
122
            e_str = str(e)
 
123
            self.assertEquals('Unsupported protocol'
 
124
                              ' for url "ssh://fooserver/foo":'
 
125
                              ' bzr supports bzr+ssh to operate over ssh,'
 
126
                              ' use "bzr+ssh://fooserver/foo".',
 
127
                              str(e))
 
128
        else:
 
129
            self.fail('Did not raise UnsupportedProtocol')
 
130
 
 
131
    def test_LateReadError(self):
 
132
        """The LateReadError helper should raise on read()."""
 
133
        a_file = transport.LateReadError('a path')
 
134
        try:
 
135
            a_file.read()
 
136
        except errors.ReadError, error:
 
137
            self.assertEqual('a path', error.path)
 
138
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
139
        a_file.close()
 
140
 
 
141
    def test_local_abspath_non_local_transport(self):
 
142
        # the base implementation should throw
 
143
        t = memory.MemoryTransport()
 
144
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
145
        self.assertEqual('memory:///t is not a local path.', str(e))
 
146
 
 
147
 
 
148
class TestCoalesceOffsets(tests.TestCase):
 
149
 
 
150
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
151
        coalesce = transport.Transport._coalesce_offsets
 
152
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
153
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
154
                            max_size=max_size))
127
155
        self.assertEqual(exp, out)
128
156
 
129
157
    def test_coalesce_empty(self):
136
164
        self.check([(0, 10, [(0, 10)]),
137
165
                    (20, 10, [(0, 10)]),
138
166
                   ], [(0, 10), (20, 10)])
139
 
            
 
167
 
140
168
    def test_coalesce_unsorted(self):
141
169
        self.check([(20, 10, [(0, 10)]),
142
170
                    (0, 10, [(0, 10)]),
147
175
                   [(0, 10), (10, 10)])
148
176
 
149
177
    def test_coalesce_overlapped(self):
150
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
151
 
                   [(0, 10), (5, 10)])
 
178
        self.assertRaises(ValueError,
 
179
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
180
                        [(0, 10), (5, 10)])
152
181
 
153
182
    def test_coalesce_limit(self):
154
183
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
171
200
 
172
201
    def test_coalesce_fudge(self):
173
202
        self.check([(10, 30, [(0, 10), (20, 10)]),
174
 
                    (100, 10, [(0, 10),]),
 
203
                    (100, 10, [(0, 10)]),
175
204
                   ], [(10, 10), (30, 10), (100, 10)],
176
 
                   fudge=10
 
205
                   fudge=10)
 
206
 
 
207
    def test_coalesce_max_size(self):
 
208
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
209
                    (30, 50, [(0, 50)]),
 
210
                    # If one range is above max_size, it gets its own coalesced
 
211
                    # offset
 
212
                    (100, 80, [(0, 80)]),],
 
213
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
214
                   max_size=50)
 
215
 
 
216
    def test_coalesce_no_max_size(self):
 
217
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
218
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
177
219
                  )
178
220
 
179
 
 
180
 
class TestMemoryTransport(TestCase):
 
221
    def test_coalesce_default_limit(self):
 
222
        # By default we use a 100MB max size.
 
223
        ten_mb = 10 * 1024 * 1024
 
224
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
225
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
226
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
227
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
228
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
229
                   max_size=1*1024*1024*1024)
 
230
 
 
231
 
 
232
class TestMemoryServer(tests.TestCase):
 
233
 
 
234
    def test_create_server(self):
 
235
        server = memory.MemoryServer()
 
236
        server.start_server()
 
237
        url = server.get_url()
 
238
        self.assertTrue(url in transport.transport_list_registry)
 
239
        t = transport.get_transport_from_url(url)
 
240
        del t
 
241
        server.stop_server()
 
242
        self.assertFalse(url in transport.transport_list_registry)
 
243
        self.assertRaises(errors.UnsupportedProtocol,
 
244
                          transport.get_transport, url)
 
245
 
 
246
 
 
247
class TestMemoryTransport(tests.TestCase):
181
248
 
182
249
    def test_get_transport(self):
183
 
        MemoryTransport()
 
250
        memory.MemoryTransport()
184
251
 
185
252
    def test_clone(self):
186
 
        transport = MemoryTransport()
187
 
        self.assertTrue(isinstance(transport, MemoryTransport))
188
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
253
        t = memory.MemoryTransport()
 
254
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
255
        self.assertEqual("memory:///", t.clone("/").base)
189
256
 
190
257
    def test_abspath(self):
191
 
        transport = MemoryTransport()
192
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
258
        t = memory.MemoryTransport()
 
259
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
193
260
 
194
261
    def test_abspath_of_root(self):
195
 
        transport = MemoryTransport()
196
 
        self.assertEqual("memory:///", transport.base)
197
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
262
        t = memory.MemoryTransport()
 
263
        self.assertEqual("memory:///", t.base)
 
264
        self.assertEqual("memory:///", t.abspath('/'))
198
265
 
199
266
    def test_abspath_of_relpath_starting_at_root(self):
200
 
        transport = MemoryTransport()
201
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
267
        t = memory.MemoryTransport()
 
268
        self.assertEqual("memory:///foo", t.abspath('/foo'))
202
269
 
203
270
    def test_append_and_get(self):
204
 
        transport = MemoryTransport()
205
 
        transport.append_bytes('path', 'content')
206
 
        self.assertEqual(transport.get('path').read(), 'content')
207
 
        transport.append_file('path', StringIO('content'))
208
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
271
        t = memory.MemoryTransport()
 
272
        t.append_bytes('path', 'content')
 
273
        self.assertEqual(t.get('path').read(), 'content')
 
274
        t.append_file('path', StringIO('content'))
 
275
        self.assertEqual(t.get('path').read(), 'contentcontent')
209
276
 
210
277
    def test_put_and_get(self):
211
 
        transport = MemoryTransport()
212
 
        transport.put_file('path', StringIO('content'))
213
 
        self.assertEqual(transport.get('path').read(), 'content')
214
 
        transport.put_bytes('path', 'content')
215
 
        self.assertEqual(transport.get('path').read(), 'content')
 
278
        t = memory.MemoryTransport()
 
279
        t.put_file('path', StringIO('content'))
 
280
        self.assertEqual(t.get('path').read(), 'content')
 
281
        t.put_bytes('path', 'content')
 
282
        self.assertEqual(t.get('path').read(), 'content')
216
283
 
217
284
    def test_append_without_dir_fails(self):
218
 
        transport = MemoryTransport()
219
 
        self.assertRaises(NoSuchFile,
220
 
                          transport.append_bytes, 'dir/path', 'content')
 
285
        t = memory.MemoryTransport()
 
286
        self.assertRaises(errors.NoSuchFile,
 
287
                          t.append_bytes, 'dir/path', 'content')
221
288
 
222
289
    def test_put_without_dir_fails(self):
223
 
        transport = MemoryTransport()
224
 
        self.assertRaises(NoSuchFile,
225
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
290
        t = memory.MemoryTransport()
 
291
        self.assertRaises(errors.NoSuchFile,
 
292
                          t.put_file, 'dir/path', StringIO('content'))
226
293
 
227
294
    def test_get_missing(self):
228
 
        transport = MemoryTransport()
229
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
295
        transport = memory.MemoryTransport()
 
296
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
230
297
 
231
298
    def test_has_missing(self):
232
 
        transport = MemoryTransport()
233
 
        self.assertEquals(False, transport.has('foo'))
 
299
        t = memory.MemoryTransport()
 
300
        self.assertEquals(False, t.has('foo'))
234
301
 
235
302
    def test_has_present(self):
236
 
        transport = MemoryTransport()
237
 
        transport.append_bytes('foo', 'content')
238
 
        self.assertEquals(True, transport.has('foo'))
 
303
        t = memory.MemoryTransport()
 
304
        t.append_bytes('foo', 'content')
 
305
        self.assertEquals(True, t.has('foo'))
239
306
 
240
307
    def test_list_dir(self):
241
 
        transport = MemoryTransport()
242
 
        transport.put_bytes('foo', 'content')
243
 
        transport.mkdir('dir')
244
 
        transport.put_bytes('dir/subfoo', 'content')
245
 
        transport.put_bytes('dirlike', 'content')
 
308
        t = memory.MemoryTransport()
 
309
        t.put_bytes('foo', 'content')
 
310
        t.mkdir('dir')
 
311
        t.put_bytes('dir/subfoo', 'content')
 
312
        t.put_bytes('dirlike', 'content')
246
313
 
247
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
248
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
314
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
315
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
249
316
 
250
317
    def test_mkdir(self):
251
 
        transport = MemoryTransport()
252
 
        transport.mkdir('dir')
253
 
        transport.append_bytes('dir/path', 'content')
254
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
318
        t = memory.MemoryTransport()
 
319
        t.mkdir('dir')
 
320
        t.append_bytes('dir/path', 'content')
 
321
        self.assertEqual(t.get('dir/path').read(), 'content')
255
322
 
256
323
    def test_mkdir_missing_parent(self):
257
 
        transport = MemoryTransport()
258
 
        self.assertRaises(NoSuchFile,
259
 
                          transport.mkdir, 'dir/dir')
 
324
        t = memory.MemoryTransport()
 
325
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
260
326
 
261
327
    def test_mkdir_twice(self):
262
 
        transport = MemoryTransport()
263
 
        transport.mkdir('dir')
264
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
328
        t = memory.MemoryTransport()
 
329
        t.mkdir('dir')
 
330
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
265
331
 
266
332
    def test_parameters(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertEqual(True, transport.listable())
269
 
        self.assertEqual(False, transport.should_cache())
270
 
        self.assertEqual(False, transport.is_readonly())
 
333
        t = memory.MemoryTransport()
 
334
        self.assertEqual(True, t.listable())
 
335
        self.assertEqual(False, t.is_readonly())
271
336
 
272
337
    def test_iter_files_recursive(self):
273
 
        transport = MemoryTransport()
274
 
        transport.mkdir('dir')
275
 
        transport.put_bytes('dir/foo', 'content')
276
 
        transport.put_bytes('dir/bar', 'content')
277
 
        transport.put_bytes('bar', 'content')
278
 
        paths = set(transport.iter_files_recursive())
 
338
        t = memory.MemoryTransport()
 
339
        t.mkdir('dir')
 
340
        t.put_bytes('dir/foo', 'content')
 
341
        t.put_bytes('dir/bar', 'content')
 
342
        t.put_bytes('bar', 'content')
 
343
        paths = set(t.iter_files_recursive())
279
344
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
280
345
 
281
346
    def test_stat(self):
282
 
        transport = MemoryTransport()
283
 
        transport.put_bytes('foo', 'content')
284
 
        transport.put_bytes('bar', 'phowar')
285
 
        self.assertEqual(7, transport.stat('foo').st_size)
286
 
        self.assertEqual(6, transport.stat('bar').st_size)
287
 
 
288
 
 
289
 
class ChrootDecoratorTransportTest(TestCase):
 
347
        t = memory.MemoryTransport()
 
348
        t.put_bytes('foo', 'content')
 
349
        t.put_bytes('bar', 'phowar')
 
350
        self.assertEqual(7, t.stat('foo').st_size)
 
351
        self.assertEqual(6, t.stat('bar').st_size)
 
352
 
 
353
 
 
354
class ChrootDecoratorTransportTest(tests.TestCase):
290
355
    """Chroot decoration specific tests."""
291
356
 
 
357
    def test_abspath(self):
 
358
        # The abspath is always relative to the chroot_url.
 
359
        server = chroot.ChrootServer(
 
360
            transport.get_transport_from_url('memory:///foo/bar/'))
 
361
        self.start_server(server)
 
362
        t = transport.get_transport_from_url(server.get_url())
 
363
        self.assertEqual(server.get_url(), t.abspath('/'))
 
364
 
 
365
        subdir_t = t.clone('subdir')
 
366
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
367
 
 
368
    def test_clone(self):
 
369
        server = chroot.ChrootServer(
 
370
            transport.get_transport_from_url('memory:///foo/bar/'))
 
371
        self.start_server(server)
 
372
        t = transport.get_transport_from_url(server.get_url())
 
373
        # relpath from root and root path are the same
 
374
        relpath_cloned = t.clone('foo')
 
375
        abspath_cloned = t.clone('/foo')
 
376
        self.assertEqual(server, relpath_cloned.server)
 
377
        self.assertEqual(server, abspath_cloned.server)
 
378
 
 
379
    def test_chroot_url_preserves_chroot(self):
 
380
        """Calling get_transport on a chroot transport's base should produce a
 
381
        transport with exactly the same behaviour as the original chroot
 
382
        transport.
 
383
 
 
384
        This is so that it is not possible to escape a chroot by doing::
 
385
            url = chroot_transport.base
 
386
            parent_url = urlutils.join(url, '..')
 
387
            new_t = transport.get_transport_from_url(parent_url)
 
388
        """
 
389
        server = chroot.ChrootServer(
 
390
            transport.get_transport_from_url('memory:///path/subpath'))
 
391
        self.start_server(server)
 
392
        t = transport.get_transport_from_url(server.get_url())
 
393
        new_t = transport.get_transport_from_url(t.base)
 
394
        self.assertEqual(t.server, new_t.server)
 
395
        self.assertEqual(t.base, new_t.base)
 
396
 
 
397
    def test_urljoin_preserves_chroot(self):
 
398
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
399
        URL that escapes the intended chroot.
 
400
 
 
401
        This is so that it is not possible to escape a chroot by doing::
 
402
            url = chroot_transport.base
 
403
            parent_url = urlutils.join(url, '..')
 
404
            new_t = transport.get_transport_from_url(parent_url)
 
405
        """
 
406
        server = chroot.ChrootServer(
 
407
            transport.get_transport_from_url('memory:///path/'))
 
408
        self.start_server(server)
 
409
        t = transport.get_transport_from_url(server.get_url())
 
410
        self.assertRaises(
 
411
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
412
 
 
413
 
 
414
class TestChrootServer(tests.TestCase):
 
415
 
292
416
    def test_construct(self):
293
 
        from bzrlib.transport import chroot
294
 
        transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
295
 
        self.assertEqual('memory:///pathA/', transport.chroot_url)
296
 
 
297
 
        transport = chroot.ChrootTransportDecorator(
298
 
            'chroot+memory:///path/B', chroot='memory:///path/')
299
 
        self.assertEqual('memory:///path/', transport.chroot_url)
300
 
 
301
 
    def test_append_file(self):
302
 
        transport = get_transport('chroot+memory:///foo/bar')
303
 
        self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
304
 
 
305
 
    def test_append_bytes(self):
306
 
        transport = get_transport('chroot+memory:///foo/bar')
307
 
        self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
 
417
        backing_transport = memory.MemoryTransport()
 
418
        server = chroot.ChrootServer(backing_transport)
 
419
        self.assertEqual(backing_transport, server.backing_transport)
 
420
 
 
421
    def test_setUp(self):
 
422
        backing_transport = memory.MemoryTransport()
 
423
        server = chroot.ChrootServer(backing_transport)
 
424
        server.start_server()
 
425
        self.addCleanup(server.stop_server)
 
426
        self.assertTrue(server.scheme
 
427
                        in transport._get_protocol_handlers().keys())
 
428
 
 
429
    def test_stop_server(self):
 
430
        backing_transport = memory.MemoryTransport()
 
431
        server = chroot.ChrootServer(backing_transport)
 
432
        server.start_server()
 
433
        server.stop_server()
 
434
        self.assertFalse(server.scheme
 
435
                         in transport._get_protocol_handlers().keys())
 
436
 
 
437
    def test_get_url(self):
 
438
        backing_transport = memory.MemoryTransport()
 
439
        server = chroot.ChrootServer(backing_transport)
 
440
        server.start_server()
 
441
        self.addCleanup(server.stop_server)
 
442
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
443
 
 
444
 
 
445
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
446
    """Pathfilter decoration specific tests."""
 
447
 
 
448
    def test_abspath(self):
 
449
        # The abspath is always relative to the base of the backing transport.
 
450
        server = pathfilter.PathFilteringServer(
 
451
            transport.get_transport_from_url('memory:///foo/bar/'),
 
452
            lambda x: x)
 
453
        server.start_server()
 
454
        t = transport.get_transport_from_url(server.get_url())
 
455
        self.assertEqual(server.get_url(), t.abspath('/'))
 
456
 
 
457
        subdir_t = t.clone('subdir')
 
458
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
459
        server.stop_server()
 
460
 
 
461
    def make_pf_transport(self, filter_func=None):
 
462
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
463
 
 
464
        :param filter_func: by default this will be a no-op function.  Use this
 
465
            parameter to override it."""
 
466
        if filter_func is None:
 
467
            filter_func = lambda x: x
 
468
        server = pathfilter.PathFilteringServer(
 
469
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
470
        server.start_server()
 
471
        self.addCleanup(server.stop_server)
 
472
        return transport.get_transport_from_url(server.get_url())
 
473
 
 
474
    def test__filter(self):
 
475
        # _filter (with an identity func as filter_func) always returns
 
476
        # paths relative to the base of the backing transport.
 
477
        t = self.make_pf_transport()
 
478
        self.assertEqual('foo', t._filter('foo'))
 
479
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
480
        self.assertEqual('', t._filter('..'))
 
481
        self.assertEqual('', t._filter('/'))
 
482
        # The base of the pathfiltering transport is taken into account too.
 
483
        t = t.clone('subdir1/subdir2')
 
484
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
485
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
486
        self.assertEqual('subdir1', t._filter('..'))
 
487
        self.assertEqual('', t._filter('/'))
 
488
 
 
489
    def test_filter_invocation(self):
 
490
        filter_log = []
 
491
 
 
492
        def filter(path):
 
493
            filter_log.append(path)
 
494
            return path
 
495
        t = self.make_pf_transport(filter)
 
496
        t.has('abc')
 
497
        self.assertEqual(['abc'], filter_log)
 
498
        del filter_log[:]
 
499
        t.clone('abc').has('xyz')
 
500
        self.assertEqual(['abc/xyz'], filter_log)
 
501
        del filter_log[:]
 
502
        t.has('/abc')
 
503
        self.assertEqual(['abc'], filter_log)
308
504
 
309
505
    def test_clone(self):
310
 
        transport = get_transport('chroot+memory:///foo/bar')
311
 
        self.assertRaises(PathNotChild, transport.clone, '/foo')
312
 
 
313
 
    def test_delete(self):
314
 
        transport = get_transport('chroot+memory:///foo/bar')
315
 
        self.assertRaises(PathNotChild, transport.delete, '/foo')
316
 
 
317
 
    def test_delete_tree(self):
318
 
        transport = get_transport('chroot+memory:///foo/bar')
319
 
        self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
320
 
 
321
 
    def test_get(self):
322
 
        transport = get_transport('chroot+memory:///foo/bar')
323
 
        self.assertRaises(PathNotChild, transport.get, '/foo')
324
 
 
325
 
    def test_get_bytes(self):
326
 
        transport = get_transport('chroot+memory:///foo/bar')
327
 
        self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
328
 
 
329
 
    def test_has(self):
330
 
        transport = get_transport('chroot+memory:///foo/bar')
331
 
        self.assertRaises(PathNotChild, transport.has, '/foo')
332
 
 
333
 
    def test_list_dir(self):
334
 
        transport = get_transport('chroot+memory:///foo/bar')
335
 
        self.assertRaises(PathNotChild, transport.list_dir, '/foo')
336
 
 
337
 
    def test_lock_read(self):
338
 
        transport = get_transport('chroot+memory:///foo/bar')
339
 
        self.assertRaises(PathNotChild, transport.lock_read, '/foo')
340
 
 
341
 
    def test_lock_write(self):
342
 
        transport = get_transport('chroot+memory:///foo/bar')
343
 
        self.assertRaises(PathNotChild, transport.lock_write, '/foo')
344
 
 
345
 
    def test_mkdir(self):
346
 
        transport = get_transport('chroot+memory:///foo/bar')
347
 
        self.assertRaises(PathNotChild, transport.mkdir, '/foo')
348
 
 
349
 
    def test_put_bytes(self):
350
 
        transport = get_transport('chroot+memory:///foo/bar')
351
 
        self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
352
 
 
353
 
    def test_put_file(self):
354
 
        transport = get_transport('chroot+memory:///foo/bar')
355
 
        self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
356
 
 
357
 
    def test_rename(self):
358
 
        transport = get_transport('chroot+memory:///foo/bar')
359
 
        self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
360
 
        self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
361
 
 
362
 
    def test_rmdir(self):
363
 
        transport = get_transport('chroot+memory:///foo/bar')
364
 
        self.assertRaises(PathNotChild, transport.rmdir, '/foo')
365
 
 
366
 
    def test_stat(self):
367
 
        transport = get_transport('chroot+memory:///foo/bar')
368
 
        self.assertRaises(PathNotChild, transport.stat, '/foo')
369
 
 
370
 
 
371
 
class ReadonlyDecoratorTransportTest(TestCase):
 
506
        t = self.make_pf_transport()
 
507
        # relpath from root and root path are the same
 
508
        relpath_cloned = t.clone('foo')
 
509
        abspath_cloned = t.clone('/foo')
 
510
        self.assertEqual(t.server, relpath_cloned.server)
 
511
        self.assertEqual(t.server, abspath_cloned.server)
 
512
 
 
513
    def test_url_preserves_pathfiltering(self):
 
514
        """Calling get_transport on a pathfiltered transport's base should
 
515
        produce a transport with exactly the same behaviour as the original
 
516
        pathfiltered transport.
 
517
 
 
518
        This is so that it is not possible to escape (accidentally or
 
519
        otherwise) the filtering by doing::
 
520
            url = filtered_transport.base
 
521
            parent_url = urlutils.join(url, '..')
 
522
            new_t = transport.get_transport_from_url(parent_url)
 
523
        """
 
524
        t = self.make_pf_transport()
 
525
        new_t = transport.get_transport_from_url(t.base)
 
526
        self.assertEqual(t.server, new_t.server)
 
527
        self.assertEqual(t.base, new_t.base)
 
528
 
 
529
 
 
530
class ReadonlyDecoratorTransportTest(tests.TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated local transport is listable and readonly."""
        # connect to . in readonly mode
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http transport is readonly and not listable."""
        from bzrlib.tests.http_server import HttpServer
        # connect to '.' via http which is not listable
        http_server = HttpServer()
        self.start_server(http_server)
        decorated_url = 'readonly+' + http_server.get_url()
        decorated = transport.get_transport_from_url(decorated_url)
        self.assertIsInstance(decorated, readonly.ReadonlyTransportDecorator)
        self.assertEqual(False, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())
 
548
 
 
549
 
 
550
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return a transport for ``url`` wrapped in the fakenfs decorator."""
        # connect to url with nfs decoration
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.http_server import HttpServer
        # connect to '.' via http which is not listable
        http_server = HttpServer()
        self.start_server(http_server)
        nfs = self.get_nfs_transport(http_server.get_url())
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        self.assertEqual(False, nfs.listable())
        self.assertEqual(True, nfs.is_readonly())

    def test_fakenfs_server_default(self):
        # a FakeNFSServer() should bring up a local relpath server for itself
        nfs_server = test_server.FakeNFSServer()
        self.start_server(nfs_server)
        # the url should be decorated appropriately
        self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
        # and we should be able to get a transport for it
        nfs = transport.get_transport_from_url(nfs_server.get_url())
        # which must be a FakeNFSTransportDecorator instance.
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)

    def test_fakenfs_rename_semantics(self):
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(errors.ResourceBusy, nfs.rename, 'from', 'to')
 
594
 
 
595
 
 
596
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
459
597
    """Tests for simulation of VFAT restrictions"""
460
598
 
461
599
    def get_vfat_transport(self, url):
465
603
 
466
604
    def test_transport_creation(self):
467
605
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
468
 
        transport = self.get_vfat_transport('.')
469
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
606
        t = self.get_vfat_transport('.')
 
607
        self.assertIsInstance(t, FakeVFATTransportDecorator)
470
608
 
471
609
    def test_transport_mkdir(self):
472
 
        transport = self.get_vfat_transport('.')
473
 
        transport.mkdir('HELLO')
474
 
        self.assertTrue(transport.has('hello'))
475
 
        self.assertTrue(transport.has('Hello'))
 
610
        t = self.get_vfat_transport('.')
 
611
        t.mkdir('HELLO')
 
612
        self.assertTrue(t.has('hello'))
 
613
        self.assertTrue(t.has('Hello'))
476
614
 
477
615
    def test_forbidden_chars(self):
478
 
        transport = self.get_vfat_transport('.')
479
 
        self.assertRaises(ValueError, transport.has, "<NU>")
480
 
 
481
 
 
482
 
class BadTransportHandler(Transport):
 
616
        t = self.get_vfat_transport('.')
 
617
        self.assertRaises(ValueError, t.has, "<NU>")
 
618
 
 
619
 
 
620
class BadTransportHandler(transport.Transport):
    """Test transport whose construction always fails on a dependency."""

    def __init__(self, base_url):
        # Never call the base constructor: instantiation always raises.
        missing = errors.DependencyNotPresent(
            'some_lib', 'testing missing dependency')
        raise missing
 
624
 
 
625
 
 
626
class BackupTransportHandler(transport.Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
490
629
 
491
630
 
492
 
class TestTransportImplementation(TestCaseInTempDir):
 
631
class TestTransportImplementation(tests.TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport should show up in the working directory, and vice-versa.
    This is a bug, because it's possible to have URL schemes which provide
    access to something that may not result in storage on the local disk,
    i.e. due to file system limits, or due to it being a database or some
    other non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)
    """

    def setUp(self):
        """Create the server from ``transport_server`` and start it."""
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        self.start_server(server)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        url = self._adjust_url(self._server.get_url(), relpath)
        # try getting the transport via the regular interface:
        t = transport.get_transport_from_url(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(t, self.transport_class):
            return t
        # we did not get the correct transport class type. Override the
        # regular connection behaviour by direct construction.
        return self.transport_class(url)
533
675
 
534
676
 
535
 
class TestLocalTransports(TestCase):
 
677
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_path."""

    def test_with_path(self):
        """A directory path gives a LocalTransport based on that directory."""
        t = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base.rstrip("/"),
            urlutils.local_path_to_url(self.test_dir))

    def test_with_url(self):
        """URL-looking input is still treated as a (relative) path."""
        t = transport.get_transport_from_path("file:")
        self.assertIsInstance(t, local.LocalTransport)
        # "file:" is expected to resolve to a directory entry of that name
        # below the test directory, not to be parsed as a URL scheme.
        self.assertEqual(t.base.rstrip("/"),
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
690
 
 
691
 
 
692
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_url."""

    def test_with_path(self):
        """A plain filesystem path is rejected with InvalidURL."""
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
            self.test_dir)

    def test_with_url(self):
        """A local file URL gives a LocalTransport with that base."""
        url = urlutils.local_path_to_url(self.test_dir)
        t = transport.get_transport_from_url(url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base.rstrip("/"), url)

    def test_with_url_and_segment_parameters(self):
        """Segment parameters stay in the base but not in file lookups."""
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
        t = transport.get_transport_from_url(url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base.rstrip("/"), url)
        # The file is created in the real directory (without ",branch=foo")
        # and must be visible through the transport.
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
            f.write("data")
        self.assertTrue(t.has("afile"))
 
712
 
 
713
 
 
714
class TestLocalTransports(tests.TestCase):
    """Tests for how get_transport maps local locations to LocalTransport."""

    def test_get_transport_from_abspath(self):
        """An absolute path gives a LocalTransport with the matching URL."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path gives a LocalTransport with the matching URL."""
        t = transport.get_transport('.')
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local file URL gives a LocalTransport with that same base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = transport.get_transport(here_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') returns the path the transport was opened on."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
 
739
 
 
740
 
 
741
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for write streams opened on a transport."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        # os.fdatasync does not exist on every platform.
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        w.write('foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEqual(f.read(), 'foo')
        self.assertEqual(len(calls), 1, calls)

    def test_missing_directory(self):
        """Opening a stream inside a missing directory raises NoSuchFile."""
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
766
 
 
767
 
 
768
class TestWin32LocalTransport(tests.TestCase):
    """Tests for the emulated Win32 local transport."""

    def test_unc_clone_to_root(self):
        """Cloning upwards from a UNC path stops at the host part."""
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # One clone('..') per path component below the host.
        for _ in range(4):
            t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
        # make sure we reach the root
        t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
 
781
 
 
782
 
 
783
class TestConnectedTransport(tests.TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A simple URL is split into host, port, path and credentials."""
        t = transport.ConnectedTransport(
            'http://simple.example.com/home/source')
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
        self.assertEqual(t._parsed_url.port, None)
        self.assertEqual(t._parsed_url.path, '/home/source/')
        self.assertIs(None, t._parsed_url.user)
        self.assertIs(None, t._parsed_url.password)

        self.assertEqual(t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        """A user name containing '@' is parsed as the user (bug 228058)."""
        # Bug 228058
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
        self.assertEqual(t._parsed_url.user, 'user@host.com')

    def test_parse_quoted_url(self):
        """Percent-escapes are decoded in the parsed URL components."""
        t = transport.ConnectedTransport(
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
        self.assertEqual(t._parsed_url.port, 2222)
        self.assertEqual(t._parsed_url.user, 'robey')
        self.assertEqual(t._parsed_url.password, 'h@t')
        self.assertEqual(t._parsed_url.path, '/path/')

        # Base should not keep track of the password
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')

    def test_parse_invalid_url(self):
        """A URL with a non-numeric port part raises InvalidURL."""
        self.assertRaises(errors.InvalidURL,
                          transport.ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        """relpath only accepts URLs on the same scheme/user/host/port."""
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
            'sub')
        # Each of these differs from the base in exactly one component:
        # scheme, user, host and port respectively.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A clone shares its connection and credentials with its origin."""
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEqual('user', t._parsed_url.user)
        self.assertEqual('host.com', t._parsed_url.host)
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._parsed_url.password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        # NOTE(review): this re-checks ``t``; it was presumably meant to
        # check the clone ``c`` -- confirm before changing.
        self.assertIs(None, t._parsed_url.password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
 
866
 
 
867
 
 
868
class TestReusedTransports(tests.TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """A transport in possible_transports is reused for the same URL."""
        candidates = []
        first = transport.get_transport_from_url(
            'http://foo/', possible_transports=candidates)
        # The freshly created transport is recorded in the supplied list.
        self.assertEqual([first], candidates)
        second = transport.get_transport_from_url(
            'http://foo/', possible_transports=[first])
        self.assertIs(first, second)

        # Also check that final '/' are handled correctly
        with_slash = transport.get_transport_from_url('http://foo/path/')
        reused = transport.get_transport_from_url(
            'http://foo/path', possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = transport.get_transport_from_url('http://foo/path')
        reused = transport.get_transport_from_url(
            'http://foo/path/', possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        foo_transport = transport.get_transport_from_url('http://foo/path')
        bar_transport = transport.get_transport_from_url(
            'http://bar/path', possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
 
896
 
 
897
 
 
898
class TestTransportTrace(tests.TestCase):
    """Tests for the ``trace+`` transport decorator."""

    def test_decorator(self):
        """A trace+ URL gives a TransportTraceDecorator."""
        t = transport.get_transport_from_url('trace+memory://')
        self.assertIsInstance(
            t, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a new object sharing the original's activity log."""
        t = transport.get_transport_from_url('trace+memory://')
        t2 = t.clone('.')
        self.assertIsNot(t, t2)
        self.assertIs(t._activity, t2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes and get are both recorded in the activity log."""
        t = transport.get_transport_from_url('trace+memory:///')
        t.put_bytes('foo', 'barish')
        t.get('foo')
        expected_result = [
            # put_bytes records the number of bytes, not the content, to
            # avoid memory pressure.
            ('put_bytes', 'foo', 6, None),
            # get records the file name only.
            ('get', 'foo'),
        ]
        self.assertEqual(expected_result, t._activity)

    def test_readv(self):
        """readv is recorded together with the request it was given."""
        t = transport.get_transport_from_url('trace+memory:///')
        t.put_bytes('foo', 'barish')
        # readv is consumed fully so that the whole request is performed.
        list(t.readv('foo', [(0, 1), (3, 2)],
                     adjust_for_latency=True, upper_limit=6))
        expected_result = [
            # put_bytes records the number of bytes, not the content, to
            # avoid memory pressure.
            ('put_bytes', 'foo', 6, None),
            # readv records the supplied offset request
            ('readv', 'foo', [(0, 1), (3, 2)], True, 6),
        ]
        self.assertEqual(expected_result, t._activity)
 
939
 
 
940
 
 
941
class TestSSHConnections(tests.TestCaseWithTransport):
    """End-to-end test of a bzr+ssh:// connection to a stub SSH server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # Holds the subprocess first, then the relay threads, so that the
        # test can wait for all of them before finishing.
        started = []

        class StubSSHServer(stub_sftp.StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes
                # from the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    while True:
                        data = read(1)
                        if data == '':
                            # End of stream: close the receiving side too.
                            close()
                            break
                        write(data)

                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for read, write, close in file_functions:
                    relay = threading.Thread(
                        target=ferry_bytes, args=(read, write, close))
                    relay.start()
                    started.append(relay)

                return True

        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.

        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
        # inherits from SFTPServer which forces the SSH vendor to
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
        self.start_server(ssh_server)
        port = ssh_server.port

        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
        else:
            bzr_remote_path = self.get_bzr_path()
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        t = transport.get_transport(url)
        self.permit_url(t.base)
        t.mkdir('foo')

        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
            self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        t._client._medium.disconnect()
        if not started:
            return
        # First wait for the subprocess
        started[0].wait()
        # And the rest are threads
        for relay_thread in started[1:]:
            relay_thread.join()
 
1042
 
 
1043
 
 
1044
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        """Oversized HTML input yields a result exactly 1000 characters long.

        The result is also expected to begin with " something!" rather than
        with the "<p>" markup that starts the input.
        """
        # 1000 repetitions of a 14-character line, i.e. well over 1000
        # characters of input.
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        # Expected value first, matching the other assertions in this file.
        self.assertEqual(1000, len(result))
        self.assertStartsWith(result, " something!")
 
1053
 
 
1054
 
 
1055
class SomeDirectory(object):
    """Dummy directory service: every lookup resolves to one fixed URL."""

    def look_up(self, name, url):
        """Return "http://bar", ignoring both ``name`` and ``url``."""
        resolved = "http://bar"
        return resolved
 
1059
 
 
1060
 
 
1061
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return an absolute local path and the file URL expected for it.

        :return: A (path, url) tuple, where path is the absolute form of
            '/foo/bar' and url is that path prefixed to form a file:// URL.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        """A location that is already a URL is returned unchanged."""
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        """A location using a registered directory prefix is looked up."""
        directories.register("bar:", SomeDirectory, "Dummy directory")
        # Remove the dummy registration again once the test is finished.
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        """A URL containing a non-ASCII character raises InvalidURL."""
        self.assertRaises(errors.InvalidURL, location_to_url,
                          "http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        """A non-ASCII character in a local path is percent-encoded."""
        path, url = self.get_base_location()
        location = path + "\xc3\xaf".decode("utf-8")
        # '%C3%AF' is the escaped form of the UTF-8 bytes appended above.
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        """An absolute local path is converted to its file URL."""
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        """A relative 'file:' location resolves against the current dir."""
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
                         location_to_url("file:bar"))

    def test_absolute_file_url(self):
        """A 'file:/path' location is expanded to the 'file:///path' form."""
        self.assertEqual("file:///bar", location_to_url("file:/bar"))