~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2012-06-26 12:14:56 UTC
  • mto: This revision was merged to the branch mainline in revision 6527.
  • Revision ID: jelmer@samba.org-20120626121456-maeix7f8kllyqa7a
Remove deprecated annotate_file.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import errno
18
20
import os
 
21
import subprocess
19
22
import sys
20
 
import stat
21
 
from cStringIO import StringIO
 
23
import threading
22
24
 
23
 
import bzrlib
24
 
from bzrlib import urlutils
25
 
from bzrlib.errors import (NoSuchFile, FileExists,
26
 
                           TransportNotPossible,
27
 
                           ConnectionError,
28
 
                           DependencyNotPresent,
29
 
                           UnsupportedProtocol,
30
 
                           PathNotChild,
31
 
                           )
32
 
from bzrlib.tests import TestCase, TestCaseInTempDir
33
 
from bzrlib.transport import (_CoalescedOffset,
34
 
                              _get_protocol_handlers,
35
 
                              _get_transport_modules,
36
 
                              get_transport,
37
 
                              register_lazy_transport,
38
 
                              _set_protocol_handlers,
39
 
                              Transport,
40
 
                              )
41
 
from bzrlib.transport.memory import MemoryTransport
42
 
from bzrlib.transport.local import (LocalTransport,
43
 
                                    EmulatedWin32LocalTransport)
 
25
from bzrlib import (
 
26
    errors,
 
27
    osutils,
 
28
    tests,
 
29
    transport,
 
30
    urlutils,
 
31
    )
 
32
from bzrlib.directory_service import directories
 
33
from bzrlib.transport import (
 
34
    chroot,
 
35
    fakenfs,
 
36
    http,
 
37
    local,
 
38
    location_to_url,
 
39
    memory,
 
40
    pathfilter,
 
41
    readonly,
 
42
    )
 
43
import bzrlib.transport.trace
 
44
from bzrlib.tests import (
 
45
    features,
 
46
    test_server,
 
47
    )
44
48
 
45
49
 
46
50
# TODO: Should possibly split transport-specific tests into their own files.
47
51
 
48
52
 
49
 
class TestTransport(TestCase):
 
53
class TestTransport(tests.TestCase):
50
54
    """Test the non transport-concrete class functionality."""
51
55
 
52
56
    def test__get_set_protocol_handlers(self):
53
 
        handlers = _get_protocol_handlers()
54
 
        self.assertNotEqual({}, handlers)
55
 
        try:
56
 
            _set_protocol_handlers({})
57
 
            self.assertEqual({}, _get_protocol_handlers())
58
 
        finally:
59
 
            _set_protocol_handlers(handlers)
 
57
        handlers = transport._get_protocol_handlers()
 
58
        self.assertNotEqual([], handlers.keys())
 
59
        transport._clear_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        self.assertEqual([], transport._get_protocol_handlers().keys())
60
62
 
61
63
    def test_get_transport_modules(self):
62
 
        handlers = _get_protocol_handlers()
 
64
        handlers = transport._get_protocol_handlers()
 
65
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
66
        # don't pollute the current handlers
 
67
        transport._clear_protocol_handlers()
 
68
 
63
69
        class SampleHandler(object):
64
70
            """I exist, isnt that enough?"""
65
 
        try:
66
 
            my_handlers = {}
67
 
            _set_protocol_handlers(my_handlers)
68
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
69
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
70
 
            self.assertEqual([SampleHandler.__module__],
71
 
                             _get_transport_modules())
72
 
        finally:
73
 
            _set_protocol_handlers(handlers)
 
71
        transport._clear_protocol_handlers()
 
72
        transport.register_transport_proto('foo')
 
73
        transport.register_lazy_transport('foo',
 
74
                                            'bzrlib.tests.test_transport',
 
75
                                            'TestTransport.SampleHandler')
 
76
        transport.register_transport_proto('bar')
 
77
        transport.register_lazy_transport('bar',
 
78
                                            'bzrlib.tests.test_transport',
 
79
                                            'TestTransport.SampleHandler')
 
80
        self.assertEqual([SampleHandler.__module__,
 
81
                            'bzrlib.transport.chroot',
 
82
                            'bzrlib.transport.pathfilter'],
 
83
                            transport._get_transport_modules())
74
84
 
75
85
    def test_transport_dependency(self):
76
86
        """Transport with missing dependency causes no error"""
77
 
        saved_handlers = _get_protocol_handlers()
 
87
        saved_handlers = transport._get_protocol_handlers()
 
88
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
89
        # don't pollute the current handlers
 
90
        transport._clear_protocol_handlers()
 
91
        transport.register_transport_proto('foo')
 
92
        transport.register_lazy_transport(
 
93
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
78
94
        try:
79
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
80
 
                    'BadTransportHandler')
81
 
            try:
82
 
                get_transport('foo://fooserver/foo')
83
 
            except UnsupportedProtocol, e:
84
 
                e_str = str(e)
85
 
                self.assertEquals('Unsupported protocol'
86
 
                                  ' for url "foo://fooserver/foo":'
87
 
                                  ' Unable to import library "some_lib":'
88
 
                                  ' testing missing dependency', str(e))
89
 
            else:
90
 
                self.fail('Did not raise UnsupportedProtocol')
91
 
        finally:
92
 
            # restore original values
93
 
            _set_protocol_handlers(saved_handlers)
94
 
            
 
95
            transport.get_transport_from_url('foo://fooserver/foo')
 
96
        except errors.UnsupportedProtocol, e:
 
97
            e_str = str(e)
 
98
            self.assertEquals('Unsupported protocol'
 
99
                                ' for url "foo://fooserver/foo":'
 
100
                                ' Unable to import library "some_lib":'
 
101
                                ' testing missing dependency', str(e))
 
102
        else:
 
103
            self.fail('Did not raise UnsupportedProtocol')
 
104
 
95
105
    def test_transport_fallback(self):
96
106
        """Transport with missing dependency causes no error"""
97
 
        saved_handlers = _get_protocol_handlers()
98
 
        try:
99
 
            _set_protocol_handlers({})
100
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
101
 
                    'BackupTransportHandler')
102
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
103
 
                    'BadTransportHandler')
104
 
            t = get_transport('foo://fooserver/foo')
105
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
106
 
        finally:
107
 
            _set_protocol_handlers(saved_handlers)
108
 
 
109
 
    def test__combine_paths(self):
110
 
        t = Transport('/')
111
 
        self.assertEqual('/home/sarah/project/foo',
112
 
                         t._combine_paths('/home/sarah', 'project/foo'))
113
 
        self.assertEqual('/etc',
114
 
                         t._combine_paths('/home/sarah', '../../etc'))
115
 
        self.assertEqual('/etc',
116
 
                         t._combine_paths('/home/sarah', '../../../etc'))
117
 
        self.assertEqual('/etc',
118
 
                         t._combine_paths('/home/sarah', '/etc'))
119
 
 
120
 
 
121
 
class TestCoalesceOffsets(TestCase):
122
 
    
123
 
    def check(self, expected, offsets, limit=0, fudge=0):
124
 
        coalesce = Transport._coalesce_offsets
125
 
        exp = [_CoalescedOffset(*x) for x in expected]
