~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2005-03-15 05:19:54 UTC
  • Revision ID: mbp@sourcefrog.net-20050315051954-e4bdd6dfd26f8ecf
witty comment

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from cStringIO import StringIO
19
 
import os
20
 
import subprocess
21
 
import sys
22
 
import threading
23
 
 
24
 
from bzrlib import (
25
 
    errors,
26
 
    osutils,
27
 
    tests,
28
 
    transport as _mod_transport,
29
 
    urlutils,
30
 
    )
31
 
from bzrlib.transport import (
32
 
    fakenfs,
33
 
    memory,
34
 
    readonly,
35
 
    )
36
 
from bzrlib.errors import (DependencyNotPresent,
37
 
                           FileExists,
38
 
                           InvalidURLJoin,
39
 
                           NoSuchFile,
40
 
                           PathNotChild,
41
 
                           ReadError,
42
 
                           UnsupportedProtocol,
43
 
                           )
44
 
from bzrlib.tests import features, TestCase, TestCaseInTempDir
45
 
from bzrlib.transport import (_clear_protocol_handlers,
46
 
                              _CoalescedOffset,
47
 
                              ConnectedTransport,
48
 
                              _get_protocol_handlers,
49
 
                              _set_protocol_handlers,
50
 
                              _get_transport_modules,
51
 
                              get_transport,
52
 
                              LateReadError,
53
 
                              register_lazy_transport,
54
 
                              register_transport_proto,
55
 
                              Transport,
56
 
                              )
57
 
from bzrlib.transport.chroot import ChrootServer
58
 
from bzrlib.transport.local import (LocalTransport,
59
 
                                    EmulatedWin32LocalTransport)
60
 
from bzrlib.transport.pathfilter import PathFilteringServer
61
 
 
62
 
 
63
 
# TODO: Should possibly split transport-specific tests into their own files.
64
 
 
65
 
 
66
 
class TestTransport(TestCase):
    """Test the non transport-concrete class functionality."""

    def test__get_set_protocol_handlers(self):
        # Clearing must leave an empty registry; the finally clause puts the
        # saved registry back whether or not the assertion holds.
        handlers = _get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            _clear_protocol_handlers()
            self.assertEqual([], _get_protocol_handlers().keys())
        finally:
            _set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        # Saved so the finally clause can restore the registry.
        handlers = _get_protocol_handlers()

        class SampleHandler(object):
            """I exist, isn't that enough?"""

        try:
            # don't pollute the current handlers
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            register_transport_proto('bar')
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            self.assertEqual([SampleHandler.__module__,
                              'bzrlib.transport.chroot',
                              'bzrlib.transport.pathfilter'],
                             _get_transport_modules())
        finally:
            _set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """A handler with a missing dependency gives UnsupportedProtocol."""
        saved_handlers = _get_protocol_handlers()
        try:
            # don't pollute the current handlers
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            # assertRaises hands back the caught exception, so the message
            # can be checked without a hand-written try/except/else.
            e = self.assertRaises(UnsupportedProtocol,
                                  get_transport, 'foo://fooserver/foo')
            self.assertEqual('Unsupported protocol'
                             ' for url "foo://fooserver/foo":'
                             ' Unable to import library "some_lib":'
                             ' testing missing dependency', str(e))
        finally:
            # restore original values
            _set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """An unusable handler falls back to another one for the scheme."""
        saved_handlers = _get_protocol_handlers()
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BackupTransportHandler')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            t = get_transport('foo://fooserver/foo')
            self.assertIsInstance(t, BackupTransportHandler)
        finally:
            _set_protocol_handlers(saved_handlers)

    def test_ssh_hints(self):
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
        e = self.assertRaises(UnsupportedProtocol,
                              get_transport, 'ssh://fooserver/foo')
        self.assertEqual('Unsupported protocol'
                         ' for url "ssh://fooserver/foo":'
                         ' bzr supports bzr+ssh to operate over ssh,'
                         ' use "bzr+ssh://fooserver/foo".',
                         str(e))

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = LateReadError('a path')
        # Using assertRaises (rather than a bare try/except) makes the test
        # fail if read() unexpectedly succeeds.
        error = self.assertRaises(ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(ReadError, a_file.read, 40)
        a_file.close()

    def test__combine_paths(self):
        t = Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        # the base implementation should throw
        t = memory.MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))
