~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: John Arbash Meinel
  • Date: 2010-01-05 04:08:35 UTC
  • mfrom: (4634.108.10 2.0)
  • mto: This revision was merged to the branch mainline in revision 4933.
  • Revision ID: john@arbash-meinel.com-20100105040835-sq0zrv5dte8sqqib
Merge stable, including bug #495023

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
 
23
 
 
24
import bzrlib
 
25
from bzrlib import (
 
26
    errors,
 
27
    osutils,
 
28
    tests,
 
29
    urlutils,
 
30
    )
 
31
from bzrlib.errors import (DependencyNotPresent,
 
32
                           FileExists,
 
33
                           InvalidURLJoin,
 
34
                           NoSuchFile,
 
35
                           PathNotChild,
 
36
                           ReadError,
 
37
                           UnsupportedProtocol,
 
38
                           )
 
39
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
40
from bzrlib.transport import (_clear_protocol_handlers,
 
41
                              _CoalescedOffset,
 
42
                              ConnectedTransport,
 
43
                              _get_protocol_handlers,
 
44
                              _set_protocol_handlers,
 
45
                              _get_transport_modules,
 
46
                              get_transport,
 
47
                              LateReadError,
 
48
                              register_lazy_transport,
 
49
                              register_transport_proto,
 
50
                              Transport,
 
51
                              )
 
52
from bzrlib.transport.chroot import ChrootServer
 
53
from bzrlib.transport.memory import MemoryTransport
 
54
from bzrlib.transport.local import (LocalTransport,
 
55
                                    EmulatedWin32LocalTransport)
 
56
from bzrlib.transport.pathfilter import PathFilteringServer
 
57
 
 
58
 
 
59
# TODO: Should possibly split transport-specific tests into their own files.
 
60
 
 
61
 
 
62
class TestTransport(TestCase):
 
63
    """Test the non transport-concrete class functionality."""
 
64
 
 
65
    def test__get_set_protocol_handlers(self):
 
66
        handlers = _get_protocol_handlers()
 
67
        self.assertNotEqual([], handlers.keys( ))
 
68
        try:
 
69
            _clear_protocol_handlers()
 
70
            self.assertEqual([], _get_protocol_handlers().keys())
 
71
        finally:
 
72
            _set_protocol_handlers(handlers)
 
73
 
 
74
    def test_get_transport_modules(self):
 
75
        handlers = _get_protocol_handlers()
 
76
        # don't pollute the current handlers
 
77
        _clear_protocol_handlers()
 
78
        class SampleHandler(object):
 
79
            """I exist, isnt that enough?"""
 
80
        try:
 
81
            _clear_protocol_handlers()
 
82
            register_transport_proto('foo')
 
83
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
84
                                    'TestTransport.SampleHandler')
 
85
            register_transport_proto('bar')
 
86
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
87
                                    'TestTransport.SampleHandler')
 
88
            self.assertEqual([SampleHandler.__module__,
 
89
                              'bzrlib.transport.chroot',
 
90
                              'bzrlib.transport.pathfilter'],
 
91
                             _get_transport_modules())
 
92
        finally:
 
93
            _set_protocol_handlers(handlers)
 
94
 
 
95
    def test_transport_dependency(self):
 
96
        """Transport with missing dependency causes no error"""
 
97
        saved_handlers = _get_protocol_handlers()
 
98
        # don't pollute the current handlers
 
99
        _clear_protocol_handlers()
 
100
        try:
 
101
            register_transport_proto('foo')
 
102
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
103
                    'BadTransportHandler')
 
104
            try:
 
105
                get_transport('foo://fooserver/foo')
 
106
            except UnsupportedProtocol, e:
 
107
                e_str = str(e)
 
108
                self.assertEquals('Unsupported protocol'
 
109
                                  ' for url "foo://fooserver/foo":'
 
110
                                  ' Unable to import library "some_lib":'
 
111
                                  ' testing missing dependency', str(e))
 
112
            else:
 
113
                self.fail('Did not raise UnsupportedProtocol')
 
114
        finally:
 
115
            # restore original values
 
116
            _set_protocol_handlers(saved_handlers)
 
117
 
 
118
    def test_transport_fallback(self):
 
119
        """Transport with missing dependency causes no error"""
 
120
        saved_handlers = _get_protocol_handlers()
 
121
        try:
 
122
            _clear_protocol_handlers()
 
123
            register_transport_proto('foo')
 
124
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
125
                    'BackupTransportHandler')
 
126
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
127
                    'BadTransportHandler')
 
128
            t = get_transport('foo://fooserver/foo')
 
129
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
130
        finally:
 
131
            _set_protocol_handlers(saved_handlers)
 
132
 
 
133
    def test_ssh_hints(self):
 
134
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
135
        try:
 
136
            get_transport('ssh://fooserver/foo')
 
