~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-11 04:02:41 UTC
  • mfrom: (5017.2.2 tariff)
  • Revision ID: pqm@pqm.ubuntu.com-20100211040241-w6n021dz0uus341n
(mbp) add import-tariff tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
 
23
 
 
24
from bzrlib import (
 
25
    errors,
 
26
    osutils,
 
27
    tests,
 
28
    transport,
 
29
    urlutils,
 
30
    )
 
31
from bzrlib.transport import (
 
32
    chroot,
 
33
    fakenfs,
 
34
    local,
 
35
    memory,
 
36
    pathfilter,
 
37
    readonly,
 
38
    )
 
39
from bzrlib.tests import features
 
40
 
 
41
 
 
42
# TODO: Should possibly split transport-specific tests into their own files.
 
43
 
 
44
 
 
45
class TestTransport(tests.TestCase):
 
46
    """Test the non transport-concrete class functionality."""
 
47
 
 
48
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
49
    # of try/finally -- vila 20100205
 
50
 
 
51
    def test__get_set_protocol_handlers(self):
 
52
        handlers = transport._get_protocol_handlers()
 
53
        self.assertNotEqual([], handlers.keys( ))
 
54
        try:
 
55
            transport._clear_protocol_handlers()
 
56
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
57
        finally:
 
58
            transport._set_protocol_handlers(handlers)
 
59
 
 
60
    def test_get_transport_modules(self):
 
61
        handlers = transport._get_protocol_handlers()
 
62
        # don't pollute the current handlers
 
63
        transport._clear_protocol_handlers()
 
64
        class SampleHandler(object):
 
65
            """I exist, isnt that enough?"""
 
66
        try:
 
67
            transport._clear_protocol_handlers()
 
68
            transport.register_transport_proto('foo')
 
69
            transport.register_lazy_transport('foo',
 
70
                                              'bzrlib.tests.test_transport',
 
71
                                              'TestTransport.SampleHandler')
 
72
            transport.register_transport_proto('bar')
 
73
            transport.register_lazy_transport('bar',
 
74
                                              'bzrlib.tests.test_transport',
 
75
                                              'TestTransport.SampleHandler')
 
76
            self.assertEqual([SampleHandler.__module__,
 
77
                              'bzrlib.transport.chroot',
 
78
                              'bzrlib.transport.pathfilter'],
 
79
                             transport._get_transport_modules())
 
80
        finally:
 
81
            transport._set_protocol_handlers(handlers)
 
82
 
 
83
    def test_transport_dependency(self):
 
84
        """Transport with missing dependency causes no error"""
 
85
        saved_handlers = transport._get_protocol_handlers()
 
86
        # don't pollute the current handlers
 
87
        transport._clear_protocol_handlers()
 
88
        try:
 
89
            transport.register_transport_proto('foo')
 
90
            transport.register_lazy_transport(
 
91
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
92
            try:
 
93
                transport.get_transport('foo://fooserver/foo')
 
94
            except errors.UnsupportedProtocol, e:
 
95
                e_str = str(e)
 
96
                self.assertEquals('Unsupported protocol'
 
97
                                  ' for url "foo://fooserver/foo":'
 
98
                                  ' Unable to import library "some_lib":'
 
99
                                  ' testing missing dependency', str(e))
 
100
            else:
 
101
                self.fail('Did not raise UnsupportedProtocol')
 
102
        finally:
 
103
            # restore original values
 
104
            transport._set_protocol_handlers(saved_handlers)
 
105
 
 
106
    def test_transport_fallback(self):
 
107
        """Transport with missing dependency causes no error"""
 
108
        saved_handlers = transport._get_protocol_handlers()
 
109
        try:
 
110
            transport._clear_protocol_handlers()
 
111
            transport.register_transport_proto('foo')
 
112
            transport.register_lazy_transport(
 
113
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
114
            transport.register_lazy_transport(
 
115
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
116
            t = transport.get_transport('foo://fooserver/foo')
 
117
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
118
        finally:
 
119
            transport._set_protocol_handlers(saved_handlers)
 
120
 
 
121
    def test_ssh_hints(self):
 
122
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
123
        try:
 
124
            transport.get_transport('ssh://fooserver/foo')
 
125
        except errors.UnsupportedProtocol, e:
 
126
            e_str = str(e)
 
127
            self.assertEquals('Unsupported protocol'
 
128
                              ' for url "ssh://fooserver/foo":'
 
129
                              ' bzr supports bzr+ssh to operate over ssh,'
 
130
                              ' use "bzr+ssh://fooserver/foo".',
 
131
                              str(e))
 
132
        else:
 
133
            self.fail('Did not raise UnsupportedProtocol')
 
134
 
 
135
    def test_LateReadError(self):
 
136
        """The LateReadError helper should raise on read()."""
 
137
        a_file = transport.LateReadError('a path')
 
138
        try:
 
139
            a_file.read()
 
140
        except errors.ReadError, error:
 
141
            self.assertEqual('a path', error.path)
 
142
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
143
        a_file.close()
 
144
 
 
145
    def test__combine_paths(self):
 
146
        t = transport.Transport('/')
 
147
        self.assertEqual('/home/sarah/project/foo',
 
148
                         t._combine_paths('/home/sarah', 'project/foo'))
 
149
        self.assertEqual('/etc',
 
150
                         t._combine_paths('/home/sarah', '../../etc'))
 
151
        self.assertEqual('/etc',
 
152
                         t._combine_paths('/home/sarah', '../../../etc'))
 
153
        self.assertEqual('/etc',
 
154
                         t._combine_paths('/home/sarah', '/etc'))
 
155
 
 
156
    def test_local_abspath_non_local_transport(self):
 
157
        # the base implementation should throw
 
158
        t = memory.MemoryTransport()
 
159
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
160
        self.assertEqual('memory:///t is not a local path.', str(e))
 
161
 
 
162
 
 
163
class TestCoalesceOffsets(tests.TestCase):
 
164
 
 
165
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
166
        coalesce = transport.Transport._coalesce_offsets
 
167
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
168
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
169
                            max_size=max_size))
 
170
        self.assertEqual(exp, out)
 
171
 
 
172
    def test_coalesce_empty(self):
 
173
        self.check([], [])
 
174
 
 
175
    def test_coalesce_simple(self):
 
176
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
177
 
 
178
    def test_coalesce_unrelated(self):
 
179
        self.check([(0, 10, [(0, 10)]),
 
180
                    (20, 10, [(0, 10)]),
 
181
                   ], [(0, 10), (20, 10)])
 
182
 
 
183
    def test_coalesce_unsorted(self):
 
184
        self.check([(20, 10, [(0, 10)]),
 
185
                    (0, 10, [(0, 10)]),
 
186
                   ], [(20, 10), (0, 10)])
 
187
 
 
188
    def test_coalesce_nearby(self):
 
189
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
190
                   [(0, 10), (10, 10)])
 
