~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Aaron Bentley
  • Date: 2006-06-21 14:30:57 UTC
  • mfrom: (1801.1.1 bzr.dev)
  • mto: This revision was merged to the branch mainline in revision 1803.
  • Revision ID: abentley@panoramicfeedback.com-20060621143057-776e4b8d707e430e
Install benchmarks. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
18
21
from cStringIO import StringIO
19
22
 
20
23
import bzrlib
21
 
from bzrlib import (
22
 
    errors,
23
 
    osutils,
24
 
    urlutils,
25
 
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
 
24
from bzrlib.errors import (NoSuchFile, FileExists,
 
25
                           TransportNotPossible,
 
26
                           ConnectionError,
 
27
                           DependencyNotPresent,
 
28
                           InvalidURL,
33
29
                           )
34
30
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
 
31
from bzrlib.transport import (_get_protocol_handlers,
40
32
                              _get_transport_modules,
41
33
                              get_transport,
42
 
                              LateReadError,
43
34
                              register_lazy_transport,
44
 
                              register_transport_proto,
 
35
                              _set_protocol_handlers,
45
36
                              Transport,
46
37
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
38
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
51
 
 
52
 
 
53
 
# TODO: Should possibly split transport-specific tests into their own files.
 
39
from bzrlib.transport.local import LocalTransport
54
40
 
55
41
 
56
42
class TestTransport(TestCase):
58
44
 
59
45
    def test__get_set_protocol_handlers(self):
60
46
        handlers = _get_protocol_handlers()
61
 
        self.assertNotEqual([], handlers.keys( ))
 
47
        self.assertNotEqual({}, handlers)
62
48
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
49
            _set_protocol_handlers({})
 
50
            self.assertEqual({}, _get_protocol_handlers())
65
51
        finally:
66
52
            _set_protocol_handlers(handlers)
67
53
 
68
54
    def test_get_transport_modules(self):
69
55
        handlers = _get_protocol_handlers()
70
 
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
72
56
        class SampleHandler(object):
73
57
            """I exist, isnt that enough?"""
74
58
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
82
 
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
 
59
            my_handlers = {}
 
60
            _set_protocol_handlers(my_handlers)
 
61
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
62
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
63
            self.assertEqual([SampleHandler.__module__],
84
64
                             _get_transport_modules())
85
65
        finally:
86
66
            _set_protocol_handlers(handlers)
88
68
    def test_transport_dependency(self):
89
69
        """Transport with missing dependency causes no error"""
90
70
        saved_handlers = _get_protocol_handlers()
91
 
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
93
71
        try:
94
 
            register_transport_proto('foo')
95
72
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
73
                    'BadTransportHandler')
97
 
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
100
 
                e_str = str(e)
101
 
                self.assertEquals('Unsupported protocol'
102
 
                                  ' for url "foo://fooserver/foo":'
103
 
                                  ' Unable to import library "some_lib":'
104
 
                                  ' testing missing dependency', str(e))
105
 
            else:
106
 
                self.fail('Did not raise UnsupportedProtocol')
 
74
            # TODO: jam 20060427 Now we get InvalidURL because it looks like 
 
75
            #       a URL but we have no support for it.
 
76
            #       Is it better to always fall back to LocalTransport?
 
77
            #       I think this is a better error than a future NoSuchFile
 
78
            self.assertRaises(InvalidURL, get_transport, 'foo://fooserver/foo')
107
79
        finally:
108
80
            # restore original values
109
81
            _set_protocol_handlers(saved_handlers)
110
 
 
 
82
            
111
83
    def test_transport_fallback(self):
112
84
        """Transport with missing dependency causes no error"""
113
85
        saved_handlers = _get_protocol_handlers()
114
86
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
 
87
            _set_protocol_handlers({})
117
88
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
89
                    'BackupTransportHandler')
119
90
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
122
93
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
94
        finally:
124
95
            _set_protocol_handlers(saved_handlers)