126
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
107
        saved_handlers = transport._get_protocol_handlers()
 
108
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
109
        transport._clear_protocol_handlers()
 
110
        transport.register_transport_proto('foo')
 
111
        transport.register_lazy_transport(
 
112
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
115
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
116
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
117
 
 
118
    def test_ssh_hints(self):
 
119
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
120
        try:
 
121
            transport.get_transport_from_url('ssh://fooserver/foo')
 
122
        except errors.UnsupportedProtocol, e:
 
123
            e_str = str(e)
 
124
            self.assertEquals('Unsupported protocol'
 
125
                              ' for url "ssh://fooserver/foo":'
 
126
                              ' bzr supports bzr+ssh to operate over ssh,'
 
127
                              ' use "bzr+ssh://fooserver/foo".',
 
128
                              str(e))
 
129
        else:
 
130
            self.fail('Did not raise UnsupportedProtocol')
 
131
 
 
132
    def test_LateReadError(self):
 
133
        """The LateReadError helper should raise on read()."""
 
134
        a_file = transport.LateReadError('a path')
 
135
        try:
 
136
            a_file.read()
 
137
        except errors.ReadError, error:
 
138
            self.assertEqual('a path', error.path)
 
139
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
140
        a_file.close()
 
141
 
 
142
    def test_local_abspath_non_local_transport(self):
 
143
        # the base implementation should throw
 
144
        t = memory.MemoryTransport()
 
145
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
146
        self.assertEqual('memory:///t is not a local path.', str(e))
 
147
 
 
148
 
 
149
class TestCoalesceOffsets(tests.TestCase):
 
150
 
 
151
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
152
        coalesce = transport.Transport._coalesce_offsets
 
153
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
154
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
155
                            max_size=max_size))
127
156
        self.assertEqual(exp, out)
128
157
 
129
158
    def test_coalesce_empty(self):
136
165
        self.check([(0, 10, [(0, 10)]),
137
166
                    (20, 10, [(0, 10)]),
138
167
                   ], [(0, 10), (20, 10)])
139
 
            
 
168
 
140
169
    def test_coalesce_unsorted(self):
141
170
        self.check([(20, 10, [(0, 10)]),
142
171
                    (0, 10, [(0, 10)]),
147
176
                   [(0, 10), (10, 10)])
148
177
 
149
178
    def test_coalesce_overlapped(self):
150
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
151
 
                   [(0, 10), (5, 10)])
 
179
        self.assertRaises(ValueError,
 
180
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
181
                        [(0, 10), (5, 10)])
152
182
 
153
183
    def test_coalesce_limit(self):
154
184
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
171
201
 
172
202
    def test_coalesce_fudge(self):
173
203
        self.check([(10, 30, [(0, 10), (20, 10)]),
174
 
                    (100, 10, [(0, 10),]),
 
204
                    (100, 10, [(0, 10)]),
175
205
                   ], [(10, 10), (30, 10), (100, 10)],
176
 
                   fudge=10
 
206
                   fudge=10)
 
207
 
 
208
    def test_coalesce_max_size(self):
 
209
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
210
                    (30, 50, [(0, 50)]),
 
211
                    # If one range is above max_size, it gets its own coalesced
 
212
                    # offset
 
213
                    (100, 80, [(0, 80)]),],
 
214
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
215
                   max_size=50)
 
216
 
 
217
    def test_coalesce_no_max_size(self):
 
218
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
219
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
177
220
                  )
178
221
 
179
 
 
180
 
class TestMemoryTransport(TestCase):
 
222
    def test_coalesce_default_limit(self):
 
223
        # By default we use a 100MB max size.
 
224
        ten_mb = 10 * 1024 * 1024
 
225
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
226
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
227
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
228
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
229
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
230
                   max_size=1*1024*1024*1024)
 
231
 
 
232
 
 
233
class TestMemoryServer(tests.TestCase):
 
234
 
 
235
    def test_create_server(self):
 
236
        server = memory.MemoryServer()
 
237
        server.start_server()
 
238
        url = server.get_url()
 
239
        self.assertTrue(url in transport.transport_list_registry)
 
240
        t = transport.get_transport_from_url(url)
 
241
        del t
 
242
        server.stop_server()
 
243
        self.assertFalse(url in transport.transport_list_registry)
 
244
        self.assertRaises(errors.UnsupportedProtocol,
 
245
                          transport.get_transport, url)
 
246
 
 
247
 
 
248
class TestMemoryTransport(tests.TestCase):
181
249
 
182
250
    def test_get_transport(self):
183
 
        MemoryTransport()
 
251
        memory.MemoryTransport()
184
252
 
185
253
    def test_clone(self):
186
 
        transport = MemoryTransport()
187
 
        self.assertTrue(isinstance(transport, MemoryTransport))
188
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
254
        t = memory.MemoryTransport()
 
255
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
256
        self.assertEqual("memory:///", t.clone("/").base)
189
257
 
190
258
    def test_abspath(self):
191
 
        transport = MemoryTransport()
192
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
259
        t = memory.MemoryTransport()
 
260
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
193
261
 
194
262
    def test_abspath_of_root(self):
195
 
        transport = MemoryTransport()
196
 
        self.assertEqual("memory:///", transport.base)
197
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
263
        t = memory.MemoryTransport()
 
264
        self.assertEqual("memory:///", t.base)
 
265
        self.assertEqual("memory:///", t.abspath('/'))
198
266
 
199
267
    def test_abspath_of_relpath_starting_at_root(self):
200
 
        transport = MemoryTransport()
201
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
268
        t = memory.MemoryTransport()
 
269
        self.assertEqual("memory:///foo", t.abspath('/foo'))
202
270
 
203
271
    def test_append_and_get(self):
204
 
        transport = MemoryTransport()
205
 
        transport.append_bytes('path', 'content')
206
 
        self.assertEqual(transport.get('path').read(), 'content')
207
 
        transport.append_file('path', StringIO('content'))
208
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
272
        t = memory.MemoryTransport()
 
273
        t.append_bytes('path', 'content')
 
274
        self.assertEqual(t.get('path').read(), 'content')
 
275
        t.append_file('path', StringIO('content'))
 
276
        self.assertEqual(t.get('path').read(), 'contentcontent')
209
277
 
210
278
    def test_put_and_get(self):
211
 
        transport = MemoryTransport()
212
 
        transport.put_file('path', StringIO('content'))
213
 
        self.assertEqual(transport.get('path').read(), 'content')
214
 
        transport.put_bytes('path', 'content')
215
 
        self.assertEqual(transport.get('path').read(), 'content')
 
279
        t = memory.MemoryTransport()
 
280
        t.put_file('path', StringIO('content'))
 
281
        self.assertEqual(t.get('path').read(), 'content')
 
282
        t.put_bytes('path', 'content')
 
283
        self.assertEqual(t.get('path').read(), 'content')
216
284
 
217
285
    def test_append_without_dir_fails(self):
218
 
        transport = MemoryTransport()
219
 
        self.assertRaises(NoSuchFile,
220
 
                          transport.append_bytes, 'dir/path', 'content')
 
286
        t = memory.MemoryTransport()
 