191
 
 
192
    def test_coalesce_overlapped(self):
 
193
        self.assertRaises(ValueError,
 
194
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
195
                        [(0, 10), (5, 10)])
 
196
 
 
197
    def test_coalesce_limit(self):
 
198
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
199
                              (30, 10), (40, 10)]),
 
200
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
201
                              (30, 10), (40, 10)]),
 
202
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
203
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
204
                       (90, 10), (100, 10)],
 
205
                    limit=5)
 
206
 
 
207
    def test_coalesce_no_limit(self):
 
208
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
209
                               (30, 10), (40, 10), (50, 10),
 
210
                               (60, 10), (70, 10), (80, 10),
 
211
                               (90, 10)]),
 
212
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
213
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
214
                       (90, 10), (100, 10)])
 
215
 
 
216
    def test_coalesce_fudge(self):
 
217
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
218
                    (100, 10, [(0, 10),]),
 
219
                   ], [(10, 10), (30, 10), (100, 10)],
 
220
                   fudge=10
 
221
                  )
 
222
    def test_coalesce_max_size(self):
 
223
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
224
                    (30, 50, [(0, 50)]),
 
225
                    # If one range is above max_size, it gets its own coalesced
 
226
                    # offset
 
227
                    (100, 80, [(0, 80),]),],
 
228
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
229
                   max_size=50
 
230
                  )
 
231
 
 
232
    def test_coalesce_no_max_size(self):
 
233
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
234
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
235
                  )
 
236
 
 
237
    def test_coalesce_default_limit(self):
 
238
        # By default we use a 100MB max size.
 
239
        ten_mb = 10*1024*1024
 
240
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
241
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
242
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
243
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
244
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
245
                   max_size=1*1024*1024*1024)
 
246
 
 
247
 
 
248
class TestMemoryServer(tests.TestCase):
 
249
 
 
250
    def test_create_server(self):
 
251
        server = memory.MemoryServer()
 
252
        server.start_server()
 
253
        url = server.get_url()
 
254
        self.assertTrue(url in transport.transport_list_registry)
 
255
        t = transport.get_transport(url)
 
256
        del t
 
257
        server.stop_server()
 
258
        self.assertFalse(url in transport.transport_list_registry)
 
259
        self.assertRaises(errors.UnsupportedProtocol,
 
260
                          transport.get_transport, url)
 
261
 
 
262
 
 
263
class TestMemoryTransport(tests.TestCase):
 
264
 
 
265
    def test_get_transport(self):
 
266
        memory.MemoryTransport()
 
267
 
 
268
    def test_clone(self):
 
269
        t = memory.MemoryTransport()
 
270
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
271
        self.assertEqual("memory:///", t.clone("/").base)
 
272
 
 
273
    def test_abspath(self):
 
274
        t = memory.MemoryTransport()
 
275
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
276
 
 
277
    def test_abspath_of_root(self):
 