125
 
 
126
 
    def test_ssh_hints(self):
127
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
128
 
        try:
129
 
            get_transport('ssh://fooserver/foo')
130
 
        except UnsupportedProtocol, e:
131
 
            e_str = str(e)
132
 
            self.assertEquals('Unsupported protocol'
133
 
                              ' for url "ssh://fooserver/foo":'
134
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
135
 
                              str(e))
136
 
        else:
137
 
            self.fail('Did not raise UnsupportedProtocol')
138
 
 
139
 
    def test_LateReadError(self):
140
 
        """The LateReadError helper should raise on read()."""
141
 
        a_file = LateReadError('a path')
142
 
        try:
143
 
            a_file.read()
144
 
        except ReadError, error:
145
 
            self.assertEqual('a path', error.path)
146
 
        self.assertRaises(ReadError, a_file.read, 40)
147
 
        a_file.close()
148
 
 
149
 
    def test__combine_paths(self):
150
 
        t = Transport('/')
151
 
        self.assertEqual('/home/sarah/project/foo',
152
 
                         t._combine_paths('/home/sarah', 'project/foo'))
153
 
        self.assertEqual('/etc',
154
 
                         t._combine_paths('/home/sarah', '../../etc'))
155
 
        self.assertEqual('/etc',
156
 
                         t._combine_paths('/home/sarah', '../../../etc'))
157
 
        self.assertEqual('/etc',
158
 
                         t._combine_paths('/home/sarah', '/etc'))
159
 
 
160
 
    def test_local_abspath_non_local_transport(self):
161
 
        # the base implementation should throw
162
 
        t = MemoryTransport()
163
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
164
 
        self.assertEqual('memory:///t is not a local path.', str(e))
165
 
 
166
 
 
167
 
class TestCoalesceOffsets(TestCase):
168
 
 
169
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
170
 
        coalesce = Transport._coalesce_offsets
171
 
        exp = [_CoalescedOffset(*x) for x in expected]
172
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
173
 
                            max_size=max_size))
174
 
        self.assertEqual(exp, out)
175
 
 
176
 
    def test_coalesce_empty(self):
177
 
        self.check([], [])
178
 
 
179
 
    def test_coalesce_simple(self):
180
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
181
 
 
182
 
    def test_coalesce_unrelated(self):
183
 
        self.check([(0, 10, [(0, 10)]),
184
 
                    (20, 10, [(0, 10)]),
185
 
                   ], [(0, 10), (20, 10)])
186
 
 
187
 
    def test_coalesce_unsorted(self):
188
 
        self.check([(20, 10, [(0, 10)]),
189
 
                    (0, 10, [(0, 10)]),
190
 
                   ], [(20, 10), (0, 10)])
191
 
 
192
 
    def test_coalesce_nearby(self):
193
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
194
 
                   [(0, 10), (10, 10)])
195
 
 
196
 
    def test_coalesce_overlapped(self):
197
 
        self.assertRaises(ValueError,
198
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
199
 
                        [(0, 10), (5, 10)])
200
 
 
201
 
    def test_coalesce_limit(self):
202
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
203
 
                              (30, 10), (40, 10)]),
204
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
205
 
                              (30, 10), (40, 10)]),
206
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
207
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
208
 
                       (90, 10), (100, 10)],
209
 
                    limit=5)
210
 
 
211
 
    def test_coalesce_no_limit(self):
212
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
213
 
                               (30, 10), (40, 10), (50, 10),
214
 
                               (60, 10), (70, 10), (80, 10),
215
 
                               (90, 10)]),
216
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
217
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
218
 
                       (90, 10), (100, 10)])
219
 
 
220
 
    def test_coalesce_fudge(self):
221
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
222
 
                    (100, 10, [(0, 10),]),
223
 
                   ], [(10, 10), (30, 10), (100, 10)],
224
 
                   fudge=10
225
 
                  )
226
 
    def test_coalesce_max_size(self):
