~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-06 20:42:40 UTC
  • mto: This revision was merged to the branch mainline in revision 4088.
  • Revision ID: john@arbash-meinel.com-20090306204240-mzjavv31z3gu1x7i
Fix a small bug in setup.py when an extension fails to build

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
import os
19
 
import sys
20
 
import stat
21
18
from cStringIO import StringIO
22
19
 
23
20
import bzrlib
24
 
from bzrlib.errors import (NoSuchFile, FileExists,
25
 
                           TransportNotPossible,
26
 
                           ConnectionError,
27
 
                           DependencyNotPresent,
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
28
33
                           )
29
34
from bzrlib.tests import TestCase, TestCaseInTempDir
30
 
from bzrlib.transport import (_get_protocol_handlers,
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
31
40
                              _get_transport_modules,
32
41
                              get_transport,
 
42
                              LateReadError,
33
43
                              register_lazy_transport,
34
 
                              _set_protocol_handlers,
35
 
                              urlescape,
 
44
                              register_transport_proto,
36
45
                              Transport,
37
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
38
48
from bzrlib.transport.memory import MemoryTransport
39
 
from bzrlib.transport.local import LocalTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
 
 
52
 
 
53
# TODO: Should possibly split transport-specific tests into their own files.
40
54
 
41
55
 
42
56
class TestTransport(TestCase):
43
57
    """Test the non transport-concrete class functionality."""
44
58
 
45
 
    def test_urlescape(self):
46
 
        self.assertEqual('%25', urlescape('%'))
47
 
 
48
59
    def test__get_set_protocol_handlers(self):
49
60
        handlers = _get_protocol_handlers()
50
 
        self.assertNotEqual({}, handlers)
 
61
        self.assertNotEqual([], handlers.keys( ))
51
62
        try:
52
 
            _set_protocol_handlers({})
53
 
            self.assertEqual({}, _get_protocol_handlers())
 
63
            _clear_protocol_handlers()
 
64
            self.assertEqual([], _get_protocol_handlers().keys())
54
65
        finally:
55
66
            _set_protocol_handlers(handlers)
56
67
 
57
68
    def test_get_transport_modules(self):
58
69
        handlers = _get_protocol_handlers()
 
70
        # don't pollute the current handlers
 
71
        _clear_protocol_handlers()
59
72
        class SampleHandler(object):
60
73
            """I exist, isnt that enough?"""
61
74
        try:
62
 
            my_handlers = {}
63
 
            _set_protocol_handlers(my_handlers)
64
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
65
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
66
 
            self.assertEqual([SampleHandler.__module__],
 
75
            _clear_protocol_handlers()
 
76
            register_transport_proto('foo')
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
78
                                    'TestTransport.SampleHandler')
 
79
            register_transport_proto('bar')
 
80
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
81
                                    'TestTransport.SampleHandler')
 
82
            self.assertEqual([SampleHandler.__module__,
 
83
                              'bzrlib.transport.chroot'],
67
84
                             _get_transport_modules())
68
85
        finally:
69
86
            _set_protocol_handlers(handlers)
71
88
    def test_transport_dependency(self):
72
89
        """Transport with missing dependency causes no error"""
73
90
        saved_handlers = _get_protocol_handlers()
 
91
        # don't pollute the current handlers
 
92
        _clear_protocol_handlers()
74
93
        try:
 
94
            register_transport_proto('foo')
75
95
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
76
96
                    'BadTransportHandler')
77
 
            t = get_transport('foo://fooserver/foo')
78
 
            # because we failed to load the transport
79
 
            self.assertTrue(isinstance(t, LocalTransport))
 
97
            try:
 
98
                get_transport('foo://fooserver/foo')
 
99
            except UnsupportedProtocol, e:
 
100
                e_str = str(e)
 
101
                self.assertEquals('Unsupported protocol'
 
102
                                  ' for url "foo://fooserver/foo":'
 
103
                                  ' Unable to import library "some_lib":'
 
104
                                  ' testing missing dependency', str(e))
 
105
            else:
 
106
                self.fail('Did not raise UnsupportedProtocol')
80
107
        finally:
81
108
            # restore original values