278
        t = memory.MemoryTransport()
 
279
        self.assertEqual("memory:///", t.base)
 
280
        self.assertEqual("memory:///", t.abspath('/'))
 
281
 
 
282
    def test_abspath_of_relpath_starting_at_root(self):
 
283
        t = memory.MemoryTransport()
 
284
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
285
 
 
286
    def test_append_and_get(self):
 
287
        t = memory.MemoryTransport()
 
288
        t.append_bytes('path', 'content')
 
289
        self.assertEqual(t.get('path').read(), 'content')
 
290
        t.append_file('path', StringIO('content'))
 
291
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
292
 
 
293
    def test_put_and_get(self):
 
294
        t = memory.MemoryTransport()
 
295
        t.put_file('path', StringIO('content'))
 
296
        self.assertEqual(t.get('path').read(), 'content')
 
297
        t.put_bytes('path', 'content')
 
298
        self.assertEqual(t.get('path').read(), 'content')
 
299
 
 
300
    def test_append_without_dir_fails(self):
 
301
        t = memory.MemoryTransport()
 
302
        self.assertRaises(errors.NoSuchFile,
 
303
                          t.append_bytes, 'dir/path', 'content')
 
304
 
 
305
    def test_put_without_dir_fails(self):
 
306
        t = memory.MemoryTransport()
 
307
        self.assertRaises(errors.NoSuchFile,
 
308
                          t.put_file, 'dir/path', StringIO('content'))
 
309
 
 
310
    def test_get_missing(self):
 
311
        transport = memory.MemoryTransport()
 
312
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
313
 
 
314
    def test_has_missing(self):
 
315
        t = memory.MemoryTransport()
 
316
        self.assertEquals(False, t.has('foo'))
 
317
 
 
318
    def test_has_present(self):
 
319
        t = memory.MemoryTransport()
 
320
        t.append_bytes('foo', 'content')
 
321
        self.assertEquals(True, t.has('foo'))
 
322
 
 
323
    def test_list_dir(self):
 
324
        t = memory.MemoryTransport()
 
325
        t.put_bytes('foo', 'content')
 
326
        t.mkdir('dir')
 
327
        t.put_bytes('dir/subfoo', 'content')
 
328
        t.put_bytes('dirlike', 'content')
 
329
 
 
330
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
331
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
332
 
 
333
    def test_mkdir(self):
 
334
        t = memory.MemoryTransport()
 
335
        t.mkdir('dir')
 
336
        t.append_bytes('dir/path', 'content')
 
337
        self.assertEqual(t.get('dir/path').read(), 'content')
 
338
 
 
339
    def test_mkdir_missing_parent(self):
 
340
        t = memory.MemoryTransport()
 
341
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
342
 
 
343
    def test_mkdir_twice(self):
 
344
        t = memory.MemoryTransport()
 
345
        t.mkdir('dir')
 
346
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
347
 
 
348
    def test_parameters(self):
 
349
        t = memory.MemoryTransport()
 
350
        self.assertEqual(True, t.listable())
 
351
        self.assertEqual(False, t.is_readonly())
 
352
 
 
353
    def test_iter_files_recursive(self):
 
354
        t = memory.MemoryTransport()
 
355
        t.mkdir('dir')
 
356
        t.put_bytes('dir/foo', 'content')
 
357
        t.put_bytes('dir/bar', 'content')
 
358
        t.put_bytes('bar', 'content')
 
359
        paths = set(t.iter_files_recursive())
 
360
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
361
 
 
362
    def test_stat(self):
 
363
        t = memory.MemoryTransport()
 
364
        t.put_bytes('foo', 'content')
 
365
        t.put_bytes('bar', 'phowar')
 
366
        self.assertEqual(7, t.stat('foo').st_size)
 
367
        self.assertEqual(6, t.stat('bar').st_size)
 
368
 
 
369
 
 
370
class ChrootDecoratorTransportTest(tests.TestCase):
 
371
    """Chroot decoration specific tests."""
 
372
 
 
373
    def test_abspath(self):
 
374
        # The abspath is always relative to the chroot_url.
 
375
        server = chroot.ChrootServer(
 
376
            transport.get_transport('memory:///foo/bar/'))
 
377
        self.start_server(server)
 
378
        t = transport.get_transport(server.get_url())
 
379
        self.assertEqual(server.get_url(), t.abspath('/'))
 
380
 
 
381
        subdir_t = t.clone('subdir')
 
382
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
383
 
 
384
    def test_clone(self):
 
385
        server = chroot.ChrootServer(
 
386
            transport.get_transport('memory:///foo/bar/'))
 
387
        self.start_server(server)
 
388
        t = transport.get_transport(server.get_url())
 
389
        # relpath from root and root path are the same
 
390
        relpath_cloned = t.clone('foo')
 
391
        abspath_cloned = t.clone('/foo')
 