287
        self.assertRaises(errors.NoSuchFile,
 
288
                          t.append_bytes, 'dir/path', 'content')
221
289
 
222
290
    def test_put_without_dir_fails(self):
223
 
        transport = MemoryTransport()
224
 
        self.assertRaises(NoSuchFile,
225
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
291
        t = memory.MemoryTransport()
 
292
        self.assertRaises(errors.NoSuchFile,
 
293
                          t.put_file, 'dir/path', StringIO('content'))
226
294
 
227
295
    def test_get_missing(self):
228
 
        transport = MemoryTransport()
229
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
296
        transport = memory.MemoryTransport()
 
297
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
230
298
 
231
299
    def test_has_missing(self):
232
 
        transport = MemoryTransport()
233
 
        self.assertEquals(False, transport.has('foo'))
 
300
        t = memory.MemoryTransport()
 
301
        self.assertEquals(False, t.has('foo'))
234
302
 
235
303
    def test_has_present(self):
236
 
        transport = MemoryTransport()
237
 
        transport.append_bytes('foo', 'content')
238
 
        self.assertEquals(True, transport.has('foo'))
 
304
        t = memory.MemoryTransport()
 
305
        t.append_bytes('foo', 'content')
 
306
        self.assertEquals(True, t.has('foo'))
239
307
 
240
308
    def test_list_dir(self):
241
 
        transport = MemoryTransport()
242
 
        transport.put_bytes('foo', 'content')
243
 
        transport.mkdir('dir')
244
 
        transport.put_bytes('dir/subfoo', 'content')
245
 
        transport.put_bytes('dirlike', 'content')
 
309
        t = memory.MemoryTransport()
 
310
        t.put_bytes('foo', 'content')
 
311
        t.mkdir('dir')
 
312
        t.put_bytes('dir/subfoo', 'content')
 
313
        t.put_bytes('dirlike', 'content')
246
314
 
247
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
248
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
315
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
316
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
249
317
 
250
318
    def test_mkdir(self):
251
 
        transport = MemoryTransport()
252
 
        transport.mkdir('dir')
253
 
        transport.append_bytes('dir/path', 'content')
254
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
319
        t = memory.MemoryTransport()
 
320
        t.mkdir('dir')
 
321
        t.append_bytes('dir/path', 'content')
 
322
        self.assertEqual(t.get('dir/path').read(), 'content')
255
323
 
256
324
    def test_mkdir_missing_parent(self):
257
 
        transport = MemoryTransport()
258
 
        self.assertRaises(NoSuchFile,
259
 
                          transport.mkdir, 'dir/dir')
 
325
        t = memory.MemoryTransport()
 
326
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
260
327
 
261
328
    def test_mkdir_twice(self):
262
 
        transport = MemoryTransport()
263
 
        transport.mkdir('dir')
264
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
329
        t = memory.MemoryTransport()
 
330
        t.mkdir('dir')
 
331
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
265
332
 
266
333
    def test_parameters(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertEqual(True, transport.listable())
269
 
        self.assertEqual(False, transport.should_cache())
270
 
        self.assertEqual(False, transport.is_readonly())
 
334
        t = memory.MemoryTransport()
 
335
        self.assertEqual(True, t.listable())
 
336
        self.assertEqual(False, t.is_readonly())
271
337
 
272
338
    def test_iter_files_recursive(self):
273
 
        transport = MemoryTransport()
274
 
        transport.mkdir('dir')
275
 
        transport.put_bytes('dir/foo', 'content')
276
 
        transport.put_bytes('dir/bar', 'content')
277
 
        transport.put_bytes('bar', 'content')
278
 
        paths = set(transport.iter_files_recursive())
 
339
        t = memory.MemoryTransport()
 
340
        t.mkdir('dir')
 
341
        t.put_bytes('dir/foo', 'content')
 
342
        t.put_bytes('dir/bar', 'content')
 
343
        t.put_bytes('bar', 'content')
 
344
        paths = set(t.iter_files_recursive())
279
345
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
280
346
 
281
347
    def test_stat(self):
282
 
        transport = MemoryTransport()
283
 
        transport.put_bytes('foo', 'content')
284
 
        transport.put_bytes('bar', 'phowar')
285
 
        self.assertEqual(7, transport.stat('foo').st_size)
286
 
        self.assertEqual(6, transport.stat('bar').st_size)
287
 
 
288
 
 
289
 
class ChrootDecoratorTransportTest(TestCase):
 
348
        t = memory.MemoryTransport()
 
349
        t.put_bytes('foo', 'content')
 
350
        t.put_bytes('bar', 'phowar')
 
351
        self.assertEqual(7, t.stat('foo').st_size)
 
352
        self.assertEqual(6, t.stat('bar').st_size)
 
353
 
 
354
 
 
355
class ChrootDecoratorTransportTest(tests.TestCase):
290
356
    """Chroot decoration specific tests."""
291
357
 
 
358
    def test_abspath(self):
 
359
        # The abspath is always relative to the chroot_url.
 
360
        server = chroot.ChrootServer(
 
361
            transport.get_transport_from_url('memory:///foo/bar/'))
 
362
        self.start_server(server)
 
363
        t = transport.get_transport_from_url(server.get_url())
 
364
        self.assertEqual(server.get_url(), t.abspath('/'))
 
365
 
 
366
        subdir_t = t.clone('subdir')
 
367
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
368
 
 
369
    def test_clone(self):
 
370
        server = chroot.ChrootServer(
 
371
            transport.get_transport_from_url('memory:///foo/bar/'))
 
372
        self.start_server(server)
 
373
        t = transport.get_transport_from_url(server.get_url())
 
374
        # relpath from root and root path are the same
 
375
        relpath_cloned = t.clone('foo')
 
376
        abspath_cloned = t.clone('/foo')
 
377
        self.assertEqual(server, relpath_cloned.server)
 
378
        self.assertEqual(server, abspath_cloned.server)
 
379
 
 
380
    def test_chroot_url_preserves_chroot(self):
 
381
        """Calling get_transport on a chroot transport's base should produce a
 
382
        transport with exactly the same behaviour as the original chroot
 
383
        transport.
 
384
 
 
385
        This is so that it is not possible to escape a chroot by doing::
 
386
            url = chroot_transport.base
 
387
            parent_url = urlutils.join(url, '..')
 
388
            new_t = transport.get_transport_from_url(parent_url)
 
389
        """
 
390
        server = chroot.ChrootServer(
 
391
            transport.get_transport_from_url('memory:///path/subpath'))
 
392
        self.start_server(server)
 
393
        t = transport.get_transport_from_url(server.get_url())
 
394
        new_t = transport.get_transport_from_url(t.base)
 
395
        self.assertEqual(t.server, new_t.server)
 
396
        self.assertEqual(t.base, new_t.base)
 
397
 
 
398
    def test_urljoin_preserves_chroot(self):
 
399
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
400
        URL that escapes the intended chroot.
 
401
 
 
402
        This is so that it is not possible to escape a chroot by doing::
 
403
            url = chroot_transport.base
 
404
            parent_url = urlutils.join(url, '..')
 
405
            new_t = transport.get_transport_from_url(parent_url)
 
406
        """
 
407
        server = chroot.ChrootServer(
 
408
            transport.get_transport_from_url('memory:///path/'))
 
409
        self.start_server(server)
 
410
        t = transport.get_transport_from_url(server.get_url())
 
411
        self.assertRaises(
 
412
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
413
 
 
414
 
 
415
class TestChrootServer(tests.TestCase):
 
416
 
292
417
    def test_construct(self):
293
 
        from bzrlib.transport import chroot
294
 
        transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
295
 
        self.assertEqual('memory:///pathA/', transport.chroot_url)
296
 
 
297
 
        transport = chroot.ChrootTransportDecorator(
298
 
            'chroot+memory:///path/B', chroot='memory:///path/')
299
 
        self.assertEqual('memory:///path/', transport.chroot_url)
300
 
 
301
 
    def test_append_file(self):
302
 
        transport = get_transport('chroot+memory:///foo/bar')
303
 
        self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
304
 
 
305
 
    def test_append_bytes(self):
306
 
        transport = get_transport('chroot+memory:///foo/bar')
307
 
        self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
 
418
        backing_transport = memory.MemoryTransport()
 
419
        server = chroot.ChrootServer(backing_transport)
 
420
        self.assertEqual(backing_transport, server.backing_transport)
 
421
 
 
422
    def test_setUp(self):
 
423
        backing_transport = memory.MemoryTransport()
 
424
        server = chroot.ChrootServer(backing_transport)
 
425
        server.start_server()
 
426
        self.addCleanup(server.stop_server)
 
427
        self.assertTrue(server.scheme
 
428
                        in transport._get_protocol_handlers().keys())
 
429
 
 
430
    def test_stop_server(self):
 
431
        backing_transport = memory.MemoryTransport()
 
432
        server = chroot.ChrootServer(backing_transport)
 
433
        server.start_server()
 
434
        server.stop_server()
 
435
        self.assertFalse(server.scheme
 
436
                         in transport._get_protocol_handlers().keys())
 
437
 
 
438
    def test_get_url(self):
 
439
        backing_transport = memory.MemoryTransport()
 
440
        server = chroot.ChrootServer(backing_transport)
 
441
        server.start_server()
 
442
        self.addCleanup(server.stop_server)
 
443
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
444
 
 
445
 
 
446
class TestHooks(tests.TestCase):
 
447
    """Basic tests for transport hooks"""
 
448
 
 
449
    def _get_connected_transport(self):
 
450
        return transport.ConnectedTransport("bogus:nowhere")
 
451
 
 
452
    def test_transporthooks_initialisation(self):
 
453
        """Check all expected transport hook points are set up"""
 
454
        hookpoint = transport.TransportHooks()
 
455
        self.assertTrue("post_connect" in hookpoint,
 
456
            "post_connect not in %s" % (hookpoint,))
 
457
 
 
458
    def test_post_connect(self):
 
459
        """Ensure the post_connect hook is called when _set_transport is"""
 
460
        calls = []
 
461
        transport.Transport.hooks.install_named_hook("post_connect",
 
462
            calls.append, None)
 
463
        t = self._get_connected_transport()
 
464
        self.assertLength(0, calls)
 
465
        t._set_connection("connection", "auth")
 
466
        self.assertEqual(calls, [t])
 
467
 
 
468
 
 
469
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
470
    """Pathfilter decoration specific tests."""
 
471
 
 
472
    def test_abspath(self):
 
473
        # The abspath is always relative to the base of the backing transport.
 
474
        server = pathfilter.PathFilteringServer(
 
475
            transport.get_transport_from_url('memory:///foo/bar/'),
 
476
            lambda x: x)
 
477
        server.start_server()
 
478
        t = transport.get_transport_from_url(server.get_url())
 
479
        self.assertEqual(server.get_url(), t.abspath('/'))
 
480
 
 
481
        subdir_t = t.clone('subdir')
 
482
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
483
        server.stop_server()
 
484
 
 
485
    def make_pf_transport(self, filter_func=None):
 
486
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
487
 
 
488
        :param filter_func: by default this will be a no-op function.  Use this
 
489
            parameter to override it."""
 
490
        if filter_func is None:
 
491
            filter_func = lambda x: x
 
492
        server = pathfilter.PathFilteringServer(
 
493
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
 
494
        server.start_server()
 
495
        self.addCleanup(server.stop_server)
 
496
        return transport.get_transport_from_url(server.get_url())
 
497
 
 
498
    def test__filter(self):
 
499
        # _filter (with an identity func as filter_func) always returns
 
500
        # paths relative to the base of the backing transport.
 
501
        t = self.make_pf_transport()
 
502
        self.assertEqual('foo', t._filter('foo'))
 
503
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
504
        self.assertEqual('', t._filter('..'))
 
505
        self.assertEqual('', t._filter('/'))
 
506
        # The base of the pathfiltering transport is taken into account too.
 
507
        t = t.clone('subdir1/subdir2')
 
508
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
509
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
510
        self.assertEqual('subdir1', t._filter('..'))
 
511
        self.assertEqual('', t._filter('/'))
 
512
 
 
513
    def test_filter_invocation(self):
 
514
        filter_log = []
 
515
 
 
516
        def filter(path):
 
517
            filter_log.append(path)
 
518
            return path
 
519
        t = self.make_pf_transport(filter)
 
520
        t.has('abc')
 
521
        self.assertEqual(['abc'], filter_log)
 
522
        del filter_log[:]
 
523
        t.clone('abc').has('xyz')
 
524
        self.assertEqual(['abc/xyz'], filter_log)
 
525
        del filter_log[:]
 
526
        t.has('/abc')
 
527
        self.assertEqual(['abc'], filter_log)
308
528
 
309
529
    def test_clone(self):
310
 
        transport = get_transport('chroot+memory:///foo/bar')
311
 
        self.assertRaises(PathNotChild, transport.clone, '/foo')
312
 
 
313
 
    def test_delete(self):
314
 
        transport = get_transport('chroot+memory:///foo/bar')
315
 
        self.assertRaises(PathNotChild, transport.delete, '/foo')
316
 
 
317
 
    def test_delete_tree(self):
318
 
        transport = get_transport('chroot+memory:///foo/bar')
319
 
        self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
320
 
 
321
 
    def test_get(self):
322
 
        transport = get_transport('chroot+memory:///foo/bar')
323
 
        self.assertRaises(PathNotChild, transport.get, '/foo')
324
 
 
325
 
    def test_get_bytes(self):
326
 
        transport = get_transport('chroot+memory:///foo/bar')
327
 
        self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
328
 
 
329
 
    def test_has(self):
330
 
        transport = get_transport('chroot+memory:///foo/bar')
331
 
        self.assertRaises(PathNotChild, transport.has, '/foo')
332
 
 
333
 
    def test_list_dir(self):
334
 
        transport = get_transport('chroot+memory:///foo/bar')
335
 
        self.assertRaises(PathNotChild, transport.list_dir, '/foo')
336
 
 
337
 
    def test_lock_read(self):
338
 
        transport = get_transport('chroot+memory:///foo/bar')
339
 
        self.assertRaises(PathNotChild, transport.lock_read, '/foo')
340
 
 
341
 
    def test_lock_write(self):
342
 
        transport = get_transport('chroot+memory:///foo/bar')
343
 
        self.assertRaises(PathNotChild, transport.lock_write, '/foo')
344
 
 
345
 
    def test_mkdir(self):
346
 
        transport = get_transport('chroot+memory:///foo/bar')
347
 
        self.assertRaises(PathNotChild, transport.mkdir, '/foo')
348
 
 
349
 
    def test_put_bytes(self):
350
 
        transport = get_transport('chroot+memory:///foo/bar')
351
 
        self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
352
 
 
353
 
    def test_put_file(self):
354
 
        transport = get_transport('chroot+memory:///foo/bar')
355
 
        self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
356
 
 
357
 
    def test_rename(self):
358
 
        transport = get_transport('chroot+memory:///foo/bar')
359
 
        self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
360
 
        self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
361
 
 
362
 
    def test_rmdir(self):
363
 
        transport = get_transport('chroot+memory:///foo/bar')
364
 
        self.assertRaises(PathNotChild, transport.rmdir, '/foo')
365
 
 
366
 
    def test_stat(self):
367
 
        transport = get_transport('chroot+memory:///foo/bar')
368
 
        self.assertRaises(PathNotChild, transport.stat, '/foo')
369
 
 
370
 
 
371
 
class ReadonlyDecoratorTransportTest(TestCase):
 
530
        t = self.make_pf_transport()
 
531
        # relpath from root and root path are the same
 
532
        relpath_cloned = t.clone('foo')
 
533
        abspath_cloned = t.clone('/foo')
 
534
        self.assertEqual(t.server, relpath_cloned.server)
 
535
        self.assertEqual(t.server, abspath_cloned.server)
 
536
 
 
537
    def test_url_preserves_pathfiltering(self):
 
538
        """Calling get_transport on a pathfiltered transport's base should
 
539
        produce a transport with exactly the same behaviour as the original
 
540
        pathfiltered transport.
 
541
 
 
542
        This is so that it is not possible to escape (accidentally or
 
543
        otherwise) the filtering by doing::
 
544
            url = filtered_transport.base
 
545
            parent_url = urlutils.join(url, '..')
 
546
            new_t = transport.get_transport_from_url(parent_url)
 
547
        """
 
548
        t = self.make_pf_transport()
 
549
        new_t = transport.get_transport_from_url(t.base)
 
550
        self.assertEqual(t.server, new_t.server)
 
551
        self.assertEqual(t.base, new_t.base)
 
552
 
 
553
 
 
554
class ReadonlyDecoratorTransportTest(tests.TestCase):
372
555
    """Readonly decoration specific tests."""
373
556
 
374
557
    def test_local_parameters(self):
375
 
        import bzrlib.transport.readonly as readonly
376
558
        # connect to . in readonly mode
377
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
378
 
        self.assertEqual(True, transport.listable())
379
 
        self.assertEqual(False, transport.should_cache())
380
 
        self.assertEqual(True, transport.is_readonly())
 
559
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
560
        self.assertEqual(True, t.listable())
 
561
        self.assertEqual(True, t.is_readonly())
381
562
 
382
563
    def test_http_parameters(self):
383
 
        from bzrlib.tests.HttpServer import HttpServer
384
 
        import bzrlib.transport.readonly as readonly
385
 
        # connect to . via http which is not listable
 
564
        from bzrlib.tests.http_server import HttpServer
 
565
        # connect to '.' via http which is not listable
386
566
        server = HttpServer()
387
 
        server.setUp()
388
 
        try:
389
 
            transport = get_transport('readonly+' + server.get_url())
390
 
            self.failUnless(isinstance(transport,
391
 
                                       readonly.ReadonlyTransportDecorator))
392
 
            self.assertEqual(False, transport.listable())
393
 
            self.assertEqual(True, transport.should_cache())
394
 
            self.assertEqual(True, transport.is_readonly())
395
 
        finally:
396
 
            server.tearDown()
397
 
 
398
 
 
399
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
567
        self.start_server(server)
 
568
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
569
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
570
        self.assertEqual(False, t.listable())
 
571
        self.assertEqual(True, t.is_readonly())
 
572
 
 
573
 
 
574
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
400
575
    """NFS decorator specific tests."""
401
576
 
402
577
    def get_nfs_transport(self, url):
403
 
        import bzrlib.transport.fakenfs as fakenfs
404
578
        # connect to url with nfs decoration
405
579
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
406
580
 
407
581
    def test_local_parameters(self):
408
 
        # the listable, should_cache and is_readonly parameters
 
582
        # the listable and is_readonly parameters
409
583
        # are not changed by the fakenfs decorator
410
 
        transport = self.get_nfs_transport('.')
411
 
        self.assertEqual(True, transport.listable())
412
 
        self.assertEqual(False, transport.should_cache())
413
 
        self.assertEqual(False, transport.is_readonly())
 
584
        t = self.get_nfs_transport('.')
 
585
        self.assertEqual(True, t.listable())
 
586
        self.assertEqual(False, t.is_readonly())
414
587
 
415
588
    def test_http_parameters(self):
416
 
        # the listable, should_cache and is_readonly parameters
 
589
        # the listable and is_readonly parameters
417
590
        # are not changed by the fakenfs decorator
418
 
        from bzrlib.tests.HttpServer import HttpServer
419
 
        # connect to . via http which is not listable
 
591
        from bzrlib.tests.http_server import HttpServer
 
592
        # connect to '.' via http which is not listable
420
593
        server = HttpServer()
421
 
        server.setUp()
422
 
        try:
423
 
            transport = self.get_nfs_transport(server.get_url())
424
 
            self.assertIsInstance(
425
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
426
 
            self.assertEqual(False, transport.listable())
427
 
            self.assertEqual(True, transport.should_cache())
428
 
            self.assertEqual(True, transport.is_readonly())
429
 
        finally:
430
 
            server.tearDown()
 
594
        self.start_server(server)
 
595
        t = self.get_nfs_transport(server.get_url())
 
596
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
597
        self.assertEqual(False, t.listable())
 
598
        self.assertEqual(True, t.is_readonly())
431
599
 
432
600
    def test_fakenfs_server_default(self):
433
601
        # a FakeNFSServer() should bring up a local relpath server for itself
434
 
        import bzrlib.transport.fakenfs as fakenfs
435
 
        server = fakenfs.FakeNFSServer()
436
 
        server.setUp()
437
 
        try:
438
 
            # the url should be decorated appropriately
439
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
440
 
            # and we should be able to get a transport for it
441
 
            transport = get_transport(server.get_url())
442
 
            # which must be a FakeNFSTransportDecorator instance.
443
 
            self.assertIsInstance(
444
 
                transport, fakenfs.FakeNFSTransportDecorator)
445
 
        finally:
446
 
            server.tearDown()
 
602
        server = test_server.FakeNFSServer()
 
603
        self.start_server(server)
 
604
        # the url should be decorated appropriately
 
605
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
606
        # and we should be able to get a transport for it
 
607
        t = transport.get_transport_from_url(server.get_url())
 
608
        # which must be a FakeNFSTransportDecorator instance.
 
609
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
447
610
 
448
611
    def test_fakenfs_rename_semantics(self):
449
612
        # a FakeNFS transport must mangle the way rename errors occur to
450
613
        # look like NFS problems.
451
 
        transport = self.get_nfs_transport('.')
 
614
        t = self.get_nfs_transport('.')
452
615
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
453
 
                        transport=transport)
454
 
        self.assertRaises(bzrlib.errors.ResourceBusy,
455
 
                          transport.rename, 'from', 'to')
456
 
 
457
 
 
458
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
616
                        transport=t)
 
617
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
618
 
 
619
 
 
620
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
459
621
    """Tests for simulation of VFAT restrictions"""
460
622
 
461
623
    def get_vfat_transport(self, url):
465
627
 
466
628
    def test_transport_creation(self):
467
629
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
468
 
        transport = self.get_vfat_transport('.')
469
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
630
        t = self.get_vfat_transport('.')
 
631
        self.assertIsInstance(t, FakeVFATTransportDecorator)
470
632
 
471
633
    def test_transport_mkdir(self):
472
 
        transport = self.get_vfat_transport('.')
473
 
        transport.mkdir('HELLO')
474
 
        self.assertTrue(transport.has('hello'))
475
 
        self.assertTrue(transport.has('Hello'))
 
634
        t = self.get_vfat_transport('.')
 
635
        t.mkdir('HELLO')
 
636
        self.assertTrue(t.has('hello'))
 
637
        self.assertTrue(t.has('Hello'))
476
638
 
477
639
    def test_forbidden_chars(self):
478
 
        transport = self.get_vfat_transport('.')
479
 
        self.assertRaises(ValueError, transport.has, "<NU>")
480
 
 
481
 
 
482
 
class BadTransportHandler(Transport):
 
640
        t = self.get_vfat_transport('.')
 
641
        self.assertRaises(ValueError, t.has, "<NU>")
 
642
 
 
643
 
 
644
class BadTransportHandler(transport.Transport):
483
645
    def __init__(self, base_url):
484
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
485
 
 
486
 
 
487
 
class BackupTransportHandler(Transport):
 
646
        raise errors.DependencyNotPresent('some_lib',
 
647
                                          'testing missing dependency')
 
648
 
 
649
 
 
650
class BackupTransportHandler(transport.Transport):
    """Test transport that works as a backup for the BadTransportHandler.

    It adds nothing to transport.Transport; the class only needs to exist
    and be constructible.
    """
490
653
 
491
654
 
492
 
class TestTransportImplementation(TestCaseInTempDir):
 
655
class TestTransportImplementation(tests.TestCaseInTempDir):
493
656
    """Implementation verification for transports.
494
 
    
 
657
 
495
658
    To verify a transport we need a server factory, which is a callable
496
659
    that accepts no parameters and returns an implementation of
497
660
    bzrlib.transport.Server.
498
 
    
 
661
 
499
662
    That Server is then used to construct transport instances and test
500
663
    the transport via loopback activity.
501
664
 
502
 
    Currently this assumes that the Transport object is connected to the 
503
 
    current working directory.  So that whatever is done 
504
 
    through the transport, should show up in the working 
 
665
    Currently this assumes that the Transport object is connected to the
 
666
    current working directory.  So that whatever is done
 
667
    through the transport, should show up in the working
505
668
    directory, and vice-versa. This is a bug, because it's possible to have
506
 
    URL schemes which provide access to something that may not be 
507
 
    result in storage on the local disk, i.e. due to file system limits, or 
 
669
    URL schemes which provide access to something that may not be
 
670
    result in storage on the local disk, i.e. due to file system limits, or
508
671
    due to it being a database or some other non-filesystem tool.
509
672
 
510
673
    This also tests to make sure that the functions work with both
511
674
    generators and lists (assuming iter(list) is effectively a generator)
512
675
    """
513
 
    
 
676
 
514
677
    def setUp(self):
515
678
        super(TestTransportImplementation, self).setUp()
516
679
        self._server = self.transport_server()
517
 
        self._server.setUp()
518
 
 
519
 
    def tearDown(self):
520
 
        super(TestTransportImplementation, self).tearDown()
521
 
        self._server.tearDown()
522
 
        
523
 
    def get_transport(self):
524
 
        """Return a connected transport to the local directory."""
 
680
        self.start_server(self._server)
 
681
 
 
682
    def get_transport(self, relpath=None):
 
683
        """Return a connected transport to the local directory.
 
684
 
 
685
        :param relpath: a path relative to the base url.
 
686
        """
525
687
        base_url = self._server.get_url()
 
688
        url = self._adjust_url(base_url, relpath)
526
689
        # try getting the transport via the regular interface:
527
 
        t = get_transport(base_url)
 
690
        t = transport.get_transport_from_url(url)
 
691
        # vila--20070607 if the following are commented out the test suite
 
692
        # still pass. Is this really still needed or was it a forgotten
 
693
        # temporary fix ?
528
694
        if not isinstance(t, self.transport_class):
529
695
            # we did not get the correct transport class type. Override the
530
696
            # regular connection behaviour by direct construction.
531
 
            t = self.transport_class(base_url)
 
697
            t = self.transport_class(url)
532
698
        return t
533
699
 
534
700
 
535
 
class TestLocalTransports(TestCase):
 
701
class TestTransportFromPath(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_path()."""

    def test_with_path(self):
        """A directory path gives a LocalTransport based at its file URL."""
        local_t = transport.get_transport_from_path(self.test_dir)
        self.assertIsInstance(local_t, local.LocalTransport)
        expected_url = urlutils.local_path_to_url(self.test_dir)
        self.assertEquals(local_t.base.rstrip("/"), expected_url)

    def test_with_url(self):
        """A URL-looking argument is still handled as a local path.

        "file:" is expected to end up as an entry named "file:" below the
        test directory rather than being parsed as a URL scheme.
        """
        local_t = transport.get_transport_from_path("file:")
        self.assertIsInstance(local_t, local.LocalTransport)
        literal_path = os.path.join(self.test_dir, "file:")
        expected_url = urlutils.local_path_to_url(literal_path)
        self.assertEquals(local_t.base.rstrip("/"), expected_url)
 
714
 
 
715
 
 
716
class TestTransportFromUrl(tests.TestCaseInTempDir):
    """Tests for transport.get_transport_from_url()."""

    def test_with_path(self):
        """A bare filesystem path is rejected with InvalidURL."""
        self.assertRaises(
            errors.InvalidURL,
            transport.get_transport_from_url, self.test_dir)

    def test_with_url(self):
        """A file URL gives a LocalTransport whose base is that URL."""
        dir_url = urlutils.local_path_to_url(self.test_dir)
        local_t = transport.get_transport_from_url(dir_url)
        self.assertIsInstance(local_t, local.LocalTransport)
        self.assertEquals(local_t.base.rstrip("/"), dir_url)

    def test_with_url_and_segment_parameters(self):
        """Segment parameters stay in the base but not in file lookups."""
        dir_url = urlutils.local_path_to_url(self.test_dir)
        url_with_params = dir_url + ",branch=foo"
        local_t = transport.get_transport_from_url(url_with_params)
        self.assertIsInstance(local_t, local.LocalTransport)
        self.assertEquals(local_t.base.rstrip("/"), url_with_params)
        # Create a file directly on disk, bypassing the transport, and check
        # that the transport finds it in the directory without the
        # ",branch=foo" suffix.
        on_disk = os.path.join(self.test_dir, "afile")
        with open(on_disk, 'w') as out:
            out.write("data")
        self.assertTrue(local_t.has("afile"))
 
736
 
 
737
 
 
738
class TestLocalTransports(tests.TestCase):
536
739
 
537
740
    def test_get_transport_from_abspath(self):
538
 
        here = os.path.abspath('.')
539
 
        t = get_transport(here)
540
 
        self.assertIsInstance(t, LocalTransport)
 
741
        here = osutils.abspath('.')
 
742
        t = transport.get_transport(here)
 
743
        self.assertIsInstance(t, local.LocalTransport)
541
744
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
542
745
 
543
746
    def test_get_transport_from_relpath(self):
544
 
        here = os.path.abspath('.')
545
 
        t = get_transport('.')
546
 
        self.assertIsInstance(t, LocalTransport)
 
747
        here = osutils.abspath('.')
 
748
        t = transport.get_transport('.')
 
749
        self.assertIsInstance(t, local.LocalTransport)
547
750
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
548
751
 
549
752
    def test_get_transport_from_local_url(self):
550
 
        here = os.path.abspath('.')
 
753
        here = osutils.abspath('.')
551
754
        here_url = urlutils.local_path_to_url(here) + '/'
552
 
        t = get_transport(here_url)
553
 
        self.assertIsInstance(t, LocalTransport)
 
755
        t = transport.get_transport(here_url)
 
756
        self.assertIsInstance(t, local.LocalTransport)
554
757
        self.assertEquals(t.base, here_url)
555
758
 
556
 
 
557
 
class TestWin32LocalTransport(TestCase):
 
759
    def test_local_abspath(self):
 
760
        here = osutils.abspath('.')
 
761
        t = transport.get_transport(here)
 
762
        self.assertEquals(t.local_abspath(''), here)
 
763
 
 
764
 
 
765
class TestLocalTransportMutation(tests.TestCaseInTempDir):
    """Tests of LocalTransport operations that change the filesystem."""

    def test_local_transport_mkdir(self):
        """mkdir() through the transport creates the directory on disk."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        t.mkdir('test')
        self.assertTrue(os.path.exists('test'))

    def test_local_transport_mkdir_permission_denied(self):
        """mkdir() still succeeds when os.chmod fails with EPERM."""
        # See https://bugs.launchpad.net/bzr/+bug/606537
        here = osutils.abspath('.')
        t = transport.get_transport(here)

        def fake_chmod(path, mode):
            # Stand-in for os.chmod that always fails with EPERM.
            e = OSError('permission denied')
            e.errno = errno.EPERM
            raise e

        self.overrideAttr(os, 'chmod', fake_chmod)
        # Exercise mkdir both without and with an explicit mode.
        t.mkdir('test')
        # 0o707 is the same value as the legacy literal 0707, spelled in the
        # form accepted by Python 2.6+ as well as Python 3.
        t.mkdir('test2', mode=0o707)
        self.assertTrue(os.path.exists('test'))
        self.assertTrue(os.path.exists('test2'))
 
786
 
 
787
 
 
788
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for the stream returned by open_write_stream()."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        try:
            w.write('foo')
            w.fdatasync()
            with open('out', 'rb') as f:
                # Should have been flushed.
                self.assertEquals(f.read(), 'foo')
            # Checked before the stream is closed so that only the explicit
            # fdatasync() above is counted.
            self.assertEquals(len(calls), 1, calls)
        finally:
            # Do not leave the write stream open after the test.
            w.close()

    def test_missing_directory(self):
        """Opening a stream below a missing directory raises NoSuchFile."""
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
 
813
 
 
814
 
 
815
class TestWin32LocalTransport(tests.TestCase):
558
816
 
559
817
    def test_unc_clone_to_root(self):
560
818
        # Win32 UNC path like \\HOST\path
561
819
        # clone to root should stop at least at \\HOST part
562
820
        # not on \\
563
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
821
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
564
822
        for i in xrange(4):
565
823
            t = t.clone('..')
566
824
        self.assertEquals(t.base, 'file://HOST/')
567
825
        # make sure we reach the root
568
826
        t = t.clone('..')
569
827
        self.assertEquals(t.base, 'file://HOST/')
 
828
 
 
829
 
 
830
class TestConnectedTransport(tests.TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A simple URL is split into host, port and path, no credentials."""
        t = transport.ConnectedTransport(
            'http://simple.example.com/home/source')
        self.assertEquals(t._parsed_url.host, 'simple.example.com')
        self.assertEquals(t._parsed_url.port, None)
        self.assertEquals(t._parsed_url.path, '/home/source/')
        # Identity assertions report the offending value on failure.
        self.assertIs(None, t._parsed_url.user)
        self.assertIs(None, t._parsed_url.password)

        self.assertEquals(t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        """A user name containing '@' is kept whole."""
        # Bug 228058
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
        self.assertEquals(t._parsed_url.user, 'user@host.com')

    def test_parse_quoted_url(self):
        """Percent-escapes are decoded in the parsed URL components."""
        t = transport.ConnectedTransport(
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEquals(t._parsed_url.host, 'exAmple.com')
        self.assertEquals(t._parsed_url.port, 2222)
        self.assertEquals(t._parsed_url.user, 'robey')
        self.assertEquals(t._parsed_url.password, 'h@t')
        self.assertEquals(t._parsed_url.path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')

    def test_parse_invalid_url(self):
        """A URL with a non-numeric port part raises InvalidURL."""
        self.assertRaises(errors.InvalidURL,
                          transport.ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        """relpath() only accepts URLs below the transport's own base."""
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
            'sub')
        # A different scheme, user, host or port is never a child.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A transport and its clone share connection and credentials."""
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEquals('user', t._parsed_url.user)
        self.assertEquals('host.com', t._parsed_url.host)
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._parsed_url.password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        # NOTE(review): this re-checks `t`, which was already checked before
        # the clone; possibly `c` was intended -- confirm before changing.
        self.assertIs(None, t._parsed_url.password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
 
913
 
 
914
 
 
915
class TestReusedTransports(tests.TestCase):
    """Tests for transport reuse through possible_transports."""

    def test_reuse_same_transport(self):
        """A transport for the same URL is taken from possible_transports."""
        from_url = transport.get_transport_from_url

        # A newly created transport is recorded in the caller's list.
        candidates = []
        first = from_url('http://foo/', possible_transports=candidates)
        self.assertEqual([first], candidates)
        again = from_url('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = from_url('http://foo/path/')
        reused = from_url('http://foo/path', possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = from_url('http://foo/path')
        reused = from_url('http://foo/path/',
                          possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        from_url = transport.get_transport_from_url
        foo_t = from_url('http://foo/path')
        bar_t = from_url('http://bar/path', possible_transports=[foo_t])
        self.assertIsNot(foo_t, bar_t)
 
943
 
 
944
 
 
945
class TestTransportTrace(tests.TestCase):
    """Tests for the 'trace+' transport decorator."""

    def test_decorator(self):
        """A trace+ URL gives a TransportTraceDecorator."""
        t = transport.get_transport_from_url('trace+memory://')
        self.assertIsInstance(
            t, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a new object sharing the original's activity log."""
        t = transport.get_transport_from_url('trace+memory://')
        t2 = t.clone('.')
        # Identity assertions show both objects when they fail.
        self.assertIsNot(t, t2)
        self.assertIs(t._activity, t2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes() and get() are both recorded in _activity."""
        t = transport.get_transport_from_url('trace+memory:///')
        t.put_bytes('foo', 'barish')
        t.get('foo')
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, t._activity)

    def test_readv(self):
        """readv() is recorded with the offsets and options it was given."""
        t = transport.get_transport_from_url('trace+memory:///')
        t.put_bytes('foo', 'barish')
        # Wrapped in list() so the readv result is fully consumed.
        list(t.readv('foo', [(0, 1), (3, 2)],
                     adjust_for_latency=True, upper_limit=6))
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, t._activity)
 
986
 
 
987
 
 
988
class TestSSHConnections(tests.TestCaseWithTransport):
 
989
 
 
990
    def test_bzr_connect_to_bzr_ssh(self):
 
991
        """get_transport of a bzr+ssh:// behaves correctly.
 
992
 
 
993
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
994
        """
 
995
        # This test actually causes a bzr instance to be invoked, which is very
 
996
        # expensive: it should be the only such test in the test suite.
 
997
        # A reasonable evolution for this would be to simply check inside
 
998
        # check_channel_exec_request that the command is appropriate, and then
 
999
        # satisfy requests in-process.
 
1000
        self.requireFeature(features.paramiko)
 
1001
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
1002
        # override the interface (doesn't change self._vendor).
 
1003
        # Note that this does encryption, so can be slow.
 
1004
        from bzrlib.tests import stub_sftp
 
1005
 
 
1006
        # Start an SSH server
 
1007
        self.command_executed = []
 
1008
        # XXX: This is horrible -- we define a really dumb SSH server that
 
1009
        # executes commands, and manage the hooking up of stdin/out/err to the
 
1010
        # SSH channel ourselves.  Surely this has already been implemented
 
1011
        # elsewhere?
 
1012
        started = []
 
1013
 
 
1014
        class StubSSHServer(stub_sftp.StubServer):
 
1015
 
 
1016
            test = self
 
1017
 
 
1018
            def check_channel_exec_request(self, channel, command):
 
1019
                self.test.command_executed.append(command)
 
1020
                proc = subprocess.Popen(
 
1021
                    command, shell=True, stdin=subprocess.PIPE,
 
1022
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
1023
 
 
1024
                # XXX: horribly inefficient, not to mention ugly.
 
1025
                # Start a thread for each of stdin/out/err, and relay bytes
 
1026
                # from the subprocess to channel and vice versa.
 
1027
                def ferry_bytes(read, write, close):
 
1028
                    while True:
 
1029
                        bytes = read(1)
 
1030
                        if bytes == '':
 
1031
                            close()
 
1032
                            break
 
1033
                        write(bytes)
 
1034
 
 
1035
                file_functions = [
 
1036
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
1037
                    (proc.stdout.read, channel.sendall, channel.close),
 
1038
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
1039
                started.append(proc)
 
1040
                for read, write, close in file_functions:
 
1041
                    t = threading.Thread(
 
1042
                        target=ferry_bytes, args=(read, write, close))
 
1043
                    t.start()
 
1044
                    started.append(t)
 
1045
 
 
1046
                return True
 
1047
 
 
1048
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
1049
        # We *don't* want to override the default SSH vendor: the detected one
 
1050
        # is the one to use.
 
1051
 
 
1052
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1053
        # inherits from SFTPServer which forces the SSH vendor to
 
1054
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
 
1055
        self.start_server(ssh_server)
 
1056
        port = ssh_server.port
 
1057
 
 
1058
        if sys.platform == 'win32':
 
1059
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
1060
        else:
 
1061
            bzr_remote_path = self.get_bzr_path()
 
1062
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
1063
 
 
1064
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
1065
        # variable is used to tell bzr what command to run on the remote end.
 
1066
        path_to_branch = osutils.abspath('.')
 
1067
        if sys.platform == 'win32':
 
1068
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
1069
            # prefix a '/' to get the right path.
 
1070
            path_to_branch = '/' + path_to_branch
 
1071
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
1072
        t = transport.get_transport(url)
 
1073
        self.permit_url(t.base)
 
1074
        t.mkdir('foo')
 
1075
 
 
1076
        self.assertEqual(
 
1077
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
1078
            self.command_executed)
 
1079
        # Make sure to disconnect, so that the remote process can stop, and we
 
1080
        # can clean up. Then pause the test until everything is shut down.
 
1081
        t._client._medium.disconnect()
 
1082
        if not started:
 
1083
            return
 
1084
        # First wait for the subprocess
 
1085
        started[0].wait()
 
1086
        # And the rest are threads
 
1087
        for t in started[1:]:
 
1088
            t.join()
 
1089
 
 
1090
 
 
1091
class TestUnhtml(tests.TestCase):
    """Tests for unhtml_roughly"""

    def test_truncation(self):
        """A long HTML-ish input yields a 1000 character result.

        The input is 1000 repetitions of "<p>something!\\n"; the result is
        expected to be exactly 1000 characters long and to start with
        " something!" (i.e. without the leading "<p>" tag).
        """
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        # assertEquals is a deprecated alias; use assertEqual with the
        # expected value first, like the other assertions in this file.
        self.assertEqual(1000, len(result))
        self.assertStartsWith(result, " something!")
 
1100
 
 
1101
 
 
1102
class SomeDirectory(object):
    """Dummy directory service that resolves every lookup to one URL."""

    def look_up(self, name, url):
        """Return the fixed URL "http://bar".

        Both ``name`` and ``url`` are accepted but ignored.
        """
        resolved_url = "http://bar"
        return resolved_url
 
1106
 
 
1107
 
 
1108
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return an absolute local path and the file URL expected for it.

        :return: A (path, url) tuple, where path is the absolute form of
            '/foo/bar' and url is the matching 'file://' URL.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        """Something that is already a URL is returned unchanged."""
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        """A location with a registered directory prefix is looked up."""
        directories.register("bar:", SomeDirectory, "Dummy directory")
        # Unregister again so the dummy directory does not leak into other
        # tests.
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        """A URL containing a non-ascii unicode character is rejected."""
        self.assertRaises(errors.InvalidURL, location_to_url,
            "http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        """A non-ascii character in a local path is escaped in the URL."""
        path, url = self.get_base_location()
        # The bytes C3 AF are decoded to a single unicode character, which
        # is expected to show up as '%C3%AF' in the resulting URL.
        location = path + "\xc3\xaf".decode("utf-8")
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        """An absolute local path is converted to the matching file URL."""
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        """'file:bar' is resolved relative to the current directory."""
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
            location_to_url("file:bar"))

    def test_absolute_file_url(self):
        """'file:/bar' is expanded to 'file:///bar'."""
        self.assertEqual("file:///bar", location_to_url("file:/bar"))