~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testtransport.py

  • Committer: Jelmer Vernooij
  • Date: 2005-10-17 23:14:29 UTC
  • mto: (1185.16.102)
  • mto: This revision was merged to the branch mainline in revision 1488.
  • Revision ID: jelmer@samba.org-20051017231429-fa6f49e760ed2f22
Remove executable properties from Makefile (set by bzr itself now)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
18
19
from cStringIO import StringIO
19
 
import os
20
 
import subprocess
21
 
import sys
22
 
import threading
23
 
 
24
 
import bzrlib
25
 
from bzrlib import (
26
 
    errors,
27
 
    osutils,
28
 
    tests,
29
 
    urlutils,
30
 
    )
31
 
from bzrlib.errors import (DependencyNotPresent,
32
 
                           FileExists,
33
 
                           InvalidURLJoin,
34
 
                           NoSuchFile,
35
 
                           PathNotChild,
36
 
                           ReadError,
37
 
                           UnsupportedProtocol,
38
 
                           )
39
 
from bzrlib.tests import features, TestCase, TestCaseInTempDir
40
 
from bzrlib.transport import (_clear_protocol_handlers,
41
 
                              _CoalescedOffset,
42
 
                              ConnectedTransport,
43
 
                              _get_protocol_handlers,
44
 
                              _set_protocol_handlers,
45
 
                              _get_transport_modules,
46
 
                              get_transport,
47
 
                              LateReadError,
48
 
                              register_lazy_transport,
49
 
                              register_transport_proto,
50
 
                              Transport,
51
 
                              )
52
 
from bzrlib.transport.chroot import ChrootServer
53
 
from bzrlib.transport.memory import MemoryTransport
54
 
from bzrlib.transport.local import (LocalTransport,
55
 
                                    EmulatedWin32LocalTransport)
56
 
from bzrlib.transport.pathfilter import PathFilteringServer
57
 
 
58
 
 
59
 
# TODO: Should possibly split transport-specific tests into their own files.
60
 
 
61
 
 
62
 
class TestTransport(TestCase):
63
 
    """Test the non transport-concrete class functionality."""
64
 
 
65
 
    def test__get_set_protocol_handlers(self):
66
 
        handlers = _get_protocol_handlers()
67
 
        self.assertNotEqual([], handlers.keys( ))
68
 
        try:
69
 
            _clear_protocol_handlers()
70
 
            self.assertEqual([], _get_protocol_handlers().keys())
71
 
        finally:
72
 
            _set_protocol_handlers(handlers)
73
 
 
74
 
    def test_get_transport_modules(self):
75
 
        handlers = _get_protocol_handlers()
76
 
        # don't pollute the current handlers
77
 
        _clear_protocol_handlers()
78
 
        class SampleHandler(object):
79
 
            """I exist, isnt that enough?"""
80
 
        try:
81
 
            _clear_protocol_handlers()
82
 
            register_transport_proto('foo')
83
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
84
 
                                    'TestTransport.SampleHandler')
85
 
            register_transport_proto('bar')
86
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
87
 
                                    'TestTransport.SampleHandler')
88
 
            self.assertEqual([SampleHandler.__module__,
89
 
                              'bzrlib.transport.chroot',
90
 
                              'bzrlib.transport.pathfilter'],
91
 
                             _get_transport_modules())
92
 
        finally:
93
 
            _set_protocol_handlers(handlers)
94
 
 
95
 
    def test_transport_dependency(self):
96
 
        """Transport with missing dependency causes no error"""
97
 
        saved_handlers = _get_protocol_handlers()
98
 
        # don't pollute the current handlers
99
 
        _clear_protocol_handlers()
100
 
        try:
101
 
            register_transport_proto('foo')
102
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
103
 
                    'BadTransportHandler')
104
 
            try:
105
 
                get_transport('foo://fooserver/foo')
106
 
            except UnsupportedProtocol, e:
107
 
                e_str = str(e)
108
 
                self.assertEquals('Unsupported protocol'
109
 
                                  ' for url "foo://fooserver/foo":'
110
 
                                  ' Unable to import library "some_lib":'
111
 
                                  ' testing missing dependency', str(e))
112
 
            else:
113
 
                self.fail('Did not raise UnsupportedProtocol')
114
 
        finally:
115
 
            # restore original values
116
 
            _set_protocol_handlers(saved_handlers)
117
 
 
118
 
    def test_transport_fallback(self):
119
 
        """Transport with missing dependency causes no error"""
120
 
        saved_handlers = _get_protocol_handlers()
121
 
        try:
122
 
            _clear_protocol_handlers()
123
 
            register_transport_proto('foo')
124
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
125
 
                    'BackupTransportHandler')
126
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
127
 
                    'BadTransportHandler')
128
 
            t = get_transport('foo://fooserver/foo')
129
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
130
 
        finally:
131
 
            _set_protocol_handlers(saved_handlers)
132
 
 
133
 
    def test_ssh_hints(self):
134
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
135
 
        try:
136
 
            get_transport('ssh://fooserver/foo')
137
 
        except UnsupportedProtocol, e:
138
 
            e_str = str(e)
139
 
            self.assertEquals('Unsupported protocol'
140
 
                              ' for url "ssh://fooserver/foo":'
141
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
142
 
                              str(e))
143
 
        else:
144
 
            self.fail('Did not raise UnsupportedProtocol')
145
 
 
146
 
    def test_LateReadError(self):
147
 
        """The LateReadError helper should raise on read()."""
148
 
        a_file = LateReadError('a path')
149
 
        try:
150
 
            a_file.read()
151
 
        except ReadError, error:
152
 
            self.assertEqual('a path', error.path)
153
 
        self.assertRaises(ReadError, a_file.read, 40)
154
 
        a_file.close()
155
 
 
156
 
    def test__combine_paths(self):
157
 
        t = Transport('/')
158
 
        self.assertEqual('/home/sarah/project/foo',
159
 
                         t._combine_paths('/home/sarah', 'project/foo'))
160
 
        self.assertEqual('/etc',
161
 
                         t._combine_paths('/home/sarah', '../../etc'))
162
 
        self.assertEqual('/etc',
163
 
                         t._combine_paths('/home/sarah', '../../../etc'))
164
 
        self.assertEqual('/etc',
165
 
                         t._combine_paths('/home/sarah', '/etc'))
166
 
 
167
 
    def test_local_abspath_non_local_transport(self):
168
 
        # the base implementation should throw
169
 
        t = MemoryTransport()
170
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
171
 
        self.assertEqual('memory:///t is not a local path.', str(e))
172
 
 
173
 
 
174
 
class TestCoalesceOffsets(TestCase):
175
 
 
176
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
177
 
        coalesce = Transport._coalesce_offsets
178
 
        exp = [_CoalescedOffset(*x) for x in expected]
179
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
180
 
                            max_size=max_size))
181
 
        self.assertEqual(exp, out)
182
 
 
183
 
    def test_coalesce_empty(self):
184
 
        self.check([], [])
185
 
 
186
 
    def test_coalesce_simple(self):
187
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
188
 
 
189
 
    def test_coalesce_unrelated(self):
190
 
        self.check([(0, 10, [(0, 10)]),
191
 
                    (20, 10, [(0, 10)]),
192
 
                   ], [(0, 10), (20, 10)])