137
        except UnsupportedProtocol, e:
 
138
            e_str = str(e)
 
139
            self.assertEquals('Unsupported protocol'
 
140
                              ' for url "ssh://fooserver/foo":'
 
141
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
142
                              str(e))
 
143
        else:
 
144
            self.fail('Did not raise UnsupportedProtocol')
 
145
 
 
146
    def test_LateReadError(self):
 
147
        """The LateReadError helper should raise on read()."""
 
148
        a_file = LateReadError('a path')
 
149
        try:
 
150
            a_file.read()
 
151
        except ReadError, error:
 
152
            self.assertEqual('a path', error.path)
 
153
        self.assertRaises(ReadError, a_file.read, 40)
 
154
        a_file.close()
 
155
 
 
156
    def test__combine_paths(self):
 
157
        t = Transport('/')
 
158
        self.assertEqual('/home/sarah/project/foo',
 
159
                         t._combine_paths('/home/sarah', 'project/foo'))
 
160
        self.assertEqual('/etc',
 
161
                         t._combine_paths('/home/sarah', '../../etc'))
 
162
        self.assertEqual('/etc',
 
163
                         t._combine_paths('/home/sarah', '../../../etc'))
 
164
        self.assertEqual('/etc',
 
165
                         t._combine_paths('/home/sarah', '/etc'))
 
166
 
 
167
    def test_local_abspath_non_local_transport(self):
 
168
        # the base implementation should throw
 
169
        t = MemoryTransport()
 
170
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
171
        self.assertEqual('memory:///t is not a local path.', str(e))
 
172
 
 
173
 
 
174
class TestCoalesceOffsets(TestCase):
 
175
 
 
176
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
177
        coalesce = Transport._coalesce_offsets
 
178
        exp = [_CoalescedOffset(*x) for x in expected]
 
179
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
180
                            max_size=max_size))
 
181
        self.assertEqual(exp, out)
 
182
 
 
183
    def test_coalesce_empty(self):
 
184
        self.check([], [])
 
185
 
 
186
    def test_coalesce_simple(self):
 
187
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
188
 
 
189
    def test_coalesce_unrelated(self):
 
190
        self.check([(0, 10, [(0, 10)]),
 
191
                    (20, 10, [(0, 10)]),
 
192
                   ], [(0, 10), (20, 10)])
 
193
 
 
194
    def test_coalesce_unsorted(self):
 
195
        self.check([(20, 10, [(0, 10)]),
 
196
                    (0, 10, [(0, 10)]),
 
197
                   ], [(20, 10), (0, 10)])
 
198
 
 
199
    def test_coalesce_nearby(self):
 
200
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
201
                   [(0, 10), (10, 10)])
 
202
 
 
203
    def test_coalesce_overlapped(self):
 
204
        self.assertRaises(ValueError,
 
205
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
206
                        [(0, 10), (5, 10)])
 
207
 
 
208
    def test_coalesce_limit(self):
 
209
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
210
                              (30, 10), (40, 10)]),
 
211
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
212
                              (30, 10), (40, 10)]),
 
213
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
214
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
215
                       (90, 10), (100, 10)],
 
216
                    limit=5)
 
217
 
 
218
    def test_coalesce_no_limit(self):
 
219
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
220
                               (30, 10), (40, 10), (50, 10),
 
221
                               (60, 10), (70, 10), (80, 10),
 
222
                               (90, 10)]),
 
223
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
224
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
225
                       (90, 10), (100, 10)])
 
226
 
 
227
    def test_coalesce_fudge(self):
 
228
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
229
                    (100, 10, [(0, 10),]),
 
230
                   ], [(10, 10), (30, 10), (100, 10)],
 
231
                   fudge=10
 
232
                  )
 
233
    def test_coalesce_max_size(self):
 
234
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
235
                    (30, 50, [(0, 50)]),
 
236
                    # If one range is above max_size, it gets its own coalesced
 
237
                    # offset
 
238
                    (100, 80, [(0, 80),]),],
 
239
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
240
                   max_size=50
 
241
                  )
 
242
 
 
243
    def test_coalesce_no_max_size(self):
 
244
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
245
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
246
                  )
 
247
 
 
248
    def test_coalesce_default_limit(self):
 
249
        # By default we use a 100MB max size.
 
250
        ten_mb = 10*1024*1024
 
251
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
252
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
253
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
254
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
255
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
256
                   max_size=1*1024*1024*1024)
 
257
 
 
258
 
 
259
class TestMemoryTransport(TestCase):
 
260
 
 
261
    def test_get_transport(self):
 
262
        MemoryTransport()
 
263
 
 
264
    def test_clone(self):
 
265
        transport = MemoryTransport()
 
266
        self.assertTrue(isinstance(transport, MemoryTransport))
 
