~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Martin Pool
  • Date: 2010-02-03 00:08:23 UTC
  • mto: This revision was merged to the branch mainline in revision 5002.
  • Revision ID: mbp@sourcefrog.net-20100203000823-fcyf2791xrl3fbfo
expand tabs

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
 
23
 
 
24
from bzrlib import (
 
25
    errors,
 
26
    osutils,
 
27
    tests,
 
28
    transport as _mod_transport,
 
29
    urlutils,
 
30
    )
 
31
from bzrlib.transport import (
 
32
    fakenfs,
 
33
    memory,
 
34
    readonly,
 
35
    )
 
36
from bzrlib.errors import (DependencyNotPresent,
 
37
                           FileExists,
 
38
                           InvalidURLJoin,
 
39
                           NoSuchFile,
 
40
                           PathNotChild,
 
41
                           ReadError,
 
42
                           UnsupportedProtocol,
 
43
                           )
 
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
45
from bzrlib.transport import (_clear_protocol_handlers,
 
46
                              _CoalescedOffset,
 
47
                              ConnectedTransport,
 
48
                              _get_protocol_handlers,
 
49
                              _set_protocol_handlers,
 
50
                              _get_transport_modules,
 
51
                              get_transport,
 
52
                              LateReadError,
 
53
                              register_lazy_transport,
 
54
                              register_transport_proto,
 
55
                              Transport,
 
56
                              )
 
57
from bzrlib.transport.chroot import ChrootServer
 
58
from bzrlib.transport.local import (LocalTransport,
 
59
                                    EmulatedWin32LocalTransport)
 
60
from bzrlib.transport.pathfilter import PathFilteringServer
 
61
 
 
62
 
 
63
# TODO: Should possibly split transport-specific tests into their own files.
 
64
 
 
65
 
 
66
class TestTransport(TestCase):
 
67
    """Test the non transport-concrete class functionality."""
 
68
 
 
69
    def test__get_set_protocol_handlers(self):
 
70
        handlers = _get_protocol_handlers()
 
71
        self.assertNotEqual([], handlers.keys( ))
 
72
        try:
 
73
            _clear_protocol_handlers()
 
74
            self.assertEqual([], _get_protocol_handlers().keys())
 
75
        finally:
 
76
            _set_protocol_handlers(handlers)
 
77
 
 
78
    def test_get_transport_modules(self):
 
79
        handlers = _get_protocol_handlers()
 
80
        # don't pollute the current handlers
 
81
        _clear_protocol_handlers()
 
82
        class SampleHandler(object):
 
83
            """I exist, isnt that enough?"""
 
84
        try:
 
85
            _clear_protocol_handlers()
 
86
            register_transport_proto('foo')
 
87
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
88
                                    'TestTransport.SampleHandler')
 
89
            register_transport_proto('bar')
 
90
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
91
                                    'TestTransport.SampleHandler')
 
92
            self.assertEqual([SampleHandler.__module__,
 
93
                              'bzrlib.transport.chroot',
 
94
                              'bzrlib.transport.pathfilter'],
 
95
                             _get_transport_modules())
 
96
        finally:
 
97
            _set_protocol_handlers(handlers)
 
98
 
 
99
    def test_transport_dependency(self):
 
100
        """Transport with missing dependency causes no error"""
 
101
        saved_handlers = _get_protocol_handlers()
 
102
        # don't pollute the current handlers
 
103
        _clear_protocol_handlers()
 
104
        try:
 
105
            register_transport_proto('foo')
 
106
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
107
                    'BadTransportHandler')
 
108
            try:
 
109
                get_transport('foo://fooserver/foo')
 
110
            except UnsupportedProtocol, e:
 
111
                e_str = str(e)
 
112
                self.assertEquals('Unsupported protocol'
 
113
                                  ' for url "foo://fooserver/foo":'
 
114
                                  ' Unable to import library "some_lib":'
 
115
                                  ' testing missing dependency', str(e))
 
116
            else:
 
117
                self.fail('Did not raise UnsupportedProtocol')
 
118
        finally:
 
119
            # restore original values
 
120
            _set_protocol_handlers(saved_handlers)
 
121
 
 
122
    def test_transport_fallback(self):
 
123
        """Transport with missing dependency causes no error"""
 
124
        saved_handlers = _get_protocol_handlers()
 
125
        try:
 
126
            _clear_protocol_handlers()
 
127
            register_transport_proto('foo')
 
128
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
129
                    'BackupTransportHandler')
 
130
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
131
                    'BadTransportHandler')
 
132
            t = get_transport('foo://fooserver/foo')
 
133
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
134
        finally:
 
135
            _set_protocol_handlers(saved_handlers)
 
136
 
 
137
    def test_ssh_hints(self):
 
138
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
139
        try:
 
140
            get_transport('ssh://fooserver/foo')
 