227
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
228
 
                    (30, 50, [(0, 50)]),
229
 
                    # If one range is above max_size, it gets its own coalesced
230
 
                    # offset
231
 
                    (100, 80, [(0, 80),]),],
232
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
233
 
                   max_size=50
234
 
                  )
235
 
 
236
 
    def test_coalesce_no_max_size(self):
237
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
238
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
239
 
                  )
240
 
 
241
 
    def test_coalesce_default_limit(self):
242
 
        # By default we use a 100MB max size.
243
 
        ten_mb = 10*1024*1024
244
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
245
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
246
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
247
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
248
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
249
 
                   max_size=1*1024*1024*1024)
250
 
 
 
96
            
251
97
 
252
98
class TestMemoryTransport(TestCase):
253
99
 
257
103
    def test_clone(self):
258
104
        transport = MemoryTransport()
259
105
        self.assertTrue(isinstance(transport, MemoryTransport))
260
 
        self.assertEqual("memory:///", transport.clone("/").base)
261
106
 
262
107
    def test_abspath(self):
263
108
        transport = MemoryTransport()
264
109
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
265
110
 
266
 
    def test_abspath_of_root(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertEqual("memory:///", transport.base)
269
 
        self.assertEqual("memory:///", transport.abspath('/'))
270
 
 
271
 
    def test_abspath_of_relpath_starting_at_root(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
111
    def test_relpath(self):
 
112
        transport = MemoryTransport()
274
113
 
275
114
    def test_append_and_get(self):
276
115
        transport = MemoryTransport()
277
 
        transport.append_bytes('path', 'content')
 
116
        transport.append('path', StringIO('content'))
278
117
        self.assertEqual(transport.get('path').read(), 'content')
279
 
        transport.append_file('path', StringIO('content'))
 
118
        transport.append('path', StringIO('content'))
280
119
        self.assertEqual(transport.get('path').read(), 'contentcontent')
281
120
 
282
121
    def test_put_and_get(self):
283
122
        transport = MemoryTransport()
284
 
        transport.put_file('path', StringIO('content'))
 
123
        transport.put('path', StringIO('content'))
285
124
        self.assertEqual(transport.get('path').read(), 'content')
286
 
        transport.put_bytes('path', 'content')
 
125
        transport.put('path', StringIO('content'))
287
126
        self.assertEqual(transport.get('path').read(), 'content')
288
127
 
289
128
    def test_append_without_dir_fails(self):
290
129
        transport = MemoryTransport()
291
130
        self.assertRaises(NoSuchFile,
292
 
                          transport.append_bytes, 'dir/path', 'content')
 
131
                          transport.append, 'dir/path', StringIO('content'))
293
132
 
294
133
    def test_put_without_dir_fails(self):
295
134
        transport = MemoryTransport()
296
135
        self.assertRaises(NoSuchFile,
297
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
136
                          transport.put, 'dir/path', StringIO('content'))
298
137
 
299
138
    def test_get_missing(self):
300
139
        transport = MemoryTransport()
306
145
 
307
146
    def test_has_present(self):
308
147
        transport = MemoryTransport()
309
 
        transport.append_bytes('foo', 'content')
 
148
        transport.append('foo', StringIO('content'))
310
149
        self.assertEquals(True, transport.has('foo'))
311
150
 
312
 
    def test_list_dir(self):
313
 
        transport = MemoryTransport()
314
 
        transport.put_bytes('foo', 'content')
315
 
        transport.mkdir('dir')
316
 
        transport.put_bytes('dir/subfoo', 'content')
317
 
        transport.put_bytes('dirlike', 'content')
318
 
 
319
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
320
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
321
 
 
322
151
    def test_mkdir(self):
323
152
        transport = MemoryTransport()
324
153
        transport.mkdir('dir')
325
 
        transport.append_bytes('dir/path', 'content')
 
154
        transport.append('dir/path', StringIO('content'))
326
155
        self.assertEqual(transport.get('dir/path').read(), 'content')
327
156
 
328
157
    def test_mkdir_missing_parent(self):
338
167
    def test_parameters(self):
339
168
        transport = MemoryTransport()
340
169
        self.assertEqual(True, transport.listable())
 
170
        self.assertEqual(False, transport.should_cache())
341
171
        self.assertEqual(False, transport.is_readonly())
342
172
 
343
173
    def test_iter_files_recursive(self):
344
174
        transport = MemoryTransport()
345
175
        transport.mkdir('dir')
346
 
        transport.put_bytes('dir/foo', 'content')
347
 
        transport.put_bytes('dir/bar', 'content')
348
 
        transport.put_bytes('bar', 'content')
 
176
        transport.put('dir/foo', StringIO('content'))
 
177
        transport.put('dir/bar', StringIO('content'))
 
178
        transport.put('bar', StringIO('content'))
349
179
        paths = set(transport.iter_files_recursive())
350
180
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
351
181
 
352
182
    def test_stat(self):
353
183
        transport = MemoryTransport()
354
 
        transport.put_bytes('foo', 'content')
355
 
        transport.put_bytes('bar', 'phowar')
 
184
        transport.put('foo', StringIO('content'))
 
185
        transport.put('bar', StringIO('phowar'))
356
186
        self.assertEqual(7, transport.stat('foo').st_size)
357
187
        self.assertEqual(6, transport.stat('bar').st_size)
358
188
 
359
 
 
360
 
class ChrootDecoratorTransportTest(TestCase):
361
 
    """Chroot decoration specific tests."""
362
 
 
363
 
    def test_abspath(self):
364
 
        # The abspath is always relative to the chroot_url.
365
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
366
 
        self.start_server(server)
367
 
        transport = get_transport(server.get_url())
368
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
369
 
 
370
 
        subdir_transport = transport.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
372
 
 
373
 
    def test_clone(self):
374
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
375
 
        self.start_server(server)
376
 
        transport = get_transport(server.get_url())
377
 
        # relpath from root and root path are the same
378
 
        relpath_cloned = transport.clone('foo')
379
 
        abspath_cloned = transport.clone('/foo')
380
 
        self.assertEqual(server, relpath_cloned.server)
381
 
        self.assertEqual(server, abspath_cloned.server)
382
 
 
383
 
    def test_chroot_url_preserves_chroot(self):
384
 
        """Calling get_transport on a chroot transport's base should produce a
385
 
        transport with exactly the same behaviour as the original chroot
386
 
        transport.
387
 
 
388
 
        This is so that it is not possible to escape a chroot by doing::
389
 
            url = chroot_transport.base
390
 
            parent_url = urlutils.join(url, '..')
391
 
            new_transport = get_transport(parent_url)
392
 
        """
393
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
394
 
        self.start_server(server)
395
 
        transport = get_transport(server.get_url())
396
 
        new_transport = get_transport(transport.base)
397
 
        self.assertEqual(transport.server, new_transport.server)
398
 
        self.assertEqual(transport.base, new_transport.base)
399
 
 
400
 
    def test_urljoin_preserves_chroot(self):
401
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
402
 
        URL that escapes the intended chroot.
403
 
 
404
 
        This is so that it is not possible to escape a chroot by doing::
405
 
            url = chroot_transport.base
406
 
            parent_url = urlutils.join(url, '..')
407
 
            new_transport = get_transport(parent_url)
408
 
        """
409
 
        server = ChrootServer(get_transport('memory:///path/'))
410
 
        self.start_server(server)
411
 
        transport = get_transport(server.get_url())
412
 
        self.assertRaises(
413
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
414
 
 
415
 
 
416
 
class ChrootServerTest(TestCase):
417
 
 
418
 
    def test_construct(self):
419
 
        backing_transport = MemoryTransport()
420
 
        server = ChrootServer(backing_transport)
421
 
        self.assertEqual(backing_transport, server.backing_transport)
422
 
 
423
 
    def test_setUp(self):
424
 
        backing_transport = MemoryTransport()
425
 
        server = ChrootServer(backing_transport)
426
 
        server.setUp()
427
 
        try:
428
 
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
429
 
        finally:
430
 
            server.tearDown()
431
 
 
432
 
    def test_tearDown(self):
433
 
        backing_transport = MemoryTransport()
434
 
        server = ChrootServer(backing_transport)
435
 
        server.setUp()
436
 
        server.tearDown()
437
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
438
 
 
439
 
    def test_get_url(self):
440
 
        backing_transport = MemoryTransport()
441
 
        server = ChrootServer(backing_transport)
442
 
        server.setUp()
443
 
        try:
444
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
445
 
        finally:
446
 
            server.tearDown()
447
 
 
448
 
 
 
189
        
449
190
class ReadonlyDecoratorTransportTest(TestCase):
450
191
    """Readonly decoration specific tests."""
451
192
 
454
195
        # connect to . in readonly mode
455
196
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
456
197
        self.assertEqual(True, transport.listable())
 
198
        self.assertEqual(False, transport.should_cache())
457
199
        self.assertEqual(True, transport.is_readonly())
458
200
 
459
201
    def test_http_parameters(self):
460
 
        from bzrlib.tests.http_server import HttpServer
461
202
        import bzrlib.transport.readonly as readonly
462
 
        # connect to '.' via http which is not listable
 
203
        from bzrlib.transport.http import HttpServer
 
204
        # connect to . via http which is not listable
463
205
        server = HttpServer()
464
 
        self.start_server(server)
465
 
        transport = get_transport('readonly+' + server.get_url())
466
 
        self.failUnless(isinstance(transport,
467
 
                                   readonly.ReadonlyTransportDecorator))
468
 
        self.assertEqual(False, transport.listable())
469
 
        self.assertEqual(True, transport.is_readonly())
 
206
        server.setUp()
 
207
        try:
 
208
            transport = get_transport('readonly+' + server.get_url())
 
209
            self.failUnless(isinstance(transport,
 
210
                                       readonly.ReadonlyTransportDecorator))
 
211
            self.assertEqual(False, transport.listable())
 
212
            self.assertEqual(True, transport.should_cache())
 
213
            self.assertEqual(True, transport.is_readonly())
 
214
        finally:
 
215
            server.tearDown()
470
216
 
471
217
 
472
218
class FakeNFSDecoratorTests(TestCaseInTempDir):
478
224
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
479
225
 
480
226
    def test_local_parameters(self):
481
 
        # the listable and is_readonly parameters
 
227
        # the listable, should_cache and is_readonly parameters
482
228
        # are not changed by the fakenfs decorator
483
229
        transport = self.get_nfs_transport('.')
484
230
        self.assertEqual(True, transport.listable())
 
231
        self.assertEqual(False, transport.should_cache())
485
232
        self.assertEqual(False, transport.is_readonly())
486
233
 
487
234
    def test_http_parameters(self):
488
 
        # the listable and is_readonly parameters
 
235
        # the listable, should_cache and is_readonly parameters
489
236
        # are not changed by the fakenfs decorator
490
 
        from bzrlib.tests.http_server import HttpServer
491
 
        # connect to '.' via http which is not listable
 
237
        from bzrlib.transport.http import HttpServer
 
238
        # connect to . via http which is not listable
492
239
        server = HttpServer()
493
 
        self.start_server(server)
494
 
        transport = self.get_nfs_transport(server.get_url())
495
 
        self.assertIsInstance(
496
 
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
497
 
        self.assertEqual(False, transport.listable())
498
 
        self.assertEqual(True, transport.is_readonly())
 
240
        server.setUp()
 
241
        try:
 
242
            transport = self.get_nfs_transport(server.get_url())
 
243
            self.assertIsInstance(
 
244
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
245
            self.assertEqual(False, transport.listable())
 
246
            self.assertEqual(True, transport.should_cache())
 
247
            self.assertEqual(True, transport.is_readonly())
 
248
        finally:
 
249
            server.tearDown()
499
250
 
500
251
    def test_fakenfs_server_default(self):
501
252
        # a FakeNFSServer() should bring up a local relpath server for itself
502
253
        import bzrlib.transport.fakenfs as fakenfs
503
254
        server = fakenfs.FakeNFSServer()
504
 
        self.start_server(server)
505
 
        # the url should be decorated appropriately
506
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
507
 
        # and we should be able to get a transport for it
508
 
        transport = get_transport(server.get_url())
509
 
        # which must be a FakeNFSTransportDecorator instance.
510
 
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
255
        server.setUp()
 
256
        try:
 
257
            # the server should be a relpath localhost server
 
258
            self.assertEqual(server.get_url(), 'fakenfs+.')
 
259
            # and we should be able to get a transport for it
 
260
            transport = get_transport(server.get_url())
 
261
            # which must be a FakeNFSTransportDecorator instance.
 
262
            self.assertIsInstance(
 
263
                transport, fakenfs.FakeNFSTransportDecorator)
 
264
        finally:
 
265
            server.tearDown()
511
266
 
512
267
    def test_fakenfs_rename_semantics(self):
513
268
        # a FakeNFS transport must mangle the way rename errors occur to
515
270
        transport = self.get_nfs_transport('.')
516
271
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
517
272
                        transport=transport)
518
 
        self.assertRaises(errors.ResourceBusy,
 
273
        self.assertRaises(bzrlib.errors.ResourceBusy,
519
274
                          transport.rename, 'from', 'to')
520
275
 
521
276
 
551
306
class BackupTransportHandler(Transport):
552
307
    """Test transport that works as a backup for the BadTransportHandler"""
553
308
    pass
554
 
 
555
 
 
556
 
class TestTransportImplementation(TestCaseInTempDir):
557
 
    """Implementation verification for transports.
558
 
 
559
 
    To verify a transport we need a server factory, which is a callable
560
 
    that accepts no parameters and returns an implementation of
561
 
    bzrlib.transport.Server.
562
 
 
563
 
    That Server is then used to construct transport instances and test
564
 
    the transport via loopback activity.
565
 
 
566
 
    Currently this assumes that the Transport object is connected to the
567
 
    current working directory.  So that whatever is done
568
 
    through the transport, should show up in the working
569
 
    directory, and vice-versa. This is a bug, because its possible to have
570
 
    URL schemes which provide access to something that may not be
571
 
    result in storage on the local disk, i.e. due to file system limits, or
572
 
    due to it being a database or some other non-filesystem tool.
573
 
 
574
 
    This also tests to make sure that the functions work with both
575
 
    generators and lists (assuming iter(list) is effectively a generator)
576
 
    """
577
 
 
578
 
    def setUp(self):
579
 
        super(TestTransportImplementation, self).setUp()
580
 
        self._server = self.transport_server()
581
 
        self.start_server(self._server)
582
 
 
583
 
    def get_transport(self, relpath=None):
584
 
        """Return a connected transport to the local directory.
585
 
 
586
 
        :param relpath: a path relative to the base url.
587
 
        """
588
 
        base_url = self._server.get_url()
589
 
        url = self._adjust_url(base_url, relpath)
590
 
        # try getting the transport via the regular interface:
591
 
        t = get_transport(url)
592
 
        # vila--20070607 if the following are commented out the test suite
593
 
        # still pass. Is this really still needed or was it a forgotten
594
 
        # temporary fix ?
595
 
        if not isinstance(t, self.transport_class):
596
 
            # we did not get the correct transport class type. Override the
597
 
            # regular connection behaviour by direct construction.
598
 
            t = self.transport_class(url)
599
 
        return t
600
 
 
601
 
 
602
 
class TestLocalTransports(TestCase):
603
 
 
604
 
    def test_get_transport_from_abspath(self):
605
 
        here = osutils.abspath('.')
606
 
        t = get_transport(here)
607
 
        self.assertIsInstance(t, LocalTransport)
608
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
609
 
 
610
 
    def test_get_transport_from_relpath(self):
611
 
        here = osutils.abspath('.')
612
 
        t = get_transport('.')
613
 
        self.assertIsInstance(t, LocalTransport)
614
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
615
 
 
616
 
    def test_get_transport_from_local_url(self):
617
 
        here = osutils.abspath('.')
618
 
        here_url = urlutils.local_path_to_url(here) + '/'
619
 
        t = get_transport(here_url)
620
 
        self.assertIsInstance(t, LocalTransport)
621
 
        self.assertEquals(t.base, here_url)
622
 
 
623
 
    def test_local_abspath(self):
624
 
        here = osutils.abspath('.')
625
 
        t = get_transport(here)
626
 
        self.assertEquals(t.local_abspath(''), here)
627
 
 
628
 
 
629
 
class TestWin32LocalTransport(TestCase):
630
 
 
631
 
    def test_unc_clone_to_root(self):
632
 
        # Win32 UNC path like \\HOST\path
633
 
        # clone to root should stop at least at \\HOST part
634
 
        # not on \\
635
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
636
 
        for i in xrange(4):
637
 
            t = t.clone('..')
638
 
        self.assertEquals(t.base, 'file://HOST/')
639
 
        # make sure we reach the root
640
 
        t = t.clone('..')
641
 
        self.assertEquals(t.base, 'file://HOST/')
642
 
 
643
 
 
644
 
class TestConnectedTransport(TestCase):
645
 
    """Tests for connected to remote server transports"""
646
 
 
647
 
    def test_parse_url(self):
648
 
        t = ConnectedTransport('http://simple.example.com/home/source')
649
 
        self.assertEquals(t._host, 'simple.example.com')
650
 
        self.assertEquals(t._port, None)
651
 
        self.assertEquals(t._path, '/home/source/')
652
 
        self.failUnless(t._user is None)
653
 
        self.failUnless(t._password is None)
654
 
 
655
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
656
 
 
657
 
    def test_parse_url_with_at_in_user(self):
658
 
        # Bug 228058
659
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
660
 
        self.assertEquals(t._user, 'user@host.com')
661
 
 
662
 
    def test_parse_quoted_url(self):
663
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
664
 
        self.assertEquals(t._host, 'exAmple.com')
665
 
        self.assertEquals(t._port, 2222)
666
 
        self.assertEquals(t._user, 'robey')
667
 
        self.assertEquals(t._password, 'h@t')
668
 
        self.assertEquals(t._path, '/path/')
669
 
 
670
 
        # Base should not keep track of the password
671
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
672
 
 
673
 
    def test_parse_invalid_url(self):
674
 
        self.assertRaises(errors.InvalidURL,
675
 
                          ConnectedTransport,
676
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
677
 
 
678
 
    def test_relpath(self):
679
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
680
 
 
681
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
682
 
        self.assertRaises(errors.PathNotChild, t.relpath,
683
 
                          'http://user@host.com/abs/path/sub')
684
 
        self.assertRaises(errors.PathNotChild, t.relpath,
685
 
                          'sftp://user2@host.com/abs/path/sub')
686
 
        self.assertRaises(errors.PathNotChild, t.relpath,
687
 
                          'sftp://user@otherhost.com/abs/path/sub')
688
 
        self.assertRaises(errors.PathNotChild, t.relpath,
689
 
                          'sftp://user@host.com:33/abs/path/sub')
690
 
        # Make sure it works when we don't supply a username
691
 
        t = ConnectedTransport('sftp://host.com/abs/path')
692
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
693
 
 
694
 
        # Make sure it works when parts of the path will be url encoded
695
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
696
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
697
 
 
698
 
    def test_connection_sharing_propagate_credentials(self):
699
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
700
 
        self.assertEquals('user', t._user)
701
 
        self.assertEquals('host.com', t._host)
702
 
        self.assertIs(None, t._get_connection())
703
 
        self.assertIs(None, t._password)
704
 
        c = t.clone('subdir')
705
 
        self.assertIs(None, c._get_connection())
706
 
        self.assertIs(None, t._password)
707
 
 
708
 
        # Simulate the user entering a password
709
 
        password = 'secret'
710
 
        connection = object()
711
 
        t._set_connection(connection, password)
712
 
        self.assertIs(connection, t._get_connection())
713
 
        self.assertIs(password, t._get_credentials())
714
 
        self.assertIs(connection, c._get_connection())
715
 
        self.assertIs(password, c._get_credentials())
716
 
 
717
 
        # credentials can be updated
718
 
        new_password = 'even more secret'
719
 
        c._update_credentials(new_password)
720
 
        self.assertIs(connection, t._get_connection())
721
 
        self.assertIs(new_password, t._get_credentials())
722
 
        self.assertIs(connection, c._get_connection())
723
 
        self.assertIs(new_password, c._get_credentials())
724
 
 
725
 
 
726
 
class TestReusedTransports(TestCase):
727
 
    """Tests for transport reuse"""
728
 
 
729
 
    def test_reuse_same_transport(self):
730
 
        possible_transports = []
731
 
        t1 = get_transport('http://foo/',
732
 
                           possible_transports=possible_transports)
733
 
        self.assertEqual([t1], possible_transports)
734
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
735
 
        self.assertIs(t1, t2)
736
 
 
737
 
        # Also check that final '/' are handled correctly
738
 
        t3 = get_transport('http://foo/path/')
739
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
740
 
        self.assertIs(t3, t4)
741
 
 
742
 
        t5 = get_transport('http://foo/path')
743
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
744
 
        self.assertIs(t5, t6)
745
 
 
746
 
    def test_don_t_reuse_different_transport(self):
747
 
        t1 = get_transport('http://foo/path')
748
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
749
 
        self.assertIsNot(t1, t2)
750
 
 
751
 
 
752
 
class TestTransportTrace(TestCase):
753
 
 
754
 
    def test_get(self):
755
 
        transport = get_transport('trace+memory://')
756
 
        self.assertIsInstance(
757
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
758
 
 
759
 
    def test_clone_preserves_activity(self):
760
 
        transport = get_transport('trace+memory://')
761
 
        transport2 = transport.clone('.')
762
 
        self.assertTrue(transport is not transport2)
763
 
        self.assertTrue(transport._activity is transport2._activity)
764
 
 
765
 
    # the following specific tests are for the operations that have made use of
766
 
    # logging in tests; we could test every single operation but doing that
767
 
    # still won't cause a test failure when the top level Transport API
768
 
    # changes; so there is little return doing that.
769
 
    def test_get(self):
770
 
        transport = get_transport('trace+memory:///')
771
 
        transport.put_bytes('foo', 'barish')
772
 
        transport.get('foo')
773
 
        expected_result = []
774
 
        # put_bytes records the bytes, not the content to avoid memory
775
 
        # pressure.
776
 
        expected_result.append(('put_bytes', 'foo', 6, None))
777
 
        # get records the file name only.
778
 
        expected_result.append(('get', 'foo'))
779
 
        self.assertEqual(expected_result, transport._activity)
780
 
 
781
 
    def test_readv(self):
782
 
        transport = get_transport('trace+memory:///')
783
 
        transport.put_bytes('foo', 'barish')
784
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
785
 
            upper_limit=6))
786
 
        expected_result = []
787
 
        # put_bytes records the bytes, not the content to avoid memory
788
 
        # pressure.
789
 
        expected_result.append(('put_bytes', 'foo', 6, None))
790
 
        # readv records the supplied offset request
791
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
792
 
        self.assertEqual(expected_result, transport._activity)