~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-07-11 02:46:35 UTC
  • mfrom: (6017.1.2 test-isolation-speed)
  • Revision ID: pqm@pqm.ubuntu.com-20110711024635-f39c8kz23s347m1t
(spiv) Speed up TestCaseWithMemoryTransport._check_safety_net by reading the
 dirstate file directly rather than using WorkingTree.open(). (Andrew
 Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
import subprocess
 
20
import sys
 
21
import threading
 
22
 
 
23
from bzrlib import (
 
24
    errors,
 
25
    osutils,
 
26
    tests,
 
27
    transport,
 
28
    urlutils,
 
29
    )
 
30
from bzrlib.transport import (
 
31
    chroot,
 
32
    fakenfs,
 
33
    http,
 
34
    local,
 
35
    memory,
 
36
    pathfilter,
 
37
    readonly,
 
38
    )
 
39
from bzrlib.tests import (
 
40
    features,
 
41
    test_server,
 
42
    )
 
43
 
 
44
 
 
45
# TODO: Should possibly split transport-specific tests into their own files.
 
46
 
 
47
 
 
48
class TestTransport(tests.TestCase):
 
49
    """Test the non transport-concrete class functionality."""
 
50
 
 
51
    def test__get_set_protocol_handlers(self):
 
52
        handlers = transport._get_protocol_handlers()
 
53
        self.assertNotEqual([], handlers.keys())
 
54
        transport._clear_protocol_handlers()
 
55
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
56
        self.assertEqual([], transport._get_protocol_handlers().keys())
 
57
 
 
58
    def test_get_transport_modules(self):
 
59
        handlers = transport._get_protocol_handlers()
 
60
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
61
        # don't pollute the current handlers
 
62
        transport._clear_protocol_handlers()
 
63
 
 
64
        class SampleHandler(object):
 
65
            """I exist, isnt that enough?"""
 
66
        transport._clear_protocol_handlers()
 
67
        transport.register_transport_proto('foo')
 
68
        transport.register_lazy_transport('foo',
 
69
                                            'bzrlib.tests.test_transport',
 
70
                                            'TestTransport.SampleHandler')
 
71
        transport.register_transport_proto('bar')
 
72
        transport.register_lazy_transport('bar',
 
73
                                            'bzrlib.tests.test_transport',
 
74
                                            'TestTransport.SampleHandler')
 
75
        self.assertEqual([SampleHandler.__module__,
 
76
                            'bzrlib.transport.chroot',
 
77
                            'bzrlib.transport.pathfilter'],
 
78
                            transport._get_transport_modules())
 
79
 
 
80
    def test_transport_dependency(self):
 
81
        """Transport with missing dependency causes no error"""
 
82
        saved_handlers = transport._get_protocol_handlers()
 
83
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
84
        # don't pollute the current handlers
 
85
        transport._clear_protocol_handlers()
 
86
        transport.register_transport_proto('foo')
 
87
        transport.register_lazy_transport(
 
88
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
89
        try:
 
90
            transport.get_transport('foo://fooserver/foo')
 
91
        except errors.UnsupportedProtocol, e:
 
92
            e_str = str(e)
 
93
            self.assertEquals('Unsupported protocol'
 
94
                                ' for url "foo://fooserver/foo":'
 
95
                                ' Unable to import library "some_lib":'
 
96
                                ' testing missing dependency', str(e))
 
97
        else:
 
98
            self.fail('Did not raise UnsupportedProtocol')
 
99
 
 
100
    def test_transport_fallback(self):
 
101
        """Transport with missing dependency causes no error"""
 
102
        saved_handlers = transport._get_protocol_handlers()
 
103
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
104
        transport._clear_protocol_handlers()
 
105
        transport.register_transport_proto('foo')
 
106
        transport.register_lazy_transport(
 
107
            'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
108
        transport.register_lazy_transport(
 
109
            'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
110
        t = transport.get_transport('foo://fooserver/foo')
 
111
        self.assertTrue(isinstance(t, BackupTransportHandler))
 
112
 
 
113
    def test_ssh_hints(self):
 
114
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
115
        try:
 
116
            transport.get_transport('ssh://fooserver/foo')
 
117
        except errors.UnsupportedProtocol, e:
 
118
            e_str = str(e)
 
119
            self.assertEquals('Unsupported protocol'
 
120
                              ' for url "ssh://fooserver/foo":'
 
121
                              ' bzr supports bzr+ssh to operate over ssh,'
 
122
                              ' use "bzr+ssh://fooserver/foo".',
 
123
                              str(e))
 
124
        else:
 
125
            self.fail('Did not raise UnsupportedProtocol')
 
126
 
 
127
    def test_LateReadError(self):
 
128
        """The LateReadError helper should raise on read()."""
 
129
        a_file = transport.LateReadError('a path')
 
130
        try:
 
131
            a_file.read()
 
132
        except errors.ReadError, error:
 
133
            self.assertEqual('a path', error.path)
 
134
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
135
        a_file.close()
 
136
 
 
137
    def test__combine_paths(self):
 
138
        t = transport.Transport('/')
 
139
        self.assertEqual('/home/sarah/project/foo',
 
140
                         t._combine_paths('/home/sarah', 'project/foo'))
 
141
        self.assertEqual('/etc',
 
142
                         t._combine_paths('/home/sarah', '../../etc'))
 
143
        self.assertEqual('/etc',
 
144
                         t._combine_paths('/home/sarah', '../../../etc'))
 
145
        self.assertEqual('/etc',
 
146
                         t._combine_paths('/home/sarah', '/etc'))
 
147
 
 
148
    def test_local_abspath_non_local_transport(self):
 
149
        # the base implementation should throw
 
150
        t = memory.MemoryTransport()
 
151
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
152
        self.assertEqual('memory:///t is not a local path.', str(e))
 
153
 
 
154
 
 
155
class TestCoalesceOffsets(tests.TestCase):
 
156
 
 
157
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
158
        coalesce = transport.Transport._coalesce_offsets
 
159
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
160
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
161
                            max_size=max_size))
 
162
        self.assertEqual(exp, out)
 
163
 
 
164
    def test_coalesce_empty(self):
 
165
        self.check([], [])
 
166
 
 
167
    def test_coalesce_simple(self):
 
168
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
169
 
 
170
    def test_coalesce_unrelated(self):
 
171
        self.check([(0, 10, [(0, 10)]),
 
172
                    (20, 10, [(0, 10)]),
 
173
                   ], [(0, 10), (20, 10)])
 
174
 
 
175
    def test_coalesce_unsorted(self):
 
176
        self.check([(20, 10, [(0, 10)]),
 
177
                    (0, 10, [(0, 10)]),
 
178
                   ], [(20, 10), (0, 10)])
 
179
 
 
180
    def test_coalesce_nearby(self):
 
181
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
182
                   [(0, 10), (10, 10)])
 
183
 
 
184
    def test_coalesce_overlapped(self):
 
185
        self.assertRaises(ValueError,
 
186
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
187
                        [(0, 10), (5, 10)])
 
188
 
 
189
    def test_coalesce_limit(self):
 
190
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
191
                              (30, 10), (40, 10)]),
 