141
        except UnsupportedProtocol, e:
 
142
            e_str = str(e)
 
143
            self.assertEquals('Unsupported protocol'
 
144
                              ' for url "ssh://fooserver/foo":'
 
145
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
146
                              str(e))
 
147
        else:
 
148
            self.fail('Did not raise UnsupportedProtocol')
 
149
 
 
150
    def test_LateReadError(self):
 
151
        """The LateReadError helper should raise on read()."""
 
152
        a_file = LateReadError('a path')
 
153
        try:
 
154
            a_file.read()
 
155
        except ReadError, error:
 
156
            self.assertEqual('a path', error.path)
 
157
        self.assertRaises(ReadError, a_file.read, 40)
 
158
        a_file.close()
 
159
 
 
160
    def test__combine_paths(self):
 
161
        t = Transport('/')
 
162
        self.assertEqual('/home/sarah/project/foo',
 
163
                         t._combine_paths('/home/sarah', 'project/foo'))
 
164
        self.assertEqual('/etc',
 
165
                         t._combine_paths('/home/sarah', '../../etc'))
 
166
        self.assertEqual('/etc',
 
167
                         t._combine_paths('/home/sarah', '../../../etc'))
 
168
        self.assertEqual('/etc',
 
169
                         t._combine_paths('/home/sarah', '/etc'))
 
170
 
 
171
    def test_local_abspath_non_local_transport(self):
 
172
        # the base implementation should throw
 
173
        t = memory.MemoryTransport()
 
174
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
175
        self.assertEqual('memory:///t is not a local path.', str(e))
 
176
 
 
177
 
 
178
class TestCoalesceOffsets(TestCase):
 
179
 
 
180
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
181
        coalesce = Transport._coalesce_offsets
 
182
        exp = [_CoalescedOffset(*x) for x in expected]
 
183
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
184
                            max_size=max_size))
 
185
        self.assertEqual(exp, out)
 
186
 
 
187
    def test_coalesce_empty(self):
 
188
        self.check([], [])
 
189
 
 
190
    def test_coalesce_simple(self):
 
191
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
192
 
 
193
    def test_coalesce_unrelated(self):
 
194
        self.check([(0, 10, [(0, 10)]),
 
195
                    (20, 10, [(0, 10)]),
 
196
                   ], [(0, 10), (20, 10)])
 
197
 
 
198
    def test_coalesce_unsorted(self):
 
199
        self.check([(20, 10, [(0, 10)]),
 
200
                    (0, 10, [(0, 10)]),
 
201
                   ], [(20, 10), (0, 10)])
 
202
 
 
203
    def test_coalesce_nearby(self):
 
204
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
205
                   [(0, 10), (10, 10)])
 
206
 
 
207
    def test_coalesce_overlapped(self):
 
208
        self.assertRaises(ValueError,
 
209
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
210
                        [(0, 10), (5, 10)])
 
211
 
 
212
    def test_coalesce_limit(self):
 
213
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
214
                              (30, 10), (40, 10)]),
 
215
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
216
                              (30, 10), (40, 10)]),
 
217
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
218
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
219
                       (90, 10), (100, 10)],
 
220
                    limit=5)
 
221
 
 
222
    def test_coalesce_no_limit(self):
 
223
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
224
                               (30, 10), (40, 10), (50, 10),
 
225
                               (60, 10), (70, 10), (80, 10),
 
226
                               (90, 10)]),
 
227
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
228
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
229
                       (90, 10), (100, 10)])
 
230
 
 
231
    def test_coalesce_fudge(self):
 
232
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
233
                    (100, 10, [(0, 10),]),
 
234
                   ], [(10, 10), (30, 10), (100, 10)],
 
235
                   fudge=10
 
236
                  )
 
237
    def test_coalesce_max_size(self):
 
238
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
239
                    (30, 50, [(0, 50)]),
 
240
                    # If one range is above max_size, it gets its own coalesced
 
241
                    # offset
 
242
                    (100, 80, [(0, 80),]),],
 
243
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
244
                   max_size=50
 
245
                  )
 
246
 
 
247
    def test_coalesce_no_max_size(self):
 
248
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
249
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
250
                  )
 
251
 
 
252
    def test_coalesce_default_limit(self):
 
253
        # By default we use a 100MB max size.
 
254
        ten_mb = 10*1024*1024
 
255
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
256
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
257
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
258
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
259
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
260
                   max_size=1*1024*1024*1024)
 
261
 
 
262
 
 
263
class TestMemoryServer(TestCase):
 
264
 
 
265
    def test_create_server(self):
 
266
        server = memory.MemoryServer()
 
267
        server.start_server()
 
268
        url = server.get_url()
 
269
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
270
        t = _mod_transport.get_transport(url)
 
271
        del t
 
272
        server.stop_server()
 
273
        self.assertFalse(url in _mod_transport.transport_list_registry)
 