176
 
 
177
 
 
178
 
class TestCoalesceOffsets(TestCase):
    """Exercise Transport._coalesce_offsets through the check() helper."""

    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
        """Coalesce ``offsets`` and compare the outcome with ``expected``.

        :param expected: list of argument tuples; each one is expanded into
            a _CoalescedOffset to form the expected result.
        :param offsets: the offsets handed to Transport._coalesce_offsets.
        :param limit: forwarded as the ``limit`` keyword.
        :param max_size: forwarded as the ``max_size`` keyword.
        :param fudge: forwarded as the ``fudge_factor`` keyword.
        """
        wanted = [_CoalescedOffset(*args) for args in expected]
        actual = list(Transport._coalesce_offsets(
            offsets, limit=limit, fudge_factor=fudge, max_size=max_size))
        self.assertEqual(wanted, actual)

    def test_coalesce_empty(self):
        self.check([], [])

    def test_coalesce_simple(self):
        expected = [(0, 10, [(0, 10)])]
        self.check(expected, [(0, 10)])

    def test_coalesce_unrelated(self):
        expected = [
            (0, 10, [(0, 10)]),
            (20, 10, [(0, 10)]),
            ]
        self.check(expected, [(0, 10), (20, 10)])

    def test_coalesce_unsorted(self):
        expected = [
            (20, 10, [(0, 10)]),
            (0, 10, [(0, 10)]),
            ]
        self.check(expected, [(20, 10), (0, 10)])

    def test_coalesce_nearby(self):
        expected = [(0, 20, [(0, 10), (10, 10)])]
        self.check(expected, [(0, 10), (10, 10)])

    def test_coalesce_overlapped(self):
        expected = [(0, 15, [(0, 10), (5, 10)])]
        overlapping = [(0, 10), (5, 10)]
        self.assertRaises(ValueError, self.check, expected, overlapping)

    def test_coalesce_limit(self):
        expected = [
            (10, 50, [(0, 10), (10, 10), (20, 10), (30, 10), (40, 10)]),
            (60, 50, [(0, 10), (10, 10), (20, 10), (30, 10), (40, 10)]),
            ]
        offsets = [
            (10, 10), (20, 10), (30, 10), (40, 10), (50, 10),
            (60, 10), (70, 10), (80, 10), (90, 10), (100, 10),
            ]
        self.check(expected, offsets, limit=5)

    def test_coalesce_no_limit(self):
        expected = [
            (10, 100, [(0, 10), (10, 10), (20, 10), (30, 10), (40, 10),
                       (50, 10), (60, 10), (70, 10), (80, 10), (90, 10)]),
            ]
        offsets = [
            (10, 10), (20, 10), (30, 10), (40, 10), (50, 10),
            (60, 10), (70, 10), (80, 10), (90, 10), (100, 10),
            ]
        self.check(expected, offsets)

    def test_coalesce_fudge(self):
        expected = [
            (10, 30, [(0, 10), (20, 10)]),
            (100, 10, [(0, 10)]),
            ]
        self.check(expected, [(10, 10), (30, 10), (100, 10)], fudge=10)

    def test_coalesce_max_size(self):
        expected = [
            (10, 20, [(0, 10), (10, 10)]),
            (30, 50, [(0, 50)]),
            # If one range is above max_size, it gets its own coalesced
            # offset
            (100, 80, [(0, 80)]),
            ]
        offsets = [(10, 10), (20, 10), (30, 50), (100, 80)]
        self.check(expected, offsets, max_size=50)

    def test_coalesce_no_max_size(self):
        expected = [(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])]
        offsets = [(10, 10), (20, 10), (30, 50), (80, 100)]
        self.check(expected, offsets)

    def test_coalesce_default_limit(self):
        # By default we use a 100MB max size.
        ten_mb = 10 * 1024 * 1024

        def ranges(count):
            # A fresh list of ``count`` back-to-back 10MB ranges.
            return [(n * ten_mb, ten_mb) for n in range(count)]

        self.check([(0, 10 * ten_mb, ranges(10)),
                    (10 * ten_mb, ten_mb, [(0, ten_mb)])],
                   ranges(11))
        self.check([(0, 11 * ten_mb, ranges(11))],
                   ranges(11),
                   max_size=1 * 1024 * 1024 * 1024)