82
109
            _set_protocol_handlers(saved_handlers)
83
 
            
 
110
 
84
111
    def test_transport_fallback(self):
85
112
        """Transport with missing dependency causes no error"""
86
113
        saved_handlers = _get_protocol_handlers()
87
114
        try:
88
 
            _set_protocol_handlers({})
 
115
            _clear_protocol_handlers()
 
116
            register_transport_proto('foo')
89
117
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
90
118
                    'BackupTransportHandler')
91
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
94
122
            self.assertTrue(isinstance(t, BackupTransportHandler))
95
123
        finally:
96
124
            _set_protocol_handlers(saved_handlers)
97
 
            
 
125
 
 
126
    def test_ssh_hints(self):
 
127
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
128
        try:
 
129
            get_transport('ssh://fooserver/foo')
 
130
        except UnsupportedProtocol, e:
 
131
            e_str = str(e)
 
132
            self.assertEquals('Unsupported protocol'
 
133
                              ' for url "ssh://fooserver/foo":'
 
134
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
135
                              str(e))
 
136
        else:
 
137
            self.fail('Did not raise UnsupportedProtocol')
 
138
 
 
139
    def test_LateReadError(self):
 
140
        """The LateReadError helper should raise on read()."""
 
141
        a_file = LateReadError('a path')
 
142
        try:
 
143
            a_file.read()
 
144
        except ReadError, error:
 
145
            self.assertEqual('a path', error.path)
 
146
        self.assertRaises(ReadError, a_file.read, 40)
 
147
        a_file.close()
 
148
 
 
149
    def test__combine_paths(self):
 
150
        t = Transport('/')
 
151
        self.assertEqual('/home/sarah/project/foo',
 
152
                         t._combine_paths('/home/sarah', 'project/foo'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '../../etc'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '/etc'))
 
159
 
 
160
    def test_local_abspath_non_local_transport(self):
 
161
        # the base implementation should throw
 
162
        t = MemoryTransport()
 
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
164
        self.assertEqual('memory:///t is not a local path.', str(e))
 
165
 
 
166
 
 
167
class TestCoalesceOffsets(TestCase):
 
168
 
 
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
170
        coalesce = Transport._coalesce_offsets
 
171
        exp = [_CoalescedOffset(*x) for x in expected]
 
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
173
                            max_size=max_size))
 
174
        self.assertEqual(exp, out)
 
175
 
 
176
    def test_coalesce_empty(self):
 
177
        self.check([], [])
 
178
 
 
179
    def test_coalesce_simple(self):
 
180
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
181
 
 
182
    def test_coalesce_unrelated(self):
 
183
        self.check([(0, 10, [(0, 10)]),
 
184
                    (20, 10, [(0, 10)]),
 
185
                   ], [(0, 10), (20, 10)])
 
186
 
 
187
    def test_coalesce_unsorted(self):
 
188
        self.check([(20, 10, [(0, 10)]),
 
189
                    (0, 10, [(0, 10)]),
 
190
                   ], [(20, 10), (0, 10)])
 
191
 
 
192
    def test_coalesce_nearby(self):
 
193
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
194
                   [(0, 10), (10, 10)])
 
195
 
 
196
    def test_coalesce_overlapped(self):
 
197
        self.assertRaises(ValueError,
 
198
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
199
                        [(0, 10), (5, 10)])
 
200
 
 
201
    def test_coalesce_limit(self):
 
202
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
203
                              (30, 10), (40, 10)]),
 
204
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
205
                              (30, 10), (40, 10)]),
 
206
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
207
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
208
                       (90, 10), (100, 10)],
 
209
                    limit=5)
 
210
 
 
211
    def test_coalesce_no_limit(self):
 
212
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
213
                               (30, 10), (40, 10), (50, 10),
 
214
                               (60, 10), (70, 10), (80, 10),
 
215
                               (90, 10)]),
 
216
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
217
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
218
                       (90, 10), (100, 10)])
 
219
 
 
220
    def test_coalesce_fudge(self):
 
221
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
222
                    (100, 10, [(0, 10),]),
 
223
                   ], [(10, 10), (30, 10), (100, 10)],
 