274
        self.assertRaises(errors.UnsupportedProtocol,
 
275
                          _mod_transport.get_transport, url)
 
276
 
 
277
 
 
278
class TestMemoryTransport(TestCase):
 
279
 
 
280
    def test_get_transport(self):
 
281
        memory.MemoryTransport()
 
282
 
 
283
    def test_clone(self):
 
284
        transport = memory.MemoryTransport()
 
285
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
 
286
        self.assertEqual("memory:///", transport.clone("/").base)
 
287
 
 
288
    def test_abspath(self):
 
289
        transport = memory.MemoryTransport()
 
290
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
291
 
 
292
    def test_abspath_of_root(self):
 
293
        transport = memory.MemoryTransport()
 
294
        self.assertEqual("memory:///", transport.base)
 
295
        self.assertEqual("memory:///", transport.abspath('/'))
 
296
 
 
297
    def test_abspath_of_relpath_starting_at_root(self):
 
298
        transport = memory.MemoryTransport()
 
299
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
300
 
 
301
    def test_append_and_get(self):
 
302
        transport = memory.MemoryTransport()
 
303
        transport.append_bytes('path', 'content')
 
304
        self.assertEqual(transport.get('path').read(), 'content')
 
305
        transport.append_file('path', StringIO('content'))
 
306
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
307
 
 
308
    def test_put_and_get(self):
 
309
        transport = memory.MemoryTransport()
 
310
        transport.put_file('path', StringIO('content'))
 
311
        self.assertEqual(transport.get('path').read(), 'content')
 
312
        transport.put_bytes('path', 'content')
 
313
        self.assertEqual(transport.get('path').read(), 'content')
 
314
 
 
315
    def test_append_without_dir_fails(self):
 
316
        transport = memory.MemoryTransport()
 
317
        self.assertRaises(NoSuchFile,
 
318
                          transport.append_bytes, 'dir/path', 'content')
 
319
 
 
320
    def test_put_without_dir_fails(self):
 
321
        transport = memory.MemoryTransport()
 
322
        self.assertRaises(NoSuchFile,
 
323
                          transport.put_file, 'dir/path', StringIO('content'))
 
324
 
 
325
    def test_get_missing(self):
 
326
        transport = memory.MemoryTransport()
 
327
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
328
 
 
329
    def test_has_missing(self):
 
330
        transport = memory.MemoryTransport()
 
331
        self.assertEquals(False, transport.has('foo'))
 
332
 
 
333
    def test_has_present(self):
 
334
        transport = memory.MemoryTransport()
 
335
        transport.append_bytes('foo', 'content')
 
336
        self.assertEquals(True, transport.has('foo'))
 
337
 
 
338
    def test_list_dir(self):
 
339
        transport = memory.MemoryTransport()
 
340
        transport.put_bytes('foo', 'content')
 
341
        transport.mkdir('dir')
 
342
        transport.put_bytes('dir/subfoo', 'content')
 
343
        transport.put_bytes('dirlike', 'content')
 
344
 
 
345
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
346
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
347
 
 
348
    def test_mkdir(self):
 
349
        transport = memory.MemoryTransport()
 
350
        transport.mkdir('dir')
 
351
        transport.append_bytes('dir/path', 'content')
 
352
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
353
 
 
354
    def test_mkdir_missing_parent(self):
 
355
        transport = memory.MemoryTransport()
 
356
        self.assertRaises(NoSuchFile,
 
357
                          transport.mkdir, 'dir/dir')
 
358
 
 
359
    def test_mkdir_twice(self):
 
360
        transport = memory.MemoryTransport()
 
361
        transport.mkdir('dir')
 
362
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
363
 
 
364
    def test_parameters(self):
 
365
        transport = memory.MemoryTransport()
 
366
        self.assertEqual(True, transport.listable())
 
367
        self.assertEqual(False, transport.is_readonly())
 
368
 
 
369
    def test_iter_files_recursive(self):
 
370
        transport = memory.MemoryTransport()
 
371
        transport.mkdir('dir')
 
372
        transport.put_bytes('dir/foo', 'content')
 
373
        transport.put_bytes('dir/bar', 'content')
 
374
        transport.put_bytes('bar', 'content')
 
375
        paths = set(transport.iter_files_recursive())
 
376
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
377
 
 
378
    def test_stat(self):
 
379
        transport = memory.MemoryTransport()
 
380
        transport.put_bytes('foo', 'content')
 
381
        transport.put_bytes('bar', 'phowar')
 
382
        self.assertEqual(7, transport.stat('foo').st_size)
 
383
        self.assertEqual(6, transport.stat('bar').st_size)
 
384
 
 
385
 
 
386
class ChrootDecoratorTransportTest(TestCase):
 
387
    """Chroot decoration specific tests."""
 
388
 
 
389
    def test_abspath(self):
 