392
        self.assertEqual(server, relpath_cloned.server)
 
393
        self.assertEqual(server, abspath_cloned.server)
 
394
 
 
395
    def test_chroot_url_preserves_chroot(self):
 
396
        """Calling get_transport on a chroot transport's base should produce a
 
397
        transport with exactly the same behaviour as the original chroot
 
398
        transport.
 
399
 
 
400
        This is so that it is not possible to escape a chroot by doing::
 
401
            url = chroot_transport.base
 
402
            parent_url = urlutils.join(url, '..')
 
403
            new_t = transport.get_transport(parent_url)
 
404
        """
 
405
        server = chroot.ChrootServer(
 
406
            transport.get_transport('memory:///path/subpath'))
 
407
        self.start_server(server)
 
408
        t = transport.get_transport(server.get_url())
 
409
        new_t = transport.get_transport(t.base)
 
410
        self.assertEqual(t.server, new_t.server)
 
411
        self.assertEqual(t.base, new_t.base)
 
412
 
 
413
    def test_urljoin_preserves_chroot(self):
 
414
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
415
        URL that escapes the intended chroot.
 
416
 
 
417
        This is so that it is not possible to escape a chroot by doing::
 
418
            url = chroot_transport.base
 
419
            parent_url = urlutils.join(url, '..')
 
420
            new_t = transport.get_transport(parent_url)
 
421
        """
 
422
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
423
        self.start_server(server)
 
424
        t = transport.get_transport(server.get_url())
 
425
        self.assertRaises(
 
426
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
427
 
 
428
 
 
429
class TestChrootServer(tests.TestCase):
 
430
 
 
431
    def test_construct(self):
 
432
        backing_transport = memory.MemoryTransport()
 
433
        server = chroot.ChrootServer(backing_transport)
 
434
        self.assertEqual(backing_transport, server.backing_transport)
 
435
 
 
436
    def test_setUp(self):
 
437
        backing_transport = memory.MemoryTransport()
 
438
        server = chroot.ChrootServer(backing_transport)
 
439
        server.start_server()
 
440
        try:
 
441
            self.assertTrue(server.scheme
 
442
                            in transport._get_protocol_handlers().keys())
 
443
        finally:
 
444
            server.stop_server()
 
445
 
 
446
    def test_stop_server(self):
 
447
        backing_transport = memory.MemoryTransport()
 
448
        server = chroot.ChrootServer(backing_transport)
 
449
        server.start_server()
 
450
        server.stop_server()
 
451
        self.assertFalse(server.scheme
 
452
                         in transport._get_protocol_handlers().keys())
 
453
 
 
454
    def test_get_url(self):
 
455
        backing_transport = memory.MemoryTransport()
 
456
        server = chroot.ChrootServer(backing_transport)
 
457
        server.start_server()
 
458
        try:
 
459
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
460
        finally:
 
461
            server.stop_server()
 
462
 
 
463
 
 
464
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
465
    """Pathfilter decoration specific tests."""
 
466
 
 
467
    def test_abspath(self):
 
468
        # The abspath is always relative to the base of the backing transport.
 
469
        server = pathfilter.PathFilteringServer(
 
470
            transport.get_transport('memory:///foo/bar/'),
 
471
            lambda x: x)
 
472
        server.start_server()
 
473
        t = transport.get_transport(server.get_url())
 
474
        self.assertEqual(server.get_url(), t.abspath('/'))
 
475
 
 
476
        subdir_t = t.clone('subdir')
 
477
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
478
        server.stop_server()
 
479
 
 
480
    def make_pf_transport(self, filter_func=None):
 
481
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
482
        
 
483
        :param filter_func: by default this will be a no-op function.  Use this
 
484
            parameter to override it."""
 
485
        if filter_func is None:
 
486
            filter_func = lambda x: x
 