193
 
 
194
 
    def test_coalesce_unsorted(self):
195
 
        self.check([(20, 10, [(0, 10)]),
196
 
                    (0, 10, [(0, 10)]),
197
 
                   ], [(20, 10), (0, 10)])
198
 
 
199
 
    def test_coalesce_nearby(self):
200
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
201
 
                   [(0, 10), (10, 10)])
202
 
 
203
 
    def test_coalesce_overlapped(self):
204
 
        self.assertRaises(ValueError,
205
 
            self.check, [(0, 15, [(0, 10), (5, 10)])],
206
 
                        [(0, 10), (5, 10)])
207
 
 
208
 
    def test_coalesce_limit(self):
209
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
210
 
                              (30, 10), (40, 10)]),
211
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
212
 
                              (30, 10), (40, 10)]),
213
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
214
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
215
 
                       (90, 10), (100, 10)],
216
 
                    limit=5)
217
 
 
218
 
    def test_coalesce_no_limit(self):
219
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
220
 
                               (30, 10), (40, 10), (50, 10),
221
 
                               (60, 10), (70, 10), (80, 10),
222
 
                               (90, 10)]),
223
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
224
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
225
 
                       (90, 10), (100, 10)])
226
 
 
227
 
    def test_coalesce_fudge(self):
228
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
229
 
                    (100, 10, [(0, 10),]),
230
 
                   ], [(10, 10), (30, 10), (100, 10)],
231
 
                   fudge=10
232
 
                  )
233
 
    def test_coalesce_max_size(self):
234
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
235
 
                    (30, 50, [(0, 50)]),
236
 
                    # If one range is above max_size, it gets its own coalesced
237
 
                    # offset
238
 
                    (100, 80, [(0, 80),]),],
239
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
240
 
                   max_size=50
241
 
                  )
242
 
 
243
 
    def test_coalesce_no_max_size(self):
244
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
245
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
246
 
                  )
247
 
 
248
 
    def test_coalesce_default_limit(self):
249
 
        # By default we use a 100MB max size.
250
 
        ten_mb = 10*1024*1024
251
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
252
 
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
253
 
                   [(i*ten_mb, ten_mb) for i in range(11)])
254
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
255
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
256
 
                   max_size=1*1024*1024*1024)
257
 
 
258
 
 
259
 
class TestMemoryTransport(TestCase):
260
 
 
261
 
    def test_get_transport(self):
262
 
        MemoryTransport()
263
 
 
264
 
    def test_clone(self):
265
 
        transport = MemoryTransport()
266
 
        self.assertTrue(isinstance(transport, MemoryTransport))
267
 
        self.assertEqual("memory:///", transport.clone("/").base)
268
 
 
269
 
    def test_abspath(self):
270
 
        transport = MemoryTransport()
271
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
272
 
 
273
 
    def test_abspath_of_root(self):
274
 
        transport = MemoryTransport()
275
 
        self.assertEqual("memory:///", transport.base)
276
 
        self.assertEqual("memory:///", transport.abspath('/'))
277
 
 
278
 
    def test_abspath_of_relpath_starting_at_root(self):
279
 
        transport = MemoryTransport()
280
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
281
 
 
282
 
    def test_append_and_get(self):
283
 
        transport = MemoryTransport()
284
 
        transport.append_bytes('path', 'content')
285
 
        self.assertEqual(transport.get('path').read(), 'content')
286
 
        transport.append_file('path', StringIO('content'))
287
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
288
 
 
289
 
    def test_put_and_get(self):
290
 
        transport = MemoryTransport()
291
 
        transport.put_file('path', StringIO('content'))
292
 
        self.assertEqual(transport.get('path').read(), 'content')
293
 
        transport.put_bytes('path', 'content')
294
 
        self.assertEqual(transport.get('path').read(), 'content')
295
 
 
296
 
    def test_append_without_dir_fails(self):
297
 
        transport = MemoryTransport()
298
 
        self.assertRaises(NoSuchFile,
299
 
                          transport.append_bytes, 'dir/path', 'content')
300
 
 
301
 
    def test_put_without_dir_fails(self):
302
 
        transport = MemoryTransport()
303
 
        self.assertRaises(NoSuchFile,
304
 
                          transport.put_file, 'dir/path', StringIO('content'))
305
 
 
306
 
    def test_get_missing(self):
307
 
        transport = MemoryTransport()
308
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
309
 
 
310
 
    def test_has_missing(self):
311
 
        transport = MemoryTransport()
312
 
        self.assertEquals(False, transport.has('foo'))
313
 
 
314
 
    def test_has_present(self):
315
 
        transport = MemoryTransport()
316
 
        transport.append_bytes('foo', 'content')
317
 
        self.assertEquals(True, transport.has('foo'))
318
 
 
319
 
    def test_list_dir(self):
320
 
        transport = MemoryTransport()
321
 
        transport.put_bytes('foo', 'content')
322
 
        transport.mkdir('dir')
323
 
        transport.put_bytes('dir/subfoo', 'content')
324
 
        transport.put_bytes('dirlike', 'content')
325
 
 
326
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
327
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
328
 
 
329
 
    def test_mkdir(self):
330
 
        transport = MemoryTransport()
331
 
        transport.mkdir('dir')
332
 
        transport.append_bytes('dir/path', 'content')
333
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
334
 
 
335
 
    def test_mkdir_missing_parent(self):
336
 
        transport = MemoryTransport()
337
 
        self.assertRaises(NoSuchFile,
338
 
                          transport.mkdir, 'dir/dir')
339
 
 
340
 
    def test_mkdir_twice(self):
341
 
        transport = MemoryTransport()
342
 
        transport.mkdir('dir')
343
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
344
 
 
345
 
    def test_parameters(self):
346
 
        transport = MemoryTransport()
347
 
        self.assertEqual(True, transport.listable())
348
 
        self.assertEqual(False, transport.is_readonly())
349
 
 
350
 
    def test_iter_files_recursive(self):
351
 
        transport = MemoryTransport()
352
 
        transport.mkdir('dir')
353
 
        transport.put_bytes('dir/foo', 'content')
354
 
        transport.put_bytes('dir/bar', 'content')
355
 
        transport.put_bytes('bar', 'content')
356
 
        paths = set(transport.iter_files_recursive())
357
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
358
 
 
359
 
    def test_stat(self):
360
 
        transport = MemoryTransport()
361
 
        transport.put_bytes('foo', 'content')
362
 
        transport.put_bytes('bar', 'phowar')
363
 
        self.assertEqual(7, transport.stat('foo').st_size)
364
 
        self.assertEqual(6, transport.stat('bar').st_size)
365
 
 
366
 
 
367
 
class ChrootDecoratorTransportTest(TestCase):
368
 
    """Chroot decoration specific tests."""
369
 
 
370
 
    def test_abspath(self):
371
 
        # The abspath is always relative to the chroot_url.
372
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
373
 
        self.start_server(server)
374
 
        transport = get_transport(server.get_url())
375
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
376
 
 
377
 
        subdir_transport = transport.clone('subdir')
378
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
379
 
 
380
 
    def test_clone(self):
381
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
382
 
        self.start_server(server)
383
 
        transport = get_transport(server.get_url())
384
 
        # relpath from root and root path are the same