390
        # The abspath is always relative to the chroot_url.
 
391
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
392
        self.start_server(server)
 
393
        transport = get_transport(server.get_url())
 
394
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
395
 
 
396
        subdir_transport = transport.clone('subdir')
 
397
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
398
 
 
399
    def test_clone(self):
 
400
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
401
        self.start_server(server)
 
402
        transport = get_transport(server.get_url())
 
403
        # relpath from root and root path are the same
 
404
        relpath_cloned = transport.clone('foo')
 
405
        abspath_cloned = transport.clone('/foo')
 
406
        self.assertEqual(server, relpath_cloned.server)
 
407
        self.assertEqual(server, abspath_cloned.server)
 
408
 
 
409
    def test_chroot_url_preserves_chroot(self):
 
410
        """Calling get_transport on a chroot transport's base should produce a
 
411
        transport with exactly the same behaviour as the original chroot
 
412
        transport.
 
413
 
 
414
        This is so that it is not possible to escape a chroot by doing::
 
415
            url = chroot_transport.base
 
416
            parent_url = urlutils.join(url, '..')
 
417
            new_transport = get_transport(parent_url)
 
418
        """
 
419
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
420
        self.start_server(server)
 
421
        transport = get_transport(server.get_url())
 
422
        new_transport = get_transport(transport.base)
 
423
        self.assertEqual(transport.server, new_transport.server)
 
424
        self.assertEqual(transport.base, new_transport.base)
 
425
 
 
426
    def test_urljoin_preserves_chroot(self):
 
427
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
428
        URL that escapes the intended chroot.
 
429
 
 
430
        This is so that it is not possible to escape a chroot by doing::
 
431
            url = chroot_transport.base
 
432
            parent_url = urlutils.join(url, '..')
 
433
            new_transport = get_transport(parent_url)
 
434
        """
 
435
        server = ChrootServer(get_transport('memory:///path/'))
 
436
        self.start_server(server)
 
437
        transport = get_transport(server.get_url())
 
438
        self.assertRaises(
 
439
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
440
 
 
441
 
 
442
class ChrootServerTest(TestCase):
 
443
 
 
444
    def test_construct(self):
 
445
        backing_transport = memory.MemoryTransport()
 
446
        server = ChrootServer(backing_transport)
 
447
        self.assertEqual(backing_transport, server.backing_transport)
 
448
 
 
449
    def test_setUp(self):
 
450
        backing_transport = memory.MemoryTransport()
 
451
        server = ChrootServer(backing_transport)
 
452
        server.start_server()
 
453
        try:
 
454
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
455
        finally:
 
456
            server.stop_server()
 
457
 
 
458
    def test_stop_server(self):
 
459
        backing_transport = memory.MemoryTransport()
 
460
        server = ChrootServer(backing_transport)
 
461
        server.start_server()
 
462
        server.stop_server()
 
463
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
464
 
 
465
    def test_get_url(self):
 
466
        backing_transport = memory.MemoryTransport()
 
467
        server = ChrootServer(backing_transport)
 
468
        server.start_server()
 
469
        try:
 
470
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
471
        finally:
 
472
            server.stop_server()
 
473
 
 
474
 
 
475
class PathFilteringDecoratorTransportTest(TestCase):
 
476
    """Pathfilter decoration specific tests."""
 
477
 
 
478
    def test_abspath(self):
 
479
        # The abspath is always relative to the base of the backing transport.
 
480
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
481
            lambda x: x)
 
482
        server.start_server()
 
483
        transport = get_transport(server.get_url())
 
484
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
485
 
 
486
        subdir_transport = transport.clone('subdir')
 
487
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
488
        server.stop_server()
 
489
 
 
490
    def make_pf_transport(self, filter_func=None):
 
491
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
492
        
 
493
        :param filter_func: by default this will be a no-op function.  Use this
 
494
            parameter to override it."""
 
495
        if filter_func is None:
 
496
            filter_func = lambda x: x
 
497
        server = PathFilteringServer(
 
498
            get_transport('memory:///foo/bar/'), filter_func)
 
499
        server.start_server()
 
500
        self.addCleanup(server.stop_server)
 
501
        return get_transport(server.get_url())
 
502
 
 
503
    def test__filter(self):
 
504
        # _filter (with an identity func as filter_func) always returns
 
505
        # paths relative to the base of the backing transport.
 
506
        transport = self.make_pf_transport()
 
507
        self.assertEqual('foo', transport._filter('foo'))
 
508
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
509
        self.assertEqual('', transport._filter('..'))
 
510
        self.assertEqual('', transport._filter('/'))
 
511
        # The base of the pathfiltering transport is taken into account too.
 
512
        transport = transport.clone('subdir1/subdir2')
 