192
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
193
                              (30, 10), (40, 10)]),
 
194
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
195
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
196
                       (90, 10), (100, 10)],
 
197
                    limit=5)
 
198
 
 
199
    def test_coalesce_no_limit(self):
 
200
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
201
                               (30, 10), (40, 10), (50, 10),
 
202
                               (60, 10), (70, 10), (80, 10),
 
203
                               (90, 10)]),
 
204
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
205
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
206
                       (90, 10), (100, 10)])
 
207
 
 
208
    def test_coalesce_fudge(self):
 
209
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
210
                    (100, 10, [(0, 10)]),
 
211
                   ], [(10, 10), (30, 10), (100, 10)],
 
212
                   fudge=10)
 
213
 
 
214
    def test_coalesce_max_size(self):
 
215
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
216
                    (30, 50, [(0, 50)]),
 
217
                    # If one range is above max_size, it gets its own coalesced
 
218
                    # offset
 
219
                    (100, 80, [(0, 80)]),],
 
220
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
221
                   max_size=50)
 
222
 
 
223
    def test_coalesce_no_max_size(self):
 
224
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
 
225
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
226
                  )
 
227
 
 
228
    def test_coalesce_default_limit(self):
 
229
        # By default we use a 100MB max size.
 
230
        ten_mb = 10 * 1024 * 1024
 
231
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
 
232
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
233
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
234
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
235
                   [(i * ten_mb, ten_mb) for i in range(11)],
 
236
                   max_size=1*1024*1024*1024)
 
237
 
 
238
 
 
239
class TestMemoryServer(tests.TestCase):
 
240
 
 
241
    def test_create_server(self):
 
242
        server = memory.MemoryServer()
 
243
        server.start_server()
 
244
        url = server.get_url()
 
245
        self.assertTrue(url in transport.transport_list_registry)
 
246
        t = transport.get_transport(url)
 
247
        del t
 
248
        server.stop_server()
 
249
        self.assertFalse(url in transport.transport_list_registry)
 
250
        self.assertRaises(errors.UnsupportedProtocol,
 
251
                          transport.get_transport, url)
 
252
 
 
253
 
 
254
class TestMemoryTransport(tests.TestCase):
 
255
 
 
256
    def test_get_transport(self):
 
257
        memory.MemoryTransport()
 
258
 
 
259
    def test_clone(self):
 
260
        t = memory.MemoryTransport()
 
261
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
262
        self.assertEqual("memory:///", t.clone("/").base)
 
263
 
 
264
    def test_abspath(self):
 
265
        t = memory.MemoryTransport()
 
266
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
267
 
 
268
    def test_abspath_of_root(self):
 
269
        t = memory.MemoryTransport()
 
270
        self.assertEqual("memory:///", t.base)
 
271
        self.assertEqual("memory:///", t.abspath('/'))
 