385
 
        relpath_cloned = transport.clone('foo')
386
 
        abspath_cloned = transport.clone('/foo')
387
 
        self.assertEqual(server, relpath_cloned.server)
388
 
        self.assertEqual(server, abspath_cloned.server)
389
 
 
390
 
    def test_chroot_url_preserves_chroot(self):
391
 
        """Calling get_transport on a chroot transport's base should produce a
392
 
        transport with exactly the same behaviour as the original chroot
393
 
        transport.
394
 
 
395
 
        This is so that it is not possible to escape a chroot by doing::
396
 
            url = chroot_transport.base
397
 
            parent_url = urlutils.join(url, '..')
398
 
            new_transport = get_transport(parent_url)
399
 
        """
400
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
401
 
        self.start_server(server)
402
 
        transport = get_transport(server.get_url())
403
 
        new_transport = get_transport(transport.base)
404
 
        self.assertEqual(transport.server, new_transport.server)
405
 
        self.assertEqual(transport.base, new_transport.base)
406
 
 
407
 
    def test_urljoin_preserves_chroot(self):
408
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
409
 
        URL that escapes the intended chroot.
410
 
 
411
 
        This is so that it is not possible to escape a chroot by doing::
412
 
            url = chroot_transport.base
413
 
            parent_url = urlutils.join(url, '..')
414
 
            new_transport = get_transport(parent_url)
415
 
        """
416
 
        server = ChrootServer(get_transport('memory:///path/'))
417
 
        self.start_server(server)
418
 
        transport = get_transport(server.get_url())
419
 
        self.assertRaises(
420
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
421
 
 
422
 
 
423
 
class ChrootServerTest(TestCase):
424
 
 
425
 
    def test_construct(self):
426
 
        backing_transport = MemoryTransport()
427
 
        server = ChrootServer(backing_transport)
428
 
        self.assertEqual(backing_transport, server.backing_transport)
429
 
 
430
 
    def test_setUp(self):
431
 
        backing_transport = MemoryTransport()
432
 
        server = ChrootServer(backing_transport)
433
 
        server.start_server()
434
 
        try:
435
 
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
436
 
        finally:
437
 
            server.stop_server()
438
 
 
439
 
    def test_stop_server(self):
440
 
        backing_transport = MemoryTransport()
441
 
        server = ChrootServer(backing_transport)
442
 
        server.start_server()
443
 
        server.stop_server()
444
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
445
 
 
446
 
    def test_get_url(self):
447
 
        backing_transport = MemoryTransport()
448
 
        server = ChrootServer(backing_transport)
449
 
        server.start_server()
450
 
        try:
451
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
452
 
        finally:
453
 
            server.stop_server()
454
 
 
455
 
 
456
 
class PathFilteringDecoratorTransportTest(TestCase):
457
 
    """Pathfilter decoration specific tests."""
458
 
 
459
 
    def test_abspath(self):
460
 
        # The abspath is always relative to the base of the backing transport.
461
 
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
462
 
            lambda x: x)
463
 
        server.start_server()
464
 
        transport = get_transport(server.get_url())
465
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
466
 
 
467
 
        subdir_transport = transport.clone('subdir')
468
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
469
 
        server.stop_server()
470
 
 
471
 
    def make_pf_transport(self, filter_func=None):
472
 
        """Make a PathFilteringTransport backed by a MemoryTransport.
473
 
        
474
 
        :param filter_func: by default this will be a no-op function.  Use this
475
 
            parameter to override it."""
476
 
        if filter_func is None:
477
 
            filter_func = lambda x: x
478
 
        server = PathFilteringServer(
479
 
            get_transport('memory:///foo/bar/'), filter_func)
480
 
        server.start_server()
481
 
        self.addCleanup(server.stop_server)
482
 
        return get_transport(server.get_url())
483
 
 
484
 
    def test__filter(self):
485
 
        # _filter (with an identity func as filter_func) always returns
486
 
        # paths relative to the base of the backing transport.
487
 
        transport = self.make_pf_transport()
488
 
        self.assertEqual('foo', transport._filter('foo'))
489
 
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
490
 
        self.assertEqual('', transport._filter('..'))
491
 
        self.assertEqual('', transport._filter('/'))
492
 
        # The base of the pathfiltering transport is taken into account too.
493
 
        transport = transport.clone('subdir1/subdir2')
494
 
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
495
 
        self.assertEqual(
496
 
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
497
 
        self.assertEqual('subdir1', transport._filter('..'))
498
 
        self.assertEqual('', transport._filter('/'))
499
 
 
500
 
    def test_filter_invocation(self):
501
 
        filter_log = []
502
 
        def filter(path):
503
 
            filter_log.append(path)
504
 
            return path
505
 
        transport = self.make_pf_transport(filter)
506
 
        transport.has('abc')
507
 
        self.assertEqual(['abc'], filter_log)
508
 
        del filter_log[:]
509
 
        transport.clone('abc').has('xyz')
510
 
        self.assertEqual(['abc/xyz'], filter_log)
511
 
        del filter_log[:]
512
 
        transport.has('/abc')
513
 
        self.assertEqual(['abc'], filter_log)
514
 
 
515
 
    def test_clone(self):
516
 
        transport = self.make_pf_transport()
517
 
        # relpath from root and root path are the same
518
 
        relpath_cloned = transport.clone('foo')
519
 
        abspath_cloned = transport.clone('/foo')
520
 
        self.assertEqual(transport.server, relpath_cloned.server)
521
 
        self.assertEqual(transport.server, abspath_cloned.server)
522
 
 
523
 
    def test_url_preserves_pathfiltering(self):
524
 
        """Calling get_transport on a pathfiltered transport's base should
525
 
        produce a transport with exactly the same behaviour as the original
526
 
        pathfiltered transport.
527
 
 
528
 
        This is so that it is not possible to escape (accidentally or
529
 
        otherwise) the filtering by doing::
530
 
            url = filtered_transport.base
531
 
            parent_url = urlutils.join(url, '..')
532
 
            new_transport = get_transport(parent_url)