224
                   fudge=10
 
225
                  )
 
226
    def test_coalesce_max_size(self):
 
227
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
228
                    (30, 50, [(0, 50)]),
 
229
                    # If one range is above max_size, it gets its own coalesced
 
230
                    # offset
 
231
                    (100, 80, [(0, 80),]),],
 
232
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
233
                   max_size=50
 
234
                  )
 
235
 
 
236
    def test_coalesce_no_max_size(self):
 
237
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
239
                  )
 
240
 
 
241
    def test_coalesce_default_limit(self):
 
242
        # By default we use a 100MB max size.
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
249
                   max_size=1*1024*1024*1024)
 
250
 
98
251
 
99
252
class TestMemoryTransport(TestCase):
100
253
 
104
257
    def test_clone(self):
105
258
        transport = MemoryTransport()
106
259
        self.assertTrue(isinstance(transport, MemoryTransport))
 
260
        self.assertEqual("memory:///", transport.clone("/").base)
107
261
 
108
262
    def test_abspath(self):
109
263
        transport = MemoryTransport()
110
 
        self.assertEqual("memory:/relpath", transport.abspath('relpath'))
111
 
 
112
 
    def test_relpath(self):
113
 
        transport = MemoryTransport()
 
264
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
265
 
 
266
    def test_abspath_of_root(self):
 
267
        transport = MemoryTransport()
 
268
        self.assertEqual("memory:///", transport.base)
 
269
        self.assertEqual("memory:///", transport.abspath('/'))
 
270
 
 
271
    def test_abspath_of_relpath_starting_at_root(self):
 
272
        transport = MemoryTransport()
 
273
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
114
274
 
115
275
    def test_append_and_get(self):
116
276
        transport = MemoryTransport()
117
 
        transport.append('path', StringIO('content'))
 
277
        transport.append_bytes('path', 'content')
118
278
        self.assertEqual(transport.get('path').read(), 'content')
119
 
        transport.append('path', StringIO('content'))
 
279
        transport.append_file('path', StringIO('content'))
120
280
        self.assertEqual(transport.get('path').read(), 'contentcontent')
121
281
 
122
282
    def test_put_and_get(self):
123
283
        transport = MemoryTransport()
124
 
        transport.put('path', StringIO('content'))
 
284
        transport.put_file('path', StringIO('content'))
125
285
        self.assertEqual(transport.get('path').read(), 'content')
126
 
        transport.put('path', StringIO('content'))
 
286
        transport.put_bytes('path', 'content')
127
287
        self.assertEqual(transport.get('path').read(), 'content')
128
288
 
129
289
    def test_append_without_dir_fails(self):
130
290
        transport = MemoryTransport()
131
291
        self.assertRaises(NoSuchFile,
132
 
                          transport.append, 'dir/path', StringIO('content'))
 
292
                          transport.append_bytes, 'dir/path', 'content')
133
293
 
134
294
    def test_put_without_dir_fails(self):
135
295
        transport = MemoryTransport()
136
296
        self.assertRaises(NoSuchFile,
137
 
                          transport.put, 'dir/path', StringIO('content'))
 
297
                          transport.put_file, 'dir/path', StringIO('content'))
138
298
 
139
299
    def test_get_missing(self):
140
300
        transport = MemoryTransport()
146
306
 
147
307
    def test_has_present(self):
148
308
        transport = MemoryTransport()
149
 
        transport.append('foo', StringIO('content'))
 
309
        transport.append_bytes('foo', 'content')
150
310
        self.assertEquals(True, transport.has('foo'))
151
311
 
 
312
    def test_list_dir(self):
 
313
        transport = MemoryTransport()
 
314
        transport.put_bytes('foo', 'content')
 
315
        transport.mkdir('dir')
 
316
        transport.put_bytes('dir/subfoo', 'content')
 
317
        transport.put_bytes('dirlike', 'content')
 
318
 
 
319
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
320
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
321
 
152
322
    def test_mkdir(self):
153
323
        transport = MemoryTransport()
154
324
        transport.mkdir('dir')
155
 
        transport.append('dir/path', StringIO('content'))
 