272
 
 
273
    def test_abspath_of_relpath_starting_at_root(self):
 
274
        t = memory.MemoryTransport()
 
275
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
276
 
 
277
    def test_append_and_get(self):
 
278
        t = memory.MemoryTransport()
 
279
        t.append_bytes('path', 'content')
 
280
        self.assertEqual(t.get('path').read(), 'content')
 
281
        t.append_file('path', StringIO('content'))
 
282
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
283
 
 
284
    def test_put_and_get(self):
 
285
        t = memory.MemoryTransport()
 
286
        t.put_file('path', StringIO('content'))
 
287
        self.assertEqual(t.get('path').read(), 'content')
 
288
        t.put_bytes('path', 'content')
 
289
        self.assertEqual(t.get('path').read(), 'content')
 
290
 
 
291
    def test_append_without_dir_fails(self):
 
292
        t = memory.MemoryTransport()
 
293
        self.assertRaises(errors.NoSuchFile,
 
294
                          t.append_bytes, 'dir/path', 'content')
 
295
 
 
296
    def test_put_without_dir_fails(self):
 
297
        t = memory.MemoryTransport()
 
298
        self.assertRaises(errors.NoSuchFile,
 
299
                          t.put_file, 'dir/path', StringIO('content'))
 
300
 
 
301
    def test_get_missing(self):
 
302
        transport = memory.MemoryTransport()
 
303
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
304
 
 
305
    def test_has_missing(self):
 
306
        t = memory.MemoryTransport()
 
307
        self.assertEquals(False, t.has('foo'))
 
308
 
 
309
    def test_has_present(self):
 
310
        t = memory.MemoryTransport()
 
311
        t.append_bytes('foo', 'content')
 
312
        self.assertEquals(True, t.has('foo'))
 
313
 
 
314
    def test_list_dir(self):
 
315
        t = memory.MemoryTransport()
 
316
        t.put_bytes('foo', 'content')
 
317
        t.mkdir('dir')
 
318
        t.put_bytes('dir/subfoo', 'content')
 
319
        t.put_bytes('dirlike', 'content')
 
320
 
 
321
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
322
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
323
 
 
324
    def test_mkdir(self):
 
325
        t = memory.MemoryTransport()
 
326
        t.mkdir('dir')
 
327
        t.append_bytes('dir/path', 'content')
 
328
        self.assertEqual(t.get('dir/path').read(), 'content')
 
329
 
 
330
    def test_mkdir_missing_parent(self):
 
331
        t = memory.MemoryTransport()
 
332
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
333
 
 
334
    def test_mkdir_twice(self):
 
335
        t = memory.MemoryTransport()
 
336
        t.mkdir('dir')
 
337
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
338
 
 
339
    def test_parameters(self):
 
340
        t = memory.MemoryTransport()
 
341
        self.assertEqual(True, t.listable())
 
342
        self.assertEqual(False, t.is_readonly())
 
343
 
 
344
    def test_iter_files_recursive(self):
 
345
        t = memory.MemoryTransport()
 
346
        t.mkdir('dir')
 
347
        t.put_bytes('dir/foo', 'content')
 
348
        t.put_bytes('dir/bar', 'content')
 
349
        t.put_bytes('bar', 'content')
 
350
        paths = set(t.iter_files_recursive())
 
351
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
352
 
 
353
    def test_stat(self):
 
354
        t = memory.MemoryTransport()
 
355
        t.put_bytes('foo', 'content')
 
356
        t.put_bytes('bar', 'phowar')
 
357
        self.assertEqual(7, t.stat('foo').st_size)
 
358
        self.assertEqual(6, t.stat('bar').st_size)
 
359
 
 
360
 
 
361
class ChrootDecoratorTransportTest(tests.TestCase):
 
362
    """Chroot decoration specific tests."""
 
363
 
 
364
    def test_abspath(self):
 
365
        # The abspath is always relative to the chroot_url.
 
366
        server = chroot.ChrootServer(
 
367
            transport.get_transport('memory:///foo/bar/'))
 
368
        self.start_server(server)
 
369
        t = transport.get_transport(server.get_url())
 
370
        self.assertEqual(server.get_url(), t.abspath('/'))
 
371
 
 
372
        subdir_t = t.clone('subdir')
 
373
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
374
 
 
375
    def test_clone(self):
 
376
        server = chroot.ChrootServer(
 
377
            transport.get_transport('memory:///foo/bar/'))
 
378
        self.start_server(server)
 
379
        t = transport.get_transport(server.get_url())
 
380
        # relpath from root and root path are the same
 
381
        relpath_cloned = t.clone('foo')
 
382
        abspath_cloned = t.clone('/foo')
 
383
        self.assertEqual(server, relpath_cloned.server)
 
384
        self.assertEqual(server, abspath_cloned.server)
 
385
 
 
386
    def test_chroot_url_preserves_chroot(self):
 