533
 
        """
534
 
        transport = self.make_pf_transport()
535
 
        new_transport = get_transport(transport.base)
536
 
        self.assertEqual(transport.server, new_transport.server)
537
 
        self.assertEqual(transport.base, new_transport.base)
538
 
 
539
 
 
540
 
class ReadonlyDecoratorTransportTest(TestCase):
541
 
    """Readonly decoration specific tests."""
542
 
 
543
 
    def test_local_parameters(self):
544
 
        import bzrlib.transport.readonly as readonly
545
 
        # connect to . in readonly mode
546
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
547
 
        self.assertEqual(True, transport.listable())
548
 
        self.assertEqual(True, transport.is_readonly())
549
 
 
550
 
    def test_http_parameters(self):
551
 
        from bzrlib.tests.http_server import HttpServer
552
 
        import bzrlib.transport.readonly as readonly
553
 
        # connect to '.' via http which is not listable
554
 
        server = HttpServer()
555
 
        self.start_server(server)
556
 
        transport = get_transport('readonly+' + server.get_url())
557
 
        self.failUnless(isinstance(transport,
558
 
                                   readonly.ReadonlyTransportDecorator))
559
 
        self.assertEqual(False, transport.listable())
560
 
        self.assertEqual(True, transport.is_readonly())
561
 
 
562
 
 
563
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
564
 
    """NFS decorator specific tests."""
565
 
 
566
 
    def get_nfs_transport(self, url):
567
 
        import bzrlib.transport.fakenfs as fakenfs
568
 
        # connect to url with nfs decoration
569
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
570
 
 
571
 
    def test_local_parameters(self):
572
 
        # the listable and is_readonly parameters
573
 
        # are not changed by the fakenfs decorator
574
 
        transport = self.get_nfs_transport('.')
575
 
        self.assertEqual(True, transport.listable())
576
 
        self.assertEqual(False, transport.is_readonly())
577
 
 
578
 
    def test_http_parameters(self):
579
 
        # the listable and is_readonly parameters
580
 
        # are not changed by the fakenfs decorator
581
 
        from bzrlib.tests.http_server import HttpServer
582
 
        # connect to '.' via http which is not listable
583
 
        server = HttpServer()
584
 
        self.start_server(server)
585
 
        transport = self.get_nfs_transport(server.get_url())
586
 
        self.assertIsInstance(
587
 
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
588
 
        self.assertEqual(False, transport.listable())
589
 
        self.assertEqual(True, transport.is_readonly())
590
 
 
591
 
    def test_fakenfs_server_default(self):
592
 
        # a FakeNFSServer() should bring up a local relpath server for itself
593
 
        import bzrlib.transport.fakenfs as fakenfs
594
 
        server = fakenfs.FakeNFSServer()
595
 
        self.start_server(server)
596
 
        # the url should be decorated appropriately
597
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
598
 
        # and we should be able to get a transport for it
599
 
        transport = get_transport(server.get_url())
600
 
        # which must be a FakeNFSTransportDecorator instance.
601
 
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
602
 
 
603
 
    def test_fakenfs_rename_semantics(self):
604
 
        # a FakeNFS transport must mangle the way rename errors occur to
605
 
        # look like NFS problems.
606
 
        transport = self.get_nfs_transport('.')
607
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
608
 
                        transport=transport)
609
 
        self.assertRaises(errors.ResourceBusy,
610
 
                          transport.rename, 'from', 'to')
611
 
 
612
 
 
613
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
614
 
    """Tests for simulation of VFAT restrictions"""
615
 
 
616
 
    def get_vfat_transport(self, url):
617
 
        """Return vfat-backed transport for test directory"""
618
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
619
 
        return FakeVFATTransportDecorator('vfat+' + url)
620
 
 
621
 
    def test_transport_creation(self):
622
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
623
 
        transport = self.get_vfat_transport('.')
624
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
625
 
 
626
 
    def test_transport_mkdir(self):
627
 
        transport = self.get_vfat_transport('.')
628
 
        transport.mkdir('HELLO')
629
 
        self.assertTrue(transport.has('hello'))
630
 
        self.assertTrue(transport.has('Hello'))
631
 
 
632
 
    def test_forbidden_chars(self):
633
 
        transport = self.get_vfat_transport('.')
634
 
        self.assertRaises(ValueError, transport.has, "<NU>")
635
 
 
636
 
 
637
 
class BadTransportHandler(Transport):
638
 
    def __init__(self, base_url):
639
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
640
 
 
641
 
 
642
 
class BackupTransportHandler(Transport):
643
 
    """Test transport that works as a backup for the BadTransportHandler"""
644
 
    pass
645
 
 
646
 
 
647
 
class TestTransportImplementation(TestCaseInTempDir):
648
 
    """Implementation verification for transports.
649
 
 
650
 
    To verify a transport we need a server factory, which is a callable
651
 
    that accepts no parameters and returns an implementation of
652
 
    bzrlib.transport.Server.
653
 
 
654
 
    That Server is then used to construct transport instances and test
655
 
    the transport via loopback activity.
656
 
 
657
 
    Currently this assumes that the Transport object is connected to the
658
 
    current working directory.  So that whatever is done
659
 
    through the transport, should show up in the working
660
 
    directory, and vice-versa. This is a bug, because its possible to have
661
 
    URL schemes which provide access to something that may not be
662
 
    result in storage on the local disk, i.e. due to file system limits, or
663
 
    due to it being a database or some other non-filesystem tool.
 
20
from bzrlib.selftest import TestCaseInTempDir
 
21
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
22
from bzrlib.errors import NoSuchFile, FileExists, TransportNotPossible
 
23
 
 
24
def _append(fn, txt):
 
25
    """Append the given text (file-like object) to the supplied filename."""
 
26
    f = open(fn, 'ab')
 
27
    f.write(txt)
 
28
    f.flush()
 
29
    f.close()
 
30
    del f
 
31
 
 
32
class TestTransportMixIn(object):
 
33
    """Subclass this, and it will provide a series of tests for a Transport.
 
34
    It assumes that the Transport object is connected to the 
 
35
    current working directory.  So that whatever is done 
 
36
    through the transport, should show up in the working 
 
37
    directory, and vice-versa.
664
38
 
665
39
    This also tests to make sure that the functions work with both
666
40
    generators and lists (assuming iter(list) is effectively a generator)
667
41
    """
668
 
 
669
 
    def setUp(self):
670
 
        super(TestTransportImplementation, self).setUp()
671
 
        self._server = self.transport_server()
672
 
        self.start_server(self._server)
673
 
 
674
 
    def get_transport(self, relpath=None):
675
 
        """Return a connected transport to the local directory.
676
 
 
677
 
        :param relpath: a path relative to the base url.
678
 
        """
679
 
        base_url = self._server.get_url()
680
 
        url = self._adjust_url(base_url, relpath)
681
 
        # try getting the transport via the regular interface:
682
 
        t = get_transport(url)
683
 
        # vila--20070607 if the following are commented out the test suite
684
 
        # still pass. Is this really still needed or was it a forgotten
685
 
        # temporary fix ?
686
 
        if not isinstance(t, self.transport_class):
687
 
            # we did not get the correct transport class type. Override the
688
 
            # regular connection behaviour by direct construction.
689
 
            t = self.transport_class(url)
690
 
        return t
691
 
 
692
 
 
693
 
class TestLocalTransports(TestCase):
694
 
 
695
 
    def test_get_transport_from_abspath(self):
696
 
        here = osutils.abspath('.')
697
 
        t = get_transport(here)
698
 
        self.assertIsInstance(t, LocalTransport)
699
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
700
 
 
701
 
    def test_get_transport_from_relpath(self):
702
 
        here = osutils.abspath('.')
703
 
        t = get_transport('.')
704
 
        self.assertIsInstance(t, LocalTransport)
705
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
706
 
 
707
 
    def test_get_transport_from_local_url(self):
708
 
        here = osutils.abspath('.')
709
 
        here_url = urlutils.local_path_to_url(here) + '/'
710
 
        t = get_transport(here_url)
711
 
        self.assertIsInstance(t, LocalTransport)
712
 
        self.assertEquals(t.base, here_url)
713
 
 
714
 
    def test_local_abspath(self):
715
 
        here = osutils.abspath('.')
716
 
        t = get_transport(here)
717
 
        self.assertEquals(t.local_abspath(''), here)
718
 
 
719
 
 
720
 
class TestWin32LocalTransport(TestCase):
721
 
 
722
 
    def test_unc_clone_to_root(self):