487
        server = pathfilter.PathFilteringServer(
 
488
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
489
        server.start_server()
 
490
        self.addCleanup(server.stop_server)
 
491
        return transport.get_transport(server.get_url())
 
492
 
 
493
    def test__filter(self):
 
494
        # _filter (with an identity func as filter_func) always returns
 
495
        # paths relative to the base of the backing transport.
 
496
        t = self.make_pf_transport()
 
497
        self.assertEqual('foo', t._filter('foo'))
 
498
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
499
        self.assertEqual('', t._filter('..'))
 
500
        self.assertEqual('', t._filter('/'))
 
501
        # The base of the pathfiltering transport is taken into account too.
 
502
        t = t.clone('subdir1/subdir2')
 
503
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
504
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
505
        self.assertEqual('subdir1', t._filter('..'))
 
506
        self.assertEqual('', t._filter('/'))
 
507
 
 
508
    def test_filter_invocation(self):
 
509
        filter_log = []
 
510
        def filter(path):
 
511
            filter_log.append(path)
 
512
            return path
 
513
        t = self.make_pf_transport(filter)
 
514
        t.has('abc')
 
515
        self.assertEqual(['abc'], filter_log)
 
516
        del filter_log[:]
 
517
        t.clone('abc').has('xyz')
 
518
        self.assertEqual(['abc/xyz'], filter_log)
 
519
        del filter_log[:]
 
520
        t.has('/abc')
 
521
        self.assertEqual(['abc'], filter_log)
 
522
 
 
523
    def test_clone(self):
 
524
        t = self.make_pf_transport()
 
525
        # relpath from root and root path are the same
 
526
        relpath_cloned = t.clone('foo')
 
527
        abspath_cloned = t.clone('/foo')
 
528
        self.assertEqual(t.server, relpath_cloned.server)
 
529
        self.assertEqual(t.server, abspath_cloned.server)
 
530
 
 
531
    def test_url_preserves_pathfiltering(self):
 
532
        """Calling get_transport on a pathfiltered transport's base should
 
533
        produce a transport with exactly the same behaviour as the original
 
534
        pathfiltered transport.
 
535
 
 
536
        This is so that it is not possible to escape (accidentally or
 
537
        otherwise) the filtering by doing::
 
538
            url = filtered_transport.base
 
539
            parent_url = urlutils.join(url, '..')
 
540
            new_t = transport.get_transport(parent_url)
 
541
        """
 
542
        t = self.make_pf_transport()
 
543
        new_t = transport.get_transport(t.base)
 
544
        self.assertEqual(t.server, new_t.server)
 
545
        self.assertEqual(t.base, new_t.base)
 
546
 
 
547
 
 
548
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
549
    """Readonly decoration specific tests."""
 
550
 
 
551
    def test_local_parameters(self):
 
552
        # connect to . in readonly mode
 
553
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
554
        self.assertEqual(True, t.listable())
 
555
        self.assertEqual(True, t.is_readonly())
 
556
 
 
557
    def test_http_parameters(self):
 
558
        from bzrlib.tests.http_server import HttpServer
 
559
        # connect to '.' via http which is not listable
 
560
        server = HttpServer()
 
561
        self.start_server(server)
 
562
        t = transport.get_transport('readonly+' + server.get_url())
 
563
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
564
        self.assertEqual(False, t.listable())
 
565
        self.assertEqual(True, t.is_readonly())
 
566
 
 
567
 
 
568
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
569
    """NFS decorator specific tests."""
 
570
 
 
571
    def get_nfs_transport(self, url):
 
572
        # connect to url with nfs decoration
 
573
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
574
 
 
575
    def test_local_parameters(self):
 
576
        # the listable and is_readonly parameters
 
577
        # are not changed by the fakenfs decorator
 
578
        t = self.get_nfs_transport('.')
 
579
        self.assertEqual(True, t.listable())
 
580
        self.assertEqual(False, t.is_readonly())
 
581
 
 
582
    def test_http_parameters(self):
 
583
        # the listable and is_readonly parameters
 
584
        # are not changed by the fakenfs decorator
 
585
        from bzrlib.tests.http_server import HttpServer
 
586
        # connect to '.' via http which is not listable
 
587
        server = HttpServer()
 
588
        self.start_server(server)
 
589
        t = self.get_nfs_transport(server.get_url())
 
590
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
591
        self.assertEqual(False, t.listable())
 
592
        self.assertEqual(True, t.is_readonly())
 
593
 
 
594
    def test_fakenfs_server_default(self):
 
595
        # a FakeNFSServer() should bring up a local relpath server for itself
 
596
        server = fakenfs.FakeNFSServer()
 
597
        self.start_server(server)
 
598
        # the url should be decorated appropriately
 
599
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
600
        # and we should be able to get a transport for it
 
601
        t = transport.get_transport(server.get_url())
 
602
        # which must be a FakeNFSTransportDecorator instance.
 
603
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
604
 
 
605
    def test_fakenfs_rename_semantics(self):
 
606
        # a FakeNFS transport must mangle the way rename errors occur to
 
607
        # look like NFS problems.
 
608
        t = self.get_nfs_transport('.')
 
609
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
610
                        transport=t)
 
611
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
612
 
 
613
 
 
614
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
615
    """Tests for simulation of VFAT restrictions"""
 
616
 
 
617
    def get_vfat_transport(self, url):
 
618
        """Return vfat-backed transport for test directory"""
 
619
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
620
        return FakeVFATTransportDecorator('vfat+' + url)
 
621
 
 
622
    def test_transport_creation(self):
 
623
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
624
        t = self.get_vfat_transport('.')
 
625
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
626
 
 
627
    def test_transport_mkdir(self):
 
628
        t = self.get_vfat_transport('.')
 
629
        t.mkdir('HELLO')
 
630
        self.assertTrue(t.has('hello'))
 
631
        self.assertTrue(t.has('Hello'))
 
632
 
 
633
    def test_forbidden_chars(self):
 