325
        transport.append_bytes('dir/path', 'content')
156
326
        self.assertEqual(transport.get('dir/path').read(), 'content')
157
327
 
158
328
    def test_mkdir_missing_parent(self):
168
338
    def test_parameters(self):
169
339
        transport = MemoryTransport()
170
340
        self.assertEqual(True, transport.listable())
171
 
        self.assertEqual(False, transport.should_cache())
172
341
        self.assertEqual(False, transport.is_readonly())
173
342
 
174
343
    def test_iter_files_recursive(self):
175
344
        transport = MemoryTransport()
176
345
        transport.mkdir('dir')
177
 
        transport.put('dir/foo', StringIO('content'))
178
 
        transport.put('dir/bar', StringIO('content'))
179
 
        transport.put('bar', StringIO('content'))
 
346
        transport.put_bytes('dir/foo', 'content')
 
347
        transport.put_bytes('dir/bar', 'content')
 
348
        transport.put_bytes('bar', 'content')
180
349
        paths = set(transport.iter_files_recursive())
181
350
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
182
351
 
183
352
    def test_stat(self):
184
353
        transport = MemoryTransport()
185
 
        transport.put('foo', StringIO('content'))
186
 
        transport.put('bar', StringIO('phowar'))
 
354
        transport.put_bytes('foo', 'content')
 
355
        transport.put_bytes('bar', 'phowar')
187
356
        self.assertEqual(7, transport.stat('foo').st_size)
188
357
        self.assertEqual(6, transport.stat('bar').st_size)
189
358
 
190
 
        
 
359
 
 
360
class ChrootDecoratorTransportTest(TestCase):
 
361
    """Chroot decoration specific tests."""
 
362
 
 
363
    def test_abspath(self):
 
364
        # The abspath is always relative to the chroot_url.
 
365
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
366
        server.setUp()
 
367
        transport = get_transport(server.get_url())
 
368
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
369
 
 
370
        subdir_transport = transport.clone('subdir')
 
371
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
372
        server.tearDown()
 
373
 
 
374
    def test_clone(self):
 
375
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
376
        server.setUp()
 
377
        transport = get_transport(server.get_url())
 
378
        # relpath from root and root path are the same
 
379
        relpath_cloned = transport.clone('foo')
 
380
        abspath_cloned = transport.clone('/foo')
 
381
        self.assertEqual(server, relpath_cloned.server)
 
382
        self.assertEqual(server, abspath_cloned.server)
 
383
        server.tearDown()
 
384
 
 
385
    def test_chroot_url_preserves_chroot(self):
 
386
        """Calling get_transport on a chroot transport's base should produce a
 
387
        transport with exactly the same behaviour as the original chroot
 
388
        transport.
 
389
 
 
390
        This is so that it is not possible to escape a chroot by doing::
 
391
            url = chroot_transport.base
 
392
            parent_url = urlutils.join(url, '..')
 
393
            new_transport = get_transport(parent_url)
 
394
        """
 
395
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
396
        server.setUp()
 
397
        transport = get_transport(server.get_url())
 
398
        new_transport = get_transport(transport.base)
 
399
        self.assertEqual(transport.server, new_transport.server)
 
400
        self.assertEqual(transport.base, new_transport.base)
 
401
        server.tearDown()
 
402
 
 
403
    def test_urljoin_preserves_chroot(self):
 
404
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
405
        URL that escapes the intended chroot.
 
406
 
 
407
        This is so that it is not possible to escape a chroot by doing::
 
408
            url = chroot_transport.base
 
409
            parent_url = urlutils.join(url, '..')
 
410
            new_transport = get_transport(parent_url)
 