723
 
        # Win32 UNC path like \\HOST\path
724
 
        # clone to root should stop at least at \\HOST part
725
 
        # not on \\
726
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
727
 
        for i in xrange(4):
728
 
            t = t.clone('..')
729
 
        self.assertEquals(t.base, 'file://HOST/')
730
 
        # make sure we reach the root
731
 
        t = t.clone('..')
732
 
        self.assertEquals(t.base, 'file://HOST/')
733
 
 
734
 
 
735
 
class TestConnectedTransport(TestCase):
736
 
    """Tests for connected to remote server transports"""
737
 
 
738
 
    def test_parse_url(self):
739
 
        t = ConnectedTransport('http://simple.example.com/home/source')
740
 
        self.assertEquals(t._host, 'simple.example.com')
741
 
        self.assertEquals(t._port, None)
742
 
        self.assertEquals(t._path, '/home/source/')
743
 
        self.failUnless(t._user is None)
744
 
        self.failUnless(t._password is None)
745
 
 
746
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
747
 
 
748
 
    def test_parse_url_with_at_in_user(self):
749
 
        # Bug 228058
750
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
751
 
        self.assertEquals(t._user, 'user@host.com')
752
 
 
753
 
    def test_parse_quoted_url(self):
754
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
755
 
        self.assertEquals(t._host, 'exAmple.com')
756
 
        self.assertEquals(t._port, 2222)
757
 
        self.assertEquals(t._user, 'robey')
758
 
        self.assertEquals(t._password, 'h@t')
759
 
        self.assertEquals(t._path, '/path/')
760
 
 
761
 
        # Base should not keep track of the password
762
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
763
 
 
764
 
    def test_parse_invalid_url(self):
765
 
        self.assertRaises(errors.InvalidURL,
766
 
                          ConnectedTransport,
767
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
768
 
 
769
 
    def test_relpath(self):
770
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
771
 
 
772
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
773
 
        self.assertRaises(errors.PathNotChild, t.relpath,
774
 
                          'http://user@host.com/abs/path/sub')
775
 
        self.assertRaises(errors.PathNotChild, t.relpath,
776
 
                          'sftp://user2@host.com/abs/path/sub')
777
 
        self.assertRaises(errors.PathNotChild, t.relpath,
778
 
                          'sftp://user@otherhost.com/abs/path/sub')
779
 
        self.assertRaises(errors.PathNotChild, t.relpath,
780
 
                          'sftp://user@host.com:33/abs/path/sub')
781
 
        # Make sure it works when we don't supply a username
782
 
        t = ConnectedTransport('sftp://host.com/abs/path')
783
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
784
 
 
785
 
        # Make sure it works when parts of the path will be url encoded
786
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
787
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
788
 
 
789
 
    def test_connection_sharing_propagate_credentials(self):
790
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
791
 
        self.assertEquals('user', t._user)
792
 
        self.assertEquals('host.com', t._host)
793
 
        self.assertIs(None, t._get_connection())
794
 
        self.assertIs(None, t._password)
795
 
        c = t.clone('subdir')
796
 
        self.assertIs(None, c._get_connection())
797
 
        self.assertIs(None, t._password)
798
 
 
799
 
        # Simulate the user entering a password
800
 
        password = 'secret'
801
 
        connection = object()
802
 
        t._set_connection(connection, password)
803
 
        self.assertIs(connection, t._get_connection())
804
 
        self.assertIs(password, t._get_credentials())
805
 
        self.assertIs(connection, c._get_connection())
806
 
        self.assertIs(password, c._get_credentials())
807
 
 
808
 
        # credentials can be updated
809
 
        new_password = 'even more secret'
810
 
        c._update_credentials(new_password)
811
 
        self.assertIs(connection, t._get_connection())
812
 
        self.assertIs(new_password, t._get_credentials())
813
 
        self.assertIs(connection, c._get_connection())
814
 
        self.assertIs(new_password, c._get_credentials())
815
 
 
816
 
 
817
 
class TestReusedTransports(TestCase):
818
 
    """Tests for transport reuse"""
819
 
 
820
 
    def test_reuse_same_transport(self):
821
 
        possible_transports = []
822
 
        t1 = get_transport('http://foo/',
823
 
                           possible_transports=possible_transports)
824
 
        self.assertEqual([t1], possible_transports)
825
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
826
 
        self.assertIs(t1, t2)
827
 
 
828
 
        # Also check that final '/' are handled correctly
829
 
        t3 = get_transport('http://foo/path/')
830
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
831
 
        self.assertIs(t3, t4)
832
 
 
833
 
        t5 = get_transport('http://foo/path')
834
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
835
 
        self.assertIs(t5, t6)
836
 
 
837
 
    def test_don_t_reuse_different_transport(self):
838
 
        t1 = get_transport('http://foo/path')
839
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
840
 
        self.assertIsNot(t1, t2)
841
 
 
842
 
 
843
 
class TestTransportTrace(TestCase):
844
 
 
845
 
    def test_get(self):
846
 
        transport = get_transport('trace+memory://')
847
 
        self.assertIsInstance(
848
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
849
 
 
850
 
    def test_clone_preserves_activity(self):
851
 
        transport = get_transport('trace+memory://')
852
 
        transport2 = transport.clone('.')
853
 
        self.assertTrue(transport is not transport2)
854
 
        self.assertTrue(transport._activity is transport2._activity)
855
 
 
856
 
    # the following specific tests are for the operations that have made use of
857
 
    # logging in tests; we could test every single operation but doing that
858
 
    # still won't cause a test failure when the top level Transport API
859
 
    # changes; so there is little return doing that.
860
 
    def test_get(self):
861
 
        transport = get_transport('trace+memory:///')
862
 
        transport.put_bytes('foo', 'barish')
863
 
        transport.get('foo')
864
 
        expected_result = []
865
 
        # put_bytes records the bytes, not the content to avoid memory
866
 
        # pressure.
867
 
        expected_result.append(('put_bytes', 'foo', 6, None))
868
 
        # get records the file name only.
869
 
        expected_result.append(('get', 'foo'))
870
 
        self.assertEqual(expected_result, transport._activity)
871
 
 
872
 
    def test_readv(self):
873
 
        transport = get_transport('trace+memory:///')
874
 
        transport.put_bytes('foo', 'barish')
875
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
876
 
            upper_limit=6))
877
 
        expected_result = []
878
 
        # put_bytes records the bytes, not the content to avoid memory
879
 
        # pressure.
880
 
        expected_result.append(('put_bytes', 'foo', 6, None))
881
 
        # readv records the supplied offset request
882
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
883
 
        self.assertEqual(expected_result, transport._activity)
884
 
 
885
 
 
886
 
class TestSSHConnections(tests.TestCaseWithTransport):
887
 
 
888
 
    def test_bzr_connect_to_bzr_ssh(self):