513
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
514
        self.assertEqual(
 
515
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
516
        self.assertEqual('subdir1', transport._filter('..'))
 
517
        self.assertEqual('', transport._filter('/'))
 
518
 
 
519
    def test_filter_invocation(self):
 
520
        filter_log = []
 
521
        def filter(path):
 
522
            filter_log.append(path)
 
523
            return path
 
524
        transport = self.make_pf_transport(filter)
 
525
        transport.has('abc')
 
526
        self.assertEqual(['abc'], filter_log)
 
527
        del filter_log[:]
 
528
        transport.clone('abc').has('xyz')
 
529
        self.assertEqual(['abc/xyz'], filter_log)
 
530
        del filter_log[:]
 
531
        transport.has('/abc')
 
532
        self.assertEqual(['abc'], filter_log)
 
533
 
 
534
    def test_clone(self):
 
535
        transport = self.make_pf_transport()
 
536
        # relpath from root and root path are the same
 
537
        relpath_cloned = transport.clone('foo')
 
538
        abspath_cloned = transport.clone('/foo')
 
539
        self.assertEqual(transport.server, relpath_cloned.server)
 
540
        self.assertEqual(transport.server, abspath_cloned.server)
 
541
 
 
542
    def test_url_preserves_pathfiltering(self):
 
543
        """Calling get_transport on a pathfiltered transport's base should
 
544
        produce a transport with exactly the same behaviour as the original
 
545
        pathfiltered transport.
 
546
 
 
547
        This is so that it is not possible to escape (accidentally or
 
548
        otherwise) the filtering by doing::
 
549
            url = filtered_transport.base
 
550
            parent_url = urlutils.join(url, '..')
 
551
            new_transport = get_transport(parent_url)
 
552
        """
 
553
        transport = self.make_pf_transport()
 
554
        new_transport = get_transport(transport.base)
 
555
        self.assertEqual(transport.server, new_transport.server)
 
556
        self.assertEqual(transport.base, new_transport.base)
 
557
 
 
558
 
 
559
class ReadonlyDecoratorTransportTest(TestCase):
 
560
    """Readonly decoration specific tests."""
 
561
 
 
562
    def test_local_parameters(self):
 
563
        # connect to . in readonly mode
 
564
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
565
        self.assertEqual(True, transport.listable())
 
566
        self.assertEqual(True, transport.is_readonly())
 
567
 
 
568
    def test_http_parameters(self):
 
569
        from bzrlib.tests.http_server import HttpServer
 
570
        # connect to '.' via http which is not listable
 
571
        server = HttpServer()
 
572
        self.start_server(server)
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
 
578
 
 
579
 
 
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
581
    """NFS decorator specific tests."""
 
582
 
 
583
    def get_nfs_transport(self, url):
 
584
        # connect to url with nfs decoration
 
585
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
586
 
 
587
    def test_local_parameters(self):
 
588
        # the listable and is_readonly parameters
 
589
        # are not changed by the fakenfs decorator
 
590
        transport = self.get_nfs_transport('.')
 
591
        self.assertEqual(True, transport.listable())
 
592
        self.assertEqual(False, transport.is_readonly())
 
593
 
 
594
    def test_http_parameters(self):
 
595
        # the listable and is_readonly parameters
 
596
        # are not changed by the fakenfs decorator
 
597
        from bzrlib.tests.http_server import HttpServer
 
598
        # connect to '.' via http which is not listable
 
599
        server = HttpServer()
 
600
        self.start_server(server)
 
601
        transport = self.get_nfs_transport(server.get_url())
 
602
        self.assertIsInstance(
 
603
            transport, fakenfs.FakeNFSTransportDecorator)
 
604
        self.assertEqual(False, transport.listable())
 
605
        self.assertEqual(True, transport.is_readonly())
 
606
 
 
607
    def test_fakenfs_server_default(self):
 
608
        # a FakeNFSServer() should bring up a local relpath server for itself
 
609
        server = fakenfs.FakeNFSServer()
 
610
        self.start_server(server)
 
611
        # the url should be decorated appropriately
 
612
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
613
        # and we should be able to get a transport for it
 
614
        transport = get_transport(server.get_url())
 
615
        # which must be a FakeNFSTransportDecorator instance.
 
616
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
617
 
 
618
    def test_fakenfs_rename_semantics(self):
 
619
        # a FakeNFS transport must mangle the way rename errors occur to
 
620
        # look like NFS problems.
 
621
        transport = self.get_nfs_transport('.')
 
622
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
623
                        transport=transport)
 
624
        self.assertRaises(errors.ResourceBusy,
 
625
                          transport.rename, 'from', 'to')
 
626
 
 
627
 
 
628
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
629
    """Tests for simulation of VFAT restrictions"""
 
630
 
 
631
    def get_vfat_transport(self, url):
 
632
        """Return vfat-backed transport for test directory"""
 
633
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
634
        return FakeVFATTransportDecorator('vfat+' + url)
 