411
        """
 
412
        server = ChrootServer(get_transport('memory:///path/'))
 
413
        server.setUp()
 
414
        transport = get_transport(server.get_url())
 
415
        self.assertRaises(
 
416
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
417
        server.tearDown()
 
418
 
 
419
 
 
420
class ChrootServerTest(TestCase):
 
421
 
 
422
    def test_construct(self):
 
423
        backing_transport = MemoryTransport()
 
424
        server = ChrootServer(backing_transport)
 
425
        self.assertEqual(backing_transport, server.backing_transport)
 
426
 
 
427
    def test_setUp(self):
 
428
        backing_transport = MemoryTransport()
 
429
        server = ChrootServer(backing_transport)
 
430
        server.setUp()
 
431
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
432
 
 
433
    def test_tearDown(self):
 
434
        backing_transport = MemoryTransport()
 
435
        server = ChrootServer(backing_transport)
 
436
        server.setUp()
 
437
        server.tearDown()
 
438
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
439
 
 
440
    def test_get_url(self):
 
441
        backing_transport = MemoryTransport()
 
442
        server = ChrootServer(backing_transport)
 
443
        server.setUp()
 
444
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
445
        server.tearDown()
 
446
 
 
447
 
191
448
class ReadonlyDecoratorTransportTest(TestCase):
192
449
    """Readonly decoration specific tests."""
193
450
 
196
453
        # connect to . in readonly mode
197
454
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
198
455
        self.assertEqual(True, transport.listable())
199
 
        self.assertEqual(False, transport.should_cache())
200
456
        self.assertEqual(True, transport.is_readonly())
201
457
 
202
458
    def test_http_parameters(self):
 
459
        from bzrlib.tests.http_server import HttpServer
203
460
        import bzrlib.transport.readonly as readonly
204
 
        from bzrlib.transport.http import HttpServer
205
 
        # connect to . via http which is not listable
 
461
        # connect to '.' via http which is not listable
206
462
        server = HttpServer()
207
463
        server.setUp()
208
464
        try:
210
466
            self.failUnless(isinstance(transport,
211
467
                                       readonly.ReadonlyTransportDecorator))
212
468
            self.assertEqual(False, transport.listable())
213
 
            self.assertEqual(True, transport.should_cache())
214
469
            self.assertEqual(True, transport.is_readonly())
215
470
        finally:
216
471
            server.tearDown()
225
480
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
226
481
 
227
482
    def test_local_parameters(self):
228
 
        # the listable, should_cache and is_readonly parameters
 
483
        # the listable and is_readonly parameters
229
484
        # are not changed by the fakenfs decorator
230
485
        transport = self.get_nfs_transport('.')
231
486
        self.assertEqual(True, transport.listable())
232
 
        self.assertEqual(False, transport.should_cache())
233
487
        self.assertEqual(False, transport.is_readonly())
234
488
 
235
489
    def test_http_parameters(self):
236
 
        # the listable, should_cache and is_readonly parameters
 
490
        # the listable and is_readonly parameters
237
491
        # are not changed by the fakenfs decorator
238
 
        from bzrlib.transport.http import HttpServer
239
 
        # connect to . via http which is not listable
 
492
        from bzrlib.tests.http_server import HttpServer
 
493
        # connect to '.' via http which is not listable
240
494
        server = HttpServer()
241
495
        server.setUp()
242
496
        try:
244
498
            self.assertIsInstance(
245
499
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
246
500
            self.assertEqual(False, transport.listable())
247
 
            self.assertEqual(True, transport.should_cache())
248
501
            self.assertEqual(True, transport.is_readonly())
249
502
        finally:
250
503
            server.tearDown()
255
508
        server = fakenfs.FakeNFSServer()
256
509
        server.setUp()
257
510
        try:
258
 
            # the server should be a relpath localhost server
259
 
            self.assertEqual(server.get_url(), 'fakenfs+.')
 
511
            # the url should be decorated appropriately
 
512
            self.assertStartsWith(server.get_url(), 'fakenfs+')
260
513
            # and we should be able to get a transport for it
261
514
            transport = get_transport(server.get_url())
262
515
            # which must be a FakeNFSTransportDecorator instance.
271
524
        transport = self.get_nfs_transport('.')
272
525
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
273
526
                        transport=transport)
274
 
        self.assertRaises(bzrlib.errors.ResourceBusy,
 
527
        self.assertRaises(errors.ResourceBusy,
275
528
                          transport.rename, 'from', 'to')
276
529
 
277
530
 
307
560
class BackupTransportHandler(Transport):
308
561
    """Test transport that works as a backup for the BadTransportHandler"""
309
562
    pass
 
563
 
 
564
 
 
565
class TestTransportImplementation(TestCaseInTempDir):
 
566
    """Implementation verification for transports.
 