889
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
890
 
 
891
 
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
892
 
        """
893
 
        # This test actually causes a bzr instance to be invoked, which is very
894
 
        # expensive: it should be the only such test in the test suite.
895
 
        # A reasonable evolution for this would be to simply check inside
896
 
        # check_channel_exec_request that the command is appropriate, and then
897
 
        # satisfy requests in-process.
898
 
        self.requireFeature(features.paramiko)
899
 
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
900
 
        # override the interface (doesn't change self._vendor).
901
 
        # Note that this does encryption, so can be slow.
902
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
903
 
        from bzrlib.tests.stub_sftp import StubServer
904
 
 
905
 
        # Start an SSH server
906
 
        self.command_executed = []
907
 
        # XXX: This is horrible -- we define a really dumb SSH server that
908
 
        # executes commands, and manage the hooking up of stdin/out/err to the
909
 
        # SSH channel ourselves.  Surely this has already been implemented
910
 
        # elsewhere?
911
 
        started = []
912
 
        class StubSSHServer(StubServer):
913
 
 
914
 
            test = self
915
 
 
916
 
            def check_channel_exec_request(self, channel, command):
917
 
                self.test.command_executed.append(command)
918
 
                proc = subprocess.Popen(
919
 
                    command, shell=True, stdin=subprocess.PIPE,
920
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
921
 
 
922
 
                # XXX: horribly inefficient, not to mention ugly.
923
 
                # Start a thread for each of stdin/out/err, and relay bytes from
924
 
                # the subprocess to channel and vice versa.
925
 
                def ferry_bytes(read, write, close):
926
 
                    while True:
927
 
                        bytes = read(1)
928
 
                        if bytes == '':
929
 
                            close()
930
 
                            break
931
 
                        write(bytes)
932
 
 
933
 
                file_functions = [
934
 
                    (channel.recv, proc.stdin.write, proc.stdin.close),
935
 
                    (proc.stdout.read, channel.sendall, channel.close),
936
 
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
937
 
                started.append(proc)
938
 
                for read, write, close in file_functions:
939
 
                    t = threading.Thread(
940
 
                        target=ferry_bytes, args=(read, write, close))
941
 
                    t.start()
942
 
                    started.append(t)
943
 
 
944
 
                return True
945
 
 
946
 
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
947
 
        # We *don't* want to override the default SSH vendor: the detected one
948
 
        # is the one to use.
949
 
        self.start_server(ssh_server)
950
 
        port = ssh_server._listener.port
951
 
 
952
 
        if sys.platform == 'win32':
953
 
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
954
 
        else:
955
 
            bzr_remote_path = self.get_bzr_path()
956
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
957
 
 
958
 
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
959
 
        # variable is used to tell bzr what command to run on the remote end.
960
 
        path_to_branch = osutils.abspath('.')
961
 
        if sys.platform == 'win32':
962
 
            # On Windows, we export all drives as '/C:/, etc. So we need to
963
 
            # prefix a '/' to get the right path.
964
 
            path_to_branch = '/' + path_to_branch
965
 
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
966
 
        t = get_transport(url)
967
 
        self.permit_url(t.base)
968
 
        t.mkdir('foo')
969
 
 
970
 
        self.assertEqual(
971
 
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
972
 
            self.command_executed)
973
 
        # Make sure to disconnect, so that the remote process can stop, and we
974
 
        # can cleanup. Then pause the test until everything is shutdown
975
 
        t._client._medium.disconnect()
976
 
        if not started:
977
 
            return
978
 
        # First wait for the subprocess
979
 
        started[0].wait()
980
 
        # And the rest are threads
981
 
        for t in started[1:]:
982
 
            t.join()
 
42
    readonly = False
 
43
    def get_transport(self):
 
44
        """Children should override this to return the Transport object.
 