267
        self.assertEqual("memory:///", transport.clone("/").base)
 
268
 
 
269
    def test_abspath(self):
 
270
        transport = MemoryTransport()
 
271
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
272
 
 
273
    def test_abspath_of_root(self):
 
274
        transport = MemoryTransport()
 
275
        self.assertEqual("memory:///", transport.base)
 
276
        self.assertEqual("memory:///", transport.abspath('/'))
 
277
 
 
278
    def test_abspath_of_relpath_starting_at_root(self):
 
279
        transport = MemoryTransport()
 
280
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
281
 
 
282
    def test_append_and_get(self):
 
283
        transport = MemoryTransport()
 
284
        transport.append_bytes('path', 'content')
 
285
        self.assertEqual(transport.get('path').read(), 'content')
 
286
        transport.append_file('path', StringIO('content'))
 
287
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
288
 
 
289
    def test_put_and_get(self):
 
290
        transport = MemoryTransport()
 
291
        transport.put_file('path', StringIO('content'))
 
292
        self.assertEqual(transport.get('path').read(), 'content')
 
293
        transport.put_bytes('path', 'content')
 
294
        self.assertEqual(transport.get('path').read(), 'content')
 
295
 
 
296
    def test_append_without_dir_fails(self):
 
297
        transport = MemoryTransport()
 
298
        self.assertRaises(NoSuchFile,
 
299
                          transport.append_bytes, 'dir/path', 'content')
 
300
 
 
301
    def test_put_without_dir_fails(self):
 
302
        transport = MemoryTransport()
 
303
        self.assertRaises(NoSuchFile,
 
304
                          transport.put_file, 'dir/path', StringIO('content'))
 
305
 
 
306
    def test_get_missing(self):
 
307
        transport = MemoryTransport()
 
308
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
309
 
 
310
    def test_has_missing(self):
 
311
        transport = MemoryTransport()
 
312
        self.assertEquals(False, transport.has('foo'))
 
313
 
 
314
    def test_has_present(self):
 
315
        transport = MemoryTransport()
 
316
        transport.append_bytes('foo', 'content')
 
317
        self.assertEquals(True, transport.has('foo'))
 
318
 
 
319
    def test_list_dir(self):
 
320
        transport = MemoryTransport()
 
321
        transport.put_bytes('foo', 'content')
 
322
        transport.mkdir('dir')
 
323
        transport.put_bytes('dir/subfoo', 'content')
 
324
        transport.put_bytes('dirlike', 'content')
 
325
 
 
326
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
327
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
328
 
 
329
    def test_mkdir(self):
 
330
        transport = MemoryTransport()
 
331
        transport.mkdir('dir')
 
332
        transport.append_bytes('dir/path', 'content')
 
333
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
334
 
 
335
    def test_mkdir_missing_parent(self):
 
336
        transport = MemoryTransport()
 
337
        self.assertRaises(NoSuchFile,
 
338
                          transport.mkdir, 'dir/dir')
 
339
 
 
340
    def test_mkdir_twice(self):
 
341
        transport = MemoryTransport()
 
342
        transport.mkdir('dir')
 
343
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
344
 
 
345
    def test_parameters(self):
 
346
        transport = MemoryTransport()
 
347
        self.assertEqual(True, transport.listable())
 
348
        self.assertEqual(False, transport.is_readonly())
 
349
 
 
350
    def test_iter_files_recursive(self):
 
351
        transport = MemoryTransport()
 
352
        transport.mkdir('dir')
 
353
        transport.put_bytes('dir/foo', 'content')
 
354
        transport.put_bytes('dir/bar', 'content')
 
355
        transport.put_bytes('bar', 'content')
 
356
        paths = set(transport.iter_files_recursive())
 
357
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
358
 
 
359
    def test_stat(self):
 
360
        transport = MemoryTransport()
 
361
        transport.put_bytes('foo', 'content')
 
362
        transport.put_bytes('bar', 'phowar')
 
363
        self.assertEqual(7, transport.stat('foo').st_size)
 
364
        self.assertEqual(6, transport.stat('bar').st_size)
 
365
 
 
366
 
 
367
class ChrootDecoratorTransportTest(TestCase):
 
368
    """Chroot decoration specific tests."""
 
369
 
 
370
    def test_abspath(self):
 
371
        # The abspath is always relative to the chroot_url.
 
372
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
373
        self.start_server(server)
 
374
        transport = get_transport(server.get_url())
 
375
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
376
 
 
377
        subdir_transport = transport.clone('subdir')
 
378
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
379
 
 
380
    def test_clone(self):
 
381
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
382
        self.start_server(server)
 
383
        transport = get_transport(server.get_url())
 
384
        # relpath from root and root path are the same
 
385
        relpath_cloned = transport.clone('foo')
 
386
        abspath_cloned = transport.clone('/foo')
 