567
 
 
568
    To verify a transport we need a server factory, which is a callable
 
569
    that accepts no parameters and returns an implementation of
 
570
    bzrlib.transport.Server.
 
571
 
 
572
    That Server is then used to construct transport instances and test
 
573
    the transport via loopback activity.
 
574
 
 
575
    Currently this assumes that the Transport object is connected to the
 
576
    current working directory.  So that whatever is done
 
577
    through the transport, should show up in the working
 
578
    directory, and vice-versa. This is a bug, because its possible to have
 
579
    URL schemes which provide access to something that may not be
 
580
    result in storage on the local disk, i.e. due to file system limits, or
 
581
    due to it being a database or some other non-filesystem tool.
 
582
 
 
583
    This also tests to make sure that the functions work with both
 
584
    generators and lists (assuming iter(list) is effectively a generator)
 
585
    """
 
586
 
 
587
    def setUp(self):
 
588
        super(TestTransportImplementation, self).setUp()
 
589
        self._server = self.transport_server()
 
590
        self._server.setUp()
 
591
        self.addCleanup(self._server.tearDown)
 
592
 
 
593
    def get_transport(self, relpath=None):
 
594
        """Return a connected transport to the local directory.
 
595
 
 
596
        :param relpath: a path relative to the base url.
 