261
 
 
262
 
 
263
 
class TestMemoryServer(TestCase):
    """Check that a MemoryServer registers and unregisters its URL."""

    def test_create_server(self):
        server = memory.MemoryServer()
        server.start_server()
        # Stop the server even when an assertion fails, so a failing run
        # does not leave the server's URL in the global transport registry.
        try:
            url = server.get_url()
            self.assertTrue(url in _mod_transport.transport_list_registry)
            # Only checks that a transport can be obtained; the result is
            # not needed.
            _mod_transport.get_transport(url)
        finally:
            server.stop_server()
        self.assertFalse(url in _mod_transport.transport_list_registry)
        self.assertRaises(errors.UnsupportedProtocol,
                          _mod_transport.get_transport, url)
276
 
 
277
 
 
278
 
class TestMemoryTransport(TestCase):
    """Behaviour of memory.MemoryTransport on a freshly built instance."""

    def test_get_transport(self):
        # Construction alone must not raise.
        memory.MemoryTransport()

    def test_clone(self):
        transport = memory.MemoryTransport()
        self.assertIsInstance(transport, memory.MemoryTransport)
        self.assertEqual("memory:///", transport.clone("/").base)

    def test_abspath(self):
        transport = memory.MemoryTransport()
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))

    def test_abspath_of_root(self):
        transport = memory.MemoryTransport()
        self.assertEqual("memory:///", transport.base)
        self.assertEqual("memory:///", transport.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        transport = memory.MemoryTransport()
        self.assertEqual("memory:///foo", transport.abspath('/foo'))

    def test_append_and_get(self):
        transport = memory.MemoryTransport()
        transport.append_bytes('path', 'content')
        self.assertEqual('content', transport.get('path').read())
        transport.append_file('path', StringIO('content'))
        self.assertEqual('contentcontent', transport.get('path').read())

    def test_put_and_get(self):
        transport = memory.MemoryTransport()
        transport.put_file('path', StringIO('content'))
        self.assertEqual('content', transport.get('path').read())
        transport.put_bytes('path', 'content')
        self.assertEqual('content', transport.get('path').read())

    def test_append_without_dir_fails(self):
        transport = memory.MemoryTransport()
        self.assertRaises(NoSuchFile,
                          transport.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        transport = memory.MemoryTransport()
        self.assertRaises(NoSuchFile,
                          transport.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        transport = memory.MemoryTransport()
        self.assertRaises(NoSuchFile, transport.get, 'foo')

    def test_has_missing(self):
        transport = memory.MemoryTransport()
        self.assertEqual(False, transport.has('foo'))

    def test_has_present(self):
        transport = memory.MemoryTransport()
        transport.append_bytes('foo', 'content')
        self.assertEqual(True, transport.has('foo'))

    def test_list_dir(self):
        transport = memory.MemoryTransport()
        transport.put_bytes('foo', 'content')
        transport.mkdir('dir')
        transport.put_bytes('dir/subfoo', 'content')
        # 'dirlike' shares a prefix with 'dir' but is a file in the root.
        transport.put_bytes('dirlike', 'content')

        self.assertEqual(['dir', 'dirlike', 'foo'],
                         sorted(transport.list_dir('.')))
        self.assertEqual(['subfoo'], sorted(transport.list_dir('dir')))

    def test_mkdir(self):
        transport = memory.MemoryTransport()
        transport.mkdir('dir')
        transport.append_bytes('dir/path', 'content')
        self.assertEqual('content', transport.get('dir/path').read())

    def test_mkdir_missing_parent(self):
        transport = memory.MemoryTransport()
        self.assertRaises(NoSuchFile,
                          transport.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        transport = memory.MemoryTransport()
        transport.mkdir('dir')
        self.assertRaises(FileExists, transport.mkdir, 'dir')

    def test_parameters(self):
        transport = memory.MemoryTransport()
        self.assertEqual(True, transport.listable())
        self.assertEqual(False, transport.is_readonly())

    def test_iter_files_recursive(self):
        transport = memory.MemoryTransport()
        transport.mkdir('dir')
        transport.put_bytes('dir/foo', 'content')
        transport.put_bytes('dir/bar', 'content')
        transport.put_bytes('bar', 'content')
        paths = set(transport.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)

    def test_stat(self):
        transport = memory.MemoryTransport()
        transport.put_bytes('foo', 'content')
        transport.put_bytes('bar', 'phowar')
        # st_size is the number of bytes stored under each name.
        self.assertEqual(7, transport.stat('foo').st_size)
        self.assertEqual(6, transport.stat('bar').st_size)
384
 
 
385
 
 
386
 
class ChrootDecoratorTransportTest(TestCase):
    """Tests specific to the chroot transport decoration."""

    def test_abspath(self):
        # Whatever the transport's base, abspath('/') is the chroot url.
        chroot_server = ChrootServer(get_transport('memory:///foo/bar/'))
        self.start_server(chroot_server)
        root = get_transport(chroot_server.get_url())
        self.assertEqual(chroot_server.get_url(), root.abspath('/'))

        below_root = root.clone('subdir')
        self.assertEqual(chroot_server.get_url(), below_root.abspath('/'))

    def test_clone(self):
        chroot_server = ChrootServer(get_transport('memory:///foo/bar/'))
        self.start_server(chroot_server)
        root = get_transport(chroot_server.get_url())
        # Cloning by relative path and by rooted path both keep the server.
        by_relpath = root.clone('foo')
        by_abspath = root.clone('/foo')
        self.assertEqual(chroot_server, by_relpath.server)
        self.assertEqual(chroot_server, by_abspath.server)

    def test_chroot_url_preserves_chroot(self):
        """get_transport on a chroot transport's base gives an equivalent
        transport: same server and same base.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        chroot_server = ChrootServer(get_transport('memory:///path/subpath'))
        self.start_server(chroot_server)
        original = get_transport(chroot_server.get_url())
        reopened = get_transport(original.base)
        self.assertEqual(original.server, reopened.server)
        self.assertEqual(original.base, reopened.base)

    def test_urljoin_preserves_chroot(self):
        """urlutils.join(url, '..') on a chroot URL must be rejected rather
        than yield a URL outside the chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        chroot_server = ChrootServer(get_transport('memory:///path/'))
        self.start_server(chroot_server)
        root = get_transport(chroot_server.get_url())
        self.assertRaises(InvalidURLJoin, urlutils.join, root.base, '..')
440
 
 
441
 
 
442
 
class ChrootServerTest(TestCase):
    """Lifecycle and URL checks for ChrootServer itself."""

    def test_construct(self):
        backing = memory.MemoryTransport()
        chroot_server = ChrootServer(backing)
        self.assertEqual(backing, chroot_server.backing_transport)

    def test_setUp(self):
        # While the server runs, its scheme is among the protocol handlers.
        chroot_server = ChrootServer(memory.MemoryTransport())
        chroot_server.start_server()
        try:
            self.assertTrue(
                chroot_server.scheme in _get_protocol_handlers().keys())
        finally:
            chroot_server.stop_server()

    def test_stop_server(self):
        # Once stopped, the scheme is no longer among the handlers.
        chroot_server = ChrootServer(memory.MemoryTransport())
        chroot_server.start_server()
        chroot_server.stop_server()
        self.assertFalse(
            chroot_server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        chroot_server = ChrootServer(memory.MemoryTransport())
        chroot_server.start_server()
        try:
            # The url embeds the id() of the server object.
            expected_url = 'chroot-%d:///' % id(chroot_server)
            self.assertEqual(expected_url, chroot_server.get_url())
        finally:
            chroot_server.stop_server()
473
 
 
474
 
 
475
 
class PathFilteringDecoratorTransportTest(TestCase):
    """Pathfilter decoration specific tests."""

    def test_abspath(self):
        # The abspath is always relative to the base of the backing transport.
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
            lambda x: x)
        server.start_server()
        # Register the stop as a cleanup so the server is shut down even if
        # an assertion below fails (same pattern as make_pf_transport).
        self.addCleanup(server.stop_server)
        transport = get_transport(server.get_url())
        self.assertEqual(server.get_url(), transport.abspath('/'))

        subdir_transport = transport.clone('subdir')
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))

    def make_pf_transport(self, filter_func=None):
        """Make a PathFilteringTransport backed by a MemoryTransport.

        The server is started here and its stop is registered as a cleanup.

        :param filter_func: by default this will be a no-op function.  Use this
            parameter to override it.
        :return: a transport obtained from the started server's url.
        """
        if filter_func is None:
            filter_func = lambda x: x
        server = PathFilteringServer(
            get_transport('memory:///foo/bar/'), filter_func)
        server.start_server()
        self.addCleanup(server.stop_server)
        return get_transport(server.get_url())

    def test__filter(self):
        # _filter (with an identity func as filter_func) always returns
        # paths relative to the base of the backing transport.
        transport = self.make_pf_transport()
        self.assertEqual('foo', transport._filter('foo'))
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
        self.assertEqual('', transport._filter('..'))
        self.assertEqual('', transport._filter('/'))
        # The base of the pathfiltering transport is taken into account too.
        transport = transport.clone('subdir1/subdir2')
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
        self.assertEqual(
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
        self.assertEqual('subdir1', transport._filter('..'))
        self.assertEqual('', transport._filter('/'))

    def test_filter_invocation(self):
        filter_log = []

        def recording_filter(path):
            # Record every path the transport asks to have filtered, then
            # pass it through unchanged.
            filter_log.append(path)
            return path

        transport = self.make_pf_transport(recording_filter)
        transport.has('abc')
        self.assertEqual(['abc'], filter_log)
        del filter_log[:]
        transport.clone('abc').has('xyz')
        self.assertEqual(['abc/xyz'], filter_log)
        del filter_log[:]
        transport.has('/abc')
        self.assertEqual(['abc'], filter_log)

    def test_clone(self):
        transport = self.make_pf_transport()
        # relpath from root and root path are the same
        relpath_cloned = transport.clone('foo')
        abspath_cloned = transport.clone('/foo')
        self.assertEqual(transport.server, relpath_cloned.server)
        self.assertEqual(transport.server, abspath_cloned.server)

    def test_url_preserves_pathfiltering(self):
        """Calling get_transport on a pathfiltered transport's base should
        produce a transport with exactly the same behaviour as the original
        pathfiltered transport.

        This is so that it is not possible to escape (accidentally or
        otherwise) the filtering by doing::
            url = filtered_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        transport = self.make_pf_transport()
        new_transport = get_transport(transport.base)
        self.assertEqual(transport.server, new_transport.server)
        self.assertEqual(transport.base, new_transport.base)
557
 
 
558
 
 
559
 
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        # connect to . in readonly mode
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, transport.listable())
        self.assertEqual(True, transport.is_readonly())

    def test_http_parameters(self):
        # Imported here rather than at module level, as in the original.
        from bzrlib.tests.http_server import HttpServer
        # connect to '.' via http which is not listable
        server = HttpServer()
        self.start_server(server)
        transport = get_transport('readonly+' + server.get_url())
        # assertIsInstance replaces the older failUnless(isinstance(...))
        # spelling and matches the other decorator tests in this file.
        self.assertIsInstance(transport, readonly.ReadonlyTransportDecorator)
        self.assertEqual(False, transport.listable())
        self.assertEqual(True, transport.is_readonly())
578
 
 
579
 
 
580
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """Tests specific to the fake NFS transport decorator."""

    def get_nfs_transport(self, url):
        # Decorate ``url`` with the fakenfs prefix and connect to it.
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        # listable and is_readonly pass through the decorator unchanged
        # for a local directory.
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        # listable and is_readonly pass through the decorator unchanged
        # for an http server too.
        from bzrlib.tests.http_server import HttpServer
        # http is not listable, so connect to '.' through it
        http_server = HttpServer()
        self.start_server(http_server)
        nfs = self.get_nfs_transport(http_server.get_url())
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        self.assertEqual(False, nfs.listable())
        self.assertEqual(True, nfs.is_readonly())

    def test_fakenfs_server_default(self):
        # A FakeNFSServer() brings up a local relpath server for itself.
        nfs_server = fakenfs.FakeNFSServer()
        self.start_server(nfs_server)
        # Its url carries the fakenfs decoration ...
        self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
        # ... and resolves to a FakeNFSTransportDecorator instance.
        nfs = get_transport(nfs_server.get_url())
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)

    def test_fakenfs_rename_semantics(self):
        # Rename errors must be mangled by a FakeNFS transport so that they
        # look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(errors.ResourceBusy, nfs.rename, 'from', 'to')
626
 
 
627
 
 
628
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Checks on the transport decorator that simulates VFAT limits."""

    def get_vfat_transport(self, url):
        """Build a vfat-decorated transport for ``url``."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        # A directory made in upper case is found under other casings.
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
650
 
 
651
 
 
652
 
class BadTransportHandler(Transport):
    """Test transport whose construction always fails.

    Every attempt to build one raises DependencyNotPresent for the library
    'some_lib'.
    """

    def __init__(self, base_url):
        # base_url is accepted but never used: construction cannot succeed.
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
655
 
 
656
 
 
657
 
class BackupTransportHandler(Transport):
    """Plain Transport subclass used as the fallback when
    BadTransportHandler cannot be constructed."""
660
 
 
661
 
 
662
 
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    Verifying a transport requires a server factory: a callable taking no
    parameters that returns an implementation of bzrlib.transport.Server.

    The Server is used to construct transport instances, and the transport
    is then tested via loopback activity.

    At present this assumes the Transport object is connected to the
    current working directory, so that anything done through the transport
    shows up in the working directory and vice versa.  This is a bug,
    because it is possible to have URL schemes that give access to
    something which may not result in storage on the local disk, e.g.
    because of file system limits, or because it is a database or some
    other non-filesystem tool.

    The tests also make sure that the functions work with both generators
    and lists (assuming iter(list) is effectively a generator).
    """

    def setUp(self):
        super(TestTransportImplementation, self).setUp()
        # transport_server is not defined in this class; it is the server
        # factory described in the class docstring.
        server = self.transport_server()
        self._server = server
        self.start_server(server)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        url = self._adjust_url(self._server.get_url(), relpath)
        # First go through the regular lookup interface.
        candidate = get_transport(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(candidate, self.transport_class):
            return candidate
        # The regular lookup produced some other class, so bypass it and
        # construct the wanted transport class directly.
        return self.transport_class(url)
706
 
 
707
 
 
708
 
class TestLocalTransports(TestCase):
    """get_transport() on local locations must produce a LocalTransport."""

    def test_get_transport_from_abspath(self):
        """An absolute filesystem path yields a LocalTransport."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        # The base is the URL form of the path, with a trailing slash.
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """The relative path '.' yields a LocalTransport."""
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL yields a LocalTransport whose base is that URL."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was built on."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
733
 
 
734
 
 
735
 
class TestWin32LocalTransport(TestCase):
    """Tests for the emulated win32 flavour of the local transport."""

    def test_unc_clone_to_root(self):
        """Cloning upwards from a UNC location stops at the host part."""
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # Four path segments follow the host, so four '..' clones are
        # expected to land on the host root.
        for _ in range(4):
            t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
        # make sure we reach the root: one more '..' must not go any higher
        t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
748
 
 
749
 
 
750
 
class TestConnectedTransport(TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A plain URL is split into host, port, path and credentials."""
        t = ConnectedTransport('http://simple.example.com/home/source')
        self.assertEqual(t._host, 'simple.example.com')
        self.assertIs(None, t._port)
        self.assertEqual(t._path, '/home/source/')
        self.assertIs(None, t._user)
        self.assertIs(None, t._password)

        self.assertEqual(t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        """A user name that itself contains '@' is kept whole."""
        # Bug 228058
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
        self.assertEqual(t._user, 'user@host.com')

    def test_parse_quoted_url(self):
        """Percent-escapes in user, password and host are decoded."""
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEqual(t._host, 'exAmple.com')
        self.assertEqual(t._port, 2222)
        self.assertEqual(t._user, 'robey')
        self.assertEqual(t._password, 'h@t')
        self.assertEqual(t._path, '/path/')

        # Base should not keep track of the password
        self.assertEqual(t.base, 'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        """Constructing from a malformed URL raises InvalidURL."""
        self.assertRaises(errors.InvalidURL,
                          ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        """relpath() accepts only URLs under the same scheme/user/host/port."""
        t = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # Each of the following differs from the transport in exactly one
        # component: scheme, user, host, then port.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A transport and its clone share connection and credentials."""
        t = ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEqual('user', t._user)
        self.assertEqual('host.com', t._host)
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        # NOTE(review): this re-checks the original transport 't' after the
        # clone; presumably 'c._password' was intended -- confirm before
        # changing the assertion.
        self.assertIs(None, t._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated (here through the clone) and the
        # change is seen from both transports
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
830
 
 
831
 
 
832
 
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """get_transport() hands back a matching entry of possible_transports."""
        candidates = []
        first = get_transport('http://foo/', possible_transports=candidates)
        # The freshly created transport was recorded in the supplied list.
        self.assertEqual([first], candidates)
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly, whichever of the
        # two URLs carries the trailing slash.
        slash_variants = (
            ('http://foo/path/', 'http://foo/path'),
            ('http://foo/path', 'http://foo/path/'),
            )
        for created_from, requested_as in slash_variants:
            existing = get_transport(created_from)
            reused = get_transport(requested_as,
                                   possible_transports=[existing])
            self.assertIs(existing, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for a different host is not reused."""
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
856
 
 
857
 
 
858
 
class TestTransportTrace(TestCase):
    """Tests for transports obtained through a ``trace+`` URL."""

    def test_decorator(self):
        """A ``trace+`` URL yields a TransportTraceDecorator.

        This test used to be named ``test_get`` as well, so the later
        ``test_get`` definition replaced it in the class body and it was
        never run.
        """
        # Imported here so the check does not rely on a module-level
        # 'bzrlib' name being bound in this file.
        from bzrlib.transport.trace import TransportTraceDecorator
        transport = get_transport('trace+memory://')
        self.assertIsInstance(transport, TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a new object sharing the original's activity log."""
        transport = get_transport('trace+memory://')
        transport2 = transport.clone('.')
        self.assertTrue(transport is not transport2)
        self.assertTrue(transport._activity is transport2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes and get are both recorded in ``_activity``."""
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        transport.get('foo')
        expected_result = []
        # put_bytes records the number of bytes (6 here), not the content,
        # to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, transport._activity)

    def test_readv(self):
        """readv is recorded together with the arguments it was given."""
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # readv is consumed through list() before the log is inspected.
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
            upper_limit=6))
        expected_result = []
        # put_bytes records the number of bytes (6 here), not the content,
        # to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, transport._activity)
899
 
 
900
 
 
901
 
class TestSSHConnections(tests.TestCaseWithTransport):
    """Acceptance test for bzr+ssh:// access through a stub SSH server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        # This test actually causes a bzr instance to be invoked, which is very
        # expensive: it should be the only such test in the test suite.
        # A reasonable evolution for this would be to simply check inside
        # check_channel_exec_request that the command is appropriate, and then
        # satisfy requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
        from bzrlib.tests.stub_sftp import StubServer

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves.  Surely this has already been implemented
        # elsewhere?
        # 'started' collects, in order, the subprocess and then the relay
        # threads, so that the end of the test can wait for all of them.
        started = []

        class StubSSHServer(StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                # Record the requested command, then actually run it.
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes from
                # the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # Copy one byte at a time until the source returns an
                    # empty string, then close the destination.
                    while True:
                        chunk = read(1)
                        if chunk == '':
                            close()
                            break
                        write(chunk)

                relays = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                started.append(proc)
                for relay in relays:
                    worker = threading.Thread(target=ferry_bytes, args=relay)
                    worker.start()
                    started.append(worker)

                return True

        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.
        self.start_server(ssh_server)
        port = ssh_server._listener.port

        # On win32 the bzr script is run through the current Python
        # executable; elsewhere it is invoked directly.
        bzr_remote_path = self.get_bzr_path()
        if sys.platform == 'win32':
            bzr_remote_path = sys.executable + ' ' + bzr_remote_path
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path

        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = osutils.abspath('.')
        if sys.platform == 'win32':
            # On Windows, we export all drives as '/C:/, etc. So we need to
            # prefix a '/' to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        remote = get_transport(url)
        self.permit_url(remote.base)
        remote.mkdir('foo')

        expected_commands = [
            '%s serve --inet --directory=/ --allow-writes' % bzr_remote_path]
        self.assertEqual(expected_commands, self.command_executed)
        # Make sure to disconnect, so that the remote process can stop, and we
        # can cleanup. Then pause the test until everything is shutdown
        remote._client._medium.disconnect()
        if started:
            # First wait for the subprocess
            started[0].wait()
            # And the rest are threads
            for worker in started[1:]:
                worker.join()