387
        self.assertEqual(server, relpath_cloned.server)
 
388
        self.assertEqual(server, abspath_cloned.server)
 
389
 
 
390
    def test_chroot_url_preserves_chroot(self):
 
391
        """Calling get_transport on a chroot transport's base should produce a
 
392
        transport with exactly the same behaviour as the original chroot
 
393
        transport.
 
394
 
 
395
        This is so that it is not possible to escape a chroot by doing::
 
396
            url = chroot_transport.base
 
397
            parent_url = urlutils.join(url, '..')
 
398
            new_transport = get_transport(parent_url)
 
399
        """
 
400
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
401
        self.start_server(server)
 
402
        transport = get_transport(server.get_url())
 
403
        new_transport = get_transport(transport.base)
 
404
        self.assertEqual(transport.server, new_transport.server)
 
405
        self.assertEqual(transport.base, new_transport.base)
 
406
 
 
407
    def test_urljoin_preserves_chroot(self):
 
408
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
409
        URL that escapes the intended chroot.
 
410
 
 
411
        This is so that it is not possible to escape a chroot by doing::
 
412
            url = chroot_transport.base
 
413
            parent_url = urlutils.join(url, '..')
 
414
            new_transport = get_transport(parent_url)
 
415
        """
 
416
        server = ChrootServer(get_transport('memory:///path/'))
 
417
        self.start_server(server)
 
418
        transport = get_transport(server.get_url())
 
419
        self.assertRaises(
 
420
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
421
 
 
422
 
 
423
class ChrootServerTest(TestCase):
 
424
 
 
425
    def test_construct(self):
 
426
        backing_transport = MemoryTransport()
 
427
        server = ChrootServer(backing_transport)
 
428
        self.assertEqual(backing_transport, server.backing_transport)
 
429
 
 
430
    def test_setUp(self):
 
431
        backing_transport = MemoryTransport()
 
432
        server = ChrootServer(backing_transport)
 
433
        server.setUp()
 
434
        try:
 
435
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
436
        finally:
 
437
            server.tearDown()
 
438
 
 
439
    def test_tearDown(self):
 
440
        backing_transport = MemoryTransport()
 
441
        server = ChrootServer(backing_transport)
 
442
        server.setUp()
 
443
        server.tearDown()
 
444
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
445
 
 
446
    def test_get_url(self):
 
447
        backing_transport = MemoryTransport()
 
448
        server = ChrootServer(backing_transport)
 
449
        server.setUp()
 
450
        try:
 
451
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
452
        finally:
 
453
            server.tearDown()
 
454
 
 
455
 
 
456
class PathFilteringDecoratorTransportTest(TestCase):
 
457
    """Pathfilter decoration specific tests."""
 
458
 
 
459
    def test_abspath(self):
 
460
        # The abspath is always relative to the base of the backing transport.
 
461
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
462
            lambda x: x)
 
463
        server.setUp()
 
464
        transport = get_transport(server.get_url())
 
465
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
466
 
 
467
        subdir_transport = transport.clone('subdir')
 
468
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
469
        server.tearDown()
 
470
 
 
471
    def make_pf_transport(self, filter_func=None):
 
472
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
473
        
 
474
        :param filter_func: by default this will be a no-op function.  Use this
 
475
            parameter to override it."""
 
476
        if filter_func is None:
 
477
            filter_func = lambda x: x
 
478
        server = PathFilteringServer(
 
479
            get_transport('memory:///foo/bar/'), filter_func)
 
480
        server.setUp()
 
481
        self.addCleanup(server.tearDown)
 
482
        return get_transport(server.get_url())
 
483
 
 
484
    def test__filter(self):
 
485
        # _filter (with an identity func as filter_func) always returns
 
486
        # paths relative to the base of the backing transport.
 
487
        transport = self.make_pf_transport()
 
488
        self.assertEqual('foo', transport._filter('foo'))
 
489
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
 
492
        # The base of the pathfiltering transport is taken into account too.
 
493
        transport = transport.clone('subdir1/subdir2')
 