597
        """
 
598
        base_url = self._server.get_url()
 
599
        url = self._adjust_url(base_url, relpath)
 
600
        # try getting the transport via the regular interface:
 
601
        t = get_transport(url)
 
602
        # vila--20070607 if the following are commented out the test suite
 
603
        # still pass. Is this really still needed or was it a forgotten
 
604
        # temporary fix ?
 
605
        if not isinstance(t, self.transport_class):
 
606
            # we did not get the correct transport class type. Override the
 
607
            # regular connection behaviour by direct construction.
 
608
            t = self.transport_class(url)
 
609
        return t
 
610
 
 
611
 
 
612
class TestLocalTransports(TestCase):
 
613
 
 
614
    def test_get_transport_from_abspath(self):
 
615
        here = osutils.abspath('.')
 
616
        t = get_transport(here)
 
617
        self.assertIsInstance(t, LocalTransport)
 
618
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
619
 
 
620
    def test_get_transport_from_relpath(self):
 
621
        here = osutils.abspath('.')
 
622
        t = get_transport('.')
 
623
        self.assertIsInstance(t, LocalTransport)
 
624
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
625
 
 
626
    def test_get_transport_from_local_url(self):
 
627
        here = osutils.abspath('.')
 
628
        here_url = urlutils.local_path_to_url(here) + '/'
 
629
        t = get_transport(here_url)
 
630
        self.assertIsInstance(t, LocalTransport)
 
631
        self.assertEquals(t.base, here_url)
 
632
 
 
633
    def test_local_abspath(self):
 
634
        here = osutils.abspath('.')
 
635
        t = get_transport(here)
 
636
        self.assertEquals(t.local_abspath(''), here)
 
637
 
 
638
 
 
639
class TestWin32LocalTransport(TestCase):
 
640
 
 
641
    def test_unc_clone_to_root(self):
 
642
        # Win32 UNC path like \\HOST\path
 
643
        # clone to root should stop at least at \\HOST part
 
644
        # not on \\
 
645
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
646
        for i in xrange(4):
 
647
            t = t.clone('..')
 
648
        self.assertEquals(t.base, 'file://HOST/')
 
649
        # make sure we reach the root
 
650
        t = t.clone('..')
 
651
        self.assertEquals(t.base, 'file://HOST/')
 
652
 
 
653
 
 
654
class TestConnectedTransport(TestCase):
    """Tests for transports that are connected to a remote server."""

    def test_parse_url(self):
        # A plain URL carries a host and a path, but no port, user or
        # password.
        t = ConnectedTransport('http://simple.example.com/home/source')
        self.assertEqual(t._host, 'simple.example.com')
        self.assertEqual(t._port, None)
        self.assertEqual(t._path, '/home/source/')
        self.assertIs(None, t._user)
        self.assertIs(None, t._password)

        # Both the parsed path and the base end with a '/' even though the
        # URL given to the constructor did not.
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        # Bug 228058: the user name itself contains an '@'; everything
        # before the last '@' of the authority part is expected as the user.
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
        self.assertEqual(t._user, 'user@host.com')

    def test_parse_quoted_url(self):
        # URL-quoted characters (%62 -> 'b', %40 -> '@', %41 -> 'A') are
        # expected to be decoded in the user, password and host parts.
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEqual(t._host, 'exAmple.com')
        self.assertEqual(t._port, 2222)
        self.assertEqual(t._user, 'robey')
        self.assertEqual(t._password, 'h@t')
        self.assertEqual(t._path, '/path/')

        # Base should not keep track of the password
        self.assertEqual(t.base, 'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        # NOTE(review): presumably rejected because what follows the ':'
        # ('~janneke') is not a valid port number -- confirm against
        # ConnectedTransport's URL parsing.
        self.assertRaises(errors.InvalidURL,
                          ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        t = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # A URL that differs in scheme, user, host or port is not a child of
        # the transport, even when the path part matches.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        t = ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEqual('user', t._user)
        self.assertEqual('host.com', t._host)
        # Nothing is connected yet and no password is known.
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        # NOTE(review): this re-checks ``t``; it looks as if ``c._password``
        # may have been intended -- confirm before changing the assertion.
        self.assertIs(None, t._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        # The connection and credentials set on ``t`` must be visible through
        # its clone ``c`` as well.
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        # Updating through the clone must be visible on the original too,
        # while the connection object itself stays the same.
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
 
734
 
 
735
 
 
736
class TestReusedTransports(TestCase):
    """Check when get_transport() hands back a transport it was offered."""

    def test_reuse_same_transport(self):
        # A newly created transport is recorded in the list we pass in ...
        candidates = []
        created = get_transport('http://foo/', possible_transports=candidates)
        self.assertEqual([created], candidates)
        # ... and asking again for the same URL returns that very object.
        again = get_transport('http://foo/', possible_transports=[created])
        self.assertIs(created, again)

        # A final '/' must not matter, whichever of the two URLs carries it.
        url_pairs = [
            ('http://foo/path/', 'http://foo/path'),
            ('http://foo/path', 'http://foo/path/'),
            ]
        for offered_url, requested_url in url_pairs:
            offered = get_transport(offered_url)
            reused = get_transport(requested_url,
                                   possible_transports=[offered])
            self.assertIs(offered, reused)

    def test_don_t_reuse_different_transport(self):
        # A transport for another host must not be handed back.
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
 
760
 
 
761
 
 
762
class TestTransportTrace(TestCase):
    """Tests for the 'trace+' transport decorator and its activity log."""

    def test_decorator(self):
        # This check used to be a first ``test_get``.  A second method of the
        # same name further down rebound the attribute in the class body, so
        # this one was silently dropped and never ran; it now has its own
        # name.
        transport = get_transport('trace+memory://')
        self.assertIsInstance(
            transport, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        # A clone is a distinct transport object that shares the very same
        # activity list with the transport it was cloned from.
        transport = get_transport('trace+memory://')
        transport2 = transport.clone('.')
        self.assertIsNot(transport, transport2)
        self.assertIs(transport._activity, transport2._activity)

    # The following specific tests are for the operations that have made use
    # of logging in tests; we could test every single operation, but doing
    # that still won't cause a test failure when the top level Transport API
    # changes, so there is little return in doing that.
    def test_get(self):
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        transport.get('foo')
        expected_result = []
        # put_bytes records the length of the bytes (6 for 'barish'), not the
        # content itself, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, transport._activity)

    def test_readv(self):
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # readv() is wrapped in list() so that its result is fully consumed
        # before the activity log is inspected.
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
            upper_limit=6))
        expected_result = []
        # put_bytes records the length of the bytes (6 for 'barish'), not the
        # content itself, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request, followed by the
        # adjust_for_latency and upper_limit values it was called with.
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, transport._activity)