635
 
 
636
    def test_transport_creation(self):
 
637
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
638
        transport = self.get_vfat_transport('.')
 
639
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
640
 
 
641
    def test_transport_mkdir(self):
 
642
        transport = self.get_vfat_transport('.')
 
643
        transport.mkdir('HELLO')
 
644
        self.assertTrue(transport.has('hello'))
 
645
        self.assertTrue(transport.has('Hello'))
 
646
 
 
647
    def test_forbidden_chars(self):
 
648
        transport = self.get_vfat_transport('.')
 
649
        self.assertRaises(ValueError, transport.has, "<NU>")
 
650
 
 
651
 
 
652
class BadTransportHandler(Transport):
 
653
    def __init__(self, base_url):
 
654
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
655
 
 
656
 
 
657
class BackupTransportHandler(Transport):
 
658
    """Test transport that works as a backup for the BadTransportHandler"""
 
659
    pass
 
660
 
 
661
 
 
662
class TestTransportImplementation(TestCaseInTempDir):
 
663
    """Implementation verification for transports.
 
664
 
 
665
    To verify a transport we need a server factory, which is a callable
 
666
    that accepts no parameters and returns an implementation of
 
667
    bzrlib.transport.Server.
 
668
 
 
669
    That Server is then used to construct transport instances and test
 
670
    the transport via loopback activity.
 
671
 
 
672
    Currently this assumes that the Transport object is connected to the
 
673
    current working directory.  So that whatever is done
 
674
    through the transport, should show up in the working
 
675
    directory, and vice-versa. This is a bug, because its possible to have
 
676
    URL schemes which provide access to something that may not be
 
677
    result in storage on the local disk, i.e. due to file system limits, or
 
678
    due to it being a database or some other non-filesystem tool.
 
679
 
 
680
    This also tests to make sure that the functions work with both
 
681
    generators and lists (assuming iter(list) is effectively a generator)
 
682
    """
 
683
 
 
684
    def setUp(self):
 
685
        super(TestTransportImplementation, self).setUp()
 
686
        self._server = self.transport_server()
 
687
        self.start_server(self._server)
 
688
 
 
689
    def get_transport(self, relpath=None):
 
690
        """Return a connected transport to the local directory.
 
691
 
 
692
        :param relpath: a path relative to the base url.
 
693
        """
 
694
        base_url = self._server.get_url()
 
695
        url = self._adjust_url(base_url, relpath)
 
696
        # try getting the transport via the regular interface:
 
697
        t = get_transport(url)
 
698
        # vila--20070607 if the following are commented out the test suite
 
699
        # still pass. Is this really still needed or was it a forgotten
 
700
        # temporary fix ?
 
701
        if not isinstance(t, self.transport_class):
 
702
            # we did not get the correct transport class type. Override the
 
703
            # regular connection behaviour by direct construction.
 
704
            t = self.transport_class(url)
 
705
        return t
 
706
 
 
707
 
 
708
class TestLocalTransports(TestCase):
 
709
 
 
710
    def test_get_transport_from_abspath(self):
 
711
        here = osutils.abspath('.')
 
712
        t = get_transport(here)
 
713
        self.assertIsInstance(t, LocalTransport)
 
714
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
715
 
 
716
    def test_get_transport_from_relpath(self):
 
717
        here = osutils.abspath('.')
 
718
        t = get_transport('.')
 
719
        self.assertIsInstance(t, LocalTransport)
 
720
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
721
 
 
722
    def test_get_transport_from_local_url(self):
 
723
        here = osutils.abspath('.')
 
724
        here_url = urlutils.local_path_to_url(here) + '/'
 
725
        t = get_transport(here_url)
 
726
        self.assertIsInstance(t, LocalTransport)
 
727
        self.assertEquals(t.base, here_url)
 
728
 
 
729
    def test_local_abspath(self):
 
730
        here = osutils.abspath('.')
 
731
        t = get_transport(here)
 
732
        self.assertEquals(t.local_abspath(''), here)
 
733
 
 
734
 
 
735
class TestWin32LocalTransport(TestCase):
 
736
 
 
737
    def test_unc_clone_to_root(self):
 
738
        # Win32 UNC path like \\HOST\path
 
739
        # clone to root should stop at least at \\HOST part
 
740
        # not on \\
 
741
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
742
        for i in xrange(4):
 
743
            t = t.clone('..')
 
744
        self.assertEquals(t.base, 'file://HOST/')
 
745
        # make sure we reach the root
 
746
        t = t.clone('..')
 
747
        self.assertEquals(t.base, 'file://HOST/')
 
748
 
 
749
 
 
750
class TestConnectedTransport(TestCase):
 
751
    """Tests for connected to remote server transports"""
 
752
 
 
753
    def test_parse_url(self):
 
754
        t = ConnectedTransport('http://simple.example.com/home/source')
 