387
        """Calling get_transport on a chroot transport's base should produce a
 
388
        transport with exactly the same behaviour as the original chroot
 
389
        transport.
 
390
 
 
391
        This is so that it is not possible to escape a chroot by doing::
 
392
            url = chroot_transport.base
 
393
            parent_url = urlutils.join(url, '..')
 
394
            new_t = transport.get_transport(parent_url)
 
395
        """
 
396
        server = chroot.ChrootServer(
 
397
            transport.get_transport('memory:///path/subpath'))
 
398
        self.start_server(server)
 
399
        t = transport.get_transport(server.get_url())
 
400
        new_t = transport.get_transport(t.base)
 
401
        self.assertEqual(t.server, new_t.server)
 
402
        self.assertEqual(t.base, new_t.base)
 
403
 
 
404
    def test_urljoin_preserves_chroot(self):
 
405
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
406
        URL that escapes the intended chroot.
 
407
 
 
408
        This is so that it is not possible to escape a chroot by doing::
 
409
            url = chroot_transport.base
 
410
            parent_url = urlutils.join(url, '..')
 
411
            new_t = transport.get_transport(parent_url)
 
412
        """
 
413
        server = chroot.ChrootServer(
 
414
            transport.get_transport('memory:///path/'))
 
415
        self.start_server(server)
 
416
        t = transport.get_transport(server.get_url())
 
417
        self.assertRaises(
 
418
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
419
 
 
420
 
 
421
class TestChrootServer(tests.TestCase):
 
422
 
 
423
    def test_construct(self):
 
424
        backing_transport = memory.MemoryTransport()
 
425
        server = chroot.ChrootServer(backing_transport)
 
426
        self.assertEqual(backing_transport, server.backing_transport)
 
427
 
 
428
    def test_setUp(self):
 
429
        backing_transport = memory.MemoryTransport()
 
430
        server = chroot.ChrootServer(backing_transport)
 
431
        server.start_server()
 
432
        self.addCleanup(server.stop_server)
 
433
        self.assertTrue(server.scheme
 
434
                        in transport._get_protocol_handlers().keys())
 
435
 
 
436
    def test_stop_server(self):
 
437
        backing_transport = memory.MemoryTransport()
 
438
        server = chroot.ChrootServer(backing_transport)
 
439
        server.start_server()
 
440
        server.stop_server()
 
441
        self.assertFalse(server.scheme
 
442
                         in transport._get_protocol_handlers().keys())
 
443
 
 
444
    def test_get_url(self):
 
445
        backing_transport = memory.MemoryTransport()
 
446
        server = chroot.ChrootServer(backing_transport)
 
447
        server.start_server()
 
448
        self.addCleanup(server.stop_server)
 
449
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
450
 
 
451
 
 
452
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
453
    """Pathfilter decoration specific tests."""
 
454
 
 
455
    def test_abspath(self):
 
456
        # The abspath is always relative to the base of the backing transport.
 
457
        server = pathfilter.PathFilteringServer(
 
458
            transport.get_transport('memory:///foo/bar/'),
 
459
            lambda x: x)
 
460
        server.start_server()
 
461
        t = transport.get_transport(server.get_url())
 
462
        self.assertEqual(server.get_url(), t.abspath('/'))
 
463
 
 
464
        subdir_t = t.clone('subdir')
 
465
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
466
        server.stop_server()
 
467
 
 
468
    def make_pf_transport(self, filter_func=None):
 
469
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
470
 
 
471
        :param filter_func: by default this will be a no-op function.  Use this
 
472
            parameter to override it."""
 
473
        if filter_func is None:
 
474
            filter_func = lambda x: x
 