634
        t = self.get_vfat_transport('.')
 
635
        self.assertRaises(ValueError, t.has, "<NU>")
 
636
 
 
637
 
 
638
class BadTransportHandler(transport.Transport):
 
639
    def __init__(self, base_url):
 
640
        raise errors.DependencyNotPresent('some_lib',
 
641
                                          'testing missing dependency')
 
642
 
 
643
 
 
644
class BackupTransportHandler(transport.Transport):
 
645
    """Test transport that works as a backup for the BadTransportHandler"""
 
646
    pass
 
647
 
 
648
 
 
649
class TestTransportImplementation(tests.TestCaseInTempDir):
 
650
    """Implementation verification for transports.
 
651
 
 
652
    To verify a transport we need a server factory, which is a callable
 
653
    that accepts no parameters and returns an implementation of
 
654
    bzrlib.transport.Server.
 
655
 
 
656
    That Server is then used to construct transport instances and test
 
657
    the transport via loopback activity.
 
658
 
 
659
    Currently this assumes that the Transport object is connected to the
 
660
    current working directory.  So that whatever is done
 
661
    through the transport, should show up in the working
 
662
    directory, and vice-versa. This is a bug, because its possible to have
 
663
    URL schemes which provide access to something that may not be
 
664
    result in storage on the local disk, i.e. due to file system limits, or
 
665
    due to it being a database or some other non-filesystem tool.
 
666
 
 
667
    This also tests to make sure that the functions work with both
 
668
    generators and lists (assuming iter(list) is effectively a generator)
 
669
    """
 
670
 
 
671
    def setUp(self):
 
672
        super(TestTransportImplementation, self).setUp()
 
673
        self._server = self.transport_server()
 
674
        self.start_server(self._server)
 
675
 
 
676
    def get_transport(self, relpath=None):
 
677
        """Return a connected transport to the local directory.
 
678
 
 
679
        :param relpath: a path relative to the base url.
 
680
        """
 
681
        base_url = self._server.get_url()
 
682
        url = self._adjust_url(base_url, relpath)
 
683
        # try getting the transport via the regular interface:
 
684
        t = transport.get_transport(url)
 
685
        # vila--20070607 if the following are commented out the test suite
 
686
        # still pass. Is this really still needed or was it a forgotten
 
687
        # temporary fix ?
 
688
        if not isinstance(t, self.transport_class):
 
689
            # we did not get the correct transport class type. Override the
 
690
            # regular connection behaviour by direct construction.
 
691
            t = self.transport_class(url)
 
692
        return t
 
693
 
 
694
 
 
695
class TestLocalTransports(tests.TestCase):
 
696
 
 
697
    def test_get_transport_from_abspath(self):
 
698
        here = osutils.abspath('.')
 
699
        t = transport.get_transport(here)
 
700
        self.assertIsInstance(t, local.LocalTransport)
 
701
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
702
 
 
703
    def test_get_transport_from_relpath(self):
 
704
        here = osutils.abspath('.')
 
705
        t = transport.get_transport('.')
 
706
        self.assertIsInstance(t, local.LocalTransport)
 
707
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
708
 
 
709
    def test_get_transport_from_local_url(self):
 
710
        here = osutils.abspath('.')
 
711
        here_url = urlutils.local_path_to_url(here) + '/'
 
712
        t = transport.get_transport(here_url)
 
713
        self.assertIsInstance(t, local.LocalTransport)
 
714
        self.assertEquals(t.base, here_url)
 
715
 
 
716
    def test_local_abspath(self):
 
717
        here = osutils.abspath('.')
 
718
        t = transport.get_transport(here)
 
719
        self.assertEquals(t.local_abspath(''), here)
 
720
 
 
721
 
 
722
class TestWin32LocalTransport(tests.TestCase):
 
723
 
 
724
    def test_unc_clone_to_root(self):
 
725
        # Win32 UNC path like \\HOST\path
 
726
        # clone to root should stop at least at \\HOST part
 
727
        # not on \\
 
728
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
729
        for i in xrange(4):
 
730
            t = t.clone('..')
 
731
        self.assertEquals(t.base, 'file://HOST/')
 
732
        # make sure we reach the root
 
733
        t = t.clone('..')
 
734
        self.assertEquals(t.base, 'file://HOST/')
 
735
 
 
736
 
 
737
class TestConnectedTransport(tests.TestCase):
 
738
    """Tests for connected to remote server transports"""
 
739
 
 
740
    def test_parse_url(self):
 
741
        t = transport.ConnectedTransport(
 
742
            'http://simple.example.com/home/source')
 
743
        self.assertEquals(t._host, 'simple.example.com')
 
744
        self.assertEquals(t._port, None)
 
745
        self.assertEquals(t._path, '/home/source/')
 
746
        self.failUnless(t._user is None)
 
747
        self.failUnless(t._password is None)
 
748
 
 
749
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
750
 
 
751
    def test_parse_url_with_at_in_user(self):
 