755
        self.assertEquals(t._host, 'simple.example.com')
 
756
        self.assertEquals(t._port, None)
 
757
        self.assertEquals(t._path, '/home/source/')
 
758
        self.failUnless(t._user is None)
 
759
        self.failUnless(t._password is None)
 
760
 
 
761
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
762
 
 
763
    def test_parse_url_with_at_in_user(self):
 
764
        # Bug 228058
 
765
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
766
        self.assertEquals(t._user, 'user@host.com')
 
767
 
 
768
    def test_parse_quoted_url(self):
 
769
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
770
        self.assertEquals(t._host, 'exAmple.com')
 
771
        self.assertEquals(t._port, 2222)
 
772
        self.assertEquals(t._user, 'robey')
 
773
        self.assertEquals(t._password, 'h@t')
 
774
        self.assertEquals(t._path, '/path/')
 
775
 
 
776
        # Base should not keep track of the password
 
777
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
778
 
 
779
    def test_parse_invalid_url(self):
 
780
        self.assertRaises(errors.InvalidURL,
 
781
                          ConnectedTransport,
 
782
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
783
 
 
784
    def test_relpath(self):
 
785
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
786
 
 
787
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
788
        self.assertRaises(errors.PathNotChild, t.relpath,
 
789
                          'http://user@host.com/abs/path/sub')
 
790
        self.assertRaises(errors.PathNotChild, t.relpath,
 
791
                          'sftp://user2@host.com/abs/path/sub')
 
792
        self.assertRaises(errors.PathNotChild, t.relpath,
 
793
                          'sftp://user@otherhost.com/abs/path/sub')
 
794
        self.assertRaises(errors.PathNotChild, t.relpath,
 
795
                          'sftp://user@host.com:33/abs/path/sub')
 
796
        # Make sure it works when we don't supply a username
 
797
        t = ConnectedTransport('sftp://host.com/abs/path')
 
798
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
799
 
 
800
        # Make sure it works when parts of the path will be url encoded
 
801
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
802
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
803
 
 
804
    def test_connection_sharing_propagate_credentials(self):
 
805
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
806
        self.assertEquals('user', t._user)
 
807
        self.assertEquals('host.com', t._host)
 
808
        self.assertIs(None, t._get_connection())
 
809
        self.assertIs(None, t._password)
 
810
        c = t.clone('subdir')
 
811
        self.assertIs(None, c._get_connection())
 
812
        self.assertIs(None, t._password)
 
813
 
 
814
        # Simulate the user entering a password
 
815
        password = 'secret'
 
816
        connection = object()
 
817
        t._set_connection(connection, password)
 
818
        self.assertIs(connection, t._get_connection())
 
819
        self.assertIs(password, t._get_credentials())
 
820
        self.assertIs(connection, c._get_connection())
 
821
        self.assertIs(password, c._get_credentials())
 
822
 
 
823
        # credentials can be updated
 
824
        new_password = 'even more secret'
 
825
        c._update_credentials(new_password)
 
826
        self.assertIs(connection, t._get_connection())
 
827
        self.assertIs(new_password, t._get_credentials())
 
828
        self.assertIs(connection, c._get_connection())
 
829
        self.assertIs(new_password, c._get_credentials())
 
830
 
 
831
 
 
832
class TestReusedTransports(TestCase):
 
833
    """Tests for transport reuse"""
 
834
 
 
835
    def test_reuse_same_transport(self):
 
836
        possible_transports = []
 
837
        t1 = get_transport('http://foo/',
 
838
                           possible_transports=possible_transports)
 
839
        self.assertEqual([t1], possible_transports)
 
840
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
841
        self.assertIs(t1, t2)
 
842
 
 
843
        # Also check that final '/' are handled correctly
 
844
        t3 = get_transport('http://foo/path/')
 
845
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
846
        self.assertIs(t3, t4)
 
847
 
 
848
        t5 = get_transport('http://foo/path')
 
849
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
850
        self.assertIs(t5, t6)
 
851
 
 
852
    def test_don_t_reuse_different_transport(self):
 
853
        t1 = get_transport('http://foo/path')
 
854
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
855
        self.assertIsNot(t1, t2)
 
856
 
 
857
 
 
858
class TestTransportTrace(TestCase):
 
859
 
 
860
    def test_get(self):
 
861
        transport = get_transport('trace+memory://')
 
862
        self.assertIsInstance(
 
863
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
864
 
 
865
    def test_clone_preserves_activity(self):
 
866
        transport = get_transport('trace+memory://')
 
867
        transport2 = transport.clone('.')
 
868
        self.assertTrue(transport is not transport2)
 
869
        self.assertTrue(transport._activity is transport2._activity)
 
870
 
 
871
    # the following specific tests are for the operations that have made use of
 
872
    # logging in tests; we could test every single operation but doing that
 
873
    # still won't cause a test failure when the top level Transport API
 
874
    # changes; so there is little return doing that.
 
875
    def test_get(self):
 
876
        transport = get_transport('trace+memory:///')
 
877
        transport.put_bytes('foo', 'barish')
 
878
        transport.get('foo')
 
879
        expected_result = []
 
880
        # put_bytes records the bytes, not the content to avoid memory
 
881
        # pressure.
 
882
        expected_result.append(('put_bytes', 'foo', 6, None))
 
883
        # get records the file name only.
 
884
        expected_result.append(('get', 'foo'))
 
885
        self.assertEqual(expected_result, transport._activity)
 
886
 
 
887
    def test_readv(self):
 
888
        transport = get_transport('trace+memory:///')
 
889
        transport.put_bytes('foo', 'barish')
 
890
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
891
            upper_limit=6))
 