475
        server = pathfilter.PathFilteringServer(
 
476
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
477
        server.start_server()
 
478
        self.addCleanup(server.stop_server)
 
479
        return transport.get_transport(server.get_url())
 
480
 
 
481
    def test__filter(self):
 
482
        # _filter (with an identity func as filter_func) always returns
 
483
        # paths relative to the base of the backing transport.
 
484
        t = self.make_pf_transport()
 
485
        self.assertEqual('foo', t._filter('foo'))
 
486
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
487
        self.assertEqual('', t._filter('..'))
 
488
        self.assertEqual('', t._filter('/'))
 
489
        # The base of the pathfiltering transport is taken into account too.
 
490
        t = t.clone('subdir1/subdir2')
 
491
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
492
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
493
        self.assertEqual('subdir1', t._filter('..'))
 
494
        self.assertEqual('', t._filter('/'))
 
495
 
 
496
    def test_filter_invocation(self):
 
497
        filter_log = []
 
498
 
 
499
        def filter(path):
 
500
            filter_log.append(path)
 
501
            return path
 
502
        t = self.make_pf_transport(filter)
 
503
        t.has('abc')
 
504
        self.assertEqual(['abc'], filter_log)
 
505
        del filter_log[:]
 
506
        t.clone('abc').has('xyz')
 
507
        self.assertEqual(['abc/xyz'], filter_log)
 
508
        del filter_log[:]
 
509
        t.has('/abc')
 
510
        self.assertEqual(['abc'], filter_log)
 
511
 
 
512
    def test_clone(self):
 
513
        t = self.make_pf_transport()
 
514
        # relpath from root and root path are the same
 
515
        relpath_cloned = t.clone('foo')
 
516
        abspath_cloned = t.clone('/foo')
 
517
        self.assertEqual(t.server, relpath_cloned.server)
 
518
        self.assertEqual(t.server, abspath_cloned.server)
 
519
 
 
520
    def test_url_preserves_pathfiltering(self):
 
521
        """Calling get_transport on a pathfiltered transport's base should
 
522
        produce a transport with exactly the same behaviour as the original
 
523
        pathfiltered transport.
 
524
 
 
525
        This is so that it is not possible to escape (accidentally or
 
526
        otherwise) the filtering by doing::
 
527
            url = filtered_transport.base
 
528
            parent_url = urlutils.join(url, '..')
 
529
            new_t = transport.get_transport(parent_url)
 
530
        """
 
531
        t = self.make_pf_transport()
 
532
        new_t = transport.get_transport(t.base)
 
533
        self.assertEqual(t.server, new_t.server)
 
534
        self.assertEqual(t.base, new_t.base)
 
535
 
 
536
 
 
537
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
538
    """Readonly decoration specific tests."""
 
539
 
 
540
    def test_local_parameters(self):
 
541
        # connect to . in readonly mode
 
542
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
543
        self.assertEqual(True, t.listable())
 
544
        self.assertEqual(True, t.is_readonly())
 
545
 
 
546
    def test_http_parameters(self):
 
547
        from bzrlib.tests.http_server import HttpServer
 
548
        # connect to '.' via http which is not listable
 
549
        server = HttpServer()
 
550
        self.start_server(server)
 
551
        t = transport.get_transport('readonly+' + server.get_url())
 
552
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
 
553
        self.assertEqual(False, t.listable())
 
554
        self.assertEqual(True, t.is_readonly())
 
555
 
 
556
 
 
557
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
558
    """NFS decorator specific tests."""
 
559
 
 
560
    def get_nfs_transport(self, url):
 
561
        # connect to url with nfs decoration
 
562
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
563
 
 
564
    def test_local_parameters(self):
 
565
        # the listable and is_readonly parameters
 
566
        # are not changed by the fakenfs decorator
 
567
        t = self.get_nfs_transport('.')
 
568
        self.assertEqual(True, t.listable())
 
569
        self.assertEqual(False, t.is_readonly())
 
570
 
 
571
    def test_http_parameters(self):
 
572
        # the listable and is_readonly parameters
 
573
        # are not changed by the fakenfs decorator
 
574
        from bzrlib.tests.http_server import HttpServer
 
575
        # connect to '.' via http which is not listable
 
576
        server = HttpServer()
 
577
        self.start_server(server)
 
578
        t = self.get_nfs_transport(server.get_url())
 
579
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
580
        self.assertEqual(False, t.listable())
 
581
        self.assertEqual(True, t.is_readonly())
 
582
 
 
583
    def test_fakenfs_server_default(self):
 
584
        # a FakeNFSServer() should bring up a local relpath server for itself
 
585
        server = test_server.FakeNFSServer()
 
586
        self.start_server(server)
 
587
        # the url should be decorated appropriately
 
588
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
589
        # and we should be able to get a transport for it
 
590
        t = transport.get_transport(server.get_url())
 
591
        # which must be a FakeNFSTransportDecorator instance.
 
592
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
593
 
 
594
    def test_fakenfs_rename_semantics(self):
 
595
        # a FakeNFS transport must mangle the way rename errors occur to
 
596
        # look like NFS problems.
 
597
        t = self.get_nfs_transport('.')
 
598
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
599
                        transport=t)
 
600
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
601
 
 
602
 
 
603
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
604
    """Tests for simulation of VFAT restrictions"""
 
605
 
 
606
    def get_vfat_transport(self, url):
 
607
        """Return vfat-backed transport for test directory"""
 
608
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
609
        return FakeVFATTransportDecorator('vfat+' + url)
 
610
 
 
611
    def test_transport_creation(self):
 
612
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
613
        t = self.get_vfat_transport('.')
 
614
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
615
 
 
616
    def test_transport_mkdir(self):
 
617
        t = self.get_vfat_transport('.')
 
618
        t.mkdir('HELLO')
 
619
        self.assertTrue(t.has('hello'))
 
620
        self.assertTrue(t.has('Hello'))
 
621
 
 
622
    def test_forbidden_chars(self):
 
623
        t = self.get_vfat_transport('.')
 
624
        self.assertRaises(ValueError, t.has, "<NU>")
 
625
 
 
626
 
 
627
class BadTransportHandler(transport.Transport):
 
628
    def __init__(self, base_url):
 
629
        raise errors.DependencyNotPresent('some_lib',
 
630
                                          'testing missing dependency')
 
631
 
 
632
 
 
633
class BackupTransportHandler(transport.Transport):
 
634
    """Test transport that works as a backup for the BadTransportHandler"""
 
635
    pass
 
636
 
 
637
 
 
638
class TestTransportImplementation(tests.TestCaseInTempDir):
 
639
    """Implementation verification for transports.
 
640
 
 
641
    To verify a transport we need a server factory, which is a callable
 
642
    that accepts no parameters and returns an implementation of
 
643
    bzrlib.transport.Server.
 
644
 
 
645
    That Server is then used to construct transport instances and test
 
646
    the transport via loopback activity.
 
647
 
 
648
    Currently this assumes that the Transport object is connected to the
 
649
    current working directory.  So that whatever is done
 
650
    through the transport, should show up in the working
 
651
    directory, and vice-versa. This is a bug, because its possible to have
 
652
    URL schemes which provide access to something that may not be
 
653
    result in storage on the local disk, i.e. due to file system limits, or
 
654
    due to it being a database or some other non-filesystem tool.
 
655
 
 
656
    This also tests to make sure that the functions work with both
 
657
    generators and lists (assuming iter(list) is effectively a generator)
 
658
    """
 
659
 
 
660
    def setUp(self):
 
661
        super(TestTransportImplementation, self).setUp()
 
662
        self._server = self.transport_server()
 
663
        self.start_server(self._server)
 
664
 
 
665
    def get_transport(self, relpath=None):
 
666
        """Return a connected transport to the local directory.
 
667
 
 
668
        :param relpath: a path relative to the base url.
 
669
        """
 
670
        base_url = self._server.get_url()
 
671
        url = self._adjust_url(base_url, relpath)
 
672
        # try getting the transport via the regular interface:
 
673
        t = transport.get_transport(url)
 
674
        # vila--20070607 if the following are commented out the test suite
 
675
        # still pass. Is this really still needed or was it a forgotten
 
676
        # temporary fix ?
 
677
        if not isinstance(t, self.transport_class):
 
678
            # we did not get the correct transport class type. Override the
 
679
            # regular connection behaviour by direct construction.
 
680
            t = self.transport_class(url)
 
681
        return t
 
682
 
 
683
 
 
684
class TestLocalTransports(tests.TestCase):
 
685
 
 
686
    def test_get_transport_from_abspath(self):
 
687
        here = osutils.abspath('.')
 
688
        t = transport.get_transport(here)
 
689
        self.assertIsInstance(t, local.LocalTransport)
 
690
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
691
 
 
692
    def test_get_transport_from_relpath(self):
 
693
        here = osutils.abspath('.')
 
694
        t = transport.get_transport('.')
 
695
        self.assertIsInstance(t, local.LocalTransport)
 
696
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
697
 
 
698
    def test_get_transport_from_local_url(self):
 
699
        here = osutils.abspath('.')
 
700
        here_url = urlutils.local_path_to_url(here) + '/'
 
701
        t = transport.get_transport(here_url)
 
702
        self.assertIsInstance(t, local.LocalTransport)
 
703
        self.assertEquals(t.base, here_url)
 
704
 
 
705
    def test_local_abspath(self):
 
706
        here = osutils.abspath('.')
 
707
        t = transport.get_transport(here)
 
708
        self.assertEquals(t.local_abspath(''), here)
 
709
 
 
710
 
 
711
class TestWin32LocalTransport(tests.TestCase):
 
712
 
 
713
    def test_unc_clone_to_root(self):
 
714
        # Win32 UNC path like \\HOST\path
 
715
        # clone to root should stop at least at \\HOST part
 
716
        # not on \\
 
717
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
718
        for i in xrange(4):
 
719
            t = t.clone('..')
 
720
        self.assertEquals(t.base, 'file://HOST/')
 
721
        # make sure we reach the root
 
722
        t = t.clone('..')
 
723
        self.assertEquals(t.base, 'file://HOST/')
 
724
 
 
725
 
 
726
class TestConnectedTransport(tests.TestCase):
 
727
    """Tests for connected to remote server transports"""
 
728
 
 
729
    def test_parse_url(self):
 
730
        t = transport.ConnectedTransport(
 
731
            'http://simple.example.com/home/source')
 
732
        self.assertEquals(t._host, 'simple.example.com')
 
733
        self.assertEquals(t._port, None)
 
734
        self.assertEquals(t._path, '/home/source/')
 
735
        self.assertTrue(t._user is None)
 
736
        self.assertTrue(t._password is None)
 
737
 
 
738
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
739
 
 
740
    def test_parse_url_with_at_in_user(self):
 
741
        # Bug 228058
 
742
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
743
        self.assertEquals(t._user, 'user@host.com')
 
744
 
 
745
    def test_parse_quoted_url(self):
 
746
        t = transport.ConnectedTransport(
 
747
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
748
        self.assertEquals(t._host, 'exAmple.com')
 
749
        self.assertEquals(t._port, 2222)
 
750
        self.assertEquals(t._user, 'robey')
 
751
        self.assertEquals(t._password, 'h@t')
 
752
        self.assertEquals(t._path, '/path/')
 
753
 
 
754
        # Base should not keep track of the password
 
755
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
756
 
 
757
    def test_parse_invalid_url(self):
 
758
        self.assertRaises(errors.InvalidURL,
 
759
                          transport.ConnectedTransport,
 
760
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
761
 
 
762
    def test_relpath(self):
 
763
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
764
 
 
765
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'),
 
766
            'sub')
 
767
        self.assertRaises(errors.PathNotChild, t.relpath,
 
768
                          'http://user@host.com/abs/path/sub')
 
769
        self.assertRaises(errors.PathNotChild, t.relpath,
 
770
                          'sftp://user2@host.com/abs/path/sub')
 
771
        self.assertRaises(errors.PathNotChild, t.relpath,
 
772
                          'sftp://user@otherhost.com/abs/path/sub')
 
773
        self.assertRaises(errors.PathNotChild, t.relpath,
 
774
                          'sftp://user@host.com:33/abs/path/sub')
 
775
        # Make sure it works when we don't supply a username
 
776
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
777
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
778
 
 
779
        # Make sure it works when parts of the path will be url encoded
 
780
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
781
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
782
 
 
783
    def test_connection_sharing_propagate_credentials(self):
 
784
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
785
        self.assertEquals('user', t._user)
 
786
        self.assertEquals('host.com', t._host)
 
787
        self.assertIs(None, t._get_connection())
 
788
        self.assertIs(None, t._password)
 
789
        c = t.clone('subdir')
 
790
        self.assertIs(None, c._get_connection())
 
791
        self.assertIs(None, t._password)
 
792
 
 
793
        # Simulate the user entering a password
 
794
        password = 'secret'
 
795
        connection = object()
 
796
        t._set_connection(connection, password)
 
797
        self.assertIs(connection, t._get_connection())
 
798
        self.assertIs(password, t._get_credentials())
 
799
        self.assertIs(connection, c._get_connection())
 
800
        self.assertIs(password, c._get_credentials())
 
801
 
 
802
        # credentials can be updated
 
803
        new_password = 'even more secret'
 
804
        c._update_credentials(new_password)
 
805
        self.assertIs(connection, t._get_connection())
 
806
        self.assertIs(new_password, t._get_credentials())
 
807
        self.assertIs(connection, c._get_connection())
 
808
        self.assertIs(new_password, c._get_credentials())
 
809
 
 
810
 
 
811
class TestReusedTransports(tests.TestCase):
 
812
    """Tests for transport reuse"""
 
813
 
 
814
    def test_reuse_same_transport(self):
 
815
        possible_transports = []
 
816
        t1 = transport.get_transport('http://foo/',
 
817
                                     possible_transports=possible_transports)
 
818
        self.assertEqual([t1], possible_transports)
 
819
        t2 = transport.get_transport('http://foo/',
 
820
                                     possible_transports=[t1])
 
821
        self.assertIs(t1, t2)
 
822
 
 
823
        # Also check that final '/' are handled correctly
 
824
        t3 = transport.get_transport('http://foo/path/')
 
825
        t4 = transport.get_transport('http://foo/path',
 
826
                                     possible_transports=[t3])
 
827
        self.assertIs(t3, t4)
 
828
 
 
829
        t5 = transport.get_transport('http://foo/path')
 
830
        t6 = transport.get_transport('http://foo/path/',
 
831
                                     possible_transports=[t5])
 
832
        self.assertIs(t5, t6)
 
833
 
 
834
    def test_don_t_reuse_different_transport(self):
 
835
        t1 = transport.get_transport('http://foo/path')
 
836
        t2 = transport.get_transport('http://bar/path',
 
837
                                     possible_transports=[t1])
 
838
        self.assertIsNot(t1, t2)
 
839
 
 
840
 
 
841
class TestTransportTrace(tests.TestCase):
 
842
 
 
843
    def test_get(self):
 
844
        t = transport.get_transport('trace+memory://')
 
845
        self.assertIsInstance(
 
846
            t, bzrlib.transport.trace.TransportTraceDecorator)
 
847
 
 
848
    def test_clone_preserves_activity(self):
 
849
        t = transport.get_transport('trace+memory://')
 
850
        t2 = t.clone('.')
 
851
        self.assertTrue(t is not t2)
 
852
        self.assertTrue(t._activity is t2._activity)
 
853
 
 
854
    # the following specific tests are for the operations that have made use of
 
855
    # logging in tests; we could test every single operation but doing that
 
856
    # still won't cause a test failure when the top level Transport API
 
857
    # changes; so there is little return doing that.
 
858
    def test_get(self):
 
859
        t = transport.get_transport('trace+memory:///')
 
860
        t.put_bytes('foo', 'barish')
 
861
        t.get('foo')
 
862
        expected_result = []
 
863
        # put_bytes records the bytes, not the content to avoid memory
 
864
        # pressure.
 
865
        expected_result.append(('put_bytes', 'foo', 6, None))
 
866
        # get records the file name only.
 
867
        expected_result.append(('get', 'foo'))
 
868
        self.assertEqual(expected_result, t._activity)
 
869
 
 
870
    def test_readv(self):
 
871
        t = transport.get_transport('trace+memory:///')
 
872
        t.put_bytes('foo', 'barish')
 
873
        list(t.readv('foo', [(0, 1), (3, 2)],
 
874
                     adjust_for_latency=True, upper_limit=6))
 
875
        expected_result = []
 
876
        # put_bytes records the bytes, not the content to avoid memory
 
877
        # pressure.
 
878
        expected_result.append(('put_bytes', 'foo', 6, None))
 
879
        # readv records the supplied offset request
 
880
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
881
        self.assertEqual(expected_result, t._activity)
 
882
 
 
883
 
 
884
class TestSSHConnections(tests.TestCaseWithTransport):
 
885
 
 
886
    def test_bzr_connect_to_bzr_ssh(self):
 
887
        """get_transport of a bzr+ssh:// behaves correctly.
 
888
 
 
889
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
890
        """
 
891
        # This test actually causes a bzr instance to be invoked, which is very
 
892
        # expensive: it should be the only such test in the test suite.
 
893
        # A reasonable evolution for this would be to simply check inside
 
894
        # check_channel_exec_request that the command is appropriate, and then
 
895
        # satisfy requests in-process.
 
896
        self.requireFeature(features.paramiko)
 
897
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
898
        # override the interface (doesn't change self._vendor).
 
899
        # Note that this does encryption, so can be slow.
 
900
        from bzrlib.tests import stub_sftp
 
901
 
 
902
        # Start an SSH server
 
903
        self.command_executed = []
 
904
        # XXX: This is horrible -- we define a really dumb SSH server that
 
905
        # executes commands, and manage the hooking up of stdin/out/err to the
 
906
        # SSH channel ourselves.  Surely this has already been implemented
 
907
        # elsewhere?
 
908
        started = []
 
909
 
 
910
        class StubSSHServer(stub_sftp.StubServer):
 
911
 
 
912
            test = self
 
913
 
 
914
            def check_channel_exec_request(self, channel, command):
 
915
                self.test.command_executed.append(command)
 
916
                proc = subprocess.Popen(
 
917
                    command, shell=True, stdin=subprocess.PIPE,
 
918
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
919
 
 
920
                # XXX: horribly inefficient, not to mention ugly.
 
921
                # Start a thread for each of stdin/out/err, and relay bytes
 
922
                # from the subprocess to channel and vice versa.
 
923
                def ferry_bytes(read, write, close):
 
924
                    while True:
 
925
                        bytes = read(1)
 
926
                        if bytes == '':
 
927
                            close()
 
928
                            break
 
929
                        write(bytes)
 
930
 
 
931
                file_functions = [
 
932
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
933
                    (proc.stdout.read, channel.sendall, channel.close),
 
934
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
935
                started.append(proc)
 
936
                for read, write, close in file_functions:
 
937
                    t = threading.Thread(
 
938
                        target=ferry_bytes, args=(read, write, close))
 
939
                    t.start()
 
940
                    started.append(t)
 
941
 
 
942
                return True
 
943
 
 
944
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
945
        # We *don't* want to override the default SSH vendor: the detected one
 
946
        # is the one to use.
 
947
 
 
948
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
949
        # inherits from SFTPServer which forces the SSH vendor to
 
950
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
 
951
        self.start_server(ssh_server)
 
952
        port = ssh_server.port
 
953
 
 
954
        if sys.platform == 'win32':
 
955
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
956
        else:
 
957
            bzr_remote_path = self.get_bzr_path()
 
958
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
 
959
 
 
960
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
961
        # variable is used to tell bzr what command to run on the remote end.
 
962
        path_to_branch = osutils.abspath('.')
 
963
        if sys.platform == 'win32':
 
964
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
965
            # prefix a '/' to get the right path.
 
966
            path_to_branch = '/' + path_to_branch
 
967
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
968
        t = transport.get_transport(url)
 
969
        self.permit_url(t.base)
 
970
        t.mkdir('foo')
 
971
 
 
972
        self.assertEqual(
 
973
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
974
            self.command_executed)
 
975
        # Make sure to disconnect, so that the remote process can stop, and we
 
976
        # can cleanup. Then pause the test until everything is shutdown
 
977
        t._client._medium.disconnect()
 
978
        if not started:
 
979
            return
 
980
        # First wait for the subprocess
 
981
        started[0].wait()
 
982
        # And the rest are threads
 
983
        for t in started[1:]:
 
984
            t.join()
 
985
 
 
986
 
 
987
class TestUnhtml(tests.TestCase):
 
988
 
 
989
    """Tests for unhtml_roughly"""
 
990
 
 
991
    def test_truncation(self):
 
992
        fake_html = "<p>something!\n" * 1000
 
993
        result = http.unhtml_roughly(fake_html)
 
994
        self.assertEquals(len(result), 1000)
 
995
        self.assertStartsWith(result, " something!")