494
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
495
        self.assertEqual(
 
496
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
497
        self.assertEqual('subdir1', transport._filter('..'))
 
498
        self.assertEqual('', transport._filter('/'))
 
499
 
 
500
    def test_filter_invocation(self):
 
501
        filter_log = []
 
502
        def filter(path):
 
503
            filter_log.append(path)
 
504
            return path
 
505
        transport = self.make_pf_transport(filter)
 
506
        transport.has('abc')
 
507
        self.assertEqual(['abc'], filter_log)
 
508
        del filter_log[:]
 
509
        transport.clone('abc').has('xyz')
 
510
        self.assertEqual(['abc/xyz'], filter_log)
 
511
        del filter_log[:]
 
512
        transport.has('/abc')
 
513
        self.assertEqual(['abc'], filter_log)
 
514
 
 
515
    def test_clone(self):
 
516
        transport = self.make_pf_transport()
 
517
        # relpath from root and root path are the same
 
518
        relpath_cloned = transport.clone('foo')
 
519
        abspath_cloned = transport.clone('/foo')
 
520
        self.assertEqual(transport.server, relpath_cloned.server)
 
521
        self.assertEqual(transport.server, abspath_cloned.server)
 
522
 
 
523
    def test_url_preserves_pathfiltering(self):
 
524
        """Calling get_transport on a pathfiltered transport's base should
 
525
        produce a transport with exactly the same behaviour as the original
 
526
        pathfiltered transport.
 
527
 
 
528
        This is so that it is not possible to escape (accidentally or
 
529
        otherwise) the filtering by doing::
 
530
            url = filtered_transport.base
 
531
            parent_url = urlutils.join(url, '..')
 
532
            new_transport = get_transport(parent_url)
 
533
        """
 
534
        transport = self.make_pf_transport()
 
535
        new_transport = get_transport(transport.base)
 
536
        self.assertEqual(transport.server, new_transport.server)
 
537
        self.assertEqual(transport.base, new_transport.base)
 
538
 
 
539
 
 
540
class ReadonlyDecoratorTransportTest(TestCase):
 
541
    """Readonly decoration specific tests."""
 
542
 
 
543
    def test_local_parameters(self):
 
544
        import bzrlib.transport.readonly as readonly
 
545
        # connect to . in readonly mode
 
546
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
547
        self.assertEqual(True, transport.listable())
 
548
        self.assertEqual(True, transport.is_readonly())
 
549
 
 
550
    def test_http_parameters(self):
 
551
        from bzrlib.tests.http_server import HttpServer
 
552
        import bzrlib.transport.readonly as readonly
 
553
        # connect to '.' via http which is not listable
 
554
        server = HttpServer()
 
555
        self.start_server(server)
 
556
        transport = get_transport('readonly+' + server.get_url())
 
557
        self.failUnless(isinstance(transport,
 
558
                                   readonly.ReadonlyTransportDecorator))
 
559
        self.assertEqual(False, transport.listable())
 
560
        self.assertEqual(True, transport.is_readonly())
 
561
 
 
562
 
 
563
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
564
    """NFS decorator specific tests."""
 
565
 
 
566
    def get_nfs_transport(self, url):
 
567
        import bzrlib.transport.fakenfs as fakenfs
 
568
        # connect to url with nfs decoration
 
569
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
570
 
 
571
    def test_local_parameters(self):
 
572
        # the listable and is_readonly parameters
 
573
        # are not changed by the fakenfs decorator
 
574
        transport = self.get_nfs_transport('.')
 
575
        self.assertEqual(True, transport.listable())
 
576
        self.assertEqual(False, transport.is_readonly())
 
577
 
 
578
    def test_http_parameters(self):
 
579
        # the listable and is_readonly parameters
 
580
        # are not changed by the fakenfs decorator
 
581
        from bzrlib.tests.http_server import HttpServer
 
582
        # connect to '.' via http which is not listable
 
583
        server = HttpServer()
 
584
        self.start_server(server)
 
585
        transport = self.get_nfs_transport(server.get_url())
 
586
        self.assertIsInstance(
 
587
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
588
        self.assertEqual(False, transport.listable())
 
589
        self.assertEqual(True, transport.is_readonly())
 
590
 
 
591
    def test_fakenfs_server_default(self):
 
592
        # a FakeNFSServer() should bring up a local relpath server for itself
 
593
        import bzrlib.transport.fakenfs as fakenfs
 
594
        server = fakenfs.FakeNFSServer()
 
595
        self.start_server(server)
 
596
        # the url should be decorated appropriately
 
597
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
598
        # and we should be able to get a transport for it
 
599
        transport = get_transport(server.get_url())
 
600
        # which must be a FakeNFSTransportDecorator instance.
 
601
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
602
 
 
603
    def test_fakenfs_rename_semantics(self):
 
604
        # a FakeNFS transport must mangle the way rename errors occur to
 
605
        # look like NFS problems.
 
606
        transport = self.get_nfs_transport('.')
 
607
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
608
                        transport=transport)
 
609
        self.assertRaises(errors.ResourceBusy,
 
610
                          transport.rename, 'from', 'to')
 
611
 
 
612
 
 
613
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
614
    """Tests for simulation of VFAT restrictions"""
 
615
 
 
616
    def get_vfat_transport(self, url):
 
617
        """Return vfat-backed transport for test directory"""
 
618
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
619
        return FakeVFATTransportDecorator('vfat+' + url)
 
620
 
 
621
    def test_transport_creation(self):
 
622
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
623
        transport = self.get_vfat_transport('.')
 
624
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
625
 
 
626
    def test_transport_mkdir(self):
 
627
        transport = self.get_vfat_transport('.')
 
628
        transport.mkdir('HELLO')
 
629
        self.assertTrue(transport.has('hello'))
 
630
        self.assertTrue(transport.has('Hello'))
 
631
 
 
632
    def test_forbidden_chars(self):
 
633
        transport = self.get_vfat_transport('.')
 
634
        self.assertRaises(ValueError, transport.has, "<NU>")
 
635
 
 
636
 
 
637
class BadTransportHandler(Transport):
 
638
    def __init__(self, base_url):
 
639
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
640
 
 
641
 
 
642
class BackupTransportHandler(Transport):
 
643
    """Test transport that works as a backup for the BadTransportHandler"""
 
644
    pass
 
645
 
 
646
 
 
647
class TestTransportImplementation(TestCaseInTempDir):
 
648
    """Implementation verification for transports.
 
649
 
 
650
    To verify a transport we need a server factory, which is a callable
 
651
    that accepts no parameters and returns an implementation of
 
652
    bzrlib.transport.Server.
 
653
 
 
654
    That Server is then used to construct transport instances and test
 
655
    the transport via loopback activity.
 
656
 
 
657
    Currently this assumes that the Transport object is connected to the
 
658
    current working directory.  So that whatever is done
 
659
    through the transport, should show up in the working
 
660
    directory, and vice-versa. This is a bug, because its possible to have
 
661
    URL schemes which provide access to something that may not be
 
662
    result in storage on the local disk, i.e. due to file system limits, or
 
663
    due to it being a database or some other non-filesystem tool.
 
664
 
 
665
    This also tests to make sure that the functions work with both
 
666
    generators and lists (assuming iter(list) is effectively a generator)
 
667
    """
 
668
 
 
669
    def setUp(self):
 
670
        super(TestTransportImplementation, self).setUp()
 
671
        self._server = self.transport_server()
 
672
        self.start_server(self._server)
 
673
 
 
674
    def get_transport(self, relpath=None):
 
675
        """Return a connected transport to the local directory.
 
676
 
 
677
        :param relpath: a path relative to the base url.
 
678
        """
 
679
        base_url = self._server.get_url()
 
680
        url = self._adjust_url(base_url, relpath)
 
681
        # try getting the transport via the regular interface:
 
682
        t = get_transport(url)
 
683
        # vila--20070607 if the following are commented out the test suite
 
684
        # still pass. Is this really still needed or was it a forgotten
 
685
        # temporary fix ?
 
686
        if not isinstance(t, self.transport_class):
 
687
            # we did not get the correct transport class type. Override the
 
688
            # regular connection behaviour by direct construction.
 
689
            t = self.transport_class(url)
 
690
        return t
 
691
 
 
692
 
 
693
class TestLocalTransports(TestCase):
 
694
 
 
695
    def test_get_transport_from_abspath(self):
 
696
        here = osutils.abspath('.')
 
697
        t = get_transport(here)
 
698
        self.assertIsInstance(t, LocalTransport)
 
699
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
700
 
 
701
    def test_get_transport_from_relpath(self):
 
702
        here = osutils.abspath('.')
 
703
        t = get_transport('.')
 
704
        self.assertIsInstance(t, LocalTransport)
 
705
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
706
 
 
707
    def test_get_transport_from_local_url(self):
 
708
        here = osutils.abspath('.')
 
709
        here_url = urlutils.local_path_to_url(here) + '/'
 
710
        t = get_transport(here_url)
 
711
        self.assertIsInstance(t, LocalTransport)
 
712
        self.assertEquals(t.base, here_url)
 
713
 
 
714
    def test_local_abspath(self):
 
715
        here = osutils.abspath('.')
 
716
        t = get_transport(here)
 
717
        self.assertEquals(t.local_abspath(''), here)
 
718
 
 
719
 
 
720
class TestWin32LocalTransport(TestCase):
 
721
 
 
722
    def test_unc_clone_to_root(self):
 
723
        # Win32 UNC path like \\HOST\path
 
724
        # clone to root should stop at least at \\HOST part
 
725
        # not on \\
 
726
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
727
        for i in xrange(4):
 
728
            t = t.clone('..')
 
729
        self.assertEquals(t.base, 'file://HOST/')
 
730
        # make sure we reach the root
 
731
        t = t.clone('..')
 
732
        self.assertEquals(t.base, 'file://HOST/')
 
733
 
 
734
 
 
735
class TestConnectedTransport(TestCase):
 
736
    """Tests for connected to remote server transports"""
 
737
 
 
738
    def test_parse_url(self):
 
739
        t = ConnectedTransport('http://simple.example.com/home/source')
 
740
        self.assertEquals(t._host, 'simple.example.com')
 
741
        self.assertEquals(t._port, None)
 
742
        self.assertEquals(t._path, '/home/source/')
 
743
        self.failUnless(t._user is None)
 
744
        self.failUnless(t._password is None)
 
745
 
 
746
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
747
 
 
748
    def test_parse_url_with_at_in_user(self):
 
749
        # Bug 228058
 
750
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
751
        self.assertEquals(t._user, 'user@host.com')
 
752
 
 
753
    def test_parse_quoted_url(self):
 
754
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
755
        self.assertEquals(t._host, 'exAmple.com')
 
756
        self.assertEquals(t._port, 2222)
 
757
        self.assertEquals(t._user, 'robey')
 
758
        self.assertEquals(t._password, 'h@t')
 
759
        self.assertEquals(t._path, '/path/')
 
760
 
 
761
        # Base should not keep track of the password
 
762
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
763
 
 
764
    def test_parse_invalid_url(self):
 
765
        self.assertRaises(errors.InvalidURL,
 
766
                          ConnectedTransport,
 
767
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
768
 
 
769
    def test_relpath(self):
 
770
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
771
 
 
772
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
773
        self.assertRaises(errors.PathNotChild, t.relpath,
 
774
                          'http://user@host.com/abs/path/sub')
 
775
        self.assertRaises(errors.PathNotChild, t.relpath,
 
776
                          'sftp://user2@host.com/abs/path/sub')
 
777
        self.assertRaises(errors.PathNotChild, t.relpath,
 
778
                          'sftp://user@otherhost.com/abs/path/sub')
 
779
        self.assertRaises(errors.PathNotChild, t.relpath,
 
780
                          'sftp://user@host.com:33/abs/path/sub')
 
781
        # Make sure it works when we don't supply a username
 
782
        t = ConnectedTransport('sftp://host.com/abs/path')
 
783
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
784
 
 
785
        # Make sure it works when parts of the path will be url encoded
 
786
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
787
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
788
 
 
789
    def test_connection_sharing_propagate_credentials(self):
 
790
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
791
        self.assertEquals('user', t._user)
 
792
        self.assertEquals('host.com', t._host)
 
793
        self.assertIs(None, t._get_connection())
 
794
        self.assertIs(None, t._password)
 
795
        c = t.clone('subdir')
 
796
        self.assertIs(None, c._get_connection())
 
797
        self.assertIs(None, t._password)
 
798
 
 
799
        # Simulate the user entering a password
 
800
        password = 'secret'
 
801
        connection = object()
 
802
        t._set_connection(connection, password)
 
803
        self.assertIs(connection, t._get_connection())
 
804
        self.assertIs(password, t._get_credentials())
 
805
        self.assertIs(connection, c._get_connection())
 
806
        self.assertIs(password, c._get_credentials())
 
807
 
 
808
        # credentials can be updated
 
809
        new_password = 'even more secret'
 
810
        c._update_credentials(new_password)
 
811
        self.assertIs(connection, t._get_connection())
 
812
        self.assertIs(new_password, t._get_credentials())
 
813
        self.assertIs(connection, c._get_connection())
 
814
        self.assertIs(new_password, c._get_credentials())
 
815
 
 
816
 
 
817
class TestReusedTransports(TestCase):
 
818
    """Tests for transport reuse"""
 
819
 
 
820
    def test_reuse_same_transport(self):
 
821
        possible_transports = []
 
822
        t1 = get_transport('http://foo/',
 
823
                           possible_transports=possible_transports)
 
824
        self.assertEqual([t1], possible_transports)
 
825
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
826
        self.assertIs(t1, t2)
 
827
 
 
828
        # Also check that final '/' are handled correctly
 
829
        t3 = get_transport('http://foo/path/')
 
830
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
831
        self.assertIs(t3, t4)
 
832
 
 
833
        t5 = get_transport('http://foo/path')
 
834
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
835
        self.assertIs(t5, t6)
 
836
 
 
837
    def test_don_t_reuse_different_transport(self):
 
838
        t1 = get_transport('http://foo/path')
 
839
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
840
        self.assertIsNot(t1, t2)
 
841
 
 
842
 
 
843
class TestTransportTrace(TestCase):
 
844
 
 
845
    def test_get(self):
 
846
        transport = get_transport('trace+memory://')
 
847
        self.assertIsInstance(
 
848
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
849
 
 
850
    def test_clone_preserves_activity(self):
 
851
        transport = get_transport('trace+memory://')
 
852
        transport2 = transport.clone('.')
 
853
        self.assertTrue(transport is not transport2)
 
854
        self.assertTrue(transport._activity is transport2._activity)
 
855
 
 
856
    # the following specific tests are for the operations that have made use of
 
857
    # logging in tests; we could test every single operation but doing that
 
858
    # still won't cause a test failure when the top level Transport API
 
859
    # changes; so there is little return doing that.
 
860
    def test_get(self):
 
861
        transport = get_transport('trace+memory:///')
 
862
        transport.put_bytes('foo', 'barish')
 
863
        transport.get('foo')
 
864
        expected_result = []
 
865
        # put_bytes records the bytes, not the content to avoid memory
 
866
        # pressure.
 
867
        expected_result.append(('put_bytes', 'foo', 6, None))
 
868
        # get records the file name only.
 
869
        expected_result.append(('get', 'foo'))
 
870
        self.assertEqual(expected_result, transport._activity)
 
871
 
 
872
    def test_readv(self):
 
873
        transport = get_transport('trace+memory:///')
 
874
        transport.put_bytes('foo', 'barish')
 
875
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
876
            upper_limit=6))
 
877
        expected_result = []
 
878
        # put_bytes records the bytes, not the content to avoid memory
 
879
        # pressure.
 
880
        expected_result.append(('put_bytes', 'foo', 6, None))
 
881
        # readv records the supplied offset request
 
882
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
883
        self.assertEqual(expected_result, transport._activity)
 
884
 
 
885
 
 
886
class TestSSHConnections(tests.TestCaseWithTransport):
 
887
 
 
888
    def test_bzr_connect_to_bzr_ssh(self):
 
889
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
890
 
 
891
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
892
        """
 
893
        # This test actually causes a bzr instance to be invoked, which is very
 
894
        # expensive: it should be the only such test in the test suite.
 
895
        # A reasonable evolution for this would be to simply check inside
 
896
        # check_channel_exec_request that the command is appropriate, and then
 
897
        # satisfy requests in-process.
 
898
        self.requireFeature(features.paramiko)
 
899
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
900
        # override the interface (doesn't change self._vendor).
 
901
        # Note that this does encryption, so can be slow.
 
902
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
903
        from bzrlib.tests.stub_sftp import StubServer
 
904
 
 
905
        # Start an SSH server
 
906
        self.command_executed = []
 
907
        # XXX: This is horrible -- we define a really dumb SSH server that
 
908
        # executes commands, and manage the hooking up of stdin/out/err to the
 
909
        # SSH channel ourselves.  Surely this has already been implemented
 
910
        # elsewhere?
 
911
        started = []
 
912
        class StubSSHServer(StubServer):
 
913
 
 
914
            test = self
 
915
 
 
916
            def check_channel_exec_request(self, channel, command):
 
917
                self.test.command_executed.append(command)
 
918
                proc = subprocess.Popen(
 
919
                    command, shell=True, stdin=subprocess.PIPE,
 
920
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
921
 
 
922
                # XXX: horribly inefficient, not to mention ugly.
 
923
                # Start a thread for each of stdin/out/err, and relay bytes from
 
924
                # the subprocess to channel and vice versa.
 
925
                def ferry_bytes(read, write, close):
 
926
                    while True:
 
927
                        bytes = read(1)
 
928
                        if bytes == '':
 
929
                            close()
 
930
                            break
 
931
                        write(bytes)
 
932
 
 
933
                file_functions = [
 
934
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
935
                    (proc.stdout.read, channel.sendall, channel.close),
 
936
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
937
                started.append(proc)
 
938
                for read, write, close in file_functions:
 
939
                    t = threading.Thread(
 
940
                        target=ferry_bytes, args=(read, write, close))
 
941
                    t.start()
 
942
                    started.append(t)
 
943
 
 
944
                return True
 
945
 
 
946
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
947
        # We *don't* want to override the default SSH vendor: the detected one
 
948
        # is the one to use.
 
949
        self.start_server(ssh_server)
 
950
        port = ssh_server._listener.port
 
951
 
 
952
        if sys.platform == 'win32':
 
953
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
954
        else:
 
955
            bzr_remote_path = self.get_bzr_path()
 
956
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
957
 
 
958
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
959
        # variable is used to tell bzr what command to run on the remote end.
 
960
        path_to_branch = osutils.abspath('.')
 
961
        if sys.platform == 'win32':
 
962
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
963
            # prefix a '/' to get the right path.
 
964
            path_to_branch = '/' + path_to_branch
 
965
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
966
        t = get_transport(url)
 
967
        self.permit_url(t.base)
 
968
        t.mkdir('foo')
 
969
 
 
970
        self.assertEqual(
 
971
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
972
            self.command_executed)
 
973
        # Make sure to disconnect, so that the remote process can stop, and we
 
974
        # can cleanup. Then pause the test until everything is shutdown
 
975
        t._client._medium.disconnect()
 
976
        if not started:
 
977
            return
 
978
        # First wait for the subprocess
 
979
        started[0].wait()
 
980
        # And the rest are threads
 
981
        for t in started[1:]:
 
982
            t.join()