45
        """
 
46
        raise NotImplementedError
 
47
 
 
48
    def test_has(self):
 
49
        t = self.get_transport()
 
50
 
 
51
        files = ['a', 'b', 'e', 'g']
 
52
        self.build_tree(files)
 
53
        self.assertEqual(t.has('a'), True)
 
54
        self.assertEqual(t.has('c'), False)
 
55
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
56
                [True, True, False, False, True, False, True, False])
 
57
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
58
                [True, True, False, False, True, False, True, False])
 
59
 
 
60
    def test_get(self):
 
61
        t = self.get_transport()
 
62
 
 
63
        files = ['a', 'b', 'e', 'g']
 
64
        self.build_tree(files)
 
65
        self.assertEqual(t.get('a').read(), open('a').read())
 
66
        content_f = t.get_multi(files)
 
67
        for path,f in zip(files, content_f):
 
68
            self.assertEqual(open(path).read(), f.read())
 
69
 
 
70
        content_f = t.get_multi(iter(files))
 
71
        for path,f in zip(files, content_f):
 
72
            self.assertEqual(open(path).read(), f.read())
 
73
 
 
74
        self.assertRaises(NoSuchFile, t.get, 'c')
 
75
        try:
 
76
            files = list(t.get_multi(['a', 'b', 'c']))
 
77
        except NoSuchFile:
 
78
            pass
 
79
        else:
 
80
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
81
        try:
 
82
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
 
83
        except NoSuchFile:
 
84
            pass
 
85
        else:
 
86
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
87
 
 
88
    def test_put(self):
 
89
        t = self.get_transport()
 
90
 
 
91
        if self.readonly:
 
92
            self.assertRaises(TransportNotPossible,
 
93
                    t.put, 'a', 'some text for a\n')
 
94
            open('a', 'wb').write('some text for a\n')
 
95
        else:
 
96
            t.put('a', 'some text for a\n')
 
97
        self.assert_(os.path.exists('a'))
 
98
        self.check_file_contents('a', 'some text for a\n')
 
99
        self.assertEqual(t.get('a').read(), 'some text for a\n')
 
100
        # Make sure 'has' is updated
 
101
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
102
                [True, False, False, False, False])
 
103
        if self.readonly:
 
104
            self.assertRaises(TransportNotPossible,
 
105
                    t.put_multi,
 
106
                    [('a', 'new\ncontents for\na\n'),
 
107
                        ('d', 'contents\nfor d\n')])
 
108
            open('a', 'wb').write('new\ncontents for\na\n')
 
109
            open('d', 'wb').write('contents\nfor d\n')
 
110
        else:
 
111
            # Put also replaces contents
 
112
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
113
                                          ('d', 'contents\nfor d\n')]),
 
114
                             2)
 
115
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
116
                [True, False, False, True, False])
 
117
        self.check_file_contents('a', 'new\ncontents for\na\n')
 
118
        self.check_file_contents('d', 'contents\nfor d\n')
 
119
 
 
120
        if self.readonly:
 
121
            self.assertRaises(TransportNotPossible,
 
122
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
 
123
                                  ('d', 'another contents\nfor d\n')]))
 
124
            open('a', 'wb').write('diff\ncontents for\na\n')
 
125
            open('d', 'wb').write('another contents\nfor d\n')
 
126
        else:
 
127
            self.assertEqual(
 
128
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
129
                                  ('d', 'another contents\nfor d\n')]))
 
130
                             , 2)
 
131
        self.check_file_contents('a', 'diff\ncontents for\na\n')
 
132
        self.check_file_contents('d', 'another contents\nfor d\n')
 
133
 
 
134
        if self.readonly:
 
135
            self.assertRaises(TransportNotPossible,
 
136
                    t.put, 'path/doesnt/exist/c', 'contents')
 
137
        else:
 
138
            self.assertRaises(NoSuchFile,
 
139
                    t.put, 'path/doesnt/exist/c', 'contents')
 
140
 
 
141
    def test_put_file(self):
 
142
        t = self.get_transport()
 
143
 
 
144
        # Test that StringIO can be used as a file-like object with put
 
145
        f1 = StringIO('this is a string\nand some more stuff\n')
 
146
        if self.readonly:
 
147
            open('f1', 'wb').write(f1.read())
 
148
        else:
 
149
            t.put('f1', f1)
 
150
 
 
151
        del f1
 
152
 
 
153
        self.check_file_contents('f1', 
 
154
                'this is a string\nand some more stuff\n')
 
155
 
 
156
        f2 = StringIO('here is some text\nand a bit more\n')
 
157
        f3 = StringIO('some text for the\nthird file created\n')
 
158
 
 
159
        if self.readonly:
 
160
            open('f2', 'wb').write(f2.read())
 
161
            open('f3', 'wb').write(f3.read())
 
162
        else:
 
163
            t.put_multi([('f2', f2), ('f3', f3)])
 
164
 
 
165
        del f2, f3
 
166
 
 
167
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
 
168
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
 
169
 
 
170
        # Test that an actual file object can be used with put
 
171
        f4 = open('f1', 'rb')
 
172
        if self.readonly:
 
173
            open('f4', 'wb').write(f4.read())
 
174
        else:
 
175
            t.put('f4', f4)
 
176
 
 
177
        del f4
 
178
 
 
179
        self.check_file_contents('f4', 
 
180
                'this is a string\nand some more stuff\n')
 
181
 
 
182
        f5 = open('f2', 'rb')
 
183
        f6 = open('f3', 'rb')
 
184
        if self.readonly:
 
185
            open('f5', 'wb').write(f5.read())
 
186
            open('f6', 'wb').write(f6.read())
 
187
        else:
 
188
            t.put_multi([('f5', f5), ('f6', f6)])
 
189
 
 
190
        del f5, f6
 
191
 
 
192
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
 
193
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
 
194
 
 
195
 
 
196
 
 
197
    def test_mkdir(self):
 
198
        t = self.get_transport()
 
199
 
 
200
        # Test mkdir
 
201
        os.mkdir('dir_a')
 
202
        self.assertEqual(t.has('dir_a'), True)
 
203
        self.assertEqual(t.has('dir_b'), False)
 
204
 
 
205
        if self.readonly:
 
206
            self.assertRaises(TransportNotPossible,
 
207
                    t.mkdir, 'dir_b')
 
208
            os.mkdir('dir_b')
 
209
        else:
 
210
            t.mkdir('dir_b')
 
211
        self.assertEqual(t.has('dir_b'), True)
 
212
        self.assert_(os.path.isdir('dir_b'))
 
213
 
 
214
        if self.readonly:
 
215
            self.assertRaises(TransportNotPossible,
 
216
                    t.mkdir_multi, ['dir_c', 'dir_d'])
 
217
            os.mkdir('dir_c')
 
218
            os.mkdir('dir_d')
 
219
        else:
 
220
            t.mkdir_multi(['dir_c', 'dir_d'])
 
221
 
 
222
        if self.readonly:
 
223
            self.assertRaises(TransportNotPossible,
 
224
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
 
225
            os.mkdir('dir_e')
 
226
            os.mkdir('dir_f')
 
227
        else:
 
228
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
229
        self.assertEqual(list(t.has_multi(
 
230
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
231
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
232
            [True, True, True, False,
 
233
             True, True, True, True])
 
234
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
 
235
            self.assert_(os.path.isdir(d))
 
236
 
 
237
        if not self.readonly:
 
238
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
239
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
240
 
 
241
        # Make sure the transport recognizes when a
 
242
        # directory is created by other means
 
243
        # Caching Transports will fail, because dir_e was already seen not
 
244
        # to exist. So instead, we will search for a new directory
 
245
        #os.mkdir('dir_e')
 
246
        #if not self.readonly:
 
247
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
 
248
 
 
249
        os.mkdir('dir_g')
 
250
        if not self.readonly:
 
251
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
252
 
 
253
        # Test get/put in sub-directories
 
254
        if self.readonly:
 
255
            open('dir_a/a', 'wb').write('contents of dir_a/a')
 
256
            open('dir_b/b', 'wb').write('contents of dir_b/b')
 
257
        else:
 
258
            self.assertEqual(
 
259
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
260
                             ('dir_b/b', 'contents of dir_b/b')])
 
261
                          , 2)
 
262
        for f in ('dir_a/a', 'dir_b/b'):
 
263
            self.assertEqual(t.get(f).read(), open(f).read())
 
264
 
 
265
    def test_copy_to(self):
 
266
        import tempfile
 
267
        from bzrlib.transport.local import LocalTransport
 
268
 
 
269
        t = self.get_transport()
 
270
 
 
271
        files = ['a', 'b', 'c', 'd']
 
272
        self.build_tree(files)
 
273
 
 
274
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
275
        dtmp_base = os.path.basename(dtmp)
 
276
        local_t = LocalTransport(dtmp)
 
277
 
 
278
        t.copy_to(files, local_t)
 
279
        for f in files:
 
280
            self.assertEquals(open(f).read(),
 
281
                    open(os.path.join(dtmp_base, f)).read())
 
282
 
 
283
        del dtmp, dtmp_base, local_t
 
284
 
 
285
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
286
        dtmp_base = os.path.basename(dtmp)
 
287
        local_t = LocalTransport(dtmp)
 
288
 
 
289
        files = ['a', 'b', 'c', 'd']
 
290
        t.copy_to(iter(files), local_t)
 
291
        for f in files:
 
292
            self.assertEquals(open(f).read(),
 
293
                    open(os.path.join(dtmp_base, f)).read())
 
294
 
 
295
        del dtmp, dtmp_base, local_t
 
296
 
 
297
    def test_append(self):
 
298
        t = self.get_transport()
 
299
 
 
300
        if self.readonly:
 
301
            open('a', 'wb').write('diff\ncontents for\na\n')
 
302
            open('b', 'wb').write('contents\nfor b\n')
 
303
        else:
 
304
            t.put_multi([
 
305
                    ('a', 'diff\ncontents for\na\n'),
 
306
                    ('b', 'contents\nfor b\n')
 
307
                    ])
 
308
 
 
309
        if self.readonly:
 
310
            self.assertRaises(TransportNotPossible,
 
311
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
312
            _append('a', 'add\nsome\nmore\ncontents\n')
 
313
        else:
 
314
            t.append('a', 'add\nsome\nmore\ncontents\n')
 
315
 
 
316
        self.check_file_contents('a', 
 
317
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
 
318
 
 
319
        if self.readonly:
 
320
            self.assertRaises(TransportNotPossible,
 
321
                    t.append_multi,
 
322
                        [('a', 'and\nthen\nsome\nmore\n'),
 
323
                         ('b', 'some\nmore\nfor\nb\n')])
 
324
            _append('a', 'and\nthen\nsome\nmore\n')
 
325
            _append('b', 'some\nmore\nfor\nb\n')
 
326
        else:
 
327
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
 
328
                    ('b', 'some\nmore\nfor\nb\n')])
 
329
        self.check_file_contents('a', 
 
330
            'diff\ncontents for\na\n'
 
331
            'add\nsome\nmore\ncontents\n'
 
332
            'and\nthen\nsome\nmore\n')
 
333
        self.check_file_contents('b', 
 
334
                'contents\nfor b\n'
 
335
                'some\nmore\nfor\nb\n')
 
336
 
 
337
        if self.readonly:
 
338
            _append('a', 'a little bit more\n')
 
339
            _append('b', 'from an iterator\n')
 
340
        else:
 
341
            t.append_multi(iter([('a', 'a little bit more\n'),
 
342
                    ('b', 'from an iterator\n')]))
 
343
        self.check_file_contents('a', 
 
344
            'diff\ncontents for\na\n'
 
345
            'add\nsome\nmore\ncontents\n'
 
346
            'and\nthen\nsome\nmore\n'
 
347
            'a little bit more\n')
 
348
        self.check_file_contents('b', 
 
349
                'contents\nfor b\n'
 
350
                'some\nmore\nfor\nb\n'
 
351
                'from an iterator\n')
 
352
 
 
353
    def test_append_file(self):
 
354
        t = self.get_transport()
 
355
 
 
356
        contents = [
 
357
            ('f1', 'this is a string\nand some more stuff\n'),
 
358
            ('f2', 'here is some text\nand a bit more\n'),
 
359
            ('f3', 'some text for the\nthird file created\n'),
 
360
            ('f4', 'this is a string\nand some more stuff\n'),
 
361
            ('f5', 'here is some text\nand a bit more\n'),
 
362
            ('f6', 'some text for the\nthird file created\n')
 
363
        ]
 
364
        
 
365
        if self.readonly:
 
366
            for f, val in contents:
 
367
                open(f, 'wb').write(val)
 
368
        else:
 
369
            t.put_multi(contents)
 
370
 
 
371
        a1 = StringIO('appending to\none\n')
 
372
        if self.readonly:
 
373
            _append('f1', a1.read())
 
374
        else:
 
375
            t.append('f1', a1)
 
376
 
 
377
        del a1
 
378
 
 
379
        self.check_file_contents('f1', 
 
380
                'this is a string\nand some more stuff\n'
 
381
                'appending to\none\n')
 
382
 
 
383
        a2 = StringIO('adding more\ntext to two\n')
 
384
        a3 = StringIO('some garbage\nto put in three\n')
 
385
 
 
386
        if self.readonly:
 
387
            _append('f2', a2.read())
 
388
            _append('f3', a3.read())
 
389
        else:
 
390
            t.append_multi([('f2', a2), ('f3', a3)])
 
391
 
 
392
        del a2, a3
 
393
 
 
394
        self.check_file_contents('f2',
 
395
                'here is some text\nand a bit more\n'
 
396
                'adding more\ntext to two\n')
 
397
        self.check_file_contents('f3', 
 
398
                'some text for the\nthird file created\n'
 
399
                'some garbage\nto put in three\n')
 
400
 
 
401
        # Test that an actual file object can be used with put
 
402
        a4 = open('f1', 'rb')
 
403
        if self.readonly:
 
404
            _append('f4', a4.read())
 
405
        else:
 
406
            t.append('f4', a4)
 
407
 
 
408
        del a4
 
409
 
 
410
        self.check_file_contents('f4', 
 
411
                'this is a string\nand some more stuff\n'
 
412
                'this is a string\nand some more stuff\n'
 
413
                'appending to\none\n')
 
414
 
 
415
        a5 = open('f2', 'rb')
 
416
        a6 = open('f3', 'rb')
 
417
        if self.readonly:
 
418
            _append('f5', a5.read())
 
419
            _append('f6', a6.read())
 
420
        else:
 
421
            t.append_multi([('f5', a5), ('f6', a6)])
 
422
 
 
423
        del a5, a6
 
424
 
 
425
        self.check_file_contents('f5',
 
426
                'here is some text\nand a bit more\n'
 
427
                'here is some text\nand a bit more\n'
 
428
                'adding more\ntext to two\n')
 
429
        self.check_file_contents('f6',
 
430
                'some text for the\nthird file created\n'
 
431
                'some text for the\nthird file created\n'
 
432
                'some garbage\nto put in three\n')
 
433
 
 
434
    def test_get_partial(self):
 
435
        t = self.get_transport()
 
436
 
 
437
        contents = [
 
438
            ('f1', 
 
439
                'here is some text\nand a bit more\n'
 
440
                'adding more\ntext to two\n'),
 
441
            ('f2',
 
442
                'this is a string\nand some more stuff\n'
 
443
                'appending to\none\n'),
 
444
            ('f3',
 
445
                'some text for the\nthird file created\n'
 
446
                'some garbage\nto put in three\n')
 
447
        ]
 
448
        if self.readonly:
 
449
            for f, val in contents:
 
450
                open(f, 'wb').write(val)
 
451
        else:
 
452
            t.put_multi(contents)
 
453
 
 
454
        self.assertRaises(NoSuchFile,
 
455
                t.get_partial, 'a-missing-file', 20)
 
456
        self.assertRaises(NoSuchFile,
 
457
                t.get_partial, 'another-missing-file', 20, 30)
 
458
        f = t.get_partial('f1', 33)
 
459
        self.assertEqual(f.read(), 
 
460
                'adding more\ntext to two\n')
 
461
        f = t.get_partial('f1', 33, 10)
 
462
        self.assertEqual(f.read(10), 
 
463
                'adding mor')
 
464
 
 
465
        del f
 
466
 
 
467
        offsets = [('f2', 37), ('f3', 20, 10), ('f1', 10, 20)]
 
468
        values = ['appending to\none\n',
 
469
                  'ird file c',
 
470
                  'me text\nand a bit mo'
 
471
                 ]
 
472
        contents_f = t.get_partial_multi(offsets)
 
473
        count = 0
 
474
        for f, val in zip(contents_f, values):
 
475
            count += 1
 
476
            self.assertEqual(val, f.read(len(val)))
 
477
        # Make sure we saw all values, and no extra
 
478
        self.assertEqual(len(values), count)
 
479
        self.assertEqual(list(contents_f), [])
 
480
 
 
481
        # Do the same thing with an iterator
 
482
        offsets = iter([('f2', 34), ('f3', 18, 10), ('f1', 15, 15)])
 
483
        values = ['ff\nappending to\none\n',
 
484
                  'third file',
 
485
                  'xt\nand a bit mo'
 
486
                 ]
 
487
        contents_f = t.get_partial_multi(offsets)
 
488
        count = 0
 
489
        for f, val in zip(contents_f, values):
 
490
            count += 1
 
491
            self.assertEqual(val, f.read(len(val)))
 
492
        self.assertEqual(len(values), count)
 
493
        self.assertEqual(list(contents_f), [])
 
494
 
 
495
 
 
496
    def test_delete(self):
 
497
        # TODO: Test Transport.delete
 
498
        pass
 
499
 
 
500
    def test_move(self):
 
501
        # TODO: Test Transport.move
 
502
        pass
 
503
 
 
504
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
 
505
    def get_transport(self):
 
506
        from bzrlib.transport.local import LocalTransport
 
507
        return LocalTransport('.')
 
508
 
 
509
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
 
510
    readonly = True
 
511
    def get_transport(self):
 
512
        from bzrlib.transport.http import HttpTransport
 
513
        url = self.get_remote_url('.')
 
514
        return HttpTransport(url)
 
515