892
        expected_result = []
 
893
        # put_bytes records the bytes, not the content to avoid memory
 
894
        # pressure.
 
895
        expected_result.append(('put_bytes', 'foo', 6, None))
 
896
        # readv records the supplied offset request
 
897
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
898
        self.assertEqual(expected_result, transport._activity)
 
899
 
 
900
 
 
901
class TestSSHConnections(tests.TestCaseWithTransport):
 
902
 
 
903
    def test_bzr_connect_to_bzr_ssh(self):
 
904
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
905
 
 
906
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
907
        """
 
908
        # This test actually causes a bzr instance to be invoked, which is very
 
909
        # expensive: it should be the only such test in the test suite.
 
910
        # A reasonable evolution for this would be to simply check inside
 
911
        # check_channel_exec_request that the command is appropriate, and then
 
912
        # satisfy requests in-process.
 
913
        self.requireFeature(features.paramiko)
 
914
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
915
        # override the interface (doesn't change self._vendor).
 
916
        # Note that this does encryption, so can be slow.
 
917
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
918
        from bzrlib.tests.stub_sftp import StubServer
 
919
 
 
920
        # Start an SSH server
 
921
        self.command_executed = []
 
922
        # XXX: This is horrible -- we define a really dumb SSH server that
 
923
        # executes commands, and manage the hooking up of stdin/out/err to the
 
924
        # SSH channel ourselves.  Surely this has already been implemented
 
925
        # elsewhere?
 
926
        started = []
 
927
        class StubSSHServer(StubServer):
 
928
 
 
929
            test = self
 
930
 
 
931
            def check_channel_exec_request(self, channel, command):
 
932
                self.test.command_executed.append(command)
 
933
                proc = subprocess.Popen(
 
934
                    command, shell=True, stdin=subprocess.PIPE,
 
935
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
936
 
 
937
                # XXX: horribly inefficient, not to mention ugly.
 
938
                # Start a thread for each of stdin/out/err, and relay bytes from
 
939
                # the subprocess to channel and vice versa.
 
940
                def ferry_bytes(read, write, close):
 
941
                    while True:
 
942
                        bytes = read(1)
 
943
                        if bytes == '':
 
944
                            close()
 
945
                            break
 
946
                        write(bytes)
 
947
 
 
948
                file_functions = [
 
949
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
950
                    (proc.stdout.read, channel.sendall, channel.close),
 
951
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
952
                started.append(proc)
 
953
                for read, write, close in file_functions:
 
954
                    t = threading.Thread(
 
955
                        target=ferry_bytes, args=(read, write, close))
 
956
                    t.start()
 
957
                    started.append(t)
 
958
 
 
959
                return True
 
960
 
 
961
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
962
        # We *don't* want to override the default SSH vendor: the detected one
 
963
        # is the one to use.
 
964
        self.start_server(ssh_server)
 
965
        port = ssh_server._listener.port
 
966
 
 
967
        if sys.platform == 'win32':
 
968
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
969
        else:
 
970
            bzr_remote_path = self.get_bzr_path()
 
971
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
972
 
 
973
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
974
        # variable is used to tell bzr what command to run on the remote end.
 
975
        path_to_branch = osutils.abspath('.')
 
976
        if sys.platform == 'win32':
 
977
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
978
            # prefix a '/' to get the right path.
 
979
            path_to_branch = '/' + path_to_branch
 
980
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
981
        t = get_transport(url)
 
982
        self.permit_url(t.base)
 
983
        t.mkdir('foo')
 
984
 
 
985
        self.assertEqual(
 
986
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
987
            self.command_executed)
 
988
        # Make sure to disconnect, so that the remote process can stop, and we
 
989
        # can cleanup. Then pause the test until everything is shutdown
 
990
        t._client._medium.disconnect()
 
991
        if not started:
 
992
            return
 
993
        # First wait for the subprocess
 
994
        started[0].wait()
 
995
        # And the rest are threads
 
996
        for t in started[1:]:
 
997
            t.join()