752
        # Bug 228058
 
753
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
754
        self.assertEquals(t._user, 'user@host.com')
 
755
 
 
756
    def test_parse_quoted_url(self):
 
757
        t = transport.ConnectedTransport(
 
758
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
759
        self.assertEquals(t._host, 'exAmple.com')
 
760
        self.assertEquals(t._port, 2222)
 
761
        self.assertEquals(t._user, 'robey')
 
762
        self.assertEquals(t._password, 'h@t')
 
763
        self.assertEquals(t._path, '/path/')
 
764
 
 
765
        # Base should not keep track of the password
 
766
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
767
 
 
768
    def test_parse_invalid_url(self):
 
769
        self.assertRaises(errors.InvalidURL,
 
770
                          transport.ConnectedTransport,
 
771
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
772
 
 
773
    def test_relpath(self):
 
774
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
775
 
 
776
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
777
        self.assertRaises(errors.PathNotChild, t.relpath,
 
778
                          'http://user@host.com/abs/path/sub')
 
779
        self.assertRaises(errors.PathNotChild, t.relpath,
 
780
                          'sftp://user2@host.com/abs/path/sub')
 
781
        self.assertRaises(errors.PathNotChild, t.relpath,
 
782
                          'sftp://user@otherhost.com/abs/path/sub')
 
783
        self.assertRaises(errors.PathNotChild, t.relpath,
 
784
                          'sftp://user@host.com:33/abs/path/sub')
 
785
        # Make sure it works when we don't supply a username
 
786
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
787
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
788
 
 
789
        # Make sure it works when parts of the path will be url encoded
 
790
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
791
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
792
 
 
793
    def test_connection_sharing_propagate_credentials(self):
 
794
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
795
        self.assertEquals('user', t._user)
 
796
        self.assertEquals('host.com', t._host)
 
797
        self.assertIs(None, t._get_connection())
 
798
        self.assertIs(None, t._password)
 
799
        c = t.clone('subdir')
 
800
        self.assertIs(None, c._get_connection())
 
801
        self.assertIs(None, t._password)
 
802
 
 
803
        # Simulate the user entering a password
 
804
        password = 'secret'
 
805
        connection = object()
 
806
        t._set_connection(connection, password)
 
807
        self.assertIs(connection, t._get_connection())
 
808
        self.assertIs(password, t._get_credentials())
 
809
        self.assertIs(connection, c._get_connection())
 
810
        self.assertIs(password, c._get_credentials())
 
811
 
 
812
        # credentials can be updated
 
813
        new_password = 'even more secret'
 
814
        c._update_credentials(new_password)
 
815
        self.assertIs(connection, t._get_connection())
 
816
        self.assertIs(new_password, t._get_credentials())
 
817
        self.assertIs(connection, c._get_connection())
 
818
        self.assertIs(new_password, c._get_credentials())
 
819
 
 
820
 
 
821
class TestReusedTransports(tests.TestCase):
 
822
    """Tests for transport reuse"""
 
823
 
 
824
    def test_reuse_same_transport(self):
 
825
        possible_transports = []
 
826
        t1 = transport.get_transport('http://foo/',
 
827
                                     possible_transports=possible_transports)
 
828
        self.assertEqual([t1], possible_transports)
 
829
        t2 = transport.get_transport('http://foo/',
 
830
                                     possible_transports=[t1])
 
831
        self.assertIs(t1, t2)
 
832
 
 
833
        # Also check that final '/' are handled correctly
 
834
        t3 = transport.get_transport('http://foo/path/')
 
835
        t4 = transport.get_transport('http://foo/path',
 
836
                                     possible_transports=[t3])
 
837
        self.assertIs(t3, t4)
 
838
 
 
839
        t5 = transport.get_transport('http://foo/path')
 
840
        t6 = transport.get_transport('http://foo/path/',
 
841
                                     possible_transports=[t5])
 
842
        self.assertIs(t5, t6)
 
843
 
 
844
    def test_don_t_reuse_different_transport(self):
 
845
        t1 = transport.get_transport('http://foo/path')
 
846
        t2 = transport.get_transport('http://bar/path',
 
847
                                     possible_transports=[t1])
 
848
        self.assertIsNot(t1, t2)
 
849
 
 
850
 
 
851
class TestTransportTrace(tests.TestCase):
 
852
 
 
853
    def test_get(self):
 
854
        t = transport.get_transport('trace+memory://')
 
855
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
856
 
 
857
    def test_clone_preserves_activity(self):
 
858
        t = transport.get_transport('trace+memory://')
 
859
        t2 = t.clone('.')
 
860
        self.assertTrue(t is not t2)
 
861
        self.assertTrue(t._activity is t2._activity)
 
862
 
 
863
    # the following specific tests are for the operations that have made use of
 
864
    # logging in tests; we could test every single operation but doing that
 
865
    # still won't cause a test failure when the top level Transport API
 
866
    # changes; so there is little return doing that.
 
867
    def test_get(self):
 
868
        t = transport.get_transport('trace+memory:///')
 
869
        t.put_bytes('foo', 'barish')
 
870
        t.get('foo')
 
871
        expected_result = []
 
872
        # put_bytes records the bytes, not the content to avoid memory
 
873
        # pressure.
 
874
        expected_result.append(('put_bytes', 'foo', 6, None))
 
875
        # get records the file name only.
 
876
        expected_result.append(('get', 'foo'))
 
877
        self.assertEqual(expected_result, t._activity)
 
878
 
 
879
    def test_readv(self):
 
880
        t = transport.get_transport('trace+memory:///')
 
881
        t.put_bytes('foo', 'barish')
 
882
        list(t.readv('foo', [(0, 1), (3, 2)],
 
883
                     adjust_for_latency=True, upper_limit=6))
 
884
        expected_result = []
 
885
        # put_bytes records the bytes, not the content to avoid memory
 
886
        # pressure.
 
887
        expected_result.append(('put_bytes', 'foo', 6, None))
 
888
        # readv records the supplied offset request
 
889
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
890
        self.assertEqual(expected_result, t._activity)
 
891
 
 
892
 
 
893
class TestSSHConnections(tests.TestCaseWithTransport):
 
894
 
 
895
    def test_bzr_connect_to_bzr_ssh(self):
 
896
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
897
 
 
898
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
899
        """
 
900
        # This test actually causes a bzr instance to be invoked, which is very
 
901
        # expensive: it should be the only such test in the test suite.
 
902
        # A reasonable evolution for this would be to simply check inside
 
903
        # check_channel_exec_request that the command is appropriate, and then
 
904
        # satisfy requests in-process.
 
905
        self.requireFeature(features.paramiko)
 
906
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
907
        # override the interface (doesn't change self._vendor).
 
908
        # Note that this does encryption, so can be slow.
 
909
        from bzrlib.tests import stub_sftp
 
910
 
 
911
        # Start an SSH server
 
912
        self.command_executed = []
 
913
        # XXX: This is horrible -- we define a really dumb SSH server that
 
914
        # executes commands, and manage the hooking up of stdin/out/err to the
 
915
        # SSH channel ourselves.  Surely this has already been implemented
 
916
        # elsewhere?
 
917
        started = []
 
918
        class StubSSHServer(stub_sftp.StubServer):
 
919
 
 
920
            test = self
 
921
 
 
922
            def check_channel_exec_request(self, channel, command):
 
923
                self.test.command_executed.append(command)
 
924
                proc = subprocess.Popen(
 
925
                    command, shell=True, stdin=subprocess.PIPE,
 
926
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
927
 
 
928
                # XXX: horribly inefficient, not to mention ugly.
 
929
                # Start a thread for each of stdin/out/err, and relay bytes from
 
930
                # the subprocess to channel and vice versa.
 
931
                def ferry_bytes(read, write, close):
 
932
                    while True:
 
933
                        bytes = read(1)
 
934
                        if bytes == '':
 
935
                            close()
 
936
                            break
 
937
                        write(bytes)
 
938
 
 
939
                file_functions = [
 
940
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
941
                    (proc.stdout.read, channel.sendall, channel.close),
 
942
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
943
                started.append(proc)
 
944
                for read, write, close in file_functions:
 
945
                    t = threading.Thread(
 
946
                        target=ferry_bytes, args=(read, write, close))
 
947
                    t.start()
 
948
                    started.append(t)
 
949
 
 
950
                return True
 
951
 
 
952
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
953
        # We *don't* want to override the default SSH vendor: the detected one
 
954
        # is the one to use.
 
955
        self.start_server(ssh_server)
 
956
        port = ssh_server._listener.port
 
957
 
 
958
        if sys.platform == 'win32':
 
959
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
960
        else:
 
961
            bzr_remote_path = self.get_bzr_path()
 
962
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
963
 
 
964
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
965
        # variable is used to tell bzr what command to run on the remote end.
 
966
        path_to_branch = osutils.abspath('.')
 
967
        if sys.platform == 'win32':
 
968
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
969
            # prefix a '/' to get the right path.
 
970
            path_to_branch = '/' + path_to_branch
 
971
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
972
        t = transport.get_transport(url)
 
973
        self.permit_url(t.base)
 
974
        t.mkdir('foo')
 
975
 
 
976
        self.assertEqual(
 
977
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
978
            self.command_executed)
 
979
        # Make sure to disconnect, so that the remote process can stop, and we
 
980
        # can cleanup. Then pause the test until everything is shutdown
 
981
        t._client._medium.disconnect()
 
982
        if not started:
 
983
            return
 
984
        # First wait for the subprocess
 
985
        started[0].wait()
 
986
        # And the rest are threads
 
987
        for t in started[1:]:
 
988
            t.join()