~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-11-06 16:01:48 UTC
  • mfrom: (5527.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20101106160148-8maemz21jbrhpzky
(vila) Move NEWS entry into the correct section (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
18
from cStringIO import StringIO
 
19
import os
 
20
import subprocess
 
21
import sys
 
22
import threading
19
23
 
20
 
import bzrlib
21
24
from bzrlib import (
22
25
    errors,
23
26
    osutils,
 
27
    tests,
 
28
    transport,
24
29
    urlutils,
25
30
    )
26
 
from bzrlib.errors import (DependencyNotPresent,
27
 
                           FileExists,
28
 
                           InvalidURLJoin,
29
 
                           NoSuchFile,
30
 
                           PathNotChild,
31
 
                           ReadError,
32
 
                           UnsupportedProtocol,
33
 
                           )
34
 
from bzrlib.tests import TestCase, TestCaseInTempDir
35
 
from bzrlib.transport import (_clear_protocol_handlers,
36
 
                              _CoalescedOffset,
37
 
                              ConnectedTransport,
38
 
                              _get_protocol_handlers,
39
 
                              _set_protocol_handlers,
40
 
                              _get_transport_modules,
41
 
                              get_transport,
42
 
                              LateReadError,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              Transport,
46
 
                              )
47
 
from bzrlib.transport.chroot import ChrootServer
48
 
from bzrlib.transport.memory import MemoryTransport
49
 
from bzrlib.transport.local import (LocalTransport,
50
 
                                    EmulatedWin32LocalTransport)
 
31
from bzrlib.transport import (
 
32
    chroot,
 
33
    fakenfs,
 
34
    http,
 
35
    local,
 
36
    memory,
 
37
    pathfilter,
 
38
    readonly,
 
39
    )
 
40
from bzrlib.tests import (
 
41
    features,
 
42
    test_server,
 
43
    )
51
44
 
52
45
 
53
46
# TODO: Should possibly split transport-specific tests into their own files.
54
47
 
55
48
 
56
 
class TestTransport(TestCase):
 
49
class TestTransport(tests.TestCase):
57
50
    """Test the non transport-concrete class functionality."""
58
51
 
 
52
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
53
    # of try/finally -- vila 20100205
 
54
 
59
55
    def test__get_set_protocol_handlers(self):
60
 
        handlers = _get_protocol_handlers()
 
56
        handlers = transport._get_protocol_handlers()
61
57
        self.assertNotEqual([], handlers.keys( ))
62
58
        try:
63
 
            _clear_protocol_handlers()
64
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
59
            transport._clear_protocol_handlers()
 
60
            self.assertEqual([], transport._get_protocol_handlers().keys())
65
61
        finally:
66
 
            _set_protocol_handlers(handlers)
 
62
            transport._set_protocol_handlers(handlers)
67
63
 
68
64
    def test_get_transport_modules(self):
69
 
        handlers = _get_protocol_handlers()
 
65
        handlers = transport._get_protocol_handlers()
70
66
        # don't pollute the current handlers
71
 
        _clear_protocol_handlers()
 
67
        transport._clear_protocol_handlers()
72
68
        class SampleHandler(object):
73
69
            """I exist, isnt that enough?"""
74
70
        try:
75
 
            _clear_protocol_handlers()
76
 
            register_transport_proto('foo')
77
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
78
 
                                    'TestTransport.SampleHandler')
79
 
            register_transport_proto('bar')
80
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
81
 
                                    'TestTransport.SampleHandler')
 
71
            transport._clear_protocol_handlers()
 
72
            transport.register_transport_proto('foo')
 
73
            transport.register_lazy_transport('foo',
 
74
                                              'bzrlib.tests.test_transport',
 
75
                                              'TestTransport.SampleHandler')
 
76
            transport.register_transport_proto('bar')
 
77
            transport.register_lazy_transport('bar',
 
78
                                              'bzrlib.tests.test_transport',
 
79
                                              'TestTransport.SampleHandler')
82
80
            self.assertEqual([SampleHandler.__module__,
83
 
                              'bzrlib.transport.chroot'],
84
 
                             _get_transport_modules())
 
81
                              'bzrlib.transport.chroot',
 
82
                              'bzrlib.transport.pathfilter'],
 
83
                             transport._get_transport_modules())
85
84
        finally:
86
 
            _set_protocol_handlers(handlers)
 
85
            transport._set_protocol_handlers(handlers)
87
86
 
88
87
    def test_transport_dependency(self):
89
88
        """Transport with missing dependency causes no error"""
90
 
        saved_handlers = _get_protocol_handlers()
 
89
        saved_handlers = transport._get_protocol_handlers()
91
90
        # don't pollute the current handlers
92
 
        _clear_protocol_handlers()
 
91
        transport._clear_protocol_handlers()
93
92
        try:
94
 
            register_transport_proto('foo')
95
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
96
 
                    'BadTransportHandler')
 
93
            transport.register_transport_proto('foo')
 
94
            transport.register_lazy_transport(
 
95
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
97
96
            try:
98
 
                get_transport('foo://fooserver/foo')
99
 
            except UnsupportedProtocol, e:
 
97
                transport.get_transport('foo://fooserver/foo')
 
98
            except errors.UnsupportedProtocol, e:
100
99
                e_str = str(e)
101
100
                self.assertEquals('Unsupported protocol'
102
101
                                  ' for url "foo://fooserver/foo":'
106
105
                self.fail('Did not raise UnsupportedProtocol')
107
106
        finally:
108
107
            # restore original values
109
 
            _set_protocol_handlers(saved_handlers)
110
 
            
 
108
            transport._set_protocol_handlers(saved_handlers)
 
109
 
111
110
    def test_transport_fallback(self):
112
111
        """Transport with missing dependency causes no error"""
113
 
        saved_handlers = _get_protocol_handlers()
 
112
        saved_handlers = transport._get_protocol_handlers()
114
113
        try:
115
 
            _clear_protocol_handlers()
116
 
            register_transport_proto('foo')
117
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
118
 
                    'BackupTransportHandler')
119
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
 
                    'BadTransportHandler')
121
 
            t = get_transport('foo://fooserver/foo')
 
114
            transport._clear_protocol_handlers()
 
115
            transport.register_transport_proto('foo')
 
116
            transport.register_lazy_transport(
 
117
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
118
            transport.register_lazy_transport(
 
119
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
120
            t = transport.get_transport('foo://fooserver/foo')
122
121
            self.assertTrue(isinstance(t, BackupTransportHandler))
123
122
        finally:
124
 
            _set_protocol_handlers(saved_handlers)
 
123
            transport._set_protocol_handlers(saved_handlers)
 
124
 
 
125
    def test_ssh_hints(self):
 
126
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
127
        try:
 
128
            transport.get_transport('ssh://fooserver/foo')
 
129
        except errors.UnsupportedProtocol, e:
 
130
            e_str = str(e)
 
131
            self.assertEquals('Unsupported protocol'
 
132
                              ' for url "ssh://fooserver/foo":'
 
133
                              ' bzr supports bzr+ssh to operate over ssh,'
 
134
                              ' use "bzr+ssh://fooserver/foo".',
 
135
                              str(e))
 
136
        else:
 
137
            self.fail('Did not raise UnsupportedProtocol')
125
138
 
126
139
    def test_LateReadError(self):
127
140
        """The LateReadError helper should raise on read()."""
128
 
        a_file = LateReadError('a path')
 
141
        a_file = transport.LateReadError('a path')
129
142
        try:
130
143
            a_file.read()
131
 
        except ReadError, error:
 
144
        except errors.ReadError, error:
132
145
            self.assertEqual('a path', error.path)
133
 
        self.assertRaises(ReadError, a_file.read, 40)
 
146
        self.assertRaises(errors.ReadError, a_file.read, 40)
134
147
        a_file.close()
135
148
 
136
149
    def test__combine_paths(self):
137
 
        t = Transport('/')
 
150
        t = transport.Transport('/')
138
151
        self.assertEqual('/home/sarah/project/foo',
139
152
                         t._combine_paths('/home/sarah', 'project/foo'))
140
153
        self.assertEqual('/etc',
146
159
 
147
160
    def test_local_abspath_non_local_transport(self):
148
161
        # the base implementation should throw
149
 
        t = MemoryTransport()
 
162
        t = memory.MemoryTransport()
150
163
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
151
164
        self.assertEqual('memory:///t is not a local path.', str(e))
152
165
 
153
166
 
154
 
class TestCoalesceOffsets(TestCase):
 
167
class TestCoalesceOffsets(tests.TestCase):
155
168
 
156
169
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
157
 
        coalesce = Transport._coalesce_offsets
158
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
170
        coalesce = transport.Transport._coalesce_offsets
 
171
        exp = [transport._CoalescedOffset(*x) for x in expected]
159
172
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
160
173
                            max_size=max_size))
161
174
        self.assertEqual(exp, out)
180
193
        self.check([(0, 20, [(0, 10), (10, 10)])],
181
194
                   [(0, 10), (10, 10)])
182
195
 
183
 
    # XXX: scary, http.readv() can't handle that --vila20071209
184
196
    def test_coalesce_overlapped(self):
185
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
186
 
                   [(0, 10), (5, 10)])
 
197
        self.assertRaises(ValueError,
 
198
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
199
                        [(0, 10), (5, 10)])
187
200
 
188
201
    def test_coalesce_limit(self):
189
202
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
225
238
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
226
239
                  )
227
240
 
228
 
 
229
 
class TestMemoryTransport(TestCase):
 
241
    def test_coalesce_default_limit(self):
 
242
        # By default we use a 100MB max size.
 
243
        ten_mb = 10*1024*1024
 
244
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
245
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
246
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
247
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
249
                   max_size=1*1024*1024*1024)
 
250
 
 
251
 
 
252
class TestMemoryServer(tests.TestCase):
 
253
 
 
254
    def test_create_server(self):
 
255
        server = memory.MemoryServer()
 
256
        server.start_server()
 
257
        url = server.get_url()
 
258
        self.assertTrue(url in transport.transport_list_registry)
 
259
        t = transport.get_transport(url)
 
260
        del t
 
261
        server.stop_server()
 
262
        self.assertFalse(url in transport.transport_list_registry)
 
263
        self.assertRaises(errors.UnsupportedProtocol,
 
264
                          transport.get_transport, url)
 
265
 
 
266
 
 
267
class TestMemoryTransport(tests.TestCase):
230
268
 
231
269
    def test_get_transport(self):
232
 
        MemoryTransport()
 
270
        memory.MemoryTransport()
233
271
 
234
272
    def test_clone(self):
235
 
        transport = MemoryTransport()
236
 
        self.assertTrue(isinstance(transport, MemoryTransport))
237
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
273
        t = memory.MemoryTransport()
 
274
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
275
        self.assertEqual("memory:///", t.clone("/").base)
238
276
 
239
277
    def test_abspath(self):
240
 
        transport = MemoryTransport()
241
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
278
        t = memory.MemoryTransport()
 
279
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
242
280
 
243
281
    def test_abspath_of_root(self):
244
 
        transport = MemoryTransport()
245
 
        self.assertEqual("memory:///", transport.base)
246
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
282
        t = memory.MemoryTransport()
 
283
        self.assertEqual("memory:///", t.base)
 
284
        self.assertEqual("memory:///", t.abspath('/'))
247
285
 
248
286
    def test_abspath_of_relpath_starting_at_root(self):
249
 
        transport = MemoryTransport()
250
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
287
        t = memory.MemoryTransport()
 
288
        self.assertEqual("memory:///foo", t.abspath('/foo'))
251
289
 
252
290
    def test_append_and_get(self):
253
 
        transport = MemoryTransport()
254
 
        transport.append_bytes('path', 'content')
255
 
        self.assertEqual(transport.get('path').read(), 'content')
256
 
        transport.append_file('path', StringIO('content'))
257
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
291
        t = memory.MemoryTransport()
 
292
        t.append_bytes('path', 'content')
 
293
        self.assertEqual(t.get('path').read(), 'content')
 
294
        t.append_file('path', StringIO('content'))
 
295
        self.assertEqual(t.get('path').read(), 'contentcontent')
258
296
 
259
297
    def test_put_and_get(self):
260
 
        transport = MemoryTransport()
261
 
        transport.put_file('path', StringIO('content'))
262
 
        self.assertEqual(transport.get('path').read(), 'content')
263
 
        transport.put_bytes('path', 'content')
264
 
        self.assertEqual(transport.get('path').read(), 'content')
 
298
        t = memory.MemoryTransport()
 
299
        t.put_file('path', StringIO('content'))
 
300
        self.assertEqual(t.get('path').read(), 'content')
 
301
        t.put_bytes('path', 'content')
 
302
        self.assertEqual(t.get('path').read(), 'content')
265
303
 
266
304
    def test_append_without_dir_fails(self):
267
 
        transport = MemoryTransport()
268
 
        self.assertRaises(NoSuchFile,
269
 
                          transport.append_bytes, 'dir/path', 'content')
 
305
        t = memory.MemoryTransport()
 
306
        self.assertRaises(errors.NoSuchFile,
 
307
                          t.append_bytes, 'dir/path', 'content')
270
308
 
271
309
    def test_put_without_dir_fails(self):
272
 
        transport = MemoryTransport()
273
 
        self.assertRaises(NoSuchFile,
274
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
310
        t = memory.MemoryTransport()
 
311
        self.assertRaises(errors.NoSuchFile,
 
312
                          t.put_file, 'dir/path', StringIO('content'))
275
313
 
276
314
    def test_get_missing(self):
277
 
        transport = MemoryTransport()
278
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
315
        transport = memory.MemoryTransport()
 
316
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
279
317
 
280
318
    def test_has_missing(self):
281
 
        transport = MemoryTransport()
282
 
        self.assertEquals(False, transport.has('foo'))
 
319
        t = memory.MemoryTransport()
 
320
        self.assertEquals(False, t.has('foo'))
283
321
 
284
322
    def test_has_present(self):
285
 
        transport = MemoryTransport()
286
 
        transport.append_bytes('foo', 'content')
287
 
        self.assertEquals(True, transport.has('foo'))
 
323
        t = memory.MemoryTransport()
 
324
        t.append_bytes('foo', 'content')
 
325
        self.assertEquals(True, t.has('foo'))
288
326
 
289
327
    def test_list_dir(self):
290
 
        transport = MemoryTransport()
291
 
        transport.put_bytes('foo', 'content')
292
 
        transport.mkdir('dir')
293
 
        transport.put_bytes('dir/subfoo', 'content')
294
 
        transport.put_bytes('dirlike', 'content')
 
328
        t = memory.MemoryTransport()
 
329
        t.put_bytes('foo', 'content')
 
330
        t.mkdir('dir')
 
331
        t.put_bytes('dir/subfoo', 'content')
 
332
        t.put_bytes('dirlike', 'content')
295
333
 
296
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
297
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
334
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
335
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
298
336
 
299
337
    def test_mkdir(self):
300
 
        transport = MemoryTransport()
301
 
        transport.mkdir('dir')
302
 
        transport.append_bytes('dir/path', 'content')
303
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
338
        t = memory.MemoryTransport()
 
339
        t.mkdir('dir')
 
340
        t.append_bytes('dir/path', 'content')
 
341
        self.assertEqual(t.get('dir/path').read(), 'content')
304
342
 
305
343
    def test_mkdir_missing_parent(self):
306
 
        transport = MemoryTransport()
307
 
        self.assertRaises(NoSuchFile,
308
 
                          transport.mkdir, 'dir/dir')
 
344
        t = memory.MemoryTransport()
 
345
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
309
346
 
310
347
    def test_mkdir_twice(self):
311
 
        transport = MemoryTransport()
312
 
        transport.mkdir('dir')
313
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
348
        t = memory.MemoryTransport()
 
349
        t.mkdir('dir')
 
350
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
314
351
 
315
352
    def test_parameters(self):
316
 
        transport = MemoryTransport()
317
 
        self.assertEqual(True, transport.listable())
318
 
        self.assertEqual(False, transport.is_readonly())
 
353
        t = memory.MemoryTransport()
 
354
        self.assertEqual(True, t.listable())
 
355
        self.assertEqual(False, t.is_readonly())
319
356
 
320
357
    def test_iter_files_recursive(self):
321
 
        transport = MemoryTransport()
322
 
        transport.mkdir('dir')
323
 
        transport.put_bytes('dir/foo', 'content')
324
 
        transport.put_bytes('dir/bar', 'content')
325
 
        transport.put_bytes('bar', 'content')
326
 
        paths = set(transport.iter_files_recursive())
 
358
        t = memory.MemoryTransport()
 
359
        t.mkdir('dir')
 
360
        t.put_bytes('dir/foo', 'content')
 
361
        t.put_bytes('dir/bar', 'content')
 
362
        t.put_bytes('bar', 'content')
 
363
        paths = set(t.iter_files_recursive())
327
364
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
328
365
 
329
366
    def test_stat(self):
330
 
        transport = MemoryTransport()
331
 
        transport.put_bytes('foo', 'content')
332
 
        transport.put_bytes('bar', 'phowar')
333
 
        self.assertEqual(7, transport.stat('foo').st_size)
334
 
        self.assertEqual(6, transport.stat('bar').st_size)
335
 
 
336
 
 
337
 
class ChrootDecoratorTransportTest(TestCase):
 
367
        t = memory.MemoryTransport()
 
368
        t.put_bytes('foo', 'content')
 
369
        t.put_bytes('bar', 'phowar')
 
370
        self.assertEqual(7, t.stat('foo').st_size)
 
371
        self.assertEqual(6, t.stat('bar').st_size)
 
372
 
 
373
 
 
374
class ChrootDecoratorTransportTest(tests.TestCase):
338
375
    """Chroot decoration specific tests."""
339
376
 
340
377
    def test_abspath(self):
341
378
        # The abspath is always relative to the chroot_url.
342
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
343
 
        server.setUp()
344
 
        transport = get_transport(server.get_url())
345
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
379
        server = chroot.ChrootServer(
 
380
            transport.get_transport('memory:///foo/bar/'))
 
381
        self.start_server(server)
 
382
        t = transport.get_transport(server.get_url())
 
383
        self.assertEqual(server.get_url(), t.abspath('/'))
346
384
 
347
 
        subdir_transport = transport.clone('subdir')
348
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
349
 
        server.tearDown()
 
385
        subdir_t = t.clone('subdir')
 
386
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
350
387
 
351
388
    def test_clone(self):
352
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
353
 
        server.setUp()
354
 
        transport = get_transport(server.get_url())
 
389
        server = chroot.ChrootServer(
 
390
            transport.get_transport('memory:///foo/bar/'))
 
391
        self.start_server(server)
 
392
        t = transport.get_transport(server.get_url())
355
393
        # relpath from root and root path are the same
356
 
        relpath_cloned = transport.clone('foo')
357
 
        abspath_cloned = transport.clone('/foo')
 
394
        relpath_cloned = t.clone('foo')
 
395
        abspath_cloned = t.clone('/foo')
358
396
        self.assertEqual(server, relpath_cloned.server)
359
397
        self.assertEqual(server, abspath_cloned.server)
360
 
        server.tearDown()
361
 
    
 
398
 
362
399
    def test_chroot_url_preserves_chroot(self):
363
400
        """Calling get_transport on a chroot transport's base should produce a
364
401
        transport with exactly the same behaviour as the original chroot
367
404
        This is so that it is not possible to escape a chroot by doing::
368
405
            url = chroot_transport.base
369
406
            parent_url = urlutils.join(url, '..')
370
 
            new_transport = get_transport(parent_url)
 
407
            new_t = transport.get_transport(parent_url)
371
408
        """
372
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
373
 
        server.setUp()
374
 
        transport = get_transport(server.get_url())
375
 
        new_transport = get_transport(transport.base)
376
 
        self.assertEqual(transport.server, new_transport.server)
377
 
        self.assertEqual(transport.base, new_transport.base)
378
 
        server.tearDown()
379
 
        
 
409
        server = chroot.ChrootServer(
 
410
            transport.get_transport('memory:///path/subpath'))
 
411
        self.start_server(server)
 
412
        t = transport.get_transport(server.get_url())
 
413
        new_t = transport.get_transport(t.base)
 
414
        self.assertEqual(t.server, new_t.server)
 
415
        self.assertEqual(t.base, new_t.base)
 
416
 
380
417
    def test_urljoin_preserves_chroot(self):
381
418
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
382
419
        URL that escapes the intended chroot.
384
421
        This is so that it is not possible to escape a chroot by doing::
385
422
            url = chroot_transport.base
386
423
            parent_url = urlutils.join(url, '..')
387
 
            new_transport = get_transport(parent_url)
 
424
            new_t = transport.get_transport(parent_url)
388
425
        """
389
 
        server = ChrootServer(get_transport('memory:///path/'))
390
 
        server.setUp()
391
 
        transport = get_transport(server.get_url())
 
426
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
427
        self.start_server(server)
 
428
        t = transport.get_transport(server.get_url())
392
429
        self.assertRaises(
393
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
394
 
        server.tearDown()
395
 
 
396
 
 
397
 
class ChrootServerTest(TestCase):
 
430
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
431
 
 
432
 
 
433
class TestChrootServer(tests.TestCase):
398
434
 
399
435
    def test_construct(self):
400
 
        backing_transport = MemoryTransport()
401
 
        server = ChrootServer(backing_transport)
 
436
        backing_transport = memory.MemoryTransport()
 
437
        server = chroot.ChrootServer(backing_transport)
402
438
        self.assertEqual(backing_transport, server.backing_transport)
403
439
 
404
440
    def test_setUp(self):
405
 
        backing_transport = MemoryTransport()
406
 
        server = ChrootServer(backing_transport)
407
 
        server.setUp()
408
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
441
        backing_transport = memory.MemoryTransport()
 
442
        server = chroot.ChrootServer(backing_transport)
 
443
        server.start_server()
 
444
        try:
 
445
            self.assertTrue(server.scheme
 
446
                            in transport._get_protocol_handlers().keys())
 
447
        finally:
 
448
            server.stop_server()
409
449
 
410
 
    def test_tearDown(self):
411
 
        backing_transport = MemoryTransport()
412
 
        server = ChrootServer(backing_transport)
413
 
        server.setUp()
414
 
        server.tearDown()
415
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
450
    def test_stop_server(self):
 
451
        backing_transport = memory.MemoryTransport()
 
452
        server = chroot.ChrootServer(backing_transport)
 
453
        server.start_server()
 
454
        server.stop_server()
 
455
        self.assertFalse(server.scheme
 
456
                         in transport._get_protocol_handlers().keys())
416
457
 
417
458
    def test_get_url(self):
418
 
        backing_transport = MemoryTransport()
419
 
        server = ChrootServer(backing_transport)
420
 
        server.setUp()
421
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
422
 
        server.tearDown()
423
 
 
424
 
 
425
 
class ReadonlyDecoratorTransportTest(TestCase):
 
459
        backing_transport = memory.MemoryTransport()
 
460
        server = chroot.ChrootServer(backing_transport)
 
461
        server.start_server()
 
462
        try:
 
463
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
464
        finally:
 
465
            server.stop_server()
 
466
 
 
467
 
 
468
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
469
    """Pathfilter decoration specific tests."""
 
470
 
 
471
    def test_abspath(self):
 
472
        # The abspath is always relative to the base of the backing transport.
 
473
        server = pathfilter.PathFilteringServer(
 
474
            transport.get_transport('memory:///foo/bar/'),
 
475
            lambda x: x)
 
476
        server.start_server()
 
477
        t = transport.get_transport(server.get_url())
 
478
        self.assertEqual(server.get_url(), t.abspath('/'))
 
479
 
 
480
        subdir_t = t.clone('subdir')
 
481
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
482
        server.stop_server()
 
483
 
 
484
    def make_pf_transport(self, filter_func=None):
 
485
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
486
        
 
487
        :param filter_func: by default this will be a no-op function.  Use this
 
488
            parameter to override it."""
 
489
        if filter_func is None:
 
490
            filter_func = lambda x: x
 
491
        server = pathfilter.PathFilteringServer(
 
492
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
493
        server.start_server()
 
494
        self.addCleanup(server.stop_server)
 
495
        return transport.get_transport(server.get_url())
 
496
 
 
497
    def test__filter(self):
 
498
        # _filter (with an identity func as filter_func) always returns
 
499
        # paths relative to the base of the backing transport.
 
500
        t = self.make_pf_transport()
 
501
        self.assertEqual('foo', t._filter('foo'))
 
502
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
503
        self.assertEqual('', t._filter('..'))
 
504
        self.assertEqual('', t._filter('/'))
 
505
        # The base of the pathfiltering transport is taken into account too.
 
506
        t = t.clone('subdir1/subdir2')
 
507
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
508
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
509
        self.assertEqual('subdir1', t._filter('..'))
 
510
        self.assertEqual('', t._filter('/'))
 
511
 
 
512
    def test_filter_invocation(self):
 
513
        filter_log = []
 
514
        def filter(path):
 
515
            filter_log.append(path)
 
516
            return path
 
517
        t = self.make_pf_transport(filter)
 
518
        t.has('abc')
 
519
        self.assertEqual(['abc'], filter_log)
 
520
        del filter_log[:]
 
521
        t.clone('abc').has('xyz')
 
522
        self.assertEqual(['abc/xyz'], filter_log)
 
523
        del filter_log[:]
 
524
        t.has('/abc')
 
525
        self.assertEqual(['abc'], filter_log)
 
526
 
 
527
    def test_clone(self):
 
528
        t = self.make_pf_transport()
 
529
        # relpath from root and root path are the same
 
530
        relpath_cloned = t.clone('foo')
 
531
        abspath_cloned = t.clone('/foo')
 
532
        self.assertEqual(t.server, relpath_cloned.server)
 
533
        self.assertEqual(t.server, abspath_cloned.server)
 
534
 
 
535
    def test_url_preserves_pathfiltering(self):
 
536
        """Calling get_transport on a pathfiltered transport's base should
 
537
        produce a transport with exactly the same behaviour as the original
 
538
        pathfiltered transport.
 
539
 
 
540
        This is so that it is not possible to escape (accidentally or
 
541
        otherwise) the filtering by doing::
 
542
            url = filtered_transport.base
 
543
            parent_url = urlutils.join(url, '..')
 
544
            new_t = transport.get_transport(parent_url)
 
545
        """
 
546
        t = self.make_pf_transport()
 
547
        new_t = transport.get_transport(t.base)
 
548
        self.assertEqual(t.server, new_t.server)
 
549
        self.assertEqual(t.base, new_t.base)
 
550
 
 
551
 
 
552
class ReadonlyDecoratorTransportTest(tests.TestCase):
426
553
    """Readonly decoration specific tests."""
427
554
 
428
555
    def test_local_parameters(self):
429
 
        import bzrlib.transport.readonly as readonly
430
556
        # connect to . in readonly mode
431
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
432
 
        self.assertEqual(True, transport.listable())
433
 
        self.assertEqual(True, transport.is_readonly())
 
557
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
558
        self.assertEqual(True, t.listable())
 
559
        self.assertEqual(True, t.is_readonly())
434
560
 
435
561
    def test_http_parameters(self):
436
562
        from bzrlib.tests.http_server import HttpServer
437
 
        import bzrlib.transport.readonly as readonly
438
563
        # connect to '.' via http which is not listable
439
564
        server = HttpServer()
440
 
        server.setUp()
441
 
        try:
442
 
            transport = get_transport('readonly+' + server.get_url())
443
 
            self.failUnless(isinstance(transport,
444
 
                                       readonly.ReadonlyTransportDecorator))
445
 
            self.assertEqual(False, transport.listable())
446
 
            self.assertEqual(True, transport.is_readonly())
447
 
        finally:
448
 
            server.tearDown()
449
 
 
450
 
 
451
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
565
        self.start_server(server)
 
566
        t = transport.get_transport('readonly+' + server.get_url())
 
567
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
568
        self.assertEqual(False, t.listable())
 
569
        self.assertEqual(True, t.is_readonly())
 
570
 
 
571
 
 
572
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
452
573
    """NFS decorator specific tests."""
453
574
 
454
575
    def get_nfs_transport(self, url):
455
 
        import bzrlib.transport.fakenfs as fakenfs
456
576
        # connect to url with nfs decoration
457
577
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
458
578
 
459
579
    def test_local_parameters(self):
460
580
        # the listable and is_readonly parameters
461
581
        # are not changed by the fakenfs decorator
462
 
        transport = self.get_nfs_transport('.')
463
 
        self.assertEqual(True, transport.listable())
464
 
        self.assertEqual(False, transport.is_readonly())
 
582
        t = self.get_nfs_transport('.')
 
583
        self.assertEqual(True, t.listable())
 
584
        self.assertEqual(False, t.is_readonly())
465
585
 
466
586
    def test_http_parameters(self):
467
587
        # the listable and is_readonly parameters
469
589
        from bzrlib.tests.http_server import HttpServer
470
590
        # connect to '.' via http which is not listable
471
591
        server = HttpServer()
472
 
        server.setUp()
473
 
        try:
474
 
            transport = self.get_nfs_transport(server.get_url())
475
 
            self.assertIsInstance(
476
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
477
 
            self.assertEqual(False, transport.listable())
478
 
            self.assertEqual(True, transport.is_readonly())
479
 
        finally:
480
 
            server.tearDown()
 
592
        self.start_server(server)
 
593
        t = self.get_nfs_transport(server.get_url())
 
594
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
595
        self.assertEqual(False, t.listable())
 
596
        self.assertEqual(True, t.is_readonly())
481
597
 
482
598
    def test_fakenfs_server_default(self):
483
599
        # a FakeNFSServer() should bring up a local relpath server for itself
484
 
        import bzrlib.transport.fakenfs as fakenfs
485
 
        server = fakenfs.FakeNFSServer()
486
 
        server.setUp()
487
 
        try:
488
 
            # the url should be decorated appropriately
489
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
490
 
            # and we should be able to get a transport for it
491
 
            transport = get_transport(server.get_url())
492
 
            # which must be a FakeNFSTransportDecorator instance.
493
 
            self.assertIsInstance(
494
 
                transport, fakenfs.FakeNFSTransportDecorator)
495
 
        finally:
496
 
            server.tearDown()
 
600
        server = test_server.FakeNFSServer()
 
601
        self.start_server(server)
 
602
        # the url should be decorated appropriately
 
603
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
604
        # and we should be able to get a transport for it
 
605
        t = transport.get_transport(server.get_url())
 
606
        # which must be a FakeNFSTransportDecorator instance.
 
607
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
497
608
 
498
609
    def test_fakenfs_rename_semantics(self):
499
610
        # a FakeNFS transport must mangle the way rename errors occur to
500
611
        # look like NFS problems.
501
 
        transport = self.get_nfs_transport('.')
 
612
        t = self.get_nfs_transport('.')
502
613
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
503
 
                        transport=transport)
504
 
        self.assertRaises(errors.ResourceBusy,
505
 
                          transport.rename, 'from', 'to')
506
 
 
507
 
 
508
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
614
                        transport=t)
 
615
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
616
 
 
617
 
 
618
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
509
619
    """Tests for simulation of VFAT restrictions"""
510
620
 
511
621
    def get_vfat_transport(self, url):
515
625
 
516
626
    def test_transport_creation(self):
517
627
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
518
 
        transport = self.get_vfat_transport('.')
519
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
628
        t = self.get_vfat_transport('.')
 
629
        self.assertIsInstance(t, FakeVFATTransportDecorator)
520
630
 
521
631
    def test_transport_mkdir(self):
522
 
        transport = self.get_vfat_transport('.')
523
 
        transport.mkdir('HELLO')
524
 
        self.assertTrue(transport.has('hello'))
525
 
        self.assertTrue(transport.has('Hello'))
 
632
        t = self.get_vfat_transport('.')
 
633
        t.mkdir('HELLO')
 
634
        self.assertTrue(t.has('hello'))
 
635
        self.assertTrue(t.has('Hello'))
526
636
 
527
637
    def test_forbidden_chars(self):
528
 
        transport = self.get_vfat_transport('.')
529
 
        self.assertRaises(ValueError, transport.has, "<NU>")
530
 
 
531
 
 
532
 
class BadTransportHandler(Transport):
 
638
        t = self.get_vfat_transport('.')
 
639
        self.assertRaises(ValueError, t.has, "<NU>")
 
640
 
 
641
 
 
642
class BadTransportHandler(transport.Transport):
533
643
    def __init__(self, base_url):
534
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
535
 
 
536
 
 
537
 
class BackupTransportHandler(Transport):
 
644
        raise errors.DependencyNotPresent('some_lib',
 
645
                                          'testing missing dependency')
 
646
 
 
647
 
 
648
class BackupTransportHandler(transport.Transport):
538
649
    """Test transport that works as a backup for the BadTransportHandler"""
539
650
    pass
540
651
 
541
652
 
542
 
class TestTransportImplementation(TestCaseInTempDir):
 
653
class TestTransportImplementation(tests.TestCaseInTempDir):
543
654
    """Implementation verification for transports.
544
 
    
 
655
 
545
656
    To verify a transport we need a server factory, which is a callable
546
657
    that accepts no parameters and returns an implementation of
547
658
    bzrlib.transport.Server.
548
 
    
 
659
 
549
660
    That Server is then used to construct transport instances and test
550
661
    the transport via loopback activity.
551
662
 
552
 
    Currently this assumes that the Transport object is connected to the 
553
 
    current working directory.  So that whatever is done 
554
 
    through the transport, should show up in the working 
 
663
    Currently this assumes that the Transport object is connected to the
 
664
    current working directory.  So that whatever is done
 
665
    through the transport, should show up in the working
555
666
    directory, and vice-versa. This is a bug, because its possible to have
556
 
    URL schemes which provide access to something that may not be 
557
 
    result in storage on the local disk, i.e. due to file system limits, or 
 
667
    URL schemes which provide access to something that may not be
 
668
    result in storage on the local disk, i.e. due to file system limits, or
558
669
    due to it being a database or some other non-filesystem tool.
559
670
 
560
671
    This also tests to make sure that the functions work with both
561
672
    generators and lists (assuming iter(list) is effectively a generator)
562
673
    """
563
 
    
 
674
 
564
675
    def setUp(self):
565
676
        super(TestTransportImplementation, self).setUp()
566
677
        self._server = self.transport_server()
567
 
        self._server.setUp()
568
 
        self.addCleanup(self._server.tearDown)
 
678
        self.start_server(self._server)
569
679
 
570
680
    def get_transport(self, relpath=None):
571
681
        """Return a connected transport to the local directory.
575
685
        base_url = self._server.get_url()
576
686
        url = self._adjust_url(base_url, relpath)
577
687
        # try getting the transport via the regular interface:
578
 
        t = get_transport(url)
 
688
        t = transport.get_transport(url)
579
689
        # vila--20070607 if the following are commented out the test suite
580
690
        # still pass. Is this really still needed or was it a forgotten
581
691
        # temporary fix ?
586
696
        return t
587
697
 
588
698
 
589
 
class TestLocalTransports(TestCase):
 
699
class TestLocalTransports(tests.TestCase):
590
700
 
591
701
    def test_get_transport_from_abspath(self):
592
702
        here = osutils.abspath('.')
593
 
        t = get_transport(here)
594
 
        self.assertIsInstance(t, LocalTransport)
 
703
        t = transport.get_transport(here)
 
704
        self.assertIsInstance(t, local.LocalTransport)
595
705
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
596
706
 
597
707
    def test_get_transport_from_relpath(self):
598
708
        here = osutils.abspath('.')
599
 
        t = get_transport('.')
600
 
        self.assertIsInstance(t, LocalTransport)
 
709
        t = transport.get_transport('.')
 
710
        self.assertIsInstance(t, local.LocalTransport)
601
711
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
602
712
 
603
713
    def test_get_transport_from_local_url(self):
604
714
        here = osutils.abspath('.')
605
715
        here_url = urlutils.local_path_to_url(here) + '/'
606
 
        t = get_transport(here_url)
607
 
        self.assertIsInstance(t, LocalTransport)
 
716
        t = transport.get_transport(here_url)
 
717
        self.assertIsInstance(t, local.LocalTransport)
608
718
        self.assertEquals(t.base, here_url)
609
719
 
610
720
    def test_local_abspath(self):
611
721
        here = osutils.abspath('.')
612
 
        t = get_transport(here)
 
722
        t = transport.get_transport(here)
613
723
        self.assertEquals(t.local_abspath(''), here)
614
724
 
615
725
 
616
 
class TestWin32LocalTransport(TestCase):
 
726
class TestWin32LocalTransport(tests.TestCase):
617
727
 
618
728
    def test_unc_clone_to_root(self):
619
729
        # Win32 UNC path like \\HOST\path
620
730
        # clone to root should stop at least at \\HOST part
621
731
        # not on \\
622
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
732
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
623
733
        for i in xrange(4):
624
734
            t = t.clone('..')
625
735
        self.assertEquals(t.base, 'file://HOST/')
628
738
        self.assertEquals(t.base, 'file://HOST/')
629
739
 
630
740
 
631
 
class TestConnectedTransport(TestCase):
 
741
class TestConnectedTransport(tests.TestCase):
632
742
    """Tests for connected to remote server transports"""
633
743
 
634
744
    def test_parse_url(self):
635
 
        t = ConnectedTransport('http://simple.example.com/home/source')
 
745
        t = transport.ConnectedTransport(
 
746
            'http://simple.example.com/home/source')
636
747
        self.assertEquals(t._host, 'simple.example.com')
637
748
        self.assertEquals(t._port, None)
638
749
        self.assertEquals(t._path, '/home/source/')
641
752
 
642
753
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
643
754
 
 
755
    def test_parse_url_with_at_in_user(self):
 
756
        # Bug 228058
 
757
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
758
        self.assertEquals(t._user, 'user@host.com')
 
759
 
644
760
    def test_parse_quoted_url(self):
645
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
761
        t = transport.ConnectedTransport(
 
762
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
646
763
        self.assertEquals(t._host, 'exAmple.com')
647
764
        self.assertEquals(t._port, 2222)
648
765
        self.assertEquals(t._user, 'robey')
654
771
 
655
772
    def test_parse_invalid_url(self):
656
773
        self.assertRaises(errors.InvalidURL,
657
 
                          ConnectedTransport,
 
774
                          transport.ConnectedTransport,
658
775
                          'sftp://lily.org:~janneke/public/bzr/gub')
659
776
 
660
777
    def test_relpath(self):
661
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
778
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
662
779
 
663
780
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
664
781
        self.assertRaises(errors.PathNotChild, t.relpath,
670
787
        self.assertRaises(errors.PathNotChild, t.relpath,
671
788
                          'sftp://user@host.com:33/abs/path/sub')
672
789
        # Make sure it works when we don't supply a username
673
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
790
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
674
791
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
675
792
 
676
793
        # Make sure it works when parts of the path will be url encoded
677
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
794
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
678
795
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
679
796
 
680
797
    def test_connection_sharing_propagate_credentials(self):
681
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
798
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
682
799
        self.assertEquals('user', t._user)
683
800
        self.assertEquals('host.com', t._host)
684
801
        self.assertIs(None, t._get_connection())
705
822
        self.assertIs(new_password, c._get_credentials())
706
823
 
707
824
 
708
 
class TestReusedTransports(TestCase):
 
825
class TestReusedTransports(tests.TestCase):
709
826
    """Tests for transport reuse"""
710
827
 
711
828
    def test_reuse_same_transport(self):
712
829
        possible_transports = []
713
 
        t1 = get_transport('http://foo/',
714
 
                           possible_transports=possible_transports)
 
830
        t1 = transport.get_transport('http://foo/',
 
831
                                     possible_transports=possible_transports)
715
832
        self.assertEqual([t1], possible_transports)
716
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
833
        t2 = transport.get_transport('http://foo/',
 
834
                                     possible_transports=[t1])
717
835
        self.assertIs(t1, t2)
718
836
 
719
837
        # Also check that final '/' are handled correctly
720
 
        t3 = get_transport('http://foo/path/')
721
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
838
        t3 = transport.get_transport('http://foo/path/')
 
839
        t4 = transport.get_transport('http://foo/path',
 
840
                                     possible_transports=[t3])
722
841
        self.assertIs(t3, t4)
723
842
 
724
 
        t5 = get_transport('http://foo/path')
725
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
843
        t5 = transport.get_transport('http://foo/path')
 
844
        t6 = transport.get_transport('http://foo/path/',
 
845
                                     possible_transports=[t5])
726
846
        self.assertIs(t5, t6)
727
847
 
728
848
    def test_don_t_reuse_different_transport(self):
729
 
        t1 = get_transport('http://foo/path')
730
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
849
        t1 = transport.get_transport('http://foo/path')
 
850
        t2 = transport.get_transport('http://bar/path',
 
851
                                     possible_transports=[t1])
731
852
        self.assertIsNot(t1, t2)
732
853
 
733
854
 
734
 
class TestTransportTrace(TestCase):
 
855
class TestTransportTrace(tests.TestCase):
735
856
 
736
857
    def test_get(self):
737
 
        transport = get_transport('trace+memory://')
738
 
        self.assertIsInstance(
739
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
858
        t = transport.get_transport('trace+memory://')
 
859
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
740
860
 
741
861
    def test_clone_preserves_activity(self):
742
 
        transport = get_transport('trace+memory://')
743
 
        transport2 = transport.clone('.')
744
 
        self.assertTrue(transport is not transport2)
745
 
        self.assertTrue(transport._activity is transport2._activity)
 
862
        t = transport.get_transport('trace+memory://')
 
863
        t2 = t.clone('.')
 
864
        self.assertTrue(t is not t2)
 
865
        self.assertTrue(t._activity is t2._activity)
746
866
 
747
867
    # the following specific tests are for the operations that have made use of
748
868
    # logging in tests; we could test every single operation but doing that
749
869
    # still won't cause a test failure when the top level Transport API
750
870
    # changes; so there is little return doing that.
751
871
    def test_get(self):
752
 
        transport = get_transport('trace+memory:///')
753
 
        transport.put_bytes('foo', 'barish')
754
 
        transport.get('foo')
 
872
        t = transport.get_transport('trace+memory:///')
 
873
        t.put_bytes('foo', 'barish')
 
874
        t.get('foo')
755
875
        expected_result = []
756
876
        # put_bytes records the bytes, not the content to avoid memory
757
877
        # pressure.
758
878
        expected_result.append(('put_bytes', 'foo', 6, None))
759
879
        # get records the file name only.
760
880
        expected_result.append(('get', 'foo'))
761
 
        self.assertEqual(expected_result, transport._activity)
 
881
        self.assertEqual(expected_result, t._activity)
762
882
 
763
883
    def test_readv(self):
764
 
        transport = get_transport('trace+memory:///')
765
 
        transport.put_bytes('foo', 'barish')
766
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
767
 
            upper_limit=6))
 
884
        t = transport.get_transport('trace+memory:///')
 
885
        t.put_bytes('foo', 'barish')
 
886
        list(t.readv('foo', [(0, 1), (3, 2)],
 
887
                     adjust_for_latency=True, upper_limit=6))
768
888
        expected_result = []
769
889
        # put_bytes records the bytes, not the content to avoid memory
770
890
        # pressure.
771
891
        expected_result.append(('put_bytes', 'foo', 6, None))
772
892
        # readv records the supplied offset request
773
893
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
774
 
        self.assertEqual(expected_result, transport._activity)
 
894
        self.assertEqual(expected_result, t._activity)
 
895
 
 
896
 
 
897
class TestSSHConnections(tests.TestCaseWithTransport):
 
898
 
 
899
    def test_bzr_connect_to_bzr_ssh(self):
 
900
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
901
 
 
902
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
 
903
        """
 
904
        # This test actually causes a bzr instance to be invoked, which is very
 
905
        # expensive: it should be the only such test in the test suite.
 
906
        # A reasonable evolution for this would be to simply check inside
 
907
        # check_channel_exec_request that the command is appropriate, and then
 
908
        # satisfy requests in-process.
 
909
        self.requireFeature(features.paramiko)
 
910
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
 
911
        # override the interface (doesn't change self._vendor).
 
912
        # Note that this does encryption, so can be slow.
 
913
        from bzrlib.tests import stub_sftp
 
914
 
 
915
        # Start an SSH server
 
916
        self.command_executed = []
 
917
        # XXX: This is horrible -- we define a really dumb SSH server that
 
918
        # executes commands, and manage the hooking up of stdin/out/err to the
 
919
        # SSH channel ourselves.  Surely this has already been implemented
 
920
        # elsewhere?
 
921
        started = []
 
922
        class StubSSHServer(stub_sftp.StubServer):
 
923
 
 
924
            test = self
 
925
 
 
926
            def check_channel_exec_request(self, channel, command):
 
927
                self.test.command_executed.append(command)
 
928
                proc = subprocess.Popen(
 
929
                    command, shell=True, stdin=subprocess.PIPE,
 
930
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
931
 
 
932
                # XXX: horribly inefficient, not to mention ugly.
 
933
                # Start a thread for each of stdin/out/err, and relay bytes from
 
934
                # the subprocess to channel and vice versa.
 
935
                def ferry_bytes(read, write, close):
 
936
                    while True:
 
937
                        bytes = read(1)
 
938
                        if bytes == '':
 
939
                            close()
 
940
                            break
 
941
                        write(bytes)
 
942
 
 
943
                file_functions = [
 
944
                    (channel.recv, proc.stdin.write, proc.stdin.close),
 
945
                    (proc.stdout.read, channel.sendall, channel.close),
 
946
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
 
947
                started.append(proc)
 
948
                for read, write, close in file_functions:
 
949
                    t = threading.Thread(
 
950
                        target=ferry_bytes, args=(read, write, close))
 
951
                    t.start()
 
952
                    started.append(t)
 
953
 
 
954
                return True
 
955
 
 
956
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
957
        # We *don't* want to override the default SSH vendor: the detected one
 
958
        # is the one to use.
 
959
 
 
960
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
961
        # inherits from SFTPServer which forces the SSH vendor to
 
962
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
 
963
        self.start_server(ssh_server)
 
964
        port = ssh_server.port
 
965
 
 
966
        if sys.platform == 'win32':
 
967
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
968
        else:
 
969
            bzr_remote_path = self.get_bzr_path()
 
970
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
971
 
 
972
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
 
973
        # variable is used to tell bzr what command to run on the remote end.
 
974
        path_to_branch = osutils.abspath('.')
 
975
        if sys.platform == 'win32':
 
976
            # On Windows, we export all drives as '/C:/, etc. So we need to
 
977
            # prefix a '/' to get the right path.
 
978
            path_to_branch = '/' + path_to_branch
 
979
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
 
980
        t = transport.get_transport(url)
 
981
        self.permit_url(t.base)
 
982
        t.mkdir('foo')
 
983
 
 
984
        self.assertEqual(
 
985
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
986
            self.command_executed)
 
987
        # Make sure to disconnect, so that the remote process can stop, and we
 
988
        # can cleanup. Then pause the test until everything is shutdown
 
989
        t._client._medium.disconnect()
 
990
        if not started:
 
991
            return
 
992
        # First wait for the subprocess
 
993
        started[0].wait()
 
994
        # And the rest are threads
 
995
        for t in started[1:]:
 
996
            t.join()
 
997
 
 
998
 
 
999
class TestUnhtml(tests.TestCase):
 
1000
 
 
1001
    """Tests for unhtml_roughly"""
 
1002
 
 
1003
    def test_truncation(self):
 
1004
        fake_html = "<p>something!\n" * 1000
 
1005
        result = http.unhtml_roughly(fake_html)
 
1006
        self.assertEquals(len(result), 1000)
 
1007
        self.assertStartsWith(